Source-Stripe: Updating CBT to use invoices and customer parents #58633

Draft pull request: wants to merge 3 commits into base: master (showing changes from 2 commits)
@@ -0,0 +1,57 @@
from typing import Iterable, Mapping, Any, Optional, List, Set
from datetime import timedelta, datetime, timezone
from airbyte_cdk.sources.declarative.partition_routers import SubstreamPartitionRouter
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.types import StreamSlice

class CustomerIdPartitionRouter(SubstreamPartitionRouter):
    def stream_slices(self) -> Iterable[StreamSlice]:  # type: ignore
        parent_slices = list(super().stream_slices())
        seen_customer_ids: Set[str] = set()
        unique_slices = []

        for slice in parent_slices:
            customer_id = slice.get("customer_id")
            if customer_id and customer_id not in seen_customer_ids:
                seen_customer_ids.add(customer_id)
                self.logger.info(f"[CustomerIdPartitionRouter] Unique slice for customer_id={customer_id}")
                unique_slices.append(slice)
            else:
                self.logger.info(f"[CustomerIdPartitionRouter] Skipping duplicate customer_id={customer_id}")

        self.logger.info(f"[CustomerIdPartitionRouter] Emitting {len(unique_slices)} unique slices")
        yield from unique_slices

class CustomerBalanceStateMigration(StateMigration):
    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
        return True

    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        current_state = stream_state or {}
        state_created = int(current_state.get("state", {}).get("created", 0))
        current_parent_state = current_state.get("parent_state", {})

        two_days = int(timedelta(days=2).total_seconds())
        # Use an aware UTC datetime: a naive utcnow().timestamp() is interpreted as local time.
        fourteen_day_floor = int(datetime.now(timezone.utc).timestamp()) - int(timedelta(days=14).total_seconds())

        created_customer_balance_transactions = max(state_created - two_days, fourteen_day_floor, 0)
        updated_customers = max(0, current_parent_state.get("customers", {}).get("updated", 0) - two_days, fourteen_day_floor)
        updated_invoices = max(0, current_parent_state.get("invoices", {}).get("updated", 0) - two_days, fourteen_day_floor)

        migrated_state = {
            "state": {
                "created": created_customer_balance_transactions
            },
            "parent_state": {
                "customers": {
                    "updated": updated_customers
                },
                "invoices": {
                    "updated": updated_invoices
                }
            },
            "use_global_cursor": True
        }

        return migrated_state

Review comment (Contributor), on the line `parent_slices = list(super().stream_slices())`:

I thought you had to iterate through each of the substream partition configs.
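
The cursor arithmetic in the `migrate` method above can be tried in isolation. The following is a minimal sketch, not the connector's code: `clamp_cursor`, `LOOKBACK`, `MAX_AGE` and the fixed `now` value are hypothetical names and data introduced only for illustration. It shows that each cursor is rewound by two days but never lands earlier than 14 days before the current time.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical constants mirroring the two windows used in migrate().
LOOKBACK = int(timedelta(days=2).total_seconds())   # two-day rewind, in seconds
MAX_AGE = int(timedelta(days=14).total_seconds())   # 14-day floor, in seconds


def clamp_cursor(cursor: int, now: int) -> int:
    """Rewind `cursor` by LOOKBACK, but never go earlier than `now - MAX_AGE`."""
    floor = now - MAX_AGE
    return max(cursor - LOOKBACK, floor, 0)


# A fixed, timezone-aware "now" keeps the example deterministic (assumed value).
now = int(datetime(2025, 1, 15, tzinfo=timezone.utc).timestamp())
recent = now - int(timedelta(days=1).total_seconds())   # cursor from yesterday
stale = now - int(timedelta(days=30).total_seconds())   # cursor from a month ago

print(clamp_cursor(recent, now) == recent - LOOKBACK)  # recent cursor: plain two-day rewind
print(clamp_cursor(stale, now) == now - MAX_AGE)       # stale cursor: raised to the floor
print(clamp_cursor(0, now) == now - MAX_AGE)           # missing cursor: also the floor
```

One consequence worth noting: a missing cursor (treated as 0) maps to the 14-day floor rather than to the beginning of time, so under this logic an empty state does not trigger a full historical read.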
@@ -67,8 +67,8 @@ definitions:
             http_codes:
               - 404
             error_message: >-
-              Data was not found. Error message: {{ response['error']['message'] }} If this is a path for getting
-              child attributes like /v1/checkout/sessions/<session_id>/line_items when running the incremental sync,
+              Data was not found. Error message: {{ response['error']['message'] }} If this is a path for getting
+              child attributes like /v1/checkout/sessions/<session_id>/line_items when running the incremental sync,
               you may safely ignore this warning.
   bearer_authenticator:
     type: BearerAuthenticator
@@ -860,33 +860,40 @@ definitions:
             types[]: '{{["radar.early_fraud_warning.created", "radar.early_fraud_warning.updated"]}}'

     customer_balance_transactions:
-      $ref: "#/definitions/entity_stream"
+      $ref: "#/definitions/base_stream"
       $parameters:
         name: customer_balance_transactions
       retriever:
         $ref: "#/definitions/base_retriever"
         $parameters:
-          path: customers/{{ stream_partition.customer_id }}/balance_transactions
+          path: customers/{{ stream_slice.customer_id }}/balance_transactions
         partition_router:
-          type: SubstreamPartitionRouter
+          type: CustomPartitionRouter
+          class_name: source_stripe.components.CustomerIdPartitionRouter
           parent_stream_configs:
             - type: ParentStreamConfig
               parent_key: id
               partition_field: customer_id
               stream:
                 $ref: "#/definitions/streams/customers"
               incremental_dependency: true
-      # This stream is not truly incremental on the child level.
-      # We're configuring it this way to support incremental syncs on the parent only,
-      # which helps reduce the size of the state.
+            - type: ParentStreamConfig
+              parent_key: customer
+              partition_field: customer_id
+              stream:
+                $ref: "#/definitions/streams/invoices"
+              incremental_dependency: true
+      state_migrations:
+        - type: CustomStateMigration
+          class_name: source_stripe.components.CustomerBalanceStateMigration
       incremental_sync:
         $ref: "#/definitions/entity_single_slice_cursor"
         cursor_field: created
         global_substream_cursor: true
       transformations:
         - type: RemoveFields
           field_pointers:
-            - - updated # Remove field that added in entity_stream
+            - - updated

     payout_balance_transactions:
       $ref: "#/definitions/entity_stream"
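
The `customer_balance_transactions` configuration above routes two parent streams (customers and invoices) into the same `customer_id` partition field, so a customer who also has invoices can show up twice. The sketch below illustrates the deduplication idea behind `CustomerIdPartitionRouter` without depending on `airbyte_cdk`. Plain dicts stand in for `StreamSlice`, and `dedupe_by_customer` and the `cus_*` IDs are hypothetical names and data used only for illustration.

```python
from typing import Any, Dict, Iterable, Iterator, Set


def dedupe_by_customer(slices: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Yield the first slice seen for each customer_id, in arrival order."""
    seen: Set[str] = set()
    for stream_slice in slices:
        customer_id = stream_slice.get("customer_id")
        # Skip slices with no customer_id as well as repeats of a known one.
        if not customer_id or customer_id in seen:
            continue
        seen.add(customer_id)
        yield stream_slice


# Assumed sample partitions: one list per parent stream.
customers_parent = [{"customer_id": "cus_A"}, {"customer_id": "cus_B"}]
invoices_parent = [{"customer_id": "cus_B"}, {"customer_id": "cus_C"}, {"customer_id": None}]

unique = list(dedupe_by_customer(customers_parent + invoices_parent))
print([s["customer_id"] for s in unique])  # ['cus_A', 'cus_B', 'cus_C']
```

Keeping the seen IDs in a set makes each membership check constant time on average, at the cost of holding every distinct customer ID in memory for the duration of the sync.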