feat: Dataset.write()
#3092
base: devel
Conversation
My take on the interface:

- `write` is OK! Make it a clearly specialized method to write a data chunk: the current simple interface is good.
- Allowing a `Relation` as data is a good idea, so there's a certain symmetry (querying works with relations). In that case you can make `table_name` optional (the relation has a table name AFAIK?).

Implementation details: my take would be to make the internal pipeline used in `write` as invisible as possible (see the sketch after this comment):

- disable destination sync, state sync and schema evolution (a total freeze on a table via contract)
- possibly use `pipelines_dir` to hide it from the command line and dashboard.

In essence we pretend that this pipeline does not exist. WDYT?

A helper class to get the write pipeline is cool. I'd make it a public helper method. Another helper method would be to convert a Dataset into a Source, but that's another problem.

- I'd keep the internal pipeline that just loads data super simple.
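For illustration only, a minimal sketch of what such an "invisible" internal pipeline could look like, assuming a frozen schema contract and a hidden `pipelines_dir`; the helper name, the hidden directory path, and the pipeline naming scheme are assumptions, not this PR's actual implementation:

```python
import dlt
from dlt.common.typing import TDataItems


# A sketch, not the PR's implementation: run the write through an internal
# pipeline with a frozen schema contract and a hidden pipelines_dir.
def _write_chunk(dataset_name: str, destination: str, data: TDataItems, table_name: str) -> None:
    pipeline = dlt.pipeline(
        pipeline_name=f"_dlt_dataset_{dataset_name}",  # naming scheme from the PR description
        destination=destination,
        dataset_name=dataset_name,
        pipelines_dir="/tmp/_dlt_hidden_pipelines",  # assumption: keeps it out of CLI/dashboard listings
    )
    # "freeze" the contract so the write cannot evolve existing tables
    pipeline.run(data, table_name=table_name, schema_contract="freeze")
```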
I changed the internal pipeline to be a context manager that uses a temporary directory as the `pipelines_dir`.

I don't know exactly what I need to change / configure for destination sync and state sync (it doesn't seem to be in the available kwargs).

For schema evolution, users should be able to modify the schema. For example, someone may want to add a column or cast types. Though, I would make a frozen schema the default and require users to explicitly change it.
table_name = "bar"
items = [{"id": 0, "value": "bingo"}, {"id": 1, "value": "bongo"}]

# TODO this is currently odd because the tables exists on the `Schema`
Schemas create the default tables on initialization. We could consider changing this, but that is the reason why they will always exist on any Schema instance regardless of whether anything was materialized.
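To illustrate the point (a hedged example, not taken from the PR): a freshly constructed `dlt.Schema` already carries the default dlt tables before anything has been loaded anywhere.

```python
import dlt

# A fresh Schema already contains the default dlt tables, even though
# nothing has been materialized at a destination yet.
schema = dlt.Schema("example")
print(list(schema.tables))  # expected to include "_dlt_version" and "_dlt_loads"
```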
I think the current behavior is ok and I wouldn't change it. Just wanted to leave a note in the test because the assertion could be surprising.
Passing a `pipelines_dir` allows you to set a
"""
with tempfile.TemporaryDirectory() as tmp_dir:
I think we should not run this in a temporary directory but give the pipeline a predictable name and store it with the other pipeline metadata; this way the user can debug the run like any other pipeline run. This is up for debate though.
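A hedged sketch of the debugging workflow this would enable, assuming the predictable naming scheme from the PR description (the concrete dataset name is illustrative):

```python
import dlt

# If the internal pipeline has a predictable name and lives alongside the other
# pipelines, a user can attach to it after the fact and inspect what happened.
pipeline = dlt.attach(pipeline_name="_dlt_dataset_fruit_data")
print(pipeline.last_trace)  # the trace of the most recent internal load
```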
data: TDataItems,
*,
table_name: str,
write_disposition: TWriteDisposition = "append",
I think the `columns` and `loader_file_format` args from the run method would also be good candidates here. You can also consider a `pipeline_kwargs` argument that gets forwarded to the internal pipeline instantiation. But maybe we do not need this and can add it if requested.
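For illustration only, a rough sketch of what the extended signature could look like; the extra parameters mirror `pipeline.run()`, `pipeline_kwargs` is the hypothetical pass-through suggested above, and the exact type aliases are simplified to `Any`/`str`:

```python
from typing import Any, Optional


# A sketch, not the PR's actual signature.
def write(
    self,
    data: Any,
    *,
    table_name: str,
    write_disposition: str = "append",
    columns: Optional[Any] = None,             # column hints, as on pipeline.run()
    loader_file_format: Optional[str] = None,  # e.g. "parquet" or "jsonl"
    **pipeline_kwargs: Any,                    # hypothetical: forwarded to the internal dlt.pipeline()
) -> None:
    ...
```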
del state["data_item_normalizer"]
return state

def __eq__(self, other: Any) -> bool:
Cool!
@@ -0,0 +1,93 @@
import pathlib
We need tests for writing into tables that already exist, and reading back from those tables with our database reader methods to see whether the schema was updated properly and works.
We should also make sure that all the args provided to the write methods are forwarded properly.
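A hedged sketch of such a test, assuming a duckdb destination and the `Dataset.write()` method under review (pipeline, dataset, and table names are illustrative):

```python
import dlt


def test_write_into_existing_table() -> None:
    # create a table the normal way first
    pipeline = dlt.pipeline("write_test", destination="duckdb", dataset_name="write_test_data")
    pipeline.run([{"id": 0, "value": "bingo"}], table_name="bar")

    # write more rows into the existing table via the new method
    dataset = pipeline.dataset()
    dataset.write([{"id": 1, "value": "bongo"}], table_name="bar")

    # read back with the database reader methods and check both rows arrived
    rows = dataset["bar"].fetchall()
    assert len(rows) == 2
```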
Passing a `pipelines_dir` allows you to set a
"""
with tempfile.TemporaryDirectory() as tmp_dir:
    pipeline = _get_internal_pipeline(
You need to forward the staging destination here too, which probably includes knowing the staging destination on the dataset already. Alternatively, one would have to provide the `staging_destination` via the `run_kwargs`. For working within notebooks, where you often get the dataset from a pipeline instance, it seems to me it would be good to always have it set on a dataset when you get it from the pipeline.
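For context, a small sketch of what forwarding a staging destination to the internal pipeline could look like, assuming the dataset already knows its staging destination (the destination names are just examples):

```python
import dlt

# Illustrative only: the staging destination would have to come from the Dataset itself.
pipeline = dlt.pipeline(
    pipeline_name="_dlt_dataset_lake_data",
    destination="athena",
    staging="filesystem",  # forwarded from the dataset's known staging destination
    dataset_name="lake_data",
)
```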
I have added a few thoughts to consider :)
Users of `dlt.Dataset` want a simple way to write data back to the dataset.

Use cases:
- writing data without access to the `dlt.Pipeline` used to create the dataset

Other motivations

This interface will simplify data-centric operations involved in:

Specs
- `WritableDataset.save()` from `dlt-plus`
- `Dataset.write()` in `dlt` (this aligns with the `pipeline.run()` operation)
- `.write_to()`, `.load_into()`, `.load_table()`
- an internal `dlt.Pipeline` named `_dlt_dataset_{dataset_name}`
- the `dlt.Schema` from the `dlt.Dataset` instance; this way, this schema should evolve when `Dataset.load()` is used
- `write_disposition` is useful to determine if we should append or modify existing records
- `normalize` allows the user to decide to enable normalization (which might create more tables)
- `dlt.Relation` as input

Out of scope
- `Dataset.load()` doesn't have to support 1-to-1 the `dlt.Pipeline.run()` method; if the user needs the full range of config, then they should create a pipeline
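To make the proposed interface concrete, a hedged usage sketch under the spec above (the duckdb destination and all names are illustrative; `Dataset.write()` is the method this PR introduces):

```python
import dlt

# Create a dataset the usual way, then write a chunk back into it
# without going through the original pipeline's run() again.
pipeline = dlt.pipeline("fruit_pipeline", destination="duckdb", dataset_name="fruit_data")
pipeline.run([{"id": 1, "name": "apple"}], table_name="fruits")

dataset = pipeline.dataset()
dataset.write(
    [{"id": 2, "name": "banana"}],
    table_name="fruits",
    write_disposition="append",
)
```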