Skip to content

More DFGv1 cleanups and checks and Blobstore updates #1159

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 9, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/DataBlobs/entities/BlobEntry.jl
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,10 @@ Base.@kwdef struct Blobentry
createdTimestamp::Union{ZonedDateTime, Nothing} = nothing
""" Use carefully, but necessary to support advanced usage such as time synchronization over Blob data. """
lastUpdatedTimestamp::Union{ZonedDateTime, Nothing} = nothing
""" Type version of this Blobentry. TBD.jl consider upgrading to `::VersionNumber`. """
_version::String = string(_getDFGVersion())
""" Type version of this Blobentry."""
_version::VersionNumber = _getDFGVersion()
end

StructTypes.StructType(::Type{Blobentry}) = StructTypes.UnorderedStruct()
StructTypes.idproperty(::Type{Blobentry}) = :id
StructTypes.omitempties(::Type{Blobentry}) = (:id,)

_fixtimezone(cts::NamedTuple) = ZonedDateTime(cts.utc_datetime * "+00")
30 changes: 30 additions & 0 deletions src/DataBlobs/entities/BlobStores.jl
Original file line number Diff line number Diff line change
@@ -1 +1,31 @@
"""
AbstractBlobstore{T}

Abstract supertype for all blobstore implementations.

# Usage

Subtypes of `AbstractBlobstore{T}` must implement the required interface for blob storage and retrieval, such as:

- `add!(store, blobId, blob)`: Add a new blob to the store.
- `get(store, blobId)`: Retrieve a blob by its ID.
- `list(store)`: List all blob IDs in the store.

The parameter `T` represents the type of blobs stored (e.g., `Vector{UInt8}` or a custom `Blob` type).

See concrete implementations for details.

Design Notes
- `blobId` is not considered unique across blobstores with different labels only within a single blobstore.
- We cannot guarantee that `blobId` is unique across different blobstores with the same label and this is up to the end user.
- Within a single blobstore `addBlob!` will fail if there is a UUID collision.
- TODO: We should consider using uuid7 for `blobId`s (requires jl v1.12).
- `Blobstrores`are identified by a `label::Symbol`, which allows for multiple blobstores to coexist in the same system.

TODO: If we want to make the `blobId`=>Blob pair immutable:
- We can use the tombstone pattern to mark a blob as deleted. See FolderStore in PR#TODO.

Design goal: all `Blobstore`s with the same `label` can contain the same `blobId`=>`Blob` pair and the blobs should be identical since they are immutable.

"""
abstract type AbstractBlobstore{T} end
4 changes: 2 additions & 2 deletions src/DataBlobs/services/BlobEntry.jl
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ function addBlobentry!(var::VariableDFG, entry::Blobentry)
return entry
end

function addBlobentry!(dfg::AbstractDFG, vLbl::Symbol, entry::Blobentry;)
function addBlobentry!(dfg::AbstractDFG, vLbl::Symbol, entry::Blobentry)
return addBlobentry!(getVariable(dfg, vLbl), entry)
end

Expand Down Expand Up @@ -238,7 +238,7 @@ end
"""
$SIGNATURES

Does a blob entry (element) exist with `blobLabel`.
Does a blob entry exist with `blobLabel`.
"""
hasBlobentry(var::AbstractDFGVariable, blobLabel::Symbol) = haskey(var.dataDict, blobLabel)

Expand Down
74 changes: 34 additions & 40 deletions src/DataBlobs/services/BlobStores.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
function getBlob end

