
Conversation

@zilto zilto (Collaborator) commented Sep 16, 2025

Users of dlt.Dataset want a simple way to write data back to the dataset.

Use cases:

  • manually review data and push corrected records
  • add records without access to the original dlt.Pipeline used to create the dataset

Other motivations

This interface will simplify data-centric operations involved in:

  • storing data quality check results on the destination
  • creating a graph of datasets where the dataset's "internal pipeline" is used
  • integrating with orchestration frameworks

Specs

  • Look at WritableDataset.save() from dlt-plus
  • Add Dataset.write() in dlt (this aligns with the pipeline.run() operation)
    • Alternatives: .write_to(), .load_into(), .load_table()
  • create an internal dlt.Pipeline named _dlt_dataset_{dataset_name}
  • find a way for the internal pipeline to use the dlt.Schema from the dlt.Dataset instance, so that this schema evolves when Dataset.write() is used
  • potential API
    def write(
      self: dlt.Dataset,
      data: TDataItems,
      *,
      table_name: str,
      write_disposition: TWriteDisposition = "append",
      normalize: bool = False,
    ) -> LoadInfo: ...
    • write_disposition determines whether we append to or modify existing records
    • normalize lets the user opt into normalization (which might create additional tables)
  • can accept a dlt.Relation as input (see the usage sketch after this list)
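
To make the proposed interface concrete, here is a minimal usage sketch. Dataset.write() is the method proposed above and does not exist in dlt today; the pipeline, table name, and records are illustrative assumptions.

import dlt

# An existing pipeline gives access to its dataset (existing dlt API).
pipeline = dlt.pipeline("reviews", destination="duckdb", dataset_name="reviews_data")
dataset = pipeline.dataset()

# Push a few manually corrected records into an existing table using the
# proposed Dataset.write(); names and values are made up for illustration.
corrected = [
    {"record_id": 7, "status": "approved"},
    {"record_id": 9, "status": "rejected"},
]
load_info = dataset.write(
    corrected,
    table_name="review_results",
    write_disposition="append",  # default; "replace" or "merge" would modify existing data
    normalize=False,             # data must already fit the table layout
)
print(load_info)

The returned LoadInfo mirrors what pipeline.run() returns, as in the signature above.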

Out of scope

  • Dataset.write() doesn't have to support the dlt.Pipeline.run() method 1-to-1; if a user needs the full range of configuration, they should create a pipeline

netlify bot commented Sep 16, 2025

Deploy Preview for dlt-hub-docs ready!

  • Latest commit: 2c08e88
  • Latest deploy log: https://app.netlify.com/projects/dlt-hub-docs/deploys/68cdfa69f43aca0008136715
  • Deploy Preview: https://deploy-preview-3092--dlt-hub-docs.netlify.app

@zilto zilto requested review from rudolfix and sh-rp September 16, 2025 21:02
@zilto zilto self-assigned this Sep 16, 2025
@zilto zilto added the enhancement New feature or request label Sep 16, 2025
@rudolfix rudolfix (Collaborator) left a comment

My take on the interface.

  1. write is OK!
  2. make it a clearly specialized method for writing a data chunk: the current simple interface is good.
  3. allowing Relation as data is a good idea, so there's a certain symmetry (querying works with relations). In that case you can make table_name optional (a relation has a table name AFAIK?)

Implementation details:
my take would be to make the internal pipeline used in write as invisible as possible (a rough sketch follows this comment):

  1. disable destination sync, state sync and schema evolution (a total freeze on the table via a schema contract)
  2. possibly use pipelines-dir to hide it from the command line and the dashboard.

In essence, we pretend that this pipeline does not exist.

WDYT?

A helper class to get the write pipeline is cool. I'd make it a public helper method. Another helper method would be to convert a Dataset into a Source, but that's another problem.

I'd keep the internal pipeline that just loads data super simple.
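
A rough sketch of what such an invisible internal pipeline could look like, assuming a throwaway pipelines_dir and a schema contract freeze. The helper name _write_via_hidden_pipeline and passing the dataset's schema into run() are assumptions for illustration, not the agreed implementation.

import tempfile

import dlt
from dlt.common.schema import Schema
from dlt.common.typing import TDataItems


def _write_via_hidden_pipeline(
    dataset_name: str,
    destination: str,
    schema: Schema,
    data: TDataItems,
    table_name: str,
    write_disposition: str = "append",
):
    # Keep the helper pipeline out of the CLI and dashboard by storing its
    # working state in a temporary pipelines_dir.
    with tempfile.TemporaryDirectory() as tmp_dir:
        pipeline = dlt.pipeline(
            pipeline_name=f"_dlt_dataset_{dataset_name}",
            destination=destination,
            dataset_name=dataset_name,
            pipelines_dir=tmp_dir,
        )
        # "freeze" forbids new tables, new columns and variant columns,
        # so the write cannot evolve the dataset's schema.
        return pipeline.run(
            data,
            table_name=table_name,
            write_disposition=write_disposition,
            schema=schema,
            schema_contract="freeze",
        )

Destination sync and state sync would still need separate handling; the contract above only freezes schema evolution.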

cloudflare-workers-and-pages bot commented Sep 20, 2025

Deploying with Cloudflare Workers

  • Status: ❌ Deployment failed (docs, commit 2c08e88, updated Sep 20 2025, 12:55 AM UTC)

@zilto zilto (Collaborator, Author) commented Sep 20, 2025

> my take would be to make internal pipeline used in write as invisible as possible
> possibly use pipelines-dir to hide it from command line and dashboard.
> in essence we pretend that this pipeline does not exist

I changed the internal pipeline to be a context manager that uses a temporary directory as pipelines_dir

> disable destination sync, state sync and schema evolution (a total freeze on a table via contract)

I don't know exactly what I need to change / configure for destination and state sync (it doesn't seem to be in the kwargs for dlt.pipeline() and pipeline.run()).

For schema evolution, users should be able to modify the schema, for example to add a column or cast types. Still, I would make a frozen schema the default and require users to explicitly change it (see the sketch below).
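
For illustration, given a dataset obtained as in the earlier sketch, a frozen-by-default contract with explicit opt-in could look like this; the schema_contract parameter on write() is an assumption, not part of the current proposal.

items = [{"id": 2, "value": "bango", "note": "a brand new column"}]

# Default: frozen schema, so the unknown "note" column would be rejected.
dataset.write(items, table_name="bar")

# Hypothetical explicit opt-in to schema evolution.
dataset.write(items, table_name="bar", schema_contract="evolve")

With a freeze default, adding the "note" column would raise instead of silently creating it.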

table_name = "bar"
items = [{"id": 0, "value": "bingo"}, {"id": 1, "value": "bongo"}]

# TODO this is currently odd because the tables exist on the `Schema`
Review comment (Collaborator):

Schemas create the default tables on initialization. We could consider changing this, but that is the reason why they will always exist on any Schema instance regardless of whether anything was materialized.

Reply (@zilto, Author):

I think the current behavior is ok and I wouldn't change it. Just wanted to leave a note in the test because the assertion could be surprising.

Passing a `pipelines_dir` allows you to set a
"""
with tempfile.TemporaryDirectory() as tmp_dir:
Review comment (Collaborator):

I think we should not run this in a temporary directory but give the pipeline a predictable name and store it with the other pipeline metadata; this way the user can debug the run like any other pipeline run. This is up for debate though.

data: TDataItems,
*,
table_name: str,
write_disposition: TWriteDisposition = "append",
Review comment (Collaborator):

I think the columns and loader_file_format args from the run method would also be good candidates here. You can also consider a pipeline_kwargs argument that gets forwarded to the internal pipeline instantiation. But maybe we do not need this and can add it if requested.
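
A possible extended signature folding in these suggestions; everything beyond the original proposal (columns, loader_file_format, pipeline_kwargs) is a candidate under discussion, not a settled API. The dlt type names mirror those used in the proposal above.

from __future__ import annotations  # keep the dlt type names as forward references

from typing import Any, Optional


def write(
    self,
    data: TDataItems,
    *,
    table_name: str,
    write_disposition: TWriteDisposition = "append",
    columns: Optional[TAnySchemaColumns] = None,             # as in Pipeline.run()
    loader_file_format: Optional[TLoaderFileFormat] = None,  # as in Pipeline.run()
    normalize: bool = False,
    pipeline_kwargs: Optional[dict[str, Any]] = None,        # forwarded to the internal dlt.pipeline()
) -> LoadInfo: ...

pipeline_kwargs is the most speculative of these and could be left out until requested, as noted above.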

del state["data_item_normalizer"]
return state

def __eq__(self, other: Any) -> bool:
Review comment (Collaborator):
Cool!

@@ -0,0 +1,93 @@
import pathlib
Review comment (Collaborator):
We need tests for writing into tables that already exist, and for reading back from those tables with our database reader methods to see whether the schema was updated properly and works.

We should also make sure that all the args provided to the write method are forwarded properly.
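
A sketch of such a test, assuming the proposed Dataset.write(), a duckdb destination, and pytest's tmp_path fixture; the pipeline name, table, and assertions are illustrative only.

import dlt


def test_write_into_existing_table_and_read_back(tmp_path):
    # Materialize a table through a regular pipeline run first.
    pipeline = dlt.pipeline(
        pipeline_name="dataset_write_test",
        destination="duckdb",
        dataset_name="ds",
        pipelines_dir=str(tmp_path),
    )
    pipeline.run([{"id": 0, "value": "bingo"}], table_name="bar")

    # Write an additional row through the proposed Dataset.write().
    dataset = pipeline.dataset()
    dataset.write([{"id": 1, "value": "bongo"}], table_name="bar")

    # Read back with the database reader methods and check the schema was reused.
    rows = dataset["bar"].fetchall()
    assert len(rows) == 2
    assert "value" in dataset.schema.tables["bar"]["columns"]

The same test could be parametrized over write_disposition and normalize to confirm the arguments are forwarded properly.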

Passing a `pipelines_dir` allows you to set a
"""
with tempfile.TemporaryDirectory() as tmp_dir:
    pipeline = _get_internal_pipeline(
Review comment (Collaborator):

You need to forward the staging destination here too, which probably requires knowing the staging destination on the dataset already. Alternatively, one would have to provide the staging_destination via the run_kwargs. For working within notebooks, where you often get the dataset from a pipeline instance, it seems to me it would be good to always have it set on the dataset when you get it from the pipeline.

@sh-rp sh-rp (Collaborator) left a comment

I have added a few thoughts to consider :)

