diff --git a/server/configs/reload/reload.conf b/server/configs/reload/reload.conf index 068500b328b..88d7dbd9dec 100644 --- a/server/configs/reload/reload.conf +++ b/server/configs/reload/reload.conf @@ -11,6 +11,7 @@ logtime_utc: true # enable on reload log_file: "nats-server.log" # change on reload pid_file: "nats-server.pid" # change on reload max_control_line: 512 # change on reload +hb_interval: 5 # change on reload ping_interval: 5 # change on reload ping_max: 1 # change on reload write_deadline: "3s" # change on reload diff --git a/server/configs/test.conf b/server/configs/test.conf index 3daa21687d0..0bd79d3521a 100644 --- a/server/configs/test.conf +++ b/server/configs/test.conf @@ -42,6 +42,9 @@ max_control_line: 2048 # maximum payload max_payload: 65536 +# heartbeat interval +hb_interval: "30s" + # ping interval and no pong threshold ping_interval: "60s" ping_max: 3 diff --git a/server/const.go b/server/const.go index 6b0bda6f955..a75976b5a1c 100644 --- a/server/const.go +++ b/server/const.go @@ -85,6 +85,9 @@ const ( // AUTH_TIMEOUT is the authorization wait time. AUTH_TIMEOUT = 2 * time.Second + // DEFAULT_HB_INTERVAL is how often miscellaneous HeartBeat operations occur. + DEFAULT_HB_INTERVAL = 30 * time.Second + // DEFAULT_PING_INTERVAL is how often pings are sent to clients, etc... DEFAULT_PING_INTERVAL = 2 * time.Minute diff --git a/server/consumer.go b/server/consumer.go index fa593c4378b..e66f60286d9 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2120,7 +2120,7 @@ func (o *consumer) checkAndSetPendingRequestsOk() { if !versionAtLeast(si.(nodeInfo).version, 2, 7, 1) { // We expect all of our peers to eventually be up to date. // So check again in awhile. - time.AfterFunc(eventsHBInterval, func() { o.checkAndSetPendingRequestsOk() }) + time.AfterFunc(s.opts.HBInterval, func() { o.checkAndSetPendingRequestsOk() }) o.setPendingRequestsOk(false) return } diff --git a/server/events.go b/server/events.go index e5e6af5d52d..47e1356a5a3 100644 --- a/server/events.go +++ b/server/events.go @@ -94,9 +94,6 @@ const ( ocspPeerChainlinkInvalidEventSubj = "$SYS.SERVER.%s.OCSP.PEER.LINK.INVALID" ) -// FIXME(dlc) - make configurable. -var eventsHBInterval = 30 * time.Second - type sysMsgHandler func(sub *subscription, client *client, acc *Account, subject, reply string, hdr, msg []byte) // Used if we have to queue things internally to avoid the route/gw path. @@ -2044,9 +2041,9 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj ...string) { } else { // Check to see if we have an HB running and update. if a.ctmr == nil { - a.ctmr = time.AfterFunc(eventsHBInterval, func() { s.accConnsUpdate(a) }) + a.ctmr = time.AfterFunc(s.opts.HBInterval, func() { s.accConnsUpdate(a) }) } else { - a.ctmr.Reset(eventsHBInterval) + a.ctmr.Reset(s.opts.HBInterval) } } for _, sub := range subj { diff --git a/server/events_test.go b/server/events_test.go index e3541999bad..87ad711ef93 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -83,7 +83,7 @@ func runTrustedServer(t *testing.T) (*Server, *Options) { return s, opts } -func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options, nkeys.KeyPair) { +func startTrustedCluster(t *testing.T, optsA *Options) (*Server, *Options, *Server, *Options, nkeys.KeyPair) { t.Helper() kp, _ := nkeys.FromSeed(oSeed) @@ -102,7 +102,6 @@ func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options, nkey mr.Store(apub, jwt) - optsA := DefaultOptions() optsA.Cluster.Name = "TEST CLUSTER 22" optsA.Cluster.Host = "127.0.0.1" optsA.TrustedKeys = []string{pub} @@ -127,6 +126,11 @@ func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options, nkey return sa, optsA, sb, optsB, akp } +func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options, nkeys.KeyPair) { + opts := DefaultOptions() + return startTrustedCluster(t, opts) +} + func runTrustedGateways(t *testing.T) (*Server, *Options, *Server, *Options, nkeys.KeyPair) { t.Helper() @@ -1776,13 +1780,10 @@ func TestSystemAccountNoAuthUser(t *testing.T) { } func TestServerAccountConns(t *testing.T) { - // speed up hb - orgHBInterval := eventsHBInterval - eventsHBInterval = time.Millisecond * 100 - defer func() { eventsHBInterval = orgHBInterval }() conf := createConfFile(t, []byte(` host: 127.0.0.1 port: -1 + hb_interval: 100ms # speed up hb system_account: SYS accounts: { SYS: {users: [{user: s, password: s}]} @@ -3095,11 +3096,10 @@ func (sr *slowAccResolver) Fetch(name string) (string, error) { } func TestConnectionUpdatesTimerProperlySet(t *testing.T) { - origEventsHBInterval := eventsHBInterval - eventsHBInterval = 50 * time.Millisecond - defer func() { eventsHBInterval = origEventsHBInterval }() + opts := DefaultOptions() + opts.HBInterval = 50 * time.Millisecond + sa, _, sb, optsB, _ := startTrustedCluster(t, opts) - sa, _, sb, optsB, _ := runTrustedCluster(t) defer sa.Shutdown() defer sb.Shutdown() diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 31ac47c5fdc..0172cce7d43 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -6154,11 +6154,8 @@ func TestJetStreamClusterStreamCatchupWithTruncateAndPriorSnapshot(t *testing.T) } func TestJetStreamClusterNoOrphanedDueToNoConnection(t *testing.T) { - orgEventsHBInterval := eventsHBInterval - eventsHBInterval = 500 * time.Millisecond - defer func() { eventsHBInterval = orgEventsHBInterval }() - - c := createJetStreamClusterExplicit(t, "R3F", 3) + tmpl := jsClusterTempl + "\n\thb_interval: 500ms" // speed up hb + c := createJetStreamClusterWithTemplate(t, tmpl, "R3F", 3) defer c.shutdown() s := c.randomServer() @@ -6191,9 +6188,9 @@ func TestJetStreamClusterNoOrphanedDueToNoConnection(t *testing.T) { nc.Close() s.mu.RLock() - val := (s.sys.orphMax / eventsHBInterval) + 2 + val := (s.sys.orphMax / s.opts.HBInterval) + 2 s.mu.RUnlock() - time.Sleep(val * eventsHBInterval) + time.Sleep(val * s.opts.HBInterval) checkSysServers() } diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 779de18b88e..f19c60daf2a 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -3837,14 +3837,11 @@ func TestJetStreamSuperClusterGWReplyRewrite(t *testing.T) { } func TestJetStreamSuperClusterGWOfflineSatus(t *testing.T) { - orgEventsHBInterval := eventsHBInterval - eventsHBInterval = 500 * time.Millisecond //time.Second - defer func() { eventsHBInterval = orgEventsHBInterval }() - tmpl := ` listen: 127.0.0.1:-1 server_name: %s jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + hb_interval: 500ms gateway { name: "local" @@ -3883,6 +3880,7 @@ func TestJetStreamSuperClusterGWOfflineSatus(t *testing.T) { listen: 127.0.0.1:-1 server_name: %s jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + hb_interval: 500ms gateway { name: "remote" diff --git a/server/jwt_test.go b/server/jwt_test.go index 2a41b6edd16..65e5181a890 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -3335,9 +3335,6 @@ func writeJWT(t *testing.T, dir string, pub string, jwt string) { } func TestJWTAccountNATSResolverFetch(t *testing.T) { - origEventsHBInterval := eventsHBInterval - eventsHBInterval = 50 * time.Millisecond // speed up eventing - defer func() { eventsHBInterval = origEventsHBInterval }() require_NoLocalOrRemoteConnections := func(account string, srvs ...*Server) { t.Helper() for _, srv := range srvs { @@ -3446,6 +3443,7 @@ func TestJWTAccountNATSResolverFetch(t *testing.T) { server_name: srv-A operator: %s system_account: %s + hb_interval: 50ms resolver: { type: full dir: '%s' @@ -3471,6 +3469,7 @@ func TestJWTAccountNATSResolverFetch(t *testing.T) { server_name: srv-B operator: %s system_account: %s + hb_interval: 50ms resolver: { type: full @@ -3495,6 +3494,7 @@ func TestJWTAccountNATSResolverFetch(t *testing.T) { server_name: srv-C operator: %s system_account: %s + hb_interval: 50ms resolver: { type: cache dir: '%s' diff --git a/server/monitor.go b/server/monitor.go index 64a8259d928..af63b7cea3d 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1170,6 +1170,7 @@ type Varz struct { WSConnectURLs []string `json:"ws_connect_urls,omitempty"` MaxConn int `json:"max_connections"` MaxSubs int `json:"max_subscriptions,omitempty"` + HBInterval time.Duration `json:"hb_interval"` PingInterval time.Duration `json:"ping_interval"` MaxPingsOut int `json:"ping_max"` HTTPHost string `json:"http_host"` @@ -1639,6 +1640,7 @@ func (s *Server) updateVarzConfigReloadableFields(v *Varz) { v.TLSRequired = info.TLSRequired v.TLSVerify = info.TLSVerify v.MaxConn = opts.MaxConn + v.HBInterval = opts.HBInterval v.PingInterval = opts.PingInterval v.MaxPingsOut = opts.MaxPingsOut v.AuthTimeout = opts.AuthTimeout diff --git a/server/opts.go b/server/opts.go index 02c3489b4ec..ddd0d40b73c 100644 --- a/server/opts.go +++ b/server/opts.go @@ -278,6 +278,7 @@ type Options struct { AuthCallout *AuthCallout `json:"-"` PingInterval time.Duration `json:"ping_interval"` MaxPingsOut int `json:"ping_max"` + HBInterval time.Duration `json:"hb_interval"` HTTPHost string `json:"http_host"` HTTPPort int `json:"http_port"` HTTPBasePath string `json:"http_base_path"` @@ -1035,6 +1036,8 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error } else { o.MaxSubTokens = uint8(n) } + case "hb_interval": + o.HBInterval = parseDuration("hb_interval", tk, v, errors, warnings) case "ping_interval": o.PingInterval = parseDuration("ping_interval", tk, v, errors, warnings) case "ping_max": @@ -4846,6 +4849,9 @@ func setBaselineOptions(opts *Options) { if opts.MaxConn == 0 { opts.MaxConn = DEFAULT_MAX_CONNECTIONS } + if opts.HBInterval == 0 { + opts.HBInterval = DEFAULT_HB_INTERVAL + } if opts.PingInterval == 0 { opts.PingInterval = DEFAULT_PING_INTERVAL } diff --git a/server/opts_test.go b/server/opts_test.go index c618b260fba..cd768b9ef96 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -53,6 +53,7 @@ func TestDefaultOptions(t *testing.T) { Port: DEFAULT_PORT, MaxConn: DEFAULT_MAX_CONNECTIONS, HTTPHost: DEFAULT_HOST, + HBInterval: DEFAULT_HB_INTERVAL, PingInterval: DEFAULT_PING_INTERVAL, MaxPingsOut: DEFAULT_PING_MAX_OUT, TLSTimeout: float64(TLS_TIMEOUT) / float64(time.Second), @@ -113,6 +114,7 @@ func TestConfigFile(t *testing.T) { MaxConn: 100, MaxSubs: 1000, MaxPending: 10000000, + HBInterval: 30 * time.Second, PingInterval: 60 * time.Second, MaxPingsOut: 3, WriteDeadline: 3 * time.Second, @@ -272,6 +274,7 @@ func TestMergeOverrides(t *testing.T) { MaxConn: 100, MaxSubs: 1000, MaxPending: 10000000, + HBInterval: 30 * time.Second, PingInterval: 60 * time.Second, MaxPingsOut: 3, Cluster: ClusterOpts{ @@ -1251,6 +1254,7 @@ func TestOptionsClone(t *testing.T) { MaxControlLine: 2048, MaxPayload: 65536, MaxConn: 100, + HBInterval: 30 * time.Second, PingInterval: 60 * time.Second, MaxPingsOut: 3, Cluster: ClusterOpts{ diff --git a/server/reload.go b/server/reload.go index a098da7ad8f..58987fb2759 100644 --- a/server/reload.go +++ b/server/reload.go @@ -612,6 +612,19 @@ func (m *maxPayloadOption) Apply(server *Server) { server.Noticef("Reloaded: max_payload = %d", m.newValue) } +// hbIntervalOption implements the option interface for the `hb_interval` +// setting. +type hbIntervalOption struct { + noopOption + newValue time.Duration +} + +// Apply is a no-op because the heartbeat interval will be reloaded after +// options are applied. +func (p *hbIntervalOption) Apply(server *Server) { + server.Noticef("Reloaded: hb_interval = %s", p.newValue) +} + // pingIntervalOption implements the option interface for the `ping_interval` // setting. type pingIntervalOption struct { @@ -1267,6 +1280,8 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { diffOpts = append(diffOpts, &maxControlLineOption{newValue: newValue.(int32)}) case "maxpayload": diffOpts = append(diffOpts, &maxPayloadOption{newValue: newValue.(int32)}) + case "hbinterval": + diffOpts = append(diffOpts, &hbIntervalOption{newValue: newValue.(time.Duration)}) case "pinginterval": diffOpts = append(diffOpts, &pingIntervalOption{newValue: newValue.(time.Duration)}) case "maxpingsout": diff --git a/server/reload_test.go b/server/reload_test.go index 9f0410c71b5..cb5b4d3924a 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -143,6 +143,7 @@ func TestConfigReloadUnsupported(t *testing.T) { MaxControlLine: 4096, MaxPayload: 1048576, MaxConn: 65536, + HBInterval: 30 * time.Second, PingInterval: 2 * time.Minute, MaxPingsOut: 2, WriteDeadline: 10 * time.Second, @@ -215,6 +216,7 @@ func TestConfigReloadInvalidConfig(t *testing.T) { MaxControlLine: 4096, MaxPayload: 1048576, MaxConn: 65536, + HBInterval: 30 * time.Second, PingInterval: 2 * time.Minute, MaxPingsOut: 2, WriteDeadline: 10 * time.Second, @@ -278,6 +280,7 @@ func TestConfigReload(t *testing.T) { MaxControlLine: 4096, MaxPayload: 1048576, MaxConn: 65536, + HBInterval: 30 * time.Second, PingInterval: 2 * time.Minute, MaxPingsOut: 2, WriteDeadline: 10 * time.Second, @@ -354,6 +357,9 @@ func TestConfigReload(t *testing.T) { if updated.MaxControlLine != 512 { t.Fatalf("MaxControlLine is incorrect.\nexpected: 512\ngot: %d", updated.MaxControlLine) } + if updated.HBInterval != 5*time.Second { + t.Fatalf("HBInterval is incorrect.\nexpected 5s\ngot: %s", updated.HBInterval) + } if updated.PingInterval != 5*time.Second { t.Fatalf("PingInterval is incorrect.\nexpected 5s\ngot: %s", updated.PingInterval) } diff --git a/server/server.go b/server/server.go index 8ca5a3c3c4a..3f4fe43f9cf 100644 --- a/server/server.go +++ b/server/server.go @@ -1647,9 +1647,9 @@ func (s *Server) setSystemAccount(acc *Account) error { recvq: newIPQueue[*inSysMsg](s, "System recvQ"), resetCh: make(chan struct{}), sq: s.newSendQ(), - statsz: eventsHBInterval, - orphMax: 5 * eventsHBInterval, - chkOrph: 3 * eventsHBInterval, + statsz: s.opts.HBInterval, + orphMax: 5 * s.opts.HBInterval, + chkOrph: 3 * s.opts.HBInterval, } s.sys.wg.Add(1) s.mu.Unlock()