Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions flow/api/admin/INGEST.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
Raw Ingested JSON:

```
{
"term_code": 1249,
"term_name": "Fall 2024",
"data": [
{
"course_code": "acc760",
"course_id": 32,
"instructor": "Andrew Bauer",
"scraped_at": "2025-05-02 09:50 PM"
},
{
"course_code": "acc760",
"course_id": 32,
"instructor": "Kaishu Wu,",
"scraped_at": "2025-05-02 09:50 PM"
},
{
"course_code": "actsc221",
"course_id": 47,
"instructor": "Brent Matheson",
"scraped_at": "2025-05-02 09:50 PM"
},
{
"course_code": "actsc231",
"course_id": 49,
"instructor": "Fan Yang",
"scraped_at": "2025-05-02 09:50 PM"
},
{
"course_code": "actsc232",
"course_id": 50,
"instructor": "Fan Yang",
"scraped_at": "2025-05-02 09:50 PM"
}
]
}
```

#1: Normalize Instructor Name and Generate ProfCode

```
{
{
"course_id": 32,
"prof_code": "andrew_bauer",
"instructor": "Andrew Bauer",
},
{
"course_id": 32,
"prof_code": "kaishu_wu",
"instructor": "Kaishu Wu,",
},
{
"course_id": 47,
"prof_code": "brent_matheson",
"instructor": "Brent Matheson",
},
}
```

#2: Load the data into memory as a SQL table called insert_prof_teaches_delta, with a new Similarity Score column

#3: Within SQL, categorize into Insert Prof, Update Course, Ignore or Ambiguous:

> Insert image of categorization here!

```
{
{
"course_id": 32,
"prof_code": "andrew_bauer",
"instructor": "Andrew Bauer",
"prof_id": 1,
"categorize": "INSERT_AND_ADD_PROF"
},
{
"course_id": 32,
"prof_code": "kaishu_wu",
"instructor": "Kaishu Wu,",
"prof_id": 2,
"categorize": "INSERT"
},
{
"course_id": 47,
"prof_code": "brent_matheson",
"instructor": "Brent Matheson",
"categorize": "AMBIGUOUS"
},
}
```

#4: Extract all UPDATE_TEACH entries and insert (code, name)

```
CREATE TABLE prof (
id SERIAL PRIMARY KEY,
-- unique handle of the form first(_middle)?_last
code TEXT NOT NULL
CONSTRAINT prof_code_unique UNIQUE,
name TEXT NOT NULL
CONSTRAINT prof_name_length CHECK (LENGTH(name) <= 256),
picture_url TEXT
);
```

#5: Within Delta Table, annotate with prof_id joining on prof_code for INSERT_AND_ADD_PROF and INSERT

#6: Insert from Delta Table into prof_teaches_course table

#7: Return Results data structure with data update stats

#8: Save Delta Table as JSON in addition to AMBIGUOUS entries as raw JSON
155 changes: 155 additions & 0 deletions flow/api/admin/ingest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// GEMINI
package admin

import (
"encoding/json"
"fmt"
"net/http"

"flow/common/db"
"flow/common/util"
"flow/importer/uw/log"
)

// profCourseEntry is one element of the JSON array that IngestProfData decodes
// from the request body: a professor name paired with a course ID.
//
// IngestProfData passes the zero value to util.Fields to name the columns of
// work.raw_staging_table and passes each decoded value to util.AsSlice to build
// a row, so the fields presumably map to the prof_name and course_id columns in
// this order — TODO confirm against util.
//
// NOTE(review): there are no json tags, and encoding/json matches object keys
// to field names case-insensitively, so the payload keys have to be spelled
// like "ProfName" / "CourseID"; snake_case keys such as "prof_name" would
// leave the field unset. Confirm against the client that posts to this endpoint.
type profCourseEntry struct {
// ProfName is the professor name that the matching query compares against
// prof.name.
ProfName string
// CourseID is the course identifier that the matching query joins to course.id.
CourseID int
}




