Skip to content

Commit 8eba4b3

Browse files
authored
allow sending unchanged message in Session#send (#395)
* allowPosDup in `Session#send` * Added AllowPosDup as configuration property
1 parent 7b873f7 commit 8eba4b3

File tree

5 files changed

+124
-14
lines changed

5 files changed

+124
-14
lines changed

quickfixj-core/src/main/doc/usermanual/usage/configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1309,6 +1309,12 @@ <H3>QuickFIX Settings</H3>
13091309
<TD>Y<BR>N</TD>
13101310
<TD>N</TD>
13111311
</TR>
1312+
<TR ALIGN="left" VALIGN="middle">
1313+
<TD><I>AllowPosDup</I></TD>
1314+
<TD>Whether to allow PossDupFlag and OrigSendingTime when sending messages. This is useful on occasions, primarily when a QFJ application is acting as purely a pass-through/monitoring hop.</TD>
1315+
<TD>Y<BR>N</TD>
1316+
<TD>N</TD>
1317+
</TR>
13121318
</tbody>
13131319
</TABLE>
13141320

quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ public Session create(SessionID sessionID, SessionSettings settings) throws Conf
213213
final boolean enableNextExpectedMsgSeqNum = getSetting(settings, sessionID, Session.SETTING_ENABLE_NEXT_EXPECTED_MSG_SEQ_NUM, false);
214214
final boolean enableLastMsgSeqNumProcessed = getSetting(settings, sessionID, Session.SETTING_ENABLE_LAST_MSG_SEQ_NUM_PROCESSED, false);
215215
final int resendRequestChunkSize = getSetting(settings, sessionID, Session.SETTING_RESEND_REQUEST_CHUNK_SIZE, Session.DEFAULT_RESEND_RANGE_CHUNK_SIZE);
216+
final boolean allowPossDup = getSetting(settings, sessionID, Session.SETTING_ALLOW_POS_DUP_MESSAGES, false);
216217

217218
final int[] logonIntervals = getLogonIntervalsInSeconds(settings, sessionID);
218219
final Set<InetAddress> allowedRemoteAddresses = getInetAddresses(settings, sessionID);
@@ -231,7 +232,7 @@ public Session create(SessionID sessionID, SessionSettings settings) throws Conf
231232
rejectInvalidMessage, rejectMessageOnUnhandledException, requiresOrigSendingTime,
232233
forceResendWhenCorruptedStore, allowedRemoteAddresses, validateIncomingMessage,
233234
resendRequestChunkSize, enableNextExpectedMsgSeqNum, enableLastMsgSeqNumProcessed,
234-
validateChecksum, logonTags, heartBeatTimeoutMultiplier);
235+
validateChecksum, logonTags, heartBeatTimeoutMultiplier, allowPossDup);
235236

236237
session.setLogonTimeout(logonTimeout);
237238
session.setLogoutTimeout(logoutTimeout);

quickfixj-core/src/main/java/quickfix/Session.java

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,11 @@ public class Session implements Closeable {
373373

374374
public static final String SETTING_VALIDATE_CHECKSUM = "ValidateChecksum";
375375

376+
/**
377+
* Option so that the session does not remove PossDupFlag (43) and OrigSendingTime (122) information when sending.
378+
*/
379+
public static final String SETTING_ALLOW_POS_DUP_MESSAGES = "AllowPosDup";
380+
376381
private static final ConcurrentMap<SessionID, Session> sessions = new ConcurrentHashMap<>();
377382

378383
private final Application application;
@@ -423,6 +428,7 @@ public class Session implements Closeable {
423428
private boolean enableNextExpectedMsgSeqNum = false;
424429
private boolean enableLastMsgSeqNumProcessed = false;
425430
private boolean validateChecksum = true;
431+
private boolean allowPosDup = false;
426432

427433
private int maxScheduledWriteRequests = 0;
428434

@@ -464,7 +470,7 @@ public class Session implements Closeable {
464470
messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS, false, false,
465471
false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[] {5},
466472
false, false, false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false,
467-
false, false, new ArrayList<StringField>(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
473+
false, false, new ArrayList<StringField>(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false);
468474
}
469475

470476
Session(Application application, MessageStoreFactory messageStoreFactory, SessionID sessionID,
@@ -482,7 +488,8 @@ public class Session implements Closeable {
482488
boolean forceResendWhenCorruptedStore, Set<InetAddress> allowedRemoteAddresses,
483489
boolean validateIncomingMessage, int resendRequestChunkSize,
484490
boolean enableNextExpectedMsgSeqNum, boolean enableLastMsgSeqNumProcessed,
485-
boolean validateChecksum, List<StringField> logonTags, double heartBeatTimeoutMultiplier) {
491+
boolean validateChecksum, List<StringField> logonTags, double heartBeatTimeoutMultiplier,
492+
boolean allowPossDup) {
486493
this.application = application;
487494
this.sessionID = sessionID;
488495
this.sessionSchedule = sessionSchedule;
@@ -517,6 +524,7 @@ public class Session implements Closeable {
517524
this.enableLastMsgSeqNumProcessed = enableLastMsgSeqNumProcessed;
518525
this.validateChecksum = validateChecksum;
519526
this.logonTags = logonTags;
527+
this.allowPosDup = allowPossDup;
520528

521529
final Log engineLog = (logFactory != null) ? logFactory.create(sessionID) : null;
522530
if (engineLog instanceof SessionStateListener) {
@@ -2676,7 +2684,7 @@ private void resetState() {
26762684
* information already is present).
26772685
*
26782686
* The returned status flag is included for
2679-
* compatibility with the JNI API but it's usefulness is questionable.
2687+
* compatibility with the JNI API but its usefulness is questionable.
26802688
* In QuickFIX/J, the message is transmitted using asynchronous network I/O so the boolean
26812689
* only indicates the message was successfully queued for transmission. An error could still
26822690
* occur before the message data is actually sent.
@@ -2685,6 +2693,30 @@ private void resetState() {
26852693
* @return a status flag indicating whether the write to the network layer was successful.
26862694
*/
26872695
public boolean send(Message message) {
2696+
return send(message, this.allowPosDup);
2697+
}
2698+
2699+
/**
2700+
* Send a message to a counterparty. Sequence numbers and information about the sender
2701+
* and target identification will be added automatically (or overwritten if that
2702+
* information already is present).
2703+
*
2704+
* The returned status flag is included for
2705+
* compatibility with the JNI API but its usefulness is questionable.
2706+
* In QuickFIX/J, the message is transmitted using asynchronous network I/O so the boolean
2707+
* only indicates the message was successfully queued for transmission. An error could still
2708+
* occur before the message data is actually sent.
2709+
*
2710+
* @param message the message to send
2711+
* @param allowPosDup whether to allow PossDupFlag and OrigSendingTime in the message
2712+
* @return a status flag indicating whether the write to the network layer was successful.
2713+
*/
2714+
public boolean send(Message message, boolean allowPosDup) {
2715+
// Send message as is if allowPosDup flag is set
2716+
if (allowPosDup) {
2717+
return sendRaw(message, 0);
2718+
}
2719+
26882720
message.getHeader().removeField(PossDupFlag.FIELD);
26892721
message.getHeader().removeField(OrigSendingTime.FIELD);
26902722
return sendRaw(message, 0);
@@ -2998,6 +3030,10 @@ public boolean isAllowedForSession(InetAddress remoteInetAddress) {
29983030
|| allowedRemoteAddresses.contains(remoteInetAddress);
29993031
}
30003032

3033+
public void setAllowPosDup(boolean allowPosDup) {
3034+
this.allowPosDup = allowPosDup;
3035+
}
3036+
30013037
/**
30023038
* Closes session resources and unregisters session. This is for internal
30033039
* use and should typically not be called by an user application.

quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ public static final class Builder {
111111
private boolean enableNextExpectedMsgSeqNum = false;
112112
private final boolean enableLastMsgSeqNumProcessed = false;
113113
private final boolean validateChecksum = true;
114+
private final boolean allowPosDup = false;
114115
private List<StringField> logonTags = new ArrayList<>();
115116

116117
public Session build() {
@@ -123,7 +124,7 @@ public Session build() {
123124
resetOnError, disconnectOnError, disableHeartBeatCheck, false, rejectInvalidMessage,
124125
rejectMessageOnUnhandledException, requiresOrigSendingTime, forceResendWhenCorruptedStore,
125126
allowedRemoteAddresses, validateIncomingMessage, resendRequestChunkSize, enableNextExpectedMsgSeqNum,
126-
enableLastMsgSeqNumProcessed, validateChecksum, logonTags, heartBeatTimeoutMultiplier);
127+
enableLastMsgSeqNumProcessed, validateChecksum, logonTags, heartBeatTimeoutMultiplier, allowPosDup);
127128
}
128129

129130
public Builder setBeginString(final String beginString) {

quickfixj-core/src/test/java/quickfix/SessionTest.java

Lines changed: 75 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public void testDisposalOfFileResources() throws Exception {
103103
new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, true, false,
104104
false, false, false, false, true, false, 1.5, null, true,
105105
new int[] { 5 }, false, false, false, false, true, false, true, false,
106-
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER)) {
106+
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) {
107107
// Simulate socket disconnect
108108
session.setResponder(null);
109109
}
@@ -144,7 +144,7 @@ public void testNondisposableFileResources() throws Exception {
144144
new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, true, false,
145145
false, false, false, false, true, false, 1.5, null, true,
146146
new int[] { 5 }, false, false, false, false, true, false, true, false,
147-
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER)) {
147+
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) {
148148
// Simulate socket disconnect
149149
session.setResponder(null);
150150

@@ -2105,7 +2105,7 @@ private void testSequenceResetGapFillWithChunkSize(int chunkSize)
21052105
UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true,
21062106
false, 1.5, null, validateSequenceNumbers, new int[] { 5 },
21072107
false, false, false, false, true, false, true, false, null, true,
2108-
chunkSize, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER)) {
2108+
chunkSize, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) {
21092109

21102110
UnitTestResponder responder = new UnitTestResponder();
21112111
session.setResponder(responder);
@@ -2167,7 +2167,7 @@ public void correct_sequence_number_for_last_gap_fill_if_next_sender_sequence_nu
21672167
new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon,
21682168
false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers,
21692169
new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0,
2170-
false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
2170+
false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false);
21712171

21722172
Responder mockResponder = mock(Responder.class);
21732173
when(mockResponder.send(anyString())).thenReturn(true);
@@ -2215,7 +2215,7 @@ public void correct_sequence_number_for_last_gap_fill_if_next_sender_sequence_nu
22152215
new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon,
22162216
false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers,
22172217
new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0,
2218-
enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
2218+
enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false);
22192219

22202220
Responder mockResponder = mock(Responder.class);
22212221
when(mockResponder.send(anyString())).thenReturn(true);
@@ -2264,7 +2264,7 @@ public void testMsgSeqNumTooHighWithDisconnectOnError() throws Exception {
22642264
UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true,
22652265
false, 1.5, null, validateSequenceNumbers, new int[] { 5 },
22662266
false, disconnectOnError, false, false, true, false, true, false,
2267-
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER)) {
2267+
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) {
22682268

22692269
UnitTestResponder responder = new UnitTestResponder();
22702270
session.setResponder(responder);
@@ -2300,7 +2300,7 @@ public void testTimestampPrecision() throws Exception {
23002300
UtcTimestampPrecision.NANOS, resetOnLogon, false, false, false, false, false, true,
23012301
false, 1.5, null, validateSequenceNumbers, new int[] { 5 },
23022302
false, disconnectOnError, false, false, true, false, true, false,
2303-
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER)) {
2303+
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) {
23042304

23052305
UnitTestResponder responder = new UnitTestResponder();
23062306
session.setResponder(responder);
@@ -2352,7 +2352,7 @@ private void testLargeQueue(int N) throws Exception {
23522352
new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon,
23532353
false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers,
23542354
new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0,
2355-
false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
2355+
false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false);
23562356

23572357
UnitTestResponder responder = new UnitTestResponder();
23582358
session.setResponder(responder);
@@ -2468,7 +2468,7 @@ public void fromAdmin(Message message, SessionID sessionId) throws FieldNotFound
24682468
new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon,
24692469
false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers,
24702470
new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0,
2471-
enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
2471+
enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false);
24722472
UnitTestResponder responder = new UnitTestResponder();
24732473
session.setResponder(responder);
24742474

@@ -2982,4 +2982,70 @@ public void disconnect() {
29822982
}
29832983
}
29842984

2985+
@Test
2986+
public void testSendWithAllowPosDupAsFalse_ShouldRemovePossDupFlagAndOrigSendingTime() throws Exception {
2987+
final UnitTestApplication application = new UnitTestApplication();
2988+
final SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET");
2989+
final Session session = SessionFactoryTestSupport.createSession(sessionID, application, false, false, true, true, null);
2990+
UnitTestResponder responder = new UnitTestResponder();
2991+
session.setResponder(responder);
2992+
logonTo(session);
2993+
2994+
session.send(createPossDupAppMessage(1), false);
2995+
2996+
final Message sentMessage = new Message(responder.sentMessageData);
2997+
2998+
assertFalse(sentMessage.getHeader().isSetField(PossDupFlag.FIELD));
2999+
assertFalse(sentMessage.getHeader().isSetField(OrigSendingTime.FIELD));
3000+
}
3001+
3002+
@Test
3003+
public void testSendWithAllowPosDupAsFalse_ShouldRemovePossDupFlagAndOrigSendingTime_GivenAllowPosDupConfigurationPropertySetToTrue() throws Exception {
3004+
final UnitTestApplication application = new UnitTestApplication();
3005+
final SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET");
3006+
final Session session = SessionFactoryTestSupport.createSession(sessionID, application, false, false, true, true, null);
3007+
UnitTestResponder responder = new UnitTestResponder();
3008+
session.setResponder(responder);
3009+
session.setAllowPosDup(true);
3010+
logonTo(session);
3011+
session.send(createPossDupAppMessage(1), false);
3012+
3013+
final Message sentMessage = new Message(responder.sentMessageData);
3014+
3015+
assertFalse(sentMessage.getHeader().isSetField(PossDupFlag.FIELD));
3016+
assertFalse(sentMessage.getHeader().isSetField(OrigSendingTime.FIELD));
3017+
}
3018+
3019+
@Test
3020+
public void testSendWithAllowPosDupAsTrue_ShouldKeepPossDupFlagAndOrigSendingTime() throws Exception {
3021+
final UnitTestApplication application = new UnitTestApplication();
3022+
final SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET");
3023+
final Session session = SessionFactoryTestSupport.createSession(sessionID, application, false, false, true, true, null);
3024+
UnitTestResponder responder = new UnitTestResponder();
3025+
session.setResponder(responder);
3026+
logonTo(session);
3027+
session.send(createPossDupAppMessage(1), true);
3028+
3029+
final Message sentMessage = new Message(responder.sentMessageData);
3030+
3031+
assertTrue(sentMessage.getHeader().isSetField(PossDupFlag.FIELD));
3032+
assertTrue(sentMessage.getHeader().isSetField(OrigSendingTime.FIELD));
3033+
}
3034+
3035+
@Test
3036+
public void testSend_ShouldKeepPossDupFlagAndOrigSendingTime_GivenAllowPosDupConfigurationPropertySetToTrue() throws Exception {
3037+
final UnitTestApplication application = new UnitTestApplication();
3038+
final SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET");
3039+
final Session session = SessionFactoryTestSupport.createSession(sessionID, application, false, false, true, true, null);
3040+
UnitTestResponder responder = new UnitTestResponder();
3041+
session.setResponder(responder);
3042+
session.setAllowPosDup(true);
3043+
logonTo(session);
3044+
session.send(createPossDupAppMessage(1));
3045+
3046+
final Message sentMessage = new Message(responder.sentMessageData);
3047+
3048+
assertTrue(sentMessage.getHeader().isSetField(PossDupFlag.FIELD));
3049+
assertTrue(sentMessage.getHeader().isSetField(OrigSendingTime.FIELD));
3050+
}
29853051
}

0 commit comments

Comments
 (0)