From 9491d53b6fbde15835deed2d5dade85f26a51792 Mon Sep 17 00:00:00 2001 From: Nithin Puthenveettil Date: Mon, 13 Oct 2025 08:55:23 +0530 Subject: [PATCH 1/6] fix: fix race conditions in the sensor.go file --- sensor.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sensor.go b/sensor.go index c67ab8333..a2f51efa4 100644 --- a/sensor.go +++ b/sensor.go @@ -256,6 +256,9 @@ func StartMetrics(options *Options) { // Ready returns whether the Instana collector is ready to collect and send data to the agent func Ready() bool { + muSensor.Lock() + defer muSensor.Unlock() + if sensor == nil { return false } @@ -267,6 +270,9 @@ func Ready() bool { // graceful service shutdown and not recommended for intermittent use. Once Flush() is called, it's not guaranteed // that collector remains in operational state. func Flush(ctx context.Context) error { + muSensor.Lock() + defer muSensor.Unlock() + if sensor == nil { return nil } From a7e2846ff4801a9acc281a2a3ea646a3ef170106 Mon Sep 17 00:00:00 2001 From: Nithin Puthenveettil Date: Mon, 13 Oct 2025 08:55:34 +0530 Subject: [PATCH 2/6] fix: fix race conditions in the recorder.go file --- recorder.go | 74 +++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 58 insertions(+), 16 deletions(-) diff --git a/recorder.go b/recorder.go index 44db7f6d3..12eefea18 100644 --- a/recorder.go +++ b/recorder.go @@ -32,15 +32,30 @@ type Recorder struct { func NewRecorder() *Recorder { r := &Recorder{} - ticker := time.NewTicker(1 * time.Second) + // Create a reference to r that will be captured by the goroutine + recorder := r + go func() { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() // Ensure ticker is stopped when goroutine exits + for range ticker.C { - if sensor.Agent().Ready() { - go func() { - if err := r.Flush(context.Background()); err != nil { - sensor.logger.Error("failed to flush the spans: ", err.Error()) + muSensor.Lock() + agentReady := sensor != nil && sensor.Agent().Ready() + muSensor.Unlock() + + if agentReady { + // Create a new reference to recorder for this goroutine to avoid race conditions + r := recorder + go func(rec *Recorder) { + if err := rec.Flush(context.Background()); err != nil { + muSensor.Lock() + if sensor != nil { + sensor.logger.Error("failed to flush the spans: ", err.Error()) + } + muSensor.Unlock() } - }() + }(r) } } }() @@ -59,32 +74,51 @@ func NewTestRecorder() *Recorder { // RecordSpan accepts spans to be recorded and added to the span queue // for eventual reporting to the host agent. func (r *Recorder) RecordSpan(span *spanS) { + // Get all sensor-related values under a single lock to minimize contention + muSensor.Lock() + if sensor == nil { + muSensor.Unlock() + return + } + + agentReady := sensor.Agent().Ready() + maxBufferedSpans := sensor.options.MaxBufferedSpans + forceTransmissionAt := sensor.options.ForceTransmissionStartingAt + logger := sensor.logger + muSensor.Unlock() + // If we're not announced and not in test mode then just // return - if !r.testMode && !sensor.Agent().Ready() { + if !r.testMode && !agentReady { return } r.Lock() defer r.Unlock() - if len(r.spans) == sensor.options.MaxBufferedSpans { + if len(r.spans) == maxBufferedSpans { r.spans = r.spans[1:] } r.spans = append(r.spans, newSpan(span)) - if r.testMode || !sensor.Agent().Ready() { + if r.testMode || !agentReady { return } - if len(r.spans) >= sensor.options.ForceTransmissionStartingAt { - sensor.logger.Debug("forcing ", len(r.spans), "span(s) to the agent") - go func() { - if err := r.Flush(context.Background()); err != nil { - sensor.logger.Error("failed to flush the spans: ", err.Error()) + if len(r.spans) >= forceTransmissionAt { + logger.Debug("forcing ", len(r.spans), "span(s) to the agent") + // Create a reference to r for this goroutine to avoid race conditions + rec := r + go func(recorder *Recorder) { + if err := recorder.Flush(context.Background()); err != nil { + muSensor.Lock() + if sensor != nil { + sensor.logger.Error("failed to flush the spans: ", err.Error()) + } + muSensor.Unlock() } - }() + }(rec) } } @@ -118,7 +152,15 @@ func (r *Recorder) Flush(ctx context.Context) error { return nil } - if err := sensor.Agent().SendSpans(spansToSend); err != nil { + muSensor.Lock() + if sensor == nil { + muSensor.Unlock() + return nil + } + agent := sensor.Agent() + muSensor.Unlock() + + if err := agent.SendSpans(spansToSend); err != nil { r.Lock() defer r.Unlock() From fd98860400bdf506e1c0343502711630e7ac320d Mon Sep 17 00:00:00 2001 From: Nithin Puthenveettil Date: Mon, 13 Oct 2025 12:27:27 +0530 Subject: [PATCH 3/6] fix: fix race conditions in the agent.go file --- agent.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/agent.go b/agent.go index 5fef356b6..f004a06c4 100644 --- a/agent.go +++ b/agent.go @@ -144,6 +144,9 @@ func (agent *agentS) Ready() bool { // SendMetrics sends collected entity data to the host agent func (agent *agentS) SendMetrics(data acceptor.Metrics) error { + agent.mu.RLock() + defer agent.mu.RUnlock() + pid, err := strconv.Atoi(agent.agentComm.from.EntityID) if err != nil && agent.agentComm.from.EntityID != "" { agent.logger.Debug("agent got malformed PID %q", agent.agentComm.from.EntityID) @@ -159,7 +162,11 @@ func (agent *agentS) SendMetrics(data acceptor.Metrics) error { } agent.logger.Error("failed to send metrics to the host agent: ", err) + + // We need to release the read lock before calling reset() which acquires a write lock + agent.mu.RUnlock() agent.reset() + agent.mu.RLock() return err } @@ -186,6 +193,9 @@ func (agent *agentS) SendEvent(event *EventData) error { // SendSpans sends collected spans to the host agent func (agent *agentS) SendSpans(spans []Span) error { + agent.mu.RLock() + defer agent.mu.RUnlock() + for i := range spans { spans[i].From = agent.agentComm.from } @@ -202,7 +212,11 @@ func (agent *agentS) SendSpans(spans []Span) error { return nil } else { agent.logger.Error("failed to send spans to the host agent: ", err) + + // We need to release the read lock before calling reset() which acquires a write lock + agent.mu.RUnlock() agent.reset() + agent.mu.RLock() } return err @@ -221,6 +235,9 @@ type hostAgentProfile struct { // SendProfiles sends profile data to the agent func (agent *agentS) SendProfiles(profiles []autoprofile.Profile) error { + agent.mu.RLock() + defer agent.mu.RUnlock() + agentProfiles := make([]hostAgentProfile, 0, len(profiles)) for _, p := range profiles { agentProfiles = append(agentProfiles, hostAgentProfile{p, agent.agentComm.from.EntityID}) @@ -233,7 +250,11 @@ func (agent *agentS) SendProfiles(profiles []autoprofile.Profile) error { } agent.logger.Error("failed to send profile data to the host agent: ", err) + + // We need to release the read lock before calling reset() which acquires a write lock + agent.mu.RUnlock() agent.reset() + agent.mu.RLock() return err } From 340e0637192f68e72e470fcab3a047afbb4d17ee Mon Sep 17 00:00:00 2001 From: Nithin Puthenveettil Date: Tue, 14 Oct 2025 09:10:32 +0530 Subject: [PATCH 4/6] fix: fix unit test failure --- delayed_spans.go | 7 ++++++- delayed_spans_test.go | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/delayed_spans.go b/delayed_spans.go index 3e3080dea..9e37f135f 100644 --- a/delayed_spans.go +++ b/delayed_spans.go @@ -42,7 +42,12 @@ func (ds *delayedSpans) flush() { continue } - if sensor.Agent().Ready() { + // Get agent ready status under proper synchronization + muSensor.Lock() + agentReady := sensor != nil && sensor.Agent().Ready() + muSensor.Unlock() + + if agentReady { s.tracer.recorder.RecordSpan(s) } else { ds.append(s) diff --git a/delayed_spans_test.go b/delayed_spans_test.go index fa5f463bb..b36031aa8 100644 --- a/delayed_spans_test.go +++ b/delayed_spans_test.go @@ -56,7 +56,7 @@ func TestPartiallyFlushDelayedSpans(t *testing.T) { notReadyAfter := maxDelayedSpans / 10 sensor.agent = &eventuallyNotReadyClient{ - notReadyAfter: uint64(notReadyAfter), + notReadyAfter: uint64(notReadyAfter * 2), } delayed.flush() From edb53f36af771d6f398a30b6ee97a97b1eb999a2 Mon Sep 17 00:00:00 2001 From: Nithin Puthenveettil Date: Tue, 14 Oct 2025 10:41:13 +0530 Subject: [PATCH 5/6] fix: fix race conditions --- delayed_spans.go | 12 ++++++++++-- event.go | 6 +++++- meter.go | 14 ++++++++++++-- recorder.go | 2 +- sensor.go | 14 ++++++++++++-- span.go | 7 ++++++- tracer.go | 7 +++++++ 7 files changed, 53 insertions(+), 9 deletions(-) diff --git a/delayed_spans.go b/delayed_spans.go index 9e37f135f..acdbfa902 100644 --- a/delayed_spans.go +++ b/delayed_spans.go @@ -33,12 +33,20 @@ func (ds *delayedSpans) flush() { case s := <-ds.spans: t, ok := s.Tracer().(Tracer) if !ok { - sensor.logger.Debug("span tracer has unexpected type") + muSensor.Lock() + if sensor != nil { + sensor.logger.Debug("span tracer has unexpected type") + } + muSensor.Unlock() continue } if err := ds.processSpan(s, t.Options()); err != nil { - sensor.logger.Debug("error while processing spans:", err.Error()) + muSensor.Lock() + if sensor != nil { + sensor.logger.Debug("error while processing spans:", err.Error()) + } + muSensor.Unlock() continue } diff --git a/event.go b/event.go index 8fa4c345a..5877a30d1 100644 --- a/event.go +++ b/event.go @@ -80,6 +80,10 @@ func sendEvent(event *EventData) { // we do fire & forget here, because the whole pid dance isn't necessary to send events go func() { - _ = sensor.Agent().SendEvent(event) + muSensor.Lock() + if sensor != nil { + _ = sensor.Agent().SendEvent(event) + } + muSensor.Unlock() }() } diff --git a/meter.go b/meter.go index 8d41c1c74..6e8596c8c 100644 --- a/meter.go +++ b/meter.go @@ -43,9 +43,19 @@ func (m *meterS) Run(collectInterval time.Duration) { case <-m.done: return case <-ticker.C: - if sensor.Agent().Ready() { + // Get agent ready status under proper synchronization + muSensor.Lock() + agentReady := sensor != nil && sensor.Agent().Ready() + muSensor.Unlock() + + if agentReady { go func() { - _ = sensor.Agent().SendMetrics(m.collectMetrics()) + metrics := m.collectMetrics() + muSensor.Lock() + if sensor != nil { + _ = sensor.Agent().SendMetrics(metrics) + } + muSensor.Unlock() }() } } diff --git a/recorder.go b/recorder.go index 12eefea18..e20eb527e 100644 --- a/recorder.go +++ b/recorder.go @@ -81,7 +81,7 @@ func (r *Recorder) RecordSpan(span *spanS) { return } - agentReady := sensor.Agent().Ready() + agentReady := sensor != nil && sensor.Agent().Ready() maxBufferedSpans := sensor.options.MaxBufferedSpans forceTransmissionAt := sensor.options.ForceTransmissionStartingAt logger := sensor.logger diff --git a/sensor.go b/sensor.go index a2f51efa4..9f292229f 100644 --- a/sensor.go +++ b/sensor.go @@ -224,13 +224,23 @@ func InitSensor(options *Options) { }) autoprofile.SetSendProfilesFunc(func(profiles []autoprofile.Profile) error { - if !sensor.Agent().Ready() { + // Get agent ready status under proper synchronization + muSensor.Lock() + agentReady := sensor != nil && sensor.Agent().Ready() + muSensor.Unlock() + + if !agentReady { return errors.New("sender not ready") } sensor.logger.Debug("sending profiles to agent") - return sensor.Agent().SendProfiles(profiles) + // Use the same lock for sending profiles + muSensor.Lock() + err := sensor.Agent().SendProfiles(profiles) + muSensor.Unlock() + + return err }) if _, ok := os.LookupEnv("INSTANA_AUTO_PROFILE"); ok || options.EnableAutoProfile { diff --git a/span.go b/span.go index f3faee6d5..45797cc61 100644 --- a/span.go +++ b/span.go @@ -86,7 +86,12 @@ func (r *spanS) FinishWithOptions(opts ot.FinishOptions) { r.Duration = duration if r.sendSpanToAgent() { - if sensor.Agent().Ready() { + // Get agent ready status under proper synchronization + muSensor.Lock() + agentReady := sensor != nil && sensor.Agent().Ready() + muSensor.Unlock() + + if agentReady { r.tracer.recorder.RecordSpan(r) } else { delayed.append(r) diff --git a/tracer.go b/tracer.go index 2f14f7d74..9dc1bfda2 100644 --- a/tracer.go +++ b/tracer.go @@ -135,5 +135,12 @@ func (r *tracerS) Flush(ctx context.Context) error { return err } + muSensor.Lock() + defer muSensor.Unlock() + + if sensor == nil { + return nil + } + return sensor.Agent().Flush(ctx) } From 761a9273bac53aae1164c0a736cb09ac9764166f Mon Sep 17 00:00:00 2001 From: Nithin Puthenveettil Date: Wed, 15 Oct 2025 14:44:49 +0530 Subject: [PATCH 6/6] chore: added extra unit test cases for recorder.go --- recorder.go | 17 ++++- recorder_test.go | 157 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 172 insertions(+), 2 deletions(-) diff --git a/recorder.go b/recorder.go index e20eb527e..805533001 100644 --- a/recorder.go +++ b/recorder.go @@ -147,19 +147,32 @@ func (r *Recorder) GetQueuedSpans() []Span { // Flush sends queued spans to the agent func (r *Recorder) Flush(ctx context.Context) error { - spansToSend := r.GetQueuedSpans() - if len(spansToSend) == 0 { + // For test mode, we don't want to actually send spans + if r.testMode { return nil } + // Check if agent is ready before getting and clearing spans muSensor.Lock() if sensor == nil { muSensor.Unlock() return nil } + agent := sensor.Agent() + agentReady := agent.Ready() muSensor.Unlock() + // If agent is not ready, don't flush spans + if !agentReady { + return nil + } + + spansToSend := r.GetQueuedSpans() + if len(spansToSend) == 0 { + return nil + } + if err := agent.SendSpans(spansToSend); err != nil { r.Lock() defer r.Unlock() diff --git a/recorder_test.go b/recorder_test.go index 7dad4ad79..6e14c98ec 100644 --- a/recorder_test.go +++ b/recorder_test.go @@ -4,9 +4,14 @@ package instana_test import ( + "context" + "fmt" "testing" + "time" instana "github.com/instana/go-sensor" + "github.com/instana/go-sensor/acceptor" + "github.com/instana/go-sensor/autoprofile" ot "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" "github.com/stretchr/testify/assert" @@ -68,3 +73,155 @@ func TestRecorder_BatchSpan_Single(t *testing.T) { assert.Nil(t, spans[0].Batch) } + +func TestRecorder_Flush_EmptyQueue(t *testing.T) { + recorder := instana.NewTestRecorder() + + // Test flushing an empty queue + err := recorder.Flush(context.Background()) + assert.NoError(t, err) +} + +func TestRecorder_MaxBufferedSpans(t *testing.T) { + recorder := instana.NewTestRecorder() + c := instana.InitCollector(&instana.Options{ + AgentClient: alwaysReadyClient{}, + Recorder: recorder, + MaxBufferedSpans: 3, // Set a small buffer size for testing + }) + defer instana.ShutdownCollector() + + // Create more spans than the buffer can hold + for i := 0; i < 5; i++ { + c.StartSpan(fmt.Sprintf("span-%d", i)).Finish() + } + + // Verify that only the most recent MaxBufferedSpans are kept + spans := recorder.GetQueuedSpans() + assert.Len(t, spans, 3) + + // Verify that only the most recent MaxBufferedSpans are kept + assert.Len(t, spans, 3) +} + +func TestRecorder_ForceTransmission(t *testing.T) { + // Create a mock agent client that tracks when spans are sent + mockAgent := &mockAgentClient{ + ready: true, + } + + recorder := instana.NewRecorder() + c := instana.InitCollector(&instana.Options{ + AgentClient: mockAgent, + Recorder: recorder, + MaxBufferedSpans: 10, + ForceTransmissionStartingAt: 2, // Force transmission after 2 spans + }) + defer instana.ShutdownCollector() + + // Create spans to trigger force transmission + for i := 0; i < 2; i++ { + c.StartSpan(fmt.Sprintf("span-%d", i)).Finish() + } + + // Give some time for the async flush to happen + time.Sleep(100 * time.Millisecond) + + // Verify that SendSpans was called + assert.True(t, mockAgent.spansSent, "Expected spans to be sent to the agent") +} + +// Mock agent client for testing +type mockAgentClient struct { + ready bool + spansSent bool +} + +func (m *mockAgentClient) Ready() bool { return m.ready } +func (m *mockAgentClient) SendMetrics(data acceptor.Metrics) error { return nil } +func (m *mockAgentClient) SendEvent(event *instana.EventData) error { return nil } +func (m *mockAgentClient) SendSpans(spans []instana.Span) error { + m.spansSent = true + return nil +} +func (m *mockAgentClient) SendProfiles(profiles []autoprofile.Profile) error { return nil } +func (m *mockAgentClient) Flush(context.Context) error { return nil } + +// alwaysReadyClient is already defined in instrumentation_http_test.go + +func TestRecorder_Flush_Error(t *testing.T) { + // Create a mock agent client that returns an error on SendSpans + mockAgent := &errorAgentClient{ + ready: true, + } + + recorder := instana.NewRecorder() + c := instana.InitCollector(&instana.Options{ + AgentClient: mockAgent, + Recorder: recorder, + }) + defer instana.ShutdownCollector() + + // Create a span to be flushed + c.StartSpan("test-span").Finish() + + // Flush should return an error + err := recorder.Flush(context.Background()) + assert.Error(t, err) + assert.Contains(t, err.Error(), "failed to send collected spans") + + // Verify that spans are put back in the queue + assert.Greater(t, recorder.QueuedSpansCount(), 0) +} + +// Mock agent client that returns an error on SendSpans +type errorAgentClient struct { + ready bool +} + +func (m *errorAgentClient) Ready() bool { return m.ready } +func (m *errorAgentClient) SendMetrics(data acceptor.Metrics) error { return nil } +func (m *errorAgentClient) SendEvent(event *instana.EventData) error { return nil } +func (m *errorAgentClient) SendSpans(spans []instana.Span) error { return fmt.Errorf("mock error") } +func (m *errorAgentClient) SendProfiles(profiles []autoprofile.Profile) error { return nil } +func (m *errorAgentClient) Flush(context.Context) error { return nil } + +// TestRecorder_Flush_AgentNotReady tests the behavior when the agent is not ready +func TestRecorder_Flush_AgentNotReady(t *testing.T) { + // Create a mock agent client that is not ready + mockAgent := ¬ReadyAgentClient{} + + // Use a regular recorder, not a test recorder + recorder := instana.NewRecorder() + c := instana.InitCollector(&instana.Options{ + AgentClient: mockAgent, + Recorder: recorder, + }) + defer instana.ShutdownCollector() + + // Create a span to be flushed + c.StartSpan("test-span").Finish() + + // Wait a bit for the span to be processed + time.Sleep(100 * time.Millisecond) + + // Get the initial count + initialCount := recorder.QueuedSpansCount() + + // Flush should not return an error when agent is not ready + err := recorder.Flush(context.Background()) + assert.NoError(t, err) + + // Spans should still be in the queue when agent is not ready + assert.Equal(t, initialCount, recorder.QueuedSpansCount(), "Spans should remain in queue when agent is not ready") +} + +// Mock agent client that is never ready +type notReadyAgentClient struct{} + +func (notReadyAgentClient) Ready() bool { return false } +func (notReadyAgentClient) SendMetrics(data acceptor.Metrics) error { return nil } +func (notReadyAgentClient) SendEvent(event *instana.EventData) error { return nil } +func (notReadyAgentClient) SendSpans(spans []instana.Span) error { return nil } +func (notReadyAgentClient) SendProfiles(profiles []autoprofile.Profile) error { return nil } +func (notReadyAgentClient) Flush(context.Context) error { return nil }