Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
309 changes: 309 additions & 0 deletions asf_heat_pump_suitability/pipeline/flows/run_add_features_flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,309 @@
"""
Add new features to EPC dataset:
- mean average garden size per MSOA
- lat/lon per UPRN
- property density per LSOA
- off gas properties by postcode
- listed building status per UPRN
- England and Wales building conservation area flag per UPRN
- Scotland World Heritage Site flag per UPRN
- grid capacity per LSOA (% of homes which could install a HP with current grid capacity)
- presence of anchor properties per LSOA

To run:
python -i asf_heat_pump_suitability/pipeline/run_scripts/run_add_features_flow.py --datastore=s3 run --epc_path [path/to/EPC] --year [YYYY] --quarter [Q]

NB: this pipeline takes the preprocessed and deduplicated EPC dataset in parquet file format.
"""

from metaflow import FlowSpec, step, batch, Parameter
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

batch is unused so we should delete here



class AddFeaturesFlow(FlowSpec):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L22-42 refactored from arg parse into metaflow params

epc_path = Parameter(
name="epc_path",
help="Path to processed and deduplicated EPC dataset in parquet file format",
type=str,
required=True,
)

year = Parameter(
name="year",
help="EPC data year. Format YYYY",
type=int,
required=True,
)

quarter = Parameter(
name="quarter",
help="EPC data quarter",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for consistency across scripts, just add the range of quarters here too (1-4)

type=int,
required=True,
)

@step
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fyi, the overall order of adding the features has changed so please review that the general order makes sense

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was the thinking behind the change in the general order?

def start(self):
"""
Load EPC data and start flow.
"""
import logging
import polars as pl

# Import processed EPC
logging.info(f"Loading EPC file from path: {self.epc_path}")
self.epc_df = pl.read_parquet(
self.epc_path,
columns=[
"UPRN",
"COUNTRY",
"POSTCODE",
"PROPERTY_TYPE",
"BUILT_FORM",
"CURRENT_ENERGY_RATING",
],
).with_columns(
pl.when(pl.col("UPRN").str.contains(r"[a-zA-Z]"))
.then(False)
.otherwise(True)
.alias("valid_UPRN")
)
Comment on lines +64 to +69
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L64-69 are new

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to add


self.next(self.clean_property_type)

@step
def clean_property_type(self):
"""
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unchanged from original code except to refactor to metaflow syntax

Clean property type column.
"""
import logging
from asf_heat_pump_suitability.pipeline.reweight_epc import prepare_sample

logging.info("Cleaning property type data in EPC")
# Below we process the EPC `PROPERTY_TYPE` column and rename, then drop the original column
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

brief description of what cleaning is done would be nice as that's where my mind went straight to

self.epc_df = prepare_sample.add_col_property_type(self.epc_df).drop(
["PROPERTY_TYPE", "BUILT_FORM"]
)

self.next(self.add_output_areas)

@step
def add_output_areas(self):
"""
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unchanged from original code except to refactor to metaflow syntax

Add LSOA, MSOA, LAD, and rural-urban indicators to EPC.
"""
import logging
from asf_heat_pump_suitability.pipeline.prepare_features import output_areas

logging.info("Adding LSOA, MSOA, LAD, and rural-urban indicators to EPC")
onspd_df = output_areas.load_transform_df_area_info()

self.epc_df = output_areas.standardise_col_postcode(
self.epc_df, pcd_col="POSTCODE"
)
self.epc_df = self.epc_df.join(onspd_df, how="left", on="POSTCODE")

self.next(self.add_lat_lon_data)

@step
def add_lat_lon_data(self):
"""
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unchanged from original code except to refactor to metaflow syntax

Add lat/ lon data to EPC per property.
"""
import logging
from asf_heat_pump_suitability.pipeline.prepare_features import lat_lon

