Skip to content

Commit 66da0e1

Browse files
authored
Feat: dlt.destination as named destination factory (#3204)
* dlt.destination as factory initializer, existing test adjusted
* Additional test adjustments
* test fix in test_duckdb_client.py
* Test readability improvements
* Main destination function in decorators.py improved
* Docs improvements with separate named destination section
* Undo unnecessary adjustments in tests
* custom destination test added
* fix in destination snippet example
* Fix broken links in docs
1 parent 4d629cc commit 66da0e1

File tree

6 files changed

+418
-102
lines changed

6 files changed

+418
-102
lines changed

dlt/destinations/decorators.py

Lines changed: 178 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
import functools
2-
31
import inspect
42
from typing import Any, Type, Optional, Callable, Union, overload
53
from typing_extensions import Concatenate
6-
from dlt.common.destination.reference import DestinationReference
4+
from dlt.common.destination.reference import DestinationReference, AnyDestination, Destination
75
from dlt.common.reflection.spec import get_spec_name_from_f
86
from dlt.common.typing import AnyFun
97

@@ -23,28 +21,11 @@
2321
)
2422

2523

26-
@overload
27-
def destination(
28-
func: Callable[
29-
Concatenate[Union[TDataItems, str], TTableSchema, TDestinationCallableParams], Any
30-
],
31-
/,
32-
loader_file_format: TLoaderFileFormat = None,
33-
batch_size: int = 10,
34-
name: str = None,
35-
naming_convention: str = "direct",
36-
skip_dlt_columns_and_tables: bool = True,
37-
max_table_nesting: int = 0,
38-
spec: Type[CustomDestinationClientConfiguration] = None,
39-
max_parallel_load_jobs: Optional[int] = None,
40-
loader_parallelism_strategy: Optional[TLoaderParallelismStrategy] = None,
41-
) -> Callable[TDestinationCallableParams, _destination]: ...
42-
43-
4424
@overload
4525
def destination(
4626
func: None = ...,
4727
/,
28+
*,
4829
loader_file_format: TLoaderFileFormat = None,
4930
batch_size: int = 10,
5031
name: str = None,
@@ -57,12 +38,71 @@ def destination(
5738
) -> Callable[
5839
[Callable[Concatenate[Union[TDataItems, str], TTableSchema, TDestinationCallableParams], Any]],
5940
Callable[TDestinationCallableParams, _destination],
60-
]: ...
41+
]:
42+
"""A decorator that transforms a function into a custom destination with configuration parameters.
43+
44+
This overload handles parameterized decorator usage where configuration is provided
45+
in the decorator call, and the function to be decorated is applied later.
46+
47+
#### Example Usage:
48+
49+
>>> @dlt.destination(batch_size=100, loader_file_format="parquet")
50+
>>> def my_destination(items, table, api_url: str = dlt.config.value, api_secret = dlt.secrets.value):
51+
>>> print(table["name"])
52+
>>> print(items)
53+
>>>
54+
>>> p = dlt.pipeline("chess_pipeline", destination=my_destination)
55+
56+
Args:
57+
func (None): Must be None for this overload. The actual function will be provided
58+
when the decorator is applied.
59+
60+
loader_file_format (TLoaderFileFormat, optional): Defines the format in which files are stored
61+
in the load package before being sent to the destination function. Defaults to None.
62+
63+
batch_size (int, optional): Defines how many items per function call are batched together
64+
and sent as an array. If set to 0, instead of passing actual data items, you will receive
65+
one call per load job with the path of the file as the "items" argument. Defaults to 10.
66+
67+
name (str, optional): Defines the name of the destination. If not provided, defaults to
68+
the name of the decorated function.
6169
70+
naming_convention (str, optional): Controls how table and column names are normalized.
71+
The default value, "direct", keeps all names unchanged.
6272
73+
skip_dlt_columns_and_tables (bool, optional): Defines whether internal dlt tables and columns
74+
are skipped rather than passed to the custom destination function. Defaults to True.
75+
76+
max_table_nesting (int, optional): Defines how deep the normalizer will go to flatten nested
77+
fields in your data to create subtables. This overrides any source settings and defaults
78+
to 0, meaning no nested tables are created.
79+
80+
spec (Type[CustomDestinationClientConfiguration], optional): Defines a configuration spec used
81+
to inject arguments into the decorated function. Arguments not included in the spec will
82+
not be injected. Defaults to None.
83+
84+
max_parallel_load_jobs (Optional[int], optional): Defines the maximum number of load jobs
85+
that can run concurrently during the load process. Defaults to None.
86+
87+
loader_parallelism_strategy (Optional[TLoaderParallelismStrategy], optional): Determines the
88+
load job parallelism strategy. Can be "sequential" (equivalent to max_parallel_load_jobs=1),
89+
"table-sequential" (one load job per table at a time), or "parallel". Defaults to None.
90+
91+
Returns:
92+
Callable[[Callable[Concatenate[Union[TDataItems, str], TTableSchema, TDestinationCallableParams], Any]], Callable[TDestinationCallableParams, _destination]]:
93+
A decorator function that accepts the destination function and returns a callable that can
94+
be used to create a custom dlt destination instance.
95+
"""
96+
...
97+
98+
99+
@overload
63100
def destination(
64-
func: Optional[AnyFun] = None,
101+
func: Callable[
102+
Concatenate[Union[TDataItems, str], TTableSchema, TDestinationCallableParams], Any
103+
],
65104
/,
105+
*,
66106
loader_file_format: TLoaderFileFormat = None,
67107
batch_size: int = 10,
68108
name: str = None,
@@ -72,44 +112,114 @@ def destination(
72112
spec: Type[CustomDestinationClientConfiguration] = None,
73113
max_parallel_load_jobs: Optional[int] = None,
74114
loader_parallelism_strategy: Optional[TLoaderParallelismStrategy] = None,
75-
) -> Any:
76-
"""A decorator that transforms a function that takes two positional arguments "table" and "items" and any number of keyword arguments with defaults
77-
into a callable that will create a custom destination. The function does not return anything, the keyword arguments can be configuration and secrets values.
115+
) -> Callable[TDestinationCallableParams, _destination]:
116+
"""Creates a destination factory from a function by directly passing it as an argument.
78117
79-
#### Example Usage with Configuration and Secrets:
118+
This overload handles direct function calls where the destination function is passed
119+
as the first argument, rather than using decorator syntax.
120+
121+
#### Example Usage:
80122
81-
>>> @dlt.destination(batch_size=100, loader_file_format="parquet")
82123
>>> def my_destination(items, table, api_url: str = dlt.config.value, api_secret = dlt.secrets.value):
83124
>>> print(table["name"])
84125
>>> print(items)
85126
>>>
86-
>>> p = dlt.pipeline("chess_pipeline", destination=my_destination)
87-
88-
Here all incoming data will be sent to the destination function with the items in the requested format and the dlt table schema.
89-
The config and secret values will be resolved from the path destination.my_destination.api_url and destination.my_destination.api_secret.
127+
>>> dest = dlt.destination(my_destination, batch_size=100, loader_file_format="parquet")
128+
>>> p = dlt.pipeline("chess_pipeline", destination=dest)
90129
91130
Args:
92-
func (Optional[AnyFun]): A function that takes two positional arguments "table" and "items" and any number of keyword arguments with defaults which will process the incoming data.
131+
func (Callable[Concatenate[Union[TDataItems, str], TTableSchema, TDestinationCallableParams], Any]):
132+
A callable that takes two positional arguments "items" and "table",
133+
followed by any number of keyword arguments with default values. This function
134+
will process the incoming data and does not need to return anything. The keyword
135+
arguments can represent configuration or secret values.
136+
137+
loader_file_format (TLoaderFileFormat): Defines the format in which files are stored in the load package
138+
before being sent to the destination function.
139+
140+
batch_size (int): Defines how many items per function call are batched together and sent as an array.
141+
If set to 0, instead of passing actual data items, you will receive one call per load job
142+
with the path of the file as the "items" argument. You can then open and process that file as needed.
143+
144+
name (str): Defines the name of the destination created from `func`.
145+
Defaults to the name of the function.
93146
94-
loader_file_format (TLoaderFileFormat): defines in which format files are stored in the load package before being sent to the destination function, this can be puae-jsonl or parquet.
147+
naming_convention (str): Controls how table and column names are normalized.
148+
The default value, "direct", keeps all names unchanged.
95149
96-
batch_size (int): defines how many items per function call are batched together and sent as an array. If you set a batch-size of 0, instead of passing in actual dataitems, you will receive one call per load job with the path of the file as the items argument. You can then open and process that file in any way you like.
150+
skip_dlt_columns_and_tables (bool): Defines whether internal dlt tables and columns
151+
are skipped rather than passed to the custom destination function. Defaults to True.
97152
98-
name (str): defines the name of the destination that gets created by the destination decorator, defaults to the name of the function
153+
max_table_nesting (int): Defines how deep the normalizer will go to flatten nested fields
154+
in your data to create subtables. This overrides any source settings and defaults to 0,
155+
meaning no nested tables are created.
99156
100-
naming_convention (str): defines the name of the destination that gets created by the destination decorator. This controls how table and column names are normalized. The default is direct which will keep all names the same.
157+
spec (Type[CustomDestinationClientConfiguration]): Defines a configuration spec used
158+
to inject arguments into the destination function. Arguments not included in the spec will not be injected.
159+
160+
max_parallel_load_jobs (Optional[int]): Defines the maximum number of load jobs
161+
that can run concurrently during the load process.
162+
163+
loader_parallelism_strategy (Optional[TLoaderParallelismStrategy]): Determines the load job parallelism strategy.
164+
Can be "sequential" (equivalent to max_parallel_load_jobs=1), "table-sequential" (one load job per table at a time),
165+
or "parallel".
166+
167+
Returns:
168+
Callable[TDestinationCallableParams, _destination]: A callable that can be used to create a custom dlt destination instance.
169+
"""
170+
...
171+
172+
173+
@overload
174+
def destination(
175+
destination_name: str,
176+
/,
177+
*,
178+
destination_type: Optional[str] = None,
179+
credentials: Optional[Any] = None,
180+
**kwargs: Any,
181+
) -> AnyDestination:
182+
"""Instantiates a destination from the provided destination name and type by retrieving the corresponding
183+
destination factory and initializing it with the given credentials and keyword arguments.
101184
102-
skip_dlt_columns_and_tables (bool): defines wether internal tables and columns will be fed into the custom destination function. This is set to True by default.
185+
Args:
186+
destination_name (str): The name of the destination instance to initialize.
103187
104-
max_table_nesting (int): defines how deep the normalizer will go to normalize nested fields on your data to create subtables. This overwrites any settings on your source and is set to zero to not create any nested tables by default.
188+
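destination_type (Optional[str]): The type of the destination to instantiate. If omitted, `destination_name` itself is used as the destination reference, unless `destination_callable` is passed, in which case the "destination" reference is used.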
105189
106-
spec (Type[CustomDestinationClientConfiguration]): defines a configuration spec that will be used to to inject arguments into the decorated functions. Argument not in spec will not be injected
190+
credentials (Optional[Any]): Credentials used to connect to the destination.
191+
May be an instance of a credential class supported by the respective destination.
107192
108-
max_parallel_load_jobs (Optional[int]): how many load jobs at most will be running during the load
193+
**kwargs (Any): Additional keyword arguments passed to the destination factory.
109194
110-
loader_parallelism_strategy (Optional[TLoaderParallelismStrategy]): Can be "sequential" which equals max_parallel_load_jobs=1, "table-sequential" where each table will have at most one loadjob at any given time and "parallel"
111195
Returns:
112-
Any: A callable that can be used to create a dlt custom destination instance
196+
AnyDestination: An initialized destination.
197+
"""
198+
...
199+
200+
201+
def destination(
202+
func_or_name: Union[Optional[AnyFun], str] = None,
203+
/,
204+
*,
205+
loader_file_format: TLoaderFileFormat = None,
206+
batch_size: int = 10,
207+
name: str = None,
208+
naming_convention: str = "direct",
209+
skip_dlt_columns_and_tables: bool = True,
210+
max_table_nesting: int = 0,
211+
spec: Type[CustomDestinationClientConfiguration] = None,
212+
max_parallel_load_jobs: Optional[int] = None,
213+
loader_parallelism_strategy: Optional[TLoaderParallelismStrategy] = None,
214+
**kwargs: Any,
215+
) -> Any:
216+
"""When used as a decorator, transforms a function that takes two positional arguments, "items" and "table",
217+
along with any number of keyword arguments with default values, into a callable that will create a custom destination.
218+
The function itself does not return anything; its keyword arguments can represent configuration or secret values.
219+
220+
When used as a function with the first argument being a string (the destination name),
221+
instantiates a destination from the provided destination name and type by retrieving the corresponding
222+
destination factory and initializing it with the given credentials and keyword arguments.
113223
"""
114224

115225
def decorator(
@@ -165,9 +275,31 @@ def wrapper(
165275
setattr(wrapper, "_factory", D) # noqa
166276
return wrapper
167277

168-
if func is None:
278+
if func_or_name is None:
169279
# we're called with parens.
170280
return decorator
281+
elif not isinstance(func_or_name, str):
282+
# we're called as @dlt.destination without parens.
283+
return decorator(func_or_name)
284+
285+
# Factory mode: create destination instance from given string
286+
destination_type = kwargs.pop("destination_type", None)
287+
if destination_type:
288+
destination = Destination.from_reference(
289+
ref=destination_type,
290+
destination_name=func_or_name,
291+
**kwargs,
292+
)
293+
elif kwargs.get("destination_callable"):
294+
destination = Destination.from_reference(
295+
ref="destination",
296+
destination_name=func_or_name,
297+
**kwargs,
298+
)
299+
else:
300+
destination = Destination.from_reference(
301+
ref=func_or_name,
302+
**kwargs,
303+
)
171304

172-
# we're called as @dlt.destination without parens.
173-
return decorator(func)
305+
return destination

docs/website/docs/dlt-ecosystem/destinations/destination.md

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -129,35 +129,31 @@ There are multiple ways to pass the custom destination function to the `dlt` pip
129129
p = dlt.pipeline("my_pipe", destination=my_destination(api_key=os.getenv("API_KEY"))) # type: ignore[call-arg]
130130
```
131131

132-
- Directly via destination reference. In this case, don't use the decorator for the destination function.
132+
- Via the `dlt.destination()` function that initializes the destination. In this case, don't use the decorator for the destination function.
133133
```py
134134
# File my_destination.py
135135

136-
from dlt.common.destination import Destination
137-
138136
# Don't use the decorator
139137
def local_destination_func(items: TDataItems, table: TTableSchema) -> None:
140138
...
141139

142-
# Via destination reference
140+
# Via dlt.destination() that initializes the destination
143141
p = dlt.pipeline(
144142
"my_pipe",
145-
destination=Destination.from_reference(
146-
"destination", destination_callable=local_destination_func
143+
destination=dlt.destination(
144+
"my_destination", destination_callable=local_destination_func
147145
)
148146
)
149147
```
150148
- Via a fully qualified string to function location (this can be set in `config.toml` or through environment variables). The destination function should be located in another file.
151149
```py
152150
# File my_pipeline.py
153151

154-
from dlt.common.destination import Destination
155-
156152
# Fully qualified string to function location
157153
p = dlt.pipeline(
158154
"my_pipe",
159-
destination=Destination.from_reference(
160-
"destination", destination_callable="my_destination.local_destination_func"
155+
destination=dlt.destination(
156+
"my_destination", destination_callable="my_destination.local_destination_func"
161157
)
162158
)
163159
```

docs/website/docs/general-usage/credentials/setup.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -747,7 +747,7 @@ You have additional options for using multiple instances of the same source:
747747

748748
1. Use the `clone()` method as explained in the [sql_database documentation](../../dlt-ecosystem/verified-sources/sql_database/advanced.md#configure-many-sources-side-by-side-with-custom-sections).
749749

750-
2. Create [named destinations](../destination.md#configure-multiple-destinations-in-a-pipeline) to use the same destination type with different configurations.
750+
2. Create [named destinations](../destination.md#configure-multiple-destinations-of-the-same-type) to use the same destination type with different configurations.
751751
:::
752752

753753
## Troubleshoot configuration errors

0 commit comments

Comments
 (0)