diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/components.py b/airbyte-integrations/connectors/source-stripe/source_stripe/components.py new file mode 100644 index 0000000000000..d7d2b7fcf8e4b --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/components.py @@ -0,0 +1,80 @@ +from typing import Iterable, Mapping, Any, Optional, Set, Dict +from datetime import timedelta, datetime +from airbyte_cdk.sources.declarative.partition_routers import SubstreamPartitionRouter +from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration +from airbyte_cdk.sources.declarative.types import StreamSlice +from collections import Counter + +class CustomerIdPartitionRouter(SubstreamPartitionRouter): + def __init__(self, stream_state: Optional[Mapping[str, Any]] = None, *args, **kwargs): + super().__init__(*args, **kwargs) + self._stream_state = stream_state + self.logger.info(f"[CustomerIdPartitionRouter] State after CustomStateMigration: {self._stream_state}") + + def _extract_metadata(self, slice: Dict[str, Any]) -> tuple[Optional[str], Optional[str], float]: + extras = getattr(slice, "_extra_fields", {}) + object_type = extras.get("object") + customer_id = slice.get("customer_id") + balance_or_total = extras.get("balance", 0) if object_type == "customer" else extras.get("total", 0) + return customer_id, object_type, balance_or_total + + def stream_slices(self) -> Iterable[StreamSlice]: # type: ignore + parent_slices = list(super().stream_slices()) + seen_customer_ids: Set[str] = set() + unique_slices = [] + totals = Counter() + + for slice in parent_slices: + customer_id, object_type, balance_or_total = self._extract_metadata(slice) + + if object_type == "customer": + totals["customers"] += 1 + elif object_type == "invoice": + totals["invoices"] += 1 + + if customer_id and customer_id not in seen_customer_ids and balance_or_total != 0: + seen_customer_ids.add(customer_id) + unique_slices.append(slice) + else: + totals["skipped"] += 1 + + self.logger.info(f"[CustomerIdPartitionRouter] Total parent slices: {len(parent_slices)}") + self.logger.info(f"[CustomerIdPartitionRouter] Total customer records: {totals['customers']}") + self.logger.info(f"[CustomerIdPartitionRouter] Total invoice records: {totals['invoices']}") + self.logger.info(f"[CustomerIdPartitionRouter] Unique slices emitted: {len(unique_slices)}") + self.logger.info(f"[CustomerIdPartitionRouter] Skipped records: {totals['skipped']}") + + yield from unique_slices + +class CustomerBalanceStateMigration(StateMigration): + def should_migrate(self, stream_state: Mapping[str, Any]) -> bool: + return "parent_state" in stream_state + + def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]: + current_state = stream_state or {} + state_created = int(current_state.get("state", {}).get("created", 0)) + current_parent_state = current_state.get("parent_state", {}) + + two_days = int(timedelta(days=2).total_seconds()) + fourteen_day_floor = int(datetime.utcnow().timestamp()) - int(timedelta(days=14).total_seconds()) + + created_customer_balance_transactions = max(state_created - two_days, fourteen_day_floor, 0) + updated_customers = max(0, current_parent_state.get("customers", {}).get("updated", 0) - two_days, fourteen_day_floor) + updated_invoices = max(0, current_parent_state.get("invoices", {}).get("updated", 0) - two_days, fourteen_day_floor) + + migrated_state = { + "state": { + "created": created_customer_balance_transactions + }, + "parent_state": { + "customers": { + "updated": updated_customers + }, + "invoices": { + "updated": updated_invoices + } + }, + "use_global_cursor": True + } + + return migrated_state diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/manifest.yaml b/airbyte-integrations/connectors/source-stripe/source_stripe/manifest.yaml index dad9899af3327..2dbdebc8e0311 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/manifest.yaml +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/manifest.yaml @@ -67,8 +67,8 @@ definitions: http_codes: - 404 error_message: >- - Data was not found. Error message: {{ response['error']['message'] }} If this is a path for getting - child attributes like /v1/checkout/sessions//line_items when running the incremental sync, + Data was not found. Error message: {{ response['error']['message'] }} If this is a path for getting + child attributes like /v1/checkout/sessions//line_items when running the incremental sync, you may safely ignore this warning. bearer_authenticator: type: BearerAuthenticator @@ -860,15 +860,18 @@ definitions: types[]: '{{["radar.early_fraud_warning.created", "radar.early_fraud_warning.updated"]}}' customer_balance_transactions: - $ref: "#/definitions/entity_stream" + $ref: "#/definitions/base_stream" $parameters: name: customer_balance_transactions retriever: $ref: "#/definitions/base_retriever" $parameters: - path: customers/{{ stream_partition.customer_id }}/balance_transactions + path: customers/{{ stream_slice.customer_id }}/balance_transactions partition_router: - type: SubstreamPartitionRouter + type: CustomPartitionRouter + class_name: source_stripe.components.CustomerIdPartitionRouter + parameters: + stream_state: "{{ stream_state }}" parent_stream_configs: - type: ParentStreamConfig parent_key: id @@ -876,9 +879,33 @@ definitions: stream: $ref: "#/definitions/streams/customers" incremental_dependency: true - # This stream is not truly incremental on the child level. - # We're configuring it this way to support incremental syncs on the parent only, - # which helps reduce the size of the state. + record_selector: + extractor: + field_pointer: [] + parameters: + inject_into: extra_fields + extra_fields: + - ["object"] + - ["balance"] + - ["id"] + - type: ParentStreamConfig + parent_key: customer + partition_field: customer_id + stream: + $ref: "#/definitions/streams/invoices" + incremental_dependency: true + record_selector: + extractor: + field_pointer: ["data"] + parameters: + inject_into: extra_fields + extra_fields: + - ["object"] + - ["total"] + - ["customer"] + state_migrations: + - type: CustomStateMigration + class_name: source_stripe.components.CustomerBalanceStateMigration incremental_sync: $ref: "#/definitions/entity_single_slice_cursor" cursor_field: created @@ -886,7 +913,7 @@ definitions: transformations: - type: RemoveFields field_pointers: - - - updated # Remove field that added in entity_stream + - - updated payout_balance_transactions: $ref: "#/definitions/entity_stream"