Commit 5151a24

Create skills flow update
1 parent 826010f commit 5151a24

File tree

3 files changed: +262, -2 lines changed


dap_prinz_green_jobs/pipeline/ojo_application/flows/ojo_industry_measures_update.py

Lines changed: 1 addition & 1 deletion
@@ -100,7 +100,7 @@ def write_polars_s3(df, destination):
         f"There are {len(new_ojo_descriptions)} job adverts without existing green industry measures (of which there are {len(existing_ids)})"
     )
 
-    # The format used in SkillMeasures
+    # The format used in IndustryMeasures
     ojo_jobs_data = (
         new_ojo_descriptions[[id_column, job_desc_column]]
        .rename(
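
The one-line change above swaps a copied-over comment: this flow feeds its records to IndustryMeasures, not SkillMeasures. For context, a rough sketch (not from the commit; the "job_id"/"job_text" key names are assumptions about what the config's job_id_key and job_text_key resolve to) of the record format that the rename and .to_dicts() produce:

    # Hypothetical values; each advert becomes one dict keyed by the config's
    # job_id_key and job_text_key before being passed to the measures class.
    ojo_jobs_data = [
        {"job_id": "111", "job_text": "We are hiring a wind turbine technician..."},
        {"job_id": "112", "job_text": "Office administrator needed..."},
    ]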
dap_prinz_green_jobs/pipeline/ojo_application/flows/ojo_skills_measures_update.py

Lines changed: 260 additions & 0 deletions
@@ -0,0 +1,260 @@
+"""
+Run the skills measures for just the new job adverts.
+
+python dap_prinz_green_jobs/pipeline/ojo_application/flows/ojo_skills_measures_update.py
+
+- filter new job advert data to not include any jobs that already have green skills measures
+- calculate green skills measures for new data
+- merge with original green skills data
+
+"""
+
+from dap_prinz_green_jobs import logger
+from dap_prinz_green_jobs.getters.data_getters import (
+    save_to_s3,
+    get_s3_data_paths,
+    load_s3_data,
+)
+from dap_prinz_green_jobs import BUCKET_NAME, config
+from dap_prinz_green_jobs.pipeline.green_measures.skills.skill_measures_utils import (
+    SkillMeasures,
+)
+from dap_prinz_green_jobs.getters.ojo_getters import (
+    get_large_ojo_sample,
+)
+
+from toolz import partition_all
+
+from tqdm import tqdm
+import pandas as pd
+import polars as pl
+
+from argparse import ArgumentParser
+from datetime import datetime as date
+
+import os
+import numpy as np
+
+
+## ---- Change these every time you update the data -----
+
+# The existing green measures (no need to run skills greenness again for these job adverts)
+green_skills_existing_data_dir = "s3://prinz-green-jobs/outputs/data/ojo_application/extracted_green_measures/20240220/all_ojo_large_sample_skills_green_measures_production_True.csv"
+
+# The latest job advert data
+new_ojo_descriptions_dir = "s3://prinz-green-jobs/outputs/data/ojo_application/deduplicated_sample/20241114/all_ojo_descriptions.parquet"
+
+## ----------------------------------------------------
+
+import s3fs
+
+
+def write_polars_s3(df, destination):
+    fs = s3fs.S3FileSystem()
+    # write a csv or parquet file depending on the destination suffix
+    if ".csv" in destination:
+        with fs.open(destination, mode="wb") as f:
+            df.write_csv(f)
+    elif ".parquet" in destination:
+        with fs.open(destination, mode="wb") as f:
+            df.write_parquet(f)
+    else:
+        print("destination should be a '.csv' or '.parquet'")
+
+
+if __name__ == "__main__":
+    parser = ArgumentParser()
+    parser.add_argument("--production", action="store_true", default=False)
+    parser.add_argument("--job_desc_column", default="description", type=str)
+    parser.add_argument("--id_column", default="id", type=str)
+    parser.add_argument("--test_n", default=100, type=int)
+
+    args = parser.parse_args()
+    production = args.production
+    id_column = args.id_column
+    test_n = args.test_n
+    job_desc_column = args.job_desc_column
+
+    if not production:
+        chunk_size = 20
+    else:
+        chunk_size = 10000
+
+    print("loading datasets...")
+
+    green_skills_existing_data = pl.read_csv(green_skills_existing_data_dir)
+
+    # Remove any job adverts which have existing green measures (there shouldn't really be any)
+    all_ojo_descriptions = pl.read_parquet(new_ojo_descriptions_dir)
+    existing_ids = set(green_skills_existing_data["job_id"].to_list())
+    new_ojo_descriptions = all_ojo_descriptions.filter(
+        ~pl.col("id").is_in(existing_ids)
+    )
+
+    print(
+        f"There are {len(new_ojo_descriptions)} job adverts without existing green skills measures ({len(existing_ids)} job adverts already have them)"
+    )
+
+    # The format used in SkillMeasures
+    ojo_jobs_data = (
+        new_ojo_descriptions[[id_column, job_desc_column]]
+        .rename(
+            {
+                id_column: config["job_adverts"]["job_id_key"],
+                job_desc_column: config["job_adverts"]["job_text_key"],
+            }
+        )
+        .to_dicts()
+    )
+
+    if not production:
+        ojo_jobs_data = ojo_jobs_data[:test_n]
+
+    date_stamp = str(date.today().date()).replace("-", "")
+    folder_name = f"outputs/data/ojo_application/extracted_green_measures/{date_stamp}/"
+
+    skills_output_folder = f"outputs/data/green_skill_lists/{date_stamp}"
+
+    # Skills config variables
+    skills_config_name = config["skills"]["skills_config_name"]
+    load_skills = config["skills"][
+        "load_skills"
+    ]  # Set to false if your job adverts or NER model changes
+    load_skills_embeddings = config["skills"][
+        "load_skills_embeddings"
+    ]  # Set to false if your job advert data, NER model or way to embed changes
+    load_taxonomy_embeddings = config["skills"][
+        "load_taxonomy_embeddings"
+    ]  # Set to false if your input taxonomy data or way to embed changes
+    green_skills_classifier_model_file_name = config["skills"][
+        "green_skills_classifier_model_file_name"
+    ]
+
+    if config["skills"]["load_taxonomy_embeddings"]:
+        green_tax_embedding_path = config["skills"]["green_tax_embedding_path"]
+    else:
+        green_tax_embedding_path = os.path.join(
+            skills_output_folder, "green_esco_embeddings.json"
+        )
+
+    sm = SkillMeasures(
+        config_name="extract_green_skills_esco",
+        green_skills_classifier_model_file_name=green_skills_classifier_model_file_name,
+    )
+    sm.initiate_extract_skills(local=False, verbose=True)
+
+    taxonomy_skills_embeddings_dict = sm.get_green_taxonomy_embeddings(
+        output_path=green_tax_embedding_path,
+        load=load_taxonomy_embeddings,
+    )
+
+    job_desc_chunks = list(partition_all(chunk_size, ojo_jobs_data))
+
+    print(
+        f"Finding skills information for {len(ojo_jobs_data)} job adverts in {len(job_desc_chunks)} batches of up to {chunk_size}."
+    )
+
+    for i, job_desc_chunk in tqdm(enumerate(job_desc_chunks)):
+        skills_output = os.path.join(
+            skills_output_folder, f"predicted_skills_production_{production}/{i}.json"
+        )
+        skill_embeddings_output = os.path.join(
+            skills_output_folder,
+            f"extracted_skills_embeddings_production_{production}/{i}.json",
+        )
+
+        # Where to output the mappings of skills to all of ESCO (not just green)
+        skill_mappings_output_path = os.path.join(
+            skills_output_folder,
+            f"full_esco_skill_mappings_production_{production}/{i}.json",
+        )
+
+        prop_green_skills = sm.get_measures(
+            job_desc_chunk,
+            skills_output_path=skills_output,
+            load_skills=load_skills,
+            job_text_key=config["job_adverts"]["job_text_key"],
+            job_id_key=config["job_adverts"]["job_id_key"],
+            skill_embeddings_output_path=skill_embeddings_output,
+            load_skills_embeddings=load_skills_embeddings,
+            skill_mappings_output_path=skill_mappings_output_path,
+        )
+
+        save_to_s3(
+            BUCKET_NAME,
+            prop_green_skills,
+            os.path.join(
+                skills_output_folder,
+                f"ojo_newest_skills_green_measures_production_{production}_interim/{i}.json",
+            ),
+        )
+
+    # Read them back in and save altogether
+    prop_green_skills_locs = get_s3_data_paths(
+        BUCKET_NAME,
+        os.path.join(
+            skills_output_folder,
+            f"ojo_newest_skills_green_measures_production_{production}_interim",
+        ),
+        file_types=["*.json"],
+    )
+
+    print("Load green measures per job advert")
+    all_prop_green_skills = {}
+    for prop_green_skills_loc in tqdm(prop_green_skills_locs):
+        all_prop_green_skills.update(load_s3_data(BUCKET_NAME, prop_green_skills_loc))
+
+    save_to_s3(
+        BUCKET_NAME,
+        all_prop_green_skills,
+        os.path.join(
+            folder_name,
+            f"ojo_newest_skills_green_measures_production_{production}.json",
+        ),
+    )
+
+    skill_measures_df = (
+        pd.DataFrame.from_dict(all_prop_green_skills, orient="index")
+        .reset_index()
+        .rename(columns={"index": "job_id"})
+    )
+    # save as csv because of invalid parquet schema
+    skills_df_path = os.path.join(
+        BUCKET_NAME,
+        folder_name,
+        f"ojo_newest_skills_green_measures_production_{production}.csv",
+    )
+
+    skill_measures_df["ENTS"] = skill_measures_df["ENTS"].astype(str)
+    skill_measures_df.to_csv(f"s3://{skills_df_path}", index=False)
+
+    # Join with the existing green skills measures
+
+    skill_measures_pl = pl.from_pandas(skill_measures_df)
+
+    skill_measures_pl = skill_measures_pl.with_columns(
+        pl.format(
+            "[{}]", pl.col("GREEN_ENTS").cast(pl.List(pl.String)).list.join(", ")
+        ).alias("GREEN_ENTS")
+    )
+    skill_measures_pl = skill_measures_pl.with_columns(
+        pl.format(
+            "[{}]", pl.col("BENEFITS").cast(pl.List(pl.String)).list.join(", ")
+        ).alias("BENEFITS")
+    )
+
+    # skill_measures_pl = skill_measures_pl.with_columns(pl.col("BENEFITS").cast(pl.String, strict=False))
+
+    all_skills_measures_df = pl.concat(
+        [green_skills_existing_data, skill_measures_pl], how="vertical_relaxed"
+    )
+
+    skills_all_df_path = os.path.join(
+        BUCKET_NAME,
+        folder_name,
+        f"ojo_all_skills_green_measures_production_{production}.csv",
+    )
+    write_polars_s3(all_skills_measures_df, f"s3://{skills_all_df_path}")
+    write_polars_s3(
+        all_skills_measures_df, f"s3://{skills_all_df_path.replace('.csv', '.parquet')}"
+    )
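
The fiddliest part of the new flow is the merge at the end: the existing measures are read back from CSV, so their list-valued columns (GREEN_ENTS, BENEFITS) arrive as plain strings, while the freshly computed measures still hold genuine list columns. A minimal self-contained sketch of that conversion and the relaxed concat, using made-up values:

    import polars as pl

    # Existing measures were round-tripped through CSV, so the list column is a string.
    existing = pl.DataFrame({"job_id": [1], "GREEN_ENTS": ["[solar, wind]"]})

    # Newly computed measures still hold a real list column.
    new = pl.DataFrame({"job_id": [2], "GREEN_ENTS": [["retrofit", "heat pump"]]})

    # Render the list as a bracketed string so both frames share a schema.
    new = new.with_columns(
        pl.format(
            "[{}]", pl.col("GREEN_ENTS").cast(pl.List(pl.String)).list.join(", ")
        ).alias("GREEN_ENTS")
    )

    # how="vertical_relaxed" lets polars reconcile any remaining dtype differences.
    print(pl.concat([existing, new], how="vertical_relaxed"))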

requirements.txt

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@ pandas==1.3.5
 tqdm==4.64.0
 scikit-learn==0.23.2
 openpyxl
-ojd-daps-skills
+ojd-daps-skills==1.0.2
 pyarrow==10.0.0
 altair
 vega
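
Pinning ojd-daps-skills to 1.0.2 keeps the skills extractor's behaviour stable between update runs. A quick check that an environment matches the pin (assuming Python 3.8+, where importlib.metadata is in the standard library):

    from importlib.metadata import version

    # Raises AssertionError if the installed extractor drifted from requirements.txt.
    assert version("ojd-daps-skills") == "1.0.2", version("ojd-daps-skills")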