// IngestProfData stages the (professor name, course ID) pairs found in the JSON
// request body and writes one categorized professor match per staged pair into
// work.prof_teaches_course_delta.
//
// The steps run inside a single transaction on tx:
//
//  1. Decode the request body into a slice of profCourseEntry.
//  2. Create work.raw_staging_table and bulk-copy the decoded pairs into it.
//  3. Ensure the pg_trgm extension and a trigram GIN index on prof.name exist.
//  4. For every staged pair, pick the prof row whose name is most similar to
//     the staged name, check whether that prof is already linked (through
//     prof_teaches_course) to a course sharing the staged course's alphabetic
//     code prefix, and insert a delta row categorized as follows:
//     - similarity = 1.0                       -> INSERT
//     - similarity > 0.6 and subject matches   -> INSERT
//     - similarity > 0.6, no subject match     -> AMBIGUOUS
//     - similarity <= 0.6 and subject matches  -> AMBIGUOUS
//     - similarity <= 0.6, no subject match    -> INSERT_AND_ADD_PROF
//     - anything else                          -> IGNORE
//  5. Drop the staging table and commit.
//
// It returns (nil, err) when the body cannot be decoded. After that point it
// always returns a pointer to a log.DbResult; nothing in this function fills
// that value in, so it is returned as its zero value. Every failure is wrapped
// with a message naming the step that failed.
//
// NOTE(review): the best-match lookup is a CROSS JOIN LATERAL over prof, so a
// staged pair yields no delta row when prof has no rows — confirm that is
// intended.
func IngestProfData(tx *db.Tx, r *http.Request) (interface{}, error) {
	// The SQL is kept in untyped constants so the statements read top to
	// bottom in execution order.
	const (
		createStagingSQL = `
CREATE TABLE work.raw_staging_table (
prof_name TEXT NOT NULL,
course_id INT NOT NULL,
CONSTRAINT raw_staging_table_pkey PRIMARY KEY (prof_name, course_id)
);
`
		ensureTrigramExtensionSQL = `CREATE EXTENSION IF NOT EXISTS pg_trgm;`
		ensureTrigramIndexSQL     = `CREATE INDEX IF NOT EXISTS idx_prof_name_trgm ON prof USING GIN (name gin_trgm_ops);`
		categorizeSQL             = `
WITH dataWithCourseCode AS (
SELECT
raw_data.prof_name,
raw_data.course_id,
c.code AS course_code
FROM
work.raw_staging_table raw_data
JOIN course c ON raw_data.course_id = c.id
),
RankedSimilarities AS (
SELECT
raw_data.prof_name,
raw_data.course_id,
raw_data.course_code,
best_match_query.id AS best_matched_prof_id,
best_match_query.name AS best_matched_prof_name,
similarity(best_match_query.name, raw_data.prof_name) as similarity_score
FROM
dataWithCourseCode raw_data
CROSS JOIN LATERAL (
SELECT id, name
FROM prof
ORDER BY similarity(name, raw_data.prof_name) DESC
LIMIT 1
) AS best_match_query
),
SubjectMatches AS (
SELECT
rs.prof_name,
rs.course_id,
rs.best_matched_prof_id,
rs.best_matched_prof_name,
rs.similarity_score,
CASE
WHEN substring(rs.course_code FROM '^[[:alpha:]]+') IN (
SELECT substring(c.code FROM '^[[:alpha:]]+')
FROM prof_teaches_course ptc
JOIN course c ON c.id = ptc.course_id
WHERE ptc.prof_id = rs.best_matched_prof_id
) THEN TRUE
ELSE FALSE
END AS subject_match
FROM
RankedSimilarities rs
)
INSERT INTO work.prof_teaches_course_delta (prof_id, course_id, category, similarity)
SELECT
sm.best_matched_prof_id,
sm.course_id,
CASE
WHEN sm.similarity_score = 1.0 THEN 'INSERT'::work.prof_teaches_course_category
WHEN sm.similarity_score > 0.6 AND sm.subject_match THEN 'INSERT'::work.prof_teaches_course_category
WHEN sm.similarity_score > 0.6 AND NOT sm.subject_match THEN 'AMBIGUOUS'::work.prof_teaches_course_category
WHEN sm.similarity_score <= 0.6 AND sm.subject_match THEN 'AMBIGUOUS'::work.prof_teaches_course_category
WHEN sm.similarity_score <= 0.6 AND NOT sm.subject_match THEN 'INSERT_AND_ADD_PROF'::work.prof_teaches_course_category
ELSE 'IGNORE'::work.prof_teaches_course_category
END AS category,
sm.similarity_score
FROM
SubjectMatches sm;
`
		dropStagingSQL = `DROP TABLE work.raw_staging_table;`
	)

	var entries []profCourseEntry
	if err := json.NewDecoder(r.Body).Decode(&entries); err != nil {
		return nil, fmt.Errorf("failed to decode JSON body: %w", err)
	}

	var summary log.DbResult

	if err := tx.Begin(); err != nil {
		return &summary, fmt.Errorf("failed to open transaction: %w", err)
	}
	// Undo everything on any early return below; presumably a no-op once
	// Commit has succeeded — confirm against db.Tx.
	defer tx.Rollback()

	// Staging table that receives the raw payload.
	if _, err := tx.Exec(createStagingSQL); err != nil {
		return &summary, fmt.Errorf("failed to create work table: %w", err)
	}

	// One row per decoded entry, in the shape CopyFrom expects.
	rows := make([][]interface{}, 0, len(entries))
	for _, entry := range entries {
		rows = append(rows, util.AsSlice(entry))
	}

	if _, err := tx.CopyFrom(
		db.Identifier{"work", "raw_staging_table"},
		util.Fields(profCourseEntry{}),
		rows,
	); err != nil {
		return &summary, fmt.Errorf("failed to copy data to staging table: %w", err)
	}

	// similarity() and gin_trgm_ops both come from pg_trgm.
	if _, err := tx.Exec(ensureTrigramExtensionSQL); err != nil {
		return &summary, fmt.Errorf("failed to create pg_trgm extension: %w", err)
	}
	if _, err := tx.Exec(ensureTrigramIndexSQL); err != nil {
		return &summary, fmt.Errorf("failed to create trigram index: %w", err)
	}

	// Match, categorize and write the delta rows.
	if _, err := tx.Exec(categorizeSQL); err != nil {
		return &summary, fmt.Errorf("failed to process and categorize matches: %w", err)
	}

	// The staging table is only needed for the duration of this call.
	if _, err := tx.Exec(dropStagingSQL); err != nil {
		return &summary, fmt.Errorf("failed to clean up staging table: %w", err)
	}

	if err := tx.Commit(); err != nil {
		return &summary, fmt.Errorf("failed to commit transaction: %w", err)
	}

	return &summary, nil
}

