Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions collection_state/artifact_collection_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ func (s *ArtifactCollectionState) MigrateFromLegacyState(bytes []byte) error {
func (s *ArtifactCollectionState) Validate() error {
var errorList []error
for _, trunkState := range s.TrunkStates {
if trunkState == nil {
continue // skip nil trunk states
}
if trunkErr := trunkState.Validate(); trunkErr != nil {
errorList = append(errorList, trunkErr)
}
Expand Down Expand Up @@ -320,3 +323,12 @@ func (s *ArtifactCollectionState) String() any {
}
return stringBuilder.String()
}

// TrimNilTrunkStates drops every entry of TrunkStates whose value is nil,
// leaving all non-nil trunk states untouched. The map is modified in place.
func (s *ArtifactCollectionState) TrimNilTrunkStates() {
	for trunkPath, trunkState := range s.TrunkStates {
		if trunkState != nil {
			// populated trunk state - keep it
			continue
		}
		// removing the current key while ranging over a map is permitted in Go
		delete(s.TrunkStates, trunkPath)
	}
}
52 changes: 52 additions & 0 deletions collection_state/artifact_collection_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,58 @@ func TestArtifactCollectionState_MigrateFromLegacyState(t *testing.T) {
}
}

func TestArtifactCollectionState_Validate_NilTrunks(t *testing.T) {
t.Run("all nil trunks", func(t *testing.T) {
state := &ArtifactCollectionState{
TrunkStates: map[string]*TimeRangeCollectionState{
"/trunk1": nil,
"/trunk2": nil,
},
}
err := state.Validate()
if err != nil {
t.Errorf("expected Validate to return nil for all nil trunks, got: %v", err)
}
})

t.Run("mixed nil and valid trunks", func(t *testing.T) {
valid := &TimeRangeCollectionState{
TimeRanges: []*TimeRangeObjectState{},
}
state := &ArtifactCollectionState{
TrunkStates: map[string]*TimeRangeCollectionState{
"/trunk1": nil,
"/trunk2": valid,
},
}
err := state.Validate()
if err != nil {
t.Errorf("expected Validate to return nil for valid trunks, got: %v", err)
}
})

t.Run("mixed nil and invalid trunks", func(t *testing.T) {
invalid := &TimeRangeCollectionState{
TimeRanges: []*TimeRangeObjectState{
{
Granularity: 1,
TimeRange: DirectionalTimeRange{}, // invalid
},
},
}
state := &ArtifactCollectionState{
TrunkStates: map[string]*TimeRangeCollectionState{
"/trunk1": nil,
"/trunk2": invalid,
},
}
err := state.Validate()
if err == nil {
t.Errorf("expected Validate to return error for invalid trunks, got nil")
}
})
}

// buildArtifactCollectionStateLegacy constructs a legacy artifact collection state for tests
func buildArtifactCollectionStateLegacy(trunks map[string]*TimeRangeCollectionStateLegacy, lastModifiedTime time.Time) *ArtifactCollectionStateLegacy {
return &ArtifactCollectionStateLegacy{
Expand Down
5 changes: 0 additions & 5 deletions collection_state/saveable_collection_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ func (s *SaveableCollectionState) Init(collectionTimeRange DirectionalTimeRange,
// save the granularity
s.granularity = granularity

// NOTE: if granularity is zero, we DO NOT support collecting for a time range so clear the time range
if granularity == 0 {
slog.Info("Granularity is zero - clearing collection time range")
collectionTimeRange = DirectionalTimeRange{}
}
// if we are recollecting, clear BEFORE call to Init, as Init will set the active range
// which we must not do until we have cleared the state
if recollect {
Expand Down
5 changes: 0 additions & 5 deletions collection_state/time_range_collection_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,6 @@ func (t *TimeRangeCollectionState) OnCollectionComplete() error {
if t.currentCollectionTimeRange == nil {
return fmt.Errorf("cannot complete collection - no current collection set, Init must be called first")
}
// if we have no granularity we have nothing to do
if t.Granularity == 0 {
slog.Info("Granularity is zero - no collection complete action required")
return nil
}

// set the upper boundary time of the active range to the upper boundary time of the collection time range
t.activeRange.setUpperBoundaryTime(t.currentCollectionTimeRange.EndTime())
Expand Down
5 changes: 0 additions & 5 deletions collection_state/time_range_object_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ func (s *TimeRangeObjectState) IsEmpty() bool {

// so we have no end objects

// if we have no time information and no end objects then we are empty
if s.Granularity == 0 {
return true
}

// if the start time equals the end time (the initial state) then we are empty
return s.TimeRange.UpperBoundary.Equal(s.TimeRange.LowerBoundary)
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ require (
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.20.0 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/go-viper/mapstructure/v2 v2.3.0 // indirect
github.com/goccy/go-json v0.10.5 // indirect
github.com/goccy/go-yaml v1.11.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
Expand Down Expand Up @@ -182,7 +182,7 @@ require (
golang.org/x/crypto v0.36.0 // indirect
golang.org/x/mod v0.22.0 // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/oauth2 v0.23.0 // indirect
golang.org/x/oauth2 v0.27.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/term v0.30.0 // indirect
golang.org/x/text v0.23.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,8 @@ github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBEx
github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/go-test/deep v1.1.0 h1:WOcxcdHcvdgThNXjw0t76K42FXTU7HpNQWHpA2HHNlg=
github.com/go-test/deep v1.1.0/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss=
github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/go-viper/mapstructure/v2 v2.3.0 h1:27XbWsHIqhbdR5TIC911OfYvgSaW93HM+dX7970Q7jk=
github.com/go-viper/mapstructure/v2 v2.3.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4=
github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/goccy/go-yaml v1.11.2 h1:joq77SxuyIs9zzxEjgyLBugMQ9NEgTWxXfz2wVqwAaQ=
Expand Down Expand Up @@ -881,8 +881,8 @@ golang.org/x/oauth2 v0.0.0-20220822191816-0ebed06d0094/go.mod h1:h4gKUeWbJ4rQPri
golang.org/x/oauth2 v0.0.0-20220909003341-f21342109be1/go.mod h1:h4gKUeWbJ4rQPri7E0u6Gs4e9Ri2zaLxzw5DI5XGrYg=
golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783/go.mod h1:h4gKUeWbJ4rQPri7E0u6Gs4e9Ri2zaLxzw5DI5XGrYg=
golang.org/x/oauth2 v0.1.0/go.mod h1:G9FE4dLTsbXUu90h/Pf85g4w1D+SSAgR+q46nJZ8M4A=
golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs=
golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/oauth2 v0.27.0 h1:da9Vo7/tDv5RH/7nZDz1eMGS/q1Vv1N/7FCrBhI9I3M=
golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down
19 changes: 19 additions & 0 deletions row_source/row_source_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ func (r *RowSourceImpl[S, T]) Init(_ context.Context, params *RowSourceParams, o
// it will also adjust the from and to time if the collection order is reverse
r.setCollectionTimeRange(params, granularity)

// After setting the collection time range, trim nil trunk states
// NOTE: this is done here so that the collection state is clean before we call Init on it.
// This is important as we may have nil trunk states in the collection state file if the previous
// collection was done using an older version of the code
if artifactState, ok := r.CollectionState.State.(*collection_state.ArtifactCollectionState); ok {
artifactState.TrimNilTrunkStates()
}

// init the collection state with the time range and granularity
// NOTE: the collection state will set its collection order based on the time range collection order
// (which we set from our CollectionOrder field)
Expand Down Expand Up @@ -220,6 +228,17 @@ func (r *RowSourceImpl[S, T]) OnCollectionComplete() error {
slog.Info("OnCollectionComplete: Collection state is nil - not setting end time")
return nil
}

// Before saving, trim nil trunk states
// Having a null trunk state in the collection state file is valid. There might be some locations in the
// bucket which have no files in them, which would result in null trunk states. As there is no way to know
// this in advance, we add the null trunk states to the collection state.
// However, we don't want to save these null trunk states to the collection state file as they do not
// make sense. So we trim them before saving.
if artifactState, ok := r.CollectionState.State.(*collection_state.ArtifactCollectionState); ok {
artifactState.TrimNilTrunkStates()
}

// so the source collection was successful, set the end time of the collection state to the collection `to`
// this ensures that when we run the next collection, we will start from the end time of the previous collection
if err := r.CollectionState.OnCollectionComplete(); err != nil {
Expand Down