Skip to content

Commit 75cf610

Browse files
committed
Copy AvroDeserializationSchema from upstream as is
1 parent e25d212 commit 75cf610

File tree

2 files changed

+257
-0
lines changed

2 files changed

+257
-0
lines changed
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
/*
2+
* Copyright © 2024 DataSQRL (contact@datasqrl.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.apache.flink.formats.avro;
17+
18+
import java.io.IOException;
19+
import java.util.Objects;
20+
import javax.annotation.Nullable;
21+
import org.apache.avro.Schema;
22+
import org.apache.avro.generic.GenericData;
23+
import org.apache.avro.generic.GenericDatumReader;
24+
import org.apache.avro.generic.GenericRecord;
25+
import org.apache.avro.io.Decoder;
26+
import org.apache.avro.io.DecoderFactory;
27+
import org.apache.avro.io.JsonDecoder;
28+
import org.apache.avro.specific.SpecificData;
29+
import org.apache.avro.specific.SpecificDatumReader;
30+
import org.apache.avro.specific.SpecificRecord;
31+
import org.apache.flink.api.common.serialization.DeserializationSchema;
32+
import org.apache.flink.api.common.typeinfo.TypeInformation;
33+
import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;
34+
import org.apache.flink.formats.avro.typeutils.AvroFactory;
35+
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
36+
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
37+
import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
38+
import org.apache.flink.util.Preconditions;
39+
40+
/**
41+
* Deserialization schema that deserializes from Avro binary format.
42+
*
43+
* @param <T> type of record it produces
44+
*/
45+
public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

  /**
   * Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided
   * schema.
   *
   * @param schema schema of produced records
   * @return deserialized record in form of {@link GenericRecord}
   */
  public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
    // Binary encoding is the default; use the two-arg overload for JSON-encoded Avro.
    return forGeneric(schema, AvroEncoding.BINARY);
  }

  /**
   * Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided
   * schema.
   *
   * @param schema schema of produced records
   * @param encoding Avro serialization approach to use for decoding
   * @return deserialized record in form of {@link GenericRecord}
   */
  public static AvroDeserializationSchema<GenericRecord> forGeneric(
      Schema schema, AvroEncoding encoding) {
    return new AvroDeserializationSchema<>(GenericRecord.class, schema, encoding);
  }

  /**
   * Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro
   * schema.
   *
   * @param tClass class of record to be produced
   * @return deserialized record
   */
  public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(
      Class<T> tClass) {
    // Binary encoding is the default; use the two-arg overload for JSON-encoded Avro.
    return forSpecific(tClass, AvroEncoding.BINARY);
  }

  /**
   * Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro
   * schema.
   *
   * @param tClass class of record to be produced
   * @param encoding Avro serialization approach to use for decoding
   * @return deserialized record
   */
  public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(
      Class<T> tClass, AvroEncoding encoding) {
    // Schema is null here: for SpecificRecord classes it is extracted lazily from the
    // generated class in checkAvroInitialized().
    return new AvroDeserializationSchema<>(tClass, null, encoding);
  }

  private static final long serialVersionUID = -6766681879020862312L;

  /** Class to deserialize to. */
  private final Class<T> recordClazz;

  /** Schema in case of GenericRecord for serialization purpose. */
  private final String schemaString;

  /** Reader that deserializes byte array into a record. */
  private transient GenericDatumReader<T> datumReader;

  /** Input stream to read message from. */
  private transient MutableByteArrayInputStream inputStream;

  /** Config option for the deserialization approach to use. */
  private final AvroEncoding encoding;

  /** Avro decoder that decodes data. */
  private transient Decoder decoder;

  /** Avro schema for the reader. */
  private transient Schema reader;

  /**
   * Creates a Avro deserialization schema.
   *
   * @param recordClazz class to which deserialize. Should be one of: {@link
   *     org.apache.avro.specific.SpecificRecord}, {@link org.apache.avro.generic.GenericRecord}.
   * @param reader reader's Avro schema. Should be provided if recordClazz is {@link GenericRecord}
   * @param encoding encoding approach to use. Identifies the Avro decoder class to use.
   */
  AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader, AvroEncoding encoding) {
    Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
    this.recordClazz = recordClazz;
    this.reader = reader;
    // Schema is not Serializable, so keep its String form for the Flink serialization round-trip;
    // it is re-parsed in checkAvroInitialized() after deserialization.
    if (reader != null) {
      this.schemaString = reader.toString();
    } else {
      this.schemaString = null;
    }
    this.encoding = encoding;
  }

  /** Returns the datum reader; {@code null} until {@link #checkAvroInitialized()} has run. */
  GenericDatumReader<T> getDatumReader() {
    return datumReader;
  }

  /** Returns the reader schema; for specific records it is populated lazily on initialization. */
  Schema getReaderSchema() {
    return reader;
  }

  /** Returns the reusable input stream; {@code null} until {@link #checkAvroInitialized()} has run. */
  MutableByteArrayInputStream getInputStream() {
    return inputStream;
  }

  /** Returns the Avro decoder; {@code null} until {@link #checkAvroInitialized()} has run. */
  Decoder getDecoder() {
    return decoder;
  }

  /** Returns the configured Avro encoding (binary or JSON). */
  AvroEncoding getEncoding() {
    return encoding;
  }

  /**
   * Deserializes a single Avro-encoded message.
   *
   * @param message the serialized bytes; a {@code null} message yields a {@code null} record
   * @return the deserialized record, or {@code null} if {@code message} was {@code null}
   * @throws IOException if decoding fails
   */
  @Override
  public T deserialize(@Nullable byte[] message) throws IOException {
    if (message == null) {
      return null;
    }
    // read record
    checkAvroInitialized();
    inputStream.setBuffer(message);
    Schema readerSchema = getReaderSchema();
    GenericDatumReader<T> datumReader = getDatumReader();

    datumReader.setSchema(readerSchema);

    if (encoding == AvroEncoding.JSON) {
      // JsonDecoder must be re-pointed at the stream for every message; the binary decoder
      // reads from the mutable input stream directly and needs no reconfiguration.
      ((JsonDecoder) this.decoder).configure(inputStream);
    }

    return datumReader.read(null, decoder);
  }

  /**
   * Lazily builds the transient decoding state (datum reader, reader schema, input stream,
   * decoder). Called on the first {@link #deserialize(byte[])} after construction or after this
   * schema instance itself has been deserialized; a non-null {@link #datumReader} marks the state
   * as already initialized.
   *
   * @throws IOException if the JSON decoder cannot be created
   */
  void checkAvroInitialized() throws IOException {
    if (datumReader != null) {
      return;
    }

    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
      // Specific records carry their own schema: extract it from the generated class.
      @SuppressWarnings("unchecked")
      SpecificData specificData =
          AvroFactory.getSpecificDataForClass((Class<? extends SpecificData>) recordClazz, cl);
      this.datumReader = new SpecificDatumReader<>(specificData);
      this.reader = AvroFactory.extractAvroSpecificSchema(recordClazz, specificData);
    } else {
      // Generic records: re-parse the schema from its serialized String form.
      this.reader = new Schema.Parser().parse(schemaString);
      GenericData genericData = new GenericData(cl);
      this.datumReader = new GenericDatumReader<>(null, this.reader, genericData);
    }

    this.inputStream = new MutableByteArrayInputStream();

    if (encoding == AvroEncoding.JSON) {
      this.decoder = DecoderFactory.get().jsonDecoder(getReaderSchema(), inputStream);
    } else {
      this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
    }
  }

  /** Always {@code false}: Avro streams deserialized here have no end-of-stream marker element. */
  @Override
  public boolean isEndOfStream(T nextElement) {
    return false;
  }

  /**
   * Produces the Flink type information matching the configured record class: {@link AvroTypeInfo}
   * for specific records, {@link GenericRecordAvroTypeInfo} otherwise.
   */
  @Override
  @SuppressWarnings({"unchecked", "rawtypes"})
  public TypeInformation<T> getProducedType() {
    if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
      return new AvroTypeInfo(recordClazz);
    } else {
      // NOTE(review): for the generic case this reads the transient `reader` field, which is
      // only populated from the constructor or checkAvroInitialized() — presumably callers
      // invoke this before the schema instance is serialized/deserialized; confirm upstream.
      return (TypeInformation<T>) new GenericRecordAvroTypeInfo(this.reader);
    }
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    // NOTE(review): `encoding` is deliberately(?) not part of equality, so a BINARY and a JSON
    // schema for the same record class/schema compare equal — this mirrors the upstream copy;
    // confirm this is intended before changing it.
    AvroDeserializationSchema<?> that = (AvroDeserializationSchema<?>) o;
    return recordClazz.equals(that.recordClazz) && Objects.equals(reader, that.reader);
  }

  @Override
  public int hashCode() {
    // Kept consistent with equals(): same fields, same omission of `encoding`.
    return Objects.hash(recordClazz, reader);
  }
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright © 2024 DataSQRL (contact@datasqrl.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
/**
17+
* We need to overwrite classes in this package downstream to apply some custom logic for the Kafka
18+
* deserialization failure handler to work properly.
19+
*/
20+
package org.apache.flink.formats.avro;

0 commit comments

Comments
 (0)