Skip to content

Commit 5bfaffc

Browse files
authored
Merge pull request #897 from nats-io/fix_870
[ADDED] `natsOptions_SetProxyConnHandler` option
2 parents 8c2231e + 58fd1db commit 5bfaffc

File tree

7 files changed

+198
-21
lines changed

7 files changed

+198
-21
lines changed

src/comsock.c

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2015-2021 The NATS Authors
1+
// Copyright 2015-2025 The NATS Authors
22
// Licensed under the Apache License, Version 2.0 (the "License");
33
// you may not use this file except in compliance with the License.
44
// You may obtain a copy of the License at
@@ -134,6 +134,15 @@ natsSock_ShuffleIPs(natsSockCtx *ctx, struct addrinfo **tmp, int tmpSize, struct
134134

135135
#define MAX_HOST_NAME (256)
136136

137+
static void
138+
_resetDeadline(natsSockCtx* ctx, int64_t start, int64_t totalTimeout)
139+
{
140+
int64_t used = nats_Now() - start;
141+
int64_t left = totalTimeout - used;
142+
143+
natsDeadline_Init(&(ctx->writeDeadline), (left > 0 ? left : 0));
144+
}
145+
137146
natsStatus
138147
natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port)
139148
{
@@ -170,12 +179,25 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port)
170179

171180
snprintf(sport, sizeof(sport), "%d", port);
172181

173-
if ((ctx->orderIP == 46) || (ctx->orderIP == 64))
174-
max = 2;
175-
176182
start = nats_Now();
177183

178-
for (i=0; i<max; i++)
184+
// Call the proxy connect callback if provided
185+
if (ctx->proxyConnectCb != NULL)
186+
{
187+
// Invoke the proxy connect callback.
188+
s = ctx->proxyConnectCb(&ctx->fd, host, port, ctx->proxyConnectClosure);
189+
190+
// If there was a deadline, reset the deadline with whatever is left.
191+
if (totalTimeout > 0)
192+
_resetDeadline(ctx, start, totalTimeout);
193+
194+
return NATS_UPDATE_ERR_STACK(s);
195+
}
196+
197+
if ((ctx->orderIP == 46) || (ctx->orderIP == 64))
198+
max = 2;
199+
200+
for (i = 0; i < max; i++)
179201
{
180202
struct addrinfo hints;
181203
struct addrinfo *servinfo = NULL;
@@ -306,12 +328,7 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port)
306328

307329
// If there was a deadline, reset the deadline with whatever is left.
308330
if (totalTimeout > 0)
309-
{
310-
int64_t used = nats_Now() - start;
311-
int64_t left = totalTimeout - used;
312-
313-
natsDeadline_Init(&(ctx->writeDeadline), (left > 0 ? left : 0));
314-
}
331+
_resetDeadline(ctx, start, totalTimeout);
315332

316333
return NATS_UPDATE_ERR_STACK(s);
317334
}

src/conn.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2015-2024 The NATS Authors
1+
// Copyright 2015-2025 The NATS Authors
22
// Licensed under the Apache License, Version 2.0 (the "License");
33
// you may not use this file except in compliance with the License.
44
// You may obtain a copy of the License at
@@ -403,6 +403,10 @@ _createConn(natsConnection *nc)
403403
// Set ctx.noRandomize based on public NoRandomize option.
404404
nc->sockCtx.noRandomize = nc->opts->noRandomize;
405405

406+
// Set the proxy connect callback and closure.
407+
nc->sockCtx.proxyConnectCb = nc->opts->proxyConnectCb;
408+
nc->sockCtx.proxyConnectClosure = nc->opts->proxyConnectClosure;
409+
406410
s = natsSock_ConnectTcp(&(nc->sockCtx), nc->cur->url->host, nc->cur->url->port);
407411
if (s == NATS_OK)
408412
nc->sockCtx.fdActive = true;

