Skip to content

Commit 33a8f5e

Browse files
authored
Merge pull request #184 from data-integrations/CDAP-19447_bug
CDAP-19447- Reducing bigQuery API calls by fetching all tables at once
2 parents 32b847b + ec064fc commit 33a8f5e

File tree

2 files changed

+79
-10
lines changed

2 files changed

+79
-10
lines changed

src/main/java/io/cdap/delta/bigquery/BigQueryUtils.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.cloud.bigquery.JobId;
2626
import com.google.cloud.bigquery.JobInfo;
2727
import com.google.cloud.bigquery.QueryJobConfiguration;
28+
import com.google.cloud.bigquery.Table;
2829
import com.google.cloud.bigquery.TableId;
2930
import com.google.cloud.bigquery.TableResult;
3031
import com.google.common.collect.Iterables;
@@ -42,6 +43,7 @@
4243
import java.util.Iterator;
4344
import java.util.List;
4445
import java.util.Map;
46+
import java.util.Optional;
4547
import java.util.Set;
4648
import java.util.UUID;
4749
import java.util.regex.Pattern;
@@ -100,13 +102,23 @@ static long getMaximumExistingSequenceNumber(Set<SourceTable> allTables, String
100102
static long getMaximumExistingSequenceNumberPerBatch(Set<SourceTable> allTables, String project,
101103
@Nullable String datasetName, BigQuery bigQuery,
102104
EncryptionConfiguration encryptionConfiguration) {
105+
SourceTable table0 = allTables.stream().findFirst().get();
106+
Set<TableId> existingTableIDs = new HashSet<>();
107+
String dataset = datasetName != null ? normalizeDatasetName(datasetName) :
108+
normalizeDatasetName(table0.getDatabase());
109+
if (bigQuery.getDataset(dataset) != null) {
110+
for (Table table : bigQuery.listTables(dataset).iterateAll()) {
111+
existingTableIDs.add(table.getTableId());
112+
}
113+
}
114+
103115
StringBuilder builder = new StringBuilder();
104116
builder.append("SELECT MAX(max_sequence_num) FROM (");
105117
List<String> maxSequenceNumQueryPerTable = new ArrayList<>();
106118
for (SourceTable table : allTables) {
107119
TableId tableId = TableId.of(project, datasetName != null ? normalizeDatasetName(datasetName) :
108120
normalizeDatasetName(table.getDatabase()), normalizeTableName(table.getTable()));
109-
if (bigQuery.getTable(tableId) != null) {
121+
if (existingTableIDs.contains(tableId)) {
110122
maxSequenceNumQueryPerTable.add(String.format("SELECT MAX(_sequence_num) as max_sequence_num FROM %s",
111123
wrapInBackTick(tableId.getDataset(), tableId.getTable())));
112124
}

src/test/java/io/cdap/delta/bigquery/BigQueryUtilsTest.java

Lines changed: 66 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717

1818
package io.cdap.delta.bigquery;
1919

20+
import com.google.api.gax.paging.Page;
2021
import com.google.auth.Credentials;
2122
import com.google.auth.oauth2.GoogleCredentials;
23+
import com.google.cloud.PageImpl;
2224
import com.google.cloud.bigquery.BigQuery;
2325
import com.google.cloud.bigquery.BigQueryOptions;
26+
import com.google.cloud.bigquery.Dataset;
2427
import com.google.cloud.bigquery.DatasetId;
2528
import com.google.cloud.bigquery.DatasetInfo;
2629
import com.google.cloud.bigquery.Field;
@@ -32,6 +35,7 @@
3235
import com.google.cloud.bigquery.TableDefinition;
3336
import com.google.cloud.bigquery.TableId;
3437
import com.google.cloud.bigquery.TableInfo;
38+
import com.google.cloud.bigquery.TableResult;
3539
import com.google.common.base.Strings;
3640
import io.cdap.delta.api.SourceTable;
3741
import org.junit.AfterClass;
@@ -52,7 +56,10 @@
5256
import java.io.FileInputStream;
5357
import java.io.InputStream;
5458
import java.nio.charset.StandardCharsets;
59+
import java.util.ArrayList;
60+
import java.util.Arrays;
5561
import java.util.HashSet;
62+
import java.util.List;
5663
import java.util.Set;
5764

5865
import static org.junit.Assert.assertEquals;
@@ -69,19 +76,21 @@ public class BigQueryUtilsTest {
6976

7077
private static final String DATASET = "demodataset";
7178
private static final String TABLE_PREFIX = "demotable_";
79+
private static final String PROJECT = "testproject";
7280

7381
@PrepareForTest(BigQueryUtils.class)
7482
@RunWith(PowerMockRunner.class)
7583
public static class LocalIndependentTests {
7684
private BigQuery bigQueryMock;
77-
private Table tableMock;
7885

7986
@Before
8087
public void init() throws Exception {
8188
//Mocks
8289
bigQueryMock = Mockito.mock(BigQuery.class);
83-
tableMock = Mockito.mock(Table.class);
90+
Table tableMock = Mockito.mock(Table.class);
91+
Dataset datasetMock = Mockito.mock(Dataset.class);
8492
Mockito.when(bigQueryMock.getTable(ArgumentMatchers.any())).thenReturn(tableMock);
93+
Mockito.when(bigQueryMock.getDataset("demodataset")).thenReturn(datasetMock);
8594
PowerMockito.spy(BigQueryUtils.class);
8695

8796
//Stubs
@@ -155,7 +164,8 @@ public void testNormalizeFieldName() {
155164
public void testGetMaximumExistingSequenceNumberZeroInvocations() throws Exception {
156165
// Zero Tables
157166
Set<SourceTable> allTables = generateSourceTableSet(0);
158-
long tableResult0 = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, "testproject",
167+
Mockito.when(bigQueryMock.listTables(ArgumentMatchers.anyString())).thenReturn(generateBQTablesPage(0));
168+
long tableResult0 = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, PROJECT,
159169
null, bigQueryMock, null, 1000);
160170
assertEquals(0L, tableResult0);
161171
PowerMockito.verifyPrivate(BigQueryUtils.class, times(0))
@@ -169,7 +179,8 @@ public void testGetMaximumExistingSequenceNumberSingleInvocations() throws Excep
169179

170180
// Subtest : One Table
171181
Set<SourceTable> allTables = generateSourceTableSet(1);
172-
long tableResult = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, "testproject",
182+
Mockito.when(bigQueryMock.listTables(ArgumentMatchers.anyString())).thenReturn(generateBQTablesPage(1));
183+
long tableResult = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, PROJECT,
173184
null, bigQueryMock, null, 1000);
174185
assertEquals(1L, tableResult);
175186
PowerMockito.verifyPrivate(BigQueryUtils.class, times(1))
@@ -178,7 +189,8 @@ public void testGetMaximumExistingSequenceNumberSingleInvocations() throws Excep
178189

179190
// Subtest2 : Ten Tables
180191
allTables = generateSourceTableSet(10);
181-
tableResult = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, "testproject",
192+
Mockito.when(bigQueryMock.listTables(ArgumentMatchers.anyString())).thenReturn(generateBQTablesPage(10));
193+
tableResult = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, PROJECT,
182194
null, bigQueryMock, null, 1000);
183195
assertEquals(2L, tableResult);
184196
PowerMockito.verifyPrivate(BigQueryUtils.class, times(2))
@@ -187,7 +199,8 @@ public void testGetMaximumExistingSequenceNumberSingleInvocations() throws Excep
187199

188200
// Subtest3 : 1000 Tables
189201
allTables = generateSourceTableSet(1000);
190-
tableResult = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, "testproject",
202+
Mockito.when(bigQueryMock.listTables(ArgumentMatchers.anyString())).thenReturn(generateBQTablesPage(1000));
203+
tableResult = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, PROJECT,
191204
null, bigQueryMock, null, 1000);
192205
assertEquals(3L, tableResult);
193206
PowerMockito.verifyPrivate(BigQueryUtils.class, times(3))
@@ -201,7 +214,8 @@ public void testGetMaximumExistingSequenceNumberDoubleInvocations() throws Excep
201214

202215
//Subtest1 : 1001 Tables : Should call bigquery 2 times. 1000+1
203216
Set<SourceTable> allTables = generateSourceTableSet(1001);
204-
long tableResult = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, "testproject",
217+
Mockito.when(bigQueryMock.listTables(ArgumentMatchers.anyString())).thenReturn(generateBQTablesPage(1001));
218+
long tableResult = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, PROJECT,
205219
null, bigQueryMock, null, 1000);
206220
assertEquals(2L, tableResult);
207221
PowerMockito.verifyPrivate(BigQueryUtils.class, times(2))
@@ -210,7 +224,8 @@ public void testGetMaximumExistingSequenceNumberDoubleInvocations() throws Excep
210224

211225
//Subtest2 : 2000 Tables : Should call bigquery 2 times. 1000+1000
212226
allTables = generateSourceTableSet(2000);
213-
tableResult = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, "testproject",
227+
Mockito.when(bigQueryMock.listTables(ArgumentMatchers.anyString())).thenReturn(generateBQTablesPage(2000));
228+
tableResult = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, PROJECT,
214229
null, bigQueryMock, null, 1000);
215230
assertEquals(4L, tableResult);
216231
PowerMockito.verifyPrivate(BigQueryUtils.class, times(4))
@@ -224,7 +239,8 @@ public void testGetMaximumExistingSequenceNumberTripleInvocations() throws Excep
224239

225240
//Subtest1 : 2500 Tables : Should call bigquery 3 times. 1000+1000+500
226241
Set<SourceTable> allTables = generateSourceTableSet(2500);
227-
long tableResult = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, "testproject",
242+
Mockito.when(bigQueryMock.listTables(ArgumentMatchers.anyString())).thenReturn(generateBQTablesPage(2500));
243+
long tableResult = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, PROJECT,
228244
null, bigQueryMock, null, 1000);
229245
assertEquals(3L, tableResult);
230246
PowerMockito.verifyPrivate(BigQueryUtils.class, times(3))
@@ -350,4 +366,45 @@ private static Set<SourceTable> generateSourceTableSet(int noOfTables) {
350366
}
351367
return allTables;
352368
}
369+
370+
private static Page<Table> generateBQTablesPage(int num) {
371+
Page<Table> pg = new Page<Table>() {
372+
@Override
373+
public boolean hasNextPage() {
374+
return false;
375+
}
376+
377+
@Override
378+
public String getNextPageToken() {
379+
return null;
380+
}
381+
382+
@Override
383+
public Page<Table> getNextPage() {
384+
return null;
385+
}
386+
387+
@Override
388+
public Iterable<Table> iterateAll() {
389+
List<Table> tableList = new ArrayList<>();
390+
391+
for (int i = 1; i <= num; i++) {
392+
// Create Table
393+
Table tableMock2 = Mockito.mock(Table.class);
394+
395+
String tableName = TABLE_PREFIX + i;
396+
TableId tableId = TableId.of(PROJECT, DATASET, tableName);
397+
Mockito.when(tableMock2.getTableId()).thenReturn(tableId);
398+
tableList.add(tableMock2);
399+
}
400+
return tableList;
401+
}
402+
403+
@Override
404+
public Iterable<Table> getValues() {
405+
return null;
406+
}
407+
};
408+
return pg;
409+
}
353410
}

0 commit comments

Comments
 (0)