Skip to content

Commit 9328bc9

Browse files
committed
osc: work on adding more features to rate limiter
1 parent 68b03dc commit 9328bc9

File tree

3 files changed

+197
-77
lines changed

3 files changed

+197
-77
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#pragma once
2+
#include <ossia/detail/config.hpp>
3+
4+
#include <chrono>
5+
6+
namespace ossia::net
7+
{
8+
struct rate_limiter_configuration
9+
{
10+
using clock = std::chrono::steady_clock;
11+
12+
// What is the rate limit
13+
std::chrono::milliseconds duration{};
14+
15+
// Put things in e.g. OSC bundles
16+
bool bundle{};
17+
18+
// When sending, send every parameters from the device, not just the last ones
19+
bool send_all{};
20+
21+
// Always send the last sent values on every tick
22+
bool repeat{};
23+
};
24+
25+
}

src/ossia/network/rate_limiting_protocol.cpp

+155-74
Original file line numberDiff line numberDiff line change
@@ -8,83 +8,162 @@
88
#if defined(__cpp_exceptions)
99
namespace ossia::net
1010
{
11+
// FIXME refactor with coalescing_queue
12+
// FIXME refactor with sleep_accurate (MIDISync.hpp)
13+
// FIXME refactor with exp wait in audio_spin_mutex
14+
1115
struct rate_limiter
1216
{
1317
rate_limiting_protocol& self;
14-
void operator()() const noexcept
18+
std::chrono::steady_clock::duration duration;
19+
std::chrono::steady_clock::duration time_to_sleep{duration};
20+
21+
std::chrono::steady_clock::time_point sleep_before()
1522
{
16-
ossia::set_thread_name("ossia ratelim");
1723
using namespace std::literals;
1824
using clock = rate_limiting_protocol::clock;
19-
const auto duration = self.m_duration.load();
20-
thread_local auto time_to_sleep = duration;
21-
while(self.m_running)
25+
auto prev_time = clock::now();
26+
if(time_to_sleep > 1ms)
27+
std::this_thread::sleep_for(time_to_sleep);
28+
return prev_time;
29+
}
30+
31+
void sleep_after(std::chrono::steady_clock::time_point prev_time)
32+
{
33+
using namespace std::literals;
34+
using clock = rate_limiting_protocol::clock;
35+
36+
auto new_time = clock::now();
37+
auto observed_duration
38+
= std::chrono::duration_cast<std::chrono::milliseconds>(new_time - prev_time);
39+
if(observed_duration > duration)
40+
{
41+
if(observed_duration >= 2 * duration)
42+
time_to_sleep = 0ms;
43+
else
44+
time_to_sleep = 2 * duration - observed_duration;
45+
}
46+
else
47+
{
48+
time_to_sleep = duration;
49+
}
50+
}
51+
};
52+
53+
template <bool Bundle, bool Repeat, bool SendAll>
54+
struct rate_limiter_impl;
55+
56+
template <>
57+
struct rate_limiter_impl<false, false, false> : rate_limiter
58+
{
59+
void operator()()
60+
{
61+
// TODO find safe way to handle if a parameter is removed
62+
// TODO instead we should do the value filtering in the parameter ...
63+
// but still have to handle cases that can be optimized, such as midi
64+
{
65+
std::lock_guard lock{self.m_msgMutex};
66+
std::swap(self.m_buffer, self.m_userMessages);
67+
}
68+
69+
// Copy newest messages in local map
70+
for(auto& msg : self.m_buffer)
71+
{
72+
if(msg.second.first.valid())
73+
{
74+
self.m_threadMessages[msg.first] = std::move(msg.second);
75+
msg.second.first = ossia::value{};
76+
}
77+
}
78+
79+
// Push the actual messages
80+
for(auto& v : self.m_threadMessages)
81+
{
82+
auto val = v.second.first;
83+
if(val.valid())
84+
{
85+
self.m_protocol->push(*v.first, v.second.first);
86+
}
87+
}
88+
89+
// Clear both containers (while keeping memory allocated for sent
90+
// messages so that it stays fast)
91+
for(auto& v : self.m_buffer)
92+
if(v.second.first.valid())
93+
v.second.first = ossia::value{};
94+
95+
for(auto& v : self.m_threadMessages)
96+
if(v.second.first.valid())
97+
v.second.first = ossia::value{};
98+
}
99+
};
100+
101+
template <>
102+
struct rate_limiter_impl<true, true, true> : rate_limiter
103+
{
104+
void operator()()
105+
{
106+
std::vector<std::pair<ossia::net::parameter_base*, ossia::value>> vec;
107+
ossia::iterate_all_children(this->self.m_device->get_root_node());
108+
109+
// bundle
110+
// send all
111+
// repeat
112+
113+
// TODO find safe way to handle if a parameter is removed
114+
// TODO instead we should do the value filtering in the parameter ...
115+
// but still have to handle cases that can be optimized, such as midi
116+
{
117+
std::lock_guard lock{self.m_msgMutex};
118+
std::swap(self.m_buffer, self.m_userMessages);
119+
}
120+
121+
// Copy newest messages in local map
122+
for(auto& msg : self.m_buffer)
123+
{
124+
if(msg.second.first.valid())
125+
{
126+
self.m_threadMessages[msg.first] = std::move(msg.second);
127+
msg.second.first = ossia::value{};
128+
}
129+
}
130+
131+
// Push the actual messages
132+
for(auto& v : self.m_threadMessages)
133+
{
134+
auto val = v.second.first;
135+
if(val.valid())
136+
{
137+
self.m_protocol->push(*v.first, v.second.first);
138+
}
139+
}
140+
141+
// Clear both containers (while keeping memory allocated for sent
142+
// messages so that it stays fast)
143+
for(auto& v : self.m_buffer)
144+
if(v.second.first.valid())
145+
v.second.first = ossia::value{};
146+
147+
for(auto& v : self.m_threadMessages)
148+
if(v.second.first.valid())
149+
v.second.first = ossia::value{};
150+
}
151+
};
152+
153+
template <bool Bundle, bool Repeat, bool SendAll>
154+
struct rate_limiter_concrete : rate_limiter_impl<Bundle, Repeat, SendAll>
155+
{
156+
void operator()()
157+
{
158+
ossia::set_thread_name("ossia ratelim");
159+
160+
while(this->self.m_running)
22161
{
23162
try
24163
{
25-
auto prev_time = clock::now();
26-
if(time_to_sleep > 1ms)
27-
std::this_thread::sleep_for(time_to_sleep);
28-
29-
// TODO find safe way to handle if a parameter is removed
30-
// TODO instead we should do the value filtering in the parameter ...
31-
// but still have to handle cases that can be optimized, such as midi
32-
{
33-
std::lock_guard lock{self.m_msgMutex};
34-
std::swap(self.m_buffer, self.m_userMessages);
35-
}
36-
37-
// Copy newest messages in local map
38-
for(auto& msg : self.m_buffer)
39-
{
40-
if(msg.second.first.valid())
41-
{
42-
self.m_threadMessages[msg.first] = std::move(msg.second);
43-
msg.second.first = ossia::value{};
44-
}
45-
}
46-
47-
// Push the actual messages
48-
for(auto& v : self.m_threadMessages)
49-
{
50-
auto val = v.second.first;
51-
if(val.valid())
52-
{
53-
self.m_protocol->push(*v.first, v.second.first);
54-
}
55-
}
56-
57-
// Clear both containers (while keeping memory allocated for sent
58-
// messages so that it stays fast)
59-
for(auto& v : self.m_buffer)
60-
{
61-
if(v.second.first.valid())
62-
{
63-
v.second.first = ossia::value{};
64-
}
65-
}
66-
67-
for(auto& v : self.m_threadMessages)
68-
{
69-
if(v.second.first.valid())
70-
{
71-
v.second.first = ossia::value{};
72-
}
73-
}
74-
auto new_time = clock::now();
75-
auto observed_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
76-
new_time - prev_time);
77-
if(observed_duration > duration)
78-
{
79-
if(observed_duration >= 2 * duration)
80-
time_to_sleep = 0ms;
81-
else
82-
time_to_sleep = 2 * duration - observed_duration;
83-
}
84-
else
85-
{
86-
time_to_sleep = duration;
87-
}
164+
const auto prev_time = this->sleep_before();
165+
rate_limiter_impl<Bundle, Repeat, SendAll>::operator()();
166+
sleep_after(prev_time);
88167
}
89168
catch(...)
90169
{
@@ -94,17 +173,17 @@ struct rate_limiter
94173
};
95174

96175
rate_limiting_protocol::rate_limiting_protocol(
97-
rate_limiting_protocol::duration d, std::unique_ptr<protocol_base> arg)
176+
rate_limiter_configuration d, std::unique_ptr<protocol_base> arg)
98177
: protocol_base{flags{SupportsMultiplex}}
99-
, m_duration{d}
178+
, m_duration{d.duration}
179+
, m_bundle{d.bundle}
180+
, m_repeat{d.repeat}
181+
, m_send_all{d.send_all}
100182
, m_protocol{std::move(arg)}
101-
102183
{
103184
m_userMessages.reserve(4096);
104185
m_buffer.reserve(4096);
105186
m_threadMessages.reserve(4096);
106-
m_lastTime = clock::now();
107-
m_thread = std::thread{rate_limiter{*this}};
108187
}
109188

110189
rate_limiting_protocol::~rate_limiting_protocol()
@@ -116,6 +195,7 @@ rate_limiting_protocol::~rate_limiting_protocol()
116195
void rate_limiting_protocol::set_duration(rate_limiting_protocol::duration d)
117196
{
118197
m_duration = d;
198+
// FIXME update thread
119199
}
120200

121201
bool rate_limiting_protocol::pull(ossia::net::parameter_base& address)
@@ -165,7 +245,8 @@ void rate_limiting_protocol::set_device(device_base& dev)
165245
{
166246
m_device = &dev;
167247
m_protocol->set_device(dev);
248+
m_lastTime = clock::now();
249+
m_thread = std::thread{rate_limiter_concrete<false, false, false>{*this, m_duration}};
168250
}
169-
170251
}
171252
#endif

src/ossia/network/rate_limiting_protocol.hpp

+17-3
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,29 @@
44
#include <ossia/network/base/message_queue.hpp>
55
#include <ossia/network/base/parameter_data.hpp>
66
#include <ossia/network/base/protocol.hpp>
7+
#include <ossia/network/rate_limiter_configuration.hpp>
78

89
#include <readerwriterqueue.h>
910

1011
#include <chrono>
1112
#include <thread>
12-
#include <vector>
1313

1414
namespace ossia::net
1515
{
1616
struct rate_limiter;
17+
18+
template <bool Bundle, bool Repeat, bool SendAll>
19+
struct rate_limiter_impl;
20+
template <bool Bundle, bool Repeat, bool SendAll>
21+
struct rate_limiter_concrete;
22+
1723
class OSSIA_EXPORT rate_limiting_protocol final : public ossia::net::protocol_base
1824
{
1925
public:
20-
using clock = std::chrono::high_resolution_clock;
26+
using clock = std::chrono::steady_clock;
2127
using duration = clock::duration;
22-
rate_limiting_protocol(duration d, std::unique_ptr<protocol_base> arg);
28+
rate_limiting_protocol(
29+
rate_limiter_configuration conf, std::unique_ptr<protocol_base> arg);
2330
~rate_limiting_protocol() override;
2431

2532
void set_duration(duration d);
@@ -46,8 +53,15 @@ class OSSIA_EXPORT rate_limiting_protocol final : public ossia::net::protocol_ba
4653
rate_limiting_protocol& operator=(rate_limiting_protocol&&) = delete;
4754

4855
friend struct rate_limiter;
56+
template <bool Bundle, bool Repeat, bool SendAll>
57+
friend struct rate_limiter_impl;
58+
template <bool Bundle, bool Repeat, bool SendAll>
59+
friend struct rate_limiter_concrete;
4960

5061
std::atomic<duration> m_duration{};
62+
bool m_bundle{};
63+
bool m_send_all{};
64+
bool m_repeat{};
5165
std::unique_ptr<ossia::net::protocol_base> m_protocol;
5266
ossia::net::device_base* m_device{};
5367

0 commit comments

Comments
 (0)