src/nats.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1672,6 +1672,20 @@ typedef void (*natsMsgHandler)(
16721672
typedef void (*natsConnectionHandler)(
16731673
natsConnection *nc, void *closure);
16741674

1675+
/** \brief Callback used to handle connections via proxy.
1676+
*
1677+
* This callback is used to handle connections via proxy.
1678+
* It creates a socket, uses it for proxy verification, and returns it in `fd`
1679+
* to be used for the NATS connection.
1680+
*
1681+
* The callback should return #NATS_OK if it has created a socket and returned it
1682+
* in `fd`, or any other error, such as #NATS_ERR if not.
1683+
*
1684+
* @see natsOptions_SetProxyConnHandler
1685+
*/
1686+
typedef natsStatus (*natsProxyConnHandler)(
1687+
natsSock *fd, char *host, int port, void *closure);
1688+
16751689
/** \brief Callback used to notify the user of errors encountered while processing
16761690
* inbound messages.
16771691
*
@@ -2951,6 +2965,21 @@ natsOptions_SetMaxPendingMsgs(natsOptions *opts, int maxPending);
29512965
NATS_EXTERN natsStatus
29522966
natsOptions_SetMaxPendingBytes(natsOptions* opts, int64_t maxPending);
29532967

2968+
/** \brief Sets the proxy connection handler.
2969+
*
2970+
* Specifies the callback to invoke for proxy connection returning the socket to use.
2971+
*
2972+
* @see natsProxyConnHandler
2973+
*
2974+
* @param opts the pointer to the #natsOptions object.
2975+
* @param proxyConnHandler the proxy connection handler callback.
2976+
* @param closure a pointer to an user object that will be passed to
2977+
* the callback. `closure` can be `NULL`.
2978+
*/
2979+
NATS_EXTERN natsStatus
2980+
natsOptions_SetProxyConnHandler(natsOptions *opts, natsProxyConnHandler proxyConnHandler,
2981+
void *closure);
2982+
29542983
/** \brief Sets the error handler for asynchronous events.
29552984
*
29562985
* Specifies the callback to invoke when an asynchronous error

src/natsp.h

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,9 @@ struct __natsOptions
277277
natsConnectionHandler microClosedCb;
278278
natsErrHandler microAsyncErrCb;
279279

280+
natsProxyConnHandler proxyConnectCb;
281+
void *proxyConnectClosure;
282+
280283
int64_t pingInterval;
281284
int maxPingsOut;
282285
int maxPendingMsgs;
@@ -653,22 +656,25 @@ typedef struct __natsPongList
653656

654657
typedef struct __natsSockCtx
655658
{
656-
natsSock fd;
657-
bool fdActive;
659+
natsSock fd;
660+
bool fdActive;
658661

659-
natsDeadline readDeadline;
660-
natsDeadline writeDeadline;
662+
natsDeadline readDeadline;
663+
natsDeadline writeDeadline;
661664

662-
SSL *ssl;
665+
SSL *ssl;
663666

664667
// This is true when we are using an external event loop (such as libuv).
665-
bool useEventLoop;
668+
bool useEventLoop;
666669

667-
int orderIP; // possible values: 0,4,6,46,64
670+
int orderIP; // possible values: 0,4,6,46,64
668671

669672
// By default, the list of IPs returned by the hostname resolution will
670673
// be shuffled. This option, if `true`, will disable the shuffling.
671-
bool noRandomize;
674+
bool noRandomize;
675+
676+
natsProxyConnHandler proxyConnectCb;
677+
void *proxyConnectClosure;
672678

673679
} natsSockCtx;
674680

src/opts.c

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2015-2024 The NATS Authors
1+
// Copyright 2015-2025 The NATS Authors
22
// Licensed under the Apache License, Version 2.0 (the "License");
33
// you may not use this file except in compliance with the License.
44
// You may obtain a copy of the License at
@@ -989,6 +989,19 @@ natsOptions_SetMaxPendingBytes(natsOptions* opts, int64_t maxPending)
989989
return NATS_OK;
990990
}
991991

992+
natsStatus
993+
natsOptions_SetProxyConnHandler(natsOptions* opts, natsProxyConnHandler proxyConnHandler, void *closure)
994+
{
995+
LOCK_AND_CHECK_OPTIONS(opts, 0)
996+
997+
opts->proxyConnectCb = proxyConnHandler;
998+
opts->proxyConnectClosure = closure;
999+
1000+
UNLOCK_OPTS(opts);
1001+
1002+
return NATS_OK;
1003+
}
1004+
9921005
natsStatus
9931006
natsOptions_SetErrorHandler(natsOptions *opts, natsErrHandler errHandler,
9941007
void *closure)

test/list_test.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ _test(PingReconnect)
220220
_test(ProcessMsgArgs)
221221
_test(ProperFalloutAfterMaxAttempts)
222222
_test(ProperReconnectDelay)
223+
_test(ProxyConnectCb)
223224
_test(PublishMsg)
224225
_test(PubSubWithReply)
225226
_test(QueueSubscriber)

test/test.c

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2705,6 +2705,21 @@ _dummyTokenHandler(void *closure)
27052705
return "token";
27062706
}
27072707

2708+
static natsStatus
2709+
_dummyProxyConnHandler(natsSock *fd, char *host, int port, void *closure)
2710+
{
2711+
struct threadArg *args = (struct threadArg*) closure;
2712+
2713+
if (closure != NULL)
2714+
{
2715+
natsMutex_Lock(args->m);
2716+
args->sum++;
2717+
natsMutex_Unlock(args->m);
2718+
}
2719+
2720+
return NATS_ERR;
2721+
}
2722+
27082723
static void
27092724
_dummyErrHandler(natsConnection *nc, natsSubscription *sub, natsStatus err,
27102725
void *closure)
@@ -3000,6 +3015,16 @@ void test_natsOptions(void)
30003015
s = natsOptions_SetMaxPendingBytes(opts, 1000000);
30013016
testCond((s == NATS_OK) && (opts->maxPendingBytes == 1000000))
30023017

3018+
test("Set Proxy Connection Handler: ")
3019+
s = natsOptions_SetProxyConnHandler(opts, _dummyProxyConnHandler, (void*) 1);
3020+
testCond((s == NATS_OK) && (opts->proxyConnectCb == _dummyProxyConnHandler)
3021+
&& (opts->proxyConnectClosure == (void*) 1));
3022+
3023+
test("Remove Proxy Connection Handler: ")
3024+
s = natsOptions_SetProxyConnHandler(opts, NULL, NULL);
3025+
testCond((s == NATS_OK) && (opts->proxyConnectCb == NULL)
3026+
&& (opts->proxyConnectClosure == NULL));
3027+
30033028
test("Set Error Handler: ");
30043029
s = natsOptions_SetErrorHandler(opts, _dummyErrHandler, NULL);
30053030
testCond((s == NATS_OK) && (opts->asyncErrCb == _dummyErrHandler));
@@ -5952,6 +5977,88 @@ void test_ReconnectServerStats(void)
59525977
_destroyDefaultThreadArgs(&args);
59535978
}
59545979

5980+
5981+
static natsStatus
5982+
_proxyConnectCb(natsSock *fd, char *host, int port, void *closure)
5983+
{
5984+
struct threadArg *arg = (struct threadArg*) closure;
5985+
natsStatus s = NATS_OK;
5986+
natsSockCtx ctx;
5987+
5988+
natsMutex_Lock(arg->m);
5989+
arg->sum++;
5990+
natsMutex_Unlock(arg->m);
5991+
5992+
s = natsSock_Init(&ctx);
5993+
IFOK(s, natsSock_ConnectTcp(&ctx, host, port))
5994+
5995+
if (s == NATS_OK)
5996+
*fd = ctx.fd;
5997+
5998+
return s;
5999+
}
6000+
6001+
void test_ProxyConnectCb(void)
6002+
{
6003+
natsStatus s;
6004+
natsConnection *nc = NULL;
6005+
natsOptions *opts = NULL;
6006+
natsPid serverPid = NATS_INVALID_PID;
6007+
const char *servers[] = { "nats://127.0.0.1:4222", "nats://127.0.0.1:4223" };
6008+
struct threadArg args;
6009+
6010+
s = _createDefaultThreadArgsForCbTests(&args);
6011+
if (s != NATS_OK)
6012+
FAIL("Unable to setup test")
6013+
6014+
serverPid = _startServer("nats://127.0.0.1:4222", NULL, true);
6015+
CHECK_SERVER_STARTED(serverPid)
6016+
6017+
s = natsOptions_Create(&opts);
6018+
IFOK(s, natsOptions_SetServers(opts, servers, 2));
6019+
IFOK(s, natsOptions_SetNoRandomize(opts, true));
6020+
if (s != NATS_OK)
6021+
FAIL("Unable to create options")
6022+
6023+
test("Set connectCb that returns failure: ");
6024+
s = natsOptions_SetProxyConnHandler(opts, _dummyProxyConnHandler, (void*) &args);
6025+
testCond(s == NATS_OK);
6026+
6027+
test("Connect with connectCb returning failure: ");
6028+
s = natsConnection_Connect(&nc, opts);
6029+
testCond(s == NATS_NO_SERVER);
6030+
nats_clearLastError();
6031+
6032+
test("Check invoked properly: ");
6033+
natsMutex_Lock(args.m);
6034+
s = (args.sum == 2 ? NATS_OK : NATS_ERR);
6035+
args.sum = 0;
6036+
natsMutex_Unlock(args.m);
6037+
testCond(s == NATS_OK);
6038+
6039+
test("Set connectCb that returns success: ");
6040+
s = natsOptions_SetProxyConnHandler(opts, _proxyConnectCb, (void*) &args);
6041+
testCond(s == NATS_OK);
6042+
6043+
test("Connect with connectCb returning success: ");
6044+
s = natsConnection_Connect(&nc, opts);
6045+
testCond(s == NATS_OK);
6046+
6047+
test("Check invoked properly: ");
6048+
natsMutex_Lock(args.m);
6049+
s = (args.sum == 1 ? NATS_OK : NATS_ERR);
6050+
args.sum = 0;
6051+
natsMutex_Unlock(args.m);
6052+
testCond(s == NATS_OK);
6053+
6054+
natsConnection_Destroy(nc);
6055+
natsOptions_Destroy(opts);
6056+
6057+
_stopServer(serverPid);
6058+
6059+
_destroyDefaultThreadArgs(&args);
6060+
}
6061+
59556062
static void
59566063
_disconnectedCb(natsConnection *nc, void *closure)
59576064
{

0 commit comments

Comments
 (0)