162 metaflow add features #164
base: dev
Conversation
this is due to memory errors. parallelisation can be reimplemented later with batch
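For context, a rough sketch of how batch-based parallelisation could be reintroduced later, using Metaflow's foreach fan-out with the @batch decorator (the step names and the feature list here are illustrative, not the current flow):

```python
from metaflow import FlowSpec, step, batch

class AddFeaturesBatchFlow(FlowSpec):
    @step
    def start(self):
        # Hypothetical list of independent feature-building branches to fan out over
        self.features = ["garden_size", "property_density", "off_gas"]
        self.next(self.add_feature, foreach="features")

    @batch(memory=16000)  # each branch runs on AWS Batch rather than locally
    @step
    def add_feature(self):
        self.feature_name = self.input
        # ...build one feature table for self.feature_name here...
        self.next(self.join_features)

    @step
    def join_features(self, inputs):
        # ...merge the per-branch feature tables back onto the EPC dataframe...
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    AddFeaturesBatchFlow()
```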
required=True,
)

@step
fyi, the overall order of adding the features has changed so please review that the general order makes sense
What was the thinking behind the change in the general order?
).with_columns(
    pl.when(pl.col("UPRN").str.contains(r"[a-zA-Z]"))
    .then(False)
    .otherwise(True)
    .alias("valid_UPRN")
)
L64-69 are new
Makes sense to add
@step
def clean_property_type(self):
    """
Unchanged from original code except to refactor to metaflow syntax
@step
def add_output_areas(self):
    """
Unchanged from original code except to refactor to metaflow syntax
@step
def add_lat_lon_data(self):
    """
Unchanged from original code except to refactor to metaflow syntax
pl.when(
    (pl.col("lad_conservation_area_data_available_ew"))
    & pl.col("COUNTRY").is_in(["England", "Wales"])
    & pl.col("valid_UPRN")
L164 is new; otherwise this step is unchanged from original code except to refactor to metaflow syntax
@step
def add_average_garden_size_per_msoa(self):
    """
Unchanged from original code except to refactor to metaflow syntax
@step
def add_property_density(self):
    """
Unchanged from original code except to refactor to metaflow syntax
@step
def add_off_gas_flag(self):
    """
Unchanged from original code except to refactor to metaflow syntax
).with_columns(
    pl.when(pl.col("valid_UPRN"))
    .then(pl.col("listed_building").fill_null(False))
    .otherwise(pl.col("listed_building"))
    .alias("listed_building")
)
L239-244 are new and need review
So we join on the UPRN, and therefore we can't be sure about invalid UPRNs and their listed building status even if they are false?
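To make the concern concrete, a toy illustration of the logic in question (the data is made up):

```python
import polars as pl

df = pl.DataFrame(
    {
        "UPRN": ["100012345", "ABC123", "100067890"],
        "valid_UPRN": [True, False, True],
        # Nulls here are what the left join on UPRN produces when no listed-building match is found
        "listed_building": [None, None, True],
    }
)

df = df.with_columns(
    pl.when(pl.col("valid_UPRN"))
    .then(pl.col("listed_building").fill_null(False))  # valid UPRN with no match -> treat as not listed
    .otherwise(pl.col("listed_building"))              # invalid UPRN -> leave null, status unknown
    .alias("listed_building")
)
```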
@step
def add_grid_capacity(self):
    """
Unchanged from original code except to refactor to metaflow syntax
@step
def add_anchor_property_flag(self):
    """
Unchanged from original code except to refactor to metaflow syntax
@step
def save_output(self):
    """
minor changes here
from metaflow import FlowSpec, step, batch, Parameter

class AddFeaturesFlow(FlowSpec):
L22-42 refactored from argparse into metaflow params
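For anyone less familiar with Metaflow, the pattern being described is roughly the following (the year parameter and step bodies here are illustrative placeholders):

```python
from metaflow import FlowSpec, Parameter, step

class ExampleParamsFlow(FlowSpec):
    # Previously these would have been argparse arguments, e.g. parser.add_argument("--quarter", ...)
    year = Parameter(name="year", help="EPC data year", required=True)
    quarter = Parameter(name="quarter", help="EPC data quarter", required=True)

    @step
    def start(self):
        print(f"Running for {self.year} Q{self.quarter}")
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    ExampleParamsFlow()
```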
Hi Roisín!
Thanks for this, good work as per usual. I was able to run everything and generate the output!
Have one 'dumb' question: if we aren't parallelising the scripts or sending them off to the batch machines, what are the advantages of using metaflow here over our regular old pipeline?
In the docstring at the top of the script, it says we are adding the Scotland Heritage sites, but as far as I can see, I don't think that's occurring in any of the steps, at least not explicitly?
I think we should add a separate issue for this pipeline around the data quality checks using pandera decorators and the relevant schema. We might have to do some thinking around how to incorporate it (I suspect it might be quite a bit of copy-paste), but I think this would be very effective here.
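A very rough sketch of the kind of pandera check being suggested (pandas-style pandera API shown for illustration; the pipeline itself uses polars, so it would need pandera's polars support or a conversion, and the column names and checks here are assumptions):

```python
import pandas as pd
import pandera as pa

# Hypothetical schema covering a couple of the feature columns
epc_features_schema = pa.DataFrameSchema(
    {
        "UPRN": pa.Column(str, nullable=False),
        "valid_UPRN": pa.Column(bool),
        "listed_building": pa.Column(bool, nullable=True),
    }
)

@pa.check_output(epc_features_schema)
def add_listed_building_flag(df: pd.DataFrame) -> pd.DataFrame:
    # ...feature-building logic would go here; the decorator validates the returned frame...
    return df
```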
Minor comment/suggestion: should we create a .env_template to define environment variables such as 'METAFLOW_USER', so others who use the code are aware of the environment variables they have to define before running these scripts?
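For example, a minimal .env_template might look like this (anything beyond METAFLOW_USER is an assumption about what the scripts might need):

```
# Copy to .env and fill in before running the metaflow scripts
METAFLOW_USER=<your_name>
# Assumed: only needed if S3 access goes through a named AWS profile
AWS_PROFILE=<your_aws_profile>
```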
Final question is around how you are planning on orchestrating the flows overall (not just this metaflow script, but all of them)? This is more out of curiosity than anything!
quarter = Parameter(
    name="quarter",
    help="EPC data quarter",
for consistency across scripts, just add the range of quarters here too (1-4)
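i.e. the suggestion is just to surface the valid range in the help text, something like (this fragment would sit inside the flow class as in the diff above):

```python
from metaflow import Parameter

quarter = Parameter(
    name="quarter",
    help="EPC data quarter (1-4)",  # range added for consistency with other scripts
    required=True,
)
```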
from asf_heat_pump_suitability.pipeline.reweight_epc import prepare_sample

logging.info("Cleaning property type data in EPC")
# Below we process the EPC `PROPERTY_TYPE` column and rename, then drop the original column
A brief description of what cleaning is done would be nice, as that's where my mind went straight to.
)

# TODO this is far too slow and is only used to correct some EPC records with incorrect postcodes which get joined to the wrong LSOA/MSOA
# TODO can we filter the dataset somehow and only do the join with incorrect postcodes?
Seems like it would be a good solution, guess you would be able to update the main dataframe fairly easily. Is the issue around how we would go about filtering the dataset?
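A rough sketch of the kind of approach being suggested (the flag column, lookup table and geography column names are assumptions, not the actual schema):

```python
import polars as pl

def correct_bad_postcode_joins(epc_df: pl.DataFrame, uprn_lookup: pl.DataFrame) -> pl.DataFrame:
    # Split out only the rows whose postcode-derived LSOA/MSOA looks wrong (hypothetical flag),
    # so the expensive join runs over a small subset rather than the whole dataset.
    suspect = epc_df.filter(pl.col("postcode_join_suspect"))
    ok = epc_df.filter(~pl.col("postcode_join_suspect"))

    # Re-derive the geographies for the suspect rows only, e.g. via a UPRN-based lookup
    corrected = suspect.drop(["lsoa_code", "msoa_code"]).join(uprn_lookup, on="UPRN", how="left")

    # Stitch the corrected subset back onto the untouched rows
    return pl.concat([ok, corrected.select(ok.columns)], how="vertical")
```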
self.epc_df = self.epc_df.with_columns(
    pl.when(
        (pl.col("lad_conservation_area_data_available_ew"))
        & pl.col("COUNTRY").is_in(["England", "Wales"])
Do we add the Scotland World Heritage Site flag at any point? If not, we should remove it from the docstring at the top of the doc.
NB: this pipeline takes the preprocessed and deduplicated EPC dataset in parquet file format.
"""

from metaflow import FlowSpec, step, batch, Parameter
batch is unused so we should delete here
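i.e. the import would just become:

```python
from metaflow import FlowSpec, step, Parameter
```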
""" | ||
from asf_heat_pump_suitability.utils import save_utils | ||
|
||
save_as = f"s3://asf-heat-pump-suitability/outputs/{self.year}Q{self.quarter}/features/{self.year}_Q{self.quarter}_EPC_features_test.parquet" |
When merging into dev we should get rid of the 'test' at the end of this filename?
Fixes #162
Description
Refactor script to add new features to EPC data into metaflow flow.
New files:
pipeline/flows/run_add_features_flow.py - new metaflow script to add features to EPC. Refactored from pipeline/run_scripts/run_add_features.py
NB: I didn't add any parallelisation into this flow. I initially tested out parallelising the steps to add each feature to EPC so they all get added concurrently. However, this resulted in local memory errors and crashing of the pipeline and my computer. I didn't think the time gains would be significant enough to warrant parallelisation here because of the setup time for each batch step. Therefore, I didn't pursue parallelisation and instead kept it as a linear flow.
Note, I checked that the outputs are the same for this script vs the run_add_features.py script (for those columns where the underlying code is unchanged). Here's the code snippet I used. The assertion statement passed:
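(The snippet itself isn't reproduced above; a minimal sketch of that kind of check, with the parquet paths as placeholders, might look like the following.)

```python
import polars as pl

# Placeholder paths: point these at the outputs of the two runs being compared
old_df = pl.read_parquet("s3://asf-heat-pump-suitability/outputs/<year>Q<quarter>/old_output.parquet")
new_df = pl.read_parquet("s3://asf-heat-pump-suitability/outputs/<year>Q<quarter>/features/<year>_Q<quarter>_EPC_features_test.parquet")

# Compare only the columns present in both outputs, sorted so row order doesn't matter
shared_cols = [c for c in old_df.columns if c in new_df.columns]
assert old_df.select(shared_cols).sort("UPRN").equals(new_df.select(shared_cols).sort("UPRN"))
```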
Modified files:
pipeline/prepare_features/listed_buildings.py - update one of the functions to drop EPC rows which have invalid geometry before conducting sjoin with listed buildings geometry data. This is because the invalid geometries were causing shapely GEOSError exceptions when attempting the spatial join between EPC and listed buildings data. I'm not sure why this wasn't happening before when we ran this script. I tested the run_add_features.py script before updating the join function and it was throwing the same errors, so not sure what changed since we last ran it to make the pipeline error out now... perhaps something in geopandas was updated.
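For illustration, a minimal sketch of the kind of pre-filter described (the function name and join arguments are assumptions, not the actual change in listed_buildings.py):

```python
import geopandas as gpd

def sjoin_listed_buildings(epc_gdf: gpd.GeoDataFrame, listed_gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    # Drop EPC rows with missing or invalid geometry before the spatial join,
    # since invalid geometries can trigger the GEOSError exceptions mentioned above
    epc_valid = epc_gdf[epc_gdf.geometry.notna() & epc_gdf.geometry.is_valid]
    return gpd.sjoin(epc_valid, listed_gdf, how="left", predicate="intersects")
```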
Instructions for Reviewer
In order to test the code in this PR you need to ...
run the following lines:
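(The exact commands aren't shown above; presumably it's something along the lines of `python pipeline/flows/run_add_features_flow.py run --year 2023 --quarter 4`, with METAFLOW_USER set in your environment. Treat the parameter values here as placeholders for the quarter you want to test.)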
This will run the full script and save an output to S3. It took ~2hrs for me at home - in the office it should be much faster. Please let me know the time taken if you do run in office! If it's taking too long, feel free to kill the run. As it's not a complex flow, I think that as long as it starts up successfully then it's probably ok and I have run it all the way through.
Please pay special attention to ...
Checklist:
- I have refactored my code out of notebooks/
- I have run pre-commit and addressed any issues not automatically fixed
- I have merged the latest changes from dev
- I have updated the relevant READMEs