Skip to content

Commit 4e0294f

Browse files
committed
Serialize fields before queueing it to the workqueue
1 parent 520aed6 commit 4e0294f

8 files changed

+176
-292
lines changed

lib/perfdata/elasticsearchwriter.cpp

Lines changed: 33 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -101,13 +101,13 @@ void ElasticsearchWriter::Resume()
101101
CheckResultHandler(checkable, cr);
102102
});
103103
m_HandleStateChanges = Checkable::OnStateChange.connect([this](const Checkable::Ptr& checkable,
104-
const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr&) {
105-
StateChangeHandler(checkable, cr, type);
104+
const CheckResult::Ptr& cr, StateType, const MessageOrigin::Ptr&) {
105+
StateChangeHandler(checkable, cr);
106106
});
107-
m_HandleNotifications = Checkable::OnNotificationSentToAllUsers.connect([this](const Notification::Ptr& notification,
107+
m_HandleNotifications = Checkable::OnNotificationSentToAllUsers.connect([this](const Notification::Ptr&,
108108
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, const NotificationType& type,
109109
const CheckResult::Ptr& cr, const String& author, const String& text, const MessageOrigin::Ptr&) {
110-
NotificationSentToAllUsersHandler(notification, checkable, users, type, cr, author, text);
110+
NotificationSentToAllUsersHandler(checkable, users, type, cr, author, text);
111111
});
112112
}
113113

@@ -236,15 +236,6 @@ void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, co
236236
if (IsPaused())
237237
return;
238238

239-
m_WorkQueue.Enqueue([this, checkable, cr]() { InternalCheckResultHandler(checkable, cr); });
240-
}
241-
242-
void ElasticsearchWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
243-
{
244-
AssertOnWorkQueue();
245-
246-
CONTEXT("Elasticwriter processing check result for '" << checkable->GetName() << "'");
247-
248239
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
249240
return;
250241

@@ -272,38 +263,23 @@ void ElasticsearchWriter::InternalCheckResultHandler(const Checkable::Ptr& check
272263
fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts());
273264

274265
fields->Set("reachable", checkable->IsReachable());
266+
fields->Set("check_command", checkable->GetCheckCommand()->GetName());
275267

276-
CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
277-
278-
if (commandObj)
279-
fields->Set("check_command", commandObj->GetName());
280-
281-
double ts = Utility::GetTime();
282-
283-
if (cr) {
284-
AddCheckResult(fields, checkable, cr);
285-
ts = cr->GetExecutionEnd();
286-
}
287-
268+
AddCheckResult(fields, checkable, cr);
288269
AddTemplateTags(fields, checkable, cr);
289270

290-
Enqueue(checkable, "checkresult", fields, ts);
271+
m_WorkQueue.Enqueue([this, checkable, fields = std::move(fields), ts = cr->GetExecutionEnd()]() {
272+
CONTEXT("Elasticwriter processing check result for '" << checkable->GetName() << "'");
273+
274+
Enqueue(checkable, "checkresult", fields, ts);
275+
});
291276
}
292277

