Skip to content

Commit 9f7607e

Browse files
adding state migration for lookbacks
1 parent e7a9fb1 commit 9f7607e

File tree

2 files changed

+39
-7
lines changed

2 files changed

+39
-7
lines changed

airbyte-integrations/connectors/source-stripe/source_stripe/components.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from typing import Iterable, Mapping, Any, Optional, List, Set
2-
2+
from datetime import timedelta, datetime
33
from airbyte_cdk.sources.declarative.partition_routers import SubstreamPartitionRouter
4+
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
45
from airbyte_cdk.models import SyncMode
56
from airbyte_cdk.sources.declarative.types import StreamSlice
67

@@ -21,3 +22,36 @@ def stream_slices(self) -> Iterable[StreamSlice]: # type: ignore
2122

2223
self.logger.info(f"[CustomerIdPartitionRouter] Emitting {len(unique_slices)} unique slices")
2324
yield from unique_slices
25+
26+
class CustomerBalanceStateMigration(StateMigration):
27+
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
28+
return True
29+
30+
def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
31+
current_state = stream_state or {}
32+
state_created = int(current_state.get("state", {}).get("created", 0))
33+
current_parent_state = current_state.get("parent_state", {})
34+
35+
two_days = int(timedelta(days=2).total_seconds())
36+
fourteen_day_floor = int(datetime.utcnow().timestamp()) - int(timedelta(days=14).total_seconds())
37+
38+
created_customer_balance_transactions = max(state_created - two_days, fourteen_day_floor, 0)
39+
updated_customers = max(0, current_parent_state.get("customers", {}).get("updated", 0) - two_days, fourteen_day_floor)
40+
updated_invoices = max(0, current_parent_state.get("invoices", {}).get("updated", 0) - two_days, fourteen_day_floor)
41+
42+
migrated_state = {
43+
"state": {
44+
"created": created_customer_balance_transactions
45+
},
46+
"parent_state": {
47+
"customers": {
48+
"updated": updated_customers
49+
},
50+
"invoices": {
51+
"updated": updated_invoices
52+
}
53+
},
54+
"use_global_cursor": True
55+
}
56+
57+
return migrated_state

airbyte-integrations/connectors/source-stripe/source_stripe/manifest.yaml

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -308,8 +308,6 @@ definitions:
308308
$parameters:
309309
request_parameters:
310310
types[]: '{{["customer.created", "customer.updated", "customer.deleted"]}}'
311-
cursor_field: updated
312-
lookback_window_days: 1
313311

314312
subscriptions:
315313
type: StateDelegatingStream
@@ -354,8 +352,6 @@ definitions:
354352
$parameters:
355353
request_parameters:
356354
types[]: '{{["invoice.created", "invoice.deleted", "invoice.finalization_failed", "invoice.finalized", "invoice.marked_uncollectible", "invoice.overdue", "invoice.paid", "invoice.payment_action_required", "invoice.payment_failed", "invoice.payment_succeeded", "invoice.sent", "invoice.updated", "invoice.voided", "invoice.will_be_due"]}}'
357-
cursor_field: updated
358-
lookback_window_days: 1
359355

360356
transfers:
361357
type: StateDelegatingStream
@@ -864,7 +860,7 @@ definitions:
864860
types[]: '{{["radar.early_fraud_warning.created", "radar.early_fraud_warning.updated"]}}'
865861

866862
customer_balance_transactions:
867-
$ref: "#/definitions/entity_stream"
863+
$ref: "#/definitions/base_stream"
868864
$parameters:
869865
name: customer_balance_transactions
870866
retriever:
@@ -887,10 +883,12 @@ definitions:
887883
stream:
888884
$ref: "#/definitions/streams/invoices"
889885
incremental_dependency: true
886+
state_migrations:
887+
- type: CustomStateMigration
888+
class_name: source_stripe.components.CustomerBalanceStateMigration
890889
incremental_sync:
891890
$ref: "#/definitions/entity_single_slice_cursor"
892891
cursor_field: created
893-
lookback_window_days: 2
894892
global_substream_cursor: true
895893
transformations:
896894
- type: RemoveFields

0 commit comments

Comments
 (0)