-
Notifications
You must be signed in to change notification settings - Fork 1
152 parallelise garden script #161
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
crispy-wonton
wants to merge
18
commits into
dev
Choose a base branch
from
152_parallelise_garden_script
base: dev
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 17 commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
07df38e
add draft garden size flow script `run_calculate_garden_size_flow.py`
crispy-wonton 2f8c2ac
fix bugs in `run_calculate_garden_size_flow.py` and add script docume…
crispy-wonton 519b615
set default `nations` parameter value in `run_calculate_garden_size_f…
crispy-wonton 3840997
add `batch` decorators to `run_calculate_garden_size_flow.py`
crispy-wonton d52d126
add `batch` resources to `run_calculate_garden_size_flow.py`
crispy-wonton 94267ec
add MANIFEST.in file and modify setup files to allow pip install of repo
crispy-wonton 857def4
add metaflow to requirements.txt
crispy-wonton 3db749b
add parallel_utils.py with function to chunk df
crispy-wonton 85f3550
update CalculateGardenSize flow to improve chunking
crispy-wonton 1dd4871
add end step to CalculateGardenSize flow
crispy-wonton 56bd60c
set up CalculateGardenSize script to use sample dataset for testing
crispy-wonton ad6d805
fix bug in CalculateGardenSize flow
crispy-wonton d0ae871
correct branch name in CalculateGardenSize flow
crispy-wonton 9ef1d93
cast NATIONALCADASTRALREFERENCE as string in CalculateGardenSize flow
crispy-wonton 45b8a37
scale CalculateGardenSizeFlow to full dataset
crispy-wonton 3849468
add documentation to CalculateGardenSize flow and set to run on sample
crispy-wonton a430583
comment out line in CalculateGardenSizeFlow for running with sample
crispy-wonton 746f1f9
remove max-num-splits param from run instructions in `run_calculate_g…
crispy-wonton File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
include asf_heat_pump_suitability/config/*.yaml |
Empty file.
233 changes: 233 additions & 0 deletions
233
asf_heat_pump_suitability/pipeline/flows/run_calculate_garden_size_flow.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,233 @@ | ||||||
""" | ||||||
Flow to calculate garden area (m2) where possible for properties in the domestic EPC register using Land Registry data and | ||||||
Microsoft Building Footprints data. | ||||||
|
||||||
To run: | ||||||
python asf_heat_pump_suitability/pipeline/run_scripts/run_calculate_garden_size_flow.py run --epc [path/to/EPC/data] --year [YYYY] --quarter [Q] --nations ews --max-num-splits 400 | ||||||
crispy-wonton marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
[Set --nations flag to "ew" or "s" for generating garden size estimates for either England and Wales or Scotland INSPIRE | ||||||
files only.] | ||||||
|
||||||
NB: this flow takes the preprocessed and deduplicated EPC dataset in parquet file format. | ||||||
""" | ||||||
|
||||||
from metaflow import FlowSpec, step, Parameter, batch | ||||||
|
||||||
|
||||||
class CalculateGardenSizeFlow(FlowSpec): | ||||||
|
||||||
# Parameters | ||||||
epc = Parameter( | ||||||
name="epc", | ||||||
help="Path to processed and deduplicated EPC dataset in parquet file format", | ||||||
type=str, | ||||||
required=True, | ||||||
) | ||||||
|
||||||
year = Parameter( | ||||||
name="year", | ||||||
help="EPC data year. Format YYYY", | ||||||
type=int, | ||||||
required=True, | ||||||
) | ||||||
|
||||||
quarter = Parameter( | ||||||
name="quarter", | ||||||
help="EPC data quarter", | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
type=int, | ||||||
required=True, | ||||||
) | ||||||
|
||||||
nations = Parameter( | ||||||
name="nations", | ||||||
help="Nations to get INSPIRE land registry file bounds for. Select from England and Wales (ew); Scotland (s); or all (ews).", | ||||||
type=str, | ||||||
required=True, | ||||||
default="ews", | ||||||
) | ||||||
|
||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Likely want to run a debug flag here where we are able to run just through the sample of 3 building footprint + land registry file pairs |
||||||
@step | ||||||
def start(self): | ||||||
""" | ||||||
Load datasets and start flow. | ||||||
""" | ||||||
import logging | ||||||
import polars as pl | ||||||
import geopandas as gpd | ||||||
from asf_heat_pump_suitability.utils import parallel_utils | ||||||
from asf_heat_pump_suitability.pipeline.prepare_features import ( | ||||||
building_footprint, | ||||||
garden_size, | ||||||
lat_lon, | ||||||
) | ||||||
|
||||||
crispy-wonton marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
logging.info("Load EPC UPRNs") | ||||||
epc_df = pl.read_parquet(self.epc, columns=["UPRN"]) | ||||||
|
||||||
logging.info("Adding lat/lon data to EPC") | ||||||
uprn_coords_df = lat_lon.transform_df_osopen_uprn_latlon() | ||||||
epc_df = epc_df.join(uprn_coords_df, how="left", on="UPRN") | ||||||
self.epc_gdf = lat_lon.generate_gdf_uprn_coords(epc_df, usecols=["UPRN"])[ | ||||||
["UPRN", "geometry"] | ||||||
] | ||||||
|
||||||
logging.info("Loading land registry file boundaries") | ||||||
land_file_bounds = gpd.read_file( | ||||||
f"s3://asf-heat-pump-suitability/outputs/{self.year}Q{self.quarter}/gardens/inspire_file_bounds_{self.nations.upper()}.geojson" | ||||||
) | ||||||
microsoft_file_bounds = building_footprint.transform_df_uk_dataset_links() | ||||||
|
||||||
# Match land extent files with overlapping building footprint files | ||||||
file_matches = garden_size.match_series_files_land_building( | ||||||
land_files_gdf=land_file_bounds, building_files_gdf=microsoft_file_bounds | ||||||
) | ||||||
# self.chunked_file_matches = parallel_utils.chunk_df(file_matches, size=30) | ||||||
|
||||||
# # TODO remove before merge | ||||||
self.chunked_file_matches = parallel_utils.chunk_df( | ||||||
file_matches.sample(3), size=1 | ||||||
) | ||||||
crispy-wonton marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
logging.info( | ||||||
f"Estimating garden size for properties across {len(file_matches)} pairs of land extent and building footprint files." | ||||||
) | ||||||
|
||||||
self.next(self.estimate_garden_size, foreach="chunked_file_matches") | ||||||
|
||||||
@batch(cpu=2, memory=16000) | ||||||
@step | ||||||
crispy-wonton marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
def estimate_garden_size(self): | ||||||
""" | ||||||
Estimate garden size per property using land registry polygons and building footprints. | ||||||
""" | ||||||
import os | ||||||
|
||||||
# TODO update to dev before merge | ||||||
os.system( | ||||||
"pip install git+https://github.com/nestauk/asf_heat_pump_suitability.git@152_parallelise_garden_script" | ||||||
) | ||||||
|
||||||
import shapely | ||||||
import geopandas as gpd | ||||||
import polars as pl | ||||||
from asf_heat_pump_suitability.pipeline.prepare_features import ( | ||||||
building_footprint, | ||||||
garden_size, | ||||||
land_extent, | ||||||
) | ||||||
|
||||||
prev = None | ||||||
self.epc_gardens = [] | ||||||
|
||||||
for land_file, building_file in self.input.items(): | ||||||
if land_file != prev: | ||||||
# Prepare land parcel data | ||||||
land_parcels_gdf = land_extent.transform_gdf_land_parcels( | ||||||
f"s3://{land_file}" | ||||||
) | ||||||
|
||||||
# Prepare building footprints data | ||||||
crispy-wonton marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
try: | ||||||
building_footprints_gdf = ( | ||||||
building_footprint.transform_gdf_building_footprints(building_file) | ||||||
) | ||||||
except shapely.errors.GEOSException as e: | ||||||
print( | ||||||
f"Error loading building footprint file {building_file}. Error message: {e}.\n" | ||||||
f"Skipping this land extent & building footprint pairing." | ||||||
) | ||||||
continue | ||||||
else: | ||||||
building_footprints_gdf["microsoft_building_footprint_file"] = ( | ||||||
building_file | ||||||
) | ||||||
|
||||||
# Get intersection of building footprint polygons and land polygons | ||||||
crispy-wonton marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
intersection_gdf = garden_size.generate_gdf_land_building_overlay( | ||||||
land_parcels_gdf=land_parcels_gdf, | ||||||
building_footprints_gdf=building_footprints_gdf, | ||||||
) | ||||||
|
||||||
# Get garden size | ||||||
gardens_gdf = garden_size.generate_gdf_garden_size( | ||||||
intersection_gdf, land_parcels_gdf | ||||||
) | ||||||
gardens_gdf = gardens_gdf.assign( | ||||||
inspire_land_extent_file=land_file, | ||||||
microsoft_building_footprint_file=building_file, | ||||||
) | ||||||
|
||||||
# Match EPC UPRNs with land parcels and gardens using UPRN coordinates | ||||||
# This will keep only EPC records for which garden size can be estimated | ||||||
epc_df = gpd.sjoin( | ||||||
self.epc_gdf, | ||||||
gardens_gdf, | ||||||
how="inner", | ||||||
predicate="intersects", | ||||||
).drop(columns=["geometry", "index_right"]) | ||||||
|
||||||
epc_df = pl.from_pandas(epc_df) | ||||||
self.epc_gardens.append( | ||||||
epc_df.with_columns( | ||||||
pl.col("NATIONALCADASTRALREFERENCE").cast(pl.String) | ||||||
) | ||||||
) | ||||||
|
||||||
self.next(self.concatenate_garden_size_dfs) | ||||||
|
||||||
@step | ||||||
def concatenate_garden_size_dfs(self, inputs): | ||||||
crispy-wonton marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
""" | ||||||
Concatenate estimated garden size data into one dataframe. | ||||||
""" | ||||||
import itertools | ||||||
import polars as pl | ||||||
import logging | ||||||
|
||||||
self.epc_gardens_df = pl.concat( | ||||||
list(itertools.chain.from_iterable([input.epc_gardens for input in inputs])) | ||||||
) | ||||||
logging.info( | ||||||
f"Garden size calculated for {len(self.epc_gardens_df)} EPC properties in total." | ||||||
) | ||||||
self.next(self.save_outputs) | ||||||
|
||||||
# @batch(cpu=2, memory=16000) | ||||||
@step | ||||||
def save_outputs(self): | ||||||
""" | ||||||
crispy-wonton marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
Save outputs to S3. | ||||||
""" | ||||||
import polars as pl | ||||||
from asf_heat_pump_suitability.utils import save_utils | ||||||
from asf_heat_pump_suitability.pipeline.prepare_features import garden_size | ||||||
|
||||||
# save_as = f"s3://asf-heat-pump-suitability/outputs/{self.year}Q{self.quarter}/gardens/{self.year}_Q{self.quarter}_EPC_garden_size_estimates_{self.nations.upper()}.parquet" | ||||||
save_as = f"s3://asf-heat-pump-suitability/outputs/{self.year}Q{self.quarter}/gardens/{self.year}_Q{self.quarter}_EPC_garden_size_estimates_{self.nations.upper()}_SAMPLE.parquet" | ||||||
save_utils.save_to_s3(self.epc_gardens_df, save_as) | ||||||
|
||||||
self.epc_gardens_df = self.epc_gardens_df.with_columns( | ||||||
pl.col(pl.Float64).round(2) | ||||||
) | ||||||
self.epc_gardens_df = garden_size.deduplicate_df_garden_size( | ||||||
self.epc_gardens_df | ||||||
) | ||||||
|
||||||
# save_as = f"s3://asf-heat-pump-suitability/outputs/{self.year}Q{self.quarter}/gardens/{self.year}_Q{self.quarter}_EPC_garden_size_estimates_{self.nations.upper()}_deduplicated.parquet" | ||||||
save_as = f"s3://asf-heat-pump-suitability/outputs/{self.year}Q{self.quarter}/gardens/{self.year}_Q{self.quarter}_EPC_garden_size_estimates_{self.nations.upper()}_deduplicated_SAMPLE.parquet" | ||||||
save_utils.save_to_s3(self.epc_gardens_df, save_as) | ||||||
|
||||||
self.next(self.end) | ||||||
|
||||||
@step | ||||||
def end(self): | ||||||
""" | ||||||
Finish flow. | ||||||
""" | ||||||
import logging | ||||||
|
||||||
logging.info("Calculate garden size flow complete!") | ||||||
|
||||||
|
||||||
if __name__ == "__main__": | ||||||
CalculateGardenSizeFlow() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
from typing import List | ||
import pandas as pd | ||
|
||
|
||
crispy-wonton marked this conversation as resolved.
Show resolved
Hide resolved
|
||
def chunk_df(df: pd.DataFrame, size: int) -> List[pd.DataFrame]: | ||
""" | ||
Split dataframe into chunks of specified size. | ||
|
||
Args: | ||
df (pl.DataFrame): dataframe | ||
size (int): number of records per chunk | ||
|
||
Returns: | ||
List[pl.DataFrame]: list of dataframe chunks | ||
""" | ||
return [df.iloc[i : i + size] for i in range(0, len(df), size)] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,3 +23,4 @@ tenacity | |
numpy<2.0.0 | ||
fiona | ||
bs4 | ||
metaflow |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.