Skip to content

Commit 6c1afa3

Browse files
adding state migration for lookbacks
1 parent e7a9fb1 commit 6c1afa3

File tree

2 files changed

+39
-3
lines changed

2 files changed

+39
-3
lines changed

airbyte-integrations/connectors/source-stripe/source_stripe/components.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from typing import Iterable, Mapping, Any, Optional, List, Set
2-
2+
from datetime import timedelta, datetime
33
from airbyte_cdk.sources.declarative.partition_routers import SubstreamPartitionRouter
4+
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
45
from airbyte_cdk.models import SyncMode
56
from airbyte_cdk.sources.declarative.types import StreamSlice
67

@@ -21,3 +22,36 @@ def stream_slices(self) -> Iterable[StreamSlice]: # type: ignore
2122

2223
self.logger.info(f"[CustomerIdPartitionRouter] Emitting {len(unique_slices)} unique slices")
2324
yield from unique_slices
25+
26+
class CustomerBalanceStateMigration(StateMigration):
27+
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
28+
return True
29+
30+
def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
31+
current_state = stream_state or {}
32+
state_created = int(current_state.get("state", {}).get("created", 0))
33+
current_parent_state = current_state.get("parent_state", {})
34+
35+
two_days = int(timedelta(days=2).total_seconds())
36+
fourteen_day_floor = int(datetime.utcnow().timestamp()) - int(timedelta(days=14).total_seconds())
37+
38+
created_customer_balance_transactions = max(state_created - two_days, fourteen_day_floor, 0)
39+
updated_customers = max(0, current_parent_state.get("customers", {}).get("updated", 0) - two_days, fourteen_day_floor)
40+
updated_invoices = max(0, current_parent_state.get("invoices", {}).get("updated", 0) - two_days, fourteen_day_floor)
41+
42+
migrated_state = {
43+
"state": {
44+
"created": created_customer_balance_transactions
45+
},
46+
"parent_state": {
47+
"customers": {
48+
"updated": updated_customers
49+
},
50+
"invoices": {
51+
"updated": updated_invoices
52+
}
53+
},
54+
"use_global_cursor": True
55+
}
56+
57+
return migrated_state

airbyte-integrations/connectors/source-stripe/source_stripe/manifest.yaml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -864,7 +864,7 @@ definitions:
864864
types[]: '{{["radar.early_fraud_warning.created", "radar.early_fraud_warning.updated"]}}'
865865

866866
customer_balance_transactions:
867-
$ref: "#/definitions/entity_stream"
867+
$ref: "#/definitions/base_stream"
868868
$parameters:
869869
name: customer_balance_transactions
870870
retriever:
@@ -887,10 +887,12 @@ definitions:
887887
stream:
888888
$ref: "#/definitions/streams/invoices"
889889
incremental_dependency: true
890+
state_migrations:
891+
- type: CustomStateMigration
892+
class_name: source_stripe.components.CustomerBalanceStateMigration
890893
incremental_sync:
891894
$ref: "#/definitions/entity_single_slice_cursor"
892895
cursor_field: created
893-
lookback_window_days: 2
894896
global_substream_cursor: true
895897
transformations:
896898
- type: RemoveFields

0 commit comments

Comments
 (0)