10 changes: 3 additions & 7 deletions .gitignore
@@ -1,7 +1,3 @@
-config.py
-
-archive
-
-archive-hidden
-
-archive-hidden-again
+.venv
+.idea
+.DS_Store
3 changes: 0 additions & 3 deletions .idea/.gitignore

This file was deleted.

1 change: 0 additions & 1 deletion .idea/.name

This file was deleted.

10 changes: 0 additions & 10 deletions .idea/github-talent-ETL-GCP.iml

This file was deleted.

6 changes: 0 additions & 6 deletions .idea/inspectionProfiles/profiles_settings.xml

This file was deleted.

7 changes: 0 additions & 7 deletions .idea/misc.xml

This file was deleted.

8 changes: 0 additions & 8 deletions .idea/modules.xml

This file was deleted.

6 changes: 0 additions & 6 deletions .idea/vcs.xml

This file was deleted.

230 changes: 89 additions & 141 deletions google_functions/1_insert_fetched_repos/main.py
@@ -1,40 +1,40 @@
import psycopg2
import json
import requests
import logging
import os
from datetime import timedelta
from typing import Any

import psycopg2
import requests
from google.cloud import secretmanager
from datetime import datetime, timedelta

# Configurations
PROJECT_ID = os.getenv("PROJECT_ID", "githubtalent-434920")
GITHUB_SECRET_NAME = os.getenv("GITHUB_SECRET_NAME", "github-search-secret")
DB_SECRET_NAME = os.getenv("DB_SECRET_NAME", "database-crud-secret")

def get_pat_from_secret_manager(version_id="latest"):
secrets_name = "github-search-secret"
client = secretmanager.SecretManagerServiceClient()
project_id = "githubtalent-434920"
name = f"projects/{project_id}/secrets/{secrets_name}/versions/{version_id}"
http_response = client.access_secret_version(request={"name": name})
json_payload = http_response.payload.data.decode("UTF-8")
return json_payload
# Logger setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def get_db_credentials_from_secret_manager(version_id="latest"):
secrets_name = "database-crud-secret"
def get_secret(secret_name: str, version_id: str = "latest") -> str:
"""Retrieve a secret from Google Cloud Secret Manager."""
client = secretmanager.SecretManagerServiceClient()
project_id = "githubtalent-434920"
name = f"projects/{project_id}/secrets/{secrets_name}/versions/{version_id}"
http_response = client.access_secret_version(request={"name": name})
json_payload = http_response.payload.data.decode("UTF-8")
return json_payload # this was validated in form by older script

name = f"projects/{PROJECT_ID}/secrets/{secret_name}/versions/{version_id}"
response = client.access_secret_version(request={"name": name})
return response.payload.data.decode("UTF-8")

def call_github_search(pat: str):
url = "https://api.github.com/search/repositories?q=pushed%3A2024-09-02T00%3A00%3A00..2024-09-02T00%3A01%3A00+is%3Apublic+-fork%3Atrue&per_page=100&page=1"
print(f"PRINTING default url: {url}")

db_credentials_not_json = get_db_credentials_from_secret_manager()
db_credentials = json.loads(db_credentials_not_json)
def get_db_credentials() -> dict:
"""Get database credentials from Secret Manager."""
db_credentials_not_json = get_secret(DB_SECRET_NAME)
return json.loads(db_credentials_not_json)

# intentionally leaving potentially duplicative code in here until I prove the automation works

def create_db_connection() -> Any:
"""Create a database connection using credentials stored in Secret Manager."""
db_credentials = get_db_credentials()
try:
connection = psycopg2.connect(
host=db_credentials['DB_HOST'],
@@ -43,76 +43,75 @@ def call_github_search(pat: str):
user=db_credentials['DB_USER'],
password=db_credentials['DB_PASS']
)
print("PRINTING connection temporarily opened!")
except:
print("PRINTING connection FAILED creation")
logger.info("Connection successfully created")
return connection
except Exception as e:
logger.error(f"Failed to create connection: {e}")
raise

cursor = connection.cursor()
cursor.execute("SELECT last_call FROM task_scheduling WHERE function_name = %s;", ("scheduled_get_repos",))

current_call_time = cursor.fetchone()[0]

current_end_time = current_call_time + timedelta(
minutes=1) # we are only adding 1 minute at a time to get ~70 results per search. If we had done a full hour, it would be 5000 hits, well over the 1000 result limit set by GitHub API

start_time = current_call_time.strftime("%Y-%m-%dT%H:%M:%S")

end_time = current_end_time.strftime("%Y-%m-%dT%H:%M:%S")

next_call_time = current_call_time + timedelta(hours=1)

# replacing the search window with an incremented search window in the database
def update_task_schedule(cursor: Any, next_call_time: str) -> None:
"""Update the last call time for the scheduled task."""
cursor.execute(
"UPDATE task_scheduling SET last_call = %s WHERE function_name = %s;",
(next_call_time, "scheduled_get_repos")
)
connection.commit()

cursor.close()
connection.close()

def call_github_search(pat, start_time, end_time):
"""Call GitHub search API with the given PAT and time window."""
url = f"https://api.github.com/search/repositories?q=pushed%3A{start_time}..{end_time}+is%3Apublic+-fork%3Atrue&per_page=100&page=1"
headers = {
"Authorization": f"Bearer {pat}",
"Accept": "application/vnd.github.v3+json"
}
http_response_object = requests.get(url,
headers=headers) # WARNING, this is coming in as an HTTP response object, not a JSON just yet
github_search_results_in_json = http_response_object.json()
return github_search_results_in_json
response = requests.get(url, headers=headers)
response.raise_for_status() # raise an exception for HTTP error responses
return response.json()


