-
Notifications
You must be signed in to change notification settings - Fork 517
feat(connect): support connect openTelemetry and log for 1.6 #2961
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 1.6
Are you sure you want to change the base?
Changes from all commits
fcef98a
e59f4c5
e678d7e
b11139d
ddc7d97
78ec2c4
67c1a5c
08eb317
599ebaf
289ce6a
9dea756
faac7ab
f2ecc16
40a00d9
e0fc4e8
c3ee0b0
231b86a
57e4db1
651ab89
9650799
163c2af
a4066df
23abb0f
0e3e1e5
82abcd7
43345d4
febb8b5
a0de21a
0abd7d0
fe0cb08
d440ab2
ac93c25
2e4ac84
4395020
b62b540
7e8f404
b449d92
3ba9024
7342979
8b6c595
b2176e5
d99dd29
adfe066
0f08d0a
801f85e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| # AutoMQ Log Uploader Module | ||
|
|
||
| This module provides asynchronous S3 log upload capability based on Log4j 1.x. Other submodules only need to depend on this module and configure it simply to synchronize logs to object storage. Core components: | ||
|
|
||
| - `com.automq.log.S3RollingFileAppender`: Extends `RollingFileAppender`, pushes log events to the uploader while writing to local files. | ||
| - `com.automq.log.uploader.LogUploader`: Asynchronously buffers, compresses, and uploads logs; supports configuration switches and periodic cleanup. | ||
| - `com.automq.log.uploader.S3LogConfig`/`S3LogConfigProvider`: Abstracts the configuration required for uploading. The default implementation `PropertiesS3LogConfigProvider` reads from `automq-log.properties`. | ||
|
|
||
| ## Quick Integration | ||
|
|
||
| 1. Add dependency in your module's `build.gradle`: | ||
| ```groovy | ||
| implementation project(':automq-log-uploader') | ||
| ``` | ||
| 2. Create `automq-log.properties` in the resources directory (or customize `S3LogConfigProvider`): | ||
| ```properties | ||
| log.s3.enable=true | ||
| log.s3.bucket=0@s3://your-log-bucket?region=us-east-1 | ||
| log.s3.cluster.id=my-cluster | ||
| log.s3.node.id=1 | ||
| log.s3.selector.type=controller | ||
| ``` | ||
| 3. Reference the Appender in `log4j.properties`: | ||
| ```properties | ||
| log4j.appender.s3_uploader=com.automq.log.S3RollingFileAppender | ||
| log4j.appender.s3_uploader.File=logs/server.log | ||
| log4j.appender.s3_uploader.MaxFileSize=100MB | ||
| log4j.appender.s3_uploader.MaxBackupIndex=10 | ||
| log4j.appender.s3_uploader.layout=org.apache.log4j.PatternLayout | ||
| log4j.appender.s3_uploader.layout.ConversionPattern=[%d] %p %m (%c)%n | ||
| ``` | ||
| If you need to customize the configuration provider, you can set: | ||
| ```properties | ||
| log4j.appender.s3_uploader.configProviderClass=com.example.CustomS3LogConfigProvider | ||
| ``` | ||
|
|
||
| ## Key Configuration Description | ||
|
|
||
| | Configuration Item | Description | | ||
| | ------ | ---- | | ||
| | `log.s3.enable` | Whether to enable S3 upload function. | ||
| | `log.s3.bucket` | It is recommended to use AutoMQ Bucket URI (e.g. `0@s3://bucket?region=us-east-1&pathStyle=true`). If using a shorthand bucket name, additional fields such as `log.s3.region` need to be provided. | ||
| | `log.s3.cluster.id` / `log.s3.node.id` | Used to construct the object storage path `automq/logs/{cluster}/{node}/{hour}/{uuid}`. | ||
| | `log.s3.selector.type` | Leader election strategy (`controller`, `connect-leader`, or custom). | ||
|
|
||
|
|
||
| The upload schedule can be overridden by environment variables: | ||
|
|
||
| - `AUTOMQ_OBSERVABILITY_UPLOAD_INTERVAL`: Maximum upload interval (milliseconds). | ||
| - `AUTOMQ_OBSERVABILITY_CLEANUP_INTERVAL`: Retention period (milliseconds), old objects earlier than this time will be cleaned up. | ||
|
|
||
| ### Leader Election Strategies | ||
|
|
||
| To avoid multiple nodes executing S3 cleanup tasks simultaneously, the log uploader has a built-in leader election mechanism consistent with the OpenTelemetry module: | ||
|
|
||
| 1. **controller** *(default for brokers)*: Defers to the Kafka KRaft controller leadership that AutoMQ exposes at runtime. No additional configuration is required—the broker registers a supplier and the uploader continuously checks it. | ||
| 2. **connect-leader** *(default for Kafka Connect clusters)*: Mirrors the distributed herder leader election. Works out of the box when running inside AutoMQ's Connect runtime. | ||
| 3. **custom**: Implement `com.automq.log.uploader.selector.LogUploaderNodeSelectorProvider` and register it through SPI to introduce a custom leader election strategy. | ||
|
|
||
| > **Note** | ||
| > | ||
| > Runtime-backed selectors (`controller`, `connect-leader`) no longer require the S3 uploader to be initialized after leadership registration. The selector re-evaluates the registry on every invocation, so once the hosting runtime publishes its leader supplier the uploader automatically adopts it. | ||
|
|
||
| ## Extension | ||
|
|
||
| If the application already has its own dependency injection/configuration method, you can implement `S3LogConfigProvider` and call it at startup: | ||
|
|
||
| ```java | ||
| import com.automq.log.S3RollingFileAppender; | ||
|
|
||
| S3RollingFileAppender.setConfigProvider(new CustomConfigProvider()); | ||
| ``` | ||
|
|
||
| All `S3RollingFileAppender` instances will share this provider. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| plugins { | ||
| id 'java-library' | ||
| } | ||
|
|
||
| repositories { | ||
| mavenCentral() | ||
| } | ||
|
|
||
| dependencies { | ||
| api project(':s3stream') | ||
|
|
||
| implementation project(':clients') | ||
| implementation libs.reload4j | ||
| implementation libs.slf4jApi | ||
| implementation libs.slf4jBridge | ||
| implementation libs.nettyBuffer | ||
| implementation libs.guava | ||
| implementation libs.commonLang | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,211 @@ | ||
| /* | ||
| * Copyright 2025, AutoMQ HK Limited. | ||
| * | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.automq.log; | ||
|
|
||
| import com.automq.log.uploader.LogRecorder; | ||
| import com.automq.log.uploader.LogUploader; | ||
| import com.automq.log.uploader.PropertiesS3LogConfigProvider; | ||
| import com.automq.log.uploader.S3LogConfig; | ||
| import com.automq.log.uploader.S3LogConfigProvider; | ||
|
|
||
| import org.apache.commons.lang3.StringUtils; | ||
| import org.apache.log4j.RollingFileAppender; | ||
| import org.apache.log4j.spi.LoggingEvent; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| public class S3RollingFileAppender extends RollingFileAppender { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a little bit complex. Maybe we could keep the almost same S3RollingFileAppender and LogUploader as before. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just provider custom log class, not so complex |
||
| public static final String CONFIG_PROVIDER_PROPERTY = "automq.log.s3.config.provider"; | ||
|
|
||
| private static final Logger LOGGER = LoggerFactory.getLogger(S3RollingFileAppender.class); | ||
| private static final Object INIT_LOCK = new Object(); | ||
|
|
||
| private static volatile LogUploader logUploaderInstance; | ||
| private static volatile S3LogConfigProvider configProvider; | ||
| private static volatile boolean initializationPending; | ||
|
|
||
| private String configProviderClass; | ||
|
|
||
| public S3RollingFileAppender() { | ||
| super(); | ||
| } | ||
|
|
||
| /** | ||
| * Allows programmatic override of the LogUploader instance. | ||
| * Useful for testing or complex dependency injection scenarios. | ||
| * | ||
| * @param uploader The LogUploader instance to use. | ||
| */ | ||
| public static void setLogUploader(LogUploader uploader) { | ||
| synchronized (INIT_LOCK) { | ||
| logUploaderInstance = uploader; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Programmatically sets the configuration provider to be used by all {@link S3RollingFileAppender} instances. | ||
| */ | ||
| public static void setConfigProvider(S3LogConfigProvider provider) { | ||
| synchronized (INIT_LOCK) { | ||
| configProvider = provider; | ||
| } | ||
| triggerInitialization(); | ||
| } | ||
|
|
||
| /** | ||
| * Setter used by Log4j property configuration to specify a custom {@link S3LogConfigProvider} implementation. | ||
| */ | ||
| public void setConfigProviderClass(String configProviderClass) { | ||
| this.configProviderClass = configProviderClass; | ||
| } | ||
|
|
||
| @Override | ||
| public void activateOptions() { | ||
| super.activateOptions(); | ||
| initializeUploader(); | ||
| } | ||
|
|
||
| private void initializeUploader() { | ||
| if (logUploaderInstance != null) { | ||
| return; | ||
| } | ||
| synchronized (INIT_LOCK) { | ||
| if (logUploaderInstance != null) { | ||
| return; | ||
| } | ||
| try { | ||
| S3LogConfigProvider provider = resolveProvider(); | ||
| if (provider == null) { | ||
| LOGGER.info("No S3LogConfigProvider available; S3 log upload remains disabled."); | ||
| initializationPending = true; | ||
| return; | ||
| } | ||
| S3LogConfig config = provider.get(); | ||
| if (config == null || !config.isEnabled() || config.objectStorage() == null) { | ||
| LOGGER.info("S3 log upload is disabled by configuration."); | ||
| initializationPending = config == null; | ||
| return; | ||
| } | ||
|
|
||
| LogUploader uploader = new LogUploader(); | ||
| uploader.start(config); | ||
| logUploaderInstance = uploader; | ||
| initializationPending = false; | ||
|
|
||
| Runtime.getRuntime().addShutdownHook(new Thread(() -> { | ||
| try { | ||
| uploader.close(); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| LOGGER.warn("Failed to close LogUploader gracefully", e); | ||
| } | ||
| })); | ||
| LOGGER.info("S3RollingFileAppender initialized successfully using provider {}.", | ||
| provider.getClass().getName()); | ||
| } catch (Exception e) { | ||
| LOGGER.error("Failed to initialize S3RollingFileAppender", e); | ||
| initializationPending = true; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| public static void triggerInitialization() { | ||
| S3LogConfigProvider provider; | ||
| synchronized (INIT_LOCK) { | ||
| if (logUploaderInstance != null) { | ||
| return; | ||
| } | ||
| provider = configProvider; | ||
| } | ||
| if (provider == null) { | ||
| initializationPending = true; | ||
| return; | ||
| } | ||
| new S3RollingFileAppender().initializeUploader(); | ||
| } | ||
|
|
||
| private S3LogConfigProvider resolveProvider() { | ||
| S3LogConfigProvider provider = configProvider; | ||
| if (provider != null) { | ||
| return provider; | ||
| } | ||
|
|
||
| synchronized (INIT_LOCK) { | ||
| if (configProvider != null) { | ||
| return configProvider; | ||
| } | ||
|
|
||
| String providerClassName = configProviderClass; | ||
| if (StringUtils.isBlank(providerClassName)) { | ||
| providerClassName = System.getProperty(CONFIG_PROVIDER_PROPERTY); | ||
| } | ||
|
|
||
| if (StringUtils.isNotBlank(providerClassName)) { | ||
| provider = instantiateProvider(providerClassName.trim()); | ||
| if (provider == null) { | ||
| LOGGER.warn("Falling back to default configuration provider because {} could not be instantiated.", | ||
| providerClassName); | ||
| } | ||
| } | ||
|
|
||
| if (provider == null) { | ||
| provider = new PropertiesS3LogConfigProvider(); | ||
| } | ||
|
|
||
| configProvider = provider; | ||
| return provider; | ||
| } | ||
| } | ||
|
|
||
| private S3LogConfigProvider instantiateProvider(String providerClassName) { | ||
| try { | ||
| Class<?> clazz = Class.forName(providerClassName); | ||
| Object instance = clazz.getDeclaredConstructor().newInstance(); | ||
| if (!(instance instanceof S3LogConfigProvider)) { | ||
| LOGGER.error("Class {} does not implement S3LogConfigProvider.", providerClassName); | ||
| return null; | ||
| } | ||
| return (S3LogConfigProvider) instance; | ||
| } catch (Exception e) { | ||
| LOGGER.error("Failed to instantiate S3LogConfigProvider {}", providerClassName, e); | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| protected void subAppend(LoggingEvent event) { | ||
| super.subAppend(event); | ||
| if (!closed && logUploaderInstance != null) { | ||
| LogRecorder.LogEvent logEvent = new LogRecorder.LogEvent( | ||
| event.getTimeStamp(), | ||
| event.getLevel().toString(), | ||
| event.getLoggerName(), | ||
| event.getRenderedMessage(), | ||
| event.getThrowableStrRep()); | ||
|
|
||
| try { | ||
| logEvent.validate(); | ||
| logUploaderInstance.append(logEvent); | ||
| } catch (IllegalArgumentException e) { | ||
| errorHandler.error("Failed to validate and append log event", e, 0); | ||
| } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does automq-log-uploader have independent build.gradle instead of keeping the same pattern as others modules.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's like automq-shell.