Skip to content

Commit 2b9502d

Browse files
authored
Merge branch 'v0.9.x' into develop
2 parents 7d90498 + b3b4ca1 commit 2b9502d

19 files changed

+1805
-1579
lines changed

CHANGELOG.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
## v0.9.0 [2025-07-02]
2+
_Whats new_
3+
4+
- Refactor collection state to support time ranges, enabling `--to` flag support. ([#241](https://github.com/turbot/tailpipe-plugin-sdk/issues/241))
5+
- TimeRangeCollectionState supports array of time ranges
6+
- ShouldCollect ensures that the gaps between the ranges are filled but we do not collect for times we have already collected
7+
- The time ranges, of type DirectionalTimeRange, are direction aware and work for collection in forwards or backwards direction
8+
- A Clear function allows clearing the state for a specified time range
9+
- Move all persistence logic into SaveableCollectionState
10+
- Added `overwrite` parameter to CollectRequest - if set, clear collection state becore collecting
11+
- Added migration support for legacy collection states
12+
13+
_Bug fixes_
14+
* Fix issue where collection state end-objects are cleared when collection is complete,
15+
meaning no further data will be collected for that day. ([#250](https://github.com/turbot/tailpipe-plugin-sdk/issues/250))
16+
17+
118
## v0.8.0 [2025-06-23]
219
_Whats new_
320
* Remove row validation and rely entirely on CLI to execute validation. ([#202](https://github.com/turbot/tailpipe-plugin-sdk/issues/202))

artifact_source/artifact_source_impl.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -538,8 +538,10 @@ func (a *ArtifactSourceImpl[S, T]) getMetadataAndApplyFilters(targetPath string,
538538
satisfied := match && metadataSatisfiesFilters(metadata, filterMap)
539539

540540
// if we have a from time, check whether that excludes this directory
541-
if satisfied && isDir && !a.FromTime.IsZero() {
542-
satisfied = dirSatisfiesFromTime(a.FromTime, metadata)
541+
// NOTE: we only support forward collection for artifacts, so we only check the LowerBoundary
542+
// - we do not need the directional logic here
543+
if satisfied && isDir {
544+
satisfied = dirSatisfiesFromTime(a.CollectionTimeRange.LowerBoundary, metadata)
543545
}
544546

545547
return metadata, satisfied, nil
@@ -570,12 +572,14 @@ func (a *ArtifactSourceImpl[S, T]) walkFileNode(ctx context.Context, targetPath
570572

571573
// if the artifact has a timestamp, check the from and to time
572574
if !artifactInfo.Timestamp.IsZero() {
575+
from := a.CollectionTimeRange.LowerBoundary
576+
to := a.CollectionTimeRange.UpperBoundary
573577
// if we have a from time, check if the artifact is newer than the from time
574-
if !a.FromTime.IsZero() && artifactInfo.Timestamp.Compare(a.FromTime) < 0 {
578+
if !from.IsZero() && artifactInfo.Timestamp.Compare(from) < 0 {
575579
return nil
576580
}
577581
// if we have a to time, check if the artifact is older than the to time
578-
if !a.ToTime.IsZero() && artifactInfo.Timestamp.Compare(a.ToTime) > 0 {
582+
if !to.IsZero() && artifactInfo.Timestamp.Compare(to) > 0 {
579583
return nil
580584
}
581585
}

artifact_source/nil_artifact_collection_state.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func (s *NilArtifactCollectionState) GetToTime() time.Time {
2121
func (s *NilArtifactCollectionState) SetEndTime(_ time.Time) {
2222
}
2323

24-
func (*NilArtifactCollectionState) Init(collection_state.CollectionTimeRange, time.Duration) {
24+
func (*NilArtifactCollectionState) Init(collection_state.DirectionalTimeRange, time.Duration) {
2525
}
2626

2727
func (s *NilArtifactCollectionState) RegisterPath(_ string, _ map[string]string) {
@@ -53,5 +53,5 @@ func (*NilArtifactCollectionState) MigrateFromLegacyState(_ []byte) error {
5353
func (*NilArtifactCollectionState) Validate() error {
5454
return nil
5555
}
56-
func (*NilArtifactCollectionState) Clear(_ collection_state.CollectionTimeRange) {
56+
func (*NilArtifactCollectionState) Clear(_ collection_state.DirectionalTimeRange) {
5757
}

artifact_source/plugin_source_wrapper.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,15 @@ func (w *PluginSourceWrapper) Init(ctx context.Context, params *row_source.RowSo
8787
if err != nil {
8888
return err
8989
}
90+
// the init call returns the resolved from time - use to store on the wrapper
9091
// set the from time
9192
fromTime := row_source.ResolvedFromTimeFromProto(resp.FromTime)
9293

93-
w.FromTime = fromTime.Time
94+
// store the collection time range
95+
w.CollectionTimeRange = collection_state.DirectionalTimeRange{
96+
LowerBoundary: fromTime.Time,
97+
UpperBoundary: params.To,
98+
}
9499
w.FromTimeSource = fromTime.Source
95100

96101
return nil

collection_state/artifact_collection_state.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type ArtifactCollectionState struct {
2626
granularity time.Duration
2727

2828
// the time range for the underway collection - populated by Init
29-
currentCollectionTimeRange *CollectionTimeRange
29+
currentCollectionTimeRange *DirectionalTimeRange
3030
// map of the trunk state for each object which has been passed to ShouldCollect
3131
// this is to avoid recomputing the trunk state for each object on every OnCollected call
3232
objectTrunkMap map[string]*TimeRangeCollectionState
@@ -40,7 +40,7 @@ func NewArtifactCollectionState() CollectionState {
4040
}
4141

4242
// Init sets the filepath of the collection state and loads the state from the file if it exists
43-
func (s *ArtifactCollectionState) Init(collectionTimeRange CollectionTimeRange, granularity time.Duration) {
43+
func (s *ArtifactCollectionState) Init(collectionTimeRange DirectionalTimeRange, granularity time.Duration) {
4444
// ensure the granularity is no smaller than the minimum
4545
if granularity < MinArtifactGranularity && granularity != 0 {
4646
granularity = MinArtifactGranularity
@@ -79,7 +79,6 @@ func (s *ArtifactCollectionState) GetFromTime() time.Time {
7979
// (we may have collected some data after this - within the granularity period)
8080
// return the earliest end time of all the trunk states
8181
func (s *ArtifactCollectionState) GetToTime() time.Time {
82-
// TODO #CS KAI think about continuation for reverse order
8382
// find the earliest end time of all the trunk states
8483
var endTime time.Time
8584
for _, trunkState := range s.TrunkStates {
@@ -223,7 +222,7 @@ func (s *ArtifactCollectionState) IsEmpty() bool {
223222
return true
224223
}
225224

226-
func (s *ArtifactCollectionState) Clear(timeRange CollectionTimeRange) {
225+
func (s *ArtifactCollectionState) Clear(timeRange DirectionalTimeRange) {
227226
for _, trunkState := range s.TrunkStates {
228227
if trunkState == nil {
229228
continue

collection_state/artifact_collection_state_test.go

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,6 @@ func TestArtifactCollectionState_MigrateFromLegacyState(t *testing.T) {
1212
legacy *ArtifactCollectionStateLegacy
1313
expected *ArtifactCollectionState
1414
}{
15-
{
16-
name: "migrate two trunks with different orders",
17-
legacy: buildArtifactCollectionStateLegacy(map[string]*TimeRangeCollectionStateLegacy{
18-
"/trunk1": buildTimeRangeCollectionStateLegacy("2023-10-01 00:00:00", "2023-12-01 01:00:00", time.Hour*24, CollectionOrderChronological, "object1", "object2"),
19-
"/trunk2": buildTimeRangeCollectionStateLegacy("2023-11-01 00:00:00", "2023-11-30 00:00:00", time.Hour*24, CollectionOrderReverse, "object3"),
20-
}, timeString("2023-12-01 12:00:00")),
21-
expected: buildArtifactCollectionState(map[string]*TimeRangeCollectionState{
22-
"/trunk1": buildTimeRangeCollectionState(CollectionOrderChronological, time.Hour*24,
23-
buildTimeRangeState("2023-10-01 00:00:00", "2023-11-30 01:00:00", time.Hour*24, CollectionOrderChronological, "object1", "object2"),
24-
),
25-
"/trunk2": buildTimeRangeCollectionState(CollectionOrderReverse, time.Hour*24,
26-
buildTimeRangeState("2023-11-30 00:00:00", "2023-11-01 00:00:00", time.Hour*24, CollectionOrderReverse, "object3"),
27-
),
28-
}, time.Hour*24),
29-
},
3015
{
3116
name: "empty trunks",
3217
legacy: buildArtifactCollectionStateLegacy(map[string]*TimeRangeCollectionStateLegacy{}, timeString("2023-12-01 12:00:00")),
@@ -46,7 +31,8 @@ func TestArtifactCollectionState_MigrateFromLegacyState(t *testing.T) {
4631
}, timeString("2023-12-01 12:00:00")),
4732
expected: buildArtifactCollectionState(map[string]*TimeRangeCollectionState{
4833
"/trunk1": buildTimeRangeCollectionState(CollectionOrderChronological, time.Hour*24,
49-
buildTimeRangeState("2023-10-01 00:00:00", "2023-11-30 01:00:00", time.Hour*24, CollectionOrderChronological),
34+
35+
buildTimeRangeState("2023-10-01 00:00:00", "2023-12-01 01:00:00", time.Hour*24, CollectionOrderChronological),
5036
),
5137
}, time.Hour*24),
5238
},
@@ -57,7 +43,7 @@ func TestArtifactCollectionState_MigrateFromLegacyState(t *testing.T) {
5743
}, timeString("2023-12-01 12:00:00")),
5844
expected: buildArtifactCollectionState(map[string]*TimeRangeCollectionState{
5945
"/trunk1": buildTimeRangeCollectionState(CollectionOrderReverse, time.Hour*24,
60-
buildTimeRangeState("2023-12-01 01:00:00", "2023-10-01 00:00:00", time.Hour*24, CollectionOrderReverse, "object1", "object2"),
46+
buildTimeRangeState("2023-10-01 00:00:00", "2023-12-01 01:00:00", time.Hour*24, CollectionOrderReverse, "object1", "object2"),
6147
),
6248
}, time.Hour*24),
6349
},
@@ -68,7 +54,7 @@ func TestArtifactCollectionState_MigrateFromLegacyState(t *testing.T) {
6854
}, timeString("2023-12-01 12:00:00")),
6955
expected: buildArtifactCollectionState(map[string]*TimeRangeCollectionState{
7056
"/trunk1": buildTimeRangeCollectionState(CollectionOrderChronological, time.Hour*24,
71-
buildTimeRangeState("2023-10-01 00:00:00", "2023-10-01 00:00:00", time.Hour*24, CollectionOrderChronological, "object1"),
57+
buildTimeRangeState("2023-10-01 00:00:00", "2023-10-02 00:00:00", time.Hour*24, CollectionOrderChronological, "object1"),
7258
),
7359
}, time.Hour*24),
7460
},
@@ -102,6 +88,7 @@ func buildArtifactCollectionStateLegacy(trunks map[string]*TimeRangeCollectionSt
10288
}
10389
}
10490

91+
// TODO update this to set end time on or after last entry time to me more realistic
10592
// buildTimeRangeCollectionStateLegacy constructs a legacy time range collection state for tests
10693
func buildTimeRangeCollectionStateLegacy(fromStr, toStr string, granularity time.Duration, order CollectionOrder, endObjects ...string) *TimeRangeCollectionStateLegacy {
10794
from, err := time.Parse("2006-01-02 15:04:05", fromStr)

collection_state/collection_time_range.go

Lines changed: 0 additions & 162 deletions
This file was deleted.

0 commit comments

Comments
 (0)