diff --git a/releases/azure-cosmosdb-spark_2.1.0_2.11-0.0.5-SNAPSHOT/azure-cosmosdb-spark_2.1.0_2.11-0.0.5-SNAPSHOT-uber.jar b/releases/azure-cosmosdb-spark_2.1.0_2.11-0.0.5-SNAPSHOT/azure-cosmosdb-spark_2.1.0_2.11-0.0.5-SNAPSHOT-uber.jar
index 6449c997..d215106f 100644
Binary files a/releases/azure-cosmosdb-spark_2.1.0_2.11-0.0.5-SNAPSHOT/azure-cosmosdb-spark_2.1.0_2.11-0.0.5-SNAPSHOT-uber.jar and b/releases/azure-cosmosdb-spark_2.1.0_2.11-0.0.5-SNAPSHOT/azure-cosmosdb-spark_2.1.0_2.11-0.0.5-SNAPSHOT-uber.jar differ
diff --git a/src/.classpath b/src/.classpath
new file mode 100644
index 00000000..235189e8
--- /dev/null
+++ b/src/.classpath
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
diff --git a/src/.gitignore b/src/.gitignore
new file mode 100644
index 00000000..ae3c1726
--- /dev/null
+++ b/src/.gitignore
@@ -0,0 +1 @@
+/bin/
diff --git a/src/.project b/src/.project
new file mode 100644
index 00000000..0f6f6a76
--- /dev/null
+++ b/src/.project
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+	<name>src</name>
+	<comment></comment>
+	<projects>
+	</projects>
+	<buildSpec>
+		<buildCommand>
+			<name>org.eclipse.jdt.core.javabuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+	</buildSpec>
+	<natures>
+		<nature>org.eclipse.jdt.core.javanature</nature>
+	</natures>
+</projectDescription>
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala
index aff06f63..2c2ce7df 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala
@@ -66,8 +66,12 @@ object CosmosDBConfig {
   val RootPropertyToSave = "rootpropertytosave"
   val BulkImport = "bulkimport"
   val BulkUpdate = "bulkupdate"
+  val BulkRead = "bulkread"
   val MaxMiniBatchUpdateCount = "maxminibatchupdatecount"
+  val MaxBulkReadBatchCount = "maxbulkreadbatchcount"
   val ClientInitDelay = "clientinitdelay"
+  val RangeQuery = "rangequery"
+
   // Writing progress tracking
   val WritingBatchId = "writingbatchid"
 
@@ -121,8 +125,10 @@ object CosmosDBConfig {
   val DefaultStreamingSlowSourceDelayMs = 1
   val DefaultBulkImport = true
   val DefaultBulkUpdate = false
+  val DefaultBulkRead = false
   val DefaultMaxMiniBatchUpdateCount = 500
   val DefaultClientInitDelay = 10
+  val DefaultBulkReadBatchCount = 100
   val DefaultAdlUseGuidForId = true
   val DefaultAdlUseGuidForPk = true
 
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartition.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartition.scala
index 06d42b9c..222dbdfa 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartition.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartition.scala
@@ -26,4 +26,6 @@ import org.apache.spark.Partition
 
 case class CosmosDBPartition(index: Int,
                              partitionCount: Int,
-                             partitionKeyRangeId: Int) extends Partition
+                             partitionKeyRangeId: Int,
+                             start: Char,
+                             end: Char) extends Partition
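Note: the three keys added above ("bulkread", "maxbulkreadbatchcount", "rangequery") are ordinary entries in the connector's config map, and the partitioner and RDD iterator changes below only test that a key is present, not what value it holds. A minimal usage sketch follows; it is not part of this change, the account values are placeholders, and it assumes the connector's existing Config helper and spark.read.cosmosDB entry point:

    import com.microsoft.azure.cosmosdb.spark.config.Config
    import com.microsoft.azure.cosmosdb.spark.schema._  // provides spark.read.cosmosDB

    // Hypothetical read that opts into the new range-query fan-out and keeps the
    // bulk-read batch size at the new default of 100. Setting "bulkread" instead
    // would route reads through DocumentBulkImporter (see CosmosDBRDDIterator below).
    // spark is assumed to be an existing SparkSession.
    val readConfig = Config(Map(
      "Endpoint"              -> "https://<account>.documents.azure.com:443/",
      "Masterkey"             -> "<master-key>",
      "Database"              -> "<database>",
      "Collection"            -> "<collection>",
      "rangequery"            -> "true",
      "maxbulkreadbatchcount" -> "100"
    ))
    val df = spark.read.cosmosDB(readConfig)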
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala
index 00ab5e9e..0f6df3f6 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala
@@ -41,56 +41,120 @@ class CosmosDBPartitioner() extends Partitioner[Partition] with LoggingTrait {
     var connection: CosmosDBConnection = new CosmosDBConnection(config)
     var partitionKeyRanges = connection.getAllPartitions
     logDebug(s"CosmosDBPartitioner: This CosmosDB has ${partitionKeyRanges.length} partitions")
-    Array.tabulate(partitionKeyRanges.length){
-      i => CosmosDBPartition(i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt)
+    if (config.get(CosmosDBConfig.RangeQuery).isDefined) {
+      val a1 = Array.tabulate(partitionKeyRanges.length) {
+        i => CosmosDBPartition(i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'a', 'i')
+      }
+      val a2 = Array.tabulate(partitionKeyRanges.length) {
+        i => CosmosDBPartition(partitionKeyRanges.length + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'i', 'p')
+      }
+      val a3 = Array.tabulate(partitionKeyRanges.length) {
+        i => CosmosDBPartition(partitionKeyRanges.length * 2 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'p', 'z')
+      }
+      val a4 = Array.tabulate(partitionKeyRanges.length) {
+        i => CosmosDBPartition(partitionKeyRanges.length * 3 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '0', '9')
+      }
+//      val a5 = Array.tabulate(partitionKeyRanges.length) {
+//        i => CosmosDBPartition(partitionKeyRanges.length * 4 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 't', '{')
+//      }
+//      val a6 = Array.tabulate(partitionKeyRanges.length) {
+//        i => CosmosDBPartition(partitionKeyRanges.length * 5 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '0', '4')
+//      }
+//      val a7 = Array.tabulate(partitionKeyRanges.length) {
+//        i => CosmosDBPartition(partitionKeyRanges.length * 6 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '4', '7')
+//      }
+//      val a8 = Array.tabulate(partitionKeyRanges.length) {
+//        i => CosmosDBPartition(partitionKeyRanges.length * 7 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '7', ':')
+//      }
+
+      logDebug(s"CosmosDBPartitioner: This CosmosDB has ${partitionKeyRanges.length} partitions")
+
+      (a1 ++ a2 ++ a3 ++ a4).toArray[Partition]
+    }
+    else {
+      Array.tabulate(partitionKeyRanges.length) {
+        i => CosmosDBPartition(i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'a', 'z')
+      }
     }
   }
 
-  def computePartitions(config: Config,
-                        requiredColumns: Array[String] = Array(),
-                        filters: Array[Filter] = Array(),
-                        hadoopConfig: mutable.Map[String, String]): Array[Partition] = {
-    val adlImport = config.get(CosmosDBConfig.adlAccountFqdn).isDefined
-    var connection: CosmosDBConnection = new CosmosDBConnection(config)
-    if (adlImport) {
-      // ADL source
-      val hdfsUtils = new HdfsUtils(hadoopConfig.toMap)
-      val adlConnection: ADLConnection = ADLConnection(config)
-      val adlFiles = adlConnection.getFiles
-      val adlCheckpointPath = config.get(CosmosDBConfig.adlFileCheckpointPath)
-      val adlCosmosDBFileStoreCollection = config.get(CosmosDBConfig.CosmosDBFileStoreCollection)
-      val writingBatchId = config.get[String](CosmosDBConfig.WritingBatchId)
-      val adlMaxFileCount = config.get(CosmosDBConfig.adlMaxFileCount)
-        .getOrElse(CosmosDBConfig.DefaultAdlMaxFileCount.toString)
-        .toInt
-      logDebug(s"The Adl folder has ${adlFiles.size()} files")
-      val partitions = new ListBuffer[ADLFilePartition]
-      var partitionIndex = 0
-      var i = 0
-      while (i < adlFiles.size() && partitionIndex < adlMaxFileCount) {
-        var processed = true
-        if (adlCheckpointPath.isDefined) {
-          processed = ADLConnection.isAdlFileProcessed(hdfsUtils, adlCheckpointPath.get, adlFiles.get(i), writingBatchId.get)
-        } else if (adlCosmosDBFileStoreCollection.isDefined) {
-          val dbName = config.get[String](CosmosDBConfig.Database).get
-          val collectionLink = s"/dbs/$dbName/colls/${adlCosmosDBFileStoreCollection.get}"
-          processed = ADLConnection.isAdlFileProcessed(connection, collectionLink, adlFiles.get(i), writingBatchId.get)
-        }
-        if (!processed) {
-          partitions += ADLFilePartition(partitionIndex, adlFiles.get(i))
-          partitionIndex += 1
-        }
-        i += 1
-      }
-      partitions.toArray
-    } else {
-      // CosmosDB source
-      var query: String = FilterConverter.createQueryString(requiredColumns, filters)
-      var partitionKeyRanges = connection.getAllPartitions(query)
-      logDebug(s"CosmosDBPartitioner: This CosmosDB has ${partitionKeyRanges.length} partitions")
-      Array.tabulate(partitionKeyRanges.length) {
-        i => CosmosDBPartition(i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt)
-      }
-    }
-  }
-}
+  def computePartitions(config: Config,
+                        requiredColumns: Array[String] = Array(),
+                        filters: Array[Filter] = Array(),
+                        hadoopConfig: mutable.Map[String, String]): Array[Partition] = {
+    val adlImport = config.get(CosmosDBConfig.adlAccountFqdn).isDefined
+    var connection: CosmosDBConnection = new CosmosDBConnection(config)
+    if (adlImport) {
+      // ADL source
+      val hdfsUtils = new HdfsUtils(hadoopConfig.toMap)
+      val adlConnection: ADLConnection = ADLConnection(config)
+      val adlFiles = adlConnection.getFiles
+      val adlCheckpointPath = config.get(CosmosDBConfig.adlFileCheckpointPath)
+      val adlCosmosDBFileStoreCollection = config.get(CosmosDBConfig.CosmosDBFileStoreCollection)
+      val writingBatchId = config.get[String](CosmosDBConfig.WritingBatchId)
+      val adlMaxFileCount = config.get(CosmosDBConfig.adlMaxFileCount)
+        .getOrElse(CosmosDBConfig.DefaultAdlMaxFileCount.toString)
+        .toInt
+      logDebug(s"The Adl folder has ${adlFiles.size()} files")
+      val partitions = new ListBuffer[ADLFilePartition]
+      var partitionIndex = 0
+      var i = 0
+      while (i < adlFiles.size() && partitionIndex < adlMaxFileCount) {
+        var processed = true
+        if (adlCheckpointPath.isDefined) {
+          processed = ADLConnection.isAdlFileProcessed(hdfsUtils, adlCheckpointPath.get, adlFiles.get(i), writingBatchId.get)
+        } else if (adlCosmosDBFileStoreCollection.isDefined) {
+          val dbName = config.get[String](CosmosDBConfig.Database).get
+          val collectionLink = s"/dbs/$dbName/colls/${adlCosmosDBFileStoreCollection.get}"
+          processed = ADLConnection.isAdlFileProcessed(connection, collectionLink, adlFiles.get(i), writingBatchId.get)
+        }
+        if (!processed) {
+          partitions += ADLFilePartition(partitionIndex, adlFiles.get(i))
+          partitionIndex += 1
+        }
+        i += 1
+      }
+      partitions.toArray
+    } else {
+      // CosmosDB source
+      var query: String = FilterConverter.createQueryString(requiredColumns, filters)
+      var partitionKeyRanges = connection.getAllPartitions(query)
+
+      if (config.get(CosmosDBConfig.RangeQuery).isDefined) {
+        val a1 = Array.tabulate(partitionKeyRanges.length) {
+          i => CosmosDBPartition(i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'a', 'i')
+        }
+        val a2 = Array.tabulate(partitionKeyRanges.length) {
+          i => CosmosDBPartition(partitionKeyRanges.length + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'i', 'p')
+        }
+        val a3 = Array.tabulate(partitionKeyRanges.length) {
+          i => CosmosDBPartition(partitionKeyRanges.length * 2 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'p', 'z')
+        }
+        val a4 = Array.tabulate(partitionKeyRanges.length) {
+          i => CosmosDBPartition(partitionKeyRanges.length * 3 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '0', '9')
+        }
+        // val a5 = Array.tabulate(partitionKeyRanges.length) {
+        //   i => CosmosDBPartition(partitionKeyRanges.length * 4 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 't', '{')
+        // }
+        // val a6 = Array.tabulate(partitionKeyRanges.length) {
+        //   i => CosmosDBPartition(partitionKeyRanges.length * 5 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '0', '4')
+        // }
+        // val a7 = Array.tabulate(partitionKeyRanges.length) {
+        //   i => CosmosDBPartition(partitionKeyRanges.length * 6 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '4', '7')
+        // }
+        // val a8 = Array.tabulate(partitionKeyRanges.length) {
+        //   i => CosmosDBPartition(partitionKeyRanges.length * 7 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '7', ':')
+        // }
+
+        logDebug(s"CosmosDBPartitioner: This CosmosDB has ${partitionKeyRanges.length} partitions")
+
+        (a1 ++ a2 ++ a3 ++ a4).toArray[Partition]
+      }
+      else {
+        Array.tabulate(partitionKeyRanges.length) {
+          i => CosmosDBPartition(i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'a', 'z')
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
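Note: with "rangequery" set, both methods above expand every physical partition key range into four logical Spark partitions bucketed by leading id character ('a'-'i', 'i'-'p', 'p'-'z', '0'-'9'), so the RDD ends up with partitionKeyRanges.length * 4 partitions, while the a5-a8 buckets stay commented out. The same four blocks are pasted into both methods, so a shared helper could express the fan-out once. Sketch only, reusing the names from the diff above:

    // Sketch: derive the a1..a4 fan-out from a bucket list instead of four copied blocks.
    // CosmosDBPartition and partitionKeyRanges are the names used in the diff; the
    // bucket boundaries are the ones hard-coded there.
    val buckets = Seq(('a', 'i'), ('i', 'p'), ('p', 'z'), ('0', '9'))
    val fannedOut: Array[Partition] = buckets.zipWithIndex.flatMap { case ((start, end), b) =>
      partitionKeyRanges.indices.map { i =>
        CosmosDBPartition(b * partitionKeyRanges.length + i,
          partitionKeyRanges.length,
          partitionKeyRanges(i).getId.toInt,
          start, end)
      }
    }.toArray[Partition]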
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala
index 380cb72d..97fd22ef 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala
@@ -32,11 +32,17 @@ import com.microsoft.azure.cosmosdb.spark.schema._
 import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils
 import com.microsoft.azure.cosmosdb.spark.{CosmosDBConnection, LoggingTrait}
 import com.microsoft.azure.documentdb._
+import com.microsoft.azure.documentdb.bulkimport.DocumentBulkImporter
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark._
 import org.apache.spark.sql.sources.Filter
 
+import scala.concurrent.{Await, Future}
 import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.concurrent.ExecutionContext.Implicits.global
+
+import scala.collection.JavaConversions._
 
 object CosmosDBRDDIterator {
@@ -186,10 +192,89 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
         .get[String](CosmosDBConfig.QueryCustom)
         .getOrElse(FilterConverter.createQueryString(requiredColumns, filters))
       logDebug(s"CosmosDBRDDIterator::LazyReader, convert to predicate: $queryString")
-
-      if (queryString == FilterConverter.defaultQuery) {
+
+      val bulkRead = config
+        .get[String](CosmosDBConfig.BulkRead)
+      if(bulkRead.isDefined)
+      {
+        val maxBatchSize = config
+          .get[String](CosmosDBConfig.MaxBulkReadBatchCount)
+          .getOrElse(CosmosDBConfig.DefaultBulkReadBatchCount.toString)
+          .toInt
+
+        var collectionThroughput: Int = 0
+        collectionThroughput = connection.getCollectionThroughput
+        val importer: DocumentBulkImporter = connection.getDocumentBulkImporter(collectionThroughput)
+        importer.readDocuments(partition.partitionKeyRangeId.toString, maxBatchSize)
+      }
+      else if (queryString == FilterConverter.defaultQuery) {
         // If there is no filters, read feed should be used
-        connection.readDocuments(feedOpts)
+        val rangeQuery = config
+          .get[String](CosmosDBConfig.RangeQuery)
+
+        if(rangeQuery.isDefined) {
+
+          val rangeQueryString = s"SELECT * FROM data o WHERE o.id >= '${partition.start}' and o.id <= '${partition.end}'"
+//          val queryString2 = "SELECT * FROM data o WHERE o.partitionKey >= 'e' and o.partitionKey <= 'i'"
+//          val queryString3 = "SELECT * FROM data o WHERE o.partitionKey >= 'i' and o.partitionKey <= 'l'"
+//          val queryString4 = "SELECT * FROM data o WHERE o.partitionKey >= 'l' and o.partitionKey <= 'p'"
+//          val queryString5 = "SELECT * FROM data o WHERE o.partitionKey >= 'p' and o.partitionKey <= 't'"
+//          val queryString6 = "SELECT * FROM data o WHERE o.partitionKey >= 't' and o.partitionKey <= '{'"
+//          val queryString7 = "SELECT * FROM data o WHERE o.partitionKey >= '0' and o.partitionKey <= '4'"
+//          val queryString8 = "SELECT * FROM data o WHERE o.partitionKey >= '4' and o.partitionKey <= '7'"
+//          val queryString9 = "SELECT * FROM data o WHERE o.partitionKey >= '7' and o.partitionKey <= ':'"
+//
+//
+//          val query1 = Future[Iterator[Document]] {
+//            connection.queryDocuments(queryString1, feedOpts)
+//          }
+//          val query2 = Future[Iterator[Document]] {
+//            connection.queryDocuments(queryString2, feedOpts)
+//          }
+//          val query3 = Future[Iterator[Document]] {
+//            connection.queryDocuments(queryString3, feedOpts)
+//          }
+//          val query4 = Future[Iterator[Document]] {
+//            connection.queryDocuments(queryString4, feedOpts)
+//          }
+//          val query5 = Future[Iterator[Document]] {
+//            connection.queryDocuments(queryString5, feedOpts)
+//          }
+//          val query6 = Future[Iterator[Document]] {
+//            connection.queryDocuments(queryString6, feedOpts)
+//          }
+//          val query7 = Future[Iterator[Document]] {
+//            connection.queryDocuments(queryString7, feedOpts)
+//          }
+//          val query8 = Future[Iterator[Document]] {
+//            connection.queryDocuments(queryString8, feedOpts)
+//          }
+//          val query9 = Future[Iterator[Document]] {
+//            connection.queryDocuments(queryString9, feedOpts)
+//          }
+//
+//          val combinedFuture =
+//            for {
+//              r1 <- query1
+//              r2 <- query2
+//              r3 <- query3
+//              r4 <- query4
+//              r5 <- query5
+//              r6 <- query6
+//              r7 <- query7
+//              r8 <- query8
+//              r9 <- query9
+//            } yield (r1, r2, r3, r4, r5, r6, r7, r8, r9)
+//
+//          val (r1, r2, r3, r4, r5, r6, r7, r8, r9) = Await.result(combinedFuture, Duration.Inf)
+//          r1 ++ r2 ++ r3 ++ r4 ++ r5 ++ r6 ++ r7 ++ r8 ++ r9
+          connection.queryDocuments(rangeQueryString, feedOpts)
+        }
+        else
+        {
+          connection.readDocuments(feedOpts)
+        }
+
       } else {
         connection.queryDocuments(queryString, feedOpts)
       }
@@ -355,3 +440,5 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
     }
   }
 }
+
+
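Note: with "rangequery" set, each fanned-out Spark partition runs the rangeQueryString template above with its own start and end characters, while "bulkread" bypasses queries entirely and calls importer.readDocuments once per partition key range (and is checked first, so the two options do not combine in one read). For illustration only, the queries the template produces for the four buckets defined in the partitioner:

    // The per-bucket queries generated by the rangeQueryString template above.
    // Both bounds are inclusive, so a document id equal to a shared boundary
    // (for example exactly "i" or "p") can satisfy two adjacent buckets.
    val bucketQueries = Seq(('a', 'i'), ('i', 'p'), ('p', 'z'), ('0', '9')).map {
      case (start, end) => s"SELECT * FROM data o WHERE o.id >= '$start' and o.id <= '$end'"
    }
    // bucketQueries.head == "SELECT * FROM data o WHERE o.id >= 'a' and o.id <= 'i'"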