Skip to content

Commit 3faeb44

Browse files
committed
draim multiple messages in UserThreadWebsocketCallbacks; Fix various bugs
1 parent 8f04f21 commit 3faeb44

10 files changed

Lines changed: 72 additions & 37 deletions

File tree

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1717
- Changed header files from .h to .hpp
1818
- Decoupled UserThreadWebsocketCallbacks from WebSocketeClient to support multiple WebSocketClient
1919
- Create market data and user data websocket when url is set
20+
- UserThreadWebsocketCallbacks now drains multiple queued messages per tick for higher throughput
21+
- Propagate precompile headers
22+
2023

2124
### Fixed
2225
- Various WebSocket unit tests not waiting for snapshot
2326
- Fixed duplicated Candle definition
27+
- WebSocket logger now writes correct payload offsets and labels user data correctly
28+
- Sequence-number checks now accept first message even if the sequence does not start at 0
29+
- Level2 book compile issues and trade timestamp handling
30+
- Trades JSON parsing (pass-by-reference)
31+
- Empty API secret handling in PEM formatting
32+
- PriceBookResponse parsing when pricebook is missing
33+
- Order status string typo and size_ratio field name
34+
- Missing default return in FCM trading session state parsing
2435

2536
## [0.1.0] - 2026-01-13
2637

CMakeLists.txt

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
cmake_minimum_required(VERSION 3.20)
1+
cmake_minimum_required(VERSION 3.30)
22

33
set(CMAKE_CXX_STANDARD 20)
44

