Skip to content

Make heartbeat interval configurable. #4352

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/configs/reload/reload.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ logtime_utc: true # enable on reload
log_file: "nats-server.log" # change on reload
pid_file: "nats-server.pid" # change on reload
max_control_line: 512 # change on reload
hb_interval: 5 # change on reload
ping_interval: 5 # change on reload
ping_max: 1 # change on reload
write_deadline: "3s" # change on reload
Expand Down
3 changes: 3 additions & 0 deletions server/configs/test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ max_control_line: 2048
# maximum payload
max_payload: 65536

# heartbeat interval
hb_interval: "30s"

# ping interval and no pong threshold
ping_interval: "60s"
ping_max: 3
Expand Down
3 changes: 3 additions & 0 deletions server/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ const (
// AUTH_TIMEOUT is the authorization wait time.
AUTH_TIMEOUT = 2 * time.Second

// DEFAULT_HB_INTERVAL is how often miscellaneous HeartBeat operations occur.
DEFAULT_HB_INTERVAL = 30 * time.Second

// DEFAULT_PING_INTERVAL is how often pings are sent to clients, etc...
DEFAULT_PING_INTERVAL = 2 * time.Minute

Expand Down
2 changes: 1 addition & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2120,7 +2120,7 @@ func (o *consumer) checkAndSetPendingRequestsOk() {
if !versionAtLeast(si.(nodeInfo).version, 2, 7, 1) {
// We expect all of our peers to eventually be up to date.
// So check again in awhile.
time.AfterFunc(eventsHBInterval, func() { o.checkAndSetPendingRequestsOk() })
time.AfterFunc(s.opts.HBInterval, func() { o.checkAndSetPendingRequestsOk() })
o.setPendingRequestsOk(false)
return
}
Expand Down
7 changes: 2 additions & 5 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,6 @@ const (
ocspPeerChainlinkInvalidEventSubj = "$SYS.SERVER.%s.OCSP.PEER.LINK.INVALID"
)

// FIXME(dlc) - make configurable.
var eventsHBInterval = 30 * time.Second

type sysMsgHandler func(sub *subscription, client *client, acc *Account, subject, reply string, hdr, msg []byte)

// Used if we have to queue things internally to avoid the route/gw path.
Expand Down Expand Up @@ -2044,9 +2041,9 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj ...string) {
} else {
// Check to see if we have an HB running and update.
if a.ctmr == nil {
a.ctmr = time.AfterFunc(eventsHBInterval, func() { s.accConnsUpdate(a) })
a.ctmr = time.AfterFunc(s.opts.HBInterval, func() { s.accConnsUpdate(a) })
} else {
a.ctmr.Reset(eventsHBInterval)
a.ctmr.Reset(s.opts.HBInterval)
}
}
for _, sub := range subj {
Expand Down
20 changes: 10 additions & 10 deletions server/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func runTrustedServer(t *testing.T) (*Server, *Options) {
return s, opts
}

func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options, nkeys.KeyPair) {
func startTrustedCluster(t *testing.T, optsA *Options) (*Server, *Options, *Server, *Options, nkeys.KeyPair) {
t.Helper()

kp, _ := nkeys.FromSeed(oSeed)
Expand All @@ -102,7 +102,6 @@ func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options, nkey

mr.Store(apub, jwt)

optsA := DefaultOptions()
optsA.Cluster.Name = "TEST CLUSTER 22"
optsA.Cluster.Host = "127.0.0.1"
optsA.TrustedKeys = []string{pub}
Expand All @@ -127,6 +126,11 @@ func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options, nkey
return sa, optsA, sb, optsB, akp
}

func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options, nkeys.KeyPair) {
opts := DefaultOptions()
return startTrustedCluster(t, opts)
}

func runTrustedGateways(t *testing.T) (*Server, *Options, *Server, *Options, nkeys.KeyPair) {
t.Helper()

Expand Down Expand Up @@ -1776,13 +1780,10 @@ func TestSystemAccountNoAuthUser(t *testing.T) {
}

func TestServerAccountConns(t *testing.T) {
// speed up hb
orgHBInterval := eventsHBInterval
eventsHBInterval = time.Millisecond * 100
defer func() { eventsHBInterval = orgHBInterval }()
conf := createConfFile(t, []byte(`
host: 127.0.0.1
port: -1
hb_interval: 100ms # speed up hb
system_account: SYS
accounts: {
SYS: {users: [{user: s, password: s}]}
Expand Down Expand Up @@ -3095,11 +3096,10 @@ func (sr *slowAccResolver) Fetch(name string) (string, error) {
}

func TestConnectionUpdatesTimerProperlySet(t *testing.T) {
origEventsHBInterval := eventsHBInterval
eventsHBInterval = 50 * time.Millisecond
defer func() { eventsHBInterval = origEventsHBInterval }()
opts := DefaultOptions()
opts.HBInterval = 50 * time.Millisecond
sa, _, sb, optsB, _ := startTrustedCluster(t, opts)

sa, _, sb, optsB, _ := runTrustedCluster(t)
defer sa.Shutdown()
defer sb.Shutdown()

Expand Down
11 changes: 4 additions & 7 deletions server/jetstream_cluster_2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6154,11 +6154,8 @@ func TestJetStreamClusterStreamCatchupWithTruncateAndPriorSnapshot(t *testing.T)
}

func TestJetStreamClusterNoOrphanedDueToNoConnection(t *testing.T) {
orgEventsHBInterval := eventsHBInterval
eventsHBInterval = 500 * time.Millisecond
defer func() { eventsHBInterval = orgEventsHBInterval }()

c := createJetStreamClusterExplicit(t, "R3F", 3)
tmpl := jsClusterTempl + "\n\thb_interval: 500ms" // speed up hb
c := createJetStreamClusterWithTemplate(t, tmpl, "R3F", 3)
defer c.shutdown()

s := c.randomServer()
Expand Down Expand Up @@ -6191,9 +6188,9 @@ func TestJetStreamClusterNoOrphanedDueToNoConnection(t *testing.T) {
nc.Close()

s.mu.RLock()
val := (s.sys.orphMax / eventsHBInterval) + 2
val := (s.sys.orphMax / s.opts.HBInterval) + 2
s.mu.RUnlock()
time.Sleep(val * eventsHBInterval)
time.Sleep(val * s.opts.HBInterval)
checkSysServers()
}

Expand Down
6 changes: 2 additions & 4 deletions server/jetstream_super_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3837,14 +3837,11 @@ func TestJetStreamSuperClusterGWReplyRewrite(t *testing.T) {
}

func TestJetStreamSuperClusterGWOfflineSatus(t *testing.T) {
orgEventsHBInterval := eventsHBInterval
eventsHBInterval = 500 * time.Millisecond //time.Second
defer func() { eventsHBInterval = orgEventsHBInterval }()

tmpl := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
hb_interval: 500ms

gateway {
name: "local"
Expand Down Expand Up @@ -3883,6 +3880,7 @@ func TestJetStreamSuperClusterGWOfflineSatus(t *testing.T) {
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
hb_interval: 500ms

gateway {
name: "remote"
Expand Down
6 changes: 3 additions & 3 deletions server/jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3335,9 +3335,6 @@ func writeJWT(t *testing.T, dir string, pub string, jwt string) {
}

func TestJWTAccountNATSResolverFetch(t *testing.T) {
origEventsHBInterval := eventsHBInterval
eventsHBInterval = 50 * time.Millisecond // speed up eventing
defer func() { eventsHBInterval = origEventsHBInterval }()
require_NoLocalOrRemoteConnections := func(account string, srvs ...*Server) {
t.Helper()
for _, srv := range srvs {
Expand Down Expand Up @@ -3446,6 +3443,7 @@ func TestJWTAccountNATSResolverFetch(t *testing.T) {
server_name: srv-A
operator: %s
system_account: %s
hb_interval: 50ms
resolver: {
type: full
dir: '%s'
Expand All @@ -3471,6 +3469,7 @@ func TestJWTAccountNATSResolverFetch(t *testing.T) {
server_name: srv-B
operator: %s
system_account: %s
hb_interval: 50ms
resolver: {
type: full

Expand All @@ -3495,6 +3494,7 @@ func TestJWTAccountNATSResolverFetch(t *testing.T) {
server_name: srv-C
operator: %s
system_account: %s
hb_interval: 50ms
resolver: {
type: cache
dir: '%s'
Expand Down
2 changes: 2 additions & 0 deletions server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1170,6 +1170,7 @@ type Varz struct {
WSConnectURLs []string `json:"ws_connect_urls,omitempty"`
MaxConn int `json:"max_connections"`
MaxSubs int `json:"max_subscriptions,omitempty"`
HBInterval time.Duration `json:"hb_interval"`
PingInterval time.Duration `json:"ping_interval"`
MaxPingsOut int `json:"ping_max"`
HTTPHost string `json:"http_host"`
Expand Down Expand Up @@ -1639,6 +1640,7 @@ func (s *Server) updateVarzConfigReloadableFields(v *Varz) {
v.TLSRequired = info.TLSRequired
v.TLSVerify = info.TLSVerify
v.MaxConn = opts.MaxConn
v.HBInterval = opts.HBInterval
v.PingInterval = opts.PingInterval
v.MaxPingsOut = opts.MaxPingsOut
v.AuthTimeout = opts.AuthTimeout
Expand Down
6 changes: 6 additions & 0 deletions server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ type Options struct {
AuthCallout *AuthCallout `json:"-"`
PingInterval time.Duration `json:"ping_interval"`
MaxPingsOut int `json:"ping_max"`
HBInterval time.Duration `json:"hb_interval"`
HTTPHost string `json:"http_host"`
HTTPPort int `json:"http_port"`
HTTPBasePath string `json:"http_base_path"`
Expand Down Expand Up @@ -1035,6 +1036,8 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error
} else {
o.MaxSubTokens = uint8(n)
}
case "hb_interval":
o.HBInterval = parseDuration("hb_interval", tk, v, errors, warnings)
case "ping_interval":
o.PingInterval = parseDuration("ping_interval", tk, v, errors, warnings)
case "ping_max":
Expand Down Expand Up @@ -4846,6 +4849,9 @@ func setBaselineOptions(opts *Options) {
if opts.MaxConn == 0 {
opts.MaxConn = DEFAULT_MAX_CONNECTIONS
}
if opts.HBInterval == 0 {
opts.HBInterval = DEFAULT_HB_INTERVAL
}
if opts.PingInterval == 0 {
opts.PingInterval = DEFAULT_PING_INTERVAL
}
Expand Down
4 changes: 4 additions & 0 deletions server/opts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func TestDefaultOptions(t *testing.T) {
Port: DEFAULT_PORT,
MaxConn: DEFAULT_MAX_CONNECTIONS,
HTTPHost: DEFAULT_HOST,
HBInterval: DEFAULT_HB_INTERVAL,
PingInterval: DEFAULT_PING_INTERVAL,
MaxPingsOut: DEFAULT_PING_MAX_OUT,
TLSTimeout: float64(TLS_TIMEOUT) / float64(time.Second),
Expand Down Expand Up @@ -113,6 +114,7 @@ func TestConfigFile(t *testing.T) {
MaxConn: 100,
MaxSubs: 1000,
MaxPending: 10000000,
HBInterval: 30 * time.Second,
PingInterval: 60 * time.Second,
MaxPingsOut: 3,
WriteDeadline: 3 * time.Second,
Expand Down Expand Up @@ -272,6 +274,7 @@ func TestMergeOverrides(t *testing.T) {
MaxConn: 100,
MaxSubs: 1000,
MaxPending: 10000000,
HBInterval: 30 * time.Second,
PingInterval: 60 * time.Second,
MaxPingsOut: 3,
Cluster: ClusterOpts{
Expand Down Expand Up @@ -1251,6 +1254,7 @@ func TestOptionsClone(t *testing.T) {
MaxControlLine: 2048,
MaxPayload: 65536,
MaxConn: 100,
HBInterval: 30 * time.Second,
PingInterval: 60 * time.Second,
MaxPingsOut: 3,
Cluster: ClusterOpts{
Expand Down
15 changes: 15 additions & 0 deletions server/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,19 @@ func (m *maxPayloadOption) Apply(server *Server) {
server.Noticef("Reloaded: max_payload = %d", m.newValue)
}

// hbIntervalOption implements the option interface for the `hb_interval`
// setting.
type hbIntervalOption struct {
noopOption
newValue time.Duration
}

// Apply is a no-op because the heartbeat interval will be reloaded after
// options are applied.
func (p *hbIntervalOption) Apply(server *Server) {
server.Noticef("Reloaded: hb_interval = %s", p.newValue)
}

// pingIntervalOption implements the option interface for the `ping_interval`
// setting.
type pingIntervalOption struct {
Expand Down Expand Up @@ -1267,6 +1280,8 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
diffOpts = append(diffOpts, &maxControlLineOption{newValue: newValue.(int32)})
case "maxpayload":
diffOpts = append(diffOpts, &maxPayloadOption{newValue: newValue.(int32)})
case "hbinterval":
diffOpts = append(diffOpts, &hbIntervalOption{newValue: newValue.(time.Duration)})
case "pinginterval":
diffOpts = append(diffOpts, &pingIntervalOption{newValue: newValue.(time.Duration)})
case "maxpingsout":
Expand Down
6 changes: 6 additions & 0 deletions server/reload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func TestConfigReloadUnsupported(t *testing.T) {
MaxControlLine: 4096,
MaxPayload: 1048576,
MaxConn: 65536,
HBInterval: 30 * time.Second,
PingInterval: 2 * time.Minute,
MaxPingsOut: 2,
WriteDeadline: 10 * time.Second,
Expand Down Expand Up @@ -215,6 +216,7 @@ func TestConfigReloadInvalidConfig(t *testing.T) {
MaxControlLine: 4096,
MaxPayload: 1048576,
MaxConn: 65536,
HBInterval: 30 * time.Second,
PingInterval: 2 * time.Minute,
MaxPingsOut: 2,
WriteDeadline: 10 * time.Second,
Expand Down Expand Up @@ -278,6 +280,7 @@ func TestConfigReload(t *testing.T) {
MaxControlLine: 4096,
MaxPayload: 1048576,
MaxConn: 65536,
HBInterval: 30 * time.Second,
PingInterval: 2 * time.Minute,
MaxPingsOut: 2,
WriteDeadline: 10 * time.Second,
Expand Down Expand Up @@ -354,6 +357,9 @@ func TestConfigReload(t *testing.T) {
if updated.MaxControlLine != 512 {
t.Fatalf("MaxControlLine is incorrect.\nexpected: 512\ngot: %d", updated.MaxControlLine)
}
if updated.HBInterval != 5*time.Second {
t.Fatalf("HBInterval is incorrect.\nexpected 5s\ngot: %s", updated.HBInterval)
}
if updated.PingInterval != 5*time.Second {
t.Fatalf("PingInterval is incorrect.\nexpected 5s\ngot: %s", updated.PingInterval)
}
Expand Down
6 changes: 3 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1647,9 +1647,9 @@ func (s *Server) setSystemAccount(acc *Account) error {
recvq: newIPQueue[*inSysMsg](s, "System recvQ"),
resetCh: make(chan struct{}),
sq: s.newSendQ(),
statsz: eventsHBInterval,
orphMax: 5 * eventsHBInterval,
chkOrph: 3 * eventsHBInterval,
statsz: s.opts.HBInterval,
orphMax: 5 * s.opts.HBInterval,
chkOrph: 3 * s.opts.HBInterval,
}
s.sys.wg.Add(1)
s.mu.Unlock()
Expand Down