293-
void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
278+
void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
294279
{
295280
if (IsPaused())
296281
return;
297282

298-
m_WorkQueue.Enqueue([this, checkable, cr, type]() { StateChangeHandlerInternal(checkable, cr, type); });
299-
}
300-
301-
void ElasticsearchWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
302-
{
303-
AssertOnWorkQueue();
304-
305-
CONTEXT("Elasticwriter processing state change '" << checkable->GetName() << "'");
306-
307283
Host::Ptr host;
308284
Service::Ptr service;
309285
tie(host, service) = GetHostService(checkable);
@@ -325,45 +301,23 @@ void ElasticsearchWriter::StateChangeHandlerInternal(const Checkable::Ptr& check
325301
fields->Set("last_hard_state", host->GetLastHardState());
326302
}
327303

328-
CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
329-
330-
if (commandObj)
331-
fields->Set("check_command", commandObj->GetName());
332-
333-
double ts = Utility::GetTime();
334-
335-
if (cr) {
336-
AddCheckResult(fields, checkable, cr);
337-
ts = cr->GetExecutionEnd();
338-
}
304+
fields->Set("check_command", checkable->GetCheckCommand()->GetName());
339305

306+
AddCheckResult(fields, checkable, cr);
340307
AddTemplateTags(fields, checkable, cr);
341308

342-
Enqueue(checkable, "statechange", fields, ts);
343-
}
309+
m_WorkQueue.Enqueue([this, checkable, fields = std::move(fields), ts = cr->GetExecutionEnd()]() {
310+
CONTEXT("Elasticwriter processing state change '" << checkable->GetName() << "'");
344311

345-
void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
346-
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
347-
const CheckResult::Ptr& cr, const String& author, const String& text)
348-
{
349-
if (IsPaused())
350-
return;
351-
352-
m_WorkQueue.Enqueue([this, notification, checkable, users, type, cr, author, text]() {
353-
NotificationSentToAllUsersHandlerInternal(notification, checkable, users, type, cr, author, text);
312+
Enqueue(checkable, "statechange", fields, ts);
354313
});
355314
}
356315

357-
void ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
358-
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
359-
const CheckResult::Ptr& cr, const String& author, const String& text)
316+
void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Checkable::Ptr& checkable, const std::set<User::Ptr>& users,
317+
NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text)
360318
{
361-
AssertOnWorkQueue();
362-
363-
CONTEXT("Elasticwriter processing notification to all users '" << checkable->GetName() << "'");
364-
365-
Log(LogDebug, "ElasticsearchWriter")
366-
<< "Processing notification for '" << checkable->GetName() << "'";
319+
if (IsPaused())
320+
return;
367321

368322
Host::Ptr host;
369323
Service::Ptr service;
@@ -396,11 +350,7 @@ void ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal(const Notifi
396350
fields->Set("notification_type", notificationTypeString);
397351
fields->Set("author", author);
398352
fields->Set("text", text);
399-
400-
CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
401-
402-
if (commandObj)
403-
fields->Set("check_command", commandObj->GetName());
353+
fields->Set("check_command", checkable->GetCheckCommand()->GetName());
404354

405355
double ts = Utility::GetTime();
406356

@@ -411,12 +361,20 @@ void ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal(const Notifi
411361

412362
AddTemplateTags(fields, checkable, cr);
413363

414-
Enqueue(checkable, "notification", fields, ts);
364+
m_WorkQueue.Enqueue([this, checkable, ts, fields = std::move(fields)]() {
365+
CONTEXT("Elasticwriter processing notification to all users '" << checkable->GetName() << "'");
366+
367+
Log(LogDebug, "ElasticsearchWriter")
368+
<< "Processing notification for '" << checkable->GetName() << "'";
369+
370+
Enqueue(checkable, "notification", fields, ts);
371+
});
415372
}
416373

417-
void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String& type,
418-
const Dictionary::Ptr& fields, double ts)
374+
void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String& type, const Dictionary::Ptr& fields, double ts)
419375
{
376+
AssertOnWorkQueue();
377+
420378
/* Atomically buffer the data point. */
421379
std::unique_lock<std::mutex> lock(m_DataBufferMutex);
422380

lib/perfdata/elasticsearchwriter.hpp

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,19 +42,12 @@ class ElasticsearchWriter final : public ObjectImpl<ElasticsearchWriter>
4242
void AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
4343
void AddTemplateTags(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
4444

45-
void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
46-
void StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
45+
void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
4746
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
48-
void InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
49-
void NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
50-
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
51-
const CheckResult::Ptr& cr, const String& author, const String& text);
52-
void NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
53-
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
54-
const CheckResult::Ptr& cr, const String& author, const String& text);
47+
void NotificationSentToAllUsersHandler(const Checkable::Ptr& checkable, const std::set<User::Ptr>& users,
48+
NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text);
5549

56-
void Enqueue(const Checkable::Ptr& checkable, const String& type,
57-
const Dictionary::Ptr& fields, double ts);
50+
void Enqueue(const Checkable::Ptr& checkable, const String& type, const Dictionary::Ptr& fields, double ts);
5851

