Skip to content

Commit d6fbc7d

Browse files
committed
dlt.destination as factory initializer, existing test adjusted
1 parent 036aab3 commit d6fbc7d

File tree

3 files changed

+217
-63
lines changed

3 files changed

+217
-63
lines changed

dlt/destinations/decorators.py

Lines changed: 177 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
import functools
2-
31
import inspect
42
from typing import Any, Type, Optional, Callable, Union, overload
53
from typing_extensions import Concatenate
6-
from dlt.common.destination.reference import DestinationReference
4+
from dlt.common.destination.reference import DestinationReference, AnyDestination, Destination
75
from dlt.common.reflection.spec import get_spec_name_from_f
86
from dlt.common.typing import AnyFun
97

@@ -23,28 +21,11 @@
2321
)
2422

2523

26-
@overload
27-
def destination(
28-
func: Callable[
29-
Concatenate[Union[TDataItems, str], TTableSchema, TDestinationCallableParams], Any
30-
],
31-
/,
32-
loader_file_format: TLoaderFileFormat = None,
33-
batch_size: int = 10,
34-
name: str = None,
35-
naming_convention: str = "direct",
36-
skip_dlt_columns_and_tables: bool = True,
37-
max_table_nesting: int = 0,
38-
spec: Type[CustomDestinationClientConfiguration] = None,
39-
max_parallel_load_jobs: Optional[int] = None,
40-
loader_parallelism_strategy: Optional[TLoaderParallelismStrategy] = None,
41-
) -> Callable[TDestinationCallableParams, _destination]: ...
42-
43-
4424
@overload
4525
def destination(
4626
func: None = ...,
4727
/,
28+
*,
4829
loader_file_format: TLoaderFileFormat = None,
4930
batch_size: int = 10,
5031
name: str = None,
@@ -57,12 +38,71 @@ def destination(
5738
) -> Callable[
5839
[Callable[Concatenate[Union[TDataItems, str], TTableSchema, TDestinationCallableParams], Any]],
5940
Callable[TDestinationCallableParams, _destination],
60-
]: ...
41+
]:
42+
"""A decorator that transforms a function into a custom destination with configuration parameters.
43+
44+
This overload handles parameterized decorator usage where configuration is provided
45+
in the decorator call, and the function to be decorated is applied later.
46+
47+
#### Example Usage:
48+
49+
>>> @dlt.destination(batch_size=100, loader_file_format="parquet")
50+
>>> def my_destination(items, table, api_url: str = dlt.config.value, api_secret = dlt.secrets.value):
51+
>>> print(table["name"])
52+
>>> print(items)
53+
>>>
54+
>>> p = dlt.pipeline("chess_pipeline", destination=my_destination)
55+
56+
Args:
57+
func (None): Must be None for this overload. The actual function will be provided
58+
when the decorator is applied.
59+
60+
loader_file_format (TLoaderFileFormat, optional): Defines the format in which files are stored
61+
in the load package before being sent to the destination function. Defaults to None.
62+
63+
batch_size (int, optional): Defines how many items per function call are batched together
64+
and sent as an array. If set to 0, instead of passing actual data items, you will receive
65+
one call per load job with the path of the file as the "items" argument. Defaults to 10.
66+
67+
name (str, optional): Defines the name of the destination. If not provided, defaults to
68+
the name of the decorated function.
69+
70+
naming_convention (str, optional): Controls how table and column names are normalized.
71+
The default value, "direct", keeps all names unchanged.
72+
73+
skip_dlt_columns_and_tables (bool, optional): Defines whether internal dlt tables and columns
74+
are included in the custom destination function. Defaults to True.
75+
76+
max_table_nesting (int, optional): Defines how deep the normalizer will go to flatten nested
77+
fields in your data to create subtables. This overrides any source settings and defaults
78+
to 0, meaning no nested tables are created.
79+
80+
spec (Type[CustomDestinationClientConfiguration], optional): Defines a configuration spec used
81+
to inject arguments into the decorated function. Arguments not included in the spec will
82+
not be injected. Defaults to None.
83+
84+
max_parallel_load_jobs (Optional[int], optional): Defines the maximum number of load jobs
85+
that can run concurrently during the load process. Defaults to None.
86+
87+
loader_parallelism_strategy (Optional[TLoaderParallelismStrategy], optional): Determines the
88+
load job parallelism strategy. Can be "sequential" (equivalent to max_parallel_load_jobs=1),
89+
"table-sequential" (one load job per table at a time), or "parallel". Defaults to None.
90+
91+
Returns:
92+
Callable[[Callable[Concatenate[Union[TDataItems, str], TTableSchema, TDestinationCallableParams], Any]], Callable[TDestinationCallableParams, _destination]]:
93+
A decorator function that accepts the destination function and returns a callable that can
94+
be used to create a custom dlt destination instance.
95+
"""
96+
...
6197

