Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 60 additions & 42 deletions src/comsock.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2015-2021 The NATS Authors
// Copyright 2015-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -134,6 +134,18 @@ natsSock_ShuffleIPs(natsSockCtx *ctx, struct addrinfo **tmp, int tmpSize, struct

#define MAX_HOST_NAME (256)

void resetDeadline(natsSockCtx* ctx, int64_t start, int64_t totalTimeout)
{
// If there was a deadline, reset the deadline with whatever is left.
if (totalTimeout > 0)
{
int64_t used = nats_Now() - start;
int64_t left = totalTimeout - used;

natsDeadline_Init(&(ctx->writeDeadline), (left > 0 ? left : 0));
}
}

natsStatus
natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port)
{
Expand All @@ -152,6 +164,7 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port)
int64_t totalTimeout = 0;
int64_t timeoutPerIP = 0;
struct addrinfo *tmpStorage[64];
bool hasProxyConnectCb = ctx->proxyConnectCb != NULL;

if (phost == NULL)
return nats_setError(NATS_ADDRESS_MISSING, "%s", "No host specified");
Expand All @@ -170,44 +183,55 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port)

snprintf(sport, sizeof(sport), "%d", port);

if ((ctx->orderIP == 46) || (ctx->orderIP == 64))
max = 2;

start = nats_Now();

for (i=0; i<max; i++)
// Call the proxy connect callback if provided
if (hasProxyConnectCb)
{
struct addrinfo hints;
struct addrinfo *servinfo = NULL;
int count = 0;
struct addrinfo *p;
// Invoke the proxy connect callback.
s = ctx->proxyConnectCb(host, port, &ctx->fd);

memset(&hints,0,sizeof(hints));
hints.ai_socktype = SOCK_STREAM;
resetDeadline(ctx, start, totalTimeout);

switch (ctx->orderIP)
{
case 4: hints.ai_family = AF_INET; break;
case 6: hints.ai_family = AF_INET6; break;
case 46: hints.ai_family = (i == 0 ? AF_INET : AF_INET6); break;
case 64: hints.ai_family = (i == 0 ? AF_INET6 : AF_INET); break;
default: hints.ai_family = AF_UNSPEC;
}
return NATS_UPDATE_ERR_STACK(s);
}

if ((res = getaddrinfo(host, sport, &hints, &servinfo)) != 0)
{
s = nats_setError(NATS_SYS_ERROR, "getaddrinfo error: %s",
gai_strerror(res));
continue;
if ((ctx->orderIP == 46) || (ctx->orderIP == 64))
max = 2;

for (i = 0; i < max; i++)
{
struct addrinfo hints;
struct addrinfo* servinfo = NULL;
int count = 0;
struct addrinfo* p;

memset(&hints, 0, sizeof(hints));
hints.ai_socktype = SOCK_STREAM;

switch (ctx->orderIP)
{
case 4: hints.ai_family = AF_INET; break;
case 6: hints.ai_family = AF_INET6; break;
case 46: hints.ai_family = (i == 0 ? AF_INET : AF_INET6); break;
case 64: hints.ai_family = (i == 0 ? AF_INET6 : AF_INET); break;
default: hints.ai_family = AF_UNSPEC;
}
servInfos[numServInfo] = servinfo;
for (p = servinfo; (p != NULL); p = p->ai_next)
{
count++;
numIPs++;
if ((res = getaddrinfo(host, sport, &hints, &servinfo)) != 0)
{
s = nats_setError(NATS_SYS_ERROR, "getaddrinfo error: %s", gai_strerror(res));
continue;
}
natsSock_ShuffleIPs(ctx, tmpStorage, sizeof(tmpStorage), &(servInfos[numServInfo]), count);
numServInfo++;

servInfos[numServInfo] = servinfo;
for (p = servinfo; (p != NULL); p = p->ai_next)
{
count++;
numIPs++;
}
natsSock_ShuffleIPs(ctx, tmpStorage, sizeof(tmpStorage), &(servInfos[numServInfo]), count);
numServInfo++;
}
// If we got a getaddrinfo() and there is no servInfos to try to connect to
// bail out now.
Expand Down Expand Up @@ -238,7 +262,7 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port)
for (i=0; i<numServInfo; i++)
{
struct addrinfo *p;

for (p = servInfos[i]; (p != NULL); p = p->ai_next)
{
ctx->fd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
Expand Down Expand Up @@ -281,15 +305,15 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port)
s = nats_setDefaultError(NATS_NO_SERVER);
}
}

if (s == NATS_OK)
{
s = natsSock_SetCommonTcpOptions(ctx->fd);
// We have connected OK and completed setting options, so we are done.
if (s == NATS_OK)
break;
}

_closeFd(ctx->fd);
ctx->fd = NATS_SOCK_INVALID;
}
Expand All @@ -301,17 +325,11 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port)
break;
}
}

