@@ -113,6 +113,14 @@ func (r *RowSourceImpl[S, T]) Init(_ context.Context, params *RowSourceParams, o
113113 // it will also adjust the from and to time if the collection order is reverse
114114 r .setCollectionTimeRange (params , granularity )
115115
116+ // After setting the collection time range, trim nil trunk states
117+ // NOTE: this is done here so that the collection state is clean before we call Init on it.
118+ // This is important as we may have nil trunk states in the collection state file if the previous
119+ // collection was done using older version of the code
120+ if artifactState , ok := r .CollectionState .State .(* collection_state.ArtifactCollectionState ); ok {
121+ artifactState .TrimNilTrunkStates ()
122+ }
123+
116124 // init the collection state with the time range and granularity
117125 // NOTE: the collection state will set it;s collection order based on the time range collection order
118126 // (which we set from our CollectionOrder field)
@@ -220,6 +228,17 @@ func (r *RowSourceImpl[S, T]) OnCollectionComplete() error {
220228 slog .Info ("OnCollectionComplete: Collection state is nil - not setting end time" )
221229 return nil
222230 }
231+
232+ // Before saving, trim nil trunk states
233+ // Having a null trunk state in the collection state file is valid. There might be some locations in the
234+ // bucket which have no files in them, which would result in null trunk states, as there is no way to know
235+ // this in advance we add the null trunk states to the collection state.
236+ // However, we don't want to save these null trunk states to the collection state file as they do not
237+ // make sense. So we trim them before saving.
238+ if artifactState , ok := r .CollectionState .State .(* collection_state.ArtifactCollectionState ); ok {
239+ artifactState .TrimNilTrunkStates ()
240+ }
241+
223242 // so the source collection was successful, set the end time of the collection state to the collection `to`
224243 // this ensures that when we run the next collection, we will start from the end time of the previous collection
225244 if err := r .CollectionState .OnCollectionComplete (); err != nil {
0 commit comments