6298

99+
@overload
63100
def destination(
64-
func: Optional[AnyFun] = None,
101+
func: Callable[
102+
Concatenate[Union[TDataItems, str], TTableSchema, TDestinationCallableParams], Any
103+
],
65104
/,
105+
*,
66106
loader_file_format: TLoaderFileFormat = None,
67107
batch_size: int = 10,
68108
name: str = None,
@@ -72,45 +112,116 @@ def destination(
72112
spec: Type[CustomDestinationClientConfiguration] = None,
73113
max_parallel_load_jobs: Optional[int] = None,
74114
loader_parallelism_strategy: Optional[TLoaderParallelismStrategy] = None,
75-
) -> Any:
76-
"""A decorator that transforms a function that takes two positional arguments "table" and "items" and any number of keyword arguments with defaults
77-
into a callable that will create a custom destination. The function does not return anything, the keyword arguments can be configuration and secrets values.
115+
) -> Callable[TDestinationCallableParams, _destination]:
116+
"""Creates a destination factory from a function by directly passing it as an argument.
78117
79-
#### Example Usage with Configuration and Secrets:
118+
This overload handles direct function calls where the destination function is passed
119+
as the first argument, rather than using decorator syntax.
120+
121+
#### Example Usage:
80122
81-
>>> @dlt.destination(batch_size=100, loader_file_format="parquet")
82123
>>> def my_destination(items, table, api_url: str = dlt.config.value, api_secret = dlt.secrets.value):
83124
>>> print(table["name"])
84125
>>> print(items)
85126
>>>
86-
>>> p = dlt.pipeline("chess_pipeline", destination=my_destination)
87-
88-
Here all incoming data will be sent to the destination function with the items in the requested format and the dlt table schema.
89-
The config and secret values will be resolved from the path destination.my_destination.api_url and destination.my_destination.api_secret.
127+
>>> dest = dlt.destination(my_destination, batch_size=100, loader_file_format="parquet")
128+
>>> p = dlt.pipeline("chess_pipeline", destination=dest)
90129
91130
Args:
92-
func (Optional[AnyFun]): A function that takes two positional arguments "table" and "items" and any number of keyword arguments with defaults which will process the incoming data.
131+
func (Callable[Concatenate[Union[TDataItems, str], TTableSchema, TDestinationCallableParams], Any]):
132+
A callable that takes two positional arguments "items" and "table",
133+
followed by any number of keyword arguments with default values. This function
134+
will process the incoming data and does not need to return anything. The keyword
135+
arguments can represent configuration or secret values.
93136
94-
loader_file_format (TLoaderFileFormat): defines in which format files are stored in the load package before being sent to the destination function, this can be puae-jsonl or parquet.
137+
loader_file_format (TLoaderFileFormat): Defines the format in which files are stored in the load package
138+
before being sent to the destination function.
95139
96-
batch_size (int): defines how many items per function call are batched together and sent as an array. If you set a batch-size of 0, instead of passing in actual dataitems, you will receive one call per load job with the path of the file as the items argument. You can then open and process that file in any way you like.
140+
batch_size (int): Defines how many items per function call are batched together and sent as an array.
141+
If set to 0, instead of passing actual data items, you will receive one call per load job
142+
with the path of the file as the "items" argument. You can then open and process that file as needed.
143+
144+
name (str): Defines the name of the destination created by the destination decorator.
145+
Defaults to the name of the function.
146+
147+
naming_convention (str): Controls how table and column names are normalized.
148+
The default value, "direct", keeps all names unchanged.
149+
150+
skip_dlt_columns_and_tables (bool): Defines whether internal dlt tables and columns
151+
are included in the custom destination function. Defaults to True.
152+
153+
max_table_nesting (int): Defines how deep the normalizer will go to flatten nested fields
154+
in your data to create subtables. This overrides any source settings and defaults to 0,
155+
meaning no nested tables are created.
156+
157+
spec (Type[CustomDestinationClientConfiguration]): Defines a configuration spec used
158+
to inject arguments into the decorated function. Arguments not included in the spec will not be injected.
159+
160+
max_parallel_load_jobs (Optional[int]): Defines the maximum number of load jobs
161+
that can run concurrently during the load process.
162+
163+
loader_parallelism_strategy (Optional[TLoaderParallelismStrategy]): Determines the load job parallelism strategy.
164+
Can be "sequential" (equivalent to max_parallel_load_jobs=1), "table-sequential" (one load job per table at a time),
165+
or "parallel".
166+
167+
Returns:
168+
Callable[TDestinationCallableParams, _destination]: A callable that can be used to create a custom dlt destination instance.
169+
"""
170+
...
97171

98-
name (str): defines the name of the destination that gets created by the destination decorator, defaults to the name of the function
99172

100-
naming_convention (str): defines the name of the destination that gets created by the destination decorator. This controls how table and column names are normalized. The default is direct which will keep all names the same.
173+
@overload
174+
def destination(
175+
destination_name: str,
176+
/,
177+
*,
178+
destination_type: Optional[str] = None,
179+
credentials: Optional[Any] = None,
180+
**kwargs: Any,
181+
) -> AnyDestination:
182+
"""Instantiates a destination from the provided destination name and type by retrieving the corresponding
183+
destination factory and initializing it with the given credentials and keyword arguments.
101184
102-
skip_dlt_columns_and_tables (bool): defines wether internal tables and columns will be fed into the custom destination function. This is set to True by default.
185+
Args:
186+
destination_name (str): The name of the destination instance to initialize.
103187
104-
max_table_nesting (int): defines how deep the normalizer will go to normalize nested fields on your data to create subtables. This overwrites any settings on your source and is set to zero to not create any nested tables by default.
188+
destination_type (Optional[str]): The type of the destination to instantiate.
105189
106-
spec (Type[CustomDestinationClientConfiguration]): defines a configuration spec that will be used to to inject arguments into the decorated functions. Argument not in spec will not be injected
190+
credentials (Optional[Any]): Credentials used to connect to the destination.
191+
May be an instance of a credential class supported by the respective destination.
107192
108-
max_parallel_load_jobs (Optional[int]): how many load jobs at most will be running during the load
193+
**kwargs (Any): Additional keyword arguments passed to the destination factory.
109194
110-
loader_parallelism_strategy (Optional[TLoaderParallelismStrategy]): Can be "sequential" which equals max_parallel_load_jobs=1, "table-sequential" where each table will have at most one loadjob at any given time and "parallel"
111195
Returns:
112-
Any: A callable that can be used to create a dlt custom destination instance
196+
AnyDestination: An initialized destination.
197+
"""
198+
...
199+
200+
201+
def destination(
202+
func_or_name: Union[Optional[AnyFun], str] = None,
203+
/,
204+
loader_file_format: TLoaderFileFormat = None,
205+
batch_size: int = 10,
206+
name: str = None,
207+
naming_convention: str = "direct",
208+
skip_dlt_columns_and_tables: bool = True,
209+
max_table_nesting: int = 0,
210+
spec: Type[CustomDestinationClientConfiguration] = None,
211+
max_parallel_load_jobs: Optional[int] = None,
212+
loader_parallelism_strategy: Optional[TLoaderParallelismStrategy] = None,
213+
**kwargs: Any,
214+
) -> Any:
215+
"""When used as a decorator, transforms a function that takes two positional arguments, "items" and "table",
216+
along with any number of keyword arguments with default values, into a callable that will create a custom destination.
217+
The function itself does not return anything, the keyword arguments can represent configuration or secret values.
218+
219+
When used as a function with the first argument being a string (the destination name),
220+
instantiates a destination from the provided destination name and type by retrieving the corresponding
221+
destination factory and initializing it with the given credentials and keyword arguments.
113222
"""
223+
if func_or_name is None and "func" in kwargs:
224+
func_or_name = kwargs.pop("func")
114225

115226
def decorator(
116227
destination_callable: Callable[
@@ -165,9 +276,30 @@ def wrapper(
165276
setattr(wrapper, "_factory", D) # noqa
166277
return wrapper
167278

168-
if func is None:
279+
if func_or_name is None:
169280
# we're called with parens.
170281
return decorator
282+
elif isinstance(func_or_name, str):
283+
# Factory mode: create destination instance
284+
destination_type = kwargs.pop("destination_type", None)
285+
if destination_type:
286+
return Destination.from_reference(
287+
ref=destination_type,
288+
destination_name=func_or_name,
289+
**kwargs,
290+
)
291+
elif kwargs.get("destination_callable"):
292+
destination_name = None if func_or_name == "destination" else func_or_name
293+
return Destination.from_reference(
294+
ref="destination",
295+
destination_name=destination_name,
296+
**kwargs,
297+
)
298+
else:
299+
return Destination.from_reference(
300+
ref=func_or_name,
301+
**kwargs,
302+
)
171303

172304
# we're called as @dlt.destination without parens.
173-
return decorator(func)
305+
return decorator(func_or_name)

tests/destinations/test_custom_destination.py

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
from tests.load.utils import (
2626
assert_all_data_types_row,
2727
)
28+
from tests.utils import preserve_environ
29+
2830

2931
SUPPORTED_LOADER_FORMATS = ["parquet", "typed-jsonl"]
3032

@@ -158,7 +160,12 @@ def test_capabilities() -> None:
158160
assert dict(caps) == dict(client_caps)
159161

160162

161-
def test_instantiation() -> None:
163+
@pytest.mark.parametrize(
164+
"use_dest_decorator",
165+
[True, False],
166+
ids=["using_dest_decorator", "without_using_dest_decorator"],
167+
)
168+
def test_instantiation(use_dest_decorator: bool) -> None:
162169
# also tests DESTINATIONS registry
163170
calls: List[Tuple[TDataItems, TTableSchema]] = []
164171

@@ -180,11 +187,15 @@ def local_sink_func(items: TDataItems, table: TTableSchema, my_val=dlt.config.va
180187
DestinationReference.find("local_sink_func")
181188
assert "dlt.destinations.local_sink_func" not in DestinationReference.DESTINATIONS
182189

190+
# NOTE: using dlt.destination as factory initializer and Destination.from_reference
191+
# should behave the same
192+
dest_ref_func = dlt.destination if use_dest_decorator else Destination.from_reference
193+
183194
# test passing via from_reference
184195
calls = []
185196
p = dlt.pipeline(
186197
"sink_test",
187-
destination=Destination.from_reference("destination", destination_callable=local_sink_func),
198+
destination=dest_ref_func("destination", destination_callable=local_sink_func),
188199
dev_mode=True,
189200
)
190201
p.run([1, 2, 3], table_name="items")
@@ -200,9 +211,7 @@ def local_sink_func_no_params(items: TDataItems, table: TTableSchema) -> None:
200211

201212
p = dlt.pipeline(
202213
"sink_test",
203-
destination=Destination.from_reference(
204-
"destination", destination_callable=local_sink_func_no_params
205-
),
214+
destination=dest_ref_func("destination", destination_callable=local_sink_func_no_params),
206215
dev_mode=True,
207216
)
208217
p.run([1, 2, 3], table_name="items")
@@ -211,10 +220,11 @@ def local_sink_func_no_params(items: TDataItems, table: TTableSchema) -> None:
211220
global global_calls
212221
global_calls = []
213222
# this is technically possible but should not be used
214-
dest_ref = Destination.from_reference(
223+
dest_ref = dest_ref_func(
215224
"destination",
216225
destination_callable="tests.destinations.test_custom_destination.global_sink_func",
217226
)
227+
218228
assert dest_ref.destination_name == "global_sink_func"
219229
# type comes from the "destination" wrapper destination
220230
assert dest_ref.destination_type == "dlt.destinations.destination"
@@ -244,10 +254,11 @@ def local_sink_func_no_params(items: TDataItems, table: TTableSchema) -> None:
244254
assert len(global_calls) == 2
245255

246256
# we can import type (it is not a ref)
247-
dest_ref = Destination.from_reference(
257+
dest_ref = dest_ref_func(
248258
"tests.destinations.test_custom_destination.GlobalSinkFuncDestination",
249259
destination_name="alt_name",
250260
)
261+
251262
assert dest_ref.destination_name == "alt_name"
252263
assert (
253264
dest_ref.destination_type
@@ -263,9 +274,7 @@ def local_sink_func_no_params(items: TDataItems, table: TTableSchema) -> None:
263274
assert len(global_calls) == 3
264275

265276
# now import by ref
266-
dest_ref = Destination.from_reference(
267-
"tests.destinations.test_custom_destination.global_sink_func"
268-
)
277+
dest_ref = dest_ref_func("tests.destinations.test_custom_destination.global_sink_func")
269278
assert dest_ref.destination_name == "global_sink_func"
270279
assert (
271280
dest_ref.destination_type
@@ -283,7 +292,7 @@ def local_sink_func_no_params(items: TDataItems, table: TTableSchema) -> None:
283292
# pass None as callable arg will fail on load
284293
p = dlt.pipeline(
285294
"sink_test",
286-
destination=Destination.from_reference("destination", destination_callable=None),
295+
destination=dest_ref_func("destination", destination_callable=None),
287296
dev_mode=True,
288297
)
289298
with pytest.raises(ConfigurationValueError):
@@ -293,9 +302,7 @@ def local_sink_func_no_params(items: TDataItems, table: TTableSchema) -> None:
293302
with pytest.raises(UnknownCustomDestinationCallable):
294303
p = dlt.pipeline(
295304
"sink_test",
296-
destination=Destination.from_reference(
297-
"destination", destination_callable="does.not.exist"
298-
),
305+
destination=dest_ref_func("destination", destination_callable="does.not.exist"),
299306
dev_mode=True,
300307
)
301308

0 commit comments

Comments
 (0)