Skip to content

Commit e7a9fb1

Browse files
Updating CBT to use invoices and customer parents
1 parent 01cd166 commit e7a9fb1

File tree

2 files changed

+40
-8
lines changed

2 files changed

+40
-8
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from typing import Iterable, Mapping, Any, Optional, List, Set
2+
3+
from airbyte_cdk.sources.declarative.partition_routers import SubstreamPartitionRouter
4+
from airbyte_cdk.models import SyncMode
5+
from airbyte_cdk.sources.declarative.types import StreamSlice
6+
7+
class CustomerIdPartitionRouter(SubstreamPartitionRouter):
8+
def stream_slices(self) -> Iterable[StreamSlice]: # type: ignore
9+
parent_slices = list(super().stream_slices())
10+
seen_customer_ids: Set[str] = set()
11+
unique_slices = []
12+
13+
for slice in parent_slices:
14+
customer_id = slice.get("customer_id")
15+
if customer_id and customer_id not in seen_customer_ids:
16+
seen_customer_ids.add(customer_id)
17+
self.logger.info(f"[CustomerIdPartitionRouter] Unique slice for customer_id={customer_id}")
18+
unique_slices.append(slice)
19+
else:
20+
self.logger.info(f"[CustomerIdPartitionRouter] Skipping duplicate customer_id={customer_id}")
21+
22+
self.logger.info(f"[CustomerIdPartitionRouter] Emitting {len(unique_slices)} unique slices")
23+
yield from unique_slices

airbyte-integrations/connectors/source-stripe/source_stripe/manifest.yaml

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ definitions:
6767
http_codes:
6868
- 404
6969
error_message: >-
70-
Data was not found. Error message: {{ response['error']['message'] }} If this is a path for getting
71-
child attributes like /v1/checkout/sessions/<session_id>/line_items when running the incremental sync,
70+
Data was not found. Error message: {{ response['error']['message'] }} If this is a path for getting
71+
child attributes like /v1/checkout/sessions/<session_id>/line_items when running the incremental sync,
7272
you may safely ignore this warning.
7373
bearer_authenticator:
7474
type: BearerAuthenticator
@@ -308,6 +308,8 @@ definitions:
308308
$parameters:
309309
request_parameters:
310310
types[]: '{{["customer.created", "customer.updated", "customer.deleted"]}}'
311+
cursor_field: updated
312+
lookback_window_days: 1
311313

312314
subscriptions:
313315
type: StateDelegatingStream
@@ -352,6 +354,8 @@ definitions:
352354
$parameters:
353355
request_parameters:
354356
types[]: '{{["invoice.created", "invoice.deleted", "invoice.finalization_failed", "invoice.finalized", "invoice.marked_uncollectible", "invoice.overdue", "invoice.paid", "invoice.payment_action_required", "invoice.payment_failed", "invoice.payment_succeeded", "invoice.sent", "invoice.updated", "invoice.voided", "invoice.will_be_due"]}}'
357+
cursor_field: updated
358+
lookback_window_days: 1
355359

356360
transfers:
357361
type: StateDelegatingStream
@@ -866,27 +870,32 @@ definitions:
866870
retriever:
867871
$ref: "#/definitions/base_retriever"
868872
$parameters:
869-
path: customers/{{ stream_partition.customer_id }}/balance_transactions
873+
path: customers/{{ stream_slice.customer_id }}/balance_transactions
870874
partition_router:
871-
type: SubstreamPartitionRouter
875+
type: CustomPartitionRouter
876+
class_name: source_stripe.components.CustomerIdPartitionRouter
872877
parent_stream_configs:
873878
- type: ParentStreamConfig
874879
parent_key: id
875880
partition_field: customer_id
876881
stream:
877882
$ref: "#/definitions/streams/customers"
878883
incremental_dependency: true
879-
# This stream is not truly incremental on the child level.
880-
# We're configuring it this way to support incremental syncs on the parent only,
881-
# which helps reduce the size of the state.
884+
- type: ParentStreamConfig
885+
parent_key: customer
886+
partition_field: customer_id
887+
stream:
888+
$ref: "#/definitions/streams/invoices"
889+
incremental_dependency: true
882890
incremental_sync:
883891
$ref: "#/definitions/entity_single_slice_cursor"
884892
cursor_field: created
893+
lookback_window_days: 2
885894
global_substream_cursor: true
886895
transformations:
887896
- type: RemoveFields
888897
field_pointers:
889-
- - updated # Remove field that added in entity_stream
898+
- - updated
890899

891900
payout_balance_transactions:
892901
$ref: "#/definitions/entity_stream"

0 commit comments

Comments
 (0)