Skip to content

Commit eca9e1e

Browse files
authored
Merge pull request #167 from linuxboot/bugfix/e2etest_race_condition
fix(e2e_tests): Removing a race condition with flags in the server
2 parents afc42fb + 4a6eb1e commit eca9e1e

File tree

1 file changed

+80
-72
lines changed

1 file changed

+80
-72
lines changed

cmds/contest/server/server.go

Lines changed: 80 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -42,47 +42,55 @@ import (
4242
"github.com/linuxboot/contest/plugins/listeners/httplistener"
4343
)
4444

45-
var (
46-
flagSet *flag.FlagSet
47-
flagDBURI *string
48-
flagListenAddr *string
49-
flagServerID *string
50-
flagProcessTimeout *time.Duration
51-
flagTargetLocker *string
52-
flagInstanceTag *string
53-
flagPauseTimeout *time.Duration
54-
flagResumeJobs *bool
55-
flagTargetLockDuration *time.Duration
45+
type flags struct {
46+
DBURI string
47+
ListenAddr string
48+
ServerID string
49+
ProcessTimeout time.Duration
50+
TargetLocker string
51+
InstanceTag string
52+
PauseTimeout time.Duration
53+
ResumeJobs bool
54+
TargetLockDuration time.Duration
5655
// http logger parameters
57-
flagAdminServerAddr *string
58-
flagHttpLoggerBufferSize *int
59-
flagHttpLoggerMaxBatchSize *int
60-
flagHttpLoggerMaxBatchCount *int
61-
flagHttpLoggerBatchSendFreq *time.Duration
62-
flagHttpLoggerTimeout *time.Duration
63-
logLevel = logger.LevelDebug
64-
)
56+
AdminServerAddr string
57+
HttpLoggerBufferSize int
58+
HttpLoggerMaxBatchSize int
59+
HttpLoggerMaxBatchCount int
60+
HttpLoggerBatchSendFreq time.Duration
61+
HttpLoggerTimeout time.Duration
62+
LogLevel logger.Level
63+
}
6564

