Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 110 additions & 14 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,18 +456,6 @@ var (
}
)

func setApplyIntervalModifiers(c *cli.Command) bool {
fullRefresh := c.Bool("full-refresh")
applyIntervalModifiers := c.Bool("apply-interval-modifiers")

if fullRefresh && applyIntervalModifiers {
warningPrinter.Println("Warning: --apply-interval-modifiers flag is ignored when --full-refresh is enabled.")
return false
}

return applyIntervalModifiers
}

func Run(isDebug *bool) *cli.Command {
return &cli.Command{
Name: "run",
Expand Down Expand Up @@ -604,9 +592,15 @@ func Run(isDebug *bool) *cli.Command {
Action: func(ctx context.Context, c *cli.Command) error {
defer RecoverFromPanic()

logger := makeLogger(*isDebug)
// Validate mutual exclusivity of full-refresh and apply-interval-modifiers
fullRefresh := c.Bool("full-refresh")
applyIntervalModifiers := setApplyIntervalModifiers(c)
applyIntervalModifiers := c.Bool("apply-interval-modifiers")

if fullRefresh && applyIntervalModifiers {
return cli.Exit("flags --full-refresh and --apply-interval-modifiers are mutually exclusive and cannot be used together", 1)
}

logger := makeLogger(*isDebug)

runConfig := &scheduler.RunConfig{
Downstream: c.Bool("downstream"),
Expand Down Expand Up @@ -763,6 +757,12 @@ func Run(isDebug *bool) *cli.Command {
return err
}

// Validate interval modifiers with finalized dates
if err := validateIntervalModifiersWithFinalizedDates(runCtx, pipelineInfo.Pipeline, logger); err != nil {
errorPrinter.Printf("Interval modifier validation failed: %v\n", err)
return cli.Exit("", 1)
}

// Update renderer with the finalized start/end dates
renderer = jinja.NewRendererWithStartEndDates(&startDate, &endDate, pipelineInfo.Pipeline.Name, runID, nil)
DefaultPipelineBuilder.AddAssetMutator(renderAssetParamsMutator(renderer))
Expand All @@ -771,6 +771,8 @@ func Run(isDebug *bool) *cli.Command {
runCtx = context.WithValue(runCtx, pipeline.RunConfigStartDate, startDate)
runCtx = context.WithValue(runCtx, pipeline.RunConfigEndDate, endDate)



// handle log files
executionStartLog := "Starting execution..."
if !c.Bool("minimal-logs") {
Expand Down Expand Up @@ -1070,6 +1072,100 @@ func DetermineStartDate(cliStartDate string, pipeline *pipeline.Pipeline, fullRe
return startDate, nil
}

// validateIntervalModifiersWithFinalizedDates validates interval modifiers using the finalized start/end dates
func validateIntervalModifiersWithFinalizedDates(ctx context.Context, p *pipeline.Pipeline, logger logger.Logger) error {
// Get the finalized dates from context
startDate, ok := ctx.Value(pipeline.RunConfigStartDate).(time.Time)
if !ok {
return fmt.Errorf("start date not found in context")
}

endDate, ok := ctx.Value(pipeline.RunConfigEndDate).(time.Time)
if !ok {
return fmt.Errorf("end date not found in context")
}

applyModifiers, ok := ctx.Value(pipeline.RunConfigApplyIntervalModifiers).(bool)
if !ok {
applyModifiers = false
}

logger.Debugf("Validating interval modifiers with finalized dates - startDate=%s, endDate=%s, applyModifiers=%t",
startDate.Format(time.RFC3339), endDate.Format(time.RFC3339), applyModifiers)

// Check if start date is after end date (basic validation)
if startDate.After(endDate) {
return fmt.Errorf("start date (%s) must be before end date (%s)",
startDate.Format("2006-01-02 15:04:05"),
endDate.Format("2006-01-02 15:04:05"))
}

// Validate each asset's interval modifiers
for _, asset := range p.Assets {
if err := validateAssetIntervalModifiers(ctx, asset, startDate, endDate, applyModifiers, logger); err != nil {
return fmt.Errorf("asset %s: %w", asset.Name, err)
}
}

return nil
}

// validateAssetIntervalModifiers validates interval modifiers for a single asset
func validateAssetIntervalModifiers(ctx context.Context, asset *pipeline.Asset, startDate, endDate time.Time, applyModifiers bool, logger logger.Logger) error {
// Skip validation if no interval modifiers are defined
if asset.IntervalModifiers.Start.Template == "" && asset.IntervalModifiers.End.Template == "" &&
asset.IntervalModifiers.Start.Hours == 0 && asset.IntervalModifiers.Start.Minutes == 0 &&
asset.IntervalModifiers.Start.Seconds == 0 && asset.IntervalModifiers.Start.Days == 0 &&
asset.IntervalModifiers.Start.Months == 0 &&
asset.IntervalModifiers.End.Hours == 0 && asset.IntervalModifiers.End.Minutes == 0 &&
asset.IntervalModifiers.End.Seconds == 0 && asset.IntervalModifiers.End.Days == 0 &&
asset.IntervalModifiers.End.Months == 0 {
return nil
}

logger.Debugf("Validating interval modifiers for asset %s", asset.Name)

// If templates are used and applyModifiers is true, we need to render them
if (asset.IntervalModifiers.Start.Template != "" || asset.IntervalModifiers.End.Template != "") && applyModifiers {
// Create a temporary renderer to resolve templates
renderer := jinja.NewRendererWithStartEndDates(&startDate, &endDate, "validation", "validation-run", nil)

// Resolve start modifier template
if asset.IntervalModifiers.Start.Template != "" {
resolvedStartModifier, err := asset.IntervalModifiers.Start.ResolveTemplateToNew(renderer)
if err != nil {
return fmt.Errorf("failed to resolve start interval modifier template: %w", err)
}
asset.IntervalModifiers.Start = resolvedStartModifier
}

// Resolve end modifier template
if asset.IntervalModifiers.End.Template != "" {
resolvedEndModifier, err := asset.IntervalModifiers.End.ResolveTemplateToNew(renderer)
if err != nil {
return fmt.Errorf("failed to resolve end interval modifier template: %w", err)
}
asset.IntervalModifiers.End = resolvedEndModifier
}
}

// Apply modifiers to get final dates
finalStartDate := pipeline.ModifyDate(startDate, asset.IntervalModifiers.Start)
finalEndDate := pipeline.ModifyDate(endDate, asset.IntervalModifiers.End)

// Check if end date is earlier than start date
if finalEndDate.Before(finalStartDate) {
return fmt.Errorf("interval modifiers result in end date (%s) being earlier than start date (%s)",
finalEndDate.Format("2006-01-02 15:04:05"),
finalStartDate.Format("2006-01-02 15:04:05"))
}

logger.Debugf("Asset %s interval modifiers validated successfully - final start: %s, final end: %s",
asset.Name, finalStartDate.Format(time.RFC3339), finalEndDate.Format(time.RFC3339))

return nil
}

func ValidateRunConfig(runConfig *scheduler.RunConfig, inputPath string, logger logger.Logger) (time.Time, time.Time, string, error) {
if inputPath == "" {
inputPath = "."
Expand Down
20 changes: 19 additions & 1 deletion integration-tests/.bruin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ environments:
- name: "duckdb-interval-modifiers"
path: "duckdb-files/interval-modifiers.db"

env-static-invalid-interval:
connections:
duckdb:
- name: "duckdb-static-invalid-interval"
path: "duckdb-files/static-invalid-interval.db"

env-render-template-this:
connections:
duckdb:
Expand Down Expand Up @@ -241,4 +247,16 @@ environments:
connections:
duckdb:
- name: "duckdb-scd2-by-time"
path: "duckdb-files/scd2-by-time.db"
path: "duckdb-files/scd2-by-time.db"

env-static-modifiers:
connections:
duckdb:
- name: "duckdb-static-modifiers"
path: "duckdb-files/static-modifiers.db"

env-valid-time-window:
connections:
duckdb:
- name: "duckdb-valid-time-window"
path: "duckdb-files/valid-time-window.db"
33 changes: 33 additions & 0 deletions integration-tests/expected_connections.json
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,39 @@
]
},
"schema_prefix": ""
},
"env-static-modifiers": {
"connections": {
"duckdb": [
{
"name": "duckdb-static-modifiers",
"path": "duckdb-files/static-modifiers.db"
}
]
},
"schema_prefix": ""
},
"env-valid-time-window": {
"connections": {
"duckdb": [
{
"name": "duckdb-valid-time-window",
"path": "duckdb-files/valid-time-window.db"
}
]
},
"schema_prefix": ""
},
"env-static-invalid-interval": {
"connections": {
"duckdb": [
{
"name": "duckdb-static-invalid-interval",
"path": "duckdb-files/static-invalid-interval.db"
}
]
},
"schema_prefix": ""
}
}
}
54 changes: 54 additions & 0 deletions integration-tests/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,60 @@ func TestIndividualTasks(t *testing.T) {
},
},
},
{
name: "interval-modifiers-validation",
task: e2e.Task{
Name: "interval-modifiers-validation",
Command: binary,
Args: []string{"validate", filepath.Join(currentFolder, "test-pipelines/validate-interval-modifiers")},
Env: []string{},
Expected: e2e.Output{
ExitCode: 1,
Contains: []string{"4 issues", "interval-modifiers-valid-dates", "end date (2025-09-22 22:59:59) being earlier than start date (2025-09-23 02:00:00)"},
},
WorkingDir: currentFolder,
Asserts: []func(*e2e.Task) error{
e2e.AssertByExitCode,
e2e.AssertByContains,
},
},
},
{
name: "valid-time-window",
task: e2e.Task{
Name: "valid-time-window",
Command: binary,
Args: []string{"run", "--apply-interval-modifiers", "--start-date", "2025-09-20", "--end-date", "2025-09-20", "--env", "env-valid-time-window", filepath.Join(currentFolder, "test-pipelines/validate-interval-modifiers/assets/month_end_asset.sql")},
Env: []string{},
Expected: e2e.Output{
ExitCode: 0,
Contains: []string{"2025-09-20T02:00:00Z - 2025-09-19T00:00:00Z"},
},
WorkingDir: currentFolder,
Asserts: []func(*e2e.Task) error{
e2e.AssertByExitCode,
e2e.AssertByContains,
},
},
},
{
name: "full-refresh-interval-modifiers-mutual-exclusivity",
task: e2e.Task{
Name: "full-refresh-interval-modifiers-mutual-exclusivity",
Command: binary,
Args: []string{"run", "--full-refresh", "--apply-interval-modifiers", "--start-date", "2025-09-20", "--end-date", "2025-09-25", filepath.Join(currentFolder, "test-pipelines/validate-interval-modifiers/assets/month_end_asset.sql")},
Env: []string{},
Expected: e2e.Output{
ExitCode: 1,
Contains: []string{"flags --full-refresh and --apply-interval-modifiers are mutually exclusive and cannot be used together"},
},
WorkingDir: currentFolder,
Asserts: []func(*e2e.Task) error{
e2e.AssertByExitCode,
e2e.AssertByContains,
},
},
},
{
name: "parse-whole-pipeline",
task: e2e.Task{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/* @bruin
name: month_end_asset
type: duckdb.sql

materialization:
type: table
strategy: create+replace

interval_modifiers:
start: "2h"
end: "-24h"

columns:
- name: id
type: INTEGER
description: "Unique identifier"
primary_key: true
- name: name
type: VARCHAR
description: "Name of the item"
- name: created_at
type: TIMESTAMP
description: "When the item was created"
@bruin */
SELECT
1 AS id,
'Test Item' AS name,
TIMESTAMP '2025-01-15 12:00:00' AS created_at
WHERE created_at BETWEEN '{{start_timestamp}}' AND '{{end_timestamp}}'
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/* @bruin
name: static.invalid
type: duckdb.sql
materialization:
type: table
strategy: create+replace
interval_modifiers:
start: "25h" # Shift start forward 25 hours
end: "-2h" # Shift end backward 2 hours
columns:
- name: id
type: INTEGER
description: "Unique identifier"
primary_key: true
- name: name
type: VARCHAR
description: "Name of the item"
- name: created_at
type: TIMESTAMP
description: "When the item was created"
@bruin */

SELECT 1 AS id, 'Test Item' AS name, TIMESTAMP '2025-01-15 12:00:00' AS created_at;
-- SELECT
-- 1 AS id,
-- 'Test Item' AS name,
-- TIMESTAMP '2025-01-15 12:00:00' AS created_at
-- WHERE created_at BETWEEN '{{start_timestamp}}' AND '{{end_timestamp}}'
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
name: validate-interval-modifiers
start_date: "2024-01-01"
default_connections:
duckdb: "duckdb-valid-time-window"
22 changes: 22 additions & 0 deletions pkg/lint/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,28 @@ func GetRules(fs afero.Fs, finder repoFinder, excludeWarnings bool, parser *sqlp
CrossPipelineValidator: ValidateCrossPipelineURIDependencies,
ApplicableLevels: []Level{LevelCrossPipeline},
},
// Interval modifier validation is now done in run.go after dates are finalized
// &SimpleRule{
// Identifier: "interval-modifiers-valid-dates",
// Fast: true, // No template rendering - just date logic validation
// Severity: ValidatorSeverityCritical,
// AssetValidator: ValidateIntervalModifiersDates,
// ApplicableLevels: []Level{LevelAsset},
// },
// &SimpleRule{
// Identifier: "interval-modifiers-valid-templates",
// Fast: true, // Template syntax validation without rendering
// Severity: ValidatorSeverityCritical,
// AssetValidator: ValidateIntervalModifierTemplates,
// ApplicableLevels: []Level{LevelAsset},
// },
&SimpleRule{
Identifier: "valid-time-window",
Fast: true,
Severity: ValidatorSeverityCritical,
Validator: EnsureValidTimeWindow,
ApplicableLevels: []Level{LevelPipeline},
},
}

if parser != nil {
Expand Down
Loading
Loading