-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Added logging specification tests #1740
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 5 commits
8e94d55
cde8e0b
0b57fd8
1513156
bc4e53c
17d20d1
c381586
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ | |
import com.mongodb.ServerApi; | ||
import com.mongodb.annotations.ThreadSafe; | ||
import com.mongodb.connection.ClusterConnectionMode; | ||
import com.mongodb.connection.ConnectionDescription; | ||
import com.mongodb.connection.ServerDescription; | ||
import com.mongodb.connection.ServerId; | ||
import com.mongodb.connection.ServerSettings; | ||
|
@@ -34,6 +35,8 @@ | |
import com.mongodb.internal.diagnostics.logging.Logger; | ||
import com.mongodb.internal.diagnostics.logging.Loggers; | ||
import com.mongodb.internal.inject.Provider; | ||
import com.mongodb.internal.logging.LogMessage; | ||
import com.mongodb.internal.logging.StructuredLogger; | ||
import com.mongodb.internal.validator.NoOpFieldNameValidator; | ||
import com.mongodb.lang.Nullable; | ||
import org.bson.BsonBoolean; | ||
|
@@ -63,14 +66,27 @@ | |
import static com.mongodb.internal.connection.DescriptionHelper.createServerDescription; | ||
import static com.mongodb.internal.connection.ServerDescriptionHelper.unknownConnectingServerDescription; | ||
import static com.mongodb.internal.event.EventListenerHelper.singleServerMonitorListener; | ||
import static com.mongodb.internal.logging.LogMessage.Component.TOPOLOGY; | ||
import static com.mongodb.internal.logging.LogMessage.Entry.Name.AWAITED; | ||
import static com.mongodb.internal.logging.LogMessage.Entry.Name.DRIVER_CONNECTION_ID; | ||
import static com.mongodb.internal.logging.LogMessage.Entry.Name.DURATION_MS; | ||
import static com.mongodb.internal.logging.LogMessage.Entry.Name.FAILURE; | ||
import static com.mongodb.internal.logging.LogMessage.Entry.Name.REPLY; | ||
import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_CONNECTION_ID; | ||
import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_HOST; | ||
import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_PORT; | ||
import static com.mongodb.internal.logging.LogMessage.Entry.Name.TOPOLOGY_ID; | ||
import static com.mongodb.internal.logging.LogMessage.Level.DEBUG; | ||
import static java.lang.String.format; | ||
import static java.util.Arrays.asList; | ||
import static java.util.concurrent.TimeUnit.MILLISECONDS; | ||
import static java.util.concurrent.TimeUnit.NANOSECONDS; | ||
|
||
@ThreadSafe | ||
class DefaultServerMonitor implements ServerMonitor { | ||
|
||
private static final Logger LOGGER = Loggers.getLogger("cluster"); | ||
private static final StructuredLogger STRUCTURED_LOGGER = new StructuredLogger("cluster"); | ||
|
||
private final ServerId serverId; | ||
private final ServerMonitorListener serverMonitorListener; | ||
|
@@ -116,6 +132,7 @@ class DefaultServerMonitor implements ServerMonitor { | |
|
||
@Override | ||
public void start() { | ||
logStartedServerMonitoring(serverId); | ||
monitor.start(); | ||
} | ||
|
||
|
@@ -137,6 +154,9 @@ public void connect() { | |
@SuppressWarnings("try") | ||
public void close() { | ||
withLock(lock, () -> { | ||
if (!isClosed) { | ||
logStoppedServerMonitoring(serverId); | ||
} | ||
isClosed = true; | ||
//noinspection EmptyTryBlock | ||
try (ServerMonitor ignoredAutoClosed = monitor; | ||
|
@@ -226,6 +246,7 @@ private ServerDescription lookupServerDescription(final ServerDescription curren | |
alreadyLoggedHeartBeatStarted = true; | ||
currentCheckCancelled = false; | ||
InternalConnection newConnection = internalConnectionFactory.create(serverId); | ||
logHeartbeatStarted(serverId, newConnection.getDescription(), shouldStreamResponses); | ||
serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( | ||
newConnection.getDescription().getConnectionId(), shouldStreamResponses)); | ||
newConnection.open(operationContextFactory.create()); | ||
|
@@ -238,6 +259,7 @@ private ServerDescription lookupServerDescription(final ServerDescription curren | |
LOGGER.debug(format("Checking status of %s", serverId.getAddress())); | ||
} | ||
if (!alreadyLoggedHeartBeatStarted) { | ||
logHeartbeatStarted(serverId, connection.getDescription(), shouldStreamResponses); | ||
serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( | ||
connection.getDescription().getConnectionId(), shouldStreamResponses)); | ||
} | ||
|
@@ -269,15 +291,18 @@ private ServerDescription lookupServerDescription(final ServerDescription curren | |
if (!shouldStreamResponses) { | ||
roundTripTimeSampler.addSample(elapsedTimeNanos); | ||
} | ||
logHeartbeatSucceeded(serverId, connection.getDescription(), shouldStreamResponses, elapsedTimeNanos, helloResult); | ||
serverMonitorListener.serverHeartbeatSucceeded( | ||
new ServerHeartbeatSucceededEvent(connection.getDescription().getConnectionId(), helloResult, | ||
elapsedTimeNanos, shouldStreamResponses)); | ||
|
||
return createServerDescription(serverId.getAddress(), helloResult, roundTripTimeSampler.getAverage(), | ||
roundTripTimeSampler.getMin()); | ||
} catch (Exception e) { | ||
long elapsedTimeNanos = System.nanoTime() - start; | ||
logHeartbeatFailed(serverId, connection.getDescription(), shouldStreamResponses, elapsedTimeNanos, e); | ||
serverMonitorListener.serverHeartbeatFailed( | ||
new ServerHeartbeatFailedEvent(connection.getDescription().getConnectionId(), System.nanoTime() - start, | ||
new ServerHeartbeatFailedEvent(connection.getDescription().getConnectionId(), elapsedTimeNanos, | ||
shouldStreamResponses, e)); | ||
throw e; | ||
} | ||
|
@@ -515,4 +540,94 @@ private void waitForNext() throws InterruptedException { | |
private String getHandshakeCommandName(final ServerDescription serverDescription) { | ||
return serverDescription.isHelloOk() ? HELLO : LEGACY_HELLO; | ||
} | ||
|
||
private static void logHeartbeatStarted( | ||
final ServerId serverId, | ||
final ConnectionDescription connectionDescription, | ||
final boolean awaited) { | ||
if (STRUCTURED_LOGGER.isRequired(DEBUG, serverId.getClusterId())) { | ||
STRUCTURED_LOGGER.log(new LogMessage( | ||
TOPOLOGY, DEBUG, "Server heartbeat started", serverId.getClusterId(), | ||
asList( | ||
new LogMessage.Entry(SERVER_HOST, serverId.getAddress().getHost()), | ||
new LogMessage.Entry(SERVER_PORT, serverId.getAddress().getPort()), | ||
new LogMessage.Entry(DRIVER_CONNECTION_ID, connectionDescription.getConnectionId().getLocalValue()), | ||
new LogMessage.Entry(SERVER_CONNECTION_ID, connectionDescription.getConnectionId().getServerValue()), | ||
new LogMessage.Entry(TOPOLOGY_ID, serverId.getClusterId()), | ||
new LogMessage.Entry(AWAITED, awaited)), | ||
"Heartbeat started for {}:{} on connection with driver-generated ID {} and server-generated ID {} " | ||
+ "in topology with ID {}. Awaited: {}")); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is extremely minor, but the + is not indented here, but is below. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit removed 🐛 |
||
} | ||
} | ||
|
||
private static void logHeartbeatSucceeded( | ||
final ServerId serverId, | ||
final ConnectionDescription connectionDescription, | ||
final boolean awaited, | ||
final long elapsedTimeNanos, | ||
final BsonDocument reply) { | ||
if (STRUCTURED_LOGGER.isRequired(DEBUG, serverId.getClusterId())) { | ||
STRUCTURED_LOGGER.log(new LogMessage( | ||
TOPOLOGY, DEBUG, "Server heartbeat succeeded", serverId.getClusterId(), | ||
asList( | ||
new LogMessage.Entry(DURATION_MS, MILLISECONDS.convert(elapsedTimeNanos, NANOSECONDS)), | ||
new LogMessage.Entry(SERVER_HOST, serverId.getAddress().getHost()), | ||
new LogMessage.Entry(SERVER_PORT, serverId.getAddress().getPort()), | ||
new LogMessage.Entry(DRIVER_CONNECTION_ID, connectionDescription.getConnectionId().getLocalValue()), | ||
new LogMessage.Entry(SERVER_CONNECTION_ID, connectionDescription.getConnectionId().getServerValue()), | ||
new LogMessage.Entry(TOPOLOGY_ID, serverId.getClusterId()), | ||
new LogMessage.Entry(AWAITED, awaited), | ||
new LogMessage.Entry(REPLY, reply.toJson())), | ||
"Heartbeat succeeded in {} ms for {}:{} on connection with driver-generated ID {} and server-generated ID {} " | ||
+ "in topology with ID {}. Awaited: {}. Reply: {}")); | ||
} | ||
} | ||
|
||
private static void logHeartbeatFailed( | ||
final ServerId serverId, | ||
final ConnectionDescription connectionDescription, | ||
final boolean awaited, | ||
final long elapsedTimeNanos, | ||
final Exception failure) { | ||
if (STRUCTURED_LOGGER.isRequired(DEBUG, serverId.getClusterId())) { | ||
STRUCTURED_LOGGER.log(new LogMessage( | ||
TOPOLOGY, DEBUG, "Server heartbeat failed", serverId.getClusterId(), | ||
asList( | ||
new LogMessage.Entry(DURATION_MS, MILLISECONDS.convert(elapsedTimeNanos, NANOSECONDS)), | ||
new LogMessage.Entry(SERVER_HOST, serverId.getAddress().getHost()), | ||
new LogMessage.Entry(SERVER_PORT, serverId.getAddress().getPort()), | ||
new LogMessage.Entry(DRIVER_CONNECTION_ID, connectionDescription.getConnectionId().getLocalValue()), | ||
new LogMessage.Entry(SERVER_CONNECTION_ID, connectionDescription.getConnectionId().getServerValue()), | ||
new LogMessage.Entry(TOPOLOGY_ID, serverId.getClusterId()), | ||
new LogMessage.Entry(AWAITED, awaited), | ||
new LogMessage.Entry(FAILURE, failure.getMessage())), | ||
"Heartbeat failed in {} ms for {}:{} on connection with driver-generated ID {} and server-generated ID {} " | ||
+ "in topology with ID {}. Awaited: {}. Failure: {}")); | ||
} | ||
} | ||
|
||
|
||
private static void logStartedServerMonitoring(final ServerId serverId) { | ||
if (STRUCTURED_LOGGER.isRequired(DEBUG, serverId.getClusterId())) { | ||
STRUCTURED_LOGGER.log(new LogMessage( | ||
TOPOLOGY, DEBUG, "Starting server monitoring", serverId.getClusterId(), | ||
asList( | ||
new LogMessage.Entry(SERVER_HOST, serverId.getAddress().getHost()), | ||
new LogMessage.Entry(SERVER_PORT, serverId.getAddress().getPort()), | ||
new LogMessage.Entry(TOPOLOGY_ID, serverId.getClusterId())), | ||
"Starting monitoring for server {}:{} in topology with ID {}")); | ||
} | ||
} | ||
|
||
private static void logStoppedServerMonitoring(final ServerId serverId) { | ||
if (STRUCTURED_LOGGER.isRequired(DEBUG, serverId.getClusterId())) { | ||
STRUCTURED_LOGGER.log(new LogMessage( | ||
TOPOLOGY, DEBUG, "Stopped server monitoring", serverId.getClusterId(), | ||
asList( | ||
new LogMessage.Entry(SERVER_HOST, serverId.getAddress().getHost()), | ||
new LogMessage.Entry(SERVER_PORT, serverId.getAddress().getPort()), | ||
new LogMessage.Entry(TOPOLOGY_ID, serverId.getClusterId())), | ||
"Stopped monitoring for server {}:{} in topology with ID {}")); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,9 +58,9 @@ public SingleServerCluster(final ClusterId clusterId, final ClusterSettings sett | |
// synchronized in the constructor because the change listener is re-entrant to this instance. | ||
// In other words, we are leaking a reference to "this" from the constructor. | ||
withLock(() -> { | ||
server.set(createServer(settings.getHosts().get(0))); | ||
publishDescription(ServerDescription.builder().state(CONNECTING).address(settings.getHosts().get(0)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The tests expect the toplogy description changed to connecting & unknown to be logged before the starting the server monitor log message. Creating the server first meant that they were the wrong way around. |
||
.build()); | ||
server.set(createServer(settings.getHosts().get(0))); | ||
}); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -336,7 +336,7 @@ public <T> void waitForServerMonitorEvents(final String client, final Class<T> e | |
BsonDocument expectedEventContents = getEventContents(expectedEvent); | ||
try { | ||
serverMonitorListener.waitForEvents(expectedEventType, | ||
event -> serverMonitorEventMatches(expectedEventContents, event, null), count, Duration.ofSeconds(10)); | ||
event -> serverMonitorEventMatches(expectedEventContents, event, null), count, Duration.ofSeconds(30)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated to 15 - I just wanted to ensure that the server monitor through runs at least once. I think it defaults to every 10 seconds. |
||
context.pop(); | ||
} catch (InterruptedException e) { | ||
throw new RuntimeException(e); | ||
|
Uh oh!
There was an error while loading. Please reload this page.