logging.info("Adding lat/lon data to EPC")
uprn_latlon_df = lat_lon.transform_df_osopen_uprn_latlon()
self.epc_df = self.epc_df.join(uprn_latlon_df, how="left", on="UPRN")
self.epc_gdf = lat_lon.generate_gdf_uprn_coords(
self.epc_df, usecols=["UPRN", "COUNTRY", "lad_code"]
)

# TODO this is far too slow and is only used to correct some EPC records with incorrect postcodes which get joined to the wrong LSOA/MSOA
# TODO can we filter the dataset somehow and only do the join with incorrect postcodes?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like it would be a good solution, guess you would be able to update the main dataframe fairly easily. Is the issue around how we would go about filtering the dataset?

# # Replace `lad_code` from postcode with `lad_code` from geospatial join and postcode
# logging.info("Adding LAD code with geospatial join")
# uprn_lad_df = output_areas.sjoin_df_uprn_lad_code(self.epc_gdf)
# self.epc_df = self.epc_df.drop("lad_code").join(uprn_lad_df, how="left", on="UPRN")

self.next(self.add_protected_area_flag)

@step
def add_protected_area_flag(self):
"""
Add protected area flag.
"""
import logging
import polars as pl
from asf_heat_pump_suitability.pipeline.prepare_features import protected_areas

logging.info("Adding protected area flag")
uprns_in_protected_area_df = (
protected_areas.load_transform_df_uprn_in_protected_area(self.epc_gdf)
)
self.epc_df = self.epc_df.join(
uprns_in_protected_area_df, how="left", on="UPRN"
)

logging.info(
"Adding local authority building conservation area data availability flag for England and Wales"
)
lad_cons_areas_df = (
protected_areas.generate_df_conservation_area_data_availability(
ladcd_col="LAD23CD"
)
)
self.epc_df = self.epc_df.join(
lad_cons_areas_df, how="left", left_on="lad_code", right_on="LAD23CD"
)

self.epc_df = self.epc_df.with_columns(
pl.when(
(pl.col("lad_conservation_area_data_available_ew"))
& pl.col("COUNTRY").is_in(["England", "Wales"])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we add the Scotland World Heritage Site flag at any point? if not, we should remove from doc string at top of doc

& pl.col("valid_UPRN")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L164 is new, otherwise this step is Unchanged from original code except to refactor to metaflow syntax

)
.then(pl.col("in_protected_area").fill_null(False))
.otherwise(pl.col("in_protected_area"))
.alias("in_protected_area")
)

self.next(self.add_average_garden_size_per_msoa)

@step
def add_average_garden_size_per_msoa(self):
"""
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unchanged from original code except to refactor to metaflow syntax

Add average garden size per MSOA to EPC.
"""
import logging
from asf_heat_pump_suitability.pipeline.prepare_features import (
epc,
garden_space_avg,
)

logging.info("Adding average garden size per MSOA to EPC")
garden_space_avg_msoa_df = garden_space_avg.generate_df_garden_space_avg()
self.epc_df = epc.add_col_msoa_avg_outdoor_space_property_type(self.epc_df)
self.epc_df = self.epc_df.join(
garden_space_avg_msoa_df,
how="left",
left_on=["msoa", "msoa_avg_outdoor_space_property_type"],
right_on=["MSOA code", "msoa_avg_outdoor_space_property_type"],
)

self.next(self.add_property_density)

@step
def add_property_density(self):
"""
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unchanged from original code except to refactor to metaflow syntax

Add property density per LSOA.
"""
import logging
from asf_heat_pump_suitability.pipeline.prepare_features import property_density

logging.info("Adding property density to EPC")
lsoa_density_df = property_density.generate_df_property_density()
self.epc_df = self.epc_df.join(lsoa_density_df, how="left", on="lsoa")

self.next(self.add_off_gas_flag)

@step
def add_off_gas_flag(self):
"""
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unchanged from original code except to refactor to metaflow syntax

Add off gas grid column to EPC per postcode.
"""
import logging
from asf_heat_pump_suitability.pipeline.prepare_features import off_gas

