Skip to content
Open
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions src/satellite_consumer/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
"""

import datetime as dt
import os
from importlib.metadata import PackageNotFoundError, version

import sentry_sdk
import eumdac.product
from joblib import Parallel, delayed
from loguru import logger as log
Expand All @@ -30,10 +32,23 @@
except PackageNotFoundError:
__version__ = "v?"


# Sentry initialization as per the suggestion
sentry_sdk.init(
dsn=os.getenv("SENTRY_DSN"),
environment=os.getenv("ENVIRONMENT", "local"),
traces_sample_rate=1,
)
sentry_sdk.set_tag("app_name", "satellite_consumer")
sentry_sdk.set_tag("version", __version__)

def _consume_command(command_opts: ArchiveCommandOptions | ConsumeCommandOptions) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you reset all changes below this, if you want to do this, please put in a different PR and explain what they are for and why

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there is green lines below here, then it means new lines have been added

"""Run the download and processing pipeline."""

def _consume_to_store(command_opts: ArchiveCommandOptions | ConsumeCommandOptions) -> None:
"""Logic for the consume command (and the archive command)."""
fs = get_fs(path=command_opts.zarr_path)

fs = get_fs(path=command_opts.zarr_path)
window = command_opts.time_window

product_iter = get_products_iterator(
Expand All @@ -42,11 +57,9 @@ def _consume_to_store(command_opts: ArchiveCommandOptions | ConsumeCommandOption
end=window[1],
)

# Use existing zarr store if it exists
if fs.exists(command_opts.zarr_path.replace("s3://", "")):
log.info("Using existing zarr store", dst=command_opts.zarr_path)
else:
# Create new store
log.info("Creating new zarr store", dst=command_opts.zarr_path)
_ = create_empty_zarr(dst=command_opts.zarr_path, coords=command_opts.as_coordinates())

Expand All @@ -60,8 +73,11 @@ def _etl(product: eumdac.product.Product) -> str | None:
return nat_filepath

nat_filepaths: list[str] = []


num_skipped: int = 0
# Iterate through all products in search

for nat_filepath in Parallel(
n_jobs=command_opts.num_workers, return_as="generator",
)(delayed(_etl)(product) for product in product_iter):
Expand Down Expand Up @@ -95,6 +111,10 @@ def _etl(product: eumdac.product.Product) -> str | None:
f"Deleting {len(nat_filepaths)} raw files in {command_opts.raw_folder}",
num_files=len(nat_filepaths), dst=command_opts.raw_folder,
)

for f in nat_filepaths:
f.unlink() # type: ignore

_ = [f.unlink() for f in nat_filepaths] # type:ignore

def _merge_command(command_opts: MergeCommandOptions) -> None:
Expand Down Expand Up @@ -125,7 +145,6 @@ def _merge_command(command_opts: MergeCommandOptions) -> None:
dst = create_latest_zip(srcs=zarr_paths)
log.info("Created latest.zip", dst=dst)


def run(config: SatelliteConsumerConfig) -> None:
"""Run the download and processing pipeline."""
prog_start = dt.datetime.now(tz=dt.UTC)
Expand All @@ -146,4 +165,3 @@ def run(config: SatelliteConsumerConfig) -> None:

runtime = dt.datetime.now(tz=dt.UTC) - prog_start
log.info(f"Completed satellite consumer run in {runtime!s}.")