Skip to content

Added logging specification tests #1740

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ abstract class BaseCluster implements Cluster {
this.clusterListener = singleClusterListener(settings);
ClusterOpeningEvent clusterOpeningEvent = new ClusterOpeningEvent(clusterId);
clusterListener.clusterOpening(clusterOpeningEvent);
logTopologyOpening(clusterId, clusterOpeningEvent);
description = new ClusterDescription(settings.getMode(), UNKNOWN, emptyList(),
settings, serverFactory.getSettings());
logTopologyMonitoringStarting(clusterId);
}

@Override
Expand Down Expand Up @@ -221,7 +221,7 @@ public void close() {
description);
ClusterClosedEvent clusterClosedEvent = new ClusterClosedEvent(clusterId);
clusterListener.clusterClosed(clusterClosedEvent);
logTopologyClosedEvent(clusterId, clusterClosedEvent);
logTopologyMonitoringStopping(clusterId);
stopWaitQueueHandler();
}
}
Expand Down Expand Up @@ -632,9 +632,7 @@ static void logServerSelectionSucceeded(
}
}

static void logTopologyOpening(
final ClusterId clusterId,
final ClusterOpeningEvent clusterOpeningEvent) {
static void logTopologyMonitoringStarting(final ClusterId clusterId) {
if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) {
STRUCTURED_LOGGER.log(new LogMessage(
TOPOLOGY, DEBUG, "Starting topology monitoring", clusterId,
Expand All @@ -659,9 +657,7 @@ static void logTopologyDescriptionChanged(
}
}

static void logTopologyClosedEvent(
final ClusterId clusterId,
final ClusterClosedEvent clusterClosedEvent) {
static void logTopologyMonitoringStopping(final ClusterId clusterId) {
if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) {
STRUCTURED_LOGGER.log(new LogMessage(
TOPOLOGY, DEBUG, "Stopped topology monitoring", clusterId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.mongodb.internal.connection.SdamServerDescriptionManager.SdamIssue;
import com.mongodb.internal.diagnostics.logging.Logger;
import com.mongodb.internal.diagnostics.logging.Loggers;
import com.mongodb.internal.logging.StructuredLogger;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
Expand All @@ -48,6 +49,7 @@

class DefaultServer implements ClusterableServer {
private static final Logger LOGGER = Loggers.getLogger("connection");
private static final StructuredLogger STRUCTURED_LOGGER = new StructuredLogger("connection");
private final ServerId serverId;
private final ConnectionPool connectionPool;
private final ClusterConnectionMode clusterConnectionMode;
Expand Down Expand Up @@ -75,7 +77,6 @@ class DefaultServer implements ClusterableServer {
this.connectionPool = notNull("connectionPool", connectionPool);

this.serverId = serverId;

serverListener.serverOpening(new ServerOpeningEvent(this.serverId));

this.serverMonitor = serverMonitor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.mongodb.ServerApi;
import com.mongodb.annotations.ThreadSafe;
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.connection.ConnectionDescription;
import com.mongodb.connection.ServerDescription;
import com.mongodb.connection.ServerId;
import com.mongodb.connection.ServerSettings;
Expand All @@ -34,6 +35,8 @@
import com.mongodb.internal.diagnostics.logging.Logger;
import com.mongodb.internal.diagnostics.logging.Loggers;
import com.mongodb.internal.inject.Provider;
import com.mongodb.internal.logging.LogMessage;
import com.mongodb.internal.logging.StructuredLogger;
import com.mongodb.internal.validator.NoOpFieldNameValidator;
import com.mongodb.lang.Nullable;
import org.bson.BsonBoolean;
Expand Down Expand Up @@ -63,14 +66,27 @@
import static com.mongodb.internal.connection.DescriptionHelper.createServerDescription;
import static com.mongodb.internal.connection.ServerDescriptionHelper.unknownConnectingServerDescription;
import static com.mongodb.internal.event.EventListenerHelper.singleServerMonitorListener;
import static com.mongodb.internal.logging.LogMessage.Component.TOPOLOGY;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.AWAITED;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.DRIVER_CONNECTION_ID;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.DURATION_MS;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.FAILURE;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.REPLY;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_CONNECTION_ID;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_HOST;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_PORT;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.TOPOLOGY_ID;
import static com.mongodb.internal.logging.LogMessage.Level.DEBUG;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

@ThreadSafe
class DefaultServerMonitor implements ServerMonitor {

private static final Logger LOGGER = Loggers.getLogger("cluster");
private static final StructuredLogger STRUCTURED_LOGGER = new StructuredLogger("cluster");

private final ServerId serverId;
private final ServerMonitorListener serverMonitorListener;
Expand Down Expand Up @@ -116,6 +132,7 @@ class DefaultServerMonitor implements ServerMonitor {

@Override
public void start() {
logStartedServerMonitoring(serverId);
monitor.start();
}

Expand All @@ -137,6 +154,9 @@ public void connect() {
@SuppressWarnings("try")
public void close() {
withLock(lock, () -> {
if (!isClosed) {
logStoppedServerMonitoring(serverId);
}
isClosed = true;
//noinspection EmptyTryBlock
try (ServerMonitor ignoredAutoClosed = monitor;
Expand Down Expand Up @@ -226,6 +246,7 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
alreadyLoggedHeartBeatStarted = true;
currentCheckCancelled = false;
InternalConnection newConnection = internalConnectionFactory.create(serverId);
logHeartbeatStarted(serverId, newConnection.getDescription(), shouldStreamResponses);
serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent(
newConnection.getDescription().getConnectionId(), shouldStreamResponses));
newConnection.open(operationContextFactory.create());
Expand All @@ -238,6 +259,7 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
LOGGER.debug(format("Checking status of %s", serverId.getAddress()));
}
if (!alreadyLoggedHeartBeatStarted) {
logHeartbeatStarted(serverId, connection.getDescription(), shouldStreamResponses);
serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent(
connection.getDescription().getConnectionId(), shouldStreamResponses));
}
Expand Down Expand Up @@ -269,15 +291,18 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
if (!shouldStreamResponses) {
roundTripTimeSampler.addSample(elapsedTimeNanos);
}
logHeartbeatSucceeded(serverId, connection.getDescription(), shouldStreamResponses, elapsedTimeNanos, helloResult);
serverMonitorListener.serverHeartbeatSucceeded(
new ServerHeartbeatSucceededEvent(connection.getDescription().getConnectionId(), helloResult,
elapsedTimeNanos, shouldStreamResponses));

return createServerDescription(serverId.getAddress(), helloResult, roundTripTimeSampler.getAverage(),
roundTripTimeSampler.getMin());
} catch (Exception e) {
long elapsedTimeNanos = System.nanoTime() - start;
logHeartbeatFailed(serverId, connection.getDescription(), shouldStreamResponses, elapsedTimeNanos, e);
serverMonitorListener.serverHeartbeatFailed(
new ServerHeartbeatFailedEvent(connection.getDescription().getConnectionId(), System.nanoTime() - start,
new ServerHeartbeatFailedEvent(connection.getDescription().getConnectionId(), elapsedTimeNanos,
shouldStreamResponses, e));
throw e;
}
Expand Down Expand Up @@ -515,4 +540,94 @@ private void waitForNext() throws InterruptedException {
private String getHandshakeCommandName(final ServerDescription serverDescription) {
return serverDescription.isHelloOk() ? HELLO : LEGACY_HELLO;
}

private static void logHeartbeatStarted(
final ServerId serverId,
final ConnectionDescription connectionDescription,
final boolean awaited) {
if (STRUCTURED_LOGGER.isRequired(DEBUG, serverId.getClusterId())) {
STRUCTURED_LOGGER.log(new LogMessage(
TOPOLOGY, DEBUG, "Server heartbeat started", serverId.getClusterId(),
asList(
new LogMessage.Entry(SERVER_HOST, serverId.getAddress().getHost()),
new LogMessage.Entry(SERVER_PORT, serverId.getAddress().getPort()),
new LogMessage.Entry(DRIVER_CONNECTION_ID, connectionDescription.getConnectionId().getLocalValue()),
new LogMessage.Entry(SERVER_CONNECTION_ID, connectionDescription.getConnectionId().getServerValue()),
new LogMessage.Entry(TOPOLOGY_ID, serverId.getClusterId()),
new LogMessage.Entry(AWAITED, awaited)),
"Heartbeat started for {}:{} on connection with driver-generated ID {} and server-generated ID {} "
+ "in topology with ID {}. Awaited: {}"));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is extremely minor, but the + is not indented here, but is below.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit removed 🐛

}
}

private static void logHeartbeatSucceeded(
final ServerId serverId,
final ConnectionDescription connectionDescription,
final boolean awaited,
final long elapsedTimeNanos,
final BsonDocument reply) {
if (STRUCTURED_LOGGER.isRequired(DEBUG, serverId.getClusterId())) {
STRUCTURED_LOGGER.log(new LogMessage(
TOPOLOGY, DEBUG, "Server heartbeat succeeded", serverId.getClusterId(),
asList(
new LogMessage.Entry(DURATION_MS, MILLISECONDS.convert(elapsedTimeNanos, NANOSECONDS)),
new LogMessage.Entry(SERVER_HOST, serverId.getAddress().getHost()),
new LogMessage.Entry(SERVER_PORT, serverId.getAddress().getPort()),
new LogMessage.Entry(DRIVER_CONNECTION_ID, connectionDescription.getConnectionId().getLocalValue()),
new LogMessage.Entry(SERVER_CONNECTION_ID, connectionDescription.getConnectionId().getServerValue()),
new LogMessage.Entry(TOPOLOGY_ID, serverId.getClusterId()),
new LogMessage.Entry(AWAITED, awaited),
new LogMessage.Entry(REPLY, reply.toJson())),
"Heartbeat succeeded in {} ms for {}:{} on connection with driver-generated ID {} and server-generated ID {} "
+ "in topology with ID {}. Awaited: {}. Reply: {}"));
}
}

private static void logHeartbeatFailed(
final ServerId serverId,
final ConnectionDescription connectionDescription,
final boolean awaited,
final long elapsedTimeNanos,
final Exception failure) {
if (STRUCTURED_LOGGER.isRequired(DEBUG, serverId.getClusterId())) {
STRUCTURED_LOGGER.log(new LogMessage(
TOPOLOGY, DEBUG, "Server heartbeat failed", serverId.getClusterId(),
asList(
new LogMessage.Entry(DURATION_MS, MILLISECONDS.convert(elapsedTimeNanos, NANOSECONDS)),
new LogMessage.Entry(SERVER_HOST, serverId.getAddress().getHost()),
new LogMessage.Entry(SERVER_PORT, serverId.getAddress().getPort()),
new LogMessage.Entry(DRIVER_CONNECTION_ID, connectionDescription.getConnectionId().getLocalValue()),
new LogMessage.Entry(SERVER_CONNECTION_ID, connectionDescription.getConnectionId().getServerValue()),
new LogMessage.Entry(TOPOLOGY_ID, serverId.getClusterId()),
new LogMessage.Entry(AWAITED, awaited),
new LogMessage.Entry(FAILURE, failure.getMessage())),
"Heartbeat failed in {} ms for {}:{} on connection with driver-generated ID {} and server-generated ID {} "
+ "in topology with ID {}. Awaited: {}. Failure: {}"));
}
}


private static void logStartedServerMonitoring(final ServerId serverId) {
if (STRUCTURED_LOGGER.isRequired(DEBUG, serverId.getClusterId())) {
STRUCTURED_LOGGER.log(new LogMessage(
TOPOLOGY, DEBUG, "Starting server monitoring", serverId.getClusterId(),
asList(
new LogMessage.Entry(SERVER_HOST, serverId.getAddress().getHost()),
new LogMessage.Entry(SERVER_PORT, serverId.getAddress().getPort()),
new LogMessage.Entry(TOPOLOGY_ID, serverId.getClusterId())),
"Starting monitoring for server {}:{} in topology with ID {}"));
}
}

private static void logStoppedServerMonitoring(final ServerId serverId) {
if (STRUCTURED_LOGGER.isRequired(DEBUG, serverId.getClusterId())) {
STRUCTURED_LOGGER.log(new LogMessage(
TOPOLOGY, DEBUG, "Stopped server monitoring", serverId.getClusterId(),
asList(
new LogMessage.Entry(SERVER_HOST, serverId.getAddress().getHost()),
new LogMessage.Entry(SERVER_PORT, serverId.getAddress().getPort()),
new LogMessage.Entry(TOPOLOGY_ID, serverId.getClusterId())),
"Stopped monitoring for server {}:{} in topology with ID {}"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
import static com.mongodb.connection.ServerConnectionState.CONNECTING;
import static com.mongodb.internal.connection.BaseCluster.logServerSelectionStarted;
import static com.mongodb.internal.connection.BaseCluster.logServerSelectionSucceeded;
import static com.mongodb.internal.connection.BaseCluster.logTopologyClosedEvent;
import static com.mongodb.internal.connection.BaseCluster.logTopologyMonitoringStopping;
import static com.mongodb.internal.event.EventListenerHelper.singleClusterListener;
import static java.lang.String.format;
import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -275,7 +275,7 @@ public void close() {
}
ClusterClosedEvent clusterClosedEvent = new ClusterClosedEvent(clusterId);
clusterListener.clusterClosed(clusterClosedEvent);
logTopologyClosedEvent(clusterId, clusterClosedEvent);
logTopologyMonitoringStopping(clusterId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ public SingleServerCluster(final ClusterId clusterId, final ClusterSettings sett
// synchronized in the constructor because the change listener is re-entrant to this instance.
// In other words, we are leaking a reference to "this" from the constructor.
withLock(() -> {
server.set(createServer(settings.getHosts().get(0)));
publishDescription(ServerDescription.builder().state(CONNECTING).address(settings.getHosts().get(0))
Copy link
Member Author

@rozza rozza Jun 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests expect the toplogy description changed to connecting & unknown to be logged before the starting the server monitor log message.

Creating the server first meant that they were the wrong way around.

.build());
server.set(createServer(settings.getHosts().get(0)));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public enum Name {
* Not supported.
*/
OPERATION("operation"),
AWAITED("awaited"),
SERVICE_ID("serviceId"),
SERVER_CONNECTION_ID("serverConnectionId"),
DRIVER_CONNECTION_ID("driverConnectionId"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.mongodb.reactivestreams.client.unified;

import org.junit.jupiter.params.provider.Arguments;

import java.util.Collection;

final class UnifiedServerDiscoveryAndMonitoringTest extends UnifiedReactiveStreamsTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ public String toString() {
+ new BsonDocument("messages", expectedMessages).toJson(JsonWriterSettings.builder().indent(true).build()) + "\n"
+ " actualMessages="
+ new BsonDocument("messages", new BsonArray(actualMessages.stream()
.map(LogMatcher::asDocument).collect(Collectors.toList())))
.map(LogMatcher::logMessageAsDocument).collect(Collectors.toList())))
.toJson(JsonWriterSettings.builder().indent(true).build()) + "\n";
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ public <T> void waitForServerMonitorEvents(final String client, final Class<T> e
BsonDocument expectedEventContents = getEventContents(expectedEvent);
try {
serverMonitorListener.waitForEvents(expectedEventType,
event -> serverMonitorEventMatches(expectedEventContents, event, null), count, Duration.ofSeconds(10));
event -> serverMonitorEventMatches(expectedEventContents, event, null), count, Duration.ofSeconds(30));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to 15 - I just wanted to ensure that the server monitor through runs at least once. I think it defaults to every 10 seconds.

context.pop();
} catch (InterruptedException e) {
throw new RuntimeException(e);
Expand Down
Loading