"""
Adds a blob to the blob store or dfg with the given entry.
Adds a blob to the blob store or dfg with the blobId.

Related
[`addBlobentry!`](@ref)
Expand Down Expand Up @@ -81,34 +81,11 @@
##==============================================================================
## AbstractBlobstore derived CRUD for Blob
##==============================================================================
#TODO looking in all the blobstores does not make sense since since there is a chance that the blobId is not unique across blobstores.
# using the cached blobstore is the right way to go here.
#TODO maybe we should generalize and move the cached blobstore to DFG.
function getBlob(dfg::AbstractDFG, entry::Blobentry)
stores = getBlobstores(dfg)
storekeys = collect(keys(stores))
# first check the saved blobstore and then fall back to the rest
fidx = findfirst(==(entry.blobstore), storekeys)
if !isnothing(fidx)
skey = storekeys[fidx]
popat!(storekeys, fidx)
pushfirst!(storekeys, skey)
end
for k in storekeys
store = stores[k]
try
blob = getBlob(store, entry)
return blob
catch err
if !(err isa KeyError)
throw(err)
end
end
end
throw(
KeyError(
"could not find $(entry.label), uuid $(entry.blobId) in any of the listed blobstores:\n $([s->getLabel(s) for (s,v) in stores]))",
),
)
storeLabel = entry.blobstore
store = getBlobstore(dfg, storeLabel)
return getBlob(store, entry.blobId)
end

function getBlob(store::AbstractBlobstore, entry::Blobentry)
Expand Down Expand Up @@ -204,19 +181,18 @@
return read(f)
end
else
throw(KeyError("Could not find file '$(blobfilename)'."))
throw(IdNotFoundError("Blob", blobId))
end
end

function addBlob!(store::FolderStore{T}, blobId::UUID, data::T) where {T}
blobfilename = joinpath(store.folder, string(store.label), string(blobId))
if isfile(blobfilename)
throw(KeyError("Key '$blobId' blob already exists."))
throw(IdExistsError("Blob", blobId))
else
open(blobfilename, "w") do f
return write(f, data)
end
# return data
return blobId
end
end
Expand All @@ -235,11 +211,13 @@

function deleteBlob!(store::FolderStore{T}, blobId::UUID) where {T}
blobfilename = joinpath(store.folder, string(store.label), string(blobId))
if !isfile(blobfilename)
throw(IdNotFoundError("Blob", blobId))
end
rm(blobfilename)
return 1
end

#hasBlob or existsBlob?
function hasBlob(store::FolderStore, blobId::UUID)
blobfilename = joinpath(store.folder, string(store.label), string(blobId))
return isfile(blobfilename)
Expand All @@ -265,12 +243,15 @@
end

function getBlob(store::InMemoryBlobstore, blobId::UUID)
if !haskey(store.blobs, blobId)
throw(IdNotFoundError("Blob", blobId))

Check warning on line 247 in src/DataBlobs/services/BlobStores.jl

View check run for this annotation

Codecov / codecov/patch

src/DataBlobs/services/BlobStores.jl#L247

Added line #L247 was not covered by tests
end
return store.blobs[blobId]
end

function addBlob!(store::InMemoryBlobstore{T}, blobId::UUID, data::T) where {T}
if haskey(store.blobs, blobId)
error("Key '$blobId' blob already exists.")
throw(IdExistsError("Blob", blobId))

Check warning on line 254 in src/DataBlobs/services/BlobStores.jl

View check run for this annotation

Codecov / codecov/patch

src/DataBlobs/services/BlobStores.jl#L254

Added line #L254 was not covered by tests
end
store.blobs[blobId] = data
return blobId
Expand All @@ -284,6 +265,9 @@
end

function deleteBlob!(store::InMemoryBlobstore, blobId::UUID)
if !haskey(store.blobs, blobId)
throw(IdNotFoundError("Blob", blobId))

Check warning on line 269 in src/DataBlobs/services/BlobStores.jl

View check run for this annotation

Codecov / codecov/patch

src/DataBlobs/services/BlobStores.jl#L269

Added line #L269 was not covered by tests
end
pop!(store.blobs, blobId)
return 1
end
Expand Down Expand Up @@ -319,25 +303,28 @@

function getBlob(store::LinkStore, blobId::UUID)
fname = get(store.cache, blobId, nothing)
if isnothing(fname)
throw(IdNotFoundError("Blob", blobId))

Check warning on line 307 in src/DataBlobs/services/BlobStores.jl

View check run for this annotation

Codecov / codecov/patch

src/DataBlobs/services/BlobStores.jl#L306-L307

Added lines #L306 - L307 were not covered by tests
end
return read(fname)
end

function addBlob!(store::LinkStore, entry::Blobentry, linkfile::String)
return addBlob!(store, entry.blobId, nothing, linkfile::String)
return addBlob!(store, entry.blobId, linkfile)

Check warning on line 313 in src/DataBlobs/services/BlobStores.jl

View check run for this annotation

Codecov / codecov/patch

src/DataBlobs/services/BlobStores.jl#L313

Added line #L313 was not covered by tests
end

function addBlob!(store::LinkStore, blobId::UUID, blob::Any, linkfile::String)
function addBlob!(store::LinkStore, blobId::UUID, linkfile::String)

Check warning on line 316 in src/DataBlobs/services/BlobStores.jl

View check run for this annotation

Codecov / codecov/patch

src/DataBlobs/services/BlobStores.jl#L316

Added line #L316 was not covered by tests
if haskey(store.cache, blobId)
error("blobId $blobId already exists in the store")
throw(IdExistsError("Blob", blobId))

Check warning on line 318 in src/DataBlobs/services/BlobStores.jl

View check run for this annotation

Codecov / codecov/patch

src/DataBlobs/services/BlobStores.jl#L318

Added line #L318 was not covered by tests
end
push!(store.cache, blobId => linkfile)
open(store.csvfile, "a") do f
return println(f, blobId, ",", linkfile)
end
return getBlob(store, blobId)
return blobId

Check warning on line 324 in src/DataBlobs/services/BlobStores.jl

View check run for this annotation

Codecov / codecov/patch

src/DataBlobs/services/BlobStores.jl#L324

Added line #L324 was not covered by tests
end

function deleteBlob!(store::LinkStore, args...)
function deleteBlob!(store::LinkStore)

Check warning on line 327 in src/DataBlobs/services/BlobStores.jl

View check run for this annotation

Codecov / codecov/patch

src/DataBlobs/services/BlobStores.jl#L327

Added line #L327 was not covered by tests
return error("deleteDataBlob(::LinkStore) not supported")
end

Expand Down Expand Up @@ -404,12 +391,15 @@

##
function getBlob(store::RowBlobstore, blobId::UUID)
if !haskey(store.blobs, blobId)
throw(IdNotFoundError("Blob", blobId))

Check warning on line 395 in src/DataBlobs/services/BlobStores.jl

View check run for this annotation

Codecov / codecov/patch

src/DataBlobs/services/BlobStores.jl#L394-L395

Added lines #L394 - L395 were not covered by tests
end
return getfield(store.blobs[blobId], :blob)
end

function addBlob!(store::RowBlobstore{T}, blobId::UUID, blob::T) where {T}
if haskey(store.blobs, blobId)
error("Key '$blobId' blob already exists.")
throw(IdExistsError("Blob", blobId))

Check warning on line 402 in src/DataBlobs/services/BlobStores.jl

View check run for this annotation

Codecov / codecov/patch

src/DataBlobs/services/BlobStores.jl#L402

Added line #L402 was not covered by tests
end
store.blobs[blobId] = RowBlob(blobId, blob)
return blobId
Expand All @@ -423,7 +413,10 @@
end

function deleteBlob!(store::RowBlobstore, blobId::UUID)
getfield(pop!(store.blobs, blobId), :blob)
if !haskey(store.blobs, blobId)
throw(IdNotFoundError("Blob", blobId))

Check warning on line 417 in src/DataBlobs/services/BlobStores.jl

View check run for this annotation

Codecov / codecov/patch

src/DataBlobs/services/BlobStores.jl#L416-L417

Added lines #L416 - L417 were not covered by tests
end
pop!(store.blobs, blobId)

Check warning on line 419 in src/DataBlobs/services/BlobStores.jl

View check run for this annotation

Codecov / codecov/patch

src/DataBlobs/services/BlobStores.jl#L419

Added line #L419 was not covered by tests
return 1
end

Expand Down Expand Up @@ -469,3 +462,4 @@
Tables.rowtable(sstore)
end
##
##
4 changes: 2 additions & 2 deletions src/DataBlobs/services/HelpersDataWrapEntryBlob.jl
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ function Blobentry(
timestamp::ZonedDateTime = entry.timestamp,
createdTimestamp = entry.createdTimestamp,
lastUpdatedTimestamp = entry.lastUpdatedTimestamp,
_version::String = entry._version,
_version = entry._version,
)
return Blobentry(;
id,
Expand Down Expand Up @@ -267,7 +267,7 @@ function updateData!(
blobstore = getLabel(blobstore),
hash = string(bytes2hex(hashfunction(blob))),
origin = buildSourceString(dfg, label),
_version = string(_getDFGVersion()),
_version = _getDFGVersion(),
)
mergeBlobentry!(dfg, label, newEntry)
updateBlob!(blobstore, newEntry, blob)
Expand Down
98 changes: 98 additions & 0 deletions src/Deprecated.jl
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,104 @@
)
end

#NOTE List types funcction do not fit verb noun and will be deprecated.
# should return types

"""
$SIGNATURES

