Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ output:
- Policy discussion analysis
- Keyword and topic-based filtering

### Overton
- Government publications
- UK and international searches
- Keyword search with LLM validation

## 📊 Outputs

### Generated Files
Expand Down
29 changes: 29 additions & 0 deletions discovery_mission_radar/config/llm_prompts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ missions:
- The description is generic energy/research text without a clear category link
- Content better fits another configured category
Default to 'no' if uncertain or ambiguous.
overton: |
Mark 'yes' only if the policy document clearly relates to the configured category in the correct sense (energy technology/solution) and contains at least one concrete cue such as:
- A policy, regulation, programme, target, deployment, funding, evidence, performance or implementation point linked to the category
- A technology, intervention, component, system, cost/benefit or pathway explicitly tied to the category

Consider the source/series/year metadata to disambiguate generic mentions:
- Source type should typically be government or IGO; venue should be relevant public bodies
- Series like Transcript/Press Release are not relevant
- Use snippet/highlights as match context; prefer substantive documents

Mark 'no' if any of the following:
- The term appears only in passing, list-like, or generic policy text without a clear link to the category
- The usage is a different meaning/sense than the configured category
- Content better fits another configured category
Default to 'no' if uncertain or ambiguous.
default: |
Mark 'yes' only if the text explicitly mentions the configured category in the correct sense AND includes at least one
nearby contextual cue evidencing a real connection (example, measure, implementation, funding, regulation, target, performance).
Expand Down Expand Up @@ -79,6 +94,20 @@ missions:
- The description is generic health/food/research text without a clear category link
- Content better fits another configured category
Default to 'no' if uncertain or ambiguous.
overton: |
Mark 'yes' only if the policy document clearly relates to the configured category in the correct sense (as defined by the topic scope) and includes at least one concrete cue such as:
- An intervention, service, technology, programme, regulation, funding, outcome/measurement, implementation or performance point explicitly tied to the category

Consider the source/series/year metadata to disambiguate generic mentions:
- Source type should typically be government or IGO; venue should be relevant public bodies
- Series like Transcript/Press Release are not relevant
- Use snippet/highlights as match context; prefer substantive documents

Mark 'no' if any of the following:
- The term appears only in passing, list-like, or generic policy/health text without a clear link to the category
- The usage is a different meaning/sense than the configured category
- Content better fits another configured category
Default to 'no' if uncertain or ambiguous.
default: |
Mark 'yes' only if the text explicitly mentions the configured category in the correct sense AND includes at least one
nearby contextual cue evidencing a real connection (example, measure, implementation, funding, regulation, target, performance).
Expand Down
28 changes: 28 additions & 0 deletions discovery_mission_radar/config/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,33 @@ data_sources:
"weight"
]

overton:
enabled: true
    # window_months is the only configurable knob; all other Overton behaviour is fixed in code
window_months: 60
# Shared core UK policy sources used across missions
core_sources:
- govuk
- ukparliament_select
- ukparliament
- legislationgovuk
- onsgovuk
- scottishgovernment
- govwales
- northernirelandgovuk
# Mission-specific additions (merged with core_sources in implementation)
mission_sources:
ASF:
- theccc
AHL:
- nice
- nhsengland
- foodgovuk
international:
enabled_per_mission:
AHL: true
ASF: false

# Categories for cross-topic radar charts
categories_to_show:
ASF:
Expand All @@ -149,6 +176,7 @@ categories_to_show:
- "food_retail_analytics"
- "health_food_meal_replacement"
- "advanced_food_production"
- "glp_adjacent"

# Output settings
output:
Expand Down
2 changes: 2 additions & 0 deletions discovery_mission_radar/pipeline/analysis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .crunchbase_analysis import CrunchbaseAnalysisModule
from .gtr_analysis import GtrAnalysisModule
from .hansard_analysis import HansardAnalysisModule
from .overton_analysis import OvertonAnalysisModule