for (i=0; i<numServInfo; i++)
nats_FreeAddrInfo(servInfos[i]);

// If there was a deadline, reset the deadline with whatever is left.
if (totalTimeout > 0)
{
int64_t used = nats_Now() - start;
int64_t left = totalTimeout - used;

natsDeadline_Init(&(ctx->writeDeadline), (left > 0 ? left : 0));
}
resetDeadline(ctx, start, totalTimeout);

return NATS_UPDATE_ERR_STACK(s);
}
Expand Down
5 changes: 4 additions & 1 deletion src/conn.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2015-2024 The NATS Authors
// Copyright 2015-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -402,6 +402,9 @@ _createConn(natsConnection *nc)

// Set ctx.noRandomize based on public NoRandomize option.
nc->sockCtx.noRandomize = nc->opts->noRandomize;

// Set the proxy connect callback
nc->sockCtx.proxyConnectCb = nc->opts->proxyConnectCb;

s = natsSock_ConnectTcp(&(nc->sockCtx), nc->cur->url->host, nc->cur->url->port);
if (s == NATS_OK)
Expand Down
21 changes: 21 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -1672,6 +1672,15 @@ typedef void (*natsMsgHandler)(
typedef void (*natsConnectionHandler)(
natsConnection *nc, void *closure);

/** \brief Callback used to handle connections via proxy.
*
* This callback is used to handle connections via proxy.
* It creates a socket, use it for proxy verification, and return it in `fd`
* to be used for the bus connection.
*/
typedef natsStatus (*natsProxyConnHandler)(
char* host, int port, natsSock* fd);

/** \brief Callback used to notify the user of errors encountered while processing
* inbound messages.
*
Expand Down Expand Up @@ -2951,6 +2960,18 @@ natsOptions_SetMaxPendingMsgs(natsOptions *opts, int maxPending);
NATS_EXTERN natsStatus
natsOptions_SetMaxPendingBytes(natsOptions* opts, int64_t maxPending);

/** \brief Sets the proxy connection handler.
*
* Specifies the callback to invoke for proxy connection returning the socket to use.
*
* @see natsProxyConnHandler
*
* @param opts the pointer to the #natsOptions object.
* @param proxyConnHandler the proxy connection handler callback.
*/
NATS_EXTERN natsStatus
natsOptions_SetProxyConnHandler(natsOptions* opts, natsProxyConnHandler proxyConnHandler);

/** \brief Sets the error handler for asynchronous events.
*
* Specifies the callback to invoke when an asynchronous error
Expand Down
20 changes: 12 additions & 8 deletions src/natsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ struct __natsOptions
natsConnectionHandler microClosedCb;
natsErrHandler microAsyncErrCb;

natsProxyConnHandler proxyConnectCb;

int64_t pingInterval;
int maxPingsOut;
int maxPendingMsgs;
Expand Down Expand Up @@ -653,22 +655,24 @@ typedef struct __natsPongList

typedef struct __natsSockCtx
{
natsSock fd;
bool fdActive;
natsSock fd;
bool fdActive;

natsDeadline readDeadline;
natsDeadline writeDeadline;
natsDeadline readDeadline;
natsDeadline writeDeadline;

SSL *ssl;
SSL *ssl;

// This is true when we are using an external event loop (such as libuv).
bool useEventLoop;
bool useEventLoop;

int orderIP; // possible values: 0,4,6,46,64
int orderIP; // possible values: 0,4,6,46,64

// By default, the list of IPs returned by the hostname resolution will
// be shuffled. This option, if `true`, will disable the shuffling.
bool noRandomize;
bool noRandomize;

natsProxyConnHandler proxyConnectCb;

} natsSockCtx;

Expand Down
14 changes: 13 additions & 1 deletion src/opts.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2015-2024 The NATS Authors
// Copyright 2015-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -989,6 +989,18 @@ natsOptions_SetMaxPendingBytes(natsOptions* opts, int64_t maxPending)
return NATS_OK;
}

natsStatus
natsOptions_SetProxyConnHandler(natsOptions* opts, natsProxyConnHandler proxyConnHandler)
{
LOCK_AND_CHECK_OPTIONS(opts, 0)

opts->proxyConnectCb = proxyConnHandler;

UNLOCK_OPTS(opts);

return NATS_OK;
}

natsStatus
natsOptions_SetErrorHandler(natsOptions *opts, natsErrHandler errHandler,
void *closure)
Expand Down
1 change: 1 addition & 0 deletions test/list_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ _test(PingReconnect)
_test(ProcessMsgArgs)
_test(ProperFalloutAfterMaxAttempts)
_test(ProperReconnectDelay)
_test(ProxyConnectCb)
_test(PublishMsg)
_test(PubSubWithReply)
_test(QueueSubscriber)
Expand Down
Loading