22
22
import com .mongodb .ServerApi ;
23
23
import com .mongodb .annotations .ThreadSafe ;
24
24
import com .mongodb .connection .ClusterConnectionMode ;
25
+ import com .mongodb .connection .ConnectionDescription ;
25
26
import com .mongodb .connection .ServerDescription ;
26
27
import com .mongodb .connection .ServerId ;
27
28
import com .mongodb .connection .ServerSettings ;
34
35
import com .mongodb .internal .diagnostics .logging .Logger ;
35
36
import com .mongodb .internal .diagnostics .logging .Loggers ;
36
37
import com .mongodb .internal .inject .Provider ;
38
+ import com .mongodb .internal .logging .LogMessage ;
39
+ import com .mongodb .internal .logging .StructuredLogger ;
37
40
import com .mongodb .internal .validator .NoOpFieldNameValidator ;
38
41
import com .mongodb .lang .Nullable ;
39
42
import org .bson .BsonBoolean ;
63
66
import static com .mongodb .internal .connection .DescriptionHelper .createServerDescription ;
64
67
import static com .mongodb .internal .connection .ServerDescriptionHelper .unknownConnectingServerDescription ;
65
68
import static com .mongodb .internal .event .EventListenerHelper .singleServerMonitorListener ;
69
+ import static com .mongodb .internal .logging .LogMessage .Component .TOPOLOGY ;
70
+ import static com .mongodb .internal .logging .LogMessage .Entry .Name .AWAITED ;
71
+ import static com .mongodb .internal .logging .LogMessage .Entry .Name .DRIVER_CONNECTION_ID ;
72
+ import static com .mongodb .internal .logging .LogMessage .Entry .Name .DURATION_MS ;
73
+ import static com .mongodb .internal .logging .LogMessage .Entry .Name .FAILURE ;
74
+ import static com .mongodb .internal .logging .LogMessage .Entry .Name .REPLY ;
75
+ import static com .mongodb .internal .logging .LogMessage .Entry .Name .SERVER_CONNECTION_ID ;
76
+ import static com .mongodb .internal .logging .LogMessage .Entry .Name .SERVER_HOST ;
77
+ import static com .mongodb .internal .logging .LogMessage .Entry .Name .SERVER_PORT ;
78
+ import static com .mongodb .internal .logging .LogMessage .Entry .Name .TOPOLOGY_ID ;
79
+ import static com .mongodb .internal .logging .LogMessage .Level .DEBUG ;
66
80
import static java .lang .String .format ;
81
+ import static java .util .Arrays .asList ;
67
82
import static java .util .concurrent .TimeUnit .MILLISECONDS ;
68
83
import static java .util .concurrent .TimeUnit .NANOSECONDS ;
69
84
70
85
@ ThreadSafe
71
86
class DefaultServerMonitor implements ServerMonitor {
72
87
73
88
private static final Logger LOGGER = Loggers .getLogger ("cluster" );
89
+ private static final StructuredLogger STRUCTURED_LOGGER = new StructuredLogger ("cluster" );
74
90
75
91
private final ServerId serverId ;
76
92
private final ServerMonitorListener serverMonitorListener ;
@@ -116,6 +132,7 @@ class DefaultServerMonitor implements ServerMonitor {
116
132
117
133
@ Override
118
134
public void start () {
135
+ logStartedServerMonitoring (serverId );
119
136
monitor .start ();
120
137
}
121
138
@@ -137,6 +154,9 @@ public void connect() {
137
154
@ SuppressWarnings ("try" )
138
155
public void close () {
139
156
withLock (lock , () -> {
157
+ if (!isClosed ) {
158
+ logStoppedServerMonitoring (serverId );
159
+ }
140
160
isClosed = true ;
141
161
//noinspection EmptyTryBlock
142
162
try (ServerMonitor ignoredAutoClosed = monitor ;
@@ -226,6 +246,7 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
226
246
alreadyLoggedHeartBeatStarted = true ;
227
247
currentCheckCancelled = false ;
228
248
InternalConnection newConnection = internalConnectionFactory .create (serverId );
249
+ logHeartbeatStarted (serverId , newConnection .getDescription (), shouldStreamResponses );
229
250
serverMonitorListener .serverHearbeatStarted (new ServerHeartbeatStartedEvent (
230
251
newConnection .getDescription ().getConnectionId (), shouldStreamResponses ));
231
252
newConnection .open (operationContextFactory .create ());
@@ -238,6 +259,7 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
238
259
LOGGER .debug (format ("Checking status of %s" , serverId .getAddress ()));
239
260
}
240
261
if (!alreadyLoggedHeartBeatStarted ) {
262
+ logHeartbeatStarted (serverId , connection .getDescription (), shouldStreamResponses );
241
263
serverMonitorListener .serverHearbeatStarted (new ServerHeartbeatStartedEvent (
242
264
connection .getDescription ().getConnectionId (), shouldStreamResponses ));
243
265
}
@@ -269,15 +291,18 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
269
291
if (!shouldStreamResponses ) {
270
292
roundTripTimeSampler .addSample (elapsedTimeNanos );
271
293
}
294
+ logHeartbeatSucceeded (serverId , connection .getDescription (), shouldStreamResponses , elapsedTimeNanos , helloResult );
272
295
serverMonitorListener .serverHeartbeatSucceeded (
273
296
new ServerHeartbeatSucceededEvent (connection .getDescription ().getConnectionId (), helloResult ,
274
297
elapsedTimeNanos , shouldStreamResponses ));
275
298
276
299
return createServerDescription (serverId .getAddress (), helloResult , roundTripTimeSampler .getAverage (),
277
300
roundTripTimeSampler .getMin ());
278
301
} catch (Exception e ) {
302
+ long elapsedTimeNanos = System .nanoTime () - start ;
303
+ logHeartbeatFailed (serverId , connection .getDescription (), shouldStreamResponses , elapsedTimeNanos , e );
279
304
serverMonitorListener .serverHeartbeatFailed (
280
- new ServerHeartbeatFailedEvent (connection .getDescription ().getConnectionId (), System . nanoTime () - start ,
305
+ new ServerHeartbeatFailedEvent (connection .getDescription ().getConnectionId (), elapsedTimeNanos ,
281
306
shouldStreamResponses , e ));
282
307
throw e ;
283
308
}
@@ -515,4 +540,94 @@ private void waitForNext() throws InterruptedException {
515
540
private String getHandshakeCommandName (final ServerDescription serverDescription ) {
516
541
return serverDescription .isHelloOk () ? HELLO : LEGACY_HELLO ;
517
542
}
543
+
544
+ private static void logHeartbeatStarted (
545
+ final ServerId serverId ,
546
+ final ConnectionDescription connectionDescription ,
547
+ final boolean awaited ) {
548
+ if (STRUCTURED_LOGGER .isRequired (DEBUG , serverId .getClusterId ())) {
549
+ STRUCTURED_LOGGER .log (new LogMessage (
550
+ TOPOLOGY , DEBUG , "Server heartbeat started" , serverId .getClusterId (),
551
+ asList (
552
+ new LogMessage .Entry (SERVER_HOST , serverId .getAddress ().getHost ()),
553
+ new LogMessage .Entry (SERVER_PORT , serverId .getAddress ().getPort ()),
554
+ new LogMessage .Entry (DRIVER_CONNECTION_ID , connectionDescription .getConnectionId ().getLocalValue ()),
555
+ new LogMessage .Entry (SERVER_CONNECTION_ID , connectionDescription .getConnectionId ().getServerValue ()),
556
+ new LogMessage .Entry (TOPOLOGY_ID , serverId .getClusterId ()),
557
+ new LogMessage .Entry (AWAITED , awaited )),
558
+ "Heartbeat started for {}:{} on connection with driver-generated ID {} and server-generated ID {} "
559
+ + "in topology with ID {}. Awaited: {}" ));
560
+ }
561
+ }
562
+
563
+ private static void logHeartbeatSucceeded (
564
+ final ServerId serverId ,
565
+ final ConnectionDescription connectionDescription ,
566
+ final boolean awaited ,
567
+ final long elapsedTimeNanos ,
568
+ final BsonDocument reply ) {
569
+ if (STRUCTURED_LOGGER .isRequired (DEBUG , serverId .getClusterId ())) {
570
+ STRUCTURED_LOGGER .log (new LogMessage (
571
+ TOPOLOGY , DEBUG , "Server heartbeat succeeded" , serverId .getClusterId (),
572
+ asList (
573
+ new LogMessage .Entry (DURATION_MS , MILLISECONDS .convert (elapsedTimeNanos , NANOSECONDS )),
574
+ new LogMessage .Entry (SERVER_HOST , serverId .getAddress ().getHost ()),
575
+ new LogMessage .Entry (SERVER_PORT , serverId .getAddress ().getPort ()),
576
+ new LogMessage .Entry (DRIVER_CONNECTION_ID , connectionDescription .getConnectionId ().getLocalValue ()),
577
+ new LogMessage .Entry (SERVER_CONNECTION_ID , connectionDescription .getConnectionId ().getServerValue ()),
578
+ new LogMessage .Entry (TOPOLOGY_ID , serverId .getClusterId ()),
579
+ new LogMessage .Entry (AWAITED , awaited ),
580
+ new LogMessage .Entry (REPLY , reply .toJson ())),
581
+ "Heartbeat succeeded in {} ms for {}:{} on connection with driver-generated ID {} and server-generated ID {} "
582
+ + "in topology with ID {}. Awaited: {}. Reply: {}" ));
583
+ }
584
+ }
585
+
586
+ private static void logHeartbeatFailed (
587
+ final ServerId serverId ,
588
+ final ConnectionDescription connectionDescription ,
589
+ final boolean awaited ,
590
+ final long elapsedTimeNanos ,
591
+ final Exception failure ) {
592
+ if (STRUCTURED_LOGGER .isRequired (DEBUG , serverId .getClusterId ())) {
593
+ STRUCTURED_LOGGER .log (new LogMessage (
594
+ TOPOLOGY , DEBUG , "Server heartbeat failed" , serverId .getClusterId (),
595
+ asList (
596
+ new LogMessage .Entry (DURATION_MS , MILLISECONDS .convert (elapsedTimeNanos , NANOSECONDS )),
597
+ new LogMessage .Entry (SERVER_HOST , serverId .getAddress ().getHost ()),
598
+ new LogMessage .Entry (SERVER_PORT , serverId .getAddress ().getPort ()),
599
+ new LogMessage .Entry (DRIVER_CONNECTION_ID , connectionDescription .getConnectionId ().getLocalValue ()),
600
+ new LogMessage .Entry (SERVER_CONNECTION_ID , connectionDescription .getConnectionId ().getServerValue ()),
601
+ new LogMessage .Entry (TOPOLOGY_ID , serverId .getClusterId ()),
602
+ new LogMessage .Entry (AWAITED , awaited ),
603
+ new LogMessage .Entry (FAILURE , failure .getMessage ())),
604
+ "Heartbeat failed in {} ms for {}:{} on connection with driver-generated ID {} and server-generated ID {} "
605
+ + "in topology with ID {}. Awaited: {}. Failure: {}" ));
606
+ }
607
+ }
608
+
609
+
610
+ private static void logStartedServerMonitoring (final ServerId serverId ) {
611
+ if (STRUCTURED_LOGGER .isRequired (DEBUG , serverId .getClusterId ())) {
612
+ STRUCTURED_LOGGER .log (new LogMessage (
613
+ TOPOLOGY , DEBUG , "Starting server monitoring" , serverId .getClusterId (),
614
+ asList (
615
+ new LogMessage .Entry (SERVER_HOST , serverId .getAddress ().getHost ()),
616
+ new LogMessage .Entry (SERVER_PORT , serverId .getAddress ().getPort ()),
617
+ new LogMessage .Entry (TOPOLOGY_ID , serverId .getClusterId ())),
618
+ "Starting monitoring for server {}:{} in topology with ID {}" ));
619
+ }
620
+ }
621
+
622
+ private static void logStoppedServerMonitoring (final ServerId serverId ) {
623
+ if (STRUCTURED_LOGGER .isRequired (DEBUG , serverId .getClusterId ())) {
624
+ STRUCTURED_LOGGER .log (new LogMessage (
625
+ TOPOLOGY , DEBUG , "Stopped server monitoring" , serverId .getClusterId (),
626
+ asList (
627
+ new LogMessage .Entry (SERVER_HOST , serverId .getAddress ().getHost ()),
628
+ new LogMessage .Entry (SERVER_PORT , serverId .getAddress ().getPort ()),
629
+ new LogMessage .Entry (TOPOLOGY_ID , serverId .getClusterId ())),
630
+ "Stopped monitoring for server {}:{} in topology with ID {}" ));
631
+ }
632
+ }
518
633
}
0 commit comments