logging.info("Adding off gas grid column to EPC")
off_gas_postcodes = off_gas.process_off_gas_data()
self.epc_df = off_gas.add_off_gas_feature(self.epc_df, off_gas_postcodes)

self.next(self.add_listed_building_status)

@step
def add_listed_building_status(self):
"""
Add flag indicating whether property is within a listed building.
"""
import logging
import polars as pl
from asf_heat_pump_suitability.pipeline.prepare_features import listed_buildings

logging.info("Adding listed buildings to EPC")
listed_buildings_df = listed_buildings.generate_df_epc_listed_buildings(
epc_df=self.epc_df
)
self.epc_df = self.epc_df.join(
listed_buildings_df, how="left", on="UPRN"
).with_columns(
pl.when(pl.col("valid_UPRN"))
.then(pl.col("listed_building").fill_null(False))
.otherwise(pl.col("listed_building"))
.alias("listed_building")
)
Comment on lines +239 to +244
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L239-244 new and need review

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we join on the UPRN, and therefore we can't be sure about invalid UPRNs and their listed building status even if they are false?


self.next(self.add_grid_capacity)

@step
def add_grid_capacity(self):
"""
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unchanged from original code except to refactor to metaflow syntax

Add LSOA grid capacity and maximum percentage of households per LSOA that the grid could support having a heat
pump.
"""
import logging
from asf_heat_pump_suitability.pipeline.prepare_features import grid_capacity

logging.info("Adding grid capacity column to EPC")
grid_capacities = grid_capacity.calculate_grid_capacity().select(
["lsoa", "heatpump_installation_percentage"]
)
self.epc_df = self.epc_df.join(grid_capacities, how="left", on="lsoa")

self.next(self.add_anchor_property_flag)

@step
def add_anchor_property_flag(self):
"""
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unchanged from original code except to refactor to metaflow syntax

Add flag to indicate whether LSOA has at least one anchor property.
"""
import logging
import polars as pl
from asf_heat_pump_suitability.pipeline.prepare_features import (
anchor_properties,
)

logging.info("Adding anchor properties column to EPC")
anchor_properties_df = anchor_properties.identify_anchor_properties_df().select(
["lsoa", "has_anchor_property"]
)
self.epc_df = self.epc_df.join(
anchor_properties_df, how="left", on="lsoa"
).with_columns(pl.col("has_anchor_property").fill_null(False))

self.next(self.save_output)

@step
def save_output(self):
"""
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor changes here

Save final dataframe to S3.
"""
from asf_heat_pump_suitability.utils import save_utils

save_as = f"s3://asf-heat-pump-suitability/outputs/{self.year}Q{self.quarter}/features/{self.year}_Q{self.quarter}_EPC_features_test.parquet"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When merging into dev we should get rid of the 'test' at the end of this filename?

save_utils.save_to_s3(self.epc_df, save_as)

self.next(self.end)

@step
def end(self):
"""
Finish flow.
"""
import logging

logging.info("Add features flow complete!")


if __name__ == "__main__":
AddFeaturesFlow()
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ def sjoin_df_epc_listed_buildings(
pd.DataFrame: EPC UPRNs in listed buildings
"""
epc_gdf = lat_lon.generate_gdf_uprn_coords(df=epc_df, usecols=["UPRN"])

# Some EPC rows have invalid geometry e.g. `POINT(NaN NaN)` because they have invalid UPRNs
valid_rows = epc_gdf["geometry"].is_valid.sum()
logging.warning(
f"{len(epc_gdf) - valid_rows} EPC rows with invalid point geometries cannot be matched to listed buildings."
)
epc_gdf = epc_gdf[epc_gdf["geometry"].is_valid]

if any(
[
expr in listed_buildings_gdf.geom_type.unique()
Expand Down