Skip to content

Commit 27252ec

Browse files
committed
Make events heartbeat interval configurable.
Removes hardcoded global; resolves FIXME.
1 parent 039f63e commit 27252ec

15 files changed

+60
-30
lines changed

server/configs/reload/reload.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ logtime_utc: true # enable on reload
1111
log_file: "nats-server.log" # change on reload
1212
pid_file: "nats-server.pid" # change on reload
1313
max_control_line: 512 # change on reload
14+
hb_interval: 5 # change on reload
1415
ping_interval: 5 # change on reload
1516
ping_max: 1 # change on reload
1617
write_deadline: "3s" # change on reload

server/configs/test.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ max_control_line: 2048
4242
# maximum payload
4343
max_payload: 65536
4444

45+
# heartbeat interval
46+
hb_interval: "30s"
47+
4548
# ping interval and no pong threshold
4649
ping_interval: "60s"
4750
ping_max: 3

server/const.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ const (
8585
// AUTH_TIMEOUT is the authorization wait time.
8686
AUTH_TIMEOUT = 2 * time.Second
8787

88+
// DEFAULT_HB_INTERVAL is how often miscellaneous HeartBeat operations occur.
89+
DEFAULT_HB_INTERVAL = 30 * time.Second
90+
8891
// DEFAULT_PING_INTERVAL is how often pings are sent to clients and routes.
8992
DEFAULT_PING_INTERVAL = 2 * time.Minute
9093

server/consumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1968,7 +1968,7 @@ func (o *consumer) checkAndSetPendingRequestsOk() {
19681968
if !versionAtLeast(si.(nodeInfo).version, 2, 7, 1) {
19691969
// We expect all of our peers to eventually be up to date.
19701970
// So check again in awhile.
1971-
time.AfterFunc(eventsHBInterval, func() { o.checkAndSetPendingRequestsOk() })
1971+
time.AfterFunc(s.opts.HBInterval, func() { o.checkAndSetPendingRequestsOk() })
19721972
o.setPendingRequestsOk(false)
19731973
return
19741974
}

server/events.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,6 @@ const (
8080
accReqAccIndex = 3
8181
)
8282

83-
// FIXME(dlc) - make configurable.
84-
var eventsHBInterval = 30 * time.Second
85-
8683
type sysMsgHandler func(sub *subscription, client *client, acc *Account, subject, reply string, hdr, msg []byte)
8784

8885
// Used if we have to queue things internally to avoid the route/gw path.
@@ -1817,9 +1814,9 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj ...string) {
18171814
} else {
18181815
// Check to see if we have an HB running and update.
18191816
if a.ctmr == nil {
1820-
a.ctmr = time.AfterFunc(eventsHBInterval, func() { s.accConnsUpdate(a) })
1817+
a.ctmr = time.AfterFunc(s.opts.HBInterval, func() { s.accConnsUpdate(a) })
18211818
} else {
1822-
a.ctmr.Reset(eventsHBInterval)
1819+
a.ctmr.Reset(s.opts.HBInterval)
18231820
}
18241821
}
18251822
for _, sub := range subj {

server/events_test.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1780,13 +1780,10 @@ func TestSystemAccountNoAuthUser(t *testing.T) {
17801780
}
17811781

17821782
func TestServerAccountConns(t *testing.T) {
1783-
// speed up hb
1784-
orgHBInterval := eventsHBInterval
1785-
eventsHBInterval = time.Millisecond * 100
1786-
defer func() { eventsHBInterval = orgHBInterval }()
17871783
conf := createConfFile(t, []byte(`
17881784
host: 127.0.0.1
17891785
port: -1
1786+
hb_interval: 100ms # speed up hb
17901787
system_account: SYS
17911788
accounts: {
17921789
SYS: {users: [{user: s, password: s}]}
@@ -2293,11 +2290,10 @@ func (sr *slowAccResolver) Fetch(name string) (string, error) {
22932290
}
22942291

22952292
func TestConnectionUpdatesTimerProperlySet(t *testing.T) {
2296-
origEventsHBInterval := eventsHBInterval
2297-
eventsHBInterval = 50 * time.Millisecond
2298-
defer func() { eventsHBInterval = origEventsHBInterval }()
2299-
23002293
sa, _, sb, optsB, _ := runTrustedCluster(t)
2294+
sa.opts.HBInterval = 50 * time.Millisecond
2295+
sb.opts.HBInterval = sa.opts.HBInterval
2296+
23012297
defer sa.Shutdown()
23022298
defer sb.Shutdown()
23032299

server/jetstream_cluster_2_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6147,13 +6147,14 @@ func TestJetStreamClusterStreamCatchupWithTruncateAndPriorSnapshot(t *testing.T)
61476147
}
61486148

61496149
func TestJetStreamClusterNoOrphanedDueToNoConnection(t *testing.T) {
6150-
orgEventsHBInterval := eventsHBInterval
6151-
eventsHBInterval = 500 * time.Millisecond
6152-
defer func() { eventsHBInterval = orgEventsHBInterval }()
6153-
61546150
c := createJetStreamClusterExplicit(t, "R3F", 3)
61556151
defer c.shutdown()
61566152

6153+
// Speed up HB
6154+
for _, s := range c.servers {
6155+
s.opts.HBInterval = 500 * time.Millisecond
6156+
}
6157+
61576158
s := c.randomServer()
61586159
nc, js := jsClientConnect(t, s)
61596160
defer nc.Close()
@@ -6184,9 +6185,9 @@ func TestJetStreamClusterNoOrphanedDueToNoConnection(t *testing.T) {
61846185
nc.Close()
61856186

61866187
s.mu.RLock()
6187-
val := (s.sys.orphMax / eventsHBInterval) + 2
6188+
val := (s.sys.orphMax / s.opts.HBInterval) + 2
61886189
s.mu.RUnlock()
6189-
time.Sleep(val * eventsHBInterval)
6190+
time.Sleep(val * s.opts.HBInterval)
61906191
checkSysServers()
61916192
}
61926193

server/jetstream_super_cluster_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3837,14 +3837,11 @@ func TestJetStreamSuperClusterGWReplyRewrite(t *testing.T) {
38373837
}
38383838

38393839
func TestJetStreamSuperClusterGWOfflineSatus(t *testing.T) {
3840-
orgEventsHBInterval := eventsHBInterval
3841-
eventsHBInterval = 500 * time.Millisecond //time.Second
3842-
defer func() { eventsHBInterval = orgEventsHBInterval }()
3843-
38443840
tmpl := `
38453841
listen: 127.0.0.1:-1
38463842
server_name: %s
38473843
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
3844+
hb_interval: 500ms
38483845
38493846
gateway {
38503847
name: "local"
@@ -3883,6 +3880,7 @@ func TestJetStreamSuperClusterGWOfflineSatus(t *testing.T) {
38833880
listen: 127.0.0.1:-1
38843881
server_name: %s
38853882
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
3883+
hb_interval: 500ms
38863884
38873885
gateway {
38883886
name: "remote"

server/jwt_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3335,9 +3335,6 @@ func writeJWT(t *testing.T, dir string, pub string, jwt string) {
33353335
}
33363336

33373337
func TestJWTAccountNATSResolverFetch(t *testing.T) {
3338-
origEventsHBInterval := eventsHBInterval
3339-
eventsHBInterval = 50 * time.Millisecond // speed up eventing
3340-
defer func() { eventsHBInterval = origEventsHBInterval }()
33413338
require_NoLocalOrRemoteConnections := func(account string, srvs ...*Server) {
33423339
t.Helper()
33433340
for _, srv := range srvs {
@@ -3446,6 +3443,7 @@ func TestJWTAccountNATSResolverFetch(t *testing.T) {
34463443
server_name: srv-A
34473444
operator: %s
34483445
system_account: %s
3446+
hb_interval: 50ms
34493447
resolver: {
34503448
type: full
34513449
dir: '%s'
@@ -3471,6 +3469,7 @@ func TestJWTAccountNATSResolverFetch(t *testing.T) {
34713469
server_name: srv-B
34723470
operator: %s
34733471
system_account: %s
3472+
hb_interval: 50ms
34743473
resolver: {
34753474
type: full
34763475
@@ -3495,6 +3494,7 @@ func TestJWTAccountNATSResolverFetch(t *testing.T) {
34953494
server_name: srv-C
34963495
operator: %s
34973496
system_account: %s
3497+
hb_interval: 50ms
34983498
resolver: {
34993499
type: cache
35003500
dir: '%s'

server/monitor.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1159,6 +1159,7 @@ type Varz struct {
11591159
WSConnectURLs []string `json:"ws_connect_urls,omitempty"`
11601160
MaxConn int `json:"max_connections"`
11611161
MaxSubs int `json:"max_subscriptions,omitempty"`
1162+
HBInterval time.Duration `json:"hb_interval"`
11621163
PingInterval time.Duration `json:"ping_interval"`
11631164
MaxPingsOut int `json:"ping_max"`
11641165
HTTPHost string `json:"http_host"`
@@ -1592,6 +1593,7 @@ func (s *Server) updateVarzConfigReloadableFields(v *Varz) {
15921593
v.TLSRequired = info.TLSRequired
15931594
v.TLSVerify = info.TLSVerify
15941595
v.MaxConn = opts.MaxConn
1596+
v.HBInterval = opts.HBInterval
15951597
v.PingInterval = opts.PingInterval
15961598
v.MaxPingsOut = opts.MaxPingsOut
15971599
v.AuthTimeout = opts.AuthTimeout

server/opts.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ type Options struct {
236236
Authorization string `json:"-"`
237237
PingInterval time.Duration `json:"ping_interval"`
238238
MaxPingsOut int `json:"ping_max"`
239+
HBInterval time.Duration `json:"hb_interval"`
239240
HTTPHost string `json:"http_host"`
240241
HTTPPort int `json:"http_port"`
241242
HTTPBasePath string `json:"http_base_path"`
@@ -974,6 +975,8 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error
974975
} else {
975976
o.MaxSubTokens = uint8(n)
976977
}
978+
case "hb_interval":
979+
o.HBInterval = parseDuration("hb_interval", tk, v, errors, warnings)
977980
case "ping_interval":
978981
o.PingInterval = parseDuration("ping_interval", tk, v, errors, warnings)
979982
case "ping_max":
@@ -4575,6 +4578,9 @@ func setBaselineOptions(opts *Options) {
45754578
if opts.MaxConn == 0 {
45764579
opts.MaxConn = DEFAULT_MAX_CONNECTIONS
45774580
}
4581+
if opts.HBInterval == 0 {
4582+
opts.HBInterval = DEFAULT_HB_INTERVAL
4583+
}
45784584
if opts.PingInterval == 0 {
45794585
opts.PingInterval = DEFAULT_PING_INTERVAL
45804586
}

server/opts_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ func TestDefaultOptions(t *testing.T) {
5353
Port: DEFAULT_PORT,
5454
MaxConn: DEFAULT_MAX_CONNECTIONS,
5555
HTTPHost: DEFAULT_HOST,
56+
HBInterval: DEFAULT_HB_INTERVAL,
5657
PingInterval: DEFAULT_PING_INTERVAL,
5758
MaxPingsOut: DEFAULT_PING_MAX_OUT,
5859
TLSTimeout: float64(TLS_TIMEOUT) / float64(time.Second),
@@ -113,6 +114,7 @@ func TestConfigFile(t *testing.T) {
113114
MaxConn: 100,
114115
MaxSubs: 1000,
115116
MaxPending: 10000000,
117+
HBInterval: 30 * time.Second,
116118
PingInterval: 60 * time.Second,
117119
MaxPingsOut: 3,
118120
WriteDeadline: 3 * time.Second,
@@ -272,6 +274,7 @@ func TestMergeOverrides(t *testing.T) {
272274
MaxConn: 100,
273275
MaxSubs: 1000,
274276
MaxPending: 10000000,
277+
HBInterval: 30 * time.Second,
275278
PingInterval: 60 * time.Second,
276279
MaxPingsOut: 3,
277280
Cluster: ClusterOpts{
@@ -1247,6 +1250,7 @@ func TestOptionsClone(t *testing.T) {
12471250
MaxControlLine: 2048,
12481251
MaxPayload: 65536,
12491252
MaxConn: 100,
1253+
HBInterval: 30 * time.Second,
12501254
PingInterval: 60 * time.Second,
12511255
MaxPingsOut: 3,
12521256
Cluster: ClusterOpts{

server/reload.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,19 @@ func (m *maxPayloadOption) Apply(server *Server) {
537537
server.Noticef("Reloaded: max_payload = %d", m.newValue)
538538
}
539539

540+
// hbIntervalOption implements the option interface for the `hb_interval`
541+
// setting.
542+
type hbIntervalOption struct {
543+
noopOption
544+
newValue time.Duration
545+
}
546+
547+
// Apply is a no-op because the heartbeat interval will be reloaded after
548+
// options are applied.
549+
func (p *hbIntervalOption) Apply(server *Server) {
550+
server.Noticef("Reloaded: hb_interval = %s", p.newValue)
551+
}
552+
540553
// pingIntervalOption implements the option interface for the `ping_interval`
541554
// setting.
542555
type pingIntervalOption struct {

server/reload_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ func TestConfigReloadUnsupported(t *testing.T) {
141141
MaxControlLine: 4096,
142142
MaxPayload: 1048576,
143143
MaxConn: 65536,
144+
HBInterval: 30 * time.Second,
144145
PingInterval: 2 * time.Minute,
145146
MaxPingsOut: 2,
146147
WriteDeadline: 10 * time.Second,
@@ -213,6 +214,7 @@ func TestConfigReloadInvalidConfig(t *testing.T) {
213214
MaxControlLine: 4096,
214215
MaxPayload: 1048576,
215216
MaxConn: 65536,
217+
HBInterval: 30 * time.Second,
216218
PingInterval: 2 * time.Minute,
217219
MaxPingsOut: 2,
218220
WriteDeadline: 10 * time.Second,
@@ -276,6 +278,7 @@ func TestConfigReload(t *testing.T) {
276278
MaxControlLine: 4096,
277279
MaxPayload: 1048576,
278280
MaxConn: 65536,
281+
HBInterval: 30 * time.Second,
279282
PingInterval: 2 * time.Minute,
280283
MaxPingsOut: 2,
281284
WriteDeadline: 10 * time.Second,
@@ -352,6 +355,9 @@ func TestConfigReload(t *testing.T) {
352355
if updated.MaxControlLine != 512 {
353356
t.Fatalf("MaxControlLine is incorrect.\nexpected: 512\ngot: %d", updated.MaxControlLine)
354357
}
358+
if updated.HBInterval != 5*time.Second {
359+
t.Fatalf("HBInterval is incorrect.\nexpected 5s\ngot: %s", updated.HBInterval)
360+
}
355361
if updated.PingInterval != 5*time.Second {
356362
t.Fatalf("PingInterval is incorrect.\nexpected 5s\ngot: %s", updated.PingInterval)
357363
}

server/server.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1336,9 +1336,9 @@ func (s *Server) setSystemAccount(acc *Account) error {
13361336
recvq: newIPQueue[*inSysMsg](s, "System recvQ"),
13371337
resetCh: make(chan struct{}),
13381338
sq: s.newSendQ(),
1339-
statsz: eventsHBInterval,
1340-
orphMax: 5 * eventsHBInterval,
1341-
chkOrph: 3 * eventsHBInterval,
1339+
statsz: s.opts.HBInterval,
1340+
orphMax: 5 * s.opts.HBInterval,
1341+
chkOrph: 3 * s.opts.HBInterval,
13421342
}
13431343
s.sys.wg.Add(1)
13441344
s.mu.Unlock()

0 commit comments

Comments
 (0)