Commit a23cfe8

feat: support multiple streams from source
1 parent 98b96d2 commit a23cfe8

7 files changed: +192 -332 lines changed


airbyte-integrations/connectors/destination-glide/destination_glide/destination.py

Lines changed: 27 additions & 18 deletions
@@ -29,7 +29,7 @@
     logger.setLevel(LOG_LEVEL_DEFAULT)
 
 
-def mapJsonSchemaTypeToGlideType(json_type: str) -> str:
+def airbyteTypeToGlideType(json_type: str) -> str:
     jsonSchemaTypeToGlideType = {
         "string": "string",
         "number": "number",
@@ -75,37 +75,39 @@ def write(
         api_host = config.get('api_host', CONFIG_GLIDE_API_HOST_DEFAULT)
         api_path_root = config['api_path_root']
         api_key = config['api_key']
-        table_id = config['table_id']
         glide_api_version = config.get(
             'glide_api_version', CONFIG_GLIDE_API_VERSION_DEFAULT)
 
+        # configure the table based on the stream catalog:
         # choose a strategy based on config:
-        glide = GlideBigTableFactory.create(glide_api_version)
-        logger.debug(f"Using glide api strategy '{glide.__class__.__name__}' for glide_api_version '{glide_api_version}'.")  # nopep8
-        glide.init(api_host, api_key, api_path_root, table_id)
 
-        if len(configured_catalog.streams) > 1:
-            # TODO: Consider how we might want to support multiple streams. Review other airbyte destinations
-            raise Exception(f"The Glide destination expects only a single stream to be streamed into a Glide Table but found '{len(configured_catalog.streams)}' streams. Please select only one stream from the source.")  # nopep8
+        def create_table_client_for_stream(stream_name):
+            # TODO: sanitize stream_name chars and length for GBT name
+            glide = GlideBigTableFactory.create(glide_api_version)
+            logger.debug(f"Using glide api strategy '{glide.__class__.__name__}' for glide_api_version '{glide_api_version}'.")  # nopep8
+            glide.init(api_host, api_key, api_path_root, stream_name)
+            return glide
 
-        # configure the table based on the stream catalog:
+        table_clients = {}
         stream_names = {s.stream.name for s in configured_catalog.streams}
         for configured_stream in configured_catalog.streams:
             if configured_stream.destination_sync_mode != DestinationSyncMode.overwrite:
                 raise Exception(f'Only destination sync mode overwrite is supported, but received "{configured_stream.destination_sync_mode}".')  # nopep8 because https://github.com/hhatto/autopep8/issues/712
 
+            glide = create_table_client_for_stream(
+                configured_stream.stream.name)
             # upsert the GBT with schema to set_schema for dumping the data into it
             columns = []
             properties = configured_stream.stream.json_schema["properties"]
-            for prop_name in properties.keys():
-                prop = properties[prop_name]
+            for (prop_name, prop) in properties.items():
                 prop_type = prop["type"]
-                prop_format = prop["format"] if "format" in prop else ""
-                logger.debug(f"Found column/property '{prop_name}' with type '{prop_type}' and format '{prop_format}' in stream {configured_stream.stream.name}.")  # nopep8
+                logger.debug(f"Found column/property '{prop_name}' with type '{prop_type}' in stream {configured_stream.stream.name}.")  # nopep8
                 columns.append(
-                    Column(prop_name, mapJsonSchemaTypeToGlideType(prop_type)))
+                    Column(prop_name, airbyteTypeToGlideType(prop_type))
+                )
 
             glide.set_schema(columns)
+            table_clients[configured_stream.stream.name] = glide
 
         # stream the records into the GBT:
         buffers = defaultdict(list)
@@ -120,7 +122,6 @@ def write(
                     f"Stream {stream_name} was not present in configured streams, skipping")
                 continue
 
-            # TODO: check the columns match the columns that we saw in configured_catalog per https://docs.airbyte.com/understanding-airbyte/airbyte-protocol#destination
             # add to buffer
             record_data = message.record.data
             record_id = str(uuid.uuid4())
@@ -133,24 +134,32 @@ def write(
                 # `Type.State` is a signal from the source that we should save the previous batch of `Type.RECORD` messages to the destination.
                 # It is a checkpoint that enables partial success.
                 # See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol#state--checkpointing
-                logger.info(f"Writing buffered records to Glide API from {len(buffers.keys())} streams...")  # nopep8 because
+                logger.info(f"Writing buffered records to Glide API from {len(buffers.keys())} streams...")  # nopep8
                 for stream_name in buffers.keys():
                     stream_buffer = buffers[stream_name]
-                    logger.info(f"Saving buffered records to Glide API (stream: '{stream_name}' count: '{len(stream_buffer)}')...")  # nopep8 because https://github.com/hhatto/autopep8/issues/712
+                    logger.info(f"Saving buffered records to Glide API (stream: '{stream_name}', record count: '{len(stream_buffer)}')...")  # nopep8
                     DATA_INDEX = 2
                     data_rows = [row_tuple[DATA_INDEX]
                                  for row_tuple in stream_buffer]
                     if len(data_rows) > 0:
+                        if stream_name not in table_clients:
+                            raise Exception(
+                                f"Stream '{stream_name}' not found in table_clients")
+                        glide = table_clients[stream_name]
                         glide.add_rows(data_rows)
                     stream_buffer.clear()
                 logger.info(f"Saving buffered records to Glide API complete.")  # nopep8 because https://github.com/hhatto/autopep8/issues/712
 
+                # dump all buffers now as we just wrote them to the table:
+                buffers = defaultdict(list)
                 yield message
             else:
                 logger.warn(f"Ignoring unknown Airbyte input message type: {message.type}")  # nopep8 because https://github.com/hhatto/autopep8/issues/712
 
        # commit the stash to the table
-        glide.commit()
+        for stream_name, glide in table_clients.items():
+            glide.commit()
+            logger.info(f"Committed stream '{stream_name}' to Glide.")
 
         pass
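
The gist of this change: instead of requiring exactly one stream and writing into a pre-configured table_id, write() now builds one Glide table client per configured stream, keyed by stream name, and commits each at the end. A minimal sketch of the resulting protocol (the stream names, host, API key, path root, and columns below are placeholders, not values from the connector's config):

    from destination_glide.glide import GlideBigTableFactory, Column

    # one client per stream, keyed by stream name (names here are hypothetical):
    table_clients = {}
    for stream_name in ["users", "orders"]:
        glide = GlideBigTableFactory.create("tables")
        glide.init("https://api.glideapps.com", "my-api-key", "api/path", stream_name)
        glide.set_schema([Column("id", "string"), Column("amount", "number")])
        table_clients[stream_name] = glide

    # buffered records for a stream are flushed to its client on each STATE message:
    table_clients["orders"].add_rows([{"id": "1", "amount": 9.99}])

    # once all input messages are consumed, every stream's stash is committed:
    for stream_name, glide in table_clients.items():
        glide.commit()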

airbyte-integrations/connectors/destination-glide/destination_glide/glide.py

Lines changed: 57 additions & 163 deletions
@@ -64,16 +64,15 @@ def url(self, path: str) -> str:
     The protocol is to call `init`, `set_schema`, `add_rows` one or more times, and `commit` in that order.
     """
 
-    def init(self, api_host, api_key, api_path_root, table_id):
+    def init(self, api_host, api_key, api_path_root, table_name):
         """
         Sets the connection information for the table.
         """
         self.api_host = api_host
         self.api_key = api_key
         self.api_path_root = api_path_root
-        self.table_id = table_id
+        self.table_name = table_name
         # todo: validate args
-        pass
 
     @abstractmethod
     def set_schema(self, columns: List[Column]) -> None:
@@ -108,8 +107,7 @@ def create(cls, strategy: str) -> GlideBigTableBase:
         Creates a new instance of the default implementation for the GlideBigTable API client.
         """
         implementation_map = {
-            "tables": GlideBigTableRestStrategy(),
-            "mutations": GlideBigTableMutationsStrategy()
+            "tables": GlideBigTableRestStrategy()
         }
         if strategy not in implementation_map:
             raise ValueError(f"Strategy '{strategy}' not found. Expected one of '{implmap.keys()}'.")  # nopep8
@@ -135,13 +133,13 @@ def set_schema(self, columns: List[Column]) -> None:
         self.columns = columns
         # Create stash we can stash records into for later
         r = requests.post(
-            self.url(f"/stashes"),
+            self.url(f"stashes"),
             headers=self.headers(),
         )
         try:
             r.raise_for_status()
         except Exception as e:
-            raise Exception(f"failed to create stash") from e  # nopep8
+            raise Exception(f"failed to create stash. Response was '{r.text}'") from e  # nopep8
 
         result = r.json()
         self.stash_id = result["data"]["stashID"]
@@ -157,7 +155,7 @@ def _add_row_batch(self, rows: List[BigTableRow]) -> None:
         # TODO: add rows to stash/serial https://web.postman.co/workspace/glideapps-Workspace~46b48d24-5fc1-44b6-89aa-8d6751db0fc5/request/9026518-c282ef52-4909-4806-88bf-08510ee80770
         logger.debug(f"Adding rows batch with size {len(rows)}")
         r = requests.post(
-            self.url(f"/stashes/{self.stash_id}/{self.stash_serial}"),
+            self.url(f"stashes/{self.stash_id}/{self.stash_serial}"),
             headers=self.headers(),
             json={
                 "data": rows,
@@ -170,9 +168,9 @@ def _add_row_batch(self, rows: List[BigTableRow]) -> None:
         try:
             r.raise_for_status()
         except Exception as e:
-            raise Exception(f"failed to add rows batch for serial '{self.stash_serial}'") from e  # nopep8
+            raise Exception(f"failed to add rows batch for serial '{self.stash_serial}'. Response was '{r.text}'") from e  # nopep8
 
-        logger.info(f"Add rows batch for serial '{self.stash_serial}' succeeded.")  # nopep8
+        logger.info(f"Added {len(rows)} rows as batch for serial '{self.stash_serial}' successfully.")  # nopep8
         self.stash_serial += 1
 
     def add_rows(self, rows: Iterator[BigTableRow]) -> None:
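
Taken together, these hunks make the stash endpoints relative (no leading slash) and surface the response body on failure. For reference, the underlying stash flow sketched with plain requests calls (the host, path root, and rows are placeholders; the batch body includes at least the "data" key shown in the diff):

    import requests

    api = "https://api.glideapps.com/api/path"  # placeholder host + path root
    headers = {"Content-Type": "application/json", "Authorization": "Bearer my-api-key"}

    # set_schema: create a stash to accumulate rows into
    r = requests.post(f"{api}/stashes", headers=headers)
    r.raise_for_status()
    stash_id = r.json()["data"]["stashID"]

    # add_rows: post each batch under an incrementing serial
    for serial, batch in enumerate([[{"id": "1"}], [{"id": "2"}]]):
        r = requests.post(f"{api}/stashes/{stash_id}/{serial}",
                          headers=headers, json={"data": batch})
        r.raise_for_status()
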
@@ -184,14 +182,15 @@ def add_rows(self, rows: Iterator[BigTableRow]) -> None:
             batch = rows[i:i + min(BATCH_SIZE, len(rows) - i)]
             self._add_row_batch(batch)
 
-    def finalize_stash(self) -> None:
-        # overwrite the existing table with the right schema and rows:
-        r = requests.put(
-            self.url(f"/tables/{self.table_id}"),
+    def create_table_from_stash(self) -> None:
+        logger.info(f"Creating new table for table name '{self.table_name}'...")  # nopep8
+        r = requests.post(
+            self.url(f"tables"),
             headers=self.headers(),
             json={
+                "name": self.table_name,
                 "schema": {
-                    "columns": self.columns,
+                    "columns": self.columns
                 },
                 "rows": {
                     "$stashID": self.stash_id
@@ -201,161 +200,56 @@ def finalize_stash(self) -> None:
         try:
             r.raise_for_status()
         except Exception as e:
-            raise Exception(f"failed to finalize stash") from e  # nopep8
-        logger.info(f"Successfully finalized record stash for table '{self.table_id}' (stash ID '{self.stash_id}')")
+            raise Exception(f"failed to create table '{self.table_name}'. Response was '{r.text}'.") from e  # nopep8
+
+        logger.info(f"Creating table '{self.table_name}' succeeded.")
+
+    def overwrite_table_from_stash(self, table_id) -> None:
+        # overwrite the specified table's schema and rows with the stash:
+        r = requests.put(
+            self.url(f"tables/{table_id}"),
+            headers=self.headers(),
+            json={
+                "schema": {
+                    "columns": self.columns,
+                },
+                "rows": {
+                    "$stashID": self.stash_id
+                }
+            }
+        )
+        try:
+            r.raise_for_status()
+        except Exception as e:
+            raise Exception(f"failed to overwrite table '{table_id}'. Response was '{r.text}'") from e  # nopep8
 
     def commit(self) -> None:
         self.raise_if_set_schema_not_called()
-        self.finalize_stash()
+        # first see if the table already exists
+        r = requests.get(
+            self.url(f"tables"),
+            headers=self.headers()
+        )
+        try:
+            r.raise_for_status()
+        except Exception as e:
+            raise Exception(f"Failed to get table list. Response was '{r.text}'.") from e  # nopep8
+
+        found_table_id = None
+        # confirm if table exists:
+        body = r.json()
+        if "data" not in body:
+            raise Exception(f"get tables response did not include data in body. Status was: {r.status_code}: {r.text}.")  # nopep8
+
+        for table in body["data"]:
+            if table["name"] == self.table_name:
+                found_table_id = table["id"]
+                logger.info(f"Found existing table to reuse for table name '{self.table_name}' with ID '{found_table_id}'.")  # nopep8
+                break
+
+        if found_table_id != None:
+            self.overwrite_table_from_stash(found_table_id)
+        else:
+            self.create_table_from_stash()
+
+        logger.info(f"Successfully committed record stash for table '{self.table_name}' (stash ID '{self.stash_id}')")  # nopep8
 
-
-class GlideBigTableMutationsStrategy(GlideBigTableBase):
-    def __init__(self):
-        # TODO: hardcoded for now using old api
-        self.hardcoded_app_id = "Ix9CEuP6DiFugfjhSG5t"
-        self.hardcoded_column_lookup = {
-            '_airtable_id': {'type': "string", 'name': "Name"},
-            '_airtable_created_time': {'type': "date-time", 'name': "AwkFL"},
-            '_airtable_table_name': {'type': "string", 'name': "QF0zI"},
-            'id': {'type': "string", 'name': "tLPjZ"},
-            'name': {'type': "string", 'name': "1ZqF1"},
-            'host_id': {'type': "string", 'name': "B7fYe"},
-            'host_name': {'type': "string", 'name': "oyVzO"},
-            'neighbourhood_group': {'type': "string", 'name': "15J8U"},
-            'neighbourhood': {'type': "string", 'name': "Fy28U"},
-            'latitude': {'type': "number", 'name': "TLpMC"},
-            'longitude': {'type': "number", 'name': "oazQO"},
-            'room_type': {'type': "string", 'name': "TPJDZ"},
-            'price': {'type': "number", 'name': "7xzlG"},
-            'minimum_nights': {'type': "number", 'name': "usoY5"},
-            'number_of_reviews': {'type': "number", 'name': "XFXmR"},
-            'last_review': {'type': "date-time", 'name': "oseZl"},
-            'reviews_per_month': {'type': "number", 'name': "alw2R"},
-            'calculated_host_listings_count': {'type': "number", 'name': "hKws0"},
-            'availability_365': {'type': "number", 'name': "qZsgl"},
-            'number_of_reviews_ltm': {'type': "number", 'name': "rWisS"},
-            'license': {'type': "string", 'name': "7PVig"}
-        }
-
-    def headers(self) -> Dict[str, str]:
-        return {
-            "Content-Type": "application/json",
-            f"Authorization": f"Bearer {self.api_key}"
-        }
-
-    def url(self, path: str) -> str:
-        return f"{self.api_host}/{self.api_path_root}/{path}"
-
-    def set_schema(self, columns: List[Column]) -> None:
-        logger.debug(f"set_schema for table '{self.table_id}. Expecting columns: '{[c.id for c in columns]}'.")  # nopep8
-        self.delete_all()
-
-        for col in columns:
-            if col.id not in self.hardcoded_column_lookup:
-                logger.warning(
-                    f"Column '{col.id}' not found in hardcoded column lookup. Will be ignored.")
-
-    def rows(self) -> Iterator[BigTableRow]:
-        """
-        Gets the rows as of the Glide Big Table.
-        """
-        r = requests.post(
-            self.url("function/queryTables"),
-            headers=self.headers(),
-            json={
-                "appID": self.hardcoded_app_id,
-                "queries": [
-                    {
-                        "tableName": self.table_id,
-                        "utc": True
-                    }
-                ]
-            }
-        )
-        if r.status_code != 200:
-            logger.error(f"get rows request failed with status {r.status_code}: {r.text}.")  # nopep8 because https://github.com/hhatto/autopep8/issues/712
-            r.raise_for_status()
-
-        result = r.json()
-
-        # the result looks like an array of results; each result has a rows member that has an array or JSON rows:
-        for row in result:
-            for r in row['rows']:
-                yield r
-
-    def delete_all(self) -> None:
-        # TODO: perf: don't put these in a list
-        rows = list(self.rows())
-        logger.debug(f"Iterating over {len(rows)} rows to delete")
-
-        for row in rows:
-            # TODO: lame. batch these into one request with multiple mutations
-            r = requests.post(
-                self.url("function/mutateTables"),
-                headers=self.headers(),
-                json={
-                    "appID": self.hardcoded_app_id,
-                    "mutations": [
-                        {
-                            "kind": "delete-row",
-                            "tableName": self.table_id,
-                            "rowID": row['$rowID']
-                        }
-                    ]
-                }
-            )
-            if r.status_code != 200:
-                logger.error(f"delete request failed with status {r.status_code}: {r.text} trying to delete row id {row['$rowID']} with row: {row}")  # nopep8 because https://github.com/hhatto/autopep8/issues/712
-                r.raise_for_status()
-            else:
-                logger.debug(
-                    f"Deleted row successfully (rowID:'{row['$rowID']}'")
-
-    def add_rows_batch(self, rows: Iterator[BigTableRow]) -> None:
-        mutations = []
-        for row in rows:
-            # row is columnLabel -> value, but glide's mutate uses a column "name". We hard-code the lookup for our table here:
-            mutated_row = dict()
-            for k, v in row.items():
-                if k in self.hardcoded_column_lookup:
-                    col_info = self.hardcoded_column_lookup[k]
-                    mutated_row[col_info["name"]] = v
-                else:
-                    logger.error(
-                        f"Column {k} not found in column lookup. Ignoring column")
-
-            mutations.append(
-                {
-                    "kind": "add-row-to-table",
-                    "tableName": self.table_id,
-                    "columnValues": mutated_row
-                }
-            )
-        r = requests.post(
-            self.url("function/mutateTables"),
-            headers=self.headers(),
-            json={
-                "appID": self.hardcoded_app_id,
-                "mutations": mutations
-            }
-        )
-        if r.status_code != 200:
-            logger.error(f"add rows failed with status {r.status_code}: {r.text}")  # nopep8 because https://github.com/hhatto/autopep8/issues/712
-            r.raise_for_status()
-
-    def add_rows(self, rows: Iterator[BigTableRow]) -> None:
-        BATCH_SIZE = 100
-
-        batch = []
-        for row in rows:
-            batch.append(row)
-            if len(batch) >= BATCH_SIZE:
-                self.add_rows_batch(batch)
-                batch = []
-
-        if len(batch) > 0:
-            self.add_rows_batch(batch)
-
-    def commit(self) -> None:
-        logger.debug("commit table (noop).")
-        pass
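
The new commit() is an upsert by table name: list the tables, reuse a matching table's ID with a PUT, otherwise create the table with a POST, either way pointing "rows" at the stash. Continuing the sketch above (the table name and column entries are placeholders; Column values are serialized as shown in the diff):

    # commit: reuse an existing table by name, else create one from the stash
    r = requests.get(f"{api}/tables", headers=headers)
    r.raise_for_status()
    found_table_id = next(
        (t["id"] for t in r.json()["data"] if t["name"] == "orders"), None)

    body = {
        "schema": {"columns": [{"id": "id", "type": "string"}]},
        "rows": {"$stashID": stash_id},
    }
    if found_table_id is not None:
        # overwrite the existing table's schema and rows
        r = requests.put(f"{api}/tables/{found_table_id}", headers=headers, json=body)
    else:
        # create a new table named after the stream
        body["name"] = "orders"
        r = requests.post(f"{api}/tables", headers=headers, json=body)
    r.raise_for_status()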