66-
func initFlags(cmd string) {
67-
flagSet = flag.NewFlagSet(cmd, flag.ContinueOnError)
68-
flagDBURI = flagSet.String("dbURI", config.DefaultDBURI, "Database URI")
69-
flagListenAddr = flagSet.String("listenAddr", ":8080", "Listen address and port")
70-
flagAdminServerAddr = flagSet.String("adminServerAddr", "", "Addr of the admin server to connect to")
71-
flagHttpLoggerBufferSize = flagSet.Int("loggerBufferSize", loggerhook.DefaultBufferSize, "buffer size for the http logger hook")
72-
flagHttpLoggerMaxBatchSize = flagSet.Int("loggerMaxBatchSize", loggerhook.DefaultMaxBatchSize, "max size (in bytes) of a logs batch to be sent if it reaches/exceeds it")
73-
flagHttpLoggerMaxBatchCount = flagSet.Int("loggerMaxBatchCount", loggerhook.DefaultMaxBatchCount, "max count of logs in a batch")
74-
flagHttpLoggerBatchSendFreq = flagSet.Duration("loggerBatchSendFreq", loggerhook.DefaultBatchSendFreq, "duration that defines the batch sending freq")
75-
flagHttpLoggerTimeout = flagSet.Duration("loggerTimeout", loggerhook.DefaultLogTimeout, "logs send timeout")
76-
flagServerID = flagSet.String("serverID", "", "Set a static server ID, e.g. the host name or another unique identifier. If unset, will use the listener's default")
77-
flagProcessTimeout = flagSet.Duration("processTimeout", api.DefaultEventTimeout, "API request processing timeout")
78-
flagTargetLocker = flagSet.String("targetLocker", "auto", "Target locker implementation to use, \"auto\" follows DBURI setting")
79-
flagInstanceTag = flagSet.String("instanceTag", "", "A tag for this instance. Server will only operate on jobs with this tag and will add this tag to the jobs it creates.")
80-
flagSet.Var(&logLevel, "logLevel", "A log level, possible values: debug, info, warning, error, panic, fatal")
81-
flagPauseTimeout = flagSet.Duration("pauseTimeout", 0, "SIGINT/SIGTERM shutdown timeout (seconds), after which pause will be escalated to cancellaton; -1 - no escalation, 0 - do not pause, cancel immediately")
82-
flagResumeJobs = flagSet.Bool("resumeJobs", false, "Attempt to resume paused jobs")
83-
flagTargetLockDuration = flagSet.Duration("targetLockDuration", config.DefaultTargetLockDuration,
65+
func parseFlags(cmd string, args ...string) (*flags, error) {
66+
f := &flags{
67+
LogLevel: logger.LevelDebug,
68+
}
69+
flagSet := flag.NewFlagSet(cmd, flag.ContinueOnError)
70+
flagSet.StringVar(&f.DBURI, "dbURI", config.DefaultDBURI, "Database URI")
71+
flagSet.StringVar(&f.ListenAddr, "listenAddr", ":8080", "Listen address and port")
72+
flagSet.StringVar(&f.AdminServerAddr, "adminServerAddr", "", "Addr of the admin server to connect to")
73+
flagSet.IntVar(&f.HttpLoggerBufferSize, "loggerBufferSize", loggerhook.DefaultBufferSize, "buffer size for the http logger hook")
74+
flagSet.IntVar(&f.HttpLoggerMaxBatchSize, "loggerMaxBatchSize", loggerhook.DefaultMaxBatchSize, "max size (in bytes) of a logs batch to be sent if it reaches/exceeds it")
75+
flagSet.IntVar(&f.HttpLoggerMaxBatchCount, "loggerMaxBatchCount", loggerhook.DefaultMaxBatchCount, "max count of logs in a batch")
76+
flagSet.DurationVar(&f.HttpLoggerBatchSendFreq, "loggerBatchSendFreq", loggerhook.DefaultBatchSendFreq, "duration that defines the batch sending freq")
77+
flagSet.DurationVar(&f.HttpLoggerTimeout, "loggerTimeout", loggerhook.DefaultLogTimeout, "logs send timeout")
78+
flagSet.StringVar(&f.ServerID, "serverID", "", "Set a static server ID, e.g. the host name or another unique identifier. If unset, will use the listener's default")
79+
flagSet.DurationVar(&f.ProcessTimeout, "processTimeout", api.DefaultEventTimeout, "API request processing timeout")
80+
flagSet.StringVar(&f.TargetLocker, "targetLocker", "auto", "Target locker implementation to use, \"auto\" follows DBURI setting")
81+
flagSet.StringVar(&f.InstanceTag, "instanceTag", "", "A tag for this instance. Server will only operate on jobs with this tag and will add this tag to the jobs it creates.")
82+
flagSet.Var(&f.LogLevel, "logLevel", "A log level, possible values: debug, info, warning, error, panic, fatal")
83+
flagSet.DurationVar(&f.PauseTimeout, "pauseTimeout", 0, "SIGINT/SIGTERM shutdown timeout (seconds), after which pause will be escalated to cancellaton; -1 - no escalation, 0 - do not pause, cancel immediately")
84+
flagSet.BoolVar(&f.ResumeJobs, "resumeJobs", false, "Attempt to resume paused jobs")
85+
flagSet.DurationVar(&f.TargetLockDuration, "targetLockDuration", config.DefaultTargetLockDuration,
8486
"The amount of time target lock is extended by while the job is running. "+
8587
"This is the maximum amount of time a job can stay paused safely.")
88+
89+
if err := flagSet.Parse(args); err != nil {
90+
return nil, err
91+
}
92+
93+
return f, nil
8694
}
8795

8896
var userFunctions = []map[string]interface{}{
@@ -153,24 +161,24 @@ func registerPlugins(pluginRegistry *pluginregistry.PluginRegistry, pluginConfig
153161

154162
// Main is the main function that executes the ConTest server.
155163
func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os.Signal) error {
156-
initFlags(cmd)
157-
if err := flagSet.Parse(args); err != nil {
158-
return err
164+
flags, err := parseFlags(cmd, args...)
165+
if err != nil {
166+
return fmt.Errorf("unable to parse the flags: %w", err)
159167
}
160168

161169
clk := clock.New()
162170

163171
ctx, cancel := context.WithCancel(context.Background())
164-
ctx = logging.WithBelt(ctx, logLevel)
172+
ctx = logging.WithBelt(ctx, flags.LogLevel)
165173

166-
if *flagAdminServerAddr != "" {
174+
if flags.AdminServerAddr != "" {
167175
httpHook, err := loggerhook.NewHttpHook(loggerhook.Config{
168-
Addr: *flagAdminServerAddr,
169-
BufferSize: *flagHttpLoggerBufferSize,
170-
MaxBatchSize: *flagHttpLoggerMaxBatchSize,
171-
MaxBatchCount: *flagHttpLoggerMaxBatchCount,
172-
BatchSendFreq: *flagHttpLoggerBatchSendFreq,
173-
LogTimeout: *flagHttpLoggerTimeout,
176+
Addr: flags.AdminServerAddr,
177+
BufferSize: flags.HttpLoggerBufferSize,
178+
MaxBatchSize: flags.HttpLoggerMaxBatchSize,
179+
MaxBatchCount: flags.HttpLoggerMaxBatchCount,
180+
BatchSendFreq: flags.HttpLoggerBatchSendFreq,
181+
LogTimeout: flags.HttpLoggerTimeout,
174182
})
175183
errmon.ObserveErrorCtx(ctx, err)
176184
if httpHook != nil {
@@ -200,8 +208,8 @@ func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os.
200208
}()
201209

202210
// primary storage initialization
203-
if *flagDBURI != "" {
204-
primaryDBURI := *flagDBURI
211+
if flags.DBURI != "" {
212+
primaryDBURI := flags.DBURI
205213
log.Infof("Using database URI for primary storage: %s", primaryDBURI)
206214
s, err := rdbms.New(primaryDBURI)
207215
if err != nil {
@@ -221,7 +229,7 @@ func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os.
221229

222230
// replica storage initialization
223231
// pointing to main database for now but can be used to point to replica
224-
replicaDBURI := *flagDBURI
232+
replicaDBURI := flags.DBURI
225233
log.Infof("Using database URI for replica storage: %s", replicaDBURI)
226234
r, err := rdbms.New(replicaDBURI)
227235
if err != nil {
@@ -258,54 +266,54 @@ func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os.
258266
}
259267

260268
// set Locker engine
261-
if *flagTargetLocker == "auto" {
262-
if *flagDBURI != "" {
263-
*flagTargetLocker = dblocker.Name
269+
if flags.TargetLocker == "auto" {
270+
if flags.DBURI != "" {
271+
flags.TargetLocker = dblocker.Name
264272
} else {
265-
*flagTargetLocker = inmemory.Name
273+
flags.TargetLocker = inmemory.Name
266274
}
267-
log.Infof("Locker engine set to auto, using %s", *flagTargetLocker)
275+
log.Infof("Locker engine set to auto, using %s", flags.TargetLocker)
268276
}
269-
switch *flagTargetLocker {
277+
switch flags.TargetLocker {
270278
case inmemory.Name:
271279
target.SetLocker(inmemory.New(clk))
272280
case dblocker.Name:
273-
if l, err := dblocker.New(*flagDBURI, dblocker.WithClock(clk)); err == nil {
281+
if l, err := dblocker.New(flags.DBURI, dblocker.WithClock(clk)); err == nil {
274282
target.SetLocker(l)
275283
} else {
276-
log.Fatalf("Failed to create locker %q: %v", *flagTargetLocker, err)
284+
log.Fatalf("Failed to create locker %q: %v", flags.TargetLocker, err)
277285
}
278286
default:
279-
log.Fatalf("Invalid target locker name %q", *flagTargetLocker)
287+
log.Fatalf("Invalid target locker name %q", flags.TargetLocker)
280288
}
281289

282290
// spawn JobManager
283-
listener := httplistener.New(*flagListenAddr)
291+
listener := httplistener.New(flags.ListenAddr)
284292

285293
opts := []jobmanager.Option{
286-
jobmanager.APIOption(api.OptionEventTimeout(*flagProcessTimeout)),
294+
jobmanager.APIOption(api.OptionEventTimeout(flags.ProcessTimeout)),
287295
}
288-
if *flagServerID != "" {
289-
opts = append(opts, jobmanager.APIOption(api.OptionServerID(*flagServerID)))
296+
if flags.ServerID != "" {
297+
opts = append(opts, jobmanager.APIOption(api.OptionServerID(flags.ServerID)))
290298
}
291-
if *flagInstanceTag != "" {
292-
opts = append(opts, jobmanager.OptionInstanceTag(*flagInstanceTag))
299+
if flags.InstanceTag != "" {
300+
opts = append(opts, jobmanager.OptionInstanceTag(flags.InstanceTag))
293301
}
294-
if *flagTargetLockDuration != 0 {
295-
opts = append(opts, jobmanager.OptionTargetLockDuration(*flagTargetLockDuration))
302+
if flags.TargetLockDuration != 0 {
303+
opts = append(opts, jobmanager.OptionTargetLockDuration(flags.TargetLockDuration))
296304
}
297305

298306
jm, err := jobmanager.New(listener, pluginRegistry, storageEngineVault, opts...)
299307
if err != nil {
300308
log.Fatalf("%v", err)
301309
}
302310

303-
pauseTimeout := *flagPauseTimeout
311+
pauseTimeout := flags.PauseTimeout
304312

305313
go func() {
306314
intLevel := 0
307315
// cancel immediately if pauseTimeout is zero
308-
if *flagPauseTimeout == 0 {
316+
if flags.PauseTimeout == 0 {
309317
intLevel = 1
310318
}
311319
for {
@@ -325,7 +333,7 @@ func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os.
325333
if intLevel == 0 {
326334
log.Infof("Signal %q, pausing jobs", sig)
327335
pause()
328-
if *flagPauseTimeout > 0 {
336+
if flags.PauseTimeout > 0 {
329337
go func() {
330338
select {
331339
case <-ctx.Done():
@@ -344,7 +352,7 @@ func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os.
344352
}
345353
}()
346354

347-
err = jm.Run(ctx, *flagResumeJobs)
355+
err = jm.Run(ctx, flags.ResumeJobs)
348356

349357
target.SetLocker(nil)
350358

0 commit comments

Comments
 (0)