Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions client/kafka/v2/async_producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Package kafka provides a client with included tracing capabilities.
package v2

import (
"context"

"github.com/IBM/sarama"
"github.com/beatlabs/patron/observability"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
)

const (
deliveryTypeAsync = "async"
)

var deliveryTypeAsyncAttr = attribute.String("delivery", deliveryTypeAsync)

// AsyncProducer is an asynchronous Kafka producer.
type AsyncProducer struct {
producer sarama.AsyncProducer
}

// AsyncClose triggers a shutdown of the producer. The shutdown has completed
// when both the Errors and Successes channels have been closed. When calling
// AsyncClose, you *must* continue to read from those channels in order to
// drain the results of any messages in flight.
func (ap *AsyncProducer) AsyncClose() {
ap.producer.AsyncClose()
}

// Close shuts down the producer and waits for any buffered messages to be
// flushed. You must call this function before a producer object passes out of
// scope, as it may otherwise leak memory. You must call this before process
// shutting down, or you may lose messages. You must call this before calling
// Close on the underlying client.
func (ap *AsyncProducer) Close() error {
return ap.producer.Close()
}

// Send a message to a topic, asynchronously. Producer errors are queued on the
// channel obtained during the AsyncProducer creation.
func (ap *AsyncProducer) Send(ctx context.Context, msg *sarama.ProducerMessage) error {
ctx, sp := startSpan(ctx, "send", deliveryTypeAsync, msg.Topic)
defer sp.End()

injectTracingAndCorrelationHeaders(ctx, msg)

ap.producer.Input() <- msg
publishCountAdd(ctx, deliveryTypeAsyncAttr, observability.SucceededAttribute, topicAttribute(msg.Topic))
sp.SetStatus(codes.Ok, "message sent")
return nil
}

// Successes is the success output channel back to the user when Return.Successes is
// enabled. If Return.Successes is true, you MUST read from this channel or the
// Producer will deadlock. It is suggested that you send and read messages
// together in a single select statement.
func (ap *AsyncProducer) Successes() <-chan *sarama.ProducerMessage {
return ap.producer.Successes()
}

// Errors is the error output channel back to the user. You MUST read from this
// channel or the Producer will deadlock when the channel is full. Alternatively,
// you can set Producer.Return.Errors in your config to false, which prevents
// errors to be returned.
func (ap *AsyncProducer) Errors() <-chan *sarama.ProducerError {
return ap.producer.Errors()
}

// IsTransactional return true when current producer is transactional.
func (ap *AsyncProducer) IsTransactional() bool {
return ap.producer.IsTransactional()
}

// TxnStatus return current producer transaction status.
func (ap *AsyncProducer) TxnStatus() sarama.ProducerTxnStatusFlag {
return ap.producer.TxnStatus()
}

// BeginTxn mark current transaction as ready.
func (ap *AsyncProducer) BeginTxn() error {
return ap.producer.BeginTxn()
}

// CommitTxn commit current transaction.
func (ap *AsyncProducer) CommitTxn() error {
return ap.producer.CommitTxn()
}

// AbortTxn abort current transaction.
func (ap *AsyncProducer) AbortTxn() error {
return ap.producer.AbortTxn()
}

// AddOffsetsToTxn add associated offsets to current transaction.
func (ap *AsyncProducer) AddOffsetsToTxn(offsets map[string][]*sarama.PartitionOffsetMetadata, groupId string) error {

Check failure on line 97 in client/kafka/v2/async_producer.go

View workflow job for this annotation

GitHub Actions / Lint and fmt check

var-naming: method parameter groupId should be groupID (revive)
return ap.producer.AddOffsetsToTxn(offsets, groupId)
}

// AddMessageToTxn add message offsets to current transaction.
func (ap *AsyncProducer) AddMessageToTxn(msg *sarama.ConsumerMessage, groupId string, metadata *string) error {

Check failure on line 102 in client/kafka/v2/async_producer.go

View workflow job for this annotation

GitHub Actions / Lint and fmt check

var-naming: method parameter groupId should be groupID (revive)
return ap.producer.AddMessageToTxn(msg, groupId, metadata)
}
178 changes: 178 additions & 0 deletions client/kafka/v2/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
//go:build integration

package v2

import (
"context"
"os"
"testing"

"github.com/IBM/sarama"
"github.com/beatlabs/patron/internal/test"
"github.com/beatlabs/patron/observability/trace"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
)

const (
clientTopic = "clientTopic"
)

var (
brokers = []string{"127.0.0.1:9092"}
tracePublisher *sdktrace.TracerProvider
traceExporter *tracetest.InMemoryExporter
)

func TestMain(m *testing.M) {
traceExporter = tracetest.NewInMemoryExporter()
tracePublisher = trace.Setup("test", nil, traceExporter)

code := m.Run()

os.Exit(code)
}

func TestNewAsyncProducer_Success(t *testing.T) {
saramaCfg, err := DefaultConfig("test-producer", true)
require.NoError(t, err)

ap, err := NewAsyncProducer(brokers, saramaCfg)
require.NoError(t, err)
assert.NotNil(t, ap)
}

func TestNewSyncProducer_Success(t *testing.T) {
saramaCfg, err := DefaultConfig("test-producer", true)
require.NoError(t, err)

p, err := NewSyncProducer(brokers, saramaCfg)
require.NoError(t, err)
assert.NotNil(t, p)
}

