Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 57 additions & 8 deletions crates/arroyo-connectors/src/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,29 @@ import_types!(schema = "src/kafka/table.json");
impl KafkaTable {
    /// Returns the schema-registry subject for this table's value schema.
    ///
    /// If `value_subject` is explicitly configured, it is borrowed as-is.
    /// Otherwise the standard `<topic>-value` naming convention is applied
    /// to the configured topic. When only a `topic_pattern` is set (so no
    /// single topic exists), the subject cannot be derived and falls back
    /// to `"unknown-value"` — in that case `value_subject` should be set
    /// explicitly.
    pub fn subject(&self) -> Cow<'_, str> {
        match &self.value_subject {
            Some(s) => Cow::Borrowed(s),
            None => {
                // For a single topic, use the standard subject naming
                // convention; for patterns, value_subject must be explicit.
                let subject = self
                    .topic
                    .as_ref()
                    .map(|t| format!("{}-value", t))
                    .unwrap_or_else(|| "unknown-value".to_string());
                Cow::Owned(subject)
            }
        }
    }

    /// Returns the topic name or pattern for display purposes.
    ///
    /// Prefers the concrete topic; otherwise renders the pattern as
    /// `pattern:<regex>`, and `"unknown"` if neither is set (a state that
    /// connector-side validation is meant to prevent).
    pub fn topic_display(&self) -> String {
        self.topic.clone().unwrap_or_else(|| {
            self.topic_pattern
                .as_ref()
                .map(|p| format!("pattern:{}", p))
                .unwrap_or_else(|| "unknown".to_string())
        })
    }
}

pub struct KafkaConnector {}
Expand Down Expand Up @@ -150,8 +169,29 @@ impl KafkaConnector {
}
};

let topic = options.pull_opt_str("topic")?;
let topic_pattern = options.pull_opt_str("topic_pattern")?;

// Validate: exactly one of topic or topic_pattern must be set
match (&topic, &topic_pattern) {
(None, None) => bail!("Either 'topic' or 'topic_pattern' must be specified"),
(Some(_), Some(_)) => bail!("Cannot specify both 'topic' and 'topic_pattern'"),
_ => {}
}

// Validate: topic_pattern can only be used with source tables
if topic_pattern.is_some() {
match &table_type {
TableType::Source { .. } => {}
TableType::Sink { .. } => {
bail!("'topic_pattern' can only be used with source tables, not sinks")
}
}
}

