Skip to content

Commit 263d179

Browse files
committed
Add shard tombstone handling in compaction worker
1 parent 24b35ed commit 263d179

File tree

7 files changed

+790
-139
lines changed

7 files changed

+790
-139
lines changed

.mockery.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@ packages:
3737
github.com/grafana/pyroscope/pkg/experiment/metastore/discovery:
3838
interfaces:
3939
Discovery:
40-
github.com/grafana/pyroscope/pkg/experiment/metastore/dlq:
41-
interfaces:
42-
LocalServer:
4340
github.com/grafana/pyroscope/pkg/experiment/metastore/index:
4441
interfaces:
4542
Store:
43+
github.com/grafana/pyroscope/pkg/experiment/metastore/index/dlq:
44+
interfaces:
45+
Metastore:
4646
github.com/grafana/pyroscope/pkg/experiment/distributor/placement:
4747
interfaces:
4848
Placement:

pkg/experiment/block/object.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"strings"
99

1010
"github.com/grafana/dskit/multierror"
11+
"github.com/oklog/ulid"
1112
"golang.org/x/sync/errgroup"
1213

1314
metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
@@ -75,6 +76,23 @@ func ObjectPath(md *metastorev1.BlockMeta) string {
7576
return BuildObjectPath(metadata.Tenant(md), md.Shard, md.CompactionLevel, md.Id)
7677
}
7778

79+
func BuildObjectDir(tenant string, shard uint32) string {
80+
topLevel := DirNameBlock
81+
tenantDirName := tenant
82+
if tenant == "" {
83+
topLevel = DirNameSegment
84+
tenantDirName = DirNameAnonTenant
85+
}
86+
var b strings.Builder
87+
b.WriteString(topLevel)
88+
b.WriteByte('/')
89+
b.WriteString(strconv.Itoa(int(shard)))
90+
b.WriteByte('/')
91+
b.WriteString(tenantDirName)
92+
b.WriteByte('/')
93+
return b.String()
94+
}
95+
7896
func BuildObjectPath(tenant string, shard uint32, level uint32, block string) string {
7997
topLevel := DirNameBlock
8098
tenantDirName := tenant
@@ -113,6 +131,18 @@ func MetadataDLQObjectPath(md *metastorev1.BlockMeta) string {
113131
return b.String()
114132
}
115133

134+
func ParseBlockIDFromPath(path string) (ulid.ULID, error) {
135+
tokens := strings.Split(path, "/")
136+
if len(tokens) < 2 {
137+
return ulid.ULID{}, fmt.Errorf("invalid path format: %s", path)
138+
}
139+
blockID, err := ulid.Parse(tokens[len(tokens)-2])
140+
if err != nil {
141+
return ulid.ULID{}, fmt.Errorf("expected ULID: %s: %w", path, err)
142+
}
143+
return blockID, nil
144+
}
145+
116146
// Open opens the object, loading the data into memory if it's small enough.
117147
//
118148
// Open may be called multiple times concurrently, but the

pkg/experiment/compactor/compaction_worker_metrics.go renamed to pkg/experiment/compactor/metrics.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ type compactionWorkerMetrics struct {
1313
jobsCompleted *prometheus.CounterVec
1414
jobDuration *prometheus.HistogramVec
1515
timeToCompaction *prometheus.HistogramVec
16+
blocksDeleted *prometheus.CounterVec
1617
}
1718

1819
func newMetrics(r prometheus.Registerer) *compactionWorkerMetrics {
@@ -31,7 +32,7 @@ func newMetrics(r prometheus.Registerer) *compactionWorkerMetrics {
3132
Name: "job_duration_seconds",
3233
Help: "Duration of compaction job runs",
3334

34-
Buckets: prometheus.ExponentialBuckets(1, 300, 16),
35+
Buckets: prometheus.ExponentialBuckets(1, 2, 30),
3536
NativeHistogramBucketFactor: 1.1,
3637
NativeHistogramMaxBucketNumber: 16,
3738
NativeHistogramMinResetDuration: time.Hour,
@@ -41,18 +42,24 @@ func newMetrics(r prometheus.Registerer) *compactionWorkerMetrics {
4142
Name: "time_to_compaction_seconds",
4243
Help: "The time elapsed since the oldest compacted block was created.",
4344

44-
Buckets: prometheus.ExponentialBuckets(1, 3600, 16),
45+
Buckets: prometheus.ExponentialBuckets(1, 2, 30),
4546
NativeHistogramBucketFactor: 1.1,
4647
NativeHistogramMaxBucketNumber: 16,
4748
NativeHistogramMinResetDuration: time.Hour,
4849
}, []string{"tenant", "level"}),
50+
51+
blocksDeleted: prometheus.NewCounterVec(prometheus.CounterOpts{
52+
Name: "blocks_deleted_total",
53+
Help: "Total number of block deletion attempts.",
54+
}, []string{"status"}),
4955
}
5056

5157
util.Register(r,
5258
m.jobsInProgress,
5359
m.jobsCompleted,
5460
m.jobDuration,
5561
m.timeToCompaction,
62+
m.blocksDeleted,
5663
)
5764

5865
return m

0 commit comments

Comments
 (0)