func TestAsyncProducer_SendMessage_Close(t *testing.T) {
t.Cleanup(func() { traceExporter.Reset() })

ctx := context.Background()

shutdownProvider, assertCollectMetrics := test.SetupMetrics(ctx, t)
defer shutdownProvider()

saramaCfg, err := DefaultConfig("test-consumer", false)
require.NoError(t, err)

ap, err := NewAsyncProducer(brokers, saramaCfg)
require.NoError(t, err)
assert.NotNil(t, ap)
msg := &sarama.ProducerMessage{
Topic: clientTopic,
Value: sarama.StringEncoder("TEST"),
Headers: []sarama.RecordHeader{{Key: []byte("123"), Value: []byte("123")}},
}
err = ap.Send(context.Background(), msg)
require.NoError(t, err)
require.NoError(t, ap.Close())

// Tracing
require.NoError(t, tracePublisher.ForceFlush(context.Background()))

expected := tracetest.SpanStub{
Name: "send",
Attributes: []attribute.KeyValue{
attribute.String("delivery", "async"),
attribute.String("client", "kafka"),
attribute.String("topic", "clientTopic"),
},
}

snaps := traceExporter.GetSpans().Snapshots()

assert.Len(t, snaps, 1)
assert.Equal(t, expected.Name, snaps[0].Name())
assert.Equal(t, expected.Attributes, snaps[0].Attributes())

// Metrics
_ = assertCollectMetrics(1)
}

func TestSyncProducer_SendMessage_Close(t *testing.T) {
t.Cleanup(func() {
traceExporter.Reset()
})
saramaCfg, err := DefaultConfig("test-producer", true)
require.NoError(t, err)

p, err := NewSyncProducer(brokers, saramaCfg)
require.NoError(t, err)
assert.NotNil(t, p)
msg := &sarama.ProducerMessage{
Topic: clientTopic,
Value: sarama.StringEncoder("TEST"),
}
partition, offset, err := p.SendMessage(context.Background(), msg)
require.NoError(t, err)
assert.GreaterOrEqual(t, partition, int32(0))
assert.GreaterOrEqual(t, offset, int64(0))
require.NoError(t, p.Close())

// Tracing
require.NoError(t, tracePublisher.ForceFlush(context.Background()))

expected := tracetest.SpanStub{
Name: "send",
Attributes: []attribute.KeyValue{
attribute.String("delivery", "sync"),
attribute.String("client", "kafka"),
attribute.String("topic", "clientTopic"),
},
}

snaps := traceExporter.GetSpans().Snapshots()

assert.Len(t, snaps, 1)
assert.Equal(t, expected.Name, snaps[0].Name())
assert.Equal(t, expected.Attributes, snaps[0].Attributes())
}

func TestSyncProducer_SendMessages_Close(t *testing.T) {
t.Cleanup(func() {
traceExporter.Reset()
})
saramaCfg, err := DefaultConfig("test-producer", true)
require.NoError(t, err)

p, err := NewSyncProducer(brokers, saramaCfg)
require.NoError(t, err)
assert.NotNil(t, p)
msg1 := &sarama.ProducerMessage{
Topic: clientTopic,
Value: sarama.StringEncoder("TEST1"),
}
msg2 := &sarama.ProducerMessage{
Topic: clientTopic,
Value: sarama.StringEncoder("TEST2"),
}
err = p.SendBatch(context.Background(), []*sarama.ProducerMessage{msg1, msg2})
require.NoError(t, err)
require.NoError(t, p.Close())
// Tracing
require.NoError(t, tracePublisher.ForceFlush(context.Background()))

expected := tracetest.SpanStub{
Name: "send-batch",
Attributes: []attribute.KeyValue{
attribute.String("delivery", "sync"),
attribute.String("client", "kafka"),
},
}

snaps := traceExporter.GetSpans().Snapshots()

assert.Len(t, snaps, 1)
assert.Equal(t, expected.Name, snaps[0].Name())
assert.Equal(t, expected.Attributes, snaps[0].Attributes())
}
66 changes: 66 additions & 0 deletions client/kafka/v2/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package v2

import (
"errors"
"fmt"
"os"

"github.com/IBM/sarama"
"github.com/beatlabs/patron/internal/validation"
)

// DefaultConfig creates a default Sarama configuration with idempotency enabled.
// See also:
// * https://pkg.go.dev/github.com/Shopify/sarama#RequiredAcks
// * https://pkg.go.dev/github.com/Shopify/sarama#Config
func DefaultConfig(name string, idempotent bool) (*sarama.Config, error) {
host, err := os.Hostname()
if err != nil {
return nil, errors.New("failed to get hostname")
}

cfg := sarama.NewConfig()
cfg.ClientID = fmt.Sprintf("%s-%s", host, name)

if idempotent {
cfg.Net.MaxOpenRequests = 1
cfg.Producer.Idempotent = idempotent
}

cfg.Producer.RequiredAcks = sarama.WaitForAll

return cfg, nil
}

func NewSyncProducer(brokers []string, cfg *sarama.Config) (*SyncProducer, error) {
if validation.IsStringSliceEmpty(brokers) {
return nil, errors.New("brokers are empty or have an empty value")
}
if cfg == nil {
return nil, errors.New("no sarama configuration specified")
}

// required for any SyncProducer; 'Errors' is already true by default for both async/sync producers
cfg.Producer.Return.Successes = true

producer, err := sarama.NewSyncProducer(brokers, cfg)
if err != nil {
return nil, fmt.Errorf("failed to create sync producer: %w", err)
}

return &SyncProducer{producer: producer}, nil
}

func NewAsyncProducer(brokers []string, cfg *sarama.Config) (*AsyncProducer, error) {
if validation.IsStringSliceEmpty(brokers) {
return nil, errors.New("brokers are empty or have an empty value")
}
if cfg == nil {
return nil, errors.New("no sarama configuration specified")
}
producer, err := sarama.NewAsyncProducer(brokers, cfg)
if err != nil {
return nil, fmt.Errorf("failed to create async producer: %w", err)
}
return &AsyncProducer{producer: producer}, nil
}
Loading
Loading