diff --git a/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownScheduler.java b/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownScheduler.java index c6a393cfeff..23dc48eec45 100644 --- a/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownScheduler.java +++ b/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownScheduler.java @@ -38,7 +38,6 @@ import datawave.query.config.ShardQueryConfiguration; import datawave.query.tables.BatchScannerSession; import datawave.query.tables.ScannerFactory; -import datawave.query.tables.ShardQueryLogic; import datawave.query.tables.async.ScannerChunk; import datawave.query.tables.async.event.VisitorFunction; import datawave.query.tables.stats.ScanSessionStats; @@ -223,7 +222,7 @@ public void close() { */ @Override public BatchScanner createBatchScanner(ShardQueryConfiguration config, ScannerFactory scannerFactory, QueryData qd) throws TableNotFoundException { - return ShardQueryLogic.createBatchScanner(config, scannerFactory, qd); + return scannerFactory.newScanner(config, qd); } @Override diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/ScannerFactory.java b/warehouse/query-core/src/main/java/datawave/query/tables/ScannerFactory.java index 47ea22180fa..c2298305c08 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/ScannerFactory.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/ScannerFactory.java @@ -13,6 +13,7 @@ import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.client.TableNotFoundException; @@ -25,6 +26,7 @@ import com.google.common.base.Preconditions; import datawave.core.query.configuration.GenericQueryConfiguration; +import datawave.core.query.configuration.QueryData; import datawave.ingest.data.config.ingest.AccumuloHelper; import datawave.microservice.query.Query; import datawave.mr.bulk.BulkInputFormat; @@ -515,6 +517,30 @@ public ScannerBase newRfileScanner(String tableName, Set auths, } } + public BatchScanner newScanner(ShardQueryConfiguration config, QueryData qd) throws TableNotFoundException { + final BatchScanner bs = this.newScanner(config.getShardTableName(), config.getAuthorizations(), config.getNumQueryThreads(), config.getQuery()); + + if (log.isTraceEnabled()) { + log.trace("Running with " + config.getAuthorizations() + " and " + config.getNumQueryThreads() + " threads: " + qd); + } + + bs.setRanges(qd.getRanges()); + + for (IteratorSetting cfg : qd.getSettings()) { + bs.addScanIterator(cfg); + } + + if (config.getTableConsistencyLevels().containsKey(config.getTableName())) { + bs.setConsistencyLevel(config.getTableConsistencyLevels().get(config.getTableName())); + } + + if (config.getTableHints().containsKey(config.getTableName())) { + bs.setExecutionHints(config.getTableHints().get(config.getTableName())); + } + + return bs; + } + /** * Apply table-specific scanner configs to the provided scanner base object using the table name as the key * diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/ShardQueryLogic.java b/warehouse/query-core/src/main/java/datawave/query/tables/ShardQueryLogic.java index 3257bbddd6a..6d55e005f97 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/ShardQueryLogic.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/ShardQueryLogic.java @@ -23,8 +23,6 @@ import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.BatchScanner; -import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.data.Key; @@ -277,31 +275,6 @@ public ShardQueryLogic(ShardQueryLogic other) { } } - public static BatchScanner createBatchScanner(ShardQueryConfiguration config, ScannerFactory scannerFactory, QueryData qd) throws TableNotFoundException { - final BatchScanner bs = scannerFactory.newScanner(config.getShardTableName(), config.getAuthorizations(), config.getNumQueryThreads(), - config.getQuery()); - - if (log.isTraceEnabled()) { - log.trace("Running with " + config.getAuthorizations() + " and " + config.getNumQueryThreads() + " threads: " + qd); - } - - bs.setRanges(qd.getRanges()); - - for (IteratorSetting cfg : qd.getSettings()) { - bs.addScanIterator(cfg); - } - - if (config.getTableConsistencyLevels().containsKey(config.getTableName())) { - bs.setConsistencyLevel(config.getTableConsistencyLevels().get(config.getTableName())); - } - - if (config.getTableHints().containsKey(config.getTableName())) { - bs.setExecutionHints(config.getTableHints().get(config.getTableName())); - } - - return bs; - } - @Override public GenericQueryConfiguration initialize(AccumuloClient client, Query settings, Set auths) throws Exception { // whenever we reinitialize, ensure we have a fresh transformer