Conversation
mwylde
left a comment
There was a problem hiding this comment.
Apologies for the long delay on the review, and thanks for the contribution! This is a good start that needs a little more work before we'll be able to merge it.
In addition to the changes requested in the PR, it will also need state handling in the source. In order to provide at least-once semantics, arroyo needs to manage it own offset tracking as part of its state. Otherwise, if we're relying on auto-comitting, we might drop messages on restart. Look at how this is implemented in the Kafka connector.
| if (format == 'json') { | ||
| schemaTypeOptions.push({ name: 'Unstructured JSON', value: 'unstructured' }); | ||
| } | ||
| const schemaTypeOptions = useMemo(() => { |
There was a problem hiding this comment.
Can you explain the reason for this change? In general, all formats should be supported for any connector, as the format system is responsible for deserializing the bytes after they've been read from the source. We also want to avoid introducing special cases for particular connectors in the UI.
| @@ -0,0 +1,56 @@ | |||
| <?xml version="1.0" encoding="utf-8"?> | |||
There was a problem hiding this comment.
If possible, this should be converted to black and white to match with the other connector images. (If that's not too much trouble, I can just up a monochrome version).
| "username": { | ||
| "title": "Username", | ||
| "type": "string", | ||
| "description": "Username for authentication" |
There was a problem hiding this comment.
It's useful for username to also be a var-str, even though it's not generally sensitive, as that allows users to manage usernames and passwords together.
| ] | ||
| }, | ||
| "consumer_id": { | ||
| "type": "integer", |
There was a problem hiding this comment.
I think in general users shouldn't need to set a consumer name, since in arroyo we manage the horizontal scaling using low-level apis rather than high-level consumer group APIs.
It also looks like Iggy supports both numeric and textual consumers; I think many users would prefer to use names so perhaps it makes sense to support both types here by taking a string?
| "title": "Consumer ID", | ||
| "description": "The ID of the consumer to use for polling messages" | ||
| }, | ||
| "partition_id": { |
There was a problem hiding this comment.
I'm not sure we want to have the user choose a particular partition to read from; generally you want to read from all partitions. At the very least I think we need an option to read from all partitions.
| async fn init_client(&mut self) -> Result<()> { | ||
| info!("Creating Iggy client for {}", self.endpoint); | ||
|
|
||
| let client = IggyClient::default(); |
There was a problem hiding this comment.
We're not passing in the configured endpoint here
| })?; | ||
| table.insert(self.partition_id, state).await; | ||
|
|
||
| let client_temp = self.client.take(); |
| let mut additional_fields = HashMap::new(); | ||
| for field in &self.metadata_fields { | ||
| match field.key.as_str() { | ||
| "offset" => { |
There was a problem hiding this comment.
This can be made a bit more succinct by pulling the insert call out of each match statement (since it's always the same)
| .await | ||
| { | ||
| Ok(_) => { | ||
| info!("Sent {} messages to Iggy", messages_to_send.len()); |
There was a problem hiding this comment.
This should be downgraded to a debug or trace, otherwise will be very noisy
| } | ||
|
|
||
| if let Err(e) = self.send_messages(messages, ctx).await { | ||
| error!("Failed to send messages to Iggy: {:?}", e); |
There was a problem hiding this comment.
This is currently dropping messages on failure, which is generally the wrong default for a stream processing system like arroyo. Instead, we should retry some number of times and then fail the pipeline by panicking.
Adding connector for Apache Iggy.
We tried to keep the connector similar to apache kafka.
Currently supporting http transfer protocol and unstructed json data type.
Thanks to @Vinamra7 for helping contribute the source connector.
This refers the issue #818