93 changes: 93 additions & 0 deletions flow/api/admin/load.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@

-- Define in Hasura Migration
-- Staging table for the raw scraped (professor name, course) pairs.
-- The primary key is over the two columns the table actually has; the table
-- has no prof_id column, so a key on (prof_id, course_id) cannot be created.
CREATE TABLE work.raw_staging_table (
    prof_name TEXT NOT NULL,
    course_id INT NOT NULL,
    CONSTRAINT raw_staging_table_pkey PRIMARY KEY (prof_name, course_id)
);

-- Categorize every staged (prof_name, course_id) pair and write the result to
-- work.prof_teaches_course_delta. This is a single statement: the three CTEs
-- below are chained with commas and feed the INSERT at the end.
-- Requires the pg_trgm extension for similarity().

-- Get Course Code
WITH dataWithCourseCode AS (
    SELECT
        raw_data.prof_name,
        raw_data.course_id,
        c.code AS course_code
    FROM
        work.raw_staging_table raw_data
        JOIN course c ON raw_data.course_id = c.id
),

-- Ranked Similarities
-- Get the best matched prof for each raw_data prof
RankedSimilarities AS (
    SELECT
        raw_data.prof_name,
        raw_data.course_id,
        raw_data.course_code,
        best_match_query.id AS best_matched_prof_id,
        best_match_query.name AS best_matched_prof_name,
        -- The lateral subquery only returns id and name, so the score is
        -- computed here from the matched name.
        similarity(best_match_query.name, raw_data.prof_name) AS similarity_score
    FROM
        -- For each row from raw_data, the fuzzy match subquery is executed
        -- once for that scraped prof name
        dataWithCourseCode raw_data
        CROSS JOIN LATERAL (
            SELECT id, name
            FROM prof
            ORDER BY similarity(name, raw_data.prof_name) DESC
            LIMIT 1
        ) AS best_match_query
),

