@crispy-wonton (Collaborator) commented Jun 19, 2025

Fixes #162

Description

Refactor the script that adds new features to EPC data into a metaflow flow.

New files:

  • pipeline/flows/run_add_features_flow.py - new metaflow script to add features to EPC. Refactored from pipeline/run_scripts/run_add_features.py

NB: I didn't add any parallelisation into this flow. I initially tested out parallelising the steps to add each feature to EPC so they all get added concurrently. However, this resulted in local memory errors and crashing of the pipeline and my computer. I didn't think the time gains would be significant enough to warrant parallelisation here because of the setup time for each batch step. Therefore, I didn't pursue parallelisation and instead kept it as a linear flow.

Note, I checked that the outputs are the same for this script vs the run_add_features.py script (for those columns where the underlying code is unchanged). Here's the code snippet I used. The assertion statement passed:

import polars as pl
from polars.testing import assert_frame_equal

# New dataframe from the refactored metaflow script
new_df = pl.read_parquet("s3://asf-heat-pump-suitability/outputs/2023Q4/features/2023_Q4_EPC_features_test.parquet")

# Old df from the original script
old_df = pl.read_parquet("s3://asf-heat-pump-suitability/outputs/2023Q4/features/2023_Q4_EPC_features.parquet")

# Drop columns which are new or where processing has changed
_new_df = new_df.drop(["listed_building", "in_protected_area", "valid_UPRN", "lad_code", "lad_conservation_area_data_available_ew"])
# Select the same columns, in the same order, from the old dataframe before comparing
_old_df = old_df.select(_new_df.columns)
assert_frame_equal(_new_df, _old_df)

Modified files:

  • pipeline/prepare_features/listed_buildings.py - updated one of the functions to drop EPC rows which have invalid geometry before conducting the sjoin with the listed buildings geometry data. The invalid geometries were causing shapely GEOSError exceptions during the spatial join between the EPC and listed buildings data. I'm not sure why this wasn't happening when we ran this script before. I tested the run_add_features.py script before updating the join function and it threw the same errors, so I'm not sure what has changed since we last ran it to make the pipeline error out now. Perhaps something in geopandas was updated.
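
For readers unfamiliar with this kind of fix, here is a minimal sketch of dropping invalid geometries before a spatial join, assuming the EPC data is held in a geopandas GeoDataFrame. The helper name `drop_invalid_geometries` and the toy polygons are hypothetical and are not taken from listed_buildings.py; the function in the PR may differ.

```python
import geopandas as gpd
from shapely.geometry import Polygon


def drop_invalid_geometries(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """Keep only rows whose geometry is present and valid (hypothetical helper)."""
    geom = gdf.geometry
    mask = geom.notna() & geom.is_valid
    return gdf[mask]


# Toy data (assumption): a valid unit square and a self-intersecting "bowtie" polygon.
square = Polygon([(0, 0), (1, 0), (1, 1), (0, 1)])
bowtie = Polygon([(0, 0), (1, 1), (1, 0), (0, 1)])
epc_gdf = gpd.GeoDataFrame({"UPRN": ["1", "2"]}, geometry=[square, bowtie])

# Only the row with the valid square remains.
clean_gdf = drop_invalid_geometries(epc_gdf)
```

The filtered frame would then be the one passed to the spatial join (for example `geopandas.sjoin`), so rows with invalid geometry never reach GEOS.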

Instructions for Reviewer

In order to test the code in this PR you need to run the following lines:

export METAFLOW_USER=yournamewithnospacesorpunctuation
python asf_heat_pump_suitability/pipeline/flows/run_add_features_flow.py --datastore=s3 run --epc_path s3://asf-daps/lakehouse/processed/epc/old/deduplicated/processed_dedupl-0.parquet --year 2023 --quarter 4

This will run the full script and save an output to S3. It took ~2hrs for me at home - in the office it should be much faster. Please let me know the time taken if you do run in office! If it's taking too long, feel free to kill the run. As it's not a complex flow, I think that as long as it starts up successfully then it's probably ok and I have run it all the way through.

Please pay special attention to ...

  • the logic of the refactored flow steps where it has changed from the original script. I've indicated the parts of the script that don't need a thorough review because they are unchanged (except to refactor into metaflow syntax). Please still do review these steps for the new syntax but the logic has been reviewed. And please do review the general overall flow and the documentation.

Checklist:

  • I have refactored my code out from notebooks/
  • I have checked the code runs
  • I have tested the code
  • I have run pre-commit and addressed any issues not automatically fixed
  • I have merged any new changes from dev
  • I have documented the code
    • Major functions have docstrings
    • Appropriate information has been added to READMEs
  • I have explained this PR above
  • I have requested a code review

required=True,
)

@step
Collaborator Author

fyi, the overall order of adding the features has changed so please review that the general order makes sense

Collaborator

What was the thinking behind the change in the general order?

Comment on lines +64 to +69
).with_columns(
    pl.when(pl.col("UPRN").str.contains(r"[a-zA-Z]"))
    .then(False)
    .otherwise(True)
    .alias("valid_UPRN")
)
Collaborator Author

L64-69 are new

Collaborator

Makes sense to add


@step
def clean_property_type(self):
"""
Collaborator Author

Unchanged from original code except to refactor to metaflow syntax


@step
def add_output_areas(self):
"""
Collaborator Author

Unchanged from original code except to refactor to metaflow syntax


@step
def add_lat_lon_data(self):
"""
Collaborator Author

Unchanged from original code except to refactor to metaflow syntax

pl.when(
(pl.col("lad_conservation_area_data_available_ew"))
& pl.col("COUNTRY").is_in(["England", "Wales"])
& pl.col("valid_UPRN")
Collaborator Author

L164 is new; otherwise this step is unchanged from the original code except to refactor to metaflow syntax


@step
def add_average_garden_size_per_msoa(self):
"""
Collaborator Author

Unchanged from original code except to refactor to metaflow syntax


@step
def add_property_density(self):
"""
Collaborator Author

Unchanged from original code except to refactor to metaflow syntax


@step
def add_off_gas_flag(self):
"""
Collaborator Author

Unchanged from original code except to refactor to metaflow syntax

Comment on lines +239 to +244
).with_columns(
    pl.when(pl.col("valid_UPRN"))
    .then(pl.col("listed_building").fill_null(False))
    .otherwise(pl.col("listed_building"))
    .alias("listed_building")
)
Collaborator Author

L239-244 are new and need review

Collaborator

So we join on the UPRN, and therefore we can't be sure about invalid UPRNs and their listed building status even if they are false?


@step
def add_grid_capacity(self):
"""
Collaborator Author

Unchanged from original code except to refactor to metaflow syntax


@step
def add_anchor_property_flag(self):
"""
Collaborator Author

Unchanged from original code except to refactor to metaflow syntax


@step
def save_output(self):
"""
Collaborator Author

minor changes here

from metaflow import FlowSpec, step, batch, Parameter


class AddFeaturesFlow(FlowSpec):
Collaborator Author

L22-42 refactored from argparse arguments into metaflow parameters

@helloaidank (Collaborator) left a comment

Hi Roisín!

Thanks for this, good work as per usual. I was able to run everything and generate the output!

I have one 'dumb' question: if we aren't parallelising the scripts or sending them off to the batch machines, what are the advantages of using metaflow here over our regular old pipeline?

The docstring at the top of the script says we are adding the Scotland Heritage sites, but as far as I can see that isn't occurring in any of the steps, at least not explicitly.

I think we should add a separate issue for this pipeline around data quality checks using pandera decorators and the relevant schema. We might have to do some thinking around how to incorporate it (I suspect it might be quite a bit of copy and paste), but I think it would be very effective here.

Minor comment/suggestion: should we create a .env_template to define environment variables such as 'METAFLOW_USER', so that others who use the code are aware of the environment variables they have to define before running these scripts?

My final question is about how you are planning to orchestrate the flow overall (not just this metaflow script, but all of them). This is more out of curiosity than anything!
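
One way to complement the .env_template suggestion above is a fail-fast check at the start of a run. This is only a sketch: the helper `missing_env_vars` and the `REQUIRED_ENV_VARS` tuple are hypothetical names, and `METAFLOW_USER` is the only variable mentioned in this PR.

```python
import os
from typing import Mapping, Optional, Sequence

# Variables a user is expected to define before running the flows (assumed list).
REQUIRED_ENV_VARS = ("METAFLOW_USER",)


def missing_env_vars(
    required: Sequence[str] = REQUIRED_ENV_VARS,
    environ: Optional[Mapping[str, str]] = None,
) -> list:
    """Return the names of required variables that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in required if not env.get(name)]
```

A script could call `missing_env_vars()` before starting and raise a clear error listing anything that is missing, instead of failing later inside metaflow.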


quarter = Parameter(
name="quarter",
help="EPC data quarter",
Collaborator

for consistency across scripts, just add the range of quarters here too (1-4)
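
Beyond documenting the range in the help text, the value could also be checked at runtime. A minimal sketch, assuming the quarter arrives as an integer; `validate_quarter` is a hypothetical helper and is not part of this PR.

```python
VALID_QUARTERS = range(1, 5)  # quarters 1 to 4 inclusive


def validate_quarter(quarter: int) -> int:
    """Return the quarter unchanged if it is 1-4, otherwise raise ValueError."""
    if quarter not in VALID_QUARTERS:
        raise ValueError(f"quarter must be between 1 and 4, got {quarter!r}")
    return quarter
```

Such a check could run in the first step of the flow so that a mistyped quarter fails immediately rather than producing an unexpected output path.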

from asf_heat_pump_suitability.pipeline.reweight_epc import prepare_sample

logging.info("Cleaning property type data in EPC")
# Below we process the EPC `PROPERTY_TYPE` column and rename, then drop the original column
Collaborator

brief description of what cleaning is done would be nice as that's where my mind went straight to

)

# TODO this is far too slow and is only used to correct some EPC records with incorrect postcodes which get joined to the wrong LSOA/MSOA
# TODO can we filter the dataset somehow and only do the join with incorrect postcodes?
Collaborator

Seems like it would be a good solution, guess you would be able to update the main dataframe fairly easily. Is the issue around how we would go about filtering the dataset?

self.epc_df = self.epc_df.with_columns(
pl.when(
(pl.col("lad_conservation_area_data_available_ew"))
& pl.col("COUNTRY").is_in(["England", "Wales"])
Collaborator

Do we add the Scotland World Heritage Site flag at any point? If not, we should remove it from the docstring at the top of the script

NB: this pipeline takes the preprocessed and deduplicated EPC dataset in parquet file format.
"""

from metaflow import FlowSpec, step, batch, Parameter
Collaborator

batch is unused so we should delete here

"""
from asf_heat_pump_suitability.utils import save_utils

save_as = f"s3://asf-heat-pump-suitability/outputs/{self.year}Q{self.quarter}/features/{self.year}_Q{self.quarter}_EPC_features_test.parquet"
Collaborator

When merging into dev we should get rid of the 'test' at the end of this filename?
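
One option, sketched here as an assumption rather than a proposal from the PR, is to make the suffix explicit so test runs and real runs cannot be confused. The helper name `features_output_path` and its `test` argument are hypothetical; the path pattern is the one in the hunk above.

```python
def features_output_path(year: int, quarter: int, test: bool = False) -> str:
    """Build the S3 path for the EPC features file, with an optional _test suffix."""
    suffix = "_test" if test else ""
    return (
        f"s3://asf-heat-pump-suitability/outputs/{year}Q{quarter}/features/"
        f"{year}_Q{quarter}_EPC_features{suffix}.parquet"
    )
```

With `test=False` this reproduces the path of the original script's output, and with `test=True` it reproduces the path currently hard-coded in this step.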

Development

Successfully merging this pull request may close these issues.

Refactor add features script into metaflow
2 participants