
Commit 96300c5

Python Shell dag and README.md Update
Updated the Python shell DAG script to show how the data is extracted to the source bucket and how it will be moved from the source bucket to the bronze tier. Also updated the README.md with the project outline going forward.
1 parent 142eb26 commit 96300c5

File tree: README.md, dags_python_shell.py

2 files changed: 54 additions & 5 deletions


README.md

Lines changed: 10 additions & 3 deletions
@@ -16,15 +16,15 @@ In this project, raw, untransformed data resides in external databases and is in
 into a bronze tier S3 bucket. The pipeline works on the raw data, processing it, and subsequently storing it in
 the appropriate data lake tier as determined by business requirements. The tiers are represented as folders within
 a single S3 bucket for this project. However, each tier should be given a dedicated bucket (as it is in production
-environments).
+environments).
 
 An orchestrator triggers the extraction of the ingested raw and untransformed data into the bronze tier S3 where
 it is then moved on to the other tiers, getting enriched as it gets refined up the data lake tiers.
 
 An understanding of creating and assigning IAM roles is required as the AWS resources used are configured in such
 a way to interact with one another, i.e., a role that grants permission to let AWS Glue access the resources it needs.
 The degree of restrictions can be narrowed down based on the level of security needed. For this pipeline, I assigned
-an IAM role to access AWS Glue resources and read/write to AWS S3 and Redshift.
+an IAM role to access AWS Glue resources and read/write to AWS S3 and Redshift.
 
 ## Medallion Architecture
 
@@ -41,7 +41,7 @@ Each tier houses its own schemas and tables, which differ based on data update f
 This multi-layered approach ensures data integrity and optimizes its use for business needs.
 
 
-##
+## Pipeline Steps
 
 ### Ingesting the Data From Source
 
@@ -65,4 +65,11 @@ data, into the bronze layer, in a separate bucket (usually more secure with more
 source bucket).
 
 
+### Creating the Silver Data Tier
+
+The silver data tier takes the data a step further in its refinement as the data passes through extensive cleanup and
+validation. The silver tier sees the datatype standardization, filling and/or removal of null values, creation of
+desirable datatypes, detection and removal of duplicates, and to a certain degree, some data aggregation as some facts
+and dimension tables could be merged in the silver tier to allow downstream users utilize the data.
+
 
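The new "Creating the Silver Data Tier" section above lists the kinds of transformations involved. Below is a minimal pandas sketch of what one such step might look like; the frame and column names (`saletime`, `qtysold`, `catid`, `venueid`) are assumptions for illustration, not the project's actual schema.

```python
import pandas as pd


def clean_sales(sales: pd.DataFrame) -> pd.DataFrame:
    """Example silver-tier cleanup: standardize dtypes, handle nulls, dedupe."""
    # Standardize datatypes (column names are assumed for illustration).
    sales["saletime"] = pd.to_datetime(sales["saletime"], errors="coerce")
    sales["qtysold"] = pd.to_numeric(sales["qtysold"], errors="coerce")
    # Fill or remove null values.
    sales["qtysold"] = sales["qtysold"].fillna(0).astype(int)
    sales = sales.dropna(subset=["saletime"])
    # Detect and remove duplicates.
    return sales.drop_duplicates()


def merge_event_dimensions(event: pd.DataFrame,
                           category: pd.DataFrame,
                           venue: pd.DataFrame) -> pd.DataFrame:
    """Example of merging a fact table with dimension tables for downstream users."""
    return (event.merge(category, on="catid", how="left")
                 .merge(venue, on="venueid", how="left"))
```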

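The IAM setup described in the README context above can also be scripted. The sketch below is a minimal, hypothetical boto3 example: the role name `glue-pipeline-role` and the choice of broad managed policies are assumptions, and the policies should be scoped down to the specific buckets and Redshift cluster the pipeline actually touches.

```python
import json

import boto3

iam = boto3.client("iam")

# Trust policy that lets AWS Glue assume the role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "glue.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

# Role name is hypothetical; use whatever matches the project's naming.
iam.create_role(
    RoleName="glue-pipeline-role",
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)

# Broad managed policies for Glue, S3, and Redshift access; narrow these
# (bucket ARNs, specific clusters) when tighter security is needed.
for policy_arn in (
    "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole",
    "arn:aws:iam::aws:policy/AmazonS3FullAccess",
    "arn:aws:iam::aws:policy/AmazonRedshiftFullAccess",
):
    iam.attach_role_policy(RoleName="glue-pipeline-role", PolicyArn=policy_arn)
```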
dags_python_shell.py

Lines changed: 44 additions & 2 deletions
@@ -8,7 +8,7 @@
 config.read('config.ini')
 
 # Extract tables from Source to Bronze Bucket
-def extract_to_bronze_tier(db_path=config['DATABASE']['PATH'],
+def extract_to_source(db_path=config['DATABASE']['PATH'],
                       tables=eval(config.get('DATABASE', 'TABLES'))):
 
     """ Extracts tables from a SQLite database and uploads them to an S3 bucket as CSV files.
@@ -36,10 +36,52 @@ def extract_to_bronze_tier(db_path=config['DATABASE']['PATH'],
     s3_conn = S3Buckets.credentials(config['AWS_ACCESS']['REGION'])
     s3_conn.upload_dataframe_to_s3(df = df,
                                    bucket_name=config['AWS_ACCESS']['PROJECT_BUCKET'],
-                                   object_name=f'bronze/bronze_{table}.csv')
+                                   object_name=f'source/{table}.csv')
     print(f"The {table} table was uploaded")
 
     return f'The tables were uploaded to the S3 Bucket'
 
+
+# Extraction to Bronze Tier from Source
+def extract_category_to_bronze(table='category'):
+
+    # Connect to AWS S3 and upload Extracted Dataframe as CSV file
+    s3_conn = S3Buckets.credentials(config['AWS_ACCESS']['REGION'])
+    s3_conn.upload_dataframe_to_s3(df=df,
+                                   bucket_name=config['AWS_ACCESS']['PROJECT_BUCKET'],
+                                   object_name=f'source/{table}.csv')
+    print(f"The {table} table was uploaded")
+    pass
+
+def extract_date_to_bronze():
+    pass
+
+def extract_event_to_bronze():
+    pass
+
+def extract_listing_to_bronze():
+    pass
+
+def extract_sale_to_bronze():
+    pass
+
+def extract_user_to_bronze():
+    pass
+
+def extract_venue_to_bronze():
+    pass
+
+
+
+
 def transform_to_silver_tier():
+    pass
+
+def transform_cat_eve_ven():
+    pass
+
+def transform_date_list_sales():
+    pass
+
+def transform_date_list_users():
     pass
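The `extract_*_to_bronze` functions added above are still placeholders: the first one references a `df` that is never defined and writes back under the `source/` prefix rather than `bronze/`. One way they might eventually be completed, assuming the CSVs written by `extract_to_source` sit under `source/` in the same project bucket, is a server-side S3 copy into the `bronze/` prefix. The helper name `move_table_to_bronze` and the key layout below are assumptions, not the committed implementation.

```python
import configparser

import boto3

config = configparser.ConfigParser()
config.read("config.ini")


def move_table_to_bronze(table: str) -> None:
    """Copy one table's CSV from the source/ prefix to the bronze/ prefix."""
    bucket = config["AWS_ACCESS"]["PROJECT_BUCKET"]
    s3 = boto3.client("s3", region_name=config["AWS_ACCESS"]["REGION"])
    # Server-side copy within the same bucket; no data leaves S3.
    s3.copy_object(
        Bucket=bucket,
        CopySource={"Bucket": bucket, "Key": f"source/{table}.csv"},
        Key=f"bronze/bronze_{table}.csv",
    )
    print(f"The {table} table was copied to the bronze tier")


if __name__ == "__main__":
    # Table names taken from the per-table stubs in the DAG script.
    for table in ("category", "date", "event", "listing", "sale", "user", "venue"):
        move_table_to_bronze(table)
```

A server-side copy avoids re-downloading the data; alternatively, each stub could re-read its table from the SQLite source the way `extract_to_source` does and upload it directly to the `bronze/` prefix.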
