-
Notifications
You must be signed in to change notification settings - Fork 1
162 metaflow add features #164
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from all commits
1cc2c7c
983e8a4
1ca6c69
687fc02
17f69e1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,309 @@ | ||
""" | ||
Add new features to EPC dataset: | ||
- mean average garden size per MSOA | ||
- lat/lon per UPRN | ||
- property density per LSOA | ||
- off gas properties by postcode | ||
- listed building status per UPRN | ||
- England and Wales building conservation area flag per UPRN | ||
- Scotland World Heritage Site flag per UPRN | ||
- grid capacity per LSOA (% of homes which could install a HP with current grid capacity) | ||
- presence of anchor properties per LSOA | ||
|
||
To run: | ||
python -i asf_heat_pump_suitability/pipeline/run_scripts/run_add_features_flow.py --datastore=s3 run --epc_path [path/to/EPC] --year [YYYY] --quarter [Q] | ||
|
||
NB: this pipeline takes the preprocessed and deduplicated EPC dataset in parquet file format. | ||
""" | ||
|
||
from metaflow import FlowSpec, step, batch, Parameter | ||
|
||
|
||
class AddFeaturesFlow(FlowSpec): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. L22-42 refactored from arg parse into |
||
epc_path = Parameter( | ||
name="epc_path", | ||
help="Path to processed and deduplicated EPC dataset in parquet file format", | ||
type=str, | ||
required=True, | ||
) | ||
|
||
year = Parameter( | ||
name="year", | ||
help="EPC data year. Format YYYY", | ||
type=int, | ||
required=True, | ||
) | ||
|
||
quarter = Parameter( | ||
name="quarter", | ||
help="EPC data quarter", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for consistency across scripts, just add the range of quarters here too (1-4) |
||
type=int, | ||
required=True, | ||
) | ||
|
||
@step | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fyi, the overall order of adding the features has changed so please review that the general order makes sense There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What was the thinking behind the change in the general order? |
||
def start(self): | ||
""" | ||
Load EPC data and start flow. | ||
""" | ||
import logging | ||
import polars as pl | ||
|
||
# Import processed EPC | ||
logging.info(f"Loading EPC file from path: {self.epc_path}") | ||
self.epc_df = pl.read_parquet( | ||
self.epc_path, | ||
columns=[ | ||
"UPRN", | ||
"COUNTRY", | ||
"POSTCODE", | ||
"PROPERTY_TYPE", | ||
"BUILT_FORM", | ||
"CURRENT_ENERGY_RATING", | ||
], | ||
).with_columns( | ||
pl.when(pl.col("UPRN").str.contains(r"[a-zA-Z]")) | ||
.then(False) | ||
.otherwise(True) | ||
.alias("valid_UPRN") | ||
) | ||
Comment on lines
+64
to
+69
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. L64-69 are new There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense to add |
||
|
||
self.next(self.clean_property_type) | ||
|
||
@step | ||
def clean_property_type(self): | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unchanged from original code except to refactor to |
||
Clean property type column. | ||
""" | ||
import logging | ||
from asf_heat_pump_suitability.pipeline.reweight_epc import prepare_sample | ||
|
||
logging.info("Cleaning property type data in EPC") | ||
# Below we process the EPC `PROPERTY_TYPE` column and rename, then drop the original column | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. brief description of what cleaning is done would be nice as that's where my mind went straight to |
||
self.epc_df = prepare_sample.add_col_property_type(self.epc_df).drop( | ||
["PROPERTY_TYPE", "BUILT_FORM"] | ||
) | ||
|
||
self.next(self.add_output_areas) | ||
|
||
@step | ||
def add_output_areas(self): | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unchanged from original code except to refactor to |
||
Add LSOA, MSOA, LAD, and rural-urban indicators to EPC. | ||
""" | ||
import logging | ||
from asf_heat_pump_suitability.pipeline.prepare_features import output_areas | ||
|
||
logging.info("Adding LSOA, MSOA, LAD, and rural-urban indicators to EPC") | ||
onspd_df = output_areas.load_transform_df_area_info() | ||
|
||
self.epc_df = output_areas.standardise_col_postcode( | ||
self.epc_df, pcd_col="POSTCODE" | ||
) | ||
self.epc_df = self.epc_df.join(onspd_df, how="left", on="POSTCODE") | ||
|
||
self.next(self.add_lat_lon_data) | ||
|
||
@step | ||
def add_lat_lon_data(self): | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unchanged from original code except to refactor to |
||
Add lat/ lon data to EPC per property. | ||
""" | ||
import logging | ||
from asf_heat_pump_suitability.pipeline.prepare_features import lat_lon | ||
|
||
logging.info("Adding lat/lon data to EPC") | ||
uprn_latlon_df = lat_lon.transform_df_osopen_uprn_latlon() | ||
self.epc_df = self.epc_df.join(uprn_latlon_df, how="left", on="UPRN") | ||
self.epc_gdf = lat_lon.generate_gdf_uprn_coords( | ||
self.epc_df, usecols=["UPRN", "COUNTRY", "lad_code"] | ||
) | ||
|
||
# TODO this is far too slow and is only used to correct some EPC records with incorrect postcodes which get joined to the wrong LSOA/MSOA | ||
# TODO can we filter the dataset somehow and only do the join with incorrect postcodes? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like it would be a good solution, guess you would be able to update the main dataframe fairly easily. Is the issue around how we would go about filtering the dataset? |
||
# # Replace `lad_code` from postcode with `lad_code` from geospatial join and postcode | ||
# logging.info("Adding LAD code with geospatial join") | ||
# uprn_lad_df = output_areas.sjoin_df_uprn_lad_code(self.epc_gdf) | ||
# self.epc_df = self.epc_df.drop("lad_code").join(uprn_lad_df, how="left", on="UPRN") | ||
|
||
self.next(self.add_protected_area_flag) | ||
|
||
@step | ||
def add_protected_area_flag(self): | ||
""" | ||
Add protected area flag. | ||
""" | ||
import logging | ||
import polars as pl | ||
from asf_heat_pump_suitability.pipeline.prepare_features import protected_areas | ||
|
||
logging.info("Adding protected area flag") | ||
uprns_in_protected_area_df = ( | ||
protected_areas.load_transform_df_uprn_in_protected_area(self.epc_gdf) | ||
) | ||
self.epc_df = self.epc_df.join( | ||
uprns_in_protected_area_df, how="left", on="UPRN" | ||
) | ||
|
||
logging.info( | ||
"Adding local authority building conservation area data availability flag for England and Wales" | ||
) | ||
lad_cons_areas_df = ( | ||
protected_areas.generate_df_conservation_area_data_availability( | ||
ladcd_col="LAD23CD" | ||
) | ||
) | ||
self.epc_df = self.epc_df.join( | ||
lad_cons_areas_df, how="left", left_on="lad_code", right_on="LAD23CD" | ||
) | ||
|
||
self.epc_df = self.epc_df.with_columns( | ||
pl.when( | ||
(pl.col("lad_conservation_area_data_available_ew")) | ||
& pl.col("COUNTRY").is_in(["England", "Wales"]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we add the Scotland World Heritage Site flag at any point? if not, we should remove from doc string at top of doc |
||
& pl.col("valid_UPRN") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. L164 is new, otherwise this step is Unchanged from original code except to refactor to |
||
) | ||
.then(pl.col("in_protected_area").fill_null(False)) | ||
.otherwise(pl.col("in_protected_area")) | ||
.alias("in_protected_area") | ||
) | ||
|
||
self.next(self.add_average_garden_size_per_msoa) | ||
|
||
@step | ||
def add_average_garden_size_per_msoa(self): | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unchanged from original code except to refactor to |
||
Add average garden size per MSOA to EPC. | ||
""" | ||
import logging | ||
from asf_heat_pump_suitability.pipeline.prepare_features import ( | ||
epc, | ||
garden_space_avg, | ||
) | ||
|
||
logging.info("Adding average garden size per MSOA to EPC") | ||
garden_space_avg_msoa_df = garden_space_avg.generate_df_garden_space_avg() | ||
self.epc_df = epc.add_col_msoa_avg_outdoor_space_property_type(self.epc_df) | ||
self.epc_df = self.epc_df.join( | ||
garden_space_avg_msoa_df, | ||
how="left", | ||
left_on=["msoa", "msoa_avg_outdoor_space_property_type"], | ||
right_on=["MSOA code", "msoa_avg_outdoor_space_property_type"], | ||
) | ||
|
||
self.next(self.add_property_density) | ||
|
||
@step | ||
def add_property_density(self): | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unchanged from original code except to refactor to |
||
Add property density per LSOA. | ||
""" | ||
import logging | ||
from asf_heat_pump_suitability.pipeline.prepare_features import property_density | ||
|
||
logging.info("Adding property density to EPC") | ||
lsoa_density_df = property_density.generate_df_property_density() | ||
self.epc_df = self.epc_df.join(lsoa_density_df, how="left", on="lsoa") | ||
|
||
self.next(self.add_off_gas_flag) | ||
|
||
@step | ||
def add_off_gas_flag(self): | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unchanged from original code except to refactor to |
||
Add off gas grid column to EPC per postcode. | ||
""" | ||
import logging | ||
from asf_heat_pump_suitability.pipeline.prepare_features import off_gas | ||
|
||
logging.info("Adding off gas grid column to EPC") | ||
off_gas_postcodes = off_gas.process_off_gas_data() | ||
self.epc_df = off_gas.add_off_gas_feature(self.epc_df, off_gas_postcodes) | ||
|
||
self.next(self.add_listed_building_status) | ||
|
||
@step | ||
def add_listed_building_status(self): | ||
""" | ||
Add flag indicating whether property is within a listed building. | ||
""" | ||
import logging | ||
import polars as pl | ||
from asf_heat_pump_suitability.pipeline.prepare_features import listed_buildings | ||
|
||
logging.info("Adding listed buildings to EPC") | ||
listed_buildings_df = listed_buildings.generate_df_epc_listed_buildings( | ||
epc_df=self.epc_df | ||
) | ||
self.epc_df = self.epc_df.join( | ||
listed_buildings_df, how="left", on="UPRN" | ||
).with_columns( | ||
pl.when(pl.col("valid_UPRN")) | ||
.then(pl.col("listed_building").fill_null(False)) | ||
.otherwise(pl.col("listed_building")) | ||
.alias("listed_building") | ||
) | ||
Comment on lines
+239
to
+244
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. L239-244 new and need review There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So we join on the UPRN, and therefore we can't be sure about invalid UPRNs and their listed building status even if they are false? |
||
|
||
self.next(self.add_grid_capacity) | ||
|
||
@step | ||
def add_grid_capacity(self): | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unchanged from original code except to refactor to |
||
Add LSOA grid capacity and maximum percentage of households per LSOA that the grid could support having a heat | ||
pump. | ||
""" | ||
import logging | ||
from asf_heat_pump_suitability.pipeline.prepare_features import grid_capacity | ||
|
||
logging.info("Adding grid capacity column to EPC") | ||
grid_capacities = grid_capacity.calculate_grid_capacity().select( | ||
["lsoa", "heatpump_installation_percentage"] | ||
) | ||
self.epc_df = self.epc_df.join(grid_capacities, how="left", on="lsoa") | ||
|
||
self.next(self.add_anchor_property_flag) | ||
|
||
@step | ||
def add_anchor_property_flag(self): | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unchanged from original code except to refactor to |
||
Add flag to indicate whether LSOA has at least one anchor property. | ||
""" | ||
import logging | ||
import polars as pl | ||
from asf_heat_pump_suitability.pipeline.prepare_features import ( | ||
anchor_properties, | ||
) | ||
|
||
logging.info("Adding anchor properties column to EPC") | ||
anchor_properties_df = anchor_properties.identify_anchor_properties_df().select( | ||
["lsoa", "has_anchor_property"] | ||
) | ||
self.epc_df = self.epc_df.join( | ||
anchor_properties_df, how="left", on="lsoa" | ||
).with_columns(pl.col("has_anchor_property").fill_null(False)) | ||
|
||
self.next(self.save_output) | ||
|
||
@step | ||
def save_output(self): | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor changes here |
||
Save final dataframe to S3. | ||
""" | ||
from asf_heat_pump_suitability.utils import save_utils | ||
|
||
save_as = f"s3://asf-heat-pump-suitability/outputs/{self.year}Q{self.quarter}/features/{self.year}_Q{self.quarter}_EPC_features_test.parquet" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When merging into dev we should get rid of the 'test' at the end of this filename? |
||
save_utils.save_to_s3(self.epc_df, save_as) | ||
|
||
self.next(self.end) | ||
|
||
@step | ||
def end(self): | ||
""" | ||
Finish flow. | ||
""" | ||
import logging | ||
|
||
logging.info("Add features flow complete!") | ||
|
||
|
||
if __name__ == "__main__": | ||
AddFeaturesFlow() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
batch is unused so we should delete here