Skip to content

Commit 90b1f52

Browse files
authored
Feat/require target state keys to fit into type stable key (#1625)
* Using StableKey Instead of PyKey * fixing tests * Implemented IntoPyObject for PyStableKey * Adressing code comments * Removing _py argument in declare_target_state_with_child * Reverting back to using fingerprint * Final touches * Reducing unnecessary changes * adressing a few more reviews * Adding Symbol support in IntoPyObject
1 parent 26628f0 commit 90b1f52

17 files changed

Lines changed: 369 additions & 193 deletions

File tree

python/cocoindex/_internal/target_state.py

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,30 +18,29 @@
1818
from . import core
1919
from .component_ctx import get_context_from_ctx
2020
from .pending_marker import PendingS, MaybePendingS, ResolvesTo
21-
from .typing import NonExistenceType
21+
from .typing import NonExistenceType, StableKey
2222

2323

2424
ActionT = TypeVar("ActionT")
2525
ActionT_co = TypeVar("ActionT_co", covariant=True)
2626
ActionT_contra = TypeVar("ActionT_contra", contravariant=True)
27-
KeyT = TypeVar("KeyT", bound=Hashable)
28-
KeyT_contra = TypeVar("KeyT_contra", contravariant=True, bound=Hashable)
27+
2928
ValueT = TypeVar("ValueT", default=Any)
3029
ValueT_contra = TypeVar("ValueT_contra", contravariant=True, default=Any)
3130
TrackingRecordT = TypeVar("TrackingRecordT", default=Any)
3231
TrackingRecordT_co = TypeVar("TrackingRecordT_co", covariant=True, default=Any)
3332
HandlerT_co = TypeVar(
34-
"HandlerT_co", covariant=True, bound="TargetHandler[Any, Any, Any, Any]"
33+
"HandlerT_co", covariant=True, bound="TargetHandler[Any, Any, Any]"
3534
)
3635
OptChildHandlerT = TypeVar(
3736
"OptChildHandlerT",
38-
bound="TargetHandler[Any, Any, Any, Any] | None",
37+
bound="TargetHandler[Any, Any, Any] | None",
3938
default=None,
4039
covariant=True,
4140
)
4241
OptChildHandlerT_co = TypeVar(
4342
"OptChildHandlerT_co",
44-
bound="TargetHandler[Any, Any, Any, Any] | None",
43+
bound="TargetHandler[Any, Any, Any] | None",
4544
default=None,
4645
covariant=True,
4746
)
@@ -144,12 +143,10 @@ class TargetReconcileOutput(
144143
tracking_record: TrackingRecordT_co | NonExistenceType
145144

146145

147-
class TargetHandler(
148-
Protocol[KeyT_contra, ValueT_contra, TrackingRecordT, OptChildHandlerT_co]
149-
):
146+
class TargetHandler(Protocol[ValueT_contra, TrackingRecordT, OptChildHandlerT_co]):
150147
def reconcile(
151148
self,
152-
key: KeyT_contra,
149+
key: StableKey,
153150
desired_target_state: ValueT_contra | NonExistenceType,
154151
prev_possible_states: Collection[TrackingRecordT],
155152
prev_may_be_missing: bool,
@@ -158,8 +155,8 @@ def reconcile(
158155

159156

160157
class TargetStateProvider(
161-
Generic[KeyT, ValueT, OptChildHandlerT, MaybePendingS],
162-
ResolvesTo["TargetStateProvider[KeyT, ValueT, OptChildHandlerT]"],
158+
Generic[ValueT, OptChildHandlerT, MaybePendingS],
159+
ResolvesTo["TargetStateProvider[ValueT, OptChildHandlerT]"],
163160
):
164161
__slots__ = ("_core", "memo_key")
165162
_core: core.TargetStateProvider
@@ -170,8 +167,8 @@ def __init__(self, core_provider: core.TargetStateProvider):
170167
self.memo_key = core_provider.coco_memo_key()
171168

172169
def target_state(
173-
self: TargetStateProvider[KeyT, ValueT, OptChildHandlerT],
174-
key: KeyT,
170+
self: TargetStateProvider[ValueT, OptChildHandlerT],
171+
key: StableKey,
175172
value: ValueT,
176173
) -> "TargetState[OptChildHandlerT]":
177174
return TargetState(self, key, value)
@@ -181,20 +178,20 @@ def __coco_memo_key__(self) -> str:
181178

182179

183180
PendingTargetStateProvider: TypeAlias = TargetStateProvider[
184-
KeyT, ValueT, OptChildHandlerT, PendingS
181+
ValueT, OptChildHandlerT, PendingS
185182
]
186183

187184

188185
class TargetState(Generic[OptChildHandlerT]):
189186
__slots__ = ("_provider", "_key", "_value")
190-
_provider: TargetStateProvider[Any, Any, OptChildHandlerT]
187+
_provider: TargetStateProvider[Any, OptChildHandlerT]
191188
_key: Any
192189
_value: Any
193190

194191
def __init__(
195192
self,
196-
provider: TargetStateProvider[KeyT, ValueT, OptChildHandlerT],
197-
key: KeyT,
193+
provider: TargetStateProvider[ValueT, OptChildHandlerT],
194+
key: StableKey,
198195
value: ValueT,
199196
):
200197
self._provider = provider
@@ -220,8 +217,8 @@ def declare_target_state(target_state: TargetState[None]) -> None:
220217

221218

222219
def declare_target_state_with_child(
223-
target_state: TargetState[TargetHandler[KeyT, ValueT, Any, OptChildHandlerT]],
224-
) -> PendingTargetStateProvider[KeyT, ValueT, OptChildHandlerT]:
220+
target_state: TargetState[TargetHandler[ValueT, Any, OptChildHandlerT]],
221+
) -> PendingTargetStateProvider[ValueT, OptChildHandlerT]:
225222
"""
226223
Declare a target state with a child handler within the current component context.
227224
@@ -243,7 +240,7 @@ def declare_target_state_with_child(
243240

244241

245242
def register_root_target_states_provider(
246-
name: str, handler: TargetHandler[KeyT, ValueT, Any, OptChildHandlerT]
247-
) -> TargetStateProvider[KeyT, ValueT, OptChildHandlerT]:
243+
name: str, handler: TargetHandler[ValueT, Any, OptChildHandlerT]
244+
) -> TargetStateProvider[ValueT, OptChildHandlerT]:
248245
provider = core.register_root_target_states_provider(name, handler)
249246
return TargetStateProvider(provider)

python/cocoindex/connectors/lancedb/_target.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
Literal,
1919
NamedTuple,
2020
Sequence,
21+
cast,
2122
)
2223

2324
from typing_extensions import TypeVar
@@ -315,7 +316,7 @@ class _RowAction(NamedTuple):
315316
value: _RowValue | None # None means delete
316317

317318

318-
class _RowHandler(coco.TargetHandler[_RowKey, _RowValue, _RowFingerprint]):
319+
class _RowHandler(coco.TargetHandler[_RowValue, _RowFingerprint]):
319320
"""Handler for row-level target states within a table."""
320321

321322
_db_key: str
@@ -408,10 +409,12 @@ async def _execute_deletes(
408409
) -> None:
409410
"""Execute delete operations using LanceDB's delete."""
410411
pk_cols = self._table_schema.primary_key
412+
print(f"DEBUG: LanceDB deleting {len(deletes)} rows. PK cols: {pk_cols}")
411413

412414
# Build delete conditions for each row
413415
# LanceDB delete syntax: table.delete("column = value")
414416
for action in deletes:
417+
print(f"DEBUG: Processing delete for key: {action.key}")
415418
conditions = []
416419
for i, pk_col in enumerate(pk_cols):
417420
pk_value = action.key[i]
@@ -422,6 +425,7 @@ async def _execute_deletes(
422425
conditions.append(f"{pk_col} = {pk_value}")
423426

424427
condition = " AND ".join(conditions)
428+
print(f"DEBUG: Generated delete condition: {condition}")
425429
await table.delete(condition)
426430

427431
def _build_pyarrow_schema(self) -> pa.Schema:
@@ -434,12 +438,16 @@ def _build_pyarrow_schema(self) -> pa.Schema:
434438

435439
def reconcile(
436440
self,
437-
key: _RowKey,
441+
key: coco.StableKey,
438442
desired_state: _RowValue | coco.NonExistenceType,
439443
prev_possible_states: Collection[_RowFingerprint],
440444
prev_may_be_missing: bool,
441445
/,
442446
) -> coco.TargetReconcileOutput[_RowAction, _RowFingerprint] | None:
447+
if isinstance(key, tuple):
448+
key = cast(_RowKey, key)
449+
else:
450+
raise TypeError(f"Row key must be a tuple, got {type(key)}")
443451
if coco.is_non_existence(desired_state):
444452
# Delete case - only if it might exist
445453
if not prev_possible_states and not prev_may_be_missing:
@@ -542,9 +550,7 @@ class _TableAction(NamedTuple):
542550
)
543551

544552

545-
class _TableHandler(
546-
coco.TargetHandler[_TableKey, _TableSpec, _TableTrackingRecord, _RowHandler]
547-
):
553+
class _TableHandler(coco.TargetHandler[_TableSpec, _TableTrackingRecord, _RowHandler]):
548554
"""Handler for table-level target states."""
549555

550556
_sink: coco.TargetActionSink[_TableAction, _RowHandler]
@@ -660,7 +666,7 @@ def _build_pyarrow_schema(self, schema: TableSchema[Any]) -> pa.Schema:
660666

661667
def reconcile(
662668
self,
663-
key: _TableKey,
669+
key: coco.StableKey,
664670
desired_state: _TableSpec | coco.NonExistenceType,
665671
prev_possible_states: Collection[_TableTrackingRecord],
666672
prev_may_be_missing: bool,
@@ -669,6 +675,11 @@ def reconcile(
669675
coco.TargetReconcileOutput[_TableAction, _TableTrackingRecord, _RowHandler]
670676
| None
671677
):
678+
if isinstance(key, tuple):
679+
key_args = cast(tuple[str, str], key)
680+
key = _TableKey(*key_args)
681+
else:
682+
raise TypeError(f"Table key must be a tuple, got {type(key)}")
672683
tracking_record: _TableTrackingRecord | coco.NonExistenceType
673684

674685
if coco.is_non_existence(desired_state):
@@ -727,14 +738,12 @@ class TableTarget(
727738
RowT: The type of row objects (dict, dataclass, NamedTuple, or Pydantic model).
728739
"""
729740

730-
_provider: coco.TargetStateProvider[_RowKey, _RowValue, None, coco.MaybePendingS]
741+
_provider: coco.TargetStateProvider[_RowValue, None, coco.MaybePendingS]
731742
_table_schema: TableSchema[RowT]
732743

733744
def __init__(
734745
self,
735-
provider: coco.TargetStateProvider[
736-
_RowKey, _RowValue, None, coco.MaybePendingS
737-
],
746+
provider: coco.TargetStateProvider[_RowValue, None, coco.MaybePendingS],
738747
table_schema: TableSchema[RowT],
739748
) -> None:
740749
self._provider = provider

python/cocoindex/connectors/localfs/_target.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ class _EntryTrackingRecord:
177177

178178

179179
class _EntryHandler(
180-
coco.TargetHandler[_EntryName, _EntrySpec, _EntryTrackingRecord, "_EntryHandler"]
180+
coco.TargetHandler[_EntrySpec, _EntryTrackingRecord, "_EntryHandler"]
181181
):
182182
"""Handler for file and directory entries within a parent directory."""
183183

@@ -190,7 +190,7 @@ def __init__(self, base_path: pathlib.Path) -> None:
190190

191191
def reconcile(
192192
self,
193-
key: _EntryName,
193+
key: coco.StableKey,
194194
desired_state: _EntrySpec | coco.NonExistenceType,
195195
prev_possible_states: Collection[_EntryTrackingRecord],
196196
prev_may_be_missing: bool,
@@ -199,6 +199,7 @@ def reconcile(
199199
coco.TargetReconcileOutput[_EntryAction, _EntryTrackingRecord, "_EntryHandler"]
200200
| None
201201
):
202+
key = cast(_EntryName, key)
202203
path = self._base_path / key
203204
return _reconcile_entry(
204205
path, desired_state, prev_possible_states, prev_may_be_missing
@@ -238,14 +239,12 @@ def _resolve_root_path(key: _RootKey) -> pathlib.Path:
238239
return (base_path / key.path).resolve()
239240

240241

241-
class _RootHandler(
242-
coco.TargetHandler[_RootKey, _EntrySpec, _EntryTrackingRecord, _EntryHandler]
243-
):
242+
class _RootHandler(coco.TargetHandler[_EntrySpec, _EntryTrackingRecord, _EntryHandler]):
244243
"""Handler for root-level entries (files and directories)."""
245244

246245
def reconcile(
247246
self,
248-
key: _RootKey,
247+
key: coco.StableKey,
249248
desired_state: _EntrySpec | coco.NonExistenceType,
250249
prev_possible_states: Collection[_EntryTrackingRecord],
251250
prev_may_be_missing: bool,
@@ -254,6 +253,12 @@ def reconcile(
254253
coco.TargetReconcileOutput[_EntryAction, _EntryTrackingRecord, _EntryHandler]
255254
| None
256255
):
256+
if isinstance(key, tuple):
257+
key_args = cast(tuple[str | None, str], key)
258+
key = _RootKey(*key_args)
259+
else:
260+
raise TypeError(f"Root key must be a tuple, got {type(key)}")
261+
257262
path = _resolve_root_path(key)
258263
return _reconcile_entry(
259264
path, desired_state, prev_possible_states, prev_may_be_missing
@@ -282,14 +287,12 @@ class DirTarget(Generic[coco.MaybePendingS], coco.ResolvesTo["DirTarget"]):
282287
files and directories that are no longer declared.
283288
"""
284289

285-
_provider: coco.TargetStateProvider[
286-
_EntryName, _EntrySpec, _EntryHandler, coco.MaybePendingS
287-
]
290+
_provider: coco.TargetStateProvider[_EntrySpec, _EntryHandler, coco.MaybePendingS]
288291

289292
def __init__(
290293
self,
291294
provider: coco.TargetStateProvider[
292-
_EntryName, _EntrySpec, _EntryHandler, coco.MaybePendingS
295+
_EntrySpec, _EntryHandler, coco.MaybePendingS
293296
],
294297
) -> None:
295298
self._provider = provider

python/cocoindex/connectors/postgres/_target.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
Literal,
2424
NamedTuple,
2525
Sequence,
26+
cast,
2627
)
2728

2829
from typing_extensions import TypeVar
@@ -359,7 +360,7 @@ class _RowAction(NamedTuple):
359360
value: _RowValue | None # None means delete
360361

361362

362-
class _RowHandler(coco.TargetHandler[_RowKey, _RowValue, _RowFingerprint]):
363+
class _RowHandler(coco.TargetHandler[_RowValue, _RowFingerprint]):
363364
"""Handler for row-level target states within a table."""
364365

365366
_pool: asyncpg.Pool
@@ -473,12 +474,16 @@ async def _execute_deletes(
473474

474475
def reconcile(
475476
self,
476-
key: _RowKey,
477+
key: coco.StableKey,
477478
desired_state: _RowValue | coco.NonExistenceType,
478479
prev_possible_states: Collection[_RowFingerprint],
479480
prev_may_be_missing: bool,
480481
/,
481482
) -> coco.TargetReconcileOutput[_RowAction, _RowFingerprint] | None:
483+
if isinstance(key, tuple):
484+
key = cast(_RowKey, key)
485+
else:
486+
raise TypeError(f"Row key must be a tuple, got {type(key)}")
482487
if coco.is_non_existence(desired_state):
483488
# Delete case - only if it might exist
484489
if not prev_possible_states and not prev_may_be_missing:
@@ -594,9 +599,7 @@ class _TableAction(NamedTuple):
594599
)
595600

596601

597-
class _TableHandler(
598-
coco.TargetHandler[_TableKey, _TableSpec, _TableTrackingRecord, _RowHandler]
599-
):
602+
class _TableHandler(coco.TargetHandler[_TableSpec, _TableTrackingRecord, _RowHandler]):
600603
"""Handler for table-level target states."""
601604

602605
_sink: coco.TargetActionSink[_TableAction, _RowHandler]
@@ -801,7 +804,7 @@ async def _apply_column_actions(
801804

802805
def reconcile(
803806
self,
804-
key: _TableKey,
807+
key: coco.StableKey,
805808
desired_state: _TableSpec | coco.NonExistenceType,
806809
prev_possible_states: Collection[_TableTrackingRecord],
807810
prev_may_be_missing: bool,
@@ -810,6 +813,12 @@ def reconcile(
810813
coco.TargetReconcileOutput[_TableAction, _TableTrackingRecord, _RowHandler]
811814
| None
812815
):
816+
if isinstance(key, tuple):
817+
key_args = cast(tuple[str, str | None, str], key)
818+
key = _TableKey(*key_args)
819+
else:
820+
raise TypeError(f"Table key must be a tuple, got {type(key)}")
821+
813822
tracking_record: _TableTrackingRecord | coco.NonExistenceType
814823

815824
if coco.is_non_existence(desired_state):
@@ -868,14 +877,12 @@ class TableTarget(
868877
RowT: The type of row objects (dict, dataclass, NamedTuple, or Pydantic model).
869878
"""
870879

871-
_provider: coco.TargetStateProvider[_RowKey, _RowValue, None, coco.MaybePendingS]
880+
_provider: coco.TargetStateProvider[_RowValue, None, coco.MaybePendingS]
872881
_table_schema: TableSchema[RowT]
873882

874883
def __init__(
875884
self,
876-
provider: coco.TargetStateProvider[
877-
_RowKey, _RowValue, None, coco.MaybePendingS
878-
],
885+
provider: coco.TargetStateProvider[_RowValue, None, coco.MaybePendingS],
879886
table_schema: TableSchema[RowT],
880887
) -> None:
881888
self._provider = provider

0 commit comments

Comments
 (0)