
Commit 13aebab

[bq] introduce BigQuery Storage Write API (JSON)
Signed-off-by: Dgray16 <vova235@gmail.com>
1 parent 053859b

File tree: 39 files changed, +1097 -226 lines

spring-batch-bigquery/README.adoc

Lines changed: 21 additions & 6 deletions
@@ -2,26 +2,41 @@
 
 Spring Batch extension which contains an `ItemWriter` and `ItemReader` implementations for https://cloud.google.com/bigquery[BigQuery].
 
-`ItemWriter` supports next formats (https://cloud.google.com/bigquery/docs/batch-loading-data[load jobs]):
+`ItemWriter` supports:
+
+[cols="h,1,1"]
+|===
+| |https://cloud.google.com/bigquery/docs/batch-loading-data[Load job] |https://cloud.google.com/bigquery/docs/write-api#committed_type[Write API (Committed)]
+
+|link:++https://en.wikipedia.org/wiki/JSON[JSON] |Supported |Supported
+|link:++https://en.wikipedia.org/wiki/Comma-separated_values[CSV] |Supported |
+|===
+
+`ItemReader` supports:
+
+[cols="h,1"]
+|===
+
+|link:++https://en.wikipedia.org/wiki/JSON[JSON] |Supported
+|link:++https://en.wikipedia.org/wiki/Comma-separated_values[CSV] |Supported
+|===
 
-* https://en.wikipedia.org/wiki/Comma-separated_values[CSV]
-* https://en.wikipedia.org/wiki/JSON[JSON]
 
 Based on https://github.com/googleapis/java-bigquery[Java BigQuery].
 
-== Example of `BigQueryCsvItemWriter`
+== Example of `BigQueryLoadJobCsvItemWriter`
 
 [source,java]
 ----
 @Bean
-BigQueryCsvItemWriter<MyDto> bigQueryCsvWriter() {
+BigQueryLoadJobCsvItemWriter<MyDto> bigQueryCsvWriter() {
     WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration
             .newBuilder(TableId.of("csv_dataset", "csv_table"))
             .setAutodetect(true)
             .setFormatOptions(FormatOptions.csv())
             .build();
 
-    return new BigQueryCsvItemWriterBuilder<MyDto>()
+    return new BigQueryLoadJobCsvItemWriterBuilder<MyDto>()
             .bigQuery(bigQueryService)
             .writeChannelConfig(writeConfiguration)
             .build();
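
For comparison, a load-job JSON configuration analogous to the CSV example above might look like the following. This is a minimal sketch: the `BigQueryLoadJobJsonItemWriterBuilder` name and its `marshaller(...)` setter are assumptions mirroring the CSV builder shown in this diff, not code taken from the commit.

[source,java]
----
@Bean
BigQueryLoadJobJsonItemWriter<MyDto> bigQueryJsonWriter() {
    WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration
            .newBuilder(TableId.of("json_dataset", "json_table"))
            .setAutodetect(true)
            .setFormatOptions(FormatOptions.json())
            .build();

    // Builder name and marshaller(...) setter are assumptions mirroring the CSV builder.
    return new BigQueryLoadJobJsonItemWriterBuilder<MyDto>()
            .bigQuery(bigQueryService)
            .marshaller(new JacksonJsonObjectMarshaller<>())
            .writeChannelConfig(writeConfiguration)
            .build();
}
----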

spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/builder/BigQueryQueryItemReaderBuilder.java

Lines changed: 2 additions & 1 deletion
@@ -17,6 +17,7 @@
 package org.springframework.batch.extensions.bigquery.reader.builder;
 
 import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
 import com.google.cloud.bigquery.FieldValueList;
 import com.google.cloud.bigquery.QueryJobConfiguration;
 import org.springframework.batch.extensions.bigquery.reader.BigQueryQueryItemReader;
@@ -113,7 +114,7 @@ public BigQueryQueryItemReaderBuilder<T> targetType(final Class<T> targetType) {
 	public BigQueryQueryItemReader<T> build() {
 		final BigQueryQueryItemReader<T> reader = new BigQueryQueryItemReader<>();
 
-		reader.setBigQuery(this.bigQuery);
+		reader.setBigQuery(this.bigQuery == null ? BigQueryOptions.getDefaultInstance().getService() : this.bigQuery);
 
 		if (this.rowMapper == null) {
 			Assert.notNull(this.targetType, "No target type provided");
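
The practical effect: `build()` now falls back to the application-default BigQuery client when none is supplied, so an explicit `bigQuery(...)` call becomes optional. A usage sketch under that assumption — `targetType(...)` is visible in the hunk header above, while the `jobConfiguration(...)` setter name is inferred from the imported `QueryJobConfiguration` and is not confirmed by this diff:

[source,java]
----
// No .bigQuery(...) call: build() falls back to
// BigQueryOptions.getDefaultInstance().getService() (application-default credentials).
BigQueryQueryItemReader<MyDto> reader = new BigQueryQueryItemReaderBuilder<MyDto>()
        .jobConfiguration(QueryJobConfiguration.of("SELECT name, age FROM my_dataset.my_table"))
        .targetType(MyDto.class)
        .build();
----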
spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryBaseItemWriter.java renamed to spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/BigQueryLoadJobBaseItemWriter.java

Lines changed: 11 additions & 10 deletions
@@ -14,11 +14,12 @@
  * limitations under the License.
  */
 
-package org.springframework.batch.extensions.bigquery.writer;
+package org.springframework.batch.extensions.bigquery.writer.loadjob;
 
 import com.google.cloud.bigquery.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.springframework.batch.extensions.bigquery.writer.BigQueryItemWriterException;
 import org.springframework.batch.item.Chunk;
 import org.springframework.batch.item.ItemWriter;
 import org.springframework.beans.factory.InitializingBean;
@@ -34,13 +35,13 @@
 import java.util.function.Consumer;
 
 /**
- * Base class that holds shared code for JSON and CSV writers.
+ * Base class that holds shared code for load job JSON and CSV writers.
  *
  * @param <T> your DTO type
  * @author Volodymyr Perebykivskyi
  * @since 0.1.0
  */
-public abstract class BigQueryBaseItemWriter<T> implements ItemWriter<T>, InitializingBean {
+public abstract class BigQueryLoadJobBaseItemWriter<T> implements ItemWriter<T>, InitializingBean {
 
     /** Logger that can be reused */
     protected final Log logger = LogFactory.getLog(getClass());
@@ -71,7 +72,7 @@ public abstract class BigQueryBaseItemWriter<T> implements ItemWriter<T>, Initia
     /**
     * Fetches table from the provided configuration.
     *
-    * @return {@link Table} that is described in {@link BigQueryBaseItemWriter#writeChannelConfig}
+    * @return {@link Table} that is described in {@link BigQueryLoadJobBaseItemWriter#writeChannelConfig}
     */
    protected Table getTable() {
        return this.bigQuery.getTable(this.writeChannelConfig.getDestinationTable());
@@ -119,8 +120,8 @@ public void write(final Chunk<? extends T> chunk) throws Exception {
        final List<? extends T> items = chunk.getItems();
        doInitializeProperties(items);
 
-        if (this.logger.isDebugEnabled()) {
-            this.logger.debug(String.format("Mapping %d elements", items.size()));
+        if (logger.isDebugEnabled()) {
+            logger.debug(String.format("Mapping %d elements", items.size()));
        }
 
        doWriteDataToBigQuery(mapDataToBigQueryFormat(items));
@@ -147,8 +148,8 @@ private ByteBuffer mapDataToBigQueryFormat(final List<? extends T> items) throws
    }
 
    private void doWriteDataToBigQuery(final ByteBuffer byteBuffer) {
-        if (this.logger.isDebugEnabled()) {
-            this.logger.debug("Writing data to BigQuery");
+        if (logger.isDebugEnabled()) {
+            logger.debug("Writing data to BigQuery");
        }
 
        TableDataWriteChannel writeChannel = null;
@@ -174,8 +175,8 @@ private void doWriteDataToBigQuery(final ByteBuffer byteBuffer) {
            }
        }
 
-        if (this.logger.isDebugEnabled()) {
-            this.logger.debug(logMessage);
+        if (logger.isDebugEnabled()) {
+            logger.debug(logMessage);
        }
    }
}
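
For orientation, the load-job flow that `doWriteDataToBigQuery` drives reduces to a few documented `com.google.cloud.bigquery` calls. A sketch of that underlying path — `bigQuery`, `writeChannelConfig`, and `byteBuffer` stand in for the fields and argument seen above; this is not the class's literal code:

[source,java]
----
// Open a load-job channel against the configured destination table.
TableDataWriteChannel writeChannel = bigQuery.writer(writeChannelConfig);
try (TableDataWriteChannel channel = writeChannel) {
    channel.write(byteBuffer); // bytes produced by mapDataToBigQueryFormat(items)
}
// getJob() is available once the channel is closed; waitFor() blocks until the load job finishes.
Job job = writeChannel.getJob().waitFor();
----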

spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryCsvItemWriter.java renamed to spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/csv/BigQueryLoadJobCsvItemWriter.java

Lines changed: 3 additions & 2 deletions
@@ -14,13 +14,14 @@
  * limitations under the License.
  */
 
-package org.springframework.batch.extensions.bigquery.writer;
+package org.springframework.batch.extensions.bigquery.writer.loadjob.csv;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import com.fasterxml.jackson.dataformat.csv.CsvMapper;
 import com.google.cloud.bigquery.FormatOptions;
 import com.google.cloud.bigquery.Table;
+import org.springframework.batch.extensions.bigquery.writer.loadjob.BigQueryLoadJobBaseItemWriter;
 import org.springframework.core.convert.converter.Converter;
 import org.springframework.util.Assert;
 import org.springframework.util.ObjectUtils;
@@ -37,7 +38,7 @@
  * @since 0.2.0
  * @see <a href="https://en.wikipedia.org/wiki/Comma-separated_values">CSV</a>
  */
-public class BigQueryCsvItemWriter<T> extends BigQueryBaseItemWriter<T> {
+public class BigQueryLoadJobCsvItemWriter<T> extends BigQueryLoadJobBaseItemWriter<T> {
 
    private Converter<T, byte[]> rowMapper;
    private ObjectWriter objectWriter;
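
The CSV conversion builds on the Jackson classes imported above: `CsvMapper` derives a column schema from the DTO and `ObjectWriter` serializes each item. In outline (a sketch only; `MyDto` and `item` are illustrative):

[source,java]
----
// jackson-dataformat-csv: derive a CSV schema from the DTO type, then write one row per item.
CsvMapper csvMapper = new CsvMapper();
ObjectWriter objectWriter = csvMapper.writerWithSchemaFor(MyDto.class);
byte[] row = objectWriter.writeValueAsBytes(item);
----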
spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/builder/BigQueryCsvItemWriterBuilder.java renamed to spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/csv/builder/BigQueryCsvItemWriterBuilder.java

Lines changed: 15 additions & 13 deletions
@@ -14,19 +14,20 @@
  * limitations under the License.
  */
 
-package org.springframework.batch.extensions.bigquery.writer.builder;
+package org.springframework.batch.extensions.bigquery.writer.loadjob.csv.builder;
 
 import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
 import com.google.cloud.bigquery.DatasetInfo;
 import com.google.cloud.bigquery.Job;
 import com.google.cloud.bigquery.WriteChannelConfiguration;
-import org.springframework.batch.extensions.bigquery.writer.BigQueryCsvItemWriter;
+import org.springframework.batch.extensions.bigquery.writer.loadjob.csv.BigQueryLoadJobCsvItemWriter;
 import org.springframework.core.convert.converter.Converter;
 
 import java.util.function.Consumer;
 
 /**
- * A builder for {@link BigQueryCsvItemWriter}.
+ * A builder for {@link BigQueryLoadJobCsvItemWriter}.
  *
  * @param <T> your DTO type
  * @author Volodymyr Perebykivskyi
@@ -47,7 +48,7 @@ public class BigQueryCsvItemWriterBuilder<T> {
     *
     * @param rowMapper your row mapper
     * @return {@link BigQueryCsvItemWriterBuilder}
-    * @see BigQueryCsvItemWriter#setRowMapper(Converter)
+    * @see BigQueryLoadJobCsvItemWriter#setRowMapper(Converter)
     */
    public BigQueryCsvItemWriterBuilder<T> rowMapper(Converter<T, byte[]> rowMapper) {
        this.rowMapper = rowMapper;
@@ -59,7 +60,7 @@ public BigQueryCsvItemWriterBuilder<T> rowMapper(Converter<T, byte[]> rowMapper)
     *
     * @param datasetInfo BigQuery dataset info
     * @return {@link BigQueryCsvItemWriterBuilder}
-    * @see BigQueryCsvItemWriter#setDatasetInfo(DatasetInfo)
+    * @see BigQueryLoadJobCsvItemWriter#setDatasetInfo(DatasetInfo)
     */
    public BigQueryCsvItemWriterBuilder<T> datasetInfo(DatasetInfo datasetInfo) {
        this.datasetInfo = datasetInfo;
@@ -71,7 +72,7 @@ public BigQueryCsvItemWriterBuilder<T> datasetInfo(DatasetInfo datasetInfo) {
     *
     * @param consumer your consumer
     * @return {@link BigQueryCsvItemWriterBuilder}
-    * @see BigQueryCsvItemWriter#setJobConsumer(Consumer)
+    * @see BigQueryLoadJobCsvItemWriter#setJobConsumer(Consumer)
     */
    public BigQueryCsvItemWriterBuilder<T> jobConsumer(Consumer<Job> consumer) {
        this.jobConsumer = consumer;
@@ -83,7 +84,7 @@ public BigQueryCsvItemWriterBuilder<T> jobConsumer(Consumer<Job> consumer) {
     *
     * @param configuration BigQuery channel configuration
     * @return {@link BigQueryCsvItemWriterBuilder}
-    * @see BigQueryCsvItemWriter#setWriteChannelConfig(WriteChannelConfiguration)
+    * @see BigQueryLoadJobCsvItemWriter#setWriteChannelConfig(WriteChannelConfiguration)
     */
    public BigQueryCsvItemWriterBuilder<T> writeChannelConfig(WriteChannelConfiguration configuration) {
        this.writeChannelConfig = configuration;
@@ -95,25 +96,26 @@ public BigQueryCsvItemWriterBuilder<T> writeChannelConfig(WriteChannelConfigurat
     *
     * @param bigQuery BigQuery service
     * @return {@link BigQueryCsvItemWriterBuilder}
-    * @see BigQueryCsvItemWriter#setBigQuery(BigQuery)
+    * @see BigQueryLoadJobCsvItemWriter#setBigQuery(BigQuery)
     */
    public BigQueryCsvItemWriterBuilder<T> bigQuery(BigQuery bigQuery) {
        this.bigQuery = bigQuery;
        return this;
    }
 
    /**
-    * Please remember about {@link BigQueryCsvItemWriter#afterPropertiesSet()}.
+    * Please remember about {@link BigQueryLoadJobCsvItemWriter#afterPropertiesSet()}.
     *
-    * @return {@link BigQueryCsvItemWriter}
+    * @return {@link BigQueryLoadJobCsvItemWriter}
     */
-    public BigQueryCsvItemWriter<T> build() {
-        BigQueryCsvItemWriter<T> writer = new BigQueryCsvItemWriter<>();
+    public BigQueryLoadJobCsvItemWriter<T> build() {
+        BigQueryLoadJobCsvItemWriter<T> writer = new BigQueryLoadJobCsvItemWriter<>();
+
+        writer.setBigQuery(this.bigQuery == null ? BigQueryOptions.getDefaultInstance().getService() : this.bigQuery);
 
        writer.setRowMapper(this.rowMapper);
        writer.setWriteChannelConfig(this.writeChannelConfig);
        writer.setJobConsumer(this.jobConsumer);
-        writer.setBigQuery(this.bigQuery);
        writer.setDatasetInfo(this.datasetInfo);
 
        return writer;
spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryJsonItemWriter.java renamed to spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/json/BigQueryLoadJobJsonItemWriter.java

Lines changed: 6 additions & 5 deletions
@@ -14,10 +14,11 @@
  * limitations under the License.
  */
 
-package org.springframework.batch.extensions.bigquery.writer;
+package org.springframework.batch.extensions.bigquery.writer.loadjob.json;
 
 import com.google.cloud.bigquery.FormatOptions;
 import com.google.cloud.bigquery.Table;
+import org.springframework.batch.extensions.bigquery.writer.loadjob.BigQueryLoadJobBaseItemWriter;
 import org.springframework.batch.item.json.JsonObjectMarshaller;
 import org.springframework.util.Assert;
 import org.springframework.util.ObjectUtils;
@@ -28,14 +29,14 @@
 import java.util.function.Predicate;
 
 /**
- * JSON writer for BigQuery.
+ * JSON writer for BigQuery using Load Job.
  *
  * @param <T> your DTO type
  * @author Volodymyr Perebykivskyi
  * @since 0.2.0
  * @see <a href="https://en.wikipedia.org/wiki/JSON">JSON</a>
  */
-public class BigQueryJsonItemWriter<T> extends BigQueryBaseItemWriter<T> {
+public class BigQueryLoadJobJsonItemWriter<T> extends BigQueryLoadJobBaseItemWriter<T> {
 
    private static final String LF = "\n";
 
@@ -59,12 +60,12 @@ protected List<byte[]> convertObjectsToByteArrays(List<? extends T> items) {
 
    @Override
    protected void performFormatSpecificChecks() {
-        Assert.notNull(this.marshaller, "Marshaller is mandatory");
+        Assert.notNull(this.marshaller, "Marshaller must be provided");
 
        Table table = getTable();
 
        if (Boolean.TRUE.equals(writeChannelConfig.getAutodetect())) {
-            if (tableHasDefinedSchema(table) && super.logger.isWarnEnabled()) {
+            if (tableHasDefinedSchema(table) && logger.isWarnEnabled()) {
                logger.warn("Mixing autodetect mode with already defined schema may lead to errors on BigQuery side");
            }
        } else {