1
1
package io .logz .log4j2 ;
2
2
3
- import com .google .common .base .Splitter ;
4
- import io .logz .sender .HttpsRequestConfiguration ;
5
- import io .logz .sender .LogzioSender ;
6
- import io .logz .sender .SenderStatusReporter ;
7
- import io .logz .sender .com .google .common .base .Throwables ;
8
- import io .logz .sender .com .google .gson .JsonObject ;
9
- import io .logz .sender .exceptions .LogzioParameterErrorException ;
3
+ import java .io .File ;
4
+ import java .net .InetAddress ;
5
+ import java .net .UnknownHostException ;
6
+ import java .util .Arrays ;
7
+ import java .util .Date ;
8
+ import java .util .HashMap ;
9
+ import java .util .HashSet ;
10
+ import java .util .Map ;
11
+ import java .util .Set ;
12
+ import java .util .concurrent .Executors ;
13
+ import java .util .concurrent .ScheduledExecutorService ;
14
+ import java .util .concurrent .TimeUnit ;
15
+ import java .util .function .Supplier ;
16
+
10
17
import org .apache .logging .log4j .Logger ;
11
18
import org .apache .logging .log4j .Marker ;
12
19
import org .apache .logging .log4j .core .Appender ;
22
29
import org .apache .logging .log4j .status .StatusLogger ;
23
30
import org .apache .logging .log4j .util .ReadOnlyStringMap ;
24
31
25
- import java .io .File ;
26
- import java .net .InetAddress ;
27
- import java .net .UnknownHostException ;
28
- import java .util .Arrays ;
29
- import java .util .Date ;
30
- import java .util .HashMap ;
31
- import java .util .HashSet ;
32
- import java .util .Map ;
33
- import java .util .Set ;
34
- import java .util .concurrent .Executors ;
35
- import java .util .concurrent .ScheduledExecutorService ;
36
- import java .util .concurrent .TimeUnit ;
32
+ import com .google .common .base .Splitter ;
33
+ import io .logz .sender .HttpsRequestConfiguration ;
34
+ import io .logz .sender .LogzioSender ;
35
+ import io .logz .sender .SenderStatusReporter ;
36
+ import io .logz .sender .com .google .common .base .Throwables ;
37
+ import io .logz .sender .com .google .gson .JsonObject ;
38
+ import io .logz .sender .exceptions .LogzioParameterErrorException ;
37
39
38
40
@ Plugin (name = "LogzioAppender" , category = "Core" , elementType = Appender .ELEMENT_TYPE , printObject = true )
39
41
public class LogzioAppender extends AbstractAppender {
@@ -256,12 +258,17 @@ public Builder setInMemoryLogsCountCapacity(long inMemoryLogsCountCapacity) {
256
258
private final long inMemoryQueueCapacityBytes ;
257
259
private final long inMemoryLogsCountCapacity ;
258
260
private final Map <String , String > additionalFieldsMap = new HashMap <>();
259
- private ScheduledExecutorService tasksExecutor ;
261
+
262
+ // need to keep static instances of ScheduledExecutorService per LogzioAppender as
263
+ // the LogzioSender.Builder keep static instances per the given token and type
264
+ private static final Map <String , ScheduledExecutorService > tasksExecutors = new HashMap <>();
265
+
266
+ private static final Map <LogzioAppender , LogzioSender > appenderToLogzioSender = new HashMap <>();
260
267
261
268
private LogzioAppender (String name , Filter filter , final boolean ignoreExceptions , String url ,
262
- String token , String type , int drainTimeoutSec , int fileSystemFullPercentThreshold ,
263
- String queueDir , int socketTimeout , int connectTimeout , boolean addHostname ,
264
- String additionalFields , boolean debug , int gcPersistedQueueFilesIntervalSeconds ,
269
+ String token , String type , int drainTimeoutSec , int fileSystemFullPercentThreshold ,
270
+ String queueDir , int socketTimeout , int connectTimeout , boolean addHostname ,
271
+ String additionalFields , boolean debug , int gcPersistedQueueFilesIntervalSeconds ,
265
272
boolean compressRequests , boolean inMemoryQueue ,
266
273
long inMemoryQueueCapacityBytes , long inMemoryLogsCountCapacity ) {
267
274
super (name , filter , null , ignoreExceptions );
@@ -298,6 +305,7 @@ private LogzioAppender(String name, Filter filter, final boolean ignoreException
298
305
}
299
306
300
307
public void start () {
308
+ safeStopLogzioSender ();
301
309
HttpsRequestConfiguration conf ;
302
310
try {
303
311
conf = getHttpsRequestConfiguration ();
@@ -317,12 +325,15 @@ public void start() {
317
325
if (!validateQueueCapacity ()) {
318
326
return ;
319
327
}
320
- tasksExecutor = Executors .newScheduledThreadPool (1 , Log4jThreadFactory .createDaemonThreadFactory (this .getClass ().getSimpleName ()));
328
+
329
+ final ScheduledExecutorService tasksExecutor = safeExecutorCreate (() ->
330
+ Executors .newScheduledThreadPool (1 , Log4jThreadFactory .createDaemonThreadFactory (this .getClass ().getSimpleName ())));
331
+
321
332
logzioSenderBuilder
322
333
.setTasksExecutor (tasksExecutor )
323
334
.withInMemoryQueue ()
324
- .setCapacityInBytes (inMemoryQueueCapacityBytes )
325
- .setLogsCountLimit (inMemoryLogsCountCapacity )
335
+ .setCapacityInBytes (inMemoryQueueCapacityBytes )
336
+ .setLogsCountLimit (inMemoryLogsCountCapacity )
326
337
.endInMemoryQueue ();
327
338
} else {
328
339
if (!validateFSFullPercentThreshold ()) {
@@ -333,14 +344,15 @@ public void start() {
333
344
if (queueDirFile == null ) {
334
345
return ;
335
346
}
347
+ final ScheduledExecutorService tasksExecutor = safeExecutorCreate (() ->
348
+ Executors .newScheduledThreadPool (3 , Log4jThreadFactory .createDaemonThreadFactory (this .getClass ().getSimpleName ())));
336
349
337
- tasksExecutor = Executors .newScheduledThreadPool (3 , Log4jThreadFactory .createDaemonThreadFactory (this .getClass ().getSimpleName ()));
338
350
logzioSenderBuilder
339
351
.setTasksExecutor (tasksExecutor )
340
352
.withDiskQueue ()
341
- .setQueueDir (queueDirFile )
342
- .setFsPercentThreshold (fileSystemFullPercentThreshold )
343
- .setGcPersistedQueueFilesIntervalSeconds (gcPersistedQueueFilesIntervalSeconds )
353
+ .setQueueDir (queueDirFile )
354
+ .setFsPercentThreshold (fileSystemFullPercentThreshold )
355
+ .setGcPersistedQueueFilesIntervalSeconds (gcPersistedQueueFilesIntervalSeconds )
344
356
.endDiskQueue ();
345
357
}
346
358
try {
@@ -349,7 +361,13 @@ public void start() {
349
361
statusLogger .error ("Couldn't build logzio sender: " + e .getMessage (), e );
350
362
return ;
351
363
}
364
+
365
+ synchronized (appenderToLogzioSender ) {
366
+ appenderToLogzioSender .put (this , logzioSender );
367
+ }
368
+
352
369
logzioSender .start ();
370
+
353
371
super .start ();
354
372
}
355
373
@@ -424,13 +442,40 @@ private boolean validateFSFullPercentThreshold() {
424
442
@ Override
425
443
public boolean stop (final long timeout , final TimeUnit timeUnit ) {
426
444
setStopping ();
445
+
427
446
boolean stopped = super .stop (timeout , timeUnit , false );
428
- if (logzioSender != null ) logzioSender .stop ();
429
- if ( tasksExecutor != null ) tasksExecutor .shutdownNow ();
447
+
448
+ safeStopLogzioSender ();
449
+
430
450
setStopped ();
451
+
431
452
return stopped ;
432
453
}
433
454
455
+ private void safeStopLogzioSender () {
456
+ if (logzioSender == null ) {
457
+ return ;
458
+ }
459
+
460
+ boolean doStop = false ;
461
+ synchronized (appenderToLogzioSender ) {
462
+ appenderToLogzioSender .remove (this );
463
+
464
+ if (!appenderToLogzioSender .containsValue (logzioSender )) {
465
+ doStop = true ;
466
+ }
467
+ }
468
+
469
+ if (doStop ) {
470
+ statusLogger .info ("Stop {}" , logzioSender );
471
+
472
+ logzioSender .stop ();
473
+
474
+ safeExecutorTerminate ();
475
+ } else {
476
+ statusLogger .info ("Stop skipped for reused {}" , logzioSender );
477
+ }
478
+ }
434
479
435
480
@ Override
436
481
public void append (LogEvent logEvent ) {
@@ -439,6 +484,53 @@ public void append(LogEvent logEvent) {
439
484
}
440
485
}
441
486
487
+ private ScheduledExecutorService safeExecutorCreate (Supplier <ScheduledExecutorService > doCreate ) {
488
+ final ScheduledExecutorService tasksExecutor = doCreate .get ();
489
+
490
+ synchronized (tasksExecutors ) {
491
+ final String key = getExecutorKey ();
492
+
493
+ safeExecutorTerminate (key );
494
+
495
+ statusLogger .info ("Created new tasksExecutor: {} for key.length: {}" ,
496
+ tasksExecutor , key .length ());
497
+
498
+ tasksExecutors .put (key , tasksExecutor );
499
+ }
500
+
501
+ return tasksExecutor ;
502
+ }
503
+
504
+ private void safeExecutorTerminate () {
505
+ synchronized (tasksExecutors ) {
506
+ safeExecutorTerminate (getExecutorKey ());
507
+ }
508
+ }
509
+
510
+ private void safeExecutorTerminate (String key ) {
511
+ final ScheduledExecutorService tasksExecutor = tasksExecutors .remove (key );
512
+
513
+ if (tasksExecutor != null ) {
514
+ statusLogger .info ("Terminating old tasksExecutor: {} for key.length: {}" ,
515
+ tasksExecutor , key .length ());
516
+
517
+ try {
518
+ tasksExecutor .shutdownNow ();
519
+
520
+ while (!tasksExecutor .isTerminated ()) {
521
+ Thread .sleep (500 );
522
+ }
523
+ } catch (Exception e ) {
524
+ statusLogger .error ("Failed to stop old executor" , e );
525
+ }
526
+ } else {
527
+ statusLogger .info ("Skip terminating no tasksExecutor for key.length: {}" , key .length ());
528
+ }
529
+ }
530
+
531
+ private String getExecutorKey () {
532
+ return "" + logzioToken + logzioType ;
533
+ }
442
534
443
535
private JsonObject formatMessageAsJson (LogEvent loggingEvent ) {
444
536
JsonObject logMessage = new JsonObject ();
@@ -479,7 +571,6 @@ private static String getValueFromSystemEnvironmentIfNeeded(String value) {
479
571
return value ;
480
572
}
481
573
482
-
483
574
private class StatusReporter implements SenderStatusReporter {
484
575
485
576
@ Override
@@ -512,4 +603,4 @@ public void info(String msg, Throwable e) {
512
603
statusLogger .info (msg ,e );
513
604
}
514
605
}
515
- }
606
+ }
0 commit comments