Skip to content

Commit 1041d15

Browse files
committed
Merge pull request #833 from the-thing/proxy_testing
Mina proxy handshake fix and proxy integration tests (cherry picked from commit 3427f08)
1 parent 37add8a commit 1041d15

File tree

8 files changed

+374
-9
lines changed

8 files changed

+374
-9
lines changed

quickfixj-core/pom.xml

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,12 @@
5555
<version>${slf4j.version}</version>
5656
<scope>test</scope>
5757
</dependency>
58-
58+
<dependency>
59+
<groupId>io.netty</groupId>
60+
<artifactId>netty-example</artifactId>
61+
<version>4.1.111.Final</version>
62+
<scope>test</scope>
63+
</dependency>
5964
<dependency>
6065
<groupId>org.apache.mina</groupId>
6166
<artifactId>mina-core</artifactId>
@@ -403,6 +408,15 @@
403408
</configuration>
404409
</plugin>
405410
</plugins>
411+
412+
<extensions>
413+
<!-- required by Netty cross-platform build -->
414+
<extension>
415+
<groupId>kr.motd.maven</groupId>
416+
<artifactId>os-maven-plugin</artifactId>
417+
<version>1.4.0.Final</version>
418+
</extension>
419+
</extensions>
406420
</build>
407421

408422
<reporting>
@@ -438,3 +452,4 @@
438452
</profile>
439453
</profiles>
440454
</project>
455+
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package quickfix.mina;
2+
3+
import org.apache.mina.core.filterchain.IoFilterChain;
4+
import org.apache.mina.core.session.IoSession;
5+
import org.apache.mina.filter.ssl.SslFilter;
6+
7+
import javax.net.ssl.SSLContext;
8+
9+
/**
10+
* Temporary {@link SslFilter} wrapper that prevents auto connect for initiators.
11+
*/
12+
public class CustomSslFilter extends SslFilter {
13+
14+
private static final boolean DEFAULT_AUTO_START = true;
15+
16+
private final boolean autoStart;
17+
18+
public CustomSslFilter(SSLContext sslContext) {
19+
this(sslContext, DEFAULT_AUTO_START);
20+
}
21+
22+
public CustomSslFilter(SSLContext sslContext, boolean autoStart) {
23+
super(sslContext);
24+
this.autoStart = autoStart;
25+
}
26+
27+
@Override
28+
public void onPostAdd(IoFilterChain parent, String name, NextFilter next) throws Exception {
29+
IoSession session = parent.getSession();
30+
31+
if (session.isConnected() && autoStart) {
32+
onConnected(next, session);
33+
}
34+
}
35+
}

quickfixj-core/src/main/java/quickfix/mina/ProtocolFactory.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,24 +52,21 @@ public class ProtocolFactory {
5252

5353
public final static int SOCKET = 0;
5454
public final static int VM_PIPE = 1;
55-
public final static int PROXY = 2;
5655

5756
public static String getTypeString(int type) {
5857
switch (type) {
5958
case SOCKET:
6059
return "SOCKET";
6160
case VM_PIPE:
6261
return "VM_PIPE";
63-
case PROXY:
64-
return "PROXY";
6562
default:
6663
return "unknown";
6764
}
6865
}
6966

7067
public static SocketAddress createSocketAddress(int transportType, String host,
7168
int port) throws ConfigError {
72-
if (transportType == SOCKET || transportType == PROXY) {
69+
if (transportType == SOCKET) {
7370
return host != null ? new InetSocketAddress(host, port) : new InetSocketAddress(port);
7471
} else if (transportType == VM_PIPE) {
7572
return new VmPipeAddress(port);
@@ -94,8 +91,6 @@ public static int getTransportType(String string) {
9491
return SOCKET;
9592
} else if (string.equalsIgnoreCase("VM_PIPE")) {
9693
return VM_PIPE;
97-
} else if (string.equalsIgnoreCase("PROXY")) {
98-
return PROXY;
9994
} else {
10095
throw new RuntimeError("Unknown Transport Type type: " + string);
10196
}

quickfixj-core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import quickfix.SessionID;
4040
import quickfix.SessionSettings;
4141
import quickfix.mina.CompositeIoFilterChainBuilder;
42+
import quickfix.mina.CustomSslFilter;
4243
import quickfix.mina.EventHandlingStrategy;
4344
import quickfix.mina.NetworkingOptions;
4445
import quickfix.mina.ProtocolFactory;
@@ -132,7 +133,7 @@ private void installSSL(AcceptorSocketDescriptor descriptor,
132133
log.info("Installing SSL filter for {}", descriptor.getAddress());
133134
SSLConfig sslConfig = descriptor.getSslConfig();
134135
SSLContext sslContext = SSLContextFactory.getInstance(sslConfig);
135-
SslFilter sslFilter = new SslFilter(sslContext);
136+
SslFilter sslFilter = new CustomSslFilter(sslContext);
136137
sslFilter.setNeedClientAuth(sslConfig.isNeedClientAuth());
137138
sslFilter.setEnabledCipherSuites(sslConfig.getEnabledCipherSuites() != null ? sslConfig.getEnabledCipherSuites()
138139
: SSLSupport.getDefaultCipherSuites(sslContext));

quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import quickfix.SessionID;
3434
import quickfix.SystemTime;
3535
import quickfix.mina.CompositeIoFilterChainBuilder;
36+
import quickfix.mina.CustomSslFilter;
3637
import quickfix.mina.EventHandlingStrategy;
3738
import quickfix.mina.NetworkingOptions;
3839
import quickfix.mina.ProtocolFactory;
@@ -186,7 +187,7 @@ private void setupIoConnector() throws ConfigError, GeneralSecurityException {
186187
private SslFilter installSslFilter(CompositeIoFilterChainBuilder ioFilterChainBuilder)
187188
throws GeneralSecurityException {
188189
final SSLContext sslContext = SSLContextFactory.getInstance(sslConfig);
189-
final SslFilter sslFilter = new SslFilter(sslContext);
190+
final SslFilter sslFilter = new CustomSslFilter(sslContext, false);
190191
sslFilter.setEnabledCipherSuites(sslConfig.getEnabledCipherSuites() != null ? sslConfig.getEnabledCipherSuites()
191192
: SSLSupport.getDefaultCipherSuites(sslContext));
192193
sslFilter.setEnabledProtocols(sslConfig.getEnabledProtocols() != null ? sslConfig.getEnabledProtocols()
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package quickfix.mina;
2+
3+
import io.netty.bootstrap.ServerBootstrap;
4+
import io.netty.channel.Channel;
5+
import io.netty.channel.nio.NioEventLoopGroup;
6+
import io.netty.channel.socket.nio.NioServerSocketChannel;
7+
import io.netty.example.socksproxy.SocksServerInitializer;
8+
import io.netty.handler.logging.LogLevel;
9+
import io.netty.handler.logging.LoggingHandler;
10+
import org.apache.mina.util.DaemonThreadFactory;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
14+
import java.util.concurrent.ThreadFactory;
15+
16+
/**
17+
* Simple SOCKS proxy server based on Netty examples. Only SOCKS protocols are currently supported.
18+
* The implementation performs the proxy handshake, but it doesn't perform any user authentication.
19+
*/
20+
public class SocksProxyServer {
21+
22+
private static final Logger LOGGER = LoggerFactory.getLogger(SocksProxyServer.class);
23+
private static final ThreadFactory THREAD_FACTORY = new DaemonThreadFactory();
24+
25+
private final ServerBootstrap bootstrap;
26+
private final int port;
27+
private Channel channel;
28+
29+
public SocksProxyServer(int port) {
30+
this.bootstrap = new ServerBootstrap();
31+
this.bootstrap.group(new NioEventLoopGroup(THREAD_FACTORY), new NioEventLoopGroup(THREAD_FACTORY))
32+
.channel(NioServerSocketChannel.class)
33+
.handler(new LoggingHandler(LogLevel.DEBUG))
34+
.childHandler(new SocksServerInitializer());
35+
this.port = port;
36+
}
37+
38+
public synchronized void start() {
39+
if (channel != null) {
40+
throw new IllegalStateException("SOCKS proxy server is running already");
41+
}
42+
43+
try {
44+
channel = bootstrap.bind(port)
45+
.sync()
46+
.channel();
47+
} catch (InterruptedException e) {
48+
Thread.currentThread().interrupt();
49+
throw new RuntimeException(e);
50+
}
51+
52+
LOGGER.info("SOCKS proxy server started at port: {}", port);
53+
}
54+
55+
public synchronized void stop() {
56+
if (channel == null) {
57+
throw new IllegalStateException("SOCKS proxy server is not running");
58+
}
59+
60+
try {
61+
channel.close().sync();
62+
channel = null;
63+
} catch (InterruptedException e) {
64+
Thread.currentThread().interrupt();
65+
throw new RuntimeException("Failed to close SOCKS proxy server");
66+
}
67+
68+
LOGGER.info("SOCKS proxy server stopped at port {}", port);
69+
}
70+
71+
public int getPort() {
72+
return port;
73+
}
74+
}
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
package quickfix.mina;
2+
3+
import org.apache.mina.util.AvailablePortFinder;
4+
import org.junit.After;
5+
import org.junit.Before;
6+
import org.junit.Test;
7+
import quickfix.Acceptor;
8+
import quickfix.ApplicationAdapter;
9+
import quickfix.ConfigError;
10+
import quickfix.DefaultMessageFactory;
11+
import quickfix.FixVersions;
12+
import quickfix.Initiator;
13+
import quickfix.MemoryStoreFactory;
14+
import quickfix.MessageFactory;
15+
import quickfix.MessageStoreFactory;
16+
import quickfix.Session;
17+
import quickfix.SessionFactory;
18+
import quickfix.SessionID;
19+
import quickfix.SessionSettings;
20+
import quickfix.ThreadedSocketAcceptor;
21+
import quickfix.ThreadedSocketInitiator;
22+
23+
import java.util.HashMap;
24+
import java.util.concurrent.TimeUnit;
25+
26+
/**
27+
* Performs end to end tests against SOCKS proxy server.
28+
*/
29+
public class SocksProxyTest {
30+
31+
// maximum time to wait for session logon
32+
private static final long TIMEOUT_SECONDS = 5;
33+
34+
private SocksProxyServer proxyServer;
35+
36+
@Before
37+
public void setUp() {
38+
int proxyPort = AvailablePortFinder.getNextAvailable();
39+
40+
proxyServer = new SocksProxyServer(proxyPort);
41+
proxyServer.start();
42+
}
43+
44+
@After
45+
public void tearDown() {
46+
proxyServer.stop();
47+
}
48+
49+
@Test
50+
public void shouldLoginViaSocks4Proxy() throws ConfigError {
51+
shouldLoginSocksProxy("4");
52+
}
53+
54+
@Test
55+
public void shouldLoginViaSocks4aProxy() throws ConfigError {
56+
shouldLoginSocksProxy("4a");
57+
}
58+
59+
@Test
60+
public void shouldLoginViaSocks5Proxy() throws ConfigError {
61+
shouldLoginSocksProxy("5");
62+
}
63+
64+
private void shouldLoginSocksProxy(String proxyVersion) throws ConfigError {
65+
int port = AvailablePortFinder.getNextAvailable();
66+
SessionConnector acceptor = createAcceptor(port);
67+
68+
try {
69+
acceptor.start();
70+
71+
SessionConnector initiator = createInitiator(proxyVersion, proxyServer.getPort(), port);
72+
73+
try {
74+
initiator.start();
75+
assertLoggedOn(acceptor, new SessionID(FixVersions.BEGINSTRING_FIX44, "ALICE", "BOB"));
76+
assertLoggedOn(initiator, new SessionID(FixVersions.BEGINSTRING_FIX44, "BOB", "ALICE"));
77+
} finally {
78+
initiator.stop();
79+
}
80+
} finally {
81+
acceptor.stop();
82+
}
83+
}
84+
85+
private void assertLoggedOn(SessionConnector connector, SessionID sessionID) {
86+
long startTimeNanos = System.nanoTime();
87+
88+
while (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTimeNanos) < TIMEOUT_SECONDS) {
89+
if (isLoggedOn(connector, sessionID)) {
90+
return;
91+
}
92+
93+
try {
94+
Thread.sleep(100);
95+
} catch (InterruptedException e) {
96+
Thread.currentThread().interrupt();
97+
throw new RuntimeException("Interrupted", e);
98+
}
99+
}
100+
101+
throw new AssertionError("Session " + sessionID + " is not logged on");
102+
}
103+
104+
private boolean isLoggedOn(SessionConnector connector, SessionID sessionID) {
105+
Session session = connector.getSessionMap().get(sessionID);
106+
107+
if (session == null) {
108+
return false;
109+
}
110+
111+
return session.isLoggedOn();
112+
}
113+
114+
private SessionConnector createAcceptor(int port) throws ConfigError {
115+
MessageStoreFactory messageStoreFactory = new MemoryStoreFactory();
116+
MessageFactory messageFactory = new DefaultMessageFactory();
117+
SessionSettings acceptorSettings = createAcceptorSettings("ALICE", "BOB", port);
118+
return new ThreadedSocketAcceptor(new ApplicationAdapter(), messageStoreFactory, acceptorSettings, messageFactory);
119+
}
120+
121+
private SessionConnector createInitiator(String proxyVersion, int proxyPort, int port) throws ConfigError {
122+
MessageStoreFactory messageStoreFactory = new MemoryStoreFactory();
123+
MessageFactory messageFactory = new DefaultMessageFactory();
124+
SessionSettings initiatorSettings = createInitiatorSettings("BOB", "ALICE", proxyVersion, proxyPort, port);
125+
return new ThreadedSocketInitiator(new ApplicationAdapter(), messageStoreFactory, initiatorSettings, messageFactory);
126+
}
127+
128+
private SessionSettings createAcceptorSettings(String senderId, String targetId, int port) {
129+
HashMap<Object, Object> defaults = new HashMap<>();
130+
defaults.put(SessionFactory.SETTING_CONNECTION_TYPE, "acceptor");
131+
defaults.put(Acceptor.SETTING_SOCKET_ACCEPT_PORT, Integer.toString(port));
132+
defaults.put(Session.SETTING_START_TIME, "00:00:00");
133+
defaults.put(Session.SETTING_END_TIME, "00:00:00");
134+
defaults.put(Session.SETTING_HEARTBTINT, "30");
135+
136+
SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, senderId, targetId);
137+
138+
SessionSettings sessionSettings = new SessionSettings();
139+
sessionSettings.set(defaults);
140+
sessionSettings.setString(sessionID, "BeginString", FixVersions.BEGINSTRING_FIX44);
141+
sessionSettings.setString(sessionID, "DataDictionary", "FIX44.xml");
142+
sessionSettings.setString(sessionID, "SenderCompID", senderId);
143+
sessionSettings.setString(sessionID, "TargetCompID", targetId);
144+
145+
return sessionSettings;
146+
}
147+
148+
private SessionSettings createInitiatorSettings(String senderId, String targetId, String proxyVersion, int proxyPort, int port) {
149+
HashMap<Object, Object> defaults = new HashMap<>();
150+
defaults.put(SessionFactory.SETTING_CONNECTION_TYPE, "initiator");
151+
defaults.put(Initiator.SETTING_SOCKET_CONNECT_PROTOCOL, ProtocolFactory.getTypeString(ProtocolFactory.SOCKET));
152+
defaults.put(Initiator.SETTING_SOCKET_CONNECT_HOST, "localhost");
153+
defaults.put(Initiator.SETTING_SOCKET_CONNECT_PORT, Integer.toString(port));
154+
defaults.put(Initiator.SETTING_RECONNECT_INTERVAL, "2");
155+
defaults.put(Initiator.SETTING_PROXY_HOST, "localhost");
156+
defaults.put(Initiator.SETTING_PROXY_PORT, Integer.toString(proxyPort));
157+
defaults.put(Initiator.SETTING_PROXY_TYPE, "socks");
158+
defaults.put(Initiator.SETTING_PROXY_VERSION, proxyVersion);
159+
defaults.put(Initiator.SETTING_PROXY_USER, "proxy-user");
160+
defaults.put(Initiator.SETTING_PROXY_PASSWORD, "proxy-password");
161+
defaults.put(Session.SETTING_START_TIME, "00:00:00");
162+
defaults.put(Session.SETTING_END_TIME, "00:00:00");
163+
defaults.put(Session.SETTING_HEARTBTINT, "30");
164+
165+
SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, senderId, targetId);
166+
167+
SessionSettings sessionSettings = new SessionSettings();
168+
sessionSettings.set(defaults);
169+
sessionSettings.setString(sessionID, "BeginString", FixVersions.BEGINSTRING_FIX44);
170+
sessionSettings.setString(sessionID, "DataDictionary", "FIX44.xml");
171+
sessionSettings.setString(sessionID, "SenderCompID", senderId);
172+
sessionSettings.setString(sessionID, "TargetCompID", targetId);
173+
174+
return sessionSettings;
175+
}
176+
177+
}

0 commit comments

Comments
 (0)