Skip to content

Commit fd69f56

Browse files
author
Elye Bliss
committed
cleaning run_training_data_prep_anemia function
1 parent 8e5044b commit fd69f56

File tree

1 file changed

+50
-202
lines changed

1 file changed

+50
-202
lines changed

src/rra_climate_health/data_prep/run_training_data_prep.py

Lines changed: 50 additions & 202 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
}
6060

6161
DATA_SOURCE_TYPE = {"stunting": "cgf", "wasting": "cgf", "underweight":"cgf", "low_adult_bmi": "bmi","anemia":"anemia"}
62-
MEASURES_IN_SOURCE = {"cgf": ["stunting", "wasting", "underweight"], "bmi": ["low_adult_bmi"], "anemia": ["anemia"]}
62+
MEASURES_IN_SOURCE = {"cgf": ["stunting", "wasting", "underweight"], "bmi": ["low_adult_bmi"], "anemia": ["anemia_anemic_brinda","anemia_mod_sev_brinda"]}
6363

6464
############################
6565
# Wasting/Stunting columns #
@@ -1086,7 +1086,7 @@ def run_training_data_prep_anemia(
10861086
cm_data = ClimateMalnutritionData(Path(DEFAULT_ROOT)/'anemia')
10871087
dhs_wealth_data = get_ldipc_from_asset_score(
10881088
dhs_wealth_data, cm_data, asset_score_col="wealth_index_dhs", weights_col="hhweight",
1089-
plot_pdf_path=Path("/mnt/team/integrated_analytics/pub/goalkeepers/goalkeepers_2025/plots/anemia") / "gbd_plots.pdf", # temporary
1089+
plot_pdf_path=Path(DEFAULT_ROOT) / "input"/ "ldi_plots"/ "dhs_plots.pdf",
10901090
ldi_version = LDI_VERSION,
10911091
)
10921092

@@ -1120,224 +1120,72 @@ def run_training_data_prep_anemia(
11201120
dhs_wealth_data.drop(columns=["old_hh_id"],inplace=True)
11211121

11221122
# Merge data
1123-
anemia_data_wealth = merge_left_without_inflating(anemia_data, dhs_wealth_data.drop(columns=['geospatial_id','lat', 'long']), on=merge_cols)
1124-
1125-
unmergable_rows = anemia_data_wealth[anemia_data_wealth['wealth_index_dhs'].isna()]
1126-
merge_percent = 100*len(anemia_data_wealth[anemia_data_wealth['wealth_index_dhs'].notna()])/len(anemia_data_raw)
1127-
1128-
print(f"{len(anemia_data_wealth[anemia_data_wealth['wealth_index_dhs'].notna()]):,} "
1129-
f"rows out of {len(anemia_data_raw):,} merged ({merge_percent:.1f}%). "
1130-
f"Unmergeable includes {len(missing_hh_rows):,} with missing hh_id in raw "
1131-
f"data, and {len(unmergable_rows):,} that failed to merge on "
1132-
'"nid", "ihme_loc_id", "hh_id", "psu", "year_start" variables')
1133-
1134-
1135-
cm_data = ClimateMalnutritionData(Path(DEFAULT_ROOT) / MEASURES_IN_SOURCE[data_source_type][0])
1136-
1137-
anemia_data_wealth_distribution = get_ldipc_from_asset_score(
1138-
anemia_data_wealth_distribution, cm_data, asset_score_col="wealth_index_dhs",
1139-
# plot_pdf_path=Path(DEFAULT_ROOT) / "input"/ "ldi_plots"/ "gbd_plots.pdf", # no permissions
1140-
plot_pdf_path=Path("/mnt/team/integrated_analytics/pub/goalkeepers/goalkeepers_2025/plots/anemia") / "gbd_plots.pdf", # temporary
1141-
ldi_version = LDI_VERSION,
1142-
)
1143-
ldi_cols = ['ldipc_unweighted_no_match', 'ldipc_weighted_no_match', 'ldipc_unweighted_match', 'ldipc_weighted_match']
1144-
1145-
anemia_data_wealth_distribution = anemia_data_wealth_distribution[
1146-
["nid", "ihme_loc_id", "location_id", "year_start", "psu", "hh_id"] + ldi_cols
1147-
]
1148-
1149-
anemia_data_own_wealth = merge_left_without_inflating(anemia_data_own_wealth, anemia_data_wealth_distribution, on=["nid", "ihme_loc_id", "year_start", "psu", "hh_id"])
1150-
1151-
# Getting income distributions
1152-
dhs_wealth_data = get_ldipc_from_asset_score(
1153-
dhs_wealth_data, cm_data, asset_score_col="wealth_index_dhs", weights_col="hhweight",
1154-
plot_pdf_path=Path(DEFAULT_ROOT) / "input"/ "ldi_plots"/ "dhs_plots.pdf",
1155-
ldi_version = LDI_VERSION,
1156-
)
1157-
1158-
mics_wealth_data = get_ldipc_from_asset_score(
1159-
mics_wealth_data, cm_data, asset_score_col="wealth_index_dhs", weights_col="hhweight",
1160-
plot_pdf_path=Path(DEFAULT_ROOT) / "input"/ "ldi_plots"/ "mics_plots.pdf",
1161-
ldi_version = LDI_VERSION,
1162-
)
1163-
1164-
lsms_wealth_data = get_ldipc_from_asset_score(
1165-
lsms_wealth_data, cm_data, asset_score_col="wealth_measurement", weights_col="hhweight",
1166-
plot_pdf_path=Path(DEFAULT_ROOT) / "input"/ "ldi_plots"/ "lsms_plots.pdf",
1167-
ldi_version = LDI_VERSION,
1168-
)
1169-
1170-
wealth_cols = ['ihme_loc_id', 'location_id', 'nid', 'psu', 'hh_id', 'geospatial_id', 'lat', 'long', 'year_start'] + ldi_cols
1171-
all_wealth_data = pd.concat([dhs_wealth_data[wealth_cols], mics_wealth_data[wealth_cols], lsms_wealth_data[wealth_cols]])
1172-
wealth_lsae_df = all_wealth_data.copy()
1173-
1174-
1175-
# Now the LSAE CGF data
1176-
lsae_cgf_data = lsae_cgf_data_raw[
1177-
[
1178-
"nid",
1179-
"country",
1180-
"year_start",
1181-
"end_year",
1182-
"geospatial_id",
1183-
"psu",
1184-
"pweight",
1185-
#"strata",
1186-
"hh_id",
1187-
"sex",
1188-
"age_year",
1189-
"age_mo",
1190-
"int_year",
1191-
"int_month",
1192-
"stunting_mod_b",
1193-
"wasting_mod_b",
1194-
"underweight_mod_b",
1195-
]
1196-
]
1197-
lsae_cgf_data = lsae_cgf_data.rename(columns=COLUMN_NAME_TRANSLATOR)
1198-
1199-
print("Processing LSAE data...")
1200-
# Take away bad NIDs, without household information
1201-
print(len(lsae_cgf_data))
1202-
no_hhid_nids = (
1203-
lsae_cgf_data.groupby("nid")
1204-
.filter(lambda x: x["hh_id"].isna().all())
1205-
.nid.unique()
1206-
)
1207-
lsae_cgf_data = lsae_cgf_data[~lsae_cgf_data.nid.isin(no_hhid_nids)]
1208-
#TODO print these NIDS to a file
1209-
1210-
# Try to make household id usable to merge on
1211-
# For some reason in some extractions hh_id is a string with household id and psu, in others it's just the household id
1212-
lsae_cgf_data = lsae_cgf_data[lsae_cgf_data["nid"].isin(common_nids)].copy()
1213-
lsae_cgf_data = lsae_cgf_data.rename(
1214-
columns={"latitude": "lat", "longitude": "long", "hh_id": "old_hh_id"}
1215-
)
1216-
wealth_lsae_df = wealth_lsae_df.rename(columns={"hh_id": "old_hh_id"})
1217-
lsae_cgf_data["hh_id"] = lsae_cgf_data["old_hh_id"].str.split(r"[_ ]").str[-1]
1218-
wealth_lsae_df["hh_id"] = wealth_lsae_df["old_hh_id"].str.split(r"[_ ]").str[-1]
1219-
lsae_cgf_data["psu"] = lsae_cgf_data["psu"].astype(int)
1220-
lsae_cgf_data["hh_id"] = lsae_cgf_data.apply(clean_hh_id, axis=1)
1221-
wealth_lsae_df["hh_id"] = wealth_lsae_df.apply(clean_hh_id, axis=1)
1222-
print(len(lsae_cgf_data))
1223-
1224-
# Some NIDs need extra cleaning so that hh_id can be merged.
1225-
# Take those out and merge LSAE CGF data with wealth
1226-
print(len(lsae_cgf_data))
1227-
1228-
merge_cols = ["nid", "ihme_loc_id", "hh_id", "psu", "year_start"]
1229-
# maybe_fixable_df.loc[maybe_fixable_df.sex_id == 0, "sex_id"] = 2
1230-
1231-
lsae_merged = merge_left_without_inflating(lsae_cgf_data.drop(columns=["old_hh_id"]), wealth_lsae_df, on=merge_cols)
1232-
print(len(lsae_cgf_data))
1233-
print(len(lsae_merged))
1234-
1235-
# Take out NIDs with more than 5% of missing wealth data
1236-
allowed_wealth_nan_proportion = 0.05
1237-
nan_proportion = lsae_merged.groupby('nid').apply(lambda x: x.ldipc_unweighted_no_match.isna().mean(), include_groups=False).reset_index().rename(columns={0: 'nan_proportion'})
1238-
bad_wealth_merge_nids = nan_proportion.query('nan_proportion > @allowed_wealth_nan_proportion').nid.unique()
1239-
print(len(lsae_merged))
1240-
lsae_merged = lsae_merged[~lsae_merged.nid.isin(bad_wealth_merge_nids)]
1241-
print(len(lsae_merged))
1242-
# Take out rows with missing wealth data but in NIDs with less than 5% of missing wealth data
1243-
lsae_merged = lsae_merged[~lsae_merged.ldipc_unweighted_no_match.isna()]
1244-
print(len(lsae_merged))
1245-
1246-
maybe_fixable_df = lsae_cgf_data[
1247-
lsae_cgf_data["nid"].isin(bad_wealth_merge_nids)#([157057, 286780, 341838])
1248-
].copy()
1249-
1250-
# Only interested in rows that have stunting, wasting or underweight information, with both wealth and location
1251-
print(len(lsae_merged))
1252-
lsae_merged = lsae_merged.dropna(
1253-
subset=["stunting", "wasting", "underweight"], how="all"
1254-
)
1255-
print(len(lsae_merged))
1256-
lsae_merged = lsae_merged.dropna(subset=["lat", "long"], how="any")
1257-
lsae_merged.loc[lsae_merged.sex_id == 0, "sex_id"] = 2
1258-
print(len(lsae_merged))
1259-
1260-
# Drop rows from GBD dataset with missing location information
1261-
#gbd_cgf_data = gbd_cgf_data.dropna(subset=["lat", "long"], how="any")
1262-
1263-
1264-
extra_nids = gbd_cgf_data_to_match.copy().drop(columns=['lat', 'long'])
1265-
1266-
extra_nids["hh_id"] = extra_nids["hh_id"].str.split(r"[_ ]").str[-1]
1267-
extra_nids["hh_id"] = extra_nids.apply(clean_hh_id, axis=1)
1268-
extra_nids_nids = extra_nids.nid.unique()
1269-
1270-
extra_nids_wealth = all_wealth_data.query("nid in @extra_nids_nids").copy()
1271-
extra_nids_wealth["hh_id"] = extra_nids_wealth["hh_id"].str.split(r"[_ ]").str[-1]
1272-
extra_nids_wealth["hh_id"] = extra_nids_wealth.apply(clean_hh_id, axis=1)
1273-
extra_nids = merge_left_without_inflating(extra_nids, extra_nids_wealth, on=["nid", "ihme_loc_id", "hh_id", "psu", "year_start"])
1274-
print(len(extra_nids))
1275-
#extra_nids = extra_nids.dropna(subset=["ldipc"])
1276-
1277-
# Take out NIDs with more than 5% of missing wealth data
1278-
allowed_wealth_nan_proportion = 0.05
1279-
nan_proportion = extra_nids.groupby('nid').apply(lambda x: x.ldipc_unweighted_no_match.isna().mean(), include_groups=False).reset_index().rename(columns={0: 'nan_proportion'})
1280-
bad_wealth_merge_nids_extra = nan_proportion.query('nan_proportion > @allowed_wealth_nan_proportion').nid.unique()
1281-
print(len(extra_nids))
1282-
extra_nids = extra_nids[~extra_nids.nid.isin(bad_wealth_merge_nids_extra)]
1283-
print(len(extra_nids))
1284-
# Take out rows with missing wealth data but in NIDs with less than 5% of missing wealth data
1285-
extra_nids = extra_nids[~extra_nids.ldipc_unweighted_no_match.isna()]
1286-
print(len(extra_nids))
1287-
1288-
1289-
# Bring the two datasets (LSAE and GBD) together, giving preference to the LSAE extractions
1290-
gbd_extraction_nids = set(gbd_cgf_data_own_wealth.nid.unique()) | set(extra_nids.nid.unique())
1291-
lsae_only = lsae_merged.loc[~lsae_merged.nid.isin(gbd_extraction_nids)]
1292-
gbd_only = pd.concat([extra_nids.loc[~extra_nids.nid.isin(lsae_merged.nid.unique())],
1293-
gbd_cgf_data_own_wealth.loc[~gbd_cgf_data_own_wealth.nid.isin(lsae_merged.nid.unique())]
1294-
])
1123+
anemia_data_wealth = merge_left_without_inflating(anemia_data, dhs_wealth_data.drop(columns=['geospatial_id', 'strata', 'lat', 'long']), on=merge_cols)
12951124

1125+
# Calculate the per-NID proportion of non-missing wealth values (count / size) and keep only NIDs where more than 95% of rows have wealth data (drops bad merges)
1126+
merged_na_props = (anemia_data_wealth.groupby(['nid']).ldipc_weighted_no_match.count() / anemia_data_wealth.groupby(['nid']).ldipc_weighted_no_match.size())
1127+
merged_nids = merged_na_props[merged_na_props > 0.95].index.to_list()
1128+
anemia_df = anemia_data_wealth.query("nid in @merged_nids").copy()
1129+
dropped_too_missingness = len(anemia_data_wealth) - len(anemia_df)
12961130

1297-
1298-
cgf_consolidated = pd.concat(
1299-
[lsae_merged, gbd_only], ignore_index=True
1300-
).reset_index(drop=True)
1301-
1302-
cgf_consolidated = cgf_consolidated.drop(columns=["strata", "geospatial_id"])
1303-
# selected_wealth_column = "ldi_pc_weighted_no_match"
1304-
# cgf_consolidated["ldi_pc_pd"] = cgf_consolidated["ldipc"] / 365
1131+
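# Record rows that still lack wealth data after the merge.
# NOTE(review): the filter below is applied to anemia_data_wealth, not to anemia_df (already derived above), so it does not remove rows from anemia_df, and unmergable_rows may overlap with rows already dropped by the NID filter - confirm intent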
1132+
unmergable_rows = anemia_data_wealth[anemia_data_wealth['wealth_index_dhs'].isna()]
1133+
anemia_data_wealth = anemia_data_wealth[anemia_data_wealth['wealth_index_dhs'].notna()]
13051134

13061135
# Assign age group
1307-
cgf_consolidated = assign_age_group(cgf_consolidated, )
1308-
cgf_consolidated = cgf_consolidated.dropna(subset=["age_group_id"])
1136+
before_rows = len(anemia_df)
1137+
anemia_df = assign_age_group(anemia_df, )
1138+
anemia_df = anemia_df.dropna(subset=["age_group_id"])
1139+
dropped_due_to_age = before_rows - len(anemia_df)
13091140

13101141
# Take out data with invalid lat and long
1311-
cgf_consolidated = cgf_consolidated.dropna(subset=["lat", "long"])
1312-
cgf_consolidated = cgf_consolidated.query("lat != 0 and long != 0")
1142+
before_rows = len(anemia_df)
1143+
anemia_df = anemia_df.dropna(subset=["lat", "long"])
1144+
anemia_df = anemia_df.query("lat != 0 and long != 0")
1145+
dropped_due_to_coords = before_rows - len(anemia_df)
13131146

13141147
# NID 275090 is a very long survey in Peru, 2003-2008 that is coded as having
1315-
# multiple year_starts. Removing it
1316-
cgf_consolidated = cgf_consolidated.query("nid != 275090")
1317-
1148+
# multiple year_starts. Removing it.
13181149
# NID 411301 is a Zambia survey in which the prevalences end up being 0
1319-
# after removing data with invalid age columns, remove it
1320-
cgf_consolidated = cgf_consolidated.query("nid != 411301")
1321-
1150+
# after removing data with invalid age columns, remove it.
1151+
problematic_nids = [275090, 411301]
1152+
before_rows = len(anemia_df)
1153+
anemia_df = anemia_df.query("nid not in @problematic_nids")
1154+
dropped_problematic_nids = before_rows - len(anemia_df)
1155+
1156+
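# Count rows with at least one missing outcome variable (used for reporting only; rows are filtered per measure when writing output)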
1157+
measure_columns = MEASURES_IN_SOURCE[data_source_type]
1158+
rows_with_na_outcomes = anemia_df[measure_columns].isna().any(axis=1).sum()
1159+
rows_with_na_outcomes = int(rows_with_na_outcomes)
1160+
1161+
full_data_rows = len(anemia_df)-rows_with_na_outcomes
1162+
print(f"Data contains {full_data_rows:,} "
1163+
f"rows out of raw {len(anemia_data_raw):,}. "
1164+
f"Dropped data includes:\n"
1165+
f" - {len(missing_hh_rows):,} with missing hh_id in raw data\n"
1166+
f" - {dropped_too_missingness:,} that were dropped due to excessive missingness\n"
1167+
f' - {len(unmergable_rows):,} that further failed to merge on "nid", "ihme_loc_id", "hh_id", "psu", "year_start" variables\n'
1168+
f" - {dropped_due_to_age:,} due to age groups not found among 388, 389, 238, 34\n"
1169+
f" - {dropped_due_to_coords:,} due to invalid lat and long values\n"
1170+
f" - {dropped_problematic_nids:,} due to problematic NIDs\n"
1171+
f" - {rows_with_na_outcomes:,} with missing outcome variables ({measure_columns})\n"
1172+
)
13221173

13231174
# Merge with climate data
13241175
print("Processing climate data...")
1325-
climate_df = get_climate_vars_for_dataframe(cgf_consolidated)
1326-
cgf_consolidated = merge_left_without_inflating(cgf_consolidated, climate_df, on=["int_year", "lat", "long"])
1327-
1176+
climate_vars = get_climate_vars_for_dataframe(anemia_df)
1177+
anemia_df = merge_left_without_inflating(anemia_df, climate_vars, on=["int_year", "lat", "long"])
13281178

13291179
print("Adding elevation data...")
1330-
cgf_consolidated = get_elevation_for_dataframe(cgf_consolidated)
1331-
1332-
cgf_consolidated = assign_lbd_admin2_location_id(cgf_consolidated)
1333-
#cgf_consolidated = assign_sdi(cgf_consolidated)
1180+
anemia_df = get_elevation_for_dataframe(anemia_df)
13341181

1182+
anemia_df = assign_lbd_admin2_location_id(anemia_df)
13351183

13361184
#Write to output
13371185
for measure in MEASURES_IN_SOURCE[data_source_type]:
1338-
measure_df = cgf_consolidated[cgf_consolidated[measure].notna()].copy().drop(columns=[x for x in cgf_consolidated.columns if '_x' in x or '_y' in x])
1339-
measure_df["cgf_measure"] = measure
1340-
measure_df["cgf_value"] = measure_df[measure]
1186+
measure_df = anemia_df[anemia_df[measure].notna()].copy()
1187+
measure_df["measure"] = measure
1188+
measure_df["value"] = measure_df[measure]
13411189
measure_root = Path(output_root) / measure
13421190
cm_data = ClimateMalnutritionData(measure_root)
13431191
print(f"Saving data for {measure} to {measure_root} {len(measure_df)} rows")

0 commit comments

Comments
 (0)