to #103, Support PB format serializer in Kafka Source#185
to #103, Support PB format serializer in Kafka Source#185qidian99 wants to merge 1 commit intobytedance:masterfrom
Conversation
|
hi @qidian99 , thanks for your contribution, looks like there is a conflict. Would you rebase the latest master? |
2cb8936 to
40983a3
Compare
Done rebasing. @garyli1019 |
40983a3 to
bc60582
Compare
garyli1019
left a comment
There was a problem hiding this comment.
Thanks for your contribution! left some comments
There was a problem hiding this comment.
can we add the license header?
There was a problem hiding this comment.
This class is generated by protoc, which isn't dynamic. I removed it and made the test case directly read proto descriptor for job configuration.
There was a problem hiding this comment.
I think it's ok to put this class in our codebase, because if we generate this class from the test case, this class will be recognized by git and added to the git commit for those who run the test case. It's tricky to add this to .gitignore as well. Add the license header on an auto generated file is fine I believe.
There was a problem hiding this comment.
nvm, my concern didn't happen at all.
There was a problem hiding this comment.
can we use the CountKafkaDeserializationSchemaWrapper like json, this wrapper is useful to run our regression test for streaming mode.
There was a problem hiding this comment.
Done, PbKafkaDeserializationSchema is renamed to PbDeserializationSchema and extends Flink's deserialization schema
There was a problem hiding this comment.
with the wrapper, this class could be more general which not only work for kafka. Can we move this to bitsail-components/bitsail-component-formats/bitsail-component-format-pb
There was a problem hiding this comment.
Tried generalizing PbDeserializationSchema. Already created empty module named bitsail-component-format-pb. However, there are some problems:
- the wrapper takes in org.apache.flink.api.common.serialization.DeserializationSchema while deserialization schemas in bitsail-component-formats implements com.bytedance.bitsail.base.format.DeserializationSchema<I, O>
- v1 connectors are not using spi to get deserialization schemas, e.g., RocketMQDeserializationSchema

IMHO, It is better to create another issue for bitsail-component-formats implementations
There was a problem hiding this comment.
Got it, I agree with you here! Please feel free to create an issue if you're interested with the follow up.
There was a problem hiding this comment.
can we can UT for this, I believe all the supported types should be covered
There was a problem hiding this comment.
not sure why we need this SneakyThrows here, would you elaborate?
There was a problem hiding this comment.
Removed. Thanks for pointing out!
| @@ -0,0 +1,7 @@ | |||
| syntax = "proto3"; | |||
|
|
|||
| message ProtoTest { | |||
There was a problem hiding this comment.
I'd suggest we include all supported types here, including some complex type.
There was a problem hiding this comment.
This json sample should be runnable. It's ok to add the pb info from the integration test you write.
There was a problem hiding this comment.
Added base64 encoded descriptor and class name
|
|
||
| @Test | ||
| public void testKafkaSource() throws Exception { | ||
| public void testKafkaSourceJsonFormat() throws Exception { |
There was a problem hiding this comment.
I ran this ITCase and one test was failed. looks like the kafka producer didn't stop when we switch to the second test.
87e5e04 to
a46ffa4
Compare
a46ffa4 to
00fca9e
Compare
|
👏 Great work! |

Signed-off-by:
to #103, Support PB format serializer in Kafka Source