|
1 | 1 | from copy import deepcopy |
2 | 2 | import gzip |
3 | 3 | import os |
4 | | -from typing import Any, Iterator, List, cast |
| 4 | +from typing import Any, Iterator, List, cast, Tuple |
5 | 5 | from pathlib import Path |
6 | 6 | import pytest |
7 | 7 |
|
|
15 | 15 | from dlt.common.schema.schema import Schema |
16 | 16 | from dlt.common.schema.typing import VERSION_TABLE_NAME, REPLACE_STRATEGIES, TLoaderReplaceStrategy |
17 | 17 | from dlt.common.schema.utils import new_table |
18 | | -from dlt.common.typing import TDataItem |
| 18 | +from dlt.common.schema import TTableSchema |
| 19 | +from dlt.common.typing import TDataItem, TDataItems |
19 | 20 | from dlt.common.utils import uniq_id |
20 | 21 |
|
21 | 22 | from dlt.destinations.exceptions import DestinationUndefinedEntity |
@@ -1228,3 +1229,23 @@ def test_data(): |
1228 | 1229 | info = pipeline.run(test_data()) |
1229 | 1230 | assert_load_info(info) |
1230 | 1231 | assert (Path(TEST_STORAGE_ROOT) / FILE_BUCKET / pipeline.dataset_name / "test_data").exists() |
| 1232 | + |
| 1233 | + # 9. Should automatically infer destination type as 'dlt.destinations.destination' (custom destination implementation), |
| 1234 | + # if destination_callable is provided |
| 1235 | + calls: List[Tuple[TDataItems, TTableSchema]] = [] |
| 1236 | + |
| 1237 | + def local_sink_func(items: TDataItems, table: TTableSchema, my_val=dlt.config.value, /) -> None: |
| 1238 | + nonlocal calls |
| 1239 | + assert my_val == "something" |
| 1240 | + calls.append((items, table)) |
| 1241 | + |
| 1242 | + os.environ["DESTINATION__MY_VAL"] = "something" |
| 1243 | + |
| 1244 | + p = dlt.pipeline( |
| 1245 | + "sink_test", |
| 1246 | + destination=dlt.destination("custom_name", destination_callable=local_sink_func), |
| 1247 | + ) |
| 1248 | + assert p.destination.destination_name == "custom_name" |
| 1249 | + assert p.destination.destination_type == "dlt.destinations.destination" |
| 1250 | + p.run([1, 2, 3], table_name="items") |
| 1251 | + assert len(calls) == 1 |
0 commit comments