Skip to content

Commit dbd9f53

Browse files
committed
Provide a default mapper for BigQueryQueryItemReader
Signed-off-by: Dgray16 <vova235@gmail.com>
1 parent 22a62f8 commit dbd9f53

File tree

5 files changed

+207
-10
lines changed

5 files changed

+207
-10
lines changed

spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/builder/BigQueryQueryItemReaderBuilder.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class BigQueryQueryItemReaderBuilder<T> {
3939
private String query;
4040
private Converter<FieldValueList, T> rowMapper;
4141
private QueryJobConfiguration jobConfiguration;
42+
private Class<T> targetType;
4243

4344
/**
4445
* BigQuery service, responsible for API calls.
@@ -47,7 +48,7 @@ public class BigQueryQueryItemReaderBuilder<T> {
4748
* @return {@link BigQueryQueryItemReaderBuilder}
4849
* @see BigQueryQueryItemReader#setBigQuery(BigQuery)
4950
*/
50-
public BigQueryQueryItemReaderBuilder<T> bigQuery(BigQuery bigQuery) {
51+
public BigQueryQueryItemReaderBuilder<T> bigQuery(final BigQuery bigQuery) {
5152
this.bigQuery = bigQuery;
5253
return this;
5354
}
@@ -62,7 +63,7 @@ public BigQueryQueryItemReaderBuilder<T> bigQuery(BigQuery bigQuery) {
6263
* @return {@link BigQueryQueryItemReaderBuilder}
6364
* @see BigQueryQueryItemReader#setJobConfiguration(QueryJobConfiguration)
6465
*/
65-
public BigQueryQueryItemReaderBuilder<T> query(String query) {
66+
public BigQueryQueryItemReaderBuilder<T> query(final String query) {
6667
this.query = query;
6768
return this;
6869
}
@@ -74,7 +75,7 @@ public BigQueryQueryItemReaderBuilder<T> query(String query) {
7475
* @return {@link BigQueryQueryItemReaderBuilder}
7576
* @see BigQueryQueryItemReader#setRowMapper(Converter)
7677
*/
77-
public BigQueryQueryItemReaderBuilder<T> rowMapper(Converter<FieldValueList, T> rowMapper) {
78+
public BigQueryQueryItemReaderBuilder<T> rowMapper(final Converter<FieldValueList, T> rowMapper) {
7879
this.rowMapper = rowMapper;
7980
return this;
8081
}
@@ -86,21 +87,40 @@ public BigQueryQueryItemReaderBuilder<T> rowMapper(Converter<FieldValueList, T>
8687
* @return {@link BigQueryQueryItemReaderBuilder}
8788
* @see BigQueryQueryItemReader#setJobConfiguration(QueryJobConfiguration)
8889
*/
89-
public BigQueryQueryItemReaderBuilder<T> jobConfiguration(QueryJobConfiguration jobConfiguration) {
90+
public BigQueryQueryItemReaderBuilder<T> jobConfiguration(final QueryJobConfiguration jobConfiguration) {
9091
this.jobConfiguration = jobConfiguration;
9192
return this;
9293
}
9394

95+
/**
96+
* Specifies a target type which will be used as a result.
97+
* Only needed when {@link BigQueryQueryItemReaderBuilder#rowMapper} is not provided.
98+
*
99+
* @param targetType a {@link Class} that represent desired type
100+
* @return {@link BigQueryQueryItemReaderBuilder}
101+
*/
102+
public BigQueryQueryItemReaderBuilder<T> targetType(final Class<T> targetType) {
103+
this.targetType = targetType;
104+
return this;
105+
}
106+
94107
/**
95108
* Please remember about {@link BigQueryQueryItemReader#afterPropertiesSet()}.
96109
*
97110
* @return {@link BigQueryQueryItemReader}
98111
*/
99112
public BigQueryQueryItemReader<T> build() {
100-
BigQueryQueryItemReader<T> reader = new BigQueryQueryItemReader<>();
113+
final BigQueryQueryItemReader<T> reader = new BigQueryQueryItemReader<>();
101114

102115
reader.setBigQuery(this.bigQuery);
103-
reader.setRowMapper(this.rowMapper);
116+
117+
if (this.rowMapper == null) {
118+
Assert.notNull(this.targetType, "No target type provided");
119+
Assert.isTrue(this.targetType.isRecord(), "Only Java record supported");
120+
reader.setRowMapper(new RecordMapper<T>().map(this.targetType));
121+
} else {
122+
reader.setRowMapper(this.rowMapper);
123+
}
104124

105125
if (this.jobConfiguration == null) {
106126
Assert.isTrue(StringUtils.hasText(this.query), "No query provided");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2002-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.batch.extensions.bigquery.reader.builder;
18+
19+
import com.google.cloud.bigquery.FieldValueList;
20+
import org.springframework.beans.BeanUtils;
21+
import org.springframework.beans.SimpleTypeConverter;
22+
import org.springframework.core.convert.converter.Converter;
23+
import org.springframework.util.Assert;
24+
25+
import java.lang.reflect.Constructor;
26+
27+
/**
28+
* A helper class which is tries to convert BigQuery response to a Java record.
29+
*
30+
* @param <T> Java record type
31+
* @author Volodymyr Perebykivskyi
32+
* @since 0.2.0
33+
*/
34+
public final class RecordMapper<T> {
35+
36+
private final SimpleTypeConverter simpleConverter = new SimpleTypeConverter();
37+
38+
/**
39+
* Generates a conversion from BigQuery response to a Java record.
40+
*
41+
* @param targetType a Java record {@link Class}
42+
* @return {@link Converter}
43+
* @see org.springframework.batch.item.file.mapping.RecordFieldSetMapper
44+
*/
45+
public Converter<FieldValueList, T> map(Class<T> targetType) {
46+
Constructor<T> constructor = BeanUtils.getResolvableConstructor(targetType);
47+
Assert.isTrue(constructor.getParameterCount() > 0, "Record without fields is redundant");
48+
49+
String[] parameterNames = BeanUtils.getParameterNames(constructor);
50+
Class<?>[] parameterTypes = constructor.getParameterTypes();
51+
52+
return source -> {
53+
Object[] args = new Object[parameterNames.length];
54+
55+
for (int i = 0; i < args.length; i++) {
56+
args[i] = simpleConverter.convertIfNecessary(source.get(parameterNames[i]).getValue(), parameterTypes[i]);
57+
}
58+
59+
return BeanUtils.instantiateClass(constructor, args);
60+
};
61+
}
62+
}

spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryItemWriterException.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222

2323
/**
2424
* Unchecked {@link Exception} indicating that an error has occurred on during {@link ItemWriter#write(Chunk)}.
25+
* @author Volodymyr Perebykivskyi
26+
* @since 0.2.0
2527
*/
2628
public class BigQueryItemWriterException extends ItemWriterException {
2729

spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/builder/BigQueryItemReaderBuilderTest.java

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import com.google.cloud.bigquery.TableId;
2323
import org.junit.jupiter.api.Assertions;
2424
import org.junit.jupiter.api.Test;
25+
import org.junit.jupiter.params.ParameterizedTest;
26+
import org.junit.jupiter.params.provider.Arguments;
27+
import org.junit.jupiter.params.provider.MethodSource;
2528
import org.springframework.batch.extensions.bigquery.common.PersonDto;
2629
import org.springframework.batch.extensions.bigquery.common.TestConstants;
2730
import org.springframework.batch.extensions.bigquery.reader.BigQueryQueryItemReader;
@@ -30,6 +33,7 @@
3033
import org.springframework.core.convert.converter.Converter;
3134

3235
import java.lang.invoke.MethodHandles;
36+
import java.util.stream.Stream;
3337

3438
class BigQueryItemReaderBuilderTest extends AbstractBigQueryTest {
3539

@@ -65,7 +69,41 @@ void testBuild_WithoutJobConfiguration() throws IllegalAccessException, NoSuchFi
6569
}
6670

6771
@Test
68-
void testBuild_WithJobConfiguration() throws IllegalAccessException, NoSuchFieldException {
72+
void testBuild_WithoutRowMapper() throws IllegalAccessException, NoSuchFieldException {
73+
BigQuery mockedBigQuery = prepareMockedBigQuery();
74+
MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryQueryItemReader.class, MethodHandles.lookup());
75+
76+
QueryJobConfiguration expectedJobConfiguration = QueryJobConfiguration
77+
.newBuilder("SELECT p.name, p.age FROM spring_batch_extensions.persons p LIMIT 1")
78+
.build();
79+
80+
BigQueryQueryItemReader<PersonDto> reader = new BigQueryQueryItemReaderBuilder<PersonDto>()
81+
.bigQuery(mockedBigQuery)
82+
.jobConfiguration(expectedJobConfiguration)
83+
.targetType(PersonDto.class)
84+
.build();
85+
86+
Assertions.assertNotNull(reader);
87+
88+
BigQuery actualBigQuery = (BigQuery) handle
89+
.findVarHandle(BigQueryQueryItemReader.class, "bigQuery", BigQuery.class)
90+
.get(reader);
91+
92+
Converter<FieldValueList, PersonDto> actualRowMapper = (Converter<FieldValueList, PersonDto>) handle
93+
.findVarHandle(BigQueryQueryItemReader.class, "rowMapper", Converter.class)
94+
.get(reader);
95+
96+
QueryJobConfiguration actualJobConfiguration = (QueryJobConfiguration) handle
97+
.findVarHandle(BigQueryQueryItemReader.class, "jobConfiguration", QueryJobConfiguration.class)
98+
.get(reader);
99+
100+
Assertions.assertEquals(mockedBigQuery, actualBigQuery);
101+
Assertions.assertNotNull(actualRowMapper);
102+
Assertions.assertEquals(expectedJobConfiguration, actualJobConfiguration);
103+
}
104+
105+
@Test
106+
void testBuild() throws IllegalAccessException, NoSuchFieldException {
69107
BigQuery mockedBigQuery = prepareMockedBigQuery();
70108
MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryQueryItemReader.class, MethodHandles.lookup());
71109

@@ -99,8 +137,19 @@ void testBuild_WithJobConfiguration() throws IllegalAccessException, NoSuchField
99137
Assertions.assertEquals(jobConfiguration, actualJobConfiguration);
100138
}
101139

102-
@Test
103-
void testBuild_NoQueryProvided() {
104-
Assertions.assertThrows(IllegalArgumentException.class, new BigQueryQueryItemReaderBuilder<>()::build);
140+
@ParameterizedTest
141+
@MethodSource("brokenBuilders")
142+
void testBuild_Exception(String expectedMessage, BigQueryQueryItemReaderBuilder<?> builder) {
143+
IllegalArgumentException ex = Assertions.assertThrows(IllegalArgumentException.class, builder::build);
144+
Assertions.assertEquals(expectedMessage, ex.getMessage());
145+
}
146+
147+
private static Stream<Arguments> brokenBuilders() {
148+
final class HumanDto {}
149+
return Stream.of(
150+
Arguments.of("No target type provided", new BigQueryQueryItemReaderBuilder<PersonDto>()),
151+
Arguments.of("Only Java record supported", new BigQueryQueryItemReaderBuilder<HumanDto>().targetType(HumanDto.class)),
152+
Arguments.of("No query provided", new BigQueryQueryItemReaderBuilder<PersonDto>().rowMapper(source -> null))
153+
);
105154
}
106155
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2002-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.batch.extensions.bigquery.unit.reader.builder;
18+
19+
import com.google.cloud.bigquery.Field;
20+
import com.google.cloud.bigquery.FieldValue;
21+
import com.google.cloud.bigquery.FieldValueList;
22+
import com.google.cloud.bigquery.StandardSQLTypeName;
23+
import org.junit.jupiter.api.Assertions;
24+
import org.junit.jupiter.api.Test;
25+
import org.springframework.batch.extensions.bigquery.common.PersonDto;
26+
import org.springframework.batch.extensions.bigquery.common.TestConstants;
27+
import org.springframework.batch.extensions.bigquery.reader.builder.RecordMapper;
28+
import org.springframework.core.convert.converter.Converter;
29+
30+
import java.util.List;
31+
32+
class RecordMapperTest {
33+
34+
@Test
35+
void testMap() {
36+
RecordMapper<PersonDto> mapper = new RecordMapper<>();
37+
List<PersonDto> expected = TestConstants.CHUNK.getItems();
38+
39+
Field name = Field.of(TestConstants.NAME, StandardSQLTypeName.STRING);
40+
Field age = Field.of(TestConstants.AGE, StandardSQLTypeName.INT64);
41+
42+
PersonDto person1 = expected.get(0);
43+
FieldValue value10 = FieldValue.of(FieldValue.Attribute.PRIMITIVE, person1.name());
44+
FieldValue value11 = FieldValue.of(FieldValue.Attribute.PRIMITIVE, person1.age());
45+
46+
FieldValueList row = FieldValueList.of(List.of(value10, value11), name, age);
47+
48+
Converter<FieldValueList, PersonDto> converter = mapper.map(PersonDto.class);
49+
Assertions.assertNotNull(converter);
50+
51+
PersonDto actual = converter.convert(row);
52+
53+
Assertions.assertEquals(expected.get(0).name(), actual.name());
54+
Assertions.assertEquals(expected.get(0).age(), actual.age());
55+
}
56+
57+
@Test
58+
void testMap_EmptyRecord() {
59+
record TestRecord(){}
60+
IllegalArgumentException ex = Assertions.assertThrows(IllegalArgumentException.class, () -> new RecordMapper<TestRecord>().map(TestRecord.class));
61+
Assertions.assertEquals("Record without fields is redundant", ex.getMessage());
62+
}
63+
64+
}

0 commit comments

Comments
 (0)