Return `Vector{Symbol}` of all unique variable types in factor graph.
"""
function lsTypes(dfg::AbstractDFG)
vars = getVariables(dfg)
alltypes = Set{Symbol}()
for v in vars
varType = Symbol(typeof(getVariableType(v)))
push!(alltypes, varType)
end
return collect(alltypes)
end

"""
$SIGNATURES

Return `::Dict{Symbol, Vector{Symbol}}` of all unique variable types with labels in a factor graph.
"""
function lsTypesDict(dfg::AbstractDFG)
vars = getVariables(dfg)
alltypes = Dict{Symbol, Vector{Symbol}}()
for v in vars
varType = Symbol(typeof(getVariableType(v)))
d = get!(alltypes, varType, Symbol[])
push!(d, v.label)
end
return alltypes
end

"""
$SIGNATURES

Return `Vector{Symbol}` of all unique factor types in factor graph.
"""
function lsfTypes(dfg::AbstractDFG)
facs = getFactors(dfg)
alltypes = Set{Symbol}()
for f in facs
facType = typeof(getFactorType(f)) |> nameof
push!(alltypes, facType)
end
return collect(alltypes)
end

"""
$SIGNATURES

Return `::Dict{Symbol, Vector{Symbol}}` of all unique factors types with labels in a factor graph.
"""
function lsfTypesDict(dfg::AbstractDFG)
facs = getFactors(dfg)
alltypes = Dict{Symbol, Vector{Symbol}}()
for f in facs
facType = typeof(getFactorType(f)) |> nameof
d = get!(alltypes, facType, Symbol[])
push!(d, f.label)
end
return alltypes
end

# solvekey is deprecated and sync!/copyto! is the better verb.
#TODO replace with syncVariableStates! or similar
"""
$SIGNATURES
Duplicate a `solveKey`` into a destination from a source.

Notes
- Can copy between graphs, or to different solveKeys within one graph.
"""
function cloneSolveKey!(

Check warning on line 128 in src/Deprecated.jl

View check run for this annotation

Codecov / codecov/patch

src/Deprecated.jl#L128

Added line #L128 was not covered by tests
dest_dfg::AbstractDFG,
dest::Symbol,
src_dfg::AbstractDFG,
src::Symbol;
solvable::Int = 0,
labels = intersect(ls(dest_dfg; solvable = solvable), ls(src_dfg; solvable = solvable)),
verbose::Bool = false,
)
#
for x in labels
sd = deepcopy(getVariableState(getVariable(src_dfg, x), src))
copytoVariableState!(dest_dfg, x, dest, sd)
end

Check warning on line 141 in src/Deprecated.jl

View check run for this annotation

Codecov / codecov/patch

src/Deprecated.jl#L138-L141

Added lines #L138 - L141 were not covered by tests

return nothing

Check warning on line 143 in src/Deprecated.jl

View check run for this annotation

Codecov / codecov/patch

src/Deprecated.jl#L143

Added line #L143 was not covered by tests
end

function cloneSolveKey!(dfg::AbstractDFG, dest::Symbol, src::Symbol; kw...)

Check warning on line 146 in src/Deprecated.jl

View check run for this annotation

Codecov / codecov/patch

src/Deprecated.jl#L146

Added line #L146 was not covered by tests
#
@assert dest != src "Must copy to a different solveKey within the same graph, $dest."
return cloneSolveKey!(dfg, dest, dfg, src; kw...)

Check warning on line 149 in src/Deprecated.jl

View check run for this annotation

Codecov / codecov/patch

src/Deprecated.jl#L148-L149

Added lines #L148 - L149 were not covered by tests
end

## ================================================================================
## Deprecated in v0.27
##=================================================================================
Expand Down
Loading
Loading