Skip to content

Commit 733147b

Browse files
committed
adapter: use glue API for schema and column introspection
1 parent 0fe5925 commit 733147b

File tree

4 files changed

+67
-59
lines changed

4 files changed

+67
-59
lines changed

dbt/adapters/athena/impl.py

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,14 @@
33
import re
44
import boto3.session
55
from botocore.exceptions import ClientError
6-
from typing import Optional
6+
from typing import Optional, List
77

8-
from dbt.adapters.base import available
8+
from dbt.adapters.base import available, Column
99
from dbt.adapters.sql import SQLAdapter
1010
from dbt.adapters.athena import AthenaConnectionManager
1111
from dbt.adapters.athena.relation import AthenaRelation
1212
from dbt.events import AdapterLogger
13+
from dbt.contracts.relation import RelationType
1314
logger = AdapterLogger("Athena")
1415

1516
class AthenaAdapter(SQLAdapter):
@@ -170,3 +171,63 @@ def quote_seed_column(
170171
self, column: str, quote_config: Optional[bool]
171172
) -> str:
172173
return super().quote_seed_column(column, False)
174+
175+
def get_columns_in_relation(self, relation: AthenaRelation) -> List[Column]:
176+
conn = self.connections.get_thread_connection()
177+
creds = conn.credentials
178+
session = boto3.session.Session(region_name=creds.region_name, profile_name=creds.aws_profile_name)
179+
glue_client = session.client('glue')
180+
181+
table = glue_client.get_table(DatabaseName=relation.schema, Name=relation.identifier)
182+
return [Column(c["Name"], c["Type"]) for c in table["Table"]["StorageDescriptor"]["Columns"] + table["Table"]["PartitionKeys"]]
183+
184+
def list_schemas(self, database: str) -> List[str]:
185+
conn = self.connections.get_thread_connection()
186+
creds = conn.credentials
187+
session = boto3.session.Session(region_name=creds.region_name, profile_name=creds.aws_profile_name)
188+
glue_client = session.client('glue')
189+
paginator = glue_client.get_paginator("get_databases")
190+
191+
result = []
192+
logger.debug("CALL glue.get_databases()")
193+
for page in paginator.paginate():
194+
for db in page["DatabaseList"]:
195+
result.append(db["Name"])
196+
return result
197+
198+
def list_relations_without_caching(self, schema_relation: AthenaRelation) -> List[AthenaRelation]:
199+
conn = self.connections.get_thread_connection()
200+
creds = conn.credentials
201+
session = boto3.session.Session(region_name=creds.region_name, profile_name=creds.aws_profile_name)
202+
glue_client = session.client('glue')
203+
paginator = glue_client.get_paginator("get_tables")
204+
205+
result = []
206+
logger.debug("CALL glue.get_tables('{}')", schema_relation.schema)
207+
for page in paginator.paginate(DatabaseName=schema_relation.schema):
208+
for table in page["TableList"]:
209+
if table["TableType"] == "EXTERNAL_TABLE":
210+
table_type = RelationType.Table
211+
elif table["TableType"] == "VIRTUAL_VIEW":
212+
table_type = RelationType.View
213+
else:
214+
raise ValueError(f"Unknown TableType for {table['Name']}: {table['TableType']}")
215+
rel = AthenaRelation.create(schema=table["DatabaseName"], identifier=table["Name"], database=schema_relation.database, type=table_type)
216+
result.append(rel)
217+
218+
return result
219+
220+
def drop_relation(self, relation: AthenaRelation):
221+
conn = self.connections.get_thread_connection()
222+
creds = conn.credentials
223+
session = boto3.session.Session(region_name=creds.region_name, profile_name=creds.aws_profile_name)
224+
glue_client = session.client('glue')
225+
226+
logger.debug("CALL glue.delete_table({}, {})", relation.schema, relation.identifier)
227+
try:
228+
glue_client.delete_table(DatabaseName=relation.schema, Name=relation.identifier)
229+
except ClientError as e:
230+
if e.response['Error']['Code'] == 'EntityNotFoundException':
231+
logger.debug("Table '{}' does not exists - Ignoring", relation)
232+
else:
233+
raise
Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,3 @@
11
{% macro athena__get_columns_in_relation(relation) -%}
2-
{% call statement('get_columns_in_relation', fetch_result=True) %}
3-
4-
select
5-
column_name,
6-
data_type,
7-
null as character_maximum_length,
8-
null as numeric_precision,
9-
null as numeric_scale
10-
11-
from {{ relation.information_schema('columns') }}
12-
where LOWER(table_name) = LOWER('{{ relation.identifier }}')
13-
{% if relation.schema %}
14-
and LOWER(table_schema) = LOWER('{{ relation.schema }}')
15-
{% endif %}
16-
order by ordinal_position
17-
18-
{% endcall %}
19-
20-
{% set table = load_result('get_columns_in_relation').table %}
21-
{% do return(sql_convert_columns_in_relation(table)) %}
2+
{{ return(adapter.get_columns_in_relation(relation)) }}
223
{% endmacro %}

dbt/include/athena/macros/adapters/metadata.sql

Lines changed: 2 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -79,42 +79,10 @@
7979

8080

8181
{% macro athena__list_schemas(database) -%}
82-
{% call statement('list_schemas', fetch_result=True) %}
83-
select
84-
distinct schema_name
85-
86-
from {{ information_schema_name(database) }}.schemata
87-
{% endcall %}
88-
{{ return(load_result('list_schemas').table) }}
82+
{{ return(adapter.list_schemas()) }}
8983
{% endmacro %}
9084

9185

9286
{% macro athena__list_relations_without_caching(schema_relation) %}
93-
{% call statement('list_relations_without_caching', fetch_result=True) -%}
94-
WITH views AS (
95-
select
96-
table_catalog as database,
97-
table_name as name,
98-
table_schema as schema
99-
from {{ schema_relation.information_schema() }}.views
100-
where LOWER(table_schema) = LOWER('{{ schema_relation.schema }}')
101-
), tables AS (
102-
select
103-
table_catalog as database,
104-
table_name as name,
105-
table_schema as schema
106-
107-
from {{ schema_relation.information_schema() }}.tables
108-
where LOWER(table_schema) = LOWER('{{ schema_relation.schema }}')
109-
110-
-- Views appear in both `tables` and `views`, so excluding them from tables
111-
EXCEPT
112-
113-
select * from views
114-
)
115-
select views.*, 'view' AS table_type FROM views
116-
UNION ALL
117-
select tables.*, 'table' AS table_type FROM tables
118-
{% endcall %}
119-
{% do return(load_result('list_relations_without_caching').table) %}
87+
{{ return(adapter.list_relations_without_caching(schema_relation)) }}
12088
{% endmacro %}
Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
{% macro athena__drop_relation(relation) -%}
22
{%- do adapter.clean_up_table(relation.schema, relation.table) -%}
3-
{% call statement('drop_relation', auto_begin=False) -%}
4-
drop {{ relation.type }} if exists {{ relation }}
5-
{%- endcall %}
3+
{{ return(adapter.drop_relation(relation)) }}
64
{% endmacro %}

0 commit comments

Comments
 (0)