Ok(KafkaTable {
topic: options.pull_str("topic")?,
topic,
topic_pattern,
type_: table_type,
client_configs: options
.pull_opt_str("client_configs")?
Expand Down Expand Up @@ -207,9 +247,12 @@ impl Connector for KafkaConnector {
let (typ, desc) = match table.type_ {
TableType::Source { .. } => (
ConnectionType::Source,
format!("KafkaSource<{}>", table.topic),
format!("KafkaSource<{}>", table.topic_display()),
),
TableType::Sink { .. } => (
ConnectionType::Sink,
format!("KafkaSink<{}>", table.topic_display()),
),
TableType::Sink { .. } => (ConnectionType::Sink, format!("KafkaSink<{}>", table.topic)),
};

let schema = schema
Expand Down Expand Up @@ -408,7 +451,8 @@ impl Connector for KafkaConnector {

Ok(ConstructedOperator::from_source(Box::new(
KafkaSourceFunc {
topic: table.topic,
topic: table.topic.clone(),
topic_pattern: table.topic_pattern.clone(),
bootstrap_servers: profile.bootstrap_servers.to_string(),
group_id: group_id.clone(),
group_id_prefix: group_id_prefix.clone(),
Expand Down Expand Up @@ -446,7 +490,8 @@ impl Connector for KafkaConnector {
write_futures: vec![],
client_config: client_configs(&profile, Some(table.clone()))?,
context: Context::new(Some(profile.clone())),
topic: table.topic,
// Safe to unwrap: validation ensures sink tables have a topic
topic: table.topic.clone().expect("Sink tables must have a topic"),
serializer: ArrowSerializer::new(
config.format.expect("Format must be defined for KafkaSink"),
),
Expand Down Expand Up @@ -767,10 +812,14 @@ impl KafkaTester {

self.info(&mut tx, "Connected to Kafka").await;

let topic = table.topic.clone();
// For testing, topic must be specified (not topic_pattern)
let topic = table
.topic
.as_ref()
.ok_or_else(|| anyhow!("Testing requires a specific topic, not a pattern"))?;

let metadata = client
.fetch_metadata(Some(&topic), Duration::from_secs(10))
.fetch_metadata(Some(topic.as_str()), Duration::from_secs(10))
.map_err(|e| anyhow!("Failed to fetch metadata: {:?}", e))?;

self.info(&mut tx, "Fetched topic metadata").await;
Expand Down
104 changes: 84 additions & 20 deletions crates/arroyo-connectors/src/kafka/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use super::{Context, SourceOffset, StreamConsumer};
mod test;

pub struct KafkaSourceFunc {
pub topic: String,
pub topic: Option<String>,
pub topic_pattern: Option<String>,
pub bootstrap_servers: String,
pub group_id: Option<String>,
pub group_id_prefix: Option<String>,
Expand All @@ -45,13 +46,31 @@ pub struct KafkaSourceFunc {
pub metadata_fields: Vec<MetadataField>,
}

/// Checkpointed consumer position for a single (topic, partition) pair.
///
/// `Copy` is intentionally NOT derived: the `topic` field is a `String`,
/// so the type can only be `Clone`.
#[derive(Clone, Debug, Encode, Decode, PartialEq, PartialOrd)]
pub struct KafkaState {
    // Topic this offset belongs to; required now that a source may read
    // from multiple topics via a pattern subscription.
    topic: String,
    // Kafka partition id within `topic`.
    partition: i32,
    // Next offset to resume from (stored as last-read offset + 1 by the
    // checkpointing code).
    offset: i64,
}

/// Key for storing Kafka state: (topic, partition)
///
/// NOTE(review): the `Encode`/`Decode` derives make the field order part of
/// the persisted state format — do not reorder `topic` and `partition`.
#[derive(Clone, Debug, Encode, Decode, PartialEq, Eq, Hash)]
pub struct KafkaStateKey {
    // Topic the state entry belongs to.
    topic: String,
    // Kafka partition id within `topic`.
    partition: i32,
}

impl KafkaSourceFunc {
/// Returns the topic name for display purposes (either the exact topic or the pattern)
fn topic_display(&self) -> String {
    // Match on both optional fields at once: a concrete topic wins,
    // then a pattern (prefixed so it's recognizable in logs/UIs),
    // then a last-resort placeholder.
    match (&self.topic, &self.topic_pattern) {
        (Some(topic), _) => topic.clone(),
        (None, Some(pattern)) => format!("pattern:{}", pattern),
        (None, None) => "unknown".to_string(),
    }
}

async fn get_consumer(&mut self, ctx: &mut SourceContext) -> anyhow::Result<StreamConsumer> {
info!("Creating kafka consumer for {}", self.bootstrap_servers);
let mut client_config = ClientConfig::new();
Expand Down Expand Up @@ -86,7 +105,7 @@ impl KafkaSourceFunc {
.set("bootstrap.servers", &self.bootstrap_servers)
.set("enable.partition.eof", "false")
.set("enable.auto.commit", "false")
.set("group.id", group_id)
.set("group.id", &group_id)
.create_with_context(self.context.clone())?;

// NOTE: this is required to trigger an oauth token refresh (when using
Expand All @@ -95,9 +114,40 @@ impl KafkaSourceFunc {
bail!("unexpected recv before assignments");
}

// Handle topic pattern subscription (uses Kafka's group coordination)
if let Some(pattern) = &self.topic_pattern {
info!(
"Subscribing to topic pattern '{}' with group_id '{}'",
pattern, group_id
);

// rdkafka expects patterns to start with '^' for regex matching
let pattern_str = if pattern.starts_with('^') {
pattern.clone()
} else {
format!("^{}", pattern)
};

consumer
.subscribe(&[&pattern_str])
.context("Failed to subscribe to topic pattern")?;

info!(
"Successfully subscribed to pattern '{}' - Kafka will handle partition assignment",
pattern_str
);

return Ok(consumer);
}

// Handle single topic (existing behavior with manual partition assignment)
let topic = self.topic.as_ref().ok_or_else(|| {
anyhow::anyhow!("Either 'topic' or 'topic_pattern' must be specified")
})?;

let state: Vec<_> = ctx
.table_manager
.get_global_keyed_state::<i32, KafkaState>("k")
.get_global_keyed_state::<KafkaStateKey, KafkaState>("k")
.await?
.get_all()
.values()
Expand All @@ -106,10 +156,13 @@ impl KafkaSourceFunc {
// did we restore any partitions?
let has_state = !state.is_empty();

let state: HashMap<i32, KafkaState> = state.iter().map(|s| (s.partition, **s)).collect();
let metadata = consumer.fetch_metadata(Some(&self.topic), Duration::from_secs(30))?;
let state: HashMap<(String, i32), KafkaState> = state
.iter()
.map(|s| ((s.topic.clone(), s.partition), (*s).clone()))
.collect();
let metadata = consumer.fetch_metadata(Some(topic), Duration::from_secs(30))?;

info!("Fetched metadata for topic {}", self.topic);
info!("Fetched metadata for topic {}", topic);

let our_partitions: HashMap<_, _> = {
let partitions = metadata.topics()[0].partitions();
Expand All @@ -121,7 +174,7 @@ impl KafkaSourceFunc {
})
.map(|(_, p)| {
let offset = state
.get(&p.id())
.get(&(topic.clone(), p.id()))
.map(|s| Offset::Offset(s.offset))
.unwrap_or_else(|| {
if has_state {
Expand All @@ -133,14 +186,14 @@ impl KafkaSourceFunc {
}
});

((self.topic.clone(), p.id()), offset)
((topic.clone(), p.id()), offset)
})
.collect()
};

info!(
"partition map for {}-{}: {:?}",
self.topic, ctx.task_info.task_index, our_partitions
topic, ctx.task_info.task_index, our_partitions
);

let topic_partitions = TopicPartitionList::from_topic_map(&our_partitions)?;
Expand All @@ -161,9 +214,12 @@ impl KafkaSourceFunc {
.context("creating kafka consumer")?;

let rate_limiter = GovernorRateLimiter::direct(Quota::per_second(self.messages_per_second));
let mut offsets = HashMap::new();
// Track offsets by (topic, partition) to support multi-topic subscriptions
let mut offsets: HashMap<(String, i32), i64> = HashMap::new();

if consumer.assignment().unwrap().count() == 0 {
// For pattern subscriptions, we skip the initial assignment check since
// Kafka handles partition assignment dynamically via the consumer group protocol
if self.topic_pattern.is_none() && consumer.assignment().unwrap().count() == 0 {
warn!(
"Kafka Consumer {}-{} is subscribed to no partitions, as there are more subtasks than partitions... setting idle",
ctx.task_info.operator_id, ctx.task_info.task_index
Expand Down Expand Up @@ -227,7 +283,8 @@ impl KafkaSourceFunc {
collector.flush_buffer().await?;
}

offsets.insert(msg.partition(), msg.offset());
// Store offset with (topic, partition) key for multi-topic support
offsets.insert((topic.to_string(), msg.partition()), msg.offset());
rate_limiter.until_ready().await;
}
},
Expand All @@ -247,13 +304,20 @@ impl KafkaSourceFunc {
debug!("starting checkpointing {}", ctx.task_info.task_index);
let mut topic_partitions = TopicPartitionList::new();
let s = ctx.table_manager.get_global_keyed_state("k").await?;
for (partition, offset) in &offsets {
s.insert(*partition, KafkaState {
partition: *partition,
offset: *offset + 1,
}).await;
for ((topic, partition), offset) in &offsets {
s.insert(
KafkaStateKey {
topic: topic.clone(),
partition: *partition,
},
KafkaState {
topic: topic.clone(),
partition: *partition,
offset: *offset + 1,
}
).await;
topic_partitions.add_partition_offset(
&self.topic, *partition, Offset::Offset(*offset)).unwrap();
topic, *partition, Offset::Offset(*offset)).unwrap();
}

if let Err(e) = consumer.commit(&topic_partitions, CommitMode::Async) {
Expand Down Expand Up @@ -305,7 +369,7 @@ impl SourceOperator for KafkaSourceFunc {
}

fn name(&self) -> String {
format!("kafka-{}", self.topic)
format!("kafka-{}", self.topic_display())
}

fn tables(&self) -> HashMap<String, TableConfig> {
Expand Down
6 changes: 4 additions & 2 deletions crates/arroyo-connectors/src/kafka/source/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ impl KafkaTopicTester {
) -> KafkaSourceWithReads {
let mut kafka = Box::new(KafkaSourceFunc {
bootstrap_servers: self.server.clone(),
topic: self.topic.clone(),
topic: Some(self.topic.clone()),
topic_pattern: None,
group_id: self.group_id.clone(),
group_id_prefix: None,
offset_mode: SourceOffset::Earliest,
Expand Down Expand Up @@ -394,7 +395,8 @@ async fn test_kafka_with_metadata_fields() {
// Set metadata fields in KafkaSourceFunc
let mut kafka = KafkaSourceFunc {
bootstrap_servers: kafka_topic_tester.server.clone(),
topic: kafka_topic_tester.topic.clone(),
topic: Some(kafka_topic_tester.topic.clone()),
topic_pattern: None,
group_id: kafka_topic_tester.group_id.clone(),
group_id_prefix: None,
offset_mode: SourceOffset::Earliest,
Expand Down
8 changes: 6 additions & 2 deletions crates/arroyo-connectors/src/kafka/table.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
"topic": {
"title": "Topic",
"type": "string",
"description": "The Kafka topic to use for this table",
"description": "The Kafka topic to use for this table. Either 'topic' or 'topic_pattern' must be set, but not both.",
"format": "autocomplete"
},
"topic_pattern": {
"title": "Topic Pattern",
"type": "string",
"description": "A regex pattern to subscribe to multiple topics (e.g., '^logs-.*' or '^edge[0-9]+_raw$'). Uses Kafka's pattern subscription with automatic rebalancing. Either 'topic' or 'topic_pattern' must be set, but not both. Only valid for source tables."
},
"type": {
"title": "Table Type",
"oneOf": [
Expand Down Expand Up @@ -94,7 +99,6 @@
}
},
"required": [
"topic",
"type"
]
}