@@ -42,8 +42,12 @@ target_include_directories(coinbase-advanced-cpp INTERFACE
4242
target_link_libraries(coinbase-advanced-cpp INTERFACE slick::net jwt-cpp::jwt-cpp)
4343
target_precompile_headers(coinbase-advanced-cpp INTERFACE
4444
<nlohmann/json.hpp>
45-
<slick/net/http.h>
46-
<slick/net/websocket.h>
45+
<random>
46+
<fstream>
47+
)
48+
49+
set_target_properties(coinbase-advanced-cpp PROPERTIES
50+
TRANSITIVE_COMPILE_PROPERTIES "PRECOMPILE_HEADERS"
4751
)
4852

4953
if (MSVC)

include/coinbase/auth.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@
1616
namespace coinbase {
1717

1818
inline std::string fix_pem_format(std::string key) {
19+
if (key.empty()) {
20+
return key;
21+
}
1922
// Remove any quotes that might have been added
20-
if (key.front() == '"' && key.back() == '"') {
23+
if (key.size() >= 2 && key.front() == '"' && key.back() == '"') {
2124
key = key.substr(1, key.length() - 2);
2225
}
2326

include/coinbase/level2_book.hpp

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@
66

77
#include <string>
88
#include <vector>
9+
#include <map>
10+
#include <functional>
911
#include <nlohmann/json.hpp>
1012
#include <coinbase/utils.hpp>
13+
#include <coinbase/market_data.hpp>
1114

1215
using json = nlohmann::json;
1316

@@ -21,11 +24,11 @@ struct SideBook {
2124

2225
struct Level2Book {
2326
std::string product_id;
24-
uint64_t last_update_time;
27+
uint64_t last_update_time = 0;
2528
SideBook<Side::BUY> bids;
2629
SideBook<Side::SELL> asks;
2730

28-
void onLevel2Snapshot(uin64_t seq_num, const Level2UpdateBatch &batch) {
31+
void onLevel2Snapshot(uint64_t seq_num, const Level2UpdateBatch &batch) {
2932
bids.levels.clear();
3033
asks.levels.clear();
3134
for (const auto &update : batch.updates) {
@@ -35,10 +38,12 @@ struct Level2Book {
3538
asks.levels.emplace(update.price_level, update.new_quantity);
3639
}
3740
}
38-
last_update_time = batch.updates.back().event_time;
41+
if (!batch.updates.empty()) {
42+
last_update_time = batch.updates.back().event_time;
43+
}
3944
}
4045

41-
void onLevel2Updates(uin64_t seq_num, const Level2UpdateBatch &batch) {
46+
void onLevel2Updates(uint64_t seq_num, const Level2UpdateBatch &batch) {
4247
for (const auto &update : batch.updates) {
4348
if (update.event_time <= last_update_time) {
4449
LOG_TRACE("{} Skipping update event_time {} <= last_update_time {}", product_id, update.event_time, last_update_time);
@@ -71,10 +76,10 @@ struct Level2Book {
7176
}
7277
}
7378

74-
void onMarketTrades(uin64_t seq_num, const std::vector<MarketTrade> &trades) {
79+
void onMarketTrades(uint64_t seq_num, const std::vector<MarketTrade> &trades) {
7580
for (const auto &trade : trades) {
76-
if (trade.event_time <= last_update_time) {
77-
LOG_TRACE("{} Skipping trade event_time {} <= last_update_time {}", product_id, trade.event_time, last_update_time);
81+
if (trade.time <= last_update_time) {
82+
LOG_TRACE("{} Skipping trade event_time {} <= last_update_time {}", product_id, trade.time, last_update_time);
7883
continue;
7984
}
8085
if (trade.side == Side::BUY) {
@@ -94,7 +99,7 @@ struct Level2Book {
9499
}
95100
}
96101
}
97-
last_update_time = trade.event_time;
102+
last_update_time = trade.time;
98103
}
99104
}
100105
};

include/coinbase/order.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ struct ScaledLimitConfig : public MarketConfig{
155155
std::string price_distribution;
156156
std::string size_distribution;
157157
double size_diff;
158-
double size_ration;
158+
double size_ratio;
159159
};
160160

161161
inline void from_json(const json &j, ScaledLimitConfig &c) {
@@ -167,7 +167,7 @@ inline void from_json(const json &j, ScaledLimitConfig &c) {
167167
VARIABLE_FROM_JSON(j, c, price_distribution);
168168
VARIABLE_FROM_JSON(j, c, size_distribution);
169169
DOUBLE_FROM_JSON(j, c, size_diff);
170-
DOUBLE_FROM_JSON(j, c, size_ration);
170+
DOUBLE_FROM_JSON(j, c, size_ratio);
171171
}
172172

173173
struct OrderConfiguration {
@@ -317,7 +317,7 @@ inline std::string to_string(OrderStatus status) {
317317
case OrderStatus::QUEUED:
318318
return "QUEUED";
319319
case OrderStatus::CANCEL_QUEUED:
320-
return "CANCEL_QUEUEQ";
320+
return "CANCEL_QUEUED";
321321
case OrderStatus::EDIT_QUEUED:
322322
return "EDIT_QUEUED";
323323
case OrderStatus::UNKNOWN_ORDER_STATUS:

include/coinbase/price_book.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ struct PriceBookResponse {
7171

7272
inline void from_json(const json &j, PriceBookResponse &b) {
7373
STRUCT_FROM_JSON(j, b, pricebook);
74-
b.pricebook = j["pricebook"];
7574
DOUBLE_FROM_JSON(j, b, last);
7675
DOUBLE_FROM_JSON(j, b, mid_market);
7776
DOUBLE_FROM_JSON(j, b, spread_bps);

include/coinbase/product.hpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ inline FcmTradingSessionState to_fcm_trading_session_state(std::string_view stat
4040
return FcmTradingSessionState::FCM_TRADING_SESSION_STATE_CLOSE;
4141
}
4242

43-
FcmTradingSessionState::FCM_TRADING_SESSION_STATE_UNDEFINED;
43+
return FcmTradingSessionState::FCM_TRADING_SESSION_STATE_UNDEFINED;
4444
}
4545

4646
struct Maintenance {
@@ -152,11 +152,11 @@ inline void from_json(const json &j, FutureProductDetails &d) {
152152
VARIABLE_FROM_JSON(j, d, twenty_four_by_seven);
153153
}
154154

155-
struct SettlmentSource {
155+
struct SettlementSource {
156156
std::string url;
157157
std::string name;
158158

159-
NLOHMANN_DEFINE_TYPE_INTRUSIVE(SettlmentSource, url, name)
159+
NLOHMANN_DEFINE_TYPE_INTRUSIVE(SettlementSource, url, name)
160160
};
161161

162162
struct PredictionMarketProductDetails {
@@ -181,7 +181,7 @@ struct PredictionMarketProductDetails {
181181
std::string scope;
182182
std::string yes_titles;
183183
std::vector<std::string> prehibitions;
184-
std::vector<SettlmentSource> settlement_sources;
184+
std::vector<SettlementSource> settlement_sources;
185185
std::vector<std::string> tags;
186186
uint64_t contract_expiry;
187187
uint64_t trade_starting_time;
@@ -213,7 +213,7 @@ inline void from_json(const json &j, PredictionMarketProductDetails &p) {
213213
VARIABLE_FROM_JSON(j, p, market_cbrn);
214214
VARIABLE_FROM_JSON(j, p, scope);
215215
VARIABLE_FROM_JSON(j, p, yes_titles);
216-
p.yes_titles = j["prehibitions"];
216+
p.prehibitions = j["prehibitions"];
217217
p.settlement_sources = j["settlement_sources"];
218218
p.tags = j["tags"];
219219
TIMESTAMP_FROM_JSON(j, p, contract_expiry);
@@ -287,11 +287,11 @@ struct Product {
287287
bool limit_only;
288288
bool post_only;
289289
bool trading_disabled;
290-
bool aution_mode;
290+
bool auction_mode;
291291
bool view_only;
292292
FcmTradingSessionDetails fcm_trading_session_details;
293293
FutureProductDetails future_product_details;
294-
EquityProductDetails equity_project_details;
294+
EquityProductDetails equity_product_details;
295295
};
296296

297297
inline void from_json(const json& j, Product& p) {

include/coinbase/trades.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ struct Trades {
2525
Side side;
2626
};
2727

28-
inline void from_json(const json &j, Trades t) {
28+
inline void from_json(const json &j, Trades &t) {
2929
VARIABLE_FROM_JSON(j, t, trade_id);
3030
VARIABLE_FROM_JSON(j, t, product_id);
3131
VARIABLE_FROM_JSON(j, t, exchange);

include/coinbase/utils.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ inline std::string to_string(double value, Side side, double min_increment) {
149149
#define DOUBLE_FROM_JSON(j, o, field) o.##field = double_from_json(j, #field)
150150
#define INT_FROM_JSON(j, o, field) o.##field = int_from_json(j, #field)
151151
#define VARIABLE_FROM_JSON(j, o, field) if (j.contains(#field)) try { j.at(#field).get_to(o.##field); } catch(const std::exception &e) { LOG_INFO(#field " {}", j.dump()); LOG_ERROR(e.what()); }
152-
#define BOOL_FROM_JSON(j, o, field) if (!j[#field].is_null() && j[#field].is_string()) { if (j[#field] == "true") o.##field = true; else if (j[#field] == "false") o.##field = false; } else try { j.at(#field).get_to(o.##field); } catch(const std::exception &e) { LOG_INFO(#field " {}", j.dump()); LOG_ERROR(e.what()); }
153-
#define STRUCT_FROM_JSON(j, o, field) if (!j[#field].is_null()) from_json(j[#field], o.##field)
152+
#define BOOL_FROM_JSON(j, o, field) if (j.contains(#field) && !j[#field].is_null() && j[#field].is_string()) { if (j[#field] == "true") o.##field = true; else if (j[#field] == "false") o.##field = false; } else try { j.at(#field).get_to(o.##field); } catch(const std::exception &e) { LOG_INFO(#field " {}", j.dump()); LOG_ERROR(e.what()); }
153+
#define STRUCT_FROM_JSON(j, o, field) if (j.contains(#field) && !j[#field].is_null()) from_json(j[#field], o.##field)
154154
#define ENUM_FROM_JSON(j, o, field) if (j.contains(#field)) try { o.##field = to_##field(j.at(#field).get<std::string_view>()); } catch(const std::exception &e) { LOG_INFO(#field " {}", j.dump()); LOG_ERROR(e.what()); }
155155
} // end namespace coinbase

include/coinbase/websocket.hpp

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -131,13 +131,13 @@ struct UserThreadWebsocketCallbacks : public DataHandler, public WebsocketCallba
131131
~UserThreadWebsocketCallbacks() override = default;
132132

133133
// Process data in the user thread. Callbacks will be invoked in the user thread.
134-
void processData();
134+
void processData(uint32_t max_drain_count = 100);
135135

136136
private:
137137
bool checkMarketDataSequenceNumber(void *ws_client, int64_t seq_num) override {
138138
auto it = md_seq_nums_.find(ws_client);
139139
if (it == md_seq_nums_.end()) [[unlikely]] {
140-
it = md_seq_nums_.emplace(ws_client, -1).first;
140+
it = md_seq_nums_.emplace(ws_client, seq_num - 1).first;
141141
}
142142
auto &seq_atomic = it->second;
143143
auto last_seq = seq_atomic.load(std::memory_order_acquire);
@@ -153,7 +153,7 @@ struct UserThreadWebsocketCallbacks : public DataHandler, public WebsocketCallba
153153
bool checkUserDataSequenceNumber(void *ws_client, int64_t seq_num) override {
154154
auto it = user_seq_nums_.find(ws_client);
155155
if (it == user_seq_nums_.end()) [[unlikely]] {
156-
it = user_seq_nums_.emplace(ws_client, -1).first;
156+
it = user_seq_nums_.emplace(ws_client, seq_num - 1).first;
157157
}
158158
auto &seq_atomic = it->second;
159159
auto last_seq = seq_atomic.load(std::memory_order_acquire);
@@ -231,9 +231,13 @@ class WebSocketClient {
231231

232232

233233

234-
inline void UserThreadWebsocketCallbacks::processData() {
235-
auto [data_ptr, data_size] = data_queue_.read(read_cursor_);
236-
if (data_ptr && data_size > 0) {
234+
inline void UserThreadWebsocketCallbacks::processData(uint32_t max_drain_count) {
235+
uint32_t i = 0;
236+
do {
237+
auto [data_ptr, data_size] = data_queue_.read(read_cursor_);
238+
if (!data_ptr || data_size == 0) {
239+
break;
240+
}
237241
void* client = nullptr;
238242
memcpy(&client, data_ptr, sizeof(void*));
239243
char channel = data_ptr[sizeof(void*)];
@@ -245,6 +249,7 @@ inline void UserThreadWebsocketCallbacks::processData() {
245249
processMarketData(client, data_ptr, data_size - sizeof(void*) - 1);
246250
}
247251
}
252+
while(++i < max_drain_count);
248253
}
249254

250255

@@ -417,8 +422,8 @@ inline void WebSocketClient::runDataLogger() {
417422
if (!data) {
418423
break;
419424
}
420-
++data; // skip channel identifier
421-
data_log_.write(data, size - 1);
425+
data += sizeof(void*) + 1; // skip client and channel identifier
426+
data_log_.write(data, size - sizeof(void*) - 1);
422427
data_log_ << std::endl;
423428
}
424429
}
@@ -450,7 +455,7 @@ inline void WebSocketClient::onUserData(const char* data, std::size_t size) {
450455
// If data_queue_ is not nullptr, data logger is enabled.
451456
// dispatch data for logging
452457
if (data_queue_) {
453-
dispatchData(data, size, 'M');
458+
dispatchData(data, size, 'U');
454459
}
455460
}
456461
}
@@ -517,7 +522,7 @@ inline bool DataHandler::processMarketData(void *ws_client, const char* data, st
517522
processHeartbeat(j);
518523
}
519524
else {
520-
LOG_ERROR("unknown channel: {}", to_string(channel));
525+
LOG_ERROR("unknown channel: {}", channel.is_string() ? channel.get<std::string_view>() : channel.dump());
521526
}
522527
}
523528
catch (const std::exception &e) {
@@ -545,7 +550,7 @@ inline bool DataHandler::processUserData(void *ws_client, const char* data, std:
545550
else if (channel == "futures_balance_summary") {
546551
}
547552
else {
548-
LOG_ERROR("unknown channel: {}", to_string(channel));
553+
LOG_ERROR("unknown channel: {}", channel.is_string() ? channel.get<std::string_view>() : channel.dump());
549554
}
550555
}
551556
catch (const std::exception &e) {
@@ -652,6 +657,10 @@ inline bool DataHandler::processHeartbeat([[maybe_unused]] const json& j) {
652657
}
653658

654659
inline bool DataHandler::checkMarketDataSequenceNumber([[maybe_unused]] void* client, int64_t seq_num) {
660+
if (last_md_seq_num_ < 0) [[unlikely]] {
661+
last_md_seq_num_ = seq_num;
662+
return true;
663+
}
655664
if (seq_num != last_md_seq_num_ + 1) {
656665
LOG_ERROR("market data message lost. seq_num: {}, last_md_seq_num: {}", seq_num, last_md_seq_num_);
657666
callbacks_->onMarketDataGap();
@@ -662,6 +671,10 @@ inline bool DataHandler::checkMarketDataSequenceNumber([[maybe_unused]] void* cl
662671
}
663672

664673
inline bool DataHandler::checkUserDataSequenceNumber([[maybe_unused]] void* client, int64_t seq_num) {
674+
if (last_user_seq_num_ < 0) [[unlikely]] {
675+
last_user_seq_num_ = seq_num;
676+
return true;
677+
}
665678
if (seq_num != last_user_seq_num_ + 1) {
666679
LOG_ERROR("user data message lost. seq_num: {}, last_user_seq_num: {}", seq_num, last_user_seq_num_);
667680
callbacks_->onUserDataGap();

0 commit comments

Comments
 (0)