Skip to content

v0.2.1

Latest
Compare
Choose a tag to compare
@valayDave valayDave released this 17 Apr 01:36

Coreweave Datastore Support

Usage

Saving checkpoints to Coreweave datastore

@with_artifact_store(
    type="coreweave",
    config=lambda: {
        "root": "s3://my-coreweave-bucket/metaflow-artifacts-new",
        "client_params": {
            "aws_access_key_id": os.environ.get("COREWEAVE_ACCESS_KEY"),
            "aws_secret_access_key": os.environ.get("COREWEAVE_SECRET_KEY"),
            "endpoint_url": "https://cwobject.com/",
            "config": dict(s3={"addressing_style": "virtual"}),
        },
    },
)
class MyFlow(FlowSpec):

# ensure `COREWEAVE_SECRET_KEY`/`COREWEAVE_ACCESS_KEY` available through @secrets decorator or via env vars
    @checkpoint
    @step
    def start(self):
        ...

Accessing Objects written to Coreweave datastore

from metaflow import artifact_store_from, load_model, Checkpoint, Run
import os
if __name__ == "__main__":
    run = Run("CheckpointsTestsFlow/9382", _namespace_check=False)
    with artifact_store_from(run=run, config={
        "client_params": {
            "aws_access_key_id": os.environ.get("COREWEAVE_ACCESS_KEY"),
            "aws_secret_access_key": os.environ.get("COREWEAVE_SECRET_KEY"),
            "endpoint_url": "https://cwobject.com/",
            "config": dict(s3={"addressing_style": "virtual"}),
        }
    }):
        load_model(
            run.data.nested_dir_checkpoint, "cw-checkpoint"
        )
        with Checkpoint() as chckpt:
            chckpt.load(
                run.data.nested_dir_checkpoint
            )
            print(os.listdir(chckpt.directory))