def get_github_repos(request):
pat_payload = get_pat_from_secret_manager()
pat_json = json.loads(pat_payload)
retrieved_pat = pat_json["GITHUB_PAT"]
github_repos_json = call_github_search(retrieved_pat)

# looping through our findings, eventually to put in DB
ids_array = []

repos_hash = github_repos_json['items']
for individual_repo_info in repos_hash:
individual_repo_id = individual_repo_info['id']
ids_array.append(individual_repo_id)

reporting_length = len(ids_array)
# [reporting_length,ids_array] # not using for now

db_credentials_payload = get_db_credentials_from_secret_manager()
db_credentials_payload_json = json.loads(db_credentials_payload)

connection = psycopg2.connect(
host=db_credentials_payload_json['DB_HOST'],
port=db_credentials_payload_json['DB_PORT'],
database=db_credentials_payload_json['DB_NAME'], # term of art, can't use NAME
user=db_credentials_payload_json['DB_USER'],
password=db_credentials_payload_json['DB_PASS']
)

cursor = connection.cursor()

query = """
try:
pat_json = json.loads(get_secret(GITHUB_SECRET_NAME))
retrieved_pat = pat_json["GITHUB_PAT"]

connection = create_db_connection()
cursor = connection.cursor()

cursor.execute("SELECT last_call FROM task_scheduling WHERE function_name = %s;", ("scheduled_get_repos",))
current_call_time = cursor.fetchone()[0]
current_end_time = current_call_time + timedelta(minutes=1) # Adjust as necessary
start_time = current_call_time.strftime("%Y-%m-%dT%H:%M:%S")
end_time = current_end_time.strftime("%Y-%m-%dT%H:%M:%S")
next_call_time = current_call_time + timedelta(hours=1) # Adjust as necessary

update_task_schedule(cursor, next_call_time)
connection.commit()

github_repos_json = call_github_search(retrieved_pat, start_time, end_time)
repos_hash = github_repos_json.get('items', [])

repo_data_for_insertion = [
(
repo['id'],
repo['name'],
repo['owner']['login'],
repo['owner']['id'],
repo['fork'],
repo.get('description', 'No description'),
repo['size'],
repo['stargazers_count'],
repo['watchers_count'],
repo['updated_at'],
repo['created_at'],
repo['html_url'],
json.dumps(repo.get('topics', [])), # Save as JSON string
repo.get('language', 'Unknown') # Adding a fallback value
)
for repo in repos_hash
]

insert_query = """
INSERT INTO github_repos (id, name, owner_login, owner_id, fork, description, size, stargazers_count, watchers_count, updated_at, created_at, url, db_inserted_date, db_updated_date, topics, primary_coding_language)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, %s, %s)
ON CONFLICT (id) DO UPDATE SET
@@ -129,68 +128,17 @@ def get_github_repos(request):
db_updated_date = CURRENT_TIMESTAMP,
topics = EXCLUDED.topics,
primary_coding_language = EXCLUDED.primary_coding_language
"""

repo_data_for_insertion = []
for repo in repos_hash:
repo_data = (
repo['id'],
repo['name'],
repo['owner']['login'],
repo['owner']['id'],
repo['fork'],
repo.get('description', 'No description'),
repo['size'],
repo['stargazers_count'],
repo['watchers_count'],
repo['updated_at'],
repo['created_at'],
repo['html_url'],
repo['topics'],
repo['language']
)
repo_data_for_insertion.append(repo_data)

cursor.executemany(query, repo_data_for_insertion) # attempt insertion

connection.commit() # this is your SUBMIT button

# begin section on pulling data, after insertions were attempted
cursor.execute("SELECT * FROM github_repos")
results = cursor.fetchall()
cursor.close()
connection.close()

return {"DATA RESULTS FROM TABLE": results}, 200


''' this code is likely obsolete now but keep it for debugging purposes
def select_all_from_db(db_credentials):
exposed_host = db_credentials['DB_HOST']
exposed_port = db_credentials['DB_PORT']
exposed_db_name = db_credentials['DB_NAME']
exposed_user = db_credentials['DB_USER']
exposed_password = db_credentials['DB_PASS']

# WARNING, THE BELOW ARE RESERVED TERMS
try:
attempted_db_connection = psycopg2.connect(
host = exposed_host,
port = exposed_port,
database = exposed_db_name, # term of art, can't use NAME
user = exposed_user,
password = exposed_password
)

cursor = attempted_db_connection.cursor() # need explanation here
cursor.execute("SELECT * FROM github_repos;") # puts the data in the cursor, but still need to get it out
rows_of_data = cursor.fetchall() #we cant rely on cursor anymore, because it demands open connection
"""
cursor.executemany(insert_query, repo_data_for_insertion)
connection.commit()

cursor.close() # ask why this is imperative
attempted_db_connection.close() # ask why this is imperative
cursor.execute("SELECT * FROM github_repos")
results = cursor.fetchall()
cursor.close()
connection.close()

return rows_of_data # this is a list of tuples
return {"DATA RESULTS FROM TABLE": results}, 200

except Exception as reported_error:
return f"Connecting to database, due to {reported_error}"
'''
except Exception as e:
logger.error(f"Error in get_github_repos: {e}")
return {"Error": str(e)}, 500