# Import aggregation and consolidation functions
from .aggregation import produce_radar_charts
Expand All @@ -25,6 +26,7 @@
'CrunchbaseAnalysisModule',
'GtrAnalysisModule',
'HansardAnalysisModule',
'OvertonAnalysisModule',

# Aggregation functions
'produce_radar_charts',
Expand Down
55 changes: 54 additions & 1 deletion discovery_mission_radar/pipeline/analysis/consolidation.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ def consolidate_all_topics(topic_results: List[Dict[str, Any]], output_dir: Path
ukri_stats, ukri_stats_quarterly, ukri_projects = _consolidate_gtr_data(topic_results, mission)

hansard_stats, hansard_stats_quarterly, hansard_speeches = _consolidate_hansard_data(topic_results, mission)

overton_stats, overton_stats_quarterly = _consolidate_overton_data(topic_results, mission)

# Define data to save with their corresponding filenames
data_to_save = [
Expand All @@ -47,7 +49,9 @@ def consolidate_all_topics(topic_results: List[Dict[str, Any]], output_dir: Path
(ukri_projects, "ukri_projects.csv"),
(hansard_stats, "hansard_stats.csv"),
(hansard_stats_quarterly, "hansard_stats_quarterly.csv"),
(hansard_speeches, "hansard_speeches.csv")
(hansard_speeches, "hansard_speeches.csv"),
(overton_stats, "overton_stats.csv"),
(overton_stats_quarterly, "overton_stats_quarterly.csv")
]

# Save each non-empty dataframe
Expand All @@ -66,6 +70,7 @@ def consolidate_all_topics(topic_results: List[Dict[str, Any]], output_dir: Path
cb_stats, cb_stats_quarterly, cb_companies, cb_funding, cb_ipos, cb_acquisitions,
ukri_stats, ukri_stats_quarterly, ukri_projects,
hansard_stats, hansard_stats_quarterly, hansard_speeches,
overton_stats, overton_stats_quarterly,
config.google_sheets_id
)
except Exception as e:
Expand Down Expand Up @@ -217,11 +222,57 @@ def _consolidate_hansard_data(topic_results: List[Dict[str, Any]], mission: str)
pd.concat(all_speeches, ignore_index=True) if all_speeches else pd.DataFrame()
)

def _consolidate_overton_data(topic_results: List[Dict[str, Any]], mission: str) -> tuple:
"""Consolidate Overton data (yearly and quarterly counts) from all topics.

Reads ts_yearly.csv and ts_quarterly.csv from per-topic overton directories (UK and International)
and stacks them with topic and pass labels for upload/aggregation.
"""
all_yearly = []
all_quarterly = []
base_outputs = Path("outputs") / mission
for topic_result in topic_results:
topic_name = topic_result.get('topic_name')
if not topic_name:
continue
for label in ["overton_uk", "overton_international"]:
pass_name = "uk" if label.endswith("uk") else "international"
topic_dir = base_outputs / topic_name / label
yearly_fp = topic_dir / "ts_yearly.csv"
quarterly_fp = topic_dir / "ts_quarterly.csv"
try:
if yearly_fp.exists():
dfy = pd.read_csv(yearly_fp)
if not dfy.empty and set(["year", "count"]).issubset(dfy.columns):
dfy["topic"] = topic_name
dfy["pass"] = pass_name
all_yearly.append(dfy)
except Exception as e:
logger.warning(f"Failed to read Overton yearly for {topic_name}/{pass_name}: {e}")
try:
if quarterly_fp.exists():
dfq = pd.read_csv(quarterly_fp)
# accept either (year,quarter,count) or (quarter,count)
if not dfq.empty:
if "quarter" not in dfq.columns and {"year", "quarter"}.issubset(dfq.columns):
dfq["quarter"] = dfq.apply(lambda r: f"{int(r['year'])}-Q{int(r['quarter'])}", axis=1)
dfq = dfq[["quarter", "count"]]
if set(["quarter", "count"]).issubset(dfq.columns):
dfq["topic"] = topic_name
dfq["pass"] = pass_name
all_quarterly.append(dfq[["quarter", "count", "topic", "pass"]])
except Exception as e:
logger.warning(f"Failed to read Overton quarterly for {topic_name}/{pass_name}: {e}")
yearly_out = pd.concat(all_yearly, ignore_index=True) if all_yearly else pd.DataFrame()
quarterly_out = pd.concat(all_quarterly, ignore_index=True) if all_quarterly else pd.DataFrame()
return yearly_out, quarterly_out

def _upload_to_google_sheets(
cb_stats: pd.DataFrame, cb_stats_quarterly: pd.DataFrame, cb_companies: pd.DataFrame,
cb_funding: pd.DataFrame, cb_ipos: pd.DataFrame, cb_acquisitions: pd.DataFrame,
ukri_stats: pd.DataFrame, ukri_stats_quarterly: pd.DataFrame, ukri_projects: pd.DataFrame,
hansard_stats: pd.DataFrame, hansard_stats_quarterly: pd.DataFrame, hansard_speeches: pd.DataFrame,
overton_stats: pd.DataFrame, overton_stats_quarterly: pd.DataFrame,
sheet_id: str
) -> None:
"""Upload consolidated DataFrames to Google Sheets.
Expand Down Expand Up @@ -251,6 +302,8 @@ def _upload_to_google_sheets(
# TODO: bug - sheet upload error - input contains more than the maximum of 50000 characters in a single cell
# upload manually
#'hansard_speeches': hansard_speeches
'overton_stats': overton_stats,
'overton_stats_quarterly': overton_stats_quarterly,
}

# Add non-empty dataframes to the dataframes dict
Expand Down
186 changes: 186 additions & 0 deletions discovery_mission_radar/pipeline/analysis/overton_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
"""Overton Analysis Module.

Provides output writing for Overton results, separating concerns from the
getter/data source. Mirrors the folder structure and artefacts expected
by the pipeline: UK and International outputs side-by-side.
"""

from __future__ import annotations

from pathlib import Path
from typing import Any, Dict, Optional, Tuple

import json
import pandas as pd
from discovery_utils.utils import charts
import altair as alt
from .base import BaseAnalysisModule

try: # Optional typing/import safety
from discovery_utils.getters.overton import OvertonGetter # type: ignore
except Exception: # pragma: no cover
OvertonGetter = object # type: ignore


class OvertonAnalysisModule(BaseAnalysisModule[OvertonGetter]):
    """Overton analysis/output writer, aligned with BaseAnalysisModule.

    Overton fetching, filtering and selection happen in the data source; this
    module focuses on formatting outputs (CSV/JSON) and simple charts. The
    BaseAnalysisModule abstract methods are implemented minimally to satisfy
    the interface, but the pipeline calls `write_outputs` directly.
    """

    # Column order for selected_documents.csv; similarity_score is omitted
    # from the header when writing an empty placeholder file.
    _DOCUMENT_COLUMNS = [
        "id",
        "title",
        "abstract",
        "publication_year",
        "venue",
        "source_country",
        "source_type",
        "overton_policy_document_series",
        "overton_url",
        "similarity_score",
    ]

    def __init__(self, mission: str):
        super().__init__("overton", mission)

    def _create_default_getter(self) -> OvertonGetter:  # type: ignore[override]
        """Instantiate the real OvertonGetter lazily, falling back to the stub.

        The import is deferred so the module stays importable when
        discovery_utils is unavailable (e.g. in tests).
        """
        try:
            from discovery_utils.getters.overton import OvertonGetter as _OG  # type: ignore
            return _OG()
        except Exception:
            return OvertonGetter()  # type: ignore

    def _process_topic_data(self, topic_data: Dict[str, Any], getter: OvertonGetter) -> Dict[str, pd.DataFrame]:  # type: ignore[override]
        """Minimal interface implementation; the pipeline calls write_outputs directly."""
        # Not used in current flow; return empty placeholders.
        return {
            'ts_yearly': pd.DataFrame(),
            'ts_quarterly': pd.DataFrame(),
        }

    def _generate_custom_stats(self, analysis_results: Dict[str, pd.DataFrame], topic_data: Dict[str, Any]) -> Dict[str, Any]:  # type: ignore[override]
        """Minimal interface implementation; no custom stats for Overton."""
        return {}

    def _create_source_charts(self, analysis_results: Dict[str, pd.DataFrame], charts_dir: Path, category_name: str, scale_factor: int):  # type: ignore[override]
        """Minimal interface implementation; charts are written by write_outputs."""
        return []

    def write_outputs(self, topic_cfg: Dict[str, Any], mission: str, uk_df: pd.DataFrame,
                      intl_df: Optional[pd.DataFrame], uk_facets: Dict[str, Any],
                      intl_facets: Dict[str, Any], uk_summary: Dict[str, Any],
                      intl_summary: Dict[str, Any]) -> None:
        """Write UK (always) and International (when non-empty) Overton artefacts.

        Outputs go under outputs/<mission>/<topic_slug>/overton_uk and
        .../overton_international, mirroring the structure the consolidation
        step reads back.
        """
        category_name = (topic_cfg.get("search_recipe", {}) or {}).get("category_name", "topic")
        topic_slug = self._slugify(category_name)
        base = Path("outputs") / mission / topic_slug

        uk_dir = base / "overton_uk"
        uk_dir.mkdir(parents=True, exist_ok=True)
        self._write_outputs_for(uk_dir, uk_df, uk_facets, uk_summary, category_name)

        # International outputs are optional: skip entirely when absent/empty.
        if intl_df is not None and not intl_df.empty:
            intl_dir = base / "overton_international"
            intl_dir.mkdir(parents=True, exist_ok=True)
            self._write_outputs_for(intl_dir, intl_df, intl_facets, intl_summary, category_name)

    def _slugify(self, text: str) -> str:
        """Lower-case *text* and collapse non-alphanumeric runs to underscores.

        Returns "topic" for empty/blank input so a directory name always exists.
        """
        import re
        t = (text or "").strip().lower()
        if not t:
            return "topic"
        slug = re.sub(r"[^a-z0-9]+", "_", t)
        slug = re.sub(r"_+", "_", slug).strip("_")
        return slug or "topic"

    def _write_outputs_for(self, outdir: Path, df: pd.DataFrame, facets: Dict[str, Any], summary: Dict[str, Any], category_name: str) -> None:
        """Write selected documents, facets, summary, time series and charts into *outdir*."""
        outdir.mkdir(parents=True, exist_ok=True)

        # selected_documents.csv — keep the known column order; fall back to an
        # empty placeholder (without similarity_score) when nothing matches.
        cols = self._DOCUMENT_COLUMNS
        if df is not None and not df.empty:
            available_cols = [c for c in cols if c in df.columns]
        else:
            available_cols = []
        if available_cols:
            df[available_cols].to_csv(outdir / "selected_documents.csv", index=False)
        else:
            empty_header = [c for c in cols if c != "similarity_score"]
            pd.DataFrame(columns=empty_header).to_csv(outdir / "selected_documents.csv", index=False)

        # facets.json — serialise first so a non-serialisable payload falls back
        # to {} without leaving a partially written file behind.
        try:
            facets_payload = json.dumps(facets or {}, ensure_ascii=False, indent=2)
        except Exception:
            facets_payload = json.dumps({}, ensure_ascii=False, indent=2)
        (outdir / "facets.json").write_text(facets_payload, encoding="utf-8")

        # summary.json — no fallback: summaries are produced in-pipeline and
        # should always be JSON-serialisable; a failure here is a real bug.
        (outdir / "summary.json").write_text(
            json.dumps(summary or {}, ensure_ascii=False, indent=2), encoding="utf-8"
        )

        # Time series and charts
        yearly_df, quarterly_df = self._write_timeseries(df, outdir)
        self._write_charts(yearly_df, quarterly_df, outdir, category_name)

    def _write_timeseries(self, df: pd.DataFrame, outdir: Path) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Derive yearly/quarterly counts from *df* and persist them as CSVs.

        Yearly counts come from `publication_year`; quarterly counts from the
        first recognised date-like column. Header-only frames are written when
        the input cannot support a series, so downstream readers always find
        the files.

        Returns:
            (yearly_df, quarterly_df) as written to ts_yearly.csv / ts_quarterly.csv.
        """
        yearly = pd.DataFrame(columns=["year", "count"])  # default empty
        quarterly = pd.DataFrame(columns=["quarter", "count"])  # default empty

        # Yearly counts from publication_year (coerced to numeric; NaNs dropped).
        if df is not None and not df.empty and "publication_year" in df.columns:
            yearly = (
                pd.DataFrame({"year": pd.to_numeric(df["publication_year"], errors="coerce")})
                .dropna()
                .astype({"year": "Int64"})
                .groupby("year").size().reset_index(name="count")
                .sort_values("year")
            )
        yearly.to_csv(outdir / "ts_yearly.csv", index=False)

        # Quarterly counts from the first recognised date column, if any.
        date_col = None
        if df is not None and not df.empty:
            for c in ["published_on", "publication_date", "published_date", "added_on", "date"]:
                if c in df.columns:
                    date_col = c
                    break
        if date_col is not None:
            dt = pd.to_datetime(df[date_col], errors="coerce")
            qdf = (
                pd.DataFrame({"year": dt.dt.year, "q": dt.dt.quarter})
                .dropna()
                .astype({"year": "Int64", "q": "Int64"})
            )
            # "YYYY-Qn" labels sort lexicographically in chronological order.
            qdf["quarter"] = qdf.apply(lambda r: f"{int(r['year'])}-Q{int(r['q'])}", axis=1)
            quarterly = (
                qdf.groupby(["quarter"]).size().reset_index(name="count").sort_values(["quarter"])
            )
        quarterly.to_csv(outdir / "ts_quarterly.csv", index=False)
        return yearly, quarterly

    def _write_charts(self, yearly_df: pd.DataFrame, quarterly_df: pd.DataFrame, outdir: Path, category_name: str) -> None:
        """Render bar-chart PNGs for the yearly/quarterly series (best effort).

        Chart failures are swallowed deliberately so the pipeline never dies
        on a rendering problem; the CSVs above remain the source of truth.
        """
        # Match BaseAnalysisModule behaviour: temporarily set default theme
        current_theme = alt.themes.active
        alt.themes.enable('default')
        try:
            scale_factor = 2
            # Yearly docs chart
            if isinstance(yearly_df, pd.DataFrame) and not yearly_df.empty:
                fig = charts.ts_bar(yearly_df, variable="count", variable_title="Number of policy documents")
                fig = charts.configure_plots(fig, chart_title=f"Policy documents per year ({category_name})")
                fig.save(str(outdir / "ts_yearly.png"), scale_factor=scale_factor)
            # Quarterly docs chart
            if isinstance(quarterly_df, pd.DataFrame) and not quarterly_df.empty:
                fig = charts.ts_bar(quarterly_df, variable="count", variable_title="Number of policy documents", time_column="quarter")
                fig = charts.configure_plots(fig, chart_title=f"Policy documents per quarter ({category_name})")
                fig.save(str(outdir / "ts_quarterly.png"), scale_factor=scale_factor)
        except Exception:
            # Never fail pipeline due to chart errors
            pass
        finally:
            # Restore original theme
            alt.themes.enable(current_theme)


Loading