Skip to content

Commit 7af5ed9

Browse files
committed
Make AvroDeserializationSchema deser failure tolerant
1 parent 75cf610 commit 7af5ed9

File tree

2 files changed

+17
-1
lines changed

2 files changed

+17
-1
lines changed

flink-sql-runner/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,13 @@
233233
</excludes>
234234
</artifactSet>
235235
<filters>
236+
<!-- Exclude original AvroDeserializationSchema, because for now we overwrite that class downstream. -->
237+
<filter>
238+
<artifact>org.apache.flink:flink-avro</artifact>
239+
<excludes>
240+
<exclude>org/apache/flink/formats/avro/AvroDeserializationSchema**</exclude>
241+
</excludes>
242+
</filter>
236243
<filter>
237244
<artifact>*:*</artifact>
238245
<excludes>

flink-sql-runner/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,16 @@ public T deserialize(@Nullable byte[] message) throws IOException {
173173
((JsonDecoder) this.decoder).configure(inputStream);
174174
}
175175

176-
return datumReader.read(null, decoder);
176+
try {
177+
return datumReader.read(null, decoder);
178+
} catch (Exception e) {
179+
/*
180+
* If the deserialization fails, {@code datumReader} has to be reset, because {@code Decoder} will remain in an
181+
* inconsistent state, which will make subsequent valid deserialization attempts to fail.
182+
*/
183+
this.datumReader = null;
184+
throw e;
185+
}
177186
}
178187

179188
void checkAvroInitialized() throws IOException {

0 commit comments

Comments
 (0)