Skip to content

Commit 2aed87a

Browse files
authored
Merge pull request #45 from newrelic/feat/logback_forwarder
feat: newrelic log-forwarder for logback
2 parents e2a7a00 + 1277e20 commit 2aed87a

File tree

27 files changed

+1146
-21
lines changed

27 files changed

+1146
-21
lines changed

dropwizard/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ dependencies {
2424
implementation("io.dropwizard:dropwizard-request-logging:1.3.14")
2525
implementation("javax.servlet:javax.servlet-api:3.1.0")
2626

27-
implementation("com.newrelic.agent.java:newrelic-api:7.4.3")
27+
implementation("com.newrelic.agent.java:newrelic-api:7.6.0")
2828
includeInJar(project(":logback")) {
2929
isTransitive = false
3030
}

examples/dropwizard-app/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ repositories {
1212
dependencies {
1313
implementation("io.dropwizard:dropwizard-core:1.3.14")
1414
implementation(project(":dropwizard"))
15-
implementation("com.newrelic.agent.java:newrelic-api:7.4.3")
15+
implementation("com.newrelic.agent.java:newrelic-api:7.6.0")
1616
}
1717

1818
configure<JavaPluginConvention> {

examples/jul-app/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ repositories {
1212

1313
dependencies {
1414
implementation(project(":jul"))
15-
implementation("com.newrelic.agent.java:newrelic-api:7.4.3")
15+
implementation("com.newrelic.agent.java:newrelic-api:7.6.0")
1616
}
1717

1818

examples/log4j1-app/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ dependencies {
1515

1616
implementation("log4j:log4j:1.2.17")
1717
implementation("com.fasterxml.jackson.core:jackson-core:2.11.1")
18-
implementation("com.newrelic.agent.java:newrelic-api:7.4.3")
18+
implementation("com.newrelic.agent.java:newrelic-api:7.6.0")
1919
}
2020

2121

examples/log4j2-app/build.gradle.kts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,13 @@ repositories {
1111
}
1212

1313
dependencies {
14-
implementation("org.apache.logging.log4j:log4j-api:2.17.1")
15-
implementation("org.apache.logging.log4j:log4j-core:2.17.1")
14+
implementation("org.apache.logging.log4j:log4j-api:2.17.2")
15+
implementation("org.apache.logging.log4j:log4j-core:2.17.2")
1616
runtimeOnly(project(":log4j2"))
1717

1818
implementation("com.fasterxml.jackson.core:jackson-core:2.11.1")
1919
implementation("com.lmax:disruptor:3.4.2")
20-
implementation("com.newrelic.agent.java:newrelic-api:7.4.3")
20+
implementation("com.newrelic.agent.java:newrelic-api:7.6.0")
2121
}
2222

2323

examples/logback-app/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ dependencies {
1616
implementation("ch.qos.logback:logback-classic:1.2.3")
1717
implementation("com.fasterxml.jackson.core:jackson-core:2.11.1")
1818

19-
implementation("com.newrelic.agent.java:newrelic-api:7.4.3")
19+
implementation("com.newrelic.agent.java:newrelic-api:7.6.0")
2020
}
2121

2222

examples/logback11-app/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ dependencies {
1616
implementation("ch.qos.logback:logback-classic:1.1.1")
1717
implementation("com.fasterxml.jackson.core:jackson-core:2.11.1")
1818

19-
implementation("com.newrelic.agent.java:newrelic-api:7.4.3")
19+
implementation("com.newrelic.agent.java:newrelic-api:7.6.0")
2020
}
2121

2222

forwarder/README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Log Forwarder
2+
3+
A log forwarder built using the [New Relic Java Telemetry SDK](https://github.com/newrelic/newrelic-telemetry-sdk-java).
4+
5+
## Usage
6+
7+
The `forwarder` module is currently used by the following:
8+
* [Logback 1.2](../logback/README.md#using-the-http-log-forwarder-from-within-your-app) - See `NewRelicHttpAppender`
9+
10+
## Configuration
11+
12+
The `forwarder` provides the following configuration options via `LogForwarderConfiguration`:
13+
* `endpoint`: The `endpoint` defaults to New Relic US production environments (https://log-api.newrelic.com/log/v1) and will need to be configured for other environments (e.g. EU production should instead
14+
use https://log-api.eu.newrelic.com/log/v1).
15+
* `license`: The `license` will be picked up from the java-agent if installed, but you can override it if you want.
16+
* `maxQueuedLogs`: Maximum number of logs queued in memory waiting to be sent.
17+
* `maxLogsPerBatch`: Maximum number of logs per batch (request) to NewRelic.
18+
* `maxTerminationTimeSeconds`: Number of seconds to wait for graceful shutdown of its executor.
19+
* `flushIntervalSeconds`: Time period and initial delay when scheduling a task at a fixed rate.
20+
* `maxScheduledLogsToBeAppended`: Maximum scheduled logs to be appended. This is used to prevent the log forwarder from accepting more logs when we reach this number of jobs in the scheduler.

forwarder/build.gradle.kts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
plugins {
2+
java
3+
}
4+
5+
group = "com.newrelic.logging"
6+
version = "1.0-SNAPSHOT"
7+
8+
repositories {
9+
mavenCentral()
10+
}
11+
12+
dependencies {
13+
implementation("com.newrelic.agent.java:newrelic-api:7.6.0")
14+
implementation("com.newrelic.telemetry:telemetry-core:0.13.1")
15+
implementation("com.newrelic.telemetry:telemetry-http-okhttp:0.13.1")
16+
17+
testImplementation("org.junit.jupiter:junit-jupiter:5.6.2")
18+
testImplementation("org.mockito:mockito-core:3.4.4")
19+
testImplementation("ch.qos.logback:logback-classic:1.2.3")
20+
testImplementation(project(":core-test"))
21+
}
22+
23+
tasks.getByName<Test>("test") {
24+
useJUnitPlatform()
25+
}
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
/*
2+
* Copyright 2022 New Relic Corporation. All rights reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package com.newrelic.logging.forwarder;
7+
8+
import com.newrelic.api.agent.Agent;
9+
import com.newrelic.api.agent.NewRelic;
10+
import com.newrelic.telemetry.Attributes;
11+
import com.newrelic.telemetry.LogBatchSenderFactory;
12+
import com.newrelic.telemetry.OkHttpPoster;
13+
import com.newrelic.telemetry.SenderConfiguration;
14+
import com.newrelic.telemetry.TelemetryClient;
15+
import com.newrelic.telemetry.logs.Log;
16+
import com.newrelic.telemetry.logs.LogBatch;
17+
import com.newrelic.telemetry.logs.LogBatchSender;
18+
19+
import java.net.MalformedURLException;
20+
import java.net.URL;
21+
import java.util.ArrayList;
22+
import java.util.Collection;
23+
import java.util.List;
24+
import java.util.concurrent.BlockingQueue;
25+
import java.util.concurrent.LinkedBlockingQueue;
26+
import java.util.concurrent.ScheduledThreadPoolExecutor;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.function.Supplier;
29+
30+
/**
31+
* A LogForwarder that will forward the logs using the NewRelic Telemetry SDK.
32+
*
33+
* This logic is in a separate class, so it can be reused by multiple appender
34+
* implementations across different logging libraries.
35+
*/
36+
public class LogForwarder {
37+
38+
private static final String LICENSE_KEY_CONFIG_FIELD = "license_key";
39+
private static final boolean USE_DAEMON_THREADS = true;
40+
private static final String PLUGIN_TYPE_KEY = "plugin.type";
41+
42+
private final String pluginType;
43+
private final BlockingQueue<Log> logs;
44+
private final LogForwarderConfiguration configuration;
45+
private final ScheduledThreadPoolExecutor executor;
46+
private final LogForwarderNotificationHandler notificationHandler;
47+
private TelemetryClient client;
48+
49+
/**
50+
* Initialize {@link TelemetryClient} with a {@link LogBatchSender} that will forward logs to newrelic each second
51+
* and also manage the retry logic if the requests are failing.
52+
*
53+
* LogBatches will be sent to the TelemetryClient each second or each time the limit defined by
54+
* {@link LogForwarderConfiguration#getMaxLogsPerBatch()} is reached.
55+
*
56+
* LogBatches will be forwarded to NewRelic each second by the TelemetryClient.
57+
*
58+
* Logs will be dropped when {@link LogForwarderConfiguration#getMaxQueuedLogs()} is reached.
59+
*
60+
* @param givenPluginType the logging library using the forwarder.
61+
* @param givenConfiguration the log forwarder configuration.
62+
*/
63+
public LogForwarder(String givenPluginType, LogForwarderConfiguration givenConfiguration) {
64+
if (agentSupplier.get() == null) throw new RuntimeException("NewRelic java-log-extensions requires the NewRelic Java Agent installed and set to work.");
65+
pluginType = givenPluginType;
66+
configuration = givenConfiguration;
67+
logs = new LinkedBlockingQueue<>(configuration.getMaxLogsPerBatch());
68+
executor = new ScheduledThreadPoolExecutor(1, Threads.daemonNamedThreadFactory("log-batcher-scheduler"));
69+
notificationHandler = new LogForwarderNotificationHandler(pluginType);
70+
}
71+
72+
/**
73+
* Start the scheduled tasks that:
74+
* - Create log batches.
75+
* - Notify about dropped logs.
76+
*
77+
* Start the {@link TelemetryClient} that will send the logs to the NewRelic Log API.
78+
*/
79+
public void start() {
80+
executor.scheduleAtFixedRate(this::addBatchWithCurrentLogs, configuration.getFlushIntervalSeconds(), configuration.getFlushIntervalSeconds(), TimeUnit.SECONDS);
81+
client = createTelemetryClient(generateSenderConfiguration());
82+
client.withNotificationHandler(notificationHandler);
83+
}
84+
85+
/**
86+
* Schedule the log to be appended to a LogBatch.
87+
*
88+
* If the current queue size is bigger than the maxLogsPerBatch we drop the log. We use
89+
* the maxLogsPerBatch instead a custom configurations because we only want to limit
90+
* the queue somehow to prevent the log forwarder taking so much memory and affecting
91+
* the application.
92+
*
93+
* @param log the log to be appended.
94+
*/
95+
public void append(Log log) {
96+
scheduleLog(new RetryableLog(log));
97+
}
98+
99+
/**
100+
* Shutdowns the telemetry sdk client and the executor
101+
*/
102+
public void shutdown() {
103+
addBatchWithCurrentLogs();
104+
if (client != null) {
105+
client.shutdown();
106+
}
107+
if (executor != null) {
108+
executor.shutdown();
109+
}
110+
if (notificationHandler != null) {
111+
notificationHandler.shutdown();
112+
}
113+
}
114+
115+
/**
116+
* Generate a default configuration for the Telemetry SDK.
117+
*
118+
* @return the sender default configuration.
119+
*/
120+
private SenderConfiguration generateSenderConfiguration() {
121+
return LogBatchSenderFactory
122+
.fromHttpImplementation(OkHttpPoster::new)
123+
.configureWith(getLicense())
124+
.endpoint(getEndpoint())
125+
.build();
126+
}
127+
128+
/**
129+
* Create a telemetry client using the information from {@link LogForwarderConfiguration}.
130+
*
131+
* We only set the LogBatchSender since is the only one we're going to use.
132+
*
133+
* @param senderConfiguration the sender configuration.
134+
* @return the telemetry client.
135+
*/
136+
protected TelemetryClient createTelemetryClient(SenderConfiguration senderConfiguration) {
137+
return new TelemetryClient(
138+
null,
139+
null,
140+
null,
141+
LogBatchSender.create(senderConfiguration),
142+
configuration.getMaxTerminationTimeSeconds(),
143+
USE_DAEMON_THREADS,
144+
configuration.getMaxQueuedLogs()
145+
);
146+
}
147+
148+
/**
149+
* Add a log to the executor queue that will append the given log to the current batch.
150+
*
151+
* @param retryableLog a log wrapped in a RetryableLog to manage the retrying if needed.
152+
*/
153+
private void scheduleLog(RetryableLog retryableLog) {
154+
if (executor.getQueue().size() >= configuration.getMaxScheduledLogsToBeAppended()) {
155+
droppedLog();
156+
} else {
157+
executor.execute(() -> appendWithRetry(retryableLog));
158+
}
159+
}
160+
161+
/**
162+
* Schedule adding a log to the queue with the back off time provided by the RetryableLog.
163+
*
164+
* If max retries is reached or the maxQueuedLogAppend is reached, the log will be dropped.
165+
*
166+
* @param retryableLog the log to be appended.
167+
*/
168+
private void scheduleLogWithDelay(RetryableLog retryableLog) {
169+
long waitTime = retryableLog.retryBackOffTime();
170+
if (waitTime == -1) {
171+
droppedLog();
172+
return;
173+
}
174+
175+
if (executor.getQueue().size() >= configuration.getMaxScheduledLogsToBeAppended()) {
176+
droppedLog();
177+
} else {
178+
executor.schedule(() -> scheduleLog(retryableLog), waitTime, TimeUnit.MILLISECONDS);
179+
}
180+
}
181+
182+
/**
183+
* To avoid blocking the main thread this method should be executed by
184+
* a scheduled action.
185+
*
186+
* If the max of log per batch is raised, then we will create a batch with current
187+
* queued logs and send it to the telemetry SDK.
188+
*
189+
* If the logs queue refuses to add a new log, we schedule a retry..
190+
*
191+
* @param retryableLog the log to be appended.
192+
*/
193+
private void appendWithRetry(RetryableLog retryableLog) {
194+
if (logs.size() >= configuration.getMaxLogsPerBatch()) {
195+
addBatchWithCurrentLogs();
196+
}
197+
198+
if (!logs.offer(retryableLog.getLog())) {
199+
scheduleLogWithDelay(retryableLog);
200+
}
201+
}
202+
203+
/**
204+
* Create a batch adding the plugin type attribute for given collection of logs.
205+
*
206+
* @param logsToBeAdded the logs to be added on the batch.
207+
* @return the telemetry log batch object.
208+
*/
209+
private LogBatch createBatch(Collection<Log> logsToBeAdded) {
210+
final Attributes attributes = new Attributes();
211+
attributes.put(PLUGIN_TYPE_KEY, pluginType);
212+
return new LogBatch(logsToBeAdded, attributes);
213+
}
214+
215+
/**
216+
* Drain the current logs queue and create a batch with those logs if not empty.
217+
*
218+
* Empty could occur if the application is not emitting logs and the scheduled action to send logs
219+
* is triggered (each getFlushIntervalSeconds).
220+
*/
221+
private void addBatchWithCurrentLogs() {
222+
final List<Log> logsToBeAdded = new ArrayList<>();
223+
logs.drainTo(logsToBeAdded, configuration.getMaxLogsPerBatch());
224+
if (logsToBeAdded.size() > 0) {
225+
client.sendBatch(createBatch(logsToBeAdded));
226+
}
227+
}
228+
229+
private void droppedLog() {
230+
notificationHandler.noticeDroppedLog();
231+
}
232+
233+
/**
234+
* Get the endpoint from the configuration.
235+
*
236+
* The endpoint defaults to the us-production if the customer doesn't provide a custom one in the configuration.
237+
*
238+
* @return the logs API URL.
239+
*/
240+
private URL getEndpoint() {
241+
try {
242+
return new URL(configuration.getEndpoint());
243+
} catch (MalformedURLException e) {
244+
throw new RuntimeException("Invalid newrelic log endpoint.", e);
245+
}
246+
}
247+
248+
/**
249+
* Get the license form the configuration or takes the one given in the agent.
250+
*
251+
* @return the provided license or fallback to the agent one.
252+
*/
253+
private String getLicense() {
254+
if (!configuration.getLicense().isEmpty()) {
255+
return configuration.getLicense();
256+
}
257+
return getAgentLicense();
258+
}
259+
260+
private String getAgentLicense() {
261+
return agentSupplier.get().getConfig().getValue(LICENSE_KEY_CONFIG_FIELD);
262+
}
263+
264+
// visible for testing
265+
public static Supplier<Agent> agentSupplier = NewRelic::getAgent;
266+
}

0 commit comments

Comments
 (0)