-- Subject Matches
-- Check if the best matched prof has taught the same subject as the raw_data prof
SubjectMatches AS (
    SELECT
        rd.prof_name,
        rd.course_id,
        rd.best_matched_prof_id,
        rd.best_matched_prof_name,
        rd.similarity_score,
        CASE
            -- Compare subject prefixes on both sides (the leading letters of
            -- the course code), not a prefix against full course codes.
            WHEN substring(rd.course_code FROM '^[[:alpha:]]+') IN (
                SELECT substring(c.code FROM '^[[:alpha:]]+')
                FROM prof_teaches_course ptc
                JOIN course c ON c.id = ptc.course_id
                WHERE ptc.prof_id = rd.best_matched_prof_id
            ) THEN TRUE ELSE FALSE
        END AS subject_match
    FROM
        RankedSimilarities rd
)

-- Categorize Matches

-- From 1745912089107_rebuild_prof_teaches_course migration:

-- CREATE TYPE work.prof_teaches_course_category AS ENUM ('INSERT_AND_ADD_PROF', 'INSERT', 'AMBIGUOUS', 'IGNORE');

-- CREATE TABLE work.prof_teaches_course_delta(
--   prof_id INT NOT NULL,
--   course_id INT NOT NULL,
--   category work.prof_teaches_course_category NOT NULL,
--   similarity FLOAT DEFAULT NULL,

--   CONSTRAINT prof_teaches_course_delta_pkey PRIMARY KEY (prof_id, course_id)
-- );

-- The target column is named similarity (see the table definition above), and
-- each CASE branch is cast explicitly because the CASE result would otherwise
-- be text, which is not assignable to the enum column.
INSERT INTO work.prof_teaches_course_delta (prof_id, course_id, category, similarity)
SELECT
    sm.best_matched_prof_id,
    sm.course_id,
    CASE
        WHEN sm.similarity_score = 1.0 THEN 'INSERT'::work.prof_teaches_course_category
        WHEN sm.similarity_score > 0.6 AND sm.subject_match THEN 'INSERT'::work.prof_teaches_course_category
        WHEN sm.similarity_score > 0.6 AND NOT sm.subject_match THEN 'AMBIGUOUS'::work.prof_teaches_course_category
        WHEN sm.similarity_score <= 0.6 AND sm.subject_match THEN 'AMBIGUOUS'::work.prof_teaches_course_category
        WHEN sm.similarity_score <= 0.6 AND NOT sm.subject_match THEN 'INSERT_AND_ADD_PROF'::work.prof_teaches_course_category
        ELSE 'IGNORE'::work.prof_teaches_course_category
    END AS category,
    sm.similarity_score
FROM
    SubjectMatches sm;
10 changes: 9 additions & 1 deletion flow/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (

"flow/common/db"

ingest "flow/api/admin"

"github.com/go-chi/chi/v5"
chi_middleware "github.com/go-chi/chi/v5/middleware"
)
Expand All @@ -35,7 +37,7 @@ func setupRouter(conn *db.Conn) *chi.Mux {
chi_middleware.Logger,
chi_middleware.Recoverer,
chi_middleware.RequestID,
chi_middleware.Timeout(10*time.Second),
chi_middleware.Timeout(10*time.Minute),
)

router.Post(
Expand Down Expand Up @@ -97,6 +99,12 @@ func setupRouter(conn *db.Conn) *chi.Mux {
serde.WithDbDirect(conn, auth.DeleteAccount, "account deletion"),
)

// admin endpoints
router.Post(
"/admin/prof",
serde.WithDbResponse(conn, ingest.IngestProfData, "prof taught course ingestion"),
)

return router
}

Expand Down
Loading