Skip to content

Commit 4114825

Browse files
authored
[bq] Provide a default mapper for BigQueryQueryItemReader
Signed-off-by: Volodymyr Perebykivskyi <vova235@gmail.com>
1 parent 22a62f8 commit 4114825

File tree

6 files changed

+231
-38
lines changed

6 files changed

+231
-38
lines changed

spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/builder/BigQueryQueryItemReaderBuilder.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class BigQueryQueryItemReaderBuilder<T> {
3939
private String query;
4040
private Converter<FieldValueList, T> rowMapper;
4141
private QueryJobConfiguration jobConfiguration;
42+
private Class<T> targetType;
4243

4344
/**
4445
* BigQuery service, responsible for API calls.
@@ -47,7 +48,7 @@ public class BigQueryQueryItemReaderBuilder<T> {
4748
* @return {@link BigQueryQueryItemReaderBuilder}
4849
* @see BigQueryQueryItemReader#setBigQuery(BigQuery)
4950
*/
50-
public BigQueryQueryItemReaderBuilder<T> bigQuery(BigQuery bigQuery) {
51+
public BigQueryQueryItemReaderBuilder<T> bigQuery(final BigQuery bigQuery) {
5152
this.bigQuery = bigQuery;
5253
return this;
5354
}
@@ -62,7 +63,7 @@ public BigQueryQueryItemReaderBuilder<T> bigQuery(BigQuery bigQuery) {
6263
* @return {@link BigQueryQueryItemReaderBuilder}
6364
* @see BigQueryQueryItemReader#setJobConfiguration(QueryJobConfiguration)
6465
*/
65-
public BigQueryQueryItemReaderBuilder<T> query(String query) {
66+
public BigQueryQueryItemReaderBuilder<T> query(final String query) {
6667
this.query = query;
6768
return this;
6869
}
@@ -74,7 +75,7 @@ public BigQueryQueryItemReaderBuilder<T> query(String query) {
7475
* @return {@link BigQueryQueryItemReaderBuilder}
7576
* @see BigQueryQueryItemReader#setRowMapper(Converter)
7677
*/
77-
public BigQueryQueryItemReaderBuilder<T> rowMapper(Converter<FieldValueList, T> rowMapper) {
78+
public BigQueryQueryItemReaderBuilder<T> rowMapper(final Converter<FieldValueList, T> rowMapper) {
7879
this.rowMapper = rowMapper;
7980
return this;
8081
}
@@ -86,21 +87,41 @@ public BigQueryQueryItemReaderBuilder<T> rowMapper(Converter<FieldValueList, T>
8687
* @return {@link BigQueryQueryItemReaderBuilder}
8788
* @see BigQueryQueryItemReader#setJobConfiguration(QueryJobConfiguration)
8889
*/
89-
public BigQueryQueryItemReaderBuilder<T> jobConfiguration(QueryJobConfiguration jobConfiguration) {
90+
public BigQueryQueryItemReaderBuilder<T> jobConfiguration(final QueryJobConfiguration jobConfiguration) {
9091
this.jobConfiguration = jobConfiguration;
9192
return this;
9293
}
9394

95+
/**
96+
* Specifies a target type which will be used as a result.
97+
* Only needed when {@link BigQueryQueryItemReaderBuilder#rowMapper} is not provided.
98+
* Take into account that only {@link Class#isRecord()} supported.
99+
*
100+
* @param targetType a {@link Class} that represent desired type
101+
* @return {@link BigQueryQueryItemReaderBuilder}
102+
*/
103+
public BigQueryQueryItemReaderBuilder<T> targetType(final Class<T> targetType) {
104+
this.targetType = targetType;
105+
return this;
106+
}
107+
94108
/**
95109
* Please remember about {@link BigQueryQueryItemReader#afterPropertiesSet()}.
96110
*
97111
* @return {@link BigQueryQueryItemReader}
98112
*/
99113
public BigQueryQueryItemReader<T> build() {
100-
BigQueryQueryItemReader<T> reader = new BigQueryQueryItemReader<>();
114+
final BigQueryQueryItemReader<T> reader = new BigQueryQueryItemReader<>();
101115

102116
reader.setBigQuery(this.bigQuery);
103-
reader.setRowMapper(this.rowMapper);
117+
118+
if (this.rowMapper == null) {
119+
Assert.notNull(this.targetType, "No target type provided");
120+
Assert.isTrue(this.targetType.isRecord(), "Only Java record supported");
121+
reader.setRowMapper(new RecordMapper<T>().generateMapper(this.targetType));
122+
} else {
123+
reader.setRowMapper(this.rowMapper);
124+
}
104125

105126
if (this.jobConfiguration == null) {
106127
Assert.isTrue(StringUtils.hasText(this.query), "No query provided");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2002-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.batch.extensions.bigquery.reader.builder;
18+
19+
import com.google.cloud.bigquery.FieldValueList;
20+
import org.springframework.beans.BeanUtils;
21+
import org.springframework.beans.SimpleTypeConverter;
22+
import org.springframework.core.convert.converter.Converter;
23+
import org.springframework.util.Assert;
24+
25+
import java.lang.reflect.Constructor;
26+
27+
/**
28+
* A helper class which tries to convert BigQuery response to a Java record.
29+
*
30+
* @param <T> Java record type
31+
* @author Volodymyr Perebykivskyi
32+
* @since 0.2.0
33+
*/
34+
public final class RecordMapper<T> {
35+
36+
private final SimpleTypeConverter simpleConverter = new SimpleTypeConverter();
37+
38+
/**
39+
* Generates a conversion from BigQuery response to a Java record.
40+
*
41+
* @param targetType a Java record {@link Class}
42+
* @return {@link Converter}
43+
* @see org.springframework.batch.item.file.mapping.RecordFieldSetMapper
44+
*/
45+
public Converter<FieldValueList, T> generateMapper(Class<T> targetType) {
46+
Constructor<T> constructor = BeanUtils.getResolvableConstructor(targetType);
47+
Assert.isTrue(constructor.getParameterCount() > 0, "Record without fields is redundant");
48+
49+
String[] parameterNames = BeanUtils.getParameterNames(constructor);
50+
Class<?>[] parameterTypes = constructor.getParameterTypes();
51+
52+
Object[] args = new Object[parameterNames.length];
53+
54+
return source -> {
55+
if (args[0] == null) {
56+
for (int i = 0; i < args.length; i++) {
57+
args[i] = simpleConverter.convertIfNecessary(source.get(parameterNames[i]).getValue(), parameterTypes[i]);
58+
}
59+
}
60+
61+
return BeanUtils.instantiateClass(constructor, args);
62+
};
63+
}
64+
}

spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryBaseItemWriter.java

Lines changed: 21 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ protected Table getTable() {
8282
*
8383
* @param datasetInfo BigQuery dataset info
8484
*/
85-
public void setDatasetInfo(DatasetInfo datasetInfo) {
85+
public void setDatasetInfo(final DatasetInfo datasetInfo) {
8686
this.datasetInfo = datasetInfo;
8787
}
8888

@@ -91,7 +91,7 @@ public void setDatasetInfo(DatasetInfo datasetInfo) {
9191
*
9292
* @param consumer your consumer
9393
*/
94-
public void setJobConsumer(Consumer<Job> consumer) {
94+
public void setJobConsumer(final Consumer<Job> consumer) {
9595
this.jobConsumer = consumer;
9696
}
9797

@@ -100,7 +100,7 @@ public void setJobConsumer(Consumer<Job> consumer) {
100100
*
101101
* @param writeChannelConfig BigQuery channel configuration
102102
*/
103-
public void setWriteChannelConfig(WriteChannelConfiguration writeChannelConfig) {
103+
public void setWriteChannelConfig(final WriteChannelConfiguration writeChannelConfig) {
104104
this.writeChannelConfig = writeChannelConfig;
105105
}
106106

@@ -109,30 +109,30 @@ public void setWriteChannelConfig(WriteChannelConfiguration writeChannelConfig)
109109
*
110110
* @param bigQuery BigQuery service
111111
*/
112-
public void setBigQuery(BigQuery bigQuery) {
112+
public void setBigQuery(final BigQuery bigQuery) {
113113
this.bigQuery = bigQuery;
114114
}
115115

116116
@Override
117-
public void write(Chunk<? extends T> chunk) throws Exception {
117+
public void write(final Chunk<? extends T> chunk) throws Exception {
118118
if (!chunk.isEmpty()) {
119-
List<? extends T> items = chunk.getItems();
119+
final List<? extends T> items = chunk.getItems();
120120
doInitializeProperties(items);
121121

122122
if (this.logger.isDebugEnabled()) {
123123
this.logger.debug(String.format("Mapping %d elements", items.size()));
124124
}
125125

126-
ByteBuffer byteBuffer = mapDataToBigQueryFormat(items);
126+
final ByteBuffer byteBuffer = mapDataToBigQueryFormat(items);
127127
doWriteDataToBigQuery(byteBuffer);
128128
}
129129
}
130130

131-
private ByteBuffer mapDataToBigQueryFormat(List<? extends T> items) throws IOException {
132-
ByteBuffer byteBuffer;
133-
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
131+
private ByteBuffer mapDataToBigQueryFormat(final List<? extends T> items) throws IOException {
132+
final ByteBuffer byteBuffer;
133+
try (final ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
134134

135-
List<byte[]> data = convertObjectsToByteArrays(items);
135+
final List<byte[]> data = convertObjectsToByteArrays(items);
136136

137137
for (byte[] byteArray : data) {
138138
outputStream.write(byteArray);
@@ -147,14 +147,14 @@ private ByteBuffer mapDataToBigQueryFormat(List<? extends T> items) throws IOExc
147147
return byteBuffer;
148148
}
149149

150-
private void doWriteDataToBigQuery(ByteBuffer byteBuffer) throws IOException {
150+
private void doWriteDataToBigQuery(final ByteBuffer byteBuffer) {
151151
if (this.logger.isDebugEnabled()) {
152152
this.logger.debug("Writing data to BigQuery");
153153
}
154154

155155
TableDataWriteChannel writeChannel = null;
156156

157-
try (TableDataWriteChannel writer = getWriteChannel()) {
157+
try (final TableDataWriteChannel writer = getWriteChannel()) {
158158
/* TableDataWriteChannel is not thread safe */
159159
writer.write(byteBuffer);
160160
writeChannel = writer;
@@ -209,29 +209,22 @@ public void afterPropertiesSet() {
209209

210210
performFormatSpecificChecks();
211211

212-
String dataset = this.writeChannelConfig.getDestinationTable().getDataset();
212+
final String dataset = this.writeChannelConfig.getDestinationTable().getDataset();
213213
if (this.datasetInfo == null) {
214214
this.datasetInfo = DatasetInfo.newBuilder(dataset).build();
215+
} else {
216+
Assert.isTrue(Objects.equals(this.datasetInfo.getDatasetId().getDataset(), dataset), "Dataset should be configured properly");
215217
}
216218

217-
Assert.isTrue(
218-
Objects.equals(this.datasetInfo.getDatasetId().getDataset(), dataset),
219-
"Dataset should be configured properly"
220-
);
221-
222219
createDataset();
223220
}
224221

225222
private void createDataset() {
226-
TableId tableId = this.writeChannelConfig.getDestinationTable();
227-
String datasetToCheck = tableId.getDataset();
228-
229-
if (datasetToCheck != null) {
230-
Dataset foundDataset = this.bigQuery.getDataset(datasetToCheck);
223+
final TableId tableId = this.writeChannelConfig.getDestinationTable();
224+
final String datasetToCheck = tableId.getDataset();
231225

232-
if (foundDataset == null && this.datasetInfo != null) {
233-
this.bigQuery.create(this.datasetInfo);
234-
}
226+
if (datasetToCheck != null && this.bigQuery.getDataset(datasetToCheck) == null && this.datasetInfo != null) {
227+
this.bigQuery.create(this.datasetInfo);
235228
}
236229
}
237230

@@ -270,7 +263,7 @@ private boolean isIceberg() {
270263
* @param table BigQuery table
271264
* @return {@code true} if BigQuery {@link Table} has schema already described
272265
*/
273-
protected boolean tableHasDefinedSchema(Table table) {
266+
protected boolean tableHasDefinedSchema(final Table table) {
274267
return Optional
275268
.ofNullable(table)
276269
.map(Table::getDefinition)

spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryItemWriterException.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222

2323
/**
2424
* Unchecked {@link Exception} indicating that an error has occurred on during {@link ItemWriter#write(Chunk)}.
25+
* @author Volodymyr Perebykivskyi
26+
* @since 0.2.0
2527
*/
2628
public class BigQueryItemWriterException extends ItemWriterException {
2729

spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/builder/BigQueryItemReaderBuilderTest.java

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import com.google.cloud.bigquery.TableId;
2323
import org.junit.jupiter.api.Assertions;
2424
import org.junit.jupiter.api.Test;
25+
import org.junit.jupiter.params.ParameterizedTest;
26+
import org.junit.jupiter.params.provider.Arguments;
27+
import org.junit.jupiter.params.provider.MethodSource;
2528
import org.springframework.batch.extensions.bigquery.common.PersonDto;
2629
import org.springframework.batch.extensions.bigquery.common.TestConstants;
2730
import org.springframework.batch.extensions.bigquery.reader.BigQueryQueryItemReader;
@@ -30,6 +33,7 @@
3033
import org.springframework.core.convert.converter.Converter;
3134

3235
import java.lang.invoke.MethodHandles;
36+
import java.util.stream.Stream;
3337

3438
class BigQueryItemReaderBuilderTest extends AbstractBigQueryTest {
3539

@@ -65,7 +69,41 @@ void testBuild_WithoutJobConfiguration() throws IllegalAccessException, NoSuchFi
6569
}
6670

6771
@Test
68-
void testBuild_WithJobConfiguration() throws IllegalAccessException, NoSuchFieldException {
72+
void testBuild_WithoutRowMapper() throws IllegalAccessException, NoSuchFieldException {
73+
BigQuery mockedBigQuery = prepareMockedBigQuery();
74+
MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryQueryItemReader.class, MethodHandles.lookup());
75+
76+
QueryJobConfiguration expectedJobConfiguration = QueryJobConfiguration
77+
.newBuilder("SELECT p.name, p.age FROM spring_batch_extensions.persons p LIMIT 1")
78+
.build();
79+
80+
BigQueryQueryItemReader<PersonDto> reader = new BigQueryQueryItemReaderBuilder<PersonDto>()
81+
.bigQuery(mockedBigQuery)
82+
.jobConfiguration(expectedJobConfiguration)
83+
.targetType(PersonDto.class)
84+
.build();
85+
86+
Assertions.assertNotNull(reader);
87+
88+
BigQuery actualBigQuery = (BigQuery) handle
89+
.findVarHandle(BigQueryQueryItemReader.class, "bigQuery", BigQuery.class)
90+
.get(reader);
91+
92+
Converter<FieldValueList, PersonDto> actualRowMapper = (Converter<FieldValueList, PersonDto>) handle
93+
.findVarHandle(BigQueryQueryItemReader.class, "rowMapper", Converter.class)
94+
.get(reader);
95+
96+
QueryJobConfiguration actualJobConfiguration = (QueryJobConfiguration) handle
97+
.findVarHandle(BigQueryQueryItemReader.class, "jobConfiguration", QueryJobConfiguration.class)
98+
.get(reader);
99+
100+
Assertions.assertEquals(mockedBigQuery, actualBigQuery);
101+
Assertions.assertNotNull(actualRowMapper);
102+
Assertions.assertEquals(expectedJobConfiguration, actualJobConfiguration);
103+
}
104+
105+
@Test
106+
void testBuild() throws IllegalAccessException, NoSuchFieldException {
69107
BigQuery mockedBigQuery = prepareMockedBigQuery();
70108
MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryQueryItemReader.class, MethodHandles.lookup());
71109

@@ -99,8 +137,19 @@ void testBuild_WithJobConfiguration() throws IllegalAccessException, NoSuchField
99137
Assertions.assertEquals(jobConfiguration, actualJobConfiguration);
100138
}
101139

102-
@Test
103-
void testBuild_NoQueryProvided() {
104-
Assertions.assertThrows(IllegalArgumentException.class, new BigQueryQueryItemReaderBuilder<>()::build);
140+
@ParameterizedTest
141+
@MethodSource("brokenBuilders")
142+
void testBuild_Exception(String expectedMessage, BigQueryQueryItemReaderBuilder<?> builder) {
143+
IllegalArgumentException ex = Assertions.assertThrows(IllegalArgumentException.class, builder::build);
144+
Assertions.assertEquals(expectedMessage, ex.getMessage());
145+
}
146+
147+
private static Stream<Arguments> brokenBuilders() {
148+
final class HumanDto {}
149+
return Stream.of(
150+
Arguments.of("No target type provided", new BigQueryQueryItemReaderBuilder<PersonDto>()),
151+
Arguments.of("Only Java record supported", new BigQueryQueryItemReaderBuilder<HumanDto>().targetType(HumanDto.class)),
152+
Arguments.of("No query provided", new BigQueryQueryItemReaderBuilder<PersonDto>().rowMapper(source -> null))
153+
);
105154
}
106155
}

0 commit comments

Comments
 (0)