
further threadsafety hardening #55

Open: wants to merge 16 commits into base: master
2 changes: 1 addition & 1 deletion README.md
@@ -7,7 +7,7 @@
[![Linux status](https://img.shields.io/github/workflow/status/vthiery/cpp-statsd-client/Linux?label=Linux&style=for-the-badge)](https://github.com/vthiery/cpp-statsd-client/actions/workflows/linux.yml?query=branch%3Amaster++)
[![Windows status](https://img.shields.io/github/workflow/status/vthiery/cpp-statsd-client/Windows?label=Windows&style=for-the-badge)](https://github.com/vthiery/cpp-statsd-client/actions/workflows/windows.yml?query=branch%3Amaster++)

A header-only StatsD client implemented in C++.
A thread-safe header-only StatsD client implemented in C++.
The client allows:

- batching,
100 changes: 42 additions & 58 deletions include/cpp-statsd-client/StatsdClient.hpp
@@ -4,15 +4,39 @@
#include <cpp-statsd-client/UDPSender.hpp>
#include <cstdint>
#include <cstdio>
#include <functional>
#include <iomanip>
#include <memory>
#include <random>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

namespace Statsd {

namespace detail {
inline std::string sanitizePrefix(std::string prefix) {
// For convenience we provide the dot when generating the stat message
if (!prefix.empty() && prefix.back() == '.') {
prefix.pop_back();
}
return prefix;
}

inline float rand(unsigned int seed) {
static thread_local std::mt19937 twister(seed);
static thread_local std::uniform_real_distribution<float> dist(0.f, 1.f);
return dist(twister);
}

// All supported metric types
constexpr char METRIC_TYPE_COUNT[] = "c";
constexpr char METRIC_TYPE_GAUGE[] = "g";
constexpr char METRIC_TYPE_TIMING[] = "ms";
constexpr char METRIC_TYPE_SET[] = "s";
} // namespace detail
Comment on lines +18 to +38
Collaborator Author

Moved this up above the class so that I could use the random function as a default argument of the constructor.
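A minimal sketch of the ordering constraint described in this comment, assuming simplified stand-ins: `sketch::threadLocalRand` and `sketch::Sampler` are hypothetical names that are not part of the library. The helper has to be declared before the class because it appears in the constructor's default argument.

```cpp
#include <functional>
#include <random>
#include <utility>

namespace sketch {

// Hypothetical stand-in for detail::rand. Each thread lazily creates its own
// engine, so concurrent callers never share generator state.
inline float threadLocalRand(unsigned int seed) {
    // The seed is only used the first time a given thread calls this function;
    // later calls on that thread reuse the engine that already exists.
    static thread_local std::mt19937 twister(seed);
    static thread_local std::uniform_real_distribution<float> dist(0.f, 1.f);
    return dist(twister);
}

using FrequencyFunc = std::function<float()>;

// Hypothetical client-like class. The default argument is evaluated at each
// call site that omits it, so every default-constructed Sampler binds a fresh
// value from std::random_device.
class Sampler {
public:
    explicit Sampler(FrequencyFunc func = std::bind(threadLocalRand, std::random_device()()))
        : m_func(std::move(func)) {}

    // std::function::operator() is const, so this can be a const member.
    float next() const { return m_func(); }

private:
    FrequencyFunc m_func;
};

}  // namespace sketch
```

Because the thread-local engine ignores the seed after its first use on a thread, reseeding for deterministic tests is easier with a caller-supplied functor, which is what the updated test in this PR does with its own `twister`.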


/*!
*
* Statsd client
@@ -27,8 +51,8 @@ namespace Statsd {
* nor prepend one to the key
*
* The sampling frequency is specified per call and uses a
* random number generator to determine whether or not the stat
* will be recorded this time or not.
* random number generator (optionally user-specified) to
* determine whether the stat will be recorded this time or not.
*
* The top level configuration includes 2 optional parameters
* that determine how the stats are delivered to statsd. These
@@ -57,6 +81,10 @@ namespace Statsd {
*/
class StatsdClient {
public:
//! A functor that returns a value between 0 and 1 used
//! to determine whether a given message is sampled.
using FrequencyFunc = std::function<float()>;

//!@name Constructor and destructor, non-copyable
//!@{

@@ -66,7 +94,8 @@ class StatsdClient {
const std::string& prefix,
const uint64_t batchsize = 0,
const uint64_t sendInterval = 1000,
const int gaugePrecision = 4) noexcept;
const int gaugePrecision = 4,
FrequencyFunc frequencyFunc = std::bind(detail::rand, std::random_device()())) noexcept;

StatsdClient(const StatsdClient&) = delete;
StatsdClient& operator=(const StatsdClient&) = delete;
@@ -76,14 +105,6 @@ class StatsdClient {
//!@name Methods
//!@{

//! Sets a configuration
void setConfig(const std::string& host,
const uint16_t port,
const std::string& prefix,
const uint64_t batchsize = 0,
const uint64_t sendInterval = 1000,
const int gaugePrecision = 4) noexcept;

Comment on lines -79 to -86
Collaborator Author

Allowing this makes it possible to break thread-safety guarantees, so it is better to just remove it. You can simply construct a new client instance if you want to configure it differently (this should be rare anyway).

//! Returns the error message as an std::string
const std::string& errorMessage() const noexcept;

@@ -130,11 +151,8 @@ class StatsdClient {
float frequency = 1.0f,
const std::vector<std::string>& tags = {}) const noexcept;

//! Seed the RNG that controls sampling
void seed(unsigned int seed = std::random_device()()) noexcept;

//! Flush any queued stats to the daemon
void flush() noexcept;
void flush() const noexcept;
Collaborator Author

Just cleanup.

Owner

Can it be made const given that UDPSender::flush() is not const?

Collaborator Author

It's kind of a dirty trick in this case, but because the UDPSender is held in a unique_ptr, whose -> dereference operator is const (it doesn't modify the smart pointer itself), we can freely call the non-const member on the UDPSender instance that the smart pointer returns.
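The behaviour described in this reply can be reproduced in isolation. The sketch below uses hypothetical stand-ins (`sketch::Sender` and `sketch::Client`, not the library's classes) to show that constness of the owning object does not propagate through `std::unique_ptr` to the pointee.

```cpp
#include <memory>

namespace sketch {

// Hypothetical stand-in for UDPSender: flush() is deliberately non-const.
class Sender {
public:
    void flush() { ++m_flushCount; }
    int flushCount() const { return m_flushCount; }

private:
    int m_flushCount = 0;
};

// Hypothetical stand-in for StatsdClient.
class Client {
public:
    Client() : m_sender(new Sender{}) {}

    // Compiles because std::unique_ptr::operator-> is a const member function
    // that returns a non-const Sender*. Only the pointer itself is const here,
    // not the object it points to.
    void flush() const { m_sender->flush(); }

    int flushCount() const { return m_sender->flushCount(); }

private:
    std::unique_ptr<Sender> m_sender;
};

}  // namespace sketch
```

Note that `const` on the wrapper therefore says nothing about whether the pointee is mutated. In this sketch a `const Client` still changes the sender's state on every `flush()`.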


//!@}

@@ -152,59 +170,30 @@ class StatsdClient {

//!@}

private:
Collaborator Author

More cleanup.

//! The prefix to be used for metrics
std::string m_prefix;

//! The UDP sender to be used for actual sending
std::unique_ptr<UDPSender> m_sender;

//! The random number generator for handling sampling
mutable std::mt19937 m_randomEngine;
Collaborator Author

Not needed, as it's now encapsulated inside the random functor below.


//! Fixed floating point precision of gauges
int m_gaugePrecision;
};

namespace detail {
Collaborator Author

Moved up, as previously stated.

inline std::string sanitizePrefix(std::string prefix) {
// For convenience we provide the dot when generating the stat message
if (!prefix.empty() && prefix.back() == '.') {
prefix.pop_back();
}
return prefix;
}

// All supported metric types
constexpr char METRIC_TYPE_COUNT[] = "c";
constexpr char METRIC_TYPE_GAUGE[] = "g";
constexpr char METRIC_TYPE_TIMING[] = "ms";
constexpr char METRIC_TYPE_SET[] = "s";
} // namespace detail
//! The function used to determine whether a message is sampled
FrequencyFunc m_frequencyFunc;
};

inline StatsdClient::StatsdClient(const std::string& host,
const uint16_t port,
const std::string& prefix,
const uint64_t batchsize,
const uint64_t sendInterval,
const int gaugePrecision) noexcept
const int gaugePrecision,
FrequencyFunc frequencyFunc) noexcept
: m_prefix(detail::sanitizePrefix(prefix)),
m_sender(new UDPSender{host, port, batchsize, sendInterval}),
m_gaugePrecision(gaugePrecision) {
// Initialize the random generator to be used for sampling
seed();
Collaborator Author

This now happens in the declaration, as part of the default parameter.

}

inline void StatsdClient::setConfig(const std::string& host,
const uint16_t port,
const std::string& prefix,
const uint64_t batchsize,
const uint64_t sendInterval,
const int gaugePrecision) noexcept {
m_prefix = detail::sanitizePrefix(prefix);
m_sender.reset(new UDPSender(host, port, batchsize, sendInterval));
m_gaugePrecision = gaugePrecision;
}
m_gaugePrecision(gaugePrecision),
m_frequencyFunc(std::move(frequencyFunc)) {}

inline const std::string& StatsdClient::errorMessage() const noexcept {
return m_sender->errorMessage();
@@ -277,8 +266,7 @@ inline void StatsdClient::send(const std::string& key,
constexpr float epsilon{0.0001f};
const bool isFrequencyOne = std::fabs(frequency - 1.0f) < epsilon;
const bool isFrequencyZero = std::fabs(frequency) < epsilon;
if (isFrequencyZero ||
(!isFrequencyOne && (frequency < std::uniform_real_distribution<float>(0.f, 1.f)(m_randomEngine)))) {
if (isFrequencyZero || (!isFrequencyOne && (frequency < m_frequencyFunc()))) {
return;
}
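The sampling check in the hunk above can be read as a small pure function once the random draw is injected. The sketch below restates it under that assumption; `sketch::shouldSend` is a hypothetical helper, not a function in the library, and it returns the opposite of the early-return condition in the diff.

```cpp
#include <cmath>
#include <functional>

namespace sketch {

using FrequencyFunc = std::function<float()>;

// Hypothetical helper: returns true when a message with the given sampling
// frequency should be sent, using `draw` to obtain a value between 0 and 1.
inline bool shouldSend(float frequency, const FrequencyFunc& draw) {
    constexpr float epsilon{0.0001f};
    const bool isFrequencyOne = std::fabs(frequency - 1.0f) < epsilon;
    const bool isFrequencyZero = std::fabs(frequency) < epsilon;
    if (isFrequencyZero) {
        return false;  // a frequency of zero never sends
    }
    if (isFrequencyOne) {
        return true;  // a frequency of one always sends; draw is not consulted
    }
    // Send only when the draw does not exceed the requested frequency.
    return !(frequency < draw());
}

}  // namespace sketch
```

With a fixed draw the decision becomes deterministic, which is the property the updated test relies on when it passes its own generator to the client.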

@@ -321,11 +309,7 @@ inline void StatsdClient::send(const std::string& key,
m_sender->send(buffer);
}

inline void StatsdClient::seed(unsigned int seed) noexcept {
m_randomEngine.seed(seed);
}

inline void StatsdClient::flush() noexcept {
inline void StatsdClient::flush() const noexcept {
m_sender->flush();
}

54 changes: 20 additions & 34 deletions tests/testStatsdClient.cpp
@@ -53,36 +53,17 @@ void testErrorConditions() {
throwOnError(client, false, "Should not be able to resolve a ridiculous ip");
}

void testReconfigure() {
Collaborator Author

The functionality is removed, so the test is removed.

StatsdServer server;
throwOnError(server);

StatsdClient client("localhost", 8125, "first.");
client.increment("foo");
throwOnWrongMessage(server, "first.foo:1|c");

client.setConfig("localhost", 8125, "second");
client.increment("bar");
throwOnWrongMessage(server, "second.bar:1|c");

client.setConfig("localhost", 8125, "");
client.increment("third.baz");
throwOnWrongMessage(server, "third.baz:1|c");

client.increment("");
throwOnWrongMessage(server, ":1|c");

// TODO: test what happens with the batching after resolving the question about incomplete
// batches being dropped vs sent on reconfiguring
}

void testSendRecv(uint64_t batchSize, uint64_t sendInterval) {
StatsdServer mock_server;
std::vector<std::string> messages, expected;
std::thread server(mock, std::ref(mock_server), std::ref(messages));

std::mt19937 twister(std::random_device{}());
std::uniform_real_distribution<float> dist(0.f, 1.f);
auto rand = [&]() -> float { return dist(twister); };

// Set a new config that has the client send messages to a proper address that can be resolved
StatsdClient client("localhost", 8125, "sendRecv.", batchSize, sendInterval, 3);
StatsdClient client("localhost", 8125, "sendRecv.", batchSize, sendInterval, 3, rand);
throwOnError(client);

// TODO: I forget if we need to wait for the server to be ready here before sending the first stats
@@ -100,7 +81,7 @@ void testSendRecv(uint64_t batchSize, uint64_t sendInterval) {
expected.emplace_back("sendRecv.kiki:-1|c");

// Adjusts "toto" by +2
client.seed(19); // this seed gets a hit on the first call
twister.seed(19); // this seed gets a hit on the first call
client.count("toto", 2, 0.1f);
throwOnError(client);
expected.emplace_back("sendRecv.toto:2|c|@0.10");
@@ -120,7 +101,7 @@ void testSendRecv(uint64_t batchSize, uint64_t sendInterval) {
expected.emplace_back("sendRecv.titifloat:-123.457|g");

// Record a timing of 2ms for "myTiming"
client.seed(19);
twister.seed(19);
client.timing("myTiming", 2, 0.1f);
throwOnError(client);
expected.emplace_back("sendRecv.myTiming:2|ms|@0.10");
@@ -159,13 +140,20 @@ void testSendRecv(uint64_t batchSize, uint64_t sendInterval) {

// Make sure we get the exactly correct output
if (messages != expected) {
std::cerr << "Unexpected stats received by server, got:" << std::endl;
for (const auto& message : messages) {
std::cerr << message << std::endl;
for (size_t i = 0; i < expected.size(); ++i) {
Collaborator Author

While I was debugging this, trying to find a magical seed that would work for all examples, I found that changing the formatting of this output makes it easier to see what is going on.

if (i >= messages.size()) {
std::cerr << "Missing messages at index " << i << ": expected '" << expected[i] << "'" << std::endl;
break;
}
if (messages[i] != expected[i]) {
std::cerr << "Mismatch at index " << i << ": expected '" << expected[i] << "', got '" << messages[i]
<< "'" << std::endl;
break;
}
}
std::cerr << std::endl << "But we expected:" << std::endl;
for (const auto& message : expected) {
std::cerr << message << std::endl;
if (messages.size() > expected.size()) {
std::cerr << "Got more messages than expected, got " << messages.size() << ", expected " << expected.size()
<< std::endl;
}
throw std::runtime_error("Unexpected stats");
}
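The reworked mismatch report in the hunk above stops at the first difference instead of dumping both lists. A sketch of the same idea as a standalone function follows; `sketch::describeFirstDifference` is a hypothetical name, and it returns the text rather than writing to `std::cerr` so that it can be checked directly.

```cpp
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

namespace sketch {

// Hypothetical helper: returns an empty string when both sequences match,
// otherwise a one-line description of the first difference found.
inline std::string describeFirstDifference(const std::vector<std::string>& messages,
                                           const std::vector<std::string>& expected) {
    std::ostringstream out;
    for (std::size_t i = 0; i < expected.size(); ++i) {
        if (i >= messages.size()) {
            // Fewer messages arrived than were expected.
            out << "Missing message at index " << i << ": expected '" << expected[i] << "'";
            return out.str();
        }
        if (messages[i] != expected[i]) {
            out << "Mismatch at index " << i << ": expected '" << expected[i] << "', got '"
                << messages[i] << "'";
            return out.str();
        }
    }
    if (messages.size() > expected.size()) {
        // Every expected entry matched, but extra messages followed.
        out << "Got more messages than expected, got " << messages.size() << ", expected "
            << expected.size();
    }
    return out.str();
}

}  // namespace sketch
```

Reporting only the first divergence keeps the output short when a single sampled message shifts every later index.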
@@ -176,8 +164,6 @@ int main() {

// general things that should be errors
testErrorConditions();
// reconfiguring how you are sending
testReconfigure();
// no batching
testSendRecv(0, 0);
// background batching