diff --git a/dbt_core_integration.py b/dbt_core_integration.py index 91e45c504..30cc0ec72 100644 --- a/dbt_core_integration.py +++ b/dbt_core_integration.py @@ -278,6 +278,7 @@ def __init__( favor_state: Optional[bool] = False, # dict in 1.5.x onwards, json string before. vars: Optional[Union[Dict[str, Any], str]] = {} if DBT_MAJOR_VER >= 1 and DBT_MINOR_VER >= 5 else "{}", + full_refresh: Optional[bool] = False, ): self.threads = threads self.target = target if target else os.environ.get("DBT_TARGET") @@ -291,6 +292,7 @@ def __init__( self.defer = defer self.state = state self.favor_state = favor_state + self.full_refresh = full_refresh # dict in 1.5.x onwards, json string before. if DBT_MAJOR_VER >= 1 and DBT_MINOR_VER >= 5: self.vars = vars if vars else json.loads(os.environ.get("DBT_VARS", "{}")) @@ -358,6 +360,7 @@ def __init__( manifest_path: Optional[str] = None, favor_state: bool = False, vars: Optional[Dict[str, Any]] = {}, + full_refresh: bool = False, ): self.args = ConfigInterface( threads=threads, @@ -370,6 +373,7 @@ def __init__( state=manifest_path, favor_state=favor_state, vars=vars, + full_refresh=full_refresh, ) # Utilities @@ -674,21 +678,28 @@ def execute_macro( """Wraps adapter execute_macro. Execute a macro like a function.""" return self.get_macro_function(macro, compiled_code)(kwargs=kwargs) - def execute_sql(self, raw_sql: str, original_node: Optional[Union["ManifestNode", str]] = None) -> DbtAdapterExecutionResult: + def execute_sql(self, raw_sql: str, original_node: Optional[Union["ManifestNode", str]] = None, full_refresh: bool = False) -> DbtAdapterExecutionResult: """Execute dbt SQL statement against database""" - with self.adapter.connection_named("master"): - # if no jinja chars then these are synonymous - compiled_sql = raw_sql - if has_jinja(raw_sql): - # jinja found, compile it - compilation_result = self._compile_sql(raw_sql, original_node) - compiled_sql = compilation_result.compiled_sql - - return DbtAdapterExecutionResult( - *self.adapter_execute(compiled_sql, fetch=True), - raw_sql, - compiled_sql, - ) + prev = getattr(self.args, "full_refresh", False) + self.args.full_refresh = full_refresh + set_from_args(self.args, None) + try: + with self.adapter.connection_named("master"): + # if no jinja chars then these are synonymous + compiled_sql = raw_sql + if has_jinja(raw_sql): + # jinja found, compile it + compilation_result = self._compile_sql(raw_sql, original_node) + compiled_sql = compilation_result.compiled_sql + + return DbtAdapterExecutionResult( + *self.adapter_execute(compiled_sql, fetch=True), + raw_sql, + compiled_sql, + ) + finally: + self.args.full_refresh = prev + set_from_args(self.args, None) def execute_node(self, node: "ManifestNode") -> DbtAdapterExecutionResult: """Execute dbt SQL statement against database from a"ManifestNode""" @@ -707,12 +718,18 @@ def execute_node(self, node: "ManifestNode") -> DbtAdapterExecutionResult: except Exception as e: raise Exception(str(e)) - def compile_sql(self, raw_sql: str, original_node: Optional["ManifestNode"] = None) -> DbtAdapterCompilationResult: + def compile_sql(self, raw_sql: str, original_node: Optional["ManifestNode"] = None, full_refresh: bool = False) -> DbtAdapterCompilationResult: try: + prev = getattr(self.args, "full_refresh", False) + self.args.full_refresh = full_refresh + set_from_args(self.args, None) with self.adapter.connection_named("master"): return self._compile_sql(raw_sql, original_node) except Exception as e: raise Exception(str(e)) + finally: + self.args.full_refresh = prev + set_from_args(self.args, None) def compile_node( self, node: "ManifestNode" diff --git a/package.json b/package.json index 25a6c8269..1676c470b 100644 --- a/package.json +++ b/package.json @@ -216,6 +216,11 @@ "default": 500, "minimum": 1 }, + "dbt.previewFullRefresh": { + "type": "boolean", + "description": "When enabled, compiled SQL previews use full refresh mode so `is_incremental()` evaluates to false.", + "default": false + }, "dbt.perspectiveTheme": { "type": "string", "description": "Theme for perspective viewer in query results panel", diff --git a/src/dbt_client/dbtCloudIntegration.ts b/src/dbt_client/dbtCloudIntegration.ts index 831de4fae..8a4bb81aa 100644 --- a/src/dbt_client/dbtCloudIntegration.ts +++ b/src/dbt_client/dbtCloudIntegration.ts @@ -249,6 +249,7 @@ export class DBTCloudProjectIntegration query: string, limit: number, modelName: string, + fullRefresh = false, ): Promise { this.throwIfNotAuthenticated(); this.throwBridgeErrorIfAvailable(); @@ -267,6 +268,9 @@ export class DBTCloudProjectIntegration "json", ]), ); + if (fullRefresh) { + showCommand.addArgument("--full-refresh"); + } const cancellationTokenSource = new CancellationTokenSource(); showCommand.setToken(cancellationTokenSource.token); return new QueryExecution( @@ -657,7 +661,11 @@ export class DBTCloudProjectIntegration return compiledLine[0].data.compiled; } - async unsafeCompileQuery(query: string): Promise { + async unsafeCompileQuery( + query: string, + _originalModelName: string | undefined = undefined, + fullRefresh = false, + ): Promise { this.throwIfNotAuthenticated(); this.throwBridgeErrorIfAvailable(); const compileQueryCommand = this.dbtCloudCommand( @@ -671,6 +679,9 @@ export class DBTCloudProjectIntegration "json", ]), ); + if (fullRefresh) { + compileQueryCommand.addArgument("--full-refresh"); + } const { stdout, stderr } = await compileQueryCommand.execute(); const compiledLine = stdout .trim() diff --git a/src/dbt_client/dbtCoreCommandIntegration.ts b/src/dbt_client/dbtCoreCommandIntegration.ts index 8468eebdf..6c7a09ee2 100644 --- a/src/dbt_client/dbtCoreCommandIntegration.ts +++ b/src/dbt_client/dbtCoreCommandIntegration.ts @@ -33,6 +33,7 @@ export class DBTCoreCommandProjectIntegration extends DBTCoreProjectIntegration query: string, limit: number, modelName: string, + fullRefresh = false, ): Promise { this.throwBridgeErrorIfAvailable(); const showCommand = this.dbtCoreCommand( @@ -50,6 +51,9 @@ export class DBTCoreCommandProjectIntegration extends DBTCoreProjectIntegration "json", ]), ); + if (fullRefresh) { + showCommand.addArgument("--full-refresh"); + } const cancellationTokenSource = new CancellationTokenSource(); showCommand.setToken(cancellationTokenSource.token); return new QueryExecution( @@ -151,7 +155,11 @@ export class DBTCoreCommandProjectIntegration extends DBTCoreProjectIntegration return compiledLine[0].data.compiled; } - async unsafeCompileQuery(query: string): Promise { + async unsafeCompileQuery( + query: string, + _originalModelName: string | undefined = undefined, + fullRefresh = false, + ): Promise { this.throwBridgeErrorIfAvailable(); const compileQueryCommand = this.dbtCoreCommand( new DBTCommand("Compiling sql...", [ @@ -164,6 +172,9 @@ export class DBTCoreCommandProjectIntegration extends DBTCoreProjectIntegration "json", ]), ); + if (fullRefresh) { + compileQueryCommand.addArgument("--full-refresh"); + } const { stdout, stderr } = await compileQueryCommand.execute(); const compiledLine = stdout .trim() diff --git a/src/dbt_client/dbtCoreIntegration.ts b/src/dbt_client/dbtCoreIntegration.ts index 2bd5f7179..17dc2c19d 100644 --- a/src/dbt_client/dbtCoreIntegration.ts +++ b/src/dbt_client/dbtCoreIntegration.ts @@ -413,6 +413,7 @@ export class DBTCoreProjectIntegration query: string, limit: number, modelName: string, + fullRefresh = false, ): Promise { this.throwBridgeErrorIfAvailable(); const { limitQuery } = await this.getQuery(query, limit); @@ -432,11 +433,13 @@ export class DBTCoreProjectIntegration const compiledQuery = await this.unsafeCompileQuery( limitQuery, modelName, + fullRefresh, ); try { // execute query result = await queryThread!.lock( - (python) => python`to_dict(project.execute_sql(${compiledQuery}))`, + (python) => + python`to_dict(project.execute_sql(${compiledQuery}, ${fullRefresh}))`, ); const { manifestPathType } = this.deferToProdService.getDeferConfigByProjectRoot( @@ -881,11 +884,12 @@ export class DBTCoreProjectIntegration async unsafeCompileQuery( query: string, originalModelName: string | undefined = undefined, + fullRefresh = false, ): Promise { this.throwBridgeErrorIfAvailable(); const output = await this.python?.lock( (python) => - python!`to_dict(project.compile_sql(${query}, ${originalModelName}))`, + python!`to_dict(project.compile_sql(${query}, ${originalModelName}, ${fullRefresh}))`, ); return output.compiled_sql; } diff --git a/src/dbt_client/dbtIntegration.ts b/src/dbt_client/dbtIntegration.ts index 5c98a3086..bb8049144 100644 --- a/src/dbt_client/dbtIntegration.ts +++ b/src/dbt_client/dbtIntegration.ts @@ -345,6 +345,7 @@ export interface DBTProjectIntegration extends Disposable { query: string, limit: number, modelName: string, + fullRefresh?: boolean, ): Promise; // dbt commands runModel(command: DBTCommand): Promise; @@ -362,6 +363,7 @@ export interface DBTProjectIntegration extends Disposable { unsafeCompileQuery( query: string, originalModelName: string | undefined, + fullRefresh?: boolean, ): Promise; validateSql( query: string, diff --git a/src/manifest/dbtProject.ts b/src/manifest/dbtProject.ts index f4035e328..7bb016a6c 100644 --- a/src/manifest/dbtProject.ts +++ b/src/manifest/dbtProject.ts @@ -848,10 +848,14 @@ export class DBTProject implements Disposable { originalModelName: string | undefined = undefined, ): Promise { this.telemetry.sendTelemetryEvent("compileQuery"); + const fullRefresh = workspace + .getConfiguration("dbt") + .get("previewFullRefresh", false); try { return await this.dbtProjectIntegration.unsafeCompileQuery( query, originalModelName, + fullRefresh, ); } catch (exc: any) { if (exc instanceof PythonException) { @@ -899,10 +903,12 @@ export class DBTProject implements Disposable { async unsafeCompileQuery( query: string, originalModelName: string | undefined = undefined, + fullRefresh = false, ) { return this.dbtProjectIntegration.unsafeCompileQuery( query, originalModelName, + fullRefresh, ); } @@ -1203,11 +1209,16 @@ export class DBTProject implements Disposable { limit: limit.toString(), }); + const fullRefresh = workspace + .getConfiguration("dbt") + .get("previewFullRefresh", false); + if (returnImmediately) { const execution = await this.dbtProjectIntegration.executeSQL( query, limit, modelName, + fullRefresh, ); const result = await execution.executeQuery(); if (returnRawResults) { @@ -1234,7 +1245,12 @@ export class DBTProject implements Disposable { command: "executeQuery", payload: { query, - fn: this.dbtProjectIntegration.executeSQL(query, limit, modelName), + fn: this.dbtProjectIntegration.executeSQL( + query, + limit, + modelName, + fullRefresh, + ), projectName: this.getProjectName(), }, });