Skip to content
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
07df38e
add draft garden size flow script `run_calculate_garden_size_flow.py`
crispy-wonton Apr 14, 2025
2f8c2ac
fix bugs in `run_calculate_garden_size_flow.py` and add script docume…
crispy-wonton Apr 14, 2025
519b615
set default `nations` parameter value in `run_calculate_garden_size_f…
crispy-wonton Apr 15, 2025
3840997
add `batch` decorators to `run_calculate_garden_size_flow.py`
crispy-wonton Apr 15, 2025
d52d126
add `batch` resources to `run_calculate_garden_size_flow.py`
crispy-wonton Apr 15, 2025
94267ec
add MANIFEST.in file and modify setup files to allow pip install of repo
crispy-wonton Jun 3, 2025
857def4
add metaflow to requirements.txt
crispy-wonton Jun 3, 2025
3db749b
add parallel_utils.py with function to chunk df
crispy-wonton Jun 3, 2025
85f3550
update CalculateGardenSize flow to improve chunking
crispy-wonton Jun 3, 2025
1dd4871
add end step to CalculateGardenSize flow
crispy-wonton Jun 3, 2025
56bd60c
set up CalculateGardenSize script to use sample dataset for testing
crispy-wonton Jun 3, 2025
ad6d805
fix bug in CalculateGardenSize flow
crispy-wonton Jun 3, 2025
d0ae871
correct branch name in CalculateGardenSize flow
crispy-wonton Jun 3, 2025
9ef1d93
cast NATIONALCADASTRALREFERENCE as string in CalculateGardenSize flow
crispy-wonton Jun 4, 2025
45b8a37
scale CalculateGardenSizeFlow to full dataset
crispy-wonton Jun 11, 2025
3849468
add documentation to CalculateGardenSize flow and set to run on sample
crispy-wonton Jun 11, 2025
a430583
comment out line in CalculateGardenSizeFlow for running with sample
crispy-wonton Jun 11, 2025
746f1f9
remove max-num-splits param from run instructions in `run_calculate_g…
crispy-wonton Jun 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include asf_heat_pump_suitability/config/*.yaml
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
"""
Flow to calculate garden area (m2) where possible for properties in the domestic EPC register using Land Registry data and
Microsoft Building Footprints data.

To run:
python asf_heat_pump_suitability/pipeline/run_scripts/run_calculate_garden_size_flow.py run --epc [path/to/EPC/data] --year [YYYY] --quarter [Q] --nations ews --max-num-splits 400

[Set --nations flag to "ew" or "s" for generating garden size estimates for either England and Wales or Scotland INSPIRE
files only.]

NB: this flow takes the preprocessed and deduplicated EPC dataset in parquet file format.
"""

from metaflow import FlowSpec, step, Parameter, batch


class CalculateGardenSizeFlow(FlowSpec):

# Flow parameters: each one is declared with Metaflow's `Parameter` and read as `self.<name>` inside the steps.

# Location of the EPC parquet file; `start` reads its "UPRN" column.
epc = Parameter(
    name="epc",
    type=str,
    required=True,
    help="Path to processed and deduplicated EPC dataset in parquet file format",
)

# EPC release year; interpolated into the S3 input and output paths.
year = Parameter(
    name="year",
    type=int,
    required=True,
    help="EPC data year. Format YYYY",
)

quarter = Parameter(
name="quarter",
help="EPC data quarter",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
help="EPC data quarter",
help="EPC data quarter, 1-4",

type=int,
required=True,
)

# `--nations` is optional: it is declared with `required=False` so that the "ews" default below is
# unambiguously applied when the flag is omitted (declaring a parameter as both required and defaulted
# is contradictory). The value is upper-cased and interpolated into the S3 input and output file names.
nations = Parameter(
    name="nations",
    help="Nations to get INSPIRE land registry file bounds for. Select from England and Wales (ew); Scotland (s); or all (ews).",
    type=str,
    required=False,
    default="ews",
)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likely want to run a debug flag here where we are able to run just through the sample of 3 building footprint + land registry file pairs

@step
def start(self):
    """
    Load datasets and start flow.

    Reads the "UPRN" column of the EPC parquet file at `self.epc`, left-joins the UPRN coordinates returned by
    `lat_lon.transform_df_osopen_uprn_latlon()` and stores the resulting "UPRN" and "geometry" columns as
    `self.epc_gdf`. Loads the INSPIRE land registry file bounds for `self.nations` from S3 and the Microsoft
    building footprint dataset links, matches the two sets of files, and fans out to `estimate_garden_size`
    over `self.chunked_file_matches`.
    """
    import logging
    import polars as pl
    import geopandas as gpd
    from asf_heat_pump_suitability.utils import parallel_utils
    from asf_heat_pump_suitability.pipeline.prepare_features import (
        building_footprint,
        garden_size,
        lat_lon,
    )

    logging.info("Load EPC UPRNs")
    epc_df = pl.read_parquet(self.epc, columns=["UPRN"])

    logging.info("Adding lat/lon data to EPC")
    uprn_coords_df = lat_lon.transform_df_osopen_uprn_latlon()
    epc_df = epc_df.join(uprn_coords_df, how="left", on="UPRN")
    self.epc_gdf = lat_lon.generate_gdf_uprn_coords(epc_df, usecols=["UPRN"])[
        ["UPRN", "geometry"]
    ]

    logging.info("Loading land registry file boundaries")
    land_file_bounds = gpd.read_file(
        f"s3://asf-heat-pump-suitability/outputs/{self.year}Q{self.quarter}/gardens/inspire_file_bounds_{self.nations.upper()}.geojson"
    )
    microsoft_file_bounds = building_footprint.transform_df_uk_dataset_links()

    # Match land extent files with overlapping building footprint files
    file_matches = garden_size.match_series_files_land_building(
        land_files_gdf=land_file_bounds, building_files_gdf=microsoft_file_bounds
    )
    # self.chunked_file_matches = parallel_utils.chunk_df(file_matches, size=30)

    # # TODO remove before merge
    # Cap the sample at the number of available pairs: `.sample(3)` raises ValueError when fewer
    # than 3 pairs were matched.
    sample_size = min(3, len(file_matches))
    self.chunked_file_matches = parallel_utils.chunk_df(
        file_matches.sample(sample_size), size=1
    )

    # Report the number of pairs actually dispatched, which differs from `len(file_matches)` while sampling.
    n_pairs = sum(len(chunk) for chunk in self.chunked_file_matches)
    logging.info(
        f"Estimating garden size for properties across {n_pairs} pairs of land extent and building footprint files."
    )

    self.next(self.estimate_garden_size, foreach="chunked_file_matches")

@batch(cpu=2, memory=16000)
@step
def estimate_garden_size(self):
    """
    Estimate garden size per property using land registry polygons and building footprints.

    Iterates over the (land extent file, building footprint file) pairs in this task's chunk (`self.input`).
    For each pair, overlays building footprints on land parcels, derives garden sizes and spatially joins
    them to the EPC UPRN points in `self.epc_gdf`. One polars dataframe per successfully processed pair is
    appended to `self.epc_gardens`. Pairs whose building footprint file raises a `GEOSException` on load
    are reported and skipped.

    Raises:
        subprocess.CalledProcessError: if installing the project package fails.
    """
    import subprocess
    import sys

    # TODO update to dev before merge
    # NOTE(review): the host in this URL is not github.com - confirm it is the intended install source.
    # Install with the interpreter running this step, and fail loudly (`check=True`) instead of
    # carrying on with a missing or stale package when pip exits with an error.
    subprocess.run(
        [
            sys.executable,
            "-m",
            "pip",
            "install",
            "git+https://github.com/nestauk/asf_heat_pump_suitability.git@152_parallelise_garden_script",
        ],
        check=True,
    )

    import shapely
    import geopandas as gpd
    import polars as pl
    from asf_heat_pump_suitability.pipeline.prepare_features import (
        building_footprint,
        garden_size,
        land_extent,
    )

    prev = None
    land_parcels_gdf = None
    self.epc_gardens = []

    for land_file, building_file in self.input.items():
        if land_file != prev:
            # Prepare land parcel data
            land_parcels_gdf = land_extent.transform_gdf_land_parcels(
                f"s3://{land_file}"
            )
            # Remember which land file is loaded so consecutive pairs sharing it reuse the parcels
            prev = land_file

        # Prepare building footprints data
        try:
            building_footprints_gdf = (
                building_footprint.transform_gdf_building_footprints(building_file)
            )
        except shapely.errors.GEOSException as e:
            print(
                f"Error loading building footprint file {building_file}. Error message: {e}.\n"
                f"Skipping this land extent & building footprint pairing."
            )
            continue
        else:
            building_footprints_gdf["microsoft_building_footprint_file"] = (
                building_file
            )

        # Get intersection of building footprint polygons and land polygons
        intersection_gdf = garden_size.generate_gdf_land_building_overlay(
            land_parcels_gdf=land_parcels_gdf,
            building_footprints_gdf=building_footprints_gdf,
        )

        # Get garden size
        gardens_gdf = garden_size.generate_gdf_garden_size(
            intersection_gdf, land_parcels_gdf
        )
        gardens_gdf = gardens_gdf.assign(
            inspire_land_extent_file=land_file,
            microsoft_building_footprint_file=building_file,
        )

        # Match EPC UPRNs with land parcels and gardens using UPRN coordinates
        # This will keep only EPC records for which garden size can be estimated
        epc_df = gpd.sjoin(
            self.epc_gdf,
            gardens_gdf,
            how="inner",
            predicate="intersects",
        ).drop(columns=["geometry", "index_right"])

        epc_df = pl.from_pandas(epc_df)
        # Cast to string so the per-pair frames share a dtype for this column when concatenated later
        self.epc_gardens.append(
            epc_df.with_columns(
                pl.col("NATIONALCADASTRALREFERENCE").cast(pl.String)
            )
        )

    self.next(self.concatenate_garden_size_dfs)

@step
def concatenate_garden_size_dfs(self, inputs):
    """
    Join step: merge the garden size dataframes produced by every foreach branch.

    Each branch in `inputs` exposes a list of dataframes as `epc_gardens`; the lists are flattened and
    concatenated into `self.epc_gardens_df`.
    """
    import itertools
    import polars as pl
    import logging

    # One list of dataframes per branch, flattened into a single list for concatenation
    frames_per_branch = [branch.epc_gardens for branch in inputs]
    all_frames = list(itertools.chain.from_iterable(frames_per_branch))
    self.epc_gardens_df = pl.concat(all_frames)

    logging.info(
        f"Garden size calculated for {len(self.epc_gardens_df)} EPC properties in total."
    )
    self.next(self.save_outputs)

# @batch(cpu=2, memory=16000)
@step
def save_outputs(self):
    """
    Save outputs to S3.

    Writes `self.epc_gardens_df` as-is, then rounds its Float64 columns to 2 decimal places, deduplicates it
    with `garden_size.deduplicate_df_garden_size` and writes the deduplicated result to a second file.
    """
    import polars as pl
    from asf_heat_pump_suitability.utils import save_utils
    from asf_heat_pump_suitability.pipeline.prepare_features import garden_size

    # Full-run destination, kept for switching back from the sample run:
    # estimates_path = f"s3://asf-heat-pump-suitability/outputs/{self.year}Q{self.quarter}/gardens/{self.year}_Q{self.quarter}_EPC_garden_size_estimates_{self.nations.upper()}.parquet"
    estimates_path = f"s3://asf-heat-pump-suitability/outputs/{self.year}Q{self.quarter}/gardens/{self.year}_Q{self.quarter}_EPC_garden_size_estimates_{self.nations.upper()}_SAMPLE.parquet"
    save_utils.save_to_s3(self.epc_gardens_df, estimates_path)

    # Round float columns before deduplicating
    round_floats = pl.col(pl.Float64).round(2)
    self.epc_gardens_df = self.epc_gardens_df.with_columns(round_floats)
    self.epc_gardens_df = garden_size.deduplicate_df_garden_size(self.epc_gardens_df)

    # Full-run destination, kept for switching back from the sample run:
    # deduplicated_path = f"s3://asf-heat-pump-suitability/outputs/{self.year}Q{self.quarter}/gardens/{self.year}_Q{self.quarter}_EPC_garden_size_estimates_{self.nations.upper()}_deduplicated.parquet"
    deduplicated_path = f"s3://asf-heat-pump-suitability/outputs/{self.year}Q{self.quarter}/gardens/{self.year}_Q{self.quarter}_EPC_garden_size_estimates_{self.nations.upper()}_deduplicated_SAMPLE.parquet"
    save_utils.save_to_s3(self.epc_gardens_df, deduplicated_path)

    self.next(self.end)

@step
def end(self):
    """
    Final step of the flow: log a completion message.
    """
    import logging

    completion_message = "Calculate garden size flow complete!"
    logging.info(completion_message)


if __name__ == "__main__":
CalculateGardenSizeFlow()
16 changes: 16 additions & 0 deletions asf_heat_pump_suitability/utils/parallel_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from typing import List
import pandas as pd


def chunk_df(df: pd.DataFrame, size: int) -> List[pd.DataFrame]:
    """
    Split dataframe into consecutive chunks of at most `size` records.

    Rows are selected positionally with `.iloc`, so the dataframe's index labels do not affect chunking.

    Args:
        df (pd.DataFrame): dataframe to split
        size (int): maximum number of records per chunk; must be at least 1

    Returns:
        List[pd.DataFrame]: list of dataframe chunks in original row order. The last chunk holds the
            remainder and may be shorter than `size`; an empty dataframe gives an empty list.

    Raises:
        ValueError: if `size` is less than 1
    """
    # A negative step would make `range` empty and silently drop every row, so reject it explicitly
    # alongside zero (which `range` itself rejects).
    if size < 1:
        raise ValueError(f"size must be a positive integer, got {size}")
    return [df.iloc[i : i + size] for i in range(0, len(df), size)]
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ tenacity
numpy<2.0.0
fiona
bs4
metaflow
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,6 @@ per-file-ignores =
# Don't pester about parameters for a one-line docstring
strictness=short
docstring_style=google

[options]
include_package_data = True
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""asf_heat_pump_suitability."""

from pathlib import Path
from setuptools import find_packages
from setuptools import setup
Expand All @@ -18,7 +19,9 @@ def read_lines(path):
long_description=open(BASE_DIR / "README.md").read(),
install_requires=read_lines(BASE_DIR / "requirements.txt"),
extras_require={"dev": read_lines(BASE_DIR / "requirements_dev.txt")},
include_package_data=True,
packages=find_packages(exclude=["docs"]),
package_data={"": ["*.txt", "*.yaml"]},
version="0.1.0",
description="Early-stage scoping of a project to identify which homes/streets are likely to be suitable (or unsuitable) for which types of heat pumps.",
author="Nesta",
Expand Down