5952
OptionalTlsStream Connect();
6053
void AssertOnWorkQueue();

lib/perfdata/gelfwriter.cpp

Lines changed: 41 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -94,14 +94,14 @@ void GelfWriter::Resume()
9494
const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
9595
CheckResultHandler(checkable, cr);
9696
});
97-
m_HandleNotifications = Checkable::OnNotificationSentToUser.connect([this](const Notification::Ptr& notification,
98-
const Checkable::Ptr& checkable, const User::Ptr& user, const NotificationType& type, const CheckResult::Ptr& cr,
97+
m_HandleNotifications = Checkable::OnNotificationSentToUser.connect([this](const Notification::Ptr&,
98+
const Checkable::Ptr& checkable, const User::Ptr&, const NotificationType& type, const CheckResult::Ptr& cr,
9999
const String& author, const String& commentText, const String& commandName, const MessageOrigin::Ptr&) {
100-
NotificationToUserHandler(notification, checkable, user, type, cr, author, commentText, commandName);
100+
NotificationToUserHandler(checkable, type, cr, author, commentText, commandName);
101101
});
102102
m_HandleStateChanges = Checkable::OnStateChange.connect([this](const Checkable::Ptr& checkable,
103-
const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr&) {
104-
StateChangeHandler(checkable, cr, type);
103+
const CheckResult::Ptr& cr, StateType, const MessageOrigin::Ptr&) {
104+
StateChangeHandler(checkable, cr);
105105
});
106106
}
107107

@@ -268,18 +268,6 @@ void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const Check
268268
if (IsPaused())
269269
return;
270270

271-
m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerInternal(checkable, cr); });
272-
}
273-
274-
void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
275-
{
276-
AssertOnWorkQueue();
277-
278-
CONTEXT("GELF Processing check result for '" << checkable->GetName() << "'");
279-
280-
Log(LogDebug, "GelfWriter")
281-
<< "Processing check result for '" << checkable->GetName() << "'";
282-
283271
Host::Ptr host;
284272
Service::Ptr service;
285273
tie(host, service) = GetHostService(checkable);
@@ -306,22 +294,15 @@ void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, con
306294
fields->Set("_reachable", checkable->IsReachable());
307295

308296
CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
297+
fields->Set("_check_command", checkCommand->GetName());
309298

310-
if (checkCommand)
311-
fields->Set("_check_command", checkCommand->GetName());
312-
313-
double ts = Utility::GetTime();
314-
315-
if (cr) {
316-
fields->Set("_latency", cr->CalculateLatency());
317-
fields->Set("_execution_time", cr->CalculateExecutionTime());
318-
fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
319-
fields->Set("full_message", cr->GetOutput());
320-
fields->Set("_check_source", cr->GetCheckSource());
321-
ts = cr->GetExecutionEnd();
322-
}
299+
fields->Set("_latency", cr->CalculateLatency());
300+
fields->Set("_execution_time", cr->CalculateExecutionTime());
301+
fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
302+
fields->Set("full_message", cr->GetOutput());
303+
fields->Set("_check_source", cr->GetCheckSource());
323304

324-
if (cr && GetEnableSendPerfdata()) {
305+
if (GetEnableSendPerfdata()) {
325306
Array::Ptr perfdata = cr->GetPerformanceData();
326307

327308
if (perfdata) {
@@ -366,31 +347,21 @@ void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, con
366347
}
367348
}
368349

369-
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
370-
}
350+
m_WorkQueue.Enqueue([this, checkable, fields = std::move(fields), ts = cr->GetExecutionEnd()]() {
351+
CONTEXT("GELF Processing check result for '" << checkable->GetName() << "'");
371352

372-
void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
373-
const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
374-
const String& author, const String& commentText, const String& commandName)
375-
{
376-
if (IsPaused())
377-
return;
353+
Log(LogDebug, "GelfWriter")
354+
<< "Processing check result for '" << checkable->GetName() << "'";
378355

379-
m_WorkQueue.Enqueue([this, notification, checkable, user, notificationType, cr, author, commentText, commandName]() {
380-
NotificationToUserHandlerInternal(notification, checkable, user, notificationType, cr, author, commentText, commandName);
356+
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
381357
});
382358
}
383359

384-
void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
385-
const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
386-
const String& author, const String& commentText, const String& commandName)
360+
void GelfWriter::NotificationToUserHandler(const Checkable::Ptr& checkable, NotificationType notificationType,
361+
const CheckResult::Ptr& cr, const String& author, const String& commentText, const String& commandName)
387362
{
388-
AssertOnWorkQueue();
389-
390-
CONTEXT("GELF Processing notification to all users '" << checkable->GetName() << "'");
391-
392-
Log(LogDebug, "GelfWriter")
393-
<< "Processing notification for '" << checkable->GetName() << "'";
363+
if (IsPaused())
364+
return;
394365

395366
Host::Ptr host;
396367
Service::Ptr service;
@@ -430,32 +401,23 @@ void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& noti
430401
fields->Set("_command", commandName);
431402
fields->Set("_notification_type", notificationTypeString);
432403
fields->Set("_comment", authorComment);
404+
fields->Set("_check_command", checkable->GetCheckCommand()->GetName());
433405

434-
CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
406+
m_WorkQueue.Enqueue([this, checkable, ts, fields = std::move(fields)]() {
407+
CONTEXT("GELF Processing notification to all users '" << checkable->GetName() << "'");
435408

436-
if (commandObj)
437-
fields->Set("_check_command", commandObj->GetName());
409+
Log(LogDebug, "GelfWriter")
410+
<< "Processing notification for '" << checkable->GetName() << "'";
438411

439-
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
412+
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
413+
});
440414
}
441415

442-
void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
416+
void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
443417
{
444418
if (IsPaused())
445419
return;
446420

447-
m_WorkQueue.Enqueue([this, checkable, cr, type]() { StateChangeHandlerInternal(checkable, cr, type); });
448-
}
449-
450-
void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
451-
{
452-
AssertOnWorkQueue();
453-
454-
CONTEXT("GELF Processing state change '" << checkable->GetName() << "'");
455-
456-
Log(LogDebug, "GelfWriter")
457-
<< "Processing state change for '" << checkable->GetName() << "'";
458-
459421
Host::Ptr host;
460422
Service::Ptr service;
461423
tie(host, service) = GetHostService(checkable);
@@ -478,21 +440,20 @@ void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, con
478440
fields->Set("_last_hard_state", host->GetLastHardState());
479441
}
480442

481-
CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
443+
fields->Set("_check_command", checkable->GetCheckCommand()->GetName());
482444

483-
if (commandObj)
484-
fields->Set("_check_command", commandObj->GetName());
445+
fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
446+
fields->Set("full_message", cr->GetOutput());
447+
fields->Set("_check_source", cr->GetCheckSource());
485448

486-
double ts = Utility::GetTime();
449+
m_WorkQueue.Enqueue([this, checkable, fields = std::move(fields), ts = cr->GetExecutionEnd()]() {
450+
CONTEXT("GELF Processing state change '" << checkable->GetName() << "'");
487451

488-
if (cr) {
489-
fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
490-
fields->Set("full_message", cr->GetOutput());
491-
fields->Set("_check_source", cr->GetCheckSource());
492-
ts = cr->GetExecutionEnd();
493-
}
452+
Log(LogDebug, "GelfWriter")
453+
<< "Processing state change for '" << checkable->GetName() << "'";
494454

495-
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
455+
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
456+
});
496457
}
497458

498459
String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts)
@@ -506,6 +467,8 @@ String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const Strin
506467

507468
void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage)
508469
{
470+
AssertOnWorkQueue();
471+
509472
std::ostringstream msgbuf;
510473
msgbuf << gelfMessage;
511474
msgbuf << '\0';

0 commit comments

Comments
 (0)