diff --git a/crates/arroyo-connectors/src/kafka/mod.rs b/crates/arroyo-connectors/src/kafka/mod.rs index c5aec67bf..1455d5851 100644 --- a/crates/arroyo-connectors/src/kafka/mod.rs +++ b/crates/arroyo-connectors/src/kafka/mod.rs @@ -62,10 +62,29 @@ import_types!(schema = "src/kafka/table.json"); impl KafkaTable { pub fn subject(&self) -> Cow<'_, str> { match &self.value_subject { - None => Cow::Owned(format!("{}-value", self.topic)), + None => { + // For single topic, use the standard subject naming convention + // For patterns, the value_subject should be explicitly set + let topic = self + .topic + .as_ref() + .map(|t| format!("{}-value", t)) + .unwrap_or_else(|| "unknown-value".to_string()); + Cow::Owned(topic) + } Some(s) => Cow::Borrowed(s), } } + + /// Returns the topic name or pattern for display purposes + pub fn topic_display(&self) -> String { + self.topic.clone().unwrap_or_else(|| { + self.topic_pattern + .clone() + .map(|p| format!("pattern:{}", p)) + .unwrap_or_else(|| "unknown".to_string()) + }) + } } pub struct KafkaConnector {} @@ -150,8 +169,29 @@ impl KafkaConnector { } }; + let topic = options.pull_opt_str("topic")?; + let topic_pattern = options.pull_opt_str("topic_pattern")?; + + // Validate: exactly one of topic or topic_pattern must be set + match (&topic, &topic_pattern) { + (None, None) => bail!("Either 'topic' or 'topic_pattern' must be specified"), + (Some(_), Some(_)) => bail!("Cannot specify both 'topic' and 'topic_pattern'"), + _ => {} + } + + // Validate: topic_pattern can only be used with source tables + if topic_pattern.is_some() { + match &table_type { + TableType::Source { .. } => {} + TableType::Sink { .. } => { + bail!("'topic_pattern' can only be used with source tables, not sinks") + } + } + } + Ok(KafkaTable { - topic: options.pull_str("topic")?, + topic, + topic_pattern, type_: table_type, client_configs: options .pull_opt_str("client_configs")? 
@@ -207,9 +247,12 @@ impl Connector for KafkaConnector { let (typ, desc) = match table.type_ { TableType::Source { .. } => ( ConnectionType::Source, - format!("KafkaSource<{}>", table.topic), + format!("KafkaSource<{}>", table.topic_display()), + ), + TableType::Sink { .. } => ( + ConnectionType::Sink, + format!("KafkaSink<{}>", table.topic_display()), ), - TableType::Sink { .. } => (ConnectionType::Sink, format!("KafkaSink<{}>", table.topic)), }; let schema = schema @@ -408,7 +451,8 @@ impl Connector for KafkaConnector { Ok(ConstructedOperator::from_source(Box::new( KafkaSourceFunc { - topic: table.topic, + topic: table.topic.clone(), + topic_pattern: table.topic_pattern.clone(), bootstrap_servers: profile.bootstrap_servers.to_string(), group_id: group_id.clone(), group_id_prefix: group_id_prefix.clone(), @@ -446,7 +490,8 @@ impl Connector for KafkaConnector { write_futures: vec![], client_config: client_configs(&profile, Some(table.clone()))?, context: Context::new(Some(profile.clone())), - topic: table.topic, + // Safe to unwrap: validation ensures sink tables have a topic + topic: table.topic.clone().expect("Sink tables must have a topic"), serializer: ArrowSerializer::new( config.format.expect("Format must be defined for KafkaSink"), ), @@ -767,10 +812,14 @@ impl KafkaTester { self.info(&mut tx, "Connected to Kafka").await; - let topic = table.topic.clone(); + // For testing, topic must be specified (not topic_pattern) + let topic = table + .topic + .as_ref() + .ok_or_else(|| anyhow!("Testing requires a specific topic, not a pattern"))?; let metadata = client - .fetch_metadata(Some(&topic), Duration::from_secs(10)) + .fetch_metadata(Some(topic.as_str()), Duration::from_secs(10)) .map_err(|e| anyhow!("Failed to fetch metadata: {:?}", e))?; self.info(&mut tx, "Fetched topic metadata").await; diff --git a/crates/arroyo-connectors/src/kafka/source/mod.rs b/crates/arroyo-connectors/src/kafka/source/mod.rs index 2d9e62aff..ad218f49b 100644 --- 
a/crates/arroyo-connectors/src/kafka/source/mod.rs +++ b/crates/arroyo-connectors/src/kafka/source/mod.rs @@ -30,7 +30,8 @@ use super::{Context, SourceOffset, StreamConsumer}; mod test; pub struct KafkaSourceFunc { - pub topic: String, + pub topic: Option<String>, + pub topic_pattern: Option<String>, pub bootstrap_servers: String, pub group_id: Option<String>, pub group_id_prefix: Option<String>, @@ -45,13 +46,31 @@ pub struct KafkaSourceFunc { pub metadata_fields: Vec<MetadataField>, } -#[derive(Copy, Clone, Debug, Encode, Decode, PartialEq, PartialOrd)] +#[derive(Clone, Debug, Encode, Decode, PartialEq, PartialOrd)] pub struct KafkaState { + topic: String, partition: i32, offset: i64, } +/// Key for storing Kafka state: (topic, partition) +#[derive(Clone, Debug, Encode, Decode, PartialEq, Eq, Hash)] +pub struct KafkaStateKey { + topic: String, + partition: i32, +} + impl KafkaSourceFunc { + /// Returns the topic name for display purposes (either the exact topic or the pattern) + fn topic_display(&self) -> String { + self.topic.clone().unwrap_or_else(|| { + self.topic_pattern + .clone() + .map(|p| format!("pattern:{}", p)) + .unwrap_or_else(|| "unknown".to_string()) + }) + } + async fn get_consumer(&mut self, ctx: &mut SourceContext) -> anyhow::Result<StreamConsumer> { info!("Creating kafka consumer for {}", self.bootstrap_servers); let mut client_config = ClientConfig::new(); @@ -86,7 +105,7 @@ impl KafkaSourceFunc { .set("bootstrap.servers", &self.bootstrap_servers) .set("enable.partition.eof", "false") .set("enable.auto.commit", "false") - .set("group.id", group_id) + .set("group.id", &group_id) .create_with_context(self.context.clone())?; // NOTE: this is required to trigger an oauth token refresh (when using @@ -95,9 +114,40 @@ impl KafkaSourceFunc { bail!("unexpected recv before assignments"); } + // Handle topic pattern subscription (uses Kafka's group coordination) + if let Some(pattern) = &self.topic_pattern { + info!( + "Subscribing to topic pattern '{}' with group_id '{}'", + pattern, group_id + ); + + // rdkafka
expects patterns to start with '^' for regex matching + let pattern_str = if pattern.starts_with('^') { + pattern.clone() + } else { + format!("^{}", pattern) + }; + + consumer + .subscribe(&[&pattern_str]) + .context("Failed to subscribe to topic pattern")?; + + info!( + "Successfully subscribed to pattern '{}' - Kafka will handle partition assignment", + pattern_str + ); + + return Ok(consumer); + } + + // Handle single topic (existing behavior with manual partition assignment) + let topic = self.topic.as_ref().ok_or_else(|| { + anyhow::anyhow!("Either 'topic' or 'topic_pattern' must be specified") + })?; + let state: Vec<_> = ctx .table_manager - .get_global_keyed_state::<i32, KafkaState>("k") + .get_global_keyed_state::<KafkaStateKey, KafkaState>("k") .await? .get_all() .values() @@ -106,10 +156,13 @@ impl KafkaSourceFunc { // did we restore any partitions? let has_state = !state.is_empty(); - let state: HashMap<i32, KafkaState> = state.iter().map(|s| (s.partition, **s)).collect(); - let metadata = consumer.fetch_metadata(Some(&self.topic), Duration::from_secs(30))?; + let state: HashMap<(String, i32), KafkaState> = state + .iter() + .map(|s| ((s.topic.clone(), s.partition), (*s).clone())) + .collect(); + let metadata = consumer.fetch_metadata(Some(topic), Duration::from_secs(30))?; - info!("Fetched metadata for topic {}", self.topic); + info!("Fetched metadata for topic {}", topic); let our_partitions: HashMap<_, _> = { let partitions = metadata.topics()[0].partitions(); @@ -121,7 +174,7 @@ impl KafkaSourceFunc { }) .map(|(_, p)| { let offset = state - .get(&p.id()) + .get(&(topic.clone(), p.id())) .map(|s| Offset::Offset(s.offset)) .unwrap_or_else(|| { if has_state { @@ -133,14 +186,14 @@ impl KafkaSourceFunc { } }); - ((self.topic.clone(), p.id()), offset) + ((topic.clone(), p.id()), offset) }) .collect() }; info!( "partition map for {}-{}: {:?}", - self.topic, ctx.task_info.task_index, our_partitions + topic, ctx.task_info.task_index, our_partitions ); let topic_partitions =
TopicPartitionList::from_topic_map(&our_partitions)?; @@ -161,9 +214,12 @@ impl KafkaSourceFunc { .context("creating kafka consumer")?; let rate_limiter = GovernorRateLimiter::direct(Quota::per_second(self.messages_per_second)); - let mut offsets = HashMap::new(); + // Track offsets by (topic, partition) to support multi-topic subscriptions + let mut offsets: HashMap<(String, i32), i64> = HashMap::new(); - if consumer.assignment().unwrap().count() == 0 { + // For pattern subscriptions, we skip the initial assignment check since + // Kafka handles partition assignment dynamically via the consumer group protocol + if self.topic_pattern.is_none() && consumer.assignment().unwrap().count() == 0 { warn!( "Kafka Consumer {}-{} is subscribed to no partitions, as there are more subtasks than partitions... setting idle", ctx.task_info.operator_id, ctx.task_info.task_index @@ -227,7 +283,8 @@ impl KafkaSourceFunc { collector.flush_buffer().await?; } - offsets.insert(msg.partition(), msg.offset()); + // Store offset with (topic, partition) key for multi-topic support + offsets.insert((topic.to_string(), msg.partition()), msg.offset()); rate_limiter.until_ready().await; } }, @@ -247,13 +304,20 @@ impl KafkaSourceFunc { debug!("starting checkpointing {}", ctx.task_info.task_index); let mut topic_partitions = TopicPartitionList::new(); let s = ctx.table_manager.get_global_keyed_state("k").await?; - for (partition, offset) in &offsets { - s.insert(*partition, KafkaState { - partition: *partition, - offset: *offset + 1, - }).await; + for ((topic, partition), offset) in &offsets { + s.insert( + KafkaStateKey { + topic: topic.clone(), + partition: *partition, + }, + KafkaState { + topic: topic.clone(), + partition: *partition, + offset: *offset + 1, + } + ).await; topic_partitions.add_partition_offset( - &self.topic, *partition, Offset::Offset(*offset)).unwrap(); + topic, *partition, Offset::Offset(*offset)).unwrap(); } if let Err(e) = consumer.commit(&topic_partitions, 
CommitMode::Async) { @@ -305,7 +369,7 @@ impl SourceOperator for KafkaSourceFunc { } fn name(&self) -> String { - format!("kafka-{}", self.topic) + format!("kafka-{}", self.topic_display()) } fn tables(&self) -> HashMap<String, TableConfig> { diff --git a/crates/arroyo-connectors/src/kafka/source/test.rs b/crates/arroyo-connectors/src/kafka/source/test.rs index 531505658..dba575608 100644 --- a/crates/arroyo-connectors/src/kafka/source/test.rs +++ b/crates/arroyo-connectors/src/kafka/source/test.rs @@ -81,7 +81,8 @@ impl KafkaTopicTester { ) -> KafkaSourceWithReads { let mut kafka = Box::new(KafkaSourceFunc { bootstrap_servers: self.server.clone(), - topic: self.topic.clone(), + topic: Some(self.topic.clone()), + topic_pattern: None, group_id: self.group_id.clone(), group_id_prefix: None, offset_mode: SourceOffset::Earliest, @@ -394,7 +395,8 @@ async fn test_kafka_with_metadata_fields() { // Set metadata fields in KafkaSourceFunc let mut kafka = KafkaSourceFunc { bootstrap_servers: kafka_topic_tester.server.clone(), - topic: kafka_topic_tester.topic.clone(), + topic: Some(kafka_topic_tester.topic.clone()), + topic_pattern: None, group_id: kafka_topic_tester.group_id.clone(), group_id_prefix: None, offset_mode: SourceOffset::Earliest, diff --git a/crates/arroyo-connectors/src/kafka/table.json b/crates/arroyo-connectors/src/kafka/table.json index 56404d233..55083d058 100644 --- a/crates/arroyo-connectors/src/kafka/table.json +++ b/crates/arroyo-connectors/src/kafka/table.json @@ -5,9 +5,14 @@ "topic": { "title": "Topic", "type": "string", - "description": "The Kafka topic to use for this table", + "description": "The Kafka topic to use for this table. Either 'topic' or 'topic_pattern' must be set, but not both.", "format": "autocomplete" }, + "topic_pattern": { + "title": "Topic Pattern", + "type": "string", + "description": "A regex pattern to subscribe to multiple topics (e.g., '^logs-.*' or '^edge[0-9]+_raw$'). Uses Kafka's pattern subscription with automatic rebalancing. 
Either 'topic' or 'topic_pattern' must be set, but not both. Only valid for source tables." + }, "type": { "title": "Table Type", "oneOf": [ @@ -94,7 +99,6 @@ } }, "required": [ - "topic", "type" ] }