Skip to content

Commit a113283

Browse files
committed
Make events heartbeat interval configurable.
Removes hardcoded global; resolves FIXME. Signed-off-by: Jason Volk <jason@zemos.net>
1 parent 3f28de8 commit a113283

15 files changed

+65
-33
lines changed

server/configs/reload/reload.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ logtime_utc: true # enable on reload
1111
log_file: "nats-server.log" # change on reload
1212
pid_file: "nats-server.pid" # change on reload
1313
max_control_line: 512 # change on reload
14+
hb_interval: 5 # change on reload
1415
ping_interval: 5 # change on reload
1516
ping_max: 1 # change on reload
1617
write_deadline: "3s" # change on reload

server/configs/test.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ max_control_line: 2048
4242
# maximum payload
4343
max_payload: 65536
4444

45+
# heartbeat interval
46+
hb_interval: "30s"
47+
4548
# ping interval and no pong threshold
4649
ping_interval: "60s"
4750
ping_max: 3

server/const.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ const (
8585
// AUTH_TIMEOUT is the authorization wait time.
8686
AUTH_TIMEOUT = 2 * time.Second
8787

88+
// DEFAULT_HB_INTERVAL is how often miscellaneous HeartBeat operations occur.
89+
DEFAULT_HB_INTERVAL = 30 * time.Second
90+
8891
// DEFAULT_PING_INTERVAL is how often pings are sent to clients, etc...
8992
DEFAULT_PING_INTERVAL = 2 * time.Minute
9093

server/consumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2120,7 +2120,7 @@ func (o *consumer) checkAndSetPendingRequestsOk() {
21202120
if !versionAtLeast(si.(nodeInfo).version, 2, 7, 1) {
21212121
// We expect all of our peers to eventually be up to date.
21222122
// So check again in awhile.
2123-
time.AfterFunc(eventsHBInterval, func() { o.checkAndSetPendingRequestsOk() })
2123+
time.AfterFunc(s.opts.HBInterval, func() { o.checkAndSetPendingRequestsOk() })
21242124
o.setPendingRequestsOk(false)
21252125
return
21262126
}

server/events.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,6 @@ const (
9494
ocspPeerChainlinkInvalidEventSubj = "$SYS.SERVER.%s.OCSP.PEER.LINK.INVALID"
9595
)
9696

97-
// FIXME(dlc) - make configurable.
98-
var eventsHBInterval = 30 * time.Second
99-
10097
type sysMsgHandler func(sub *subscription, client *client, acc *Account, subject, reply string, hdr, msg []byte)
10198

10299
// Used if we have to queue things internally to avoid the route/gw path.
@@ -2044,9 +2041,9 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj ...string) {
20442041
} else {
20452042
// Check to see if we have an HB running and update.
20462043
if a.ctmr == nil {
2047-
a.ctmr = time.AfterFunc(eventsHBInterval, func() { s.accConnsUpdate(a) })
2044+
a.ctmr = time.AfterFunc(s.opts.HBInterval, func() { s.accConnsUpdate(a) })
20482045
} else {
2049-
a.ctmr.Reset(eventsHBInterval)
2046+
a.ctmr.Reset(s.opts.HBInterval)
20502047
}
20512048
}
20522049
for _, sub := range subj {

server/events_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func runTrustedServer(t *testing.T) (*Server, *Options) {
8383
return s, opts
8484
}
8585

86-
func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options, nkeys.KeyPair) {
86+
func startTrustedCluster(t *testing.T, optsA *Options) (*Server, *Options, *Server, *Options, nkeys.KeyPair) {
8787
t.Helper()
8888

8989
kp, _ := nkeys.FromSeed(oSeed)
@@ -102,7 +102,6 @@ func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options, nkey
102102

103103
mr.Store(apub, jwt)
104104

105-
optsA := DefaultOptions()
106105
optsA.Cluster.Name = "TEST CLUSTER 22"
107106
optsA.Cluster.Host = "127.0.0.1"
108107
optsA.TrustedKeys = []string{pub}
@@ -127,6 +126,11 @@ func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options, nkey
127126
return sa, optsA, sb, optsB, akp
128127
}
129128

129+
func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options, nkeys.KeyPair) {
130+
opts := DefaultOptions()
131+
return startTrustedCluster(t, opts)
132+
}
133+
130134
func runTrustedGateways(t *testing.T) (*Server, *Options, *Server, *Options, nkeys.KeyPair) {
131135
t.Helper()
132136

@@ -1776,13 +1780,10 @@ func TestSystemAccountNoAuthUser(t *testing.T) {
17761780
}
17771781

17781782
func TestServerAccountConns(t *testing.T) {
1779-
// speed up hb
1780-
orgHBInterval := eventsHBInterval
1781-
eventsHBInterval = time.Millisecond * 100
1782-
defer func() { eventsHBInterval = orgHBInterval }()
17831783
conf := createConfFile(t, []byte(`
17841784
host: 127.0.0.1
17851785
port: -1
1786+
hb_interval: 100ms # speed up hb
17861787
system_account: SYS
17871788
accounts: {
17881789
SYS: {users: [{user: s, password: s}]}
@@ -3095,11 +3096,10 @@ func (sr *slowAccResolver) Fetch(name string) (string, error) {
30953096
}
30963097

30973098
func TestConnectionUpdatesTimerProperlySet(t *testing.T) {
3098-
origEventsHBInterval := eventsHBInterval
3099-
eventsHBInterval = 50 * time.Millisecond
3100-
defer func() { eventsHBInterval = origEventsHBInterval }()
3099+
opts := DefaultOptions()
3100+
opts.HBInterval = 50 * time.Millisecond
3101+
sa, _, sb, optsB, _ := startTrustedCluster(t, opts)
31013102

3102-
sa, _, sb, optsB, _ := runTrustedCluster(t)
31033103
defer sa.Shutdown()
31043104
defer sb.Shutdown()
31053105

server/jetstream_cluster_2_test.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6154,11 +6154,8 @@ func TestJetStreamClusterStreamCatchupWithTruncateAndPriorSnapshot(t *testing.T)
61546154
}
61556155

61566156
func TestJetStreamClusterNoOrphanedDueToNoConnection(t *testing.T) {
6157-
orgEventsHBInterval := eventsHBInterval
6158-
eventsHBInterval = 500 * time.Millisecond
6159-
defer func() { eventsHBInterval = orgEventsHBInterval }()
6160-
6161-
c := createJetStreamClusterExplicit(t, "R3F", 3)
6157+
tmpl := jsClusterTempl + "\n\thb_interval: 500ms" // speed up hb
6158+
c := createJetStreamClusterWithTemplate(t, tmpl, "R3F", 3)
61626159
defer c.shutdown()
61636160

61646161
s := c.randomServer()
@@ -6191,9 +6188,9 @@ func TestJetStreamClusterNoOrphanedDueToNoConnection(t *testing.T) {
61916188
nc.Close()
61926189

61936190
s.mu.RLock()
6194-
val := (s.sys.orphMax / eventsHBInterval) + 2
6191+
val := (s.sys.orphMax / s.opts.HBInterval) + 2
61956192
s.mu.RUnlock()
6196-
time.Sleep(val * eventsHBInterval)
6193+
time.Sleep(val * s.opts.HBInterval)
61976194
checkSysServers()
61986195
}
61996196

server/jetstream_super_cluster_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3837,14 +3837,11 @@ func TestJetStreamSuperClusterGWReplyRewrite(t *testing.T) {
38373837
}
38383838

38393839
func TestJetStreamSuperClusterGWOfflineSatus(t *testing.T) {
3840-
orgEventsHBInterval := eventsHBInterval
3841-
eventsHBInterval = 500 * time.Millisecond //time.Second
3842-
defer func() { eventsHBInterval = orgEventsHBInterval }()
3843-
38443840
tmpl := `
38453841
listen: 127.0.0.1:-1
38463842
server_name: %s
38473843
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
3844+
hb_interval: 500ms
38483845
38493846
gateway {
38503847
name: "local"
@@ -3883,6 +3880,7 @@ func TestJetStreamSuperClusterGWOfflineSatus(t *testing.T) {
38833880
listen: 127.0.0.1:-1
38843881
server_name: %s
38853882
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
3883+
hb_interval: 500ms
38863884
38873885
gateway {
38883886
name: "remote"

server/jwt_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3335,9 +3335,6 @@ func writeJWT(t *testing.T, dir string, pub string, jwt string) {
33353335
}
33363336

33373337
func TestJWTAccountNATSResolverFetch(t *testing.T) {
3338-
origEventsHBInterval := eventsHBInterval
3339-
eventsHBInterval = 50 * time.Millisecond // speed up eventing
3340-
defer func() { eventsHBInterval = origEventsHBInterval }()
33413338
require_NoLocalOrRemoteConnections := func(account string, srvs ...*Server) {
33423339
t.Helper()
33433340
for _, srv := range srvs {
@@ -3446,6 +3443,7 @@ func TestJWTAccountNATSResolverFetch(t *testing.T) {
34463443
server_name: srv-A
34473444
operator: %s
34483445
system_account: %s
3446+
hb_interval: 50ms
34493447
resolver: {
34503448
type: full
34513449
dir: '%s'
@@ -3471,6 +3469,7 @@ func TestJWTAccountNATSResolverFetch(t *testing.T) {
34713469
server_name: srv-B
34723470
operator: %s
34733471
system_account: %s
3472+
hb_interval: 50ms
34743473
resolver: {
34753474
type: full
34763475
@@ -3495,6 +3494,7 @@ func TestJWTAccountNATSResolverFetch(t *testing.T) {
34953494
server_name: srv-C
34963495
operator: %s
34973496
system_account: %s
3497+
hb_interval: 50ms
34983498
resolver: {
34993499
type: cache
35003500
dir: '%s'

server/monitor.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1170,6 +1170,7 @@ type Varz struct {
11701170
WSConnectURLs []string `json:"ws_connect_urls,omitempty"`
11711171
MaxConn int `json:"max_connections"`
11721172
MaxSubs int `json:"max_subscriptions,omitempty"`
1173+
HBInterval time.Duration `json:"hb_interval"`
11731174
PingInterval time.Duration `json:"ping_interval"`
11741175
MaxPingsOut int `json:"ping_max"`
11751176
HTTPHost string `json:"http_host"`
@@ -1639,6 +1640,7 @@ func (s *Server) updateVarzConfigReloadableFields(v *Varz) {
16391640
v.TLSRequired = info.TLSRequired
16401641
v.TLSVerify = info.TLSVerify
16411642
v.MaxConn = opts.MaxConn
1643+
v.HBInterval = opts.HBInterval
16421644
v.PingInterval = opts.PingInterval
16431645
v.MaxPingsOut = opts.MaxPingsOut
16441646
v.AuthTimeout = opts.AuthTimeout

server/opts.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ type Options struct {
278278
AuthCallout *AuthCallout `json:"-"`
279279
PingInterval time.Duration `json:"ping_interval"`
280280
MaxPingsOut int `json:"ping_max"`
281+
HBInterval time.Duration `json:"hb_interval"`
281282
HTTPHost string `json:"http_host"`
282283
HTTPPort int `json:"http_port"`
283284
HTTPBasePath string `json:"http_base_path"`
@@ -1035,6 +1036,8 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error
10351036
} else {
10361037
o.MaxSubTokens = uint8(n)
10371038
}
1039+
case "hb_interval":
1040+
o.HBInterval = parseDuration("hb_interval", tk, v, errors, warnings)
10381041
case "ping_interval":
10391042
o.PingInterval = parseDuration("ping_interval", tk, v, errors, warnings)
10401043
case "ping_max":
@@ -4846,6 +4849,9 @@ func setBaselineOptions(opts *Options) {
48464849
if opts.MaxConn == 0 {
48474850
opts.MaxConn = DEFAULT_MAX_CONNECTIONS
48484851
}
4852+
if opts.HBInterval == 0 {
4853+
opts.HBInterval = DEFAULT_HB_INTERVAL
4854+
}
48494855
if opts.PingInterval == 0 {
48504856
opts.PingInterval = DEFAULT_PING_INTERVAL
48514857
}

server/opts_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ func TestDefaultOptions(t *testing.T) {
5353
Port: DEFAULT_PORT,
5454
MaxConn: DEFAULT_MAX_CONNECTIONS,
5555
HTTPHost: DEFAULT_HOST,
56+
HBInterval: DEFAULT_HB_INTERVAL,
5657
PingInterval: DEFAULT_PING_INTERVAL,
5758
MaxPingsOut: DEFAULT_PING_MAX_OUT,
5859
TLSTimeout: float64(TLS_TIMEOUT) / float64(time.Second),
@@ -113,6 +114,7 @@ func TestConfigFile(t *testing.T) {
113114
MaxConn: 100,
114115
MaxSubs: 1000,
115116
MaxPending: 10000000,
117+
HBInterval: 30 * time.Second,
116118
PingInterval: 60 * time.Second,
117119
MaxPingsOut: 3,
118120
WriteDeadline: 3 * time.Second,
@@ -272,6 +274,7 @@ func TestMergeOverrides(t *testing.T) {
272274
MaxConn: 100,
273275
MaxSubs: 1000,
274276
MaxPending: 10000000,
277+
HBInterval: 30 * time.Second,
275278
PingInterval: 60 * time.Second,
276279
MaxPingsOut: 3,
277280
Cluster: ClusterOpts{
@@ -1251,6 +1254,7 @@ func TestOptionsClone(t *testing.T) {
12511254
MaxControlLine: 2048,
12521255
MaxPayload: 65536,
12531256
MaxConn: 100,
1257+
HBInterval: 30 * time.Second,
12541258
PingInterval: 60 * time.Second,
12551259
MaxPingsOut: 3,
12561260
Cluster: ClusterOpts{

server/reload.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,19 @@ func (m *maxPayloadOption) Apply(server *Server) {
612612
server.Noticef("Reloaded: max_payload = %d", m.newValue)
613613
}
614614

615+
// hbIntervalOption implements the option interface for the `hb_interval`
616+
// setting.
617+
type hbIntervalOption struct {
618+
noopOption
619+
newValue time.Duration
620+
}
621+
622+
// Apply is a no-op because the heartbeat interval will be reloaded after
623+
// options are applied.
624+
func (p *hbIntervalOption) Apply(server *Server) {
625+
server.Noticef("Reloaded: hb_interval = %s", p.newValue)
626+
}
627+
615628
// pingIntervalOption implements the option interface for the `ping_interval`
616629
// setting.
617630
type pingIntervalOption struct {
@@ -1267,6 +1280,8 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
12671280
diffOpts = append(diffOpts, &maxControlLineOption{newValue: newValue.(int32)})
12681281
case "maxpayload":
12691282
diffOpts = append(diffOpts, &maxPayloadOption{newValue: newValue.(int32)})
1283+
case "hbinterval":
1284+
diffOpts = append(diffOpts, &hbIntervalOption{newValue: newValue.(time.Duration)})
12701285
case "pinginterval":
12711286
diffOpts = append(diffOpts, &pingIntervalOption{newValue: newValue.(time.Duration)})
12721287
case "maxpingsout":

server/reload_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ func TestConfigReloadUnsupported(t *testing.T) {
143143
MaxControlLine: 4096,
144144
MaxPayload: 1048576,
145145
MaxConn: 65536,
146+
HBInterval: 30 * time.Second,
146147
PingInterval: 2 * time.Minute,
147148
MaxPingsOut: 2,
148149
WriteDeadline: 10 * time.Second,
@@ -215,6 +216,7 @@ func TestConfigReloadInvalidConfig(t *testing.T) {
215216
MaxControlLine: 4096,
216217
MaxPayload: 1048576,
217218
MaxConn: 65536,
219+
HBInterval: 30 * time.Second,
218220
PingInterval: 2 * time.Minute,
219221
MaxPingsOut: 2,
220222
WriteDeadline: 10 * time.Second,
@@ -278,6 +280,7 @@ func TestConfigReload(t *testing.T) {
278280
MaxControlLine: 4096,
279281
MaxPayload: 1048576,
280282
MaxConn: 65536,
283+
HBInterval: 30 * time.Second,
281284
PingInterval: 2 * time.Minute,
282285
MaxPingsOut: 2,
283286
WriteDeadline: 10 * time.Second,
@@ -354,6 +357,9 @@ func TestConfigReload(t *testing.T) {
354357
if updated.MaxControlLine != 512 {
355358
t.Fatalf("MaxControlLine is incorrect.\nexpected: 512\ngot: %d", updated.MaxControlLine)
356359
}
360+
if updated.HBInterval != 5*time.Second {
361+
t.Fatalf("HBInterval is incorrect.\nexpected 5s\ngot: %s", updated.HBInterval)
362+
}
357363
if updated.PingInterval != 5*time.Second {
358364
t.Fatalf("PingInterval is incorrect.\nexpected 5s\ngot: %s", updated.PingInterval)
359365
}

server/server.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1647,9 +1647,9 @@ func (s *Server) setSystemAccount(acc *Account) error {
16471647
recvq: newIPQueue[*inSysMsg](s, "System recvQ"),
16481648
resetCh: make(chan struct{}),
16491649
sq: s.newSendQ(),
1650-
statsz: eventsHBInterval,
1651-
orphMax: 5 * eventsHBInterval,
1652-
chkOrph: 3 * eventsHBInterval,
1650+
statsz: s.opts.HBInterval,
1651+
orphMax: 5 * s.opts.HBInterval,
1652+
chkOrph: 3 * s.opts.HBInterval,
16531653
}
16541654
s.sys.wg.Add(1)
16551655
s.mu.Unlock()

0 commit comments

Comments
 (0)