Skip to content

Commit 3e1b7e9

Browse files
committed
add unit tests for the changed part of AvroDeserializationSchema
1 parent ed5d976 commit 3e1b7e9

File tree

2 files changed

+81
-0
lines changed

2 files changed

+81
-0
lines changed

flink-sql-runner/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,14 @@
207207
<classifier>runtime</classifier>
208208
<scope>test</scope>
209209
</dependency>
210+
211+
<dependency>
212+
<groupId>org.apache.flink</groupId>
213+
<artifactId>flink-avro</artifactId>
214+
<version>${flink.version}</version>
215+
<type>test-jar</type>
216+
<scope>test</scope>
217+
</dependency>
210218
</dependencies>
211219

212220
<build>
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright © 2024 DataSQRL (contact@datasqrl.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.apache.flink.formats.avro;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.fail;
20+
21+
import java.io.IOException;
22+
import java.util.Random;
23+
import org.apache.avro.generic.GenericRecord;
24+
import org.apache.flink.formats.avro.generated.Address;
25+
import org.apache.flink.formats.avro.utils.AvroTestUtils;
26+
import org.apache.flink.formats.avro.utils.TestDataGenerator;
27+
import org.junit.jupiter.api.Test;
28+
29+
class AvroDeserializationSchemaTest {
30+
31+
private static final Address ADDRESS = TestDataGenerator.generateRandomAddress(new Random());
32+
private static final byte[] CORRUPT_BYTES = {0x0, 0x1, 0x2, 0x3, 0x4};
33+
34+
@Test
35+
void testGenericRecordFailureRecovery() throws IOException {
36+
AvroDeserializationSchema<GenericRecord> deserSchema =
37+
AvroDeserializationSchema.forGeneric(ADDRESS.getSchema());
38+
39+
byte[] msgBytes = AvroTestUtils.writeRecord(ADDRESS, Address.getClassSchema());
40+
deserSchema.deserialize(msgBytes);
41+
42+
try {
43+
deserSchema.deserialize(CORRUPT_BYTES);
44+
fail("Should fail when deserializing corrupt bytes.");
45+
} catch (Exception ignored) {
46+
}
47+
48+
GenericRecord result = deserSchema.deserialize(msgBytes);
49+
assertThat(result).isNotNull();
50+
assertThat(result.get("num")).isEqualTo(ADDRESS.getNum());
51+
assertThat(result.get("state").toString()).isEqualTo(ADDRESS.getState());
52+
assertThat(result.get("city").toString()).isEqualTo(ADDRESS.getCity());
53+
assertThat(result.get("street").toString()).isEqualTo(ADDRESS.getStreet());
54+
}
55+
56+
@Test
57+
void testSpecificRecordFailureRecovery() throws IOException {
58+
AvroDeserializationSchema<Address> deserSchema =
59+
AvroDeserializationSchema.forSpecific(Address.class);
60+
61+
byte[] msgBytes = AvroTestUtils.writeRecord(ADDRESS, Address.getClassSchema());
62+
deserSchema.deserialize(msgBytes);
63+
64+
try {
65+
deserSchema.deserialize(CORRUPT_BYTES);
66+
fail("Should fail when deserializing corrupt bytes.");
67+
} catch (Exception ignored) {
68+
}
69+
70+
Address result = deserSchema.deserialize(msgBytes);
71+
assertThat(result).isEqualTo(ADDRESS);
72+
}
73+
}

0 commit comments

Comments
 (0)