Skip to content

How do we override default adminConfig, ProducerConfig and ConsumerConfigs. #246

@voodemsanthosh

Description

@voodemsanthosh

We are using spring cloud stream kafka binder,

              <dependency>
                   <groupId>org.springframework.cloud</groupId>
                   <artifactId>spring-cloud-stream-binder-kafka</artifactId>
             </dependency>

We are trying to override detault consumer properties mostly for below properties.

spring:
cloud:
stream:
kafka:
#https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#_problem_statement
bindings:
process-in-0:
consumer:
autoCommitOnError: true
ackEachRecord: true
max.poll.records: 100
max.poll.interval.ms: 600000
configuration:
auto.commit.interval.ms: 60000

these are not ever showing under consumer configs level

{ "date_time":"2023-08-20 19:21:20,013", "thread":"[main]", "log_level":"INFO ", "class_name":"ConsumerConfig", "globalId":"", {}, "log_message":ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 60000
auto.include.jmx.reporter = true
auto.offset.reset = earliest
bootstrap.servers = [localhost:8080]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = test-input-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = test-input
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 600000
max.poll.records = 100
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 60000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = security/keystore.jks
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = security/truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
}

Please provider right documents to set or override default consumer configuration.

My problem statement:

Application designed as functional approach and we are trying to consume and produce message to kafka topic,

@Bean
public Function<Message<List<String>>, Message<String>> process() {
  return request -> {
   //logic here
  } 
}

my consumer message is large and processing take more time(1000000 ms) and it is retrying continuously.

Please help us how to resolve this offset commit for larger message and process take more time ?

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions