Skip to content

Source-Stripe: Updating CBT to use invoices and customer parents #58633

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from typing import Iterable, Mapping, Any, Optional, Set, Dict
from datetime import timedelta, datetime
from airbyte_cdk.sources.declarative.partition_routers import SubstreamPartitionRouter
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.sources.declarative.types import StreamSlice
from collections import Counter

class CustomerIdPartitionRouter(SubstreamPartitionRouter):
def __init__(self, stream_state: Optional[Mapping[str, Any]] = None, *args, **kwargs):
super().__init__(*args, **kwargs)
self._stream_state = stream_state
self.logger.info(f"[CustomerIdPartitionRouter] State after CustomStateMigration: {self._stream_state}")

def _extract_metadata(self, slice: Dict[str, Any]) -> tuple[Optional[str], Optional[str], float]:
extras = getattr(slice, "_extra_fields", {})
object_type = extras.get("object")
customer_id = slice.get("customer_id")
balance_or_total = extras.get("balance", 0) if object_type == "customer" else extras.get("total", 0)
return customer_id, object_type, balance_or_total

def stream_slices(self) -> Iterable[StreamSlice]: # type: ignore
parent_slices = list(super().stream_slices())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought you had to iterate through each of the substream partition configs

seen_customer_ids: Set[str] = set()
unique_slices = []
totals = Counter()

for slice in parent_slices:
customer_id, object_type, balance_or_total = self._extract_metadata(slice)

if object_type == "customer":
totals["customers"] += 1
elif object_type == "invoice":
totals["invoices"] += 1

if customer_id and customer_id not in seen_customer_ids and balance_or_total != 0:
seen_customer_ids.add(customer_id)
unique_slices.append(slice)
else:
totals["skipped"] += 1

self.logger.info(f"[CustomerIdPartitionRouter] Total parent slices: {len(parent_slices)}")
self.logger.info(f"[CustomerIdPartitionRouter] Total customer records: {totals['customers']}")
self.logger.info(f"[CustomerIdPartitionRouter] Total invoice records: {totals['invoices']}")
self.logger.info(f"[CustomerIdPartitionRouter] Unique slices emitted: {len(unique_slices)}")
self.logger.info(f"[CustomerIdPartitionRouter] Skipped records: {totals['skipped']}")

yield from unique_slices

class CustomerBalanceStateMigration(StateMigration):
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
return "parent_state" in stream_state

def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
current_state = stream_state or {}
state_created = int(current_state.get("state", {}).get("created", 0))
current_parent_state = current_state.get("parent_state", {})

two_days = int(timedelta(days=2).total_seconds())
fourteen_day_floor = int(datetime.utcnow().timestamp()) - int(timedelta(days=14).total_seconds())

created_customer_balance_transactions = max(state_created - two_days, fourteen_day_floor, 0)
updated_customers = max(0, current_parent_state.get("customers", {}).get("updated", 0) - two_days, fourteen_day_floor)
updated_invoices = max(0, current_parent_state.get("invoices", {}).get("updated", 0) - two_days, fourteen_day_floor)

migrated_state = {
"state": {
"created": created_customer_balance_transactions
},
"parent_state": {
"customers": {
"updated": updated_customers
},
"invoices": {
"updated": updated_invoices
}
},
"use_global_cursor": True
}

return migrated_state
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ definitions:
http_codes:
- 404
error_message: >-
Data was not found. Error message: {{ response['error']['message'] }} If this is a path for getting
child attributes like /v1/checkout/sessions/<session_id>/line_items when running the incremental sync,
Data was not found. Error message: {{ response['error']['message'] }} If this is a path for getting
child attributes like /v1/checkout/sessions/<session_id>/line_items when running the incremental sync,
you may safely ignore this warning.
bearer_authenticator:
type: BearerAuthenticator
Expand Down Expand Up @@ -860,33 +860,60 @@ definitions:
types[]: '{{["radar.early_fraud_warning.created", "radar.early_fraud_warning.updated"]}}'

customer_balance_transactions:
$ref: "#/definitions/entity_stream"
$ref: "#/definitions/base_stream"
$parameters:
name: customer_balance_transactions
retriever:
$ref: "#/definitions/base_retriever"
$parameters:
path: customers/{{ stream_partition.customer_id }}/balance_transactions
path: customers/{{ stream_slice.customer_id }}/balance_transactions
partition_router:
type: SubstreamPartitionRouter
type: CustomPartitionRouter
class_name: source_stripe.components.CustomerIdPartitionRouter
parameters:
stream_state: "{{ stream_state }}"
parent_stream_configs:
- type: ParentStreamConfig
parent_key: id
partition_field: customer_id
stream:
$ref: "#/definitions/streams/customers"
incremental_dependency: true
# This stream is not truly incremental on the child level.
# We're configuring it this way to support incremental syncs on the parent only,
# which helps reduce the size of the state.
record_selector:
extractor:
field_pointer: []
parameters:
inject_into: extra_fields
extra_fields:
- ["object"]
- ["balance"]
- ["id"]
- type: ParentStreamConfig
parent_key: customer
partition_field: customer_id
stream:
$ref: "#/definitions/streams/invoices"
incremental_dependency: true
record_selector:
extractor:
field_pointer: ["data"]
parameters:
inject_into: extra_fields
extra_fields:
- ["object"]
- ["total"]
- ["customer"]
state_migrations:
- type: CustomStateMigration
class_name: source_stripe.components.CustomerBalanceStateMigration
Comment on lines +906 to +908

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

incremental_sync:
$ref: "#/definitions/entity_single_slice_cursor"
cursor_field: created
global_substream_cursor: true
transformations:
- type: RemoveFields
field_pointers:
- - updated # Remove field that added in entity_stream
- - updated

payout_balance_transactions:
$ref: "#/definitions/entity_stream"
Expand Down
Loading