Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 58 additions & 16 deletions recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,30 @@ type Recorder struct {
func NewRecorder() *Recorder {
r := &Recorder{}

ticker := time.NewTicker(1 * time.Second)
// Create a reference to r that will be captured by the goroutine
recorder := r

go func() {
ticker := time.NewTicker(1 * time.Second)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move ticker creation inside the goroutine to avoid race conditions, to ensures the ticker is only accessible from this goroutine

defer ticker.Stop() // Ensure ticker is stopped when goroutine exits

for range ticker.C {
if sensor.Agent().Ready() {
go func() {
if err := r.Flush(context.Background()); err != nil {
sensor.logger.Error("failed to flush the spans: ", err.Error())
muSensor.Lock()
agentReady := sensor != nil && sensor.Agent().Ready()
muSensor.Unlock()

if agentReady {
// Create a new reference to recorder for this goroutine to avoid race conditions
r := recorder
go func(rec *Recorder) {
if err := rec.Flush(context.Background()); err != nil {
muSensor.Lock()
if sensor != nil {
sensor.logger.Error("failed to flush the spans: ", err.Error())
}
muSensor.Unlock()
}
}()
}(r)
}
}
}()
Expand All @@ -59,32 +74,51 @@ func NewTestRecorder() *Recorder {
// RecordSpan accepts spans to be recorded and added to the span queue
// for eventual reporting to the host agent.
func (r *Recorder) RecordSpan(span *spanS) {
// Get all sensor-related values under a single lock to minimize contention
muSensor.Lock()
if sensor == nil {
muSensor.Unlock()
return
}

agentReady := sensor.Agent().Ready()
maxBufferedSpans := sensor.options.MaxBufferedSpans
forceTransmissionAt := sensor.options.ForceTransmissionStartingAt
logger := sensor.logger
muSensor.Unlock()

// If we're not announced and not in test mode then just
// return
if !r.testMode && !sensor.Agent().Ready() {
if !r.testMode && !agentReady {
return
}

r.Lock()
defer r.Unlock()

if len(r.spans) == sensor.options.MaxBufferedSpans {
if len(r.spans) == maxBufferedSpans {
r.spans = r.spans[1:]
}

r.spans = append(r.spans, newSpan(span))

if r.testMode || !sensor.Agent().Ready() {
if r.testMode || !agentReady {
return
}

if len(r.spans) >= sensor.options.ForceTransmissionStartingAt {
sensor.logger.Debug("forcing ", len(r.spans), "span(s) to the agent")
go func() {
if err := r.Flush(context.Background()); err != nil {
sensor.logger.Error("failed to flush the spans: ", err.Error())
if len(r.spans) >= forceTransmissionAt {
logger.Debug("forcing ", len(r.spans), "span(s) to the agent")
// Create a reference to r for this goroutine to avoid race conditions
rec := r
go func(recorder *Recorder) {
if err := recorder.Flush(context.Background()); err != nil {
muSensor.Lock()
if sensor != nil {
sensor.logger.Error("failed to flush the spans: ", err.Error())
}
muSensor.Unlock()
}
}()
}(rec)
}
}

Expand Down Expand Up @@ -118,7 +152,15 @@ func (r *Recorder) Flush(ctx context.Context) error {
return nil
}

if err := sensor.Agent().SendSpans(spansToSend); err != nil {
muSensor.Lock()
if sensor == nil {
muSensor.Unlock()
return nil
}
agent := sensor.Agent()
muSensor.Unlock()

if err := agent.SendSpans(spansToSend); err != nil {
r.Lock()
defer r.Unlock()

Expand Down
6 changes: 6 additions & 0 deletions sensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ func StartMetrics(options *Options) {

// Ready returns whether the Instana collector is ready to collect and send data to the agent
func Ready() bool {
muSensor.Lock()
defer muSensor.Unlock()

if sensor == nil {
return false
}
Expand All @@ -267,6 +270,9 @@ func Ready() bool {
// graceful service shutdown and not recommended for intermittent use. Once Flush() is called, it's not guaranteed
// that collector remains in operational state.
func Flush(ctx context.Context) error {
muSensor.Lock()
defer muSensor.Unlock()

if sensor == nil {
return nil
}
Expand Down
Loading