Skip to content

allow sending unchanged message in Session#send #395

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1309,6 +1309,12 @@ <H3>QuickFIX Settings</H3>
<TD>Y<BR>N</TD>
<TD>N</TD>
</TR>
<TR ALIGN="left" VALIGN="middle">
<TD><I>AllowPosDup</I></TD>
<TD>Whether to allow PossDupFlag and OrigSendingTime when sending messages. This is useful on occasions, primarily when a QFJ application is acting as purely a pass-through/monitoring hop.</TD>
<TD>Y<BR>N</TD>
<TD>N</TD>
</TR>
</tbody>
</TABLE>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ public Session create(SessionID sessionID, SessionSettings settings) throws Conf
final boolean enableNextExpectedMsgSeqNum = getSetting(settings, sessionID, Session.SETTING_ENABLE_NEXT_EXPECTED_MSG_SEQ_NUM, false);
final boolean enableLastMsgSeqNumProcessed = getSetting(settings, sessionID, Session.SETTING_ENABLE_LAST_MSG_SEQ_NUM_PROCESSED, false);
final int resendRequestChunkSize = getSetting(settings, sessionID, Session.SETTING_RESEND_REQUEST_CHUNK_SIZE, Session.DEFAULT_RESEND_RANGE_CHUNK_SIZE);
final boolean allowPossDup = getSetting(settings, sessionID, Session.SETTING_ALLOW_POS_DUP_MESSAGES, false);

final int[] logonIntervals = getLogonIntervalsInSeconds(settings, sessionID);
final Set<InetAddress> allowedRemoteAddresses = getInetAddresses(settings, sessionID);
Expand All @@ -231,7 +232,7 @@ public Session create(SessionID sessionID, SessionSettings settings) throws Conf
rejectInvalidMessage, rejectMessageOnUnhandledException, requiresOrigSendingTime,
forceResendWhenCorruptedStore, allowedRemoteAddresses, validateIncomingMessage,
resendRequestChunkSize, enableNextExpectedMsgSeqNum, enableLastMsgSeqNumProcessed,
validateChecksum, logonTags, heartBeatTimeoutMultiplier);
validateChecksum, logonTags, heartBeatTimeoutMultiplier, allowPossDup);

session.setLogonTimeout(logonTimeout);
session.setLogoutTimeout(logoutTimeout);
Expand Down
42 changes: 39 additions & 3 deletions quickfixj-core/src/main/java/quickfix/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,11 @@ public class Session implements Closeable {

public static final String SETTING_VALIDATE_CHECKSUM = "ValidateChecksum";

/**
* Option so that the session does not remove PossDupFlag (43) and OrigSendingTime (122) information when sending.
*/
public static final String SETTING_ALLOW_POS_DUP_MESSAGES = "AllowPosDup";

private static final ConcurrentMap<SessionID, Session> sessions = new ConcurrentHashMap<>();

private final Application application;
Expand Down Expand Up @@ -423,6 +428,7 @@ public class Session implements Closeable {
private boolean enableNextExpectedMsgSeqNum = false;
private boolean enableLastMsgSeqNumProcessed = false;
private boolean validateChecksum = true;
private boolean allowPosDup = false;

private int maxScheduledWriteRequests = 0;

Expand Down Expand Up @@ -464,7 +470,7 @@ public class Session implements Closeable {
messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS, false, false,
false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[] {5},
false, false, false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false,
false, false, new ArrayList<StringField>(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
false, false, new ArrayList<StringField>(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false);
}

Session(Application application, MessageStoreFactory messageStoreFactory, SessionID sessionID,
Expand All @@ -482,7 +488,8 @@ public class Session implements Closeable {
boolean forceResendWhenCorruptedStore, Set<InetAddress> allowedRemoteAddresses,
boolean validateIncomingMessage, int resendRequestChunkSize,
boolean enableNextExpectedMsgSeqNum, boolean enableLastMsgSeqNumProcessed,
boolean validateChecksum, List<StringField> logonTags, double heartBeatTimeoutMultiplier) {
boolean validateChecksum, List<StringField> logonTags, double heartBeatTimeoutMultiplier,
boolean allowPossDup) {
this.application = application;
this.sessionID = sessionID;
this.sessionSchedule = sessionSchedule;
Expand Down Expand Up @@ -517,6 +524,7 @@ public class Session implements Closeable {
this.enableLastMsgSeqNumProcessed = enableLastMsgSeqNumProcessed;
this.validateChecksum = validateChecksum;
this.logonTags = logonTags;
this.allowPosDup = allowPossDup;

final Log engineLog = (logFactory != null) ? logFactory.create(sessionID) : null;
if (engineLog instanceof SessionStateListener) {
Expand Down Expand Up @@ -2676,7 +2684,7 @@ private void resetState() {
* information already is present).
*
* The returned status flag is included for
* compatibility with the JNI API but it's usefulness is questionable.
* compatibility with the JNI API but its usefulness is questionable.
* In QuickFIX/J, the message is transmitted using asynchronous network I/O so the boolean
* only indicates the message was successfully queued for transmission. An error could still
* occur before the message data is actually sent.
Expand All @@ -2685,6 +2693,30 @@ private void resetState() {
* @return a status flag indicating whether the write to the network layer was successful.
*/
public boolean send(Message message) {
return send(message, this.allowPosDup);
}

/**
* Send a message to a counterparty. Sequence numbers and information about the sender
* and target identification will be added automatically (or overwritten if that
* information already is present).
*
* The returned status flag is included for
* compatibility with the JNI API but its usefulness is questionable.
* In QuickFIX/J, the message is transmitted using asynchronous network I/O so the boolean
* only indicates the message was successfully queued for transmission. An error could still
* occur before the message data is actually sent.
*
* @param message the message to send
* @param allowPosDup whether to allow PossDupFlag and OrigSendingTime in the message
* @return a status flag indicating whether the write to the network layer was successful.
*/
public boolean send(Message message, boolean allowPosDup) {
// Send message as is if allowPosDup flag is set
if (allowPosDup) {
return sendRaw(message, 0);
}

message.getHeader().removeField(PossDupFlag.FIELD);
message.getHeader().removeField(OrigSendingTime.FIELD);
return sendRaw(message, 0);
Expand Down Expand Up @@ -2998,6 +3030,10 @@ public boolean isAllowedForSession(InetAddress remoteInetAddress) {
|| allowedRemoteAddresses.contains(remoteInetAddress);
}

public void setAllowPosDup(boolean allowPosDup) {
this.allowPosDup = allowPosDup;
}

/**
* Closes session resources and unregisters session. This is for internal
* use and should typically not be called by an user application.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public static final class Builder {
private boolean enableNextExpectedMsgSeqNum = false;
private final boolean enableLastMsgSeqNumProcessed = false;
private final boolean validateChecksum = true;
private final boolean allowPosDup = false;
private List<StringField> logonTags = new ArrayList<>();

public Session build() {
Expand All @@ -123,7 +124,7 @@ public Session build() {
resetOnError, disconnectOnError, disableHeartBeatCheck, false, rejectInvalidMessage,
rejectMessageOnUnhandledException, requiresOrigSendingTime, forceResendWhenCorruptedStore,
allowedRemoteAddresses, validateIncomingMessage, resendRequestChunkSize, enableNextExpectedMsgSeqNum,
enableLastMsgSeqNumProcessed, validateChecksum, logonTags, heartBeatTimeoutMultiplier);
enableLastMsgSeqNumProcessed, validateChecksum, logonTags, heartBeatTimeoutMultiplier, allowPosDup);
}

public Builder setBeginString(final String beginString) {
Expand Down
84 changes: 75 additions & 9 deletions quickfixj-core/src/test/java/quickfix/SessionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void testDisposalOfFileResources() throws Exception {
new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, true, false,
false, false, false, false, true, false, 1.5, null, true,
new int[] { 5 }, false, false, false, false, true, false, true, false,
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER)) {
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) {
// Simulate socket disconnect
session.setResponder(null);
}
Expand Down Expand Up @@ -144,7 +144,7 @@ public void testNondisposableFileResources() throws Exception {
new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, true, false,
false, false, false, false, true, false, 1.5, null, true,
new int[] { 5 }, false, false, false, false, true, false, true, false,
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER)) {
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) {
// Simulate socket disconnect
session.setResponder(null);

Expand Down Expand Up @@ -2105,7 +2105,7 @@ private void testSequenceResetGapFillWithChunkSize(int chunkSize)
UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true,
false, 1.5, null, validateSequenceNumbers, new int[] { 5 },
false, false, false, false, true, false, true, false, null, true,
chunkSize, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER)) {
chunkSize, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) {

UnitTestResponder responder = new UnitTestResponder();
session.setResponder(responder);
Expand Down Expand Up @@ -2167,7 +2167,7 @@ public void correct_sequence_number_for_last_gap_fill_if_next_sender_sequence_nu
new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon,
false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers,
new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0,
false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false);

Responder mockResponder = mock(Responder.class);
when(mockResponder.send(anyString())).thenReturn(true);
Expand Down Expand Up @@ -2215,7 +2215,7 @@ public void correct_sequence_number_for_last_gap_fill_if_next_sender_sequence_nu
new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon,
false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers,
new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0,
enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false);

Responder mockResponder = mock(Responder.class);
when(mockResponder.send(anyString())).thenReturn(true);
Expand Down Expand Up @@ -2264,7 +2264,7 @@ public void testMsgSeqNumTooHighWithDisconnectOnError() throws Exception {
UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true,
false, 1.5, null, validateSequenceNumbers, new int[] { 5 },
false, disconnectOnError, false, false, true, false, true, false,
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER)) {
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) {

UnitTestResponder responder = new UnitTestResponder();
session.setResponder(responder);
Expand Down Expand Up @@ -2300,7 +2300,7 @@ public void testTimestampPrecision() throws Exception {
UtcTimestampPrecision.NANOS, resetOnLogon, false, false, false, false, false, true,
false, 1.5, null, validateSequenceNumbers, new int[] { 5 },
false, disconnectOnError, false, false, true, false, true, false,
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER)) {
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) {

UnitTestResponder responder = new UnitTestResponder();
session.setResponder(responder);
Expand Down Expand Up @@ -2352,7 +2352,7 @@ private void testLargeQueue(int N) throws Exception {
new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon,
false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers,
new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0,
false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false);

UnitTestResponder responder = new UnitTestResponder();
session.setResponder(responder);
Expand Down Expand Up @@ -2468,7 +2468,7 @@ public void fromAdmin(Message message, SessionID sessionId) throws FieldNotFound
new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon,
false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers,
new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0,
enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false);
UnitTestResponder responder = new UnitTestResponder();
session.setResponder(responder);

Expand Down Expand Up @@ -2982,4 +2982,70 @@ public void disconnect() {
}
}

@Test
public void testSendWithAllowPosDupAsFalse_ShouldRemovePossDupFlagAndOrigSendingTime() throws Exception {
final UnitTestApplication application = new UnitTestApplication();
final SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET");
final Session session = SessionFactoryTestSupport.createSession(sessionID, application, false, false, true, true, null);
UnitTestResponder responder = new UnitTestResponder();
session.setResponder(responder);
logonTo(session);

session.send(createPossDupAppMessage(1), false);

final Message sentMessage = new Message(responder.sentMessageData);

assertFalse(sentMessage.getHeader().isSetField(PossDupFlag.FIELD));
assertFalse(sentMessage.getHeader().isSetField(OrigSendingTime.FIELD));
}

@Test
public void testSendWithAllowPosDupAsFalse_ShouldRemovePossDupFlagAndOrigSendingTime_GivenAllowPosDupConfigurationPropertySetToTrue() throws Exception {
final UnitTestApplication application = new UnitTestApplication();
final SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET");
final Session session = SessionFactoryTestSupport.createSession(sessionID, application, false, false, true, true, null);
UnitTestResponder responder = new UnitTestResponder();
session.setResponder(responder);
session.setAllowPosDup(true);
logonTo(session);
session.send(createPossDupAppMessage(1), false);

final Message sentMessage = new Message(responder.sentMessageData);

assertFalse(sentMessage.getHeader().isSetField(PossDupFlag.FIELD));
assertFalse(sentMessage.getHeader().isSetField(OrigSendingTime.FIELD));
}

@Test
public void testSendWithAllowPosDupAsTrue_ShouldKeepPossDupFlagAndOrigSendingTime() throws Exception {
final UnitTestApplication application = new UnitTestApplication();
final SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET");
final Session session = SessionFactoryTestSupport.createSession(sessionID, application, false, false, true, true, null);
UnitTestResponder responder = new UnitTestResponder();
session.setResponder(responder);
logonTo(session);
session.send(createPossDupAppMessage(1), true);

final Message sentMessage = new Message(responder.sentMessageData);

assertTrue(sentMessage.getHeader().isSetField(PossDupFlag.FIELD));
assertTrue(sentMessage.getHeader().isSetField(OrigSendingTime.FIELD));
}

@Test
public void testSend_ShouldKeepPossDupFlagAndOrigSendingTime_GivenAllowPosDupConfigurationPropertySetToTrue() throws Exception {
final UnitTestApplication application = new UnitTestApplication();
final SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET");
final Session session = SessionFactoryTestSupport.createSession(sessionID, application, false, false, true, true, null);
UnitTestResponder responder = new UnitTestResponder();
session.setResponder(responder);
session.setAllowPosDup(true);
logonTo(session);
session.send(createPossDupAppMessage(1));

final Message sentMessage = new Message(responder.sentMessageData);

assertTrue(sentMessage.getHeader().isSetField(PossDupFlag.FIELD));
assertTrue(sentMessage.getHeader().isSetField(OrigSendingTime.FIELD));
}
}