Skip to content

Commit f47df93

Browse files
Merge pull request #13 from redis-performance/wait.replicas
Added support for XADD + WAIT on producer workload generator
2 parents b9b904c + 8611e68 commit f47df93

File tree

1 file changed

+21
-6
lines changed

1 file changed

+21
-6
lines changed

cmd/producer.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ var producerCmd = &cobra.Command{
7474
readBufferEachConn, _ := cmd.Flags().GetInt("read-buffer-each-conn")
7575
writeBufferEachConn, _ := cmd.Flags().GetInt("write-buffer-each-conn")
7676
pprofPort, _ := cmd.Flags().GetInt64("pprof-port")
77-
77+
waitReplicas, _ := cmd.Flags().GetInt("wait-replicas")
78+
waitReplicasMs, _ := cmd.Flags().GetInt("wait-replicas-timeout-ms")
7879
if nClients > uint64(keyspaceLen) {
7980
log.Fatalf("The number of clients needs to be smaller or equal to the number of streams")
8081
}
@@ -128,7 +129,7 @@ var producerCmd = &cobra.Command{
128129
client := getClientWithOptions(connectionStr, auth, blockingPoolSize, readBufferEachConn, writeBufferEachConn, clientKeepAlive)
129130
defer client.Close()
130131

131-
go benchmarkRoutine(client, streamPrefix, value, datapointsChan, samplesPerClient, &wg, useRateLimiter, rateLimiter, gen, randSource, streamMaxlen, streamMaxlenExpireSeconds, loop)
132+
go benchmarkRoutine(client, streamPrefix, value, datapointsChan, samplesPerClient, &wg, useRateLimiter, rateLimiter, gen, randSource, streamMaxlen, streamMaxlenExpireSeconds, loop, waitReplicas, waitReplicasMs)
132133

133134
// delay the creation for each additional client
134135
time.Sleep(betweenClientsDelay)
@@ -170,7 +171,7 @@ var producerCmd = &cobra.Command{
170171
},
171172
}
172173

173-
func benchmarkRoutine(client rueidis.Client, streamPrefix, value string, datapointsChan chan datapoint, samplesPerClient uint64, wg *sync.WaitGroup, useRateLimiter bool, rateLimiter *rate.Limiter, gen *generator.Zipfian, randSource *rand.Rand, streamMaxlen int64, streamMaxlenExpireSeconds int64, loop bool) {
174+
func benchmarkRoutine(client rueidis.Client, streamPrefix, value string, datapointsChan chan datapoint, samplesPerClient uint64, wg *sync.WaitGroup, useRateLimiter bool, rateLimiter *rate.Limiter, gen *generator.Zipfian, randSource *rand.Rand, streamMaxlen int64, streamMaxlenExpireSeconds int64, loop bool, waitReplicas, waitReplicasMs int) {
174175
streamMessages := make(map[int64]int64, 0)
175176
defer wg.Done()
176177
for i := 0; uint64(i) < samplesPerClient || loop; i++ {
@@ -219,9 +220,21 @@ func benchmarkRoutine(client rueidis.Client, streamPrefix, value string, datapoi
219220
counter = counter + 1
220221
cmdsIssued = append(cmdsIssued, XADD)
221222
streamEntry := fmt.Sprintf("%d", counter)
222-
startT := time.Now()
223-
err = client.Do(ctx, client.B().Xadd().Key(keyname).Id(streamEntry).FieldValue().FieldValue("field", value).Build()).Error()
224-
endT := time.Now()
223+
var startT time.Time
224+
var endT time.Time
225+
if waitReplicas > 0 {
226+
cmds := make(rueidis.Commands, 0, 2)
227+
cmds = append(cmds, client.B().Xadd().Key(keyname).Id(streamEntry).FieldValue().FieldValue("field", value).Build())
228+
cmds = append(cmds, client.B().Wait().Numreplicas(int64(waitReplicas)).Timeout(int64(waitReplicasMs)).Build())
229+
startT = time.Now()
230+
res := client.DoMulti(ctx, cmds...)
231+
endT = time.Now()
232+
err = res[0].Error()
233+
} else {
234+
startT = time.Now()
235+
err = client.Do(ctx, client.B().Xadd().Key(keyname).Id(streamEntry).FieldValue().FieldValue("field", value).Build()).Error()
236+
endT = time.Now()
237+
}
225238
duration := endT.Sub(startT)
226239
streamMessages[streamId] = counter
227240
datapointsChan <- datapoint{!(err != nil), duration.Microseconds(), cmdsIssued, 1}
@@ -230,4 +243,6 @@ func benchmarkRoutine(client rueidis.Client, streamPrefix, value string, datapoi
230243

231244
func init() {
232245
rootCmd.AddCommand(producerCmd)
246+
producerCmd.PersistentFlags().Int("wait-replicas", 0, "If larger than 0 will wait for the specified number of replicas.")
247+
producerCmd.PersistentFlags().Int("wait-replicas-timeout-ms", 1000, "WAIT timeout when used together with -wait-replicas.")
233248
}

0 commit comments

Comments
 (0)