diff --git a/releases/azure-cosmosdb-spark_2.1.0_2.11-0.0.5-SNAPSHOT/azure-cosmosdb-spark_2.1.0_2.11-0.0.5-SNAPSHOT-uber.jar b/releases/azure-cosmosdb-spark_2.1.0_2.11-0.0.5-SNAPSHOT/azure-cosmosdb-spark_2.1.0_2.11-0.0.5-SNAPSHOT-uber.jar
index 6449c997..d215106f 100644
Binary files a/releases/azure-cosmosdb-spark_2.1.0_2.11-0.0.5-SNAPSHOT/azure-cosmosdb-spark_2.1.0_2.11-0.0.5-SNAPSHOT-uber.jar and b/releases/azure-cosmosdb-spark_2.1.0_2.11-0.0.5-SNAPSHOT/azure-cosmosdb-spark_2.1.0_2.11-0.0.5-SNAPSHOT-uber.jar differ
diff --git a/src/.classpath b/src/.classpath
new file mode 100644
index 00000000..235189e8
--- /dev/null
+++ b/src/.classpath
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
diff --git a/src/.gitignore b/src/.gitignore
new file mode 100644
index 00000000..ae3c1726
--- /dev/null
+++ b/src/.gitignore
@@ -0,0 +1 @@
+/bin/
diff --git a/src/.project b/src/.project
new file mode 100644
index 00000000..0f6f6a76
--- /dev/null
+++ b/src/.project
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+	<name>src</name>
+	<comment></comment>
+	<projects>
+	</projects>
+	<buildSpec>
+		<buildCommand>
+			<name>org.eclipse.jdt.core.javabuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+	</buildSpec>
+	<natures>
+		<nature>org.eclipse.jdt.core.javanature</nature>
+	</natures>
+</projectDescription>
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala
index aff06f63..2c2ce7df 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala
@@ -66,8 +66,12 @@ object CosmosDBConfig {
val RootPropertyToSave = "rootpropertytosave"
val BulkImport = "bulkimport"
val BulkUpdate = "bulkupdate"
+ val BulkRead = "bulkread"
val MaxMiniBatchUpdateCount = "maxminibatchupdatecount"
+ val MaxBulkReadBatchCount = "maxbulkreadbatchcount"
val ClientInitDelay = "clientinitdelay"
+ val RangeQuery = "rangequery"
+
// Writing progress tracking
val WritingBatchId = "writingbatchid"
@@ -121,8 +125,10 @@ object CosmosDBConfig {
val DefaultStreamingSlowSourceDelayMs = 1
val DefaultBulkImport = true
val DefaultBulkUpdate = false
+ val DefaultBulkRead = false
val DefaultMaxMiniBatchUpdateCount = 500
val DefaultClientInitDelay = 10
+ val DefaultBulkReadBatchCount = 100
val DefaultAdlUseGuidForId = true
val DefaultAdlUseGuidForPk = true
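The three options added here are read straight from the user-supplied configuration map, and both "bulkread" and "rangequery" are presence checks (.isDefined), so setting them to any value switches them on. A minimal sketch of how a job might pass them, assuming the connector's usual Config / spark.read.cosmosDB entry points (which this diff does not change); the endpoint, key, database, and collection values are placeholders:

    // Sketch only: the option keys come from the constants added above;
    // account, key, database, and collection values are placeholders.
    import com.microsoft.azure.cosmosdb.spark.schema._
    import com.microsoft.azure.cosmosdb.spark.config.Config
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("cosmosdb-range-read").getOrCreate()

    val readConfig = Config(Map(
      "Endpoint"              -> "https://<account>.documents.azure.com:443/",
      "Masterkey"             -> "<master-key>",
      "Database"              -> "<database>",
      "Collection"            -> "<collection>",
      "bulkread"              -> "true",  // presence enables DocumentBulkImporter-based reads
      "maxbulkreadbatchcount" -> "100",   // optional; DefaultBulkReadBatchCount = 100
      "rangequery"            -> "true"   // presence enables id-range partition splitting
    ))

    val df = spark.read.cosmosDB(readConfig)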
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartition.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartition.scala
index 06d42b9c..222dbdfa 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartition.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartition.scala
@@ -26,4 +26,6 @@ import org.apache.spark.Partition
case class CosmosDBPartition(index: Int,
partitionCount: Int,
- partitionKeyRangeId: Int) extends Partition
+ partitionKeyRangeId: Int,
+ start: Char,
+ end: Char) extends Partition
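The two new fields give each Spark partition an inclusive id interval on top of its Cosmos DB partition key range; the interval is consumed by the range query built in CosmosDBRDDIterator further down. An illustrative instance (values arbitrary):

    // Spark partition 0 reads Cosmos DB partition key range 0, restricted to
    // documents whose id sorts between 'a' and 'i' (inclusive bounds).
    val p = CosmosDBPartition(index = 0, partitionCount = 4, partitionKeyRangeId = 0,
                              start = 'a', end = 'i')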
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala
index 00ab5e9e..0f6df3f6 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala
@@ -41,56 +41,120 @@ class CosmosDBPartitioner() extends Partitioner[Partition] with LoggingTrait {
var connection: CosmosDBConnection = new CosmosDBConnection(config)
var partitionKeyRanges = connection.getAllPartitions
logDebug(s"CosmosDBPartitioner: This CosmosDB has ${partitionKeyRanges.length} partitions")
- Array.tabulate(partitionKeyRanges.length){
- i => CosmosDBPartition(i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt)
+ if (config.get(CosmosDBConfig.RangeQuery).isDefined) {
+ val a1 = Array.tabulate(partitionKeyRanges.length) {
+ i => CosmosDBPartition(i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'a', 'i')
+ }
+ val a2 = Array.tabulate(partitionKeyRanges.length) {
+ i => CosmosDBPartition(partitionKeyRanges.length + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'i', 'p')
+ }
+ val a3 = Array.tabulate(partitionKeyRanges.length) {
+ i => CosmosDBPartition(partitionKeyRanges.length * 2 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'p', 'z')
+ }
+ val a4 = Array.tabulate(partitionKeyRanges.length) {
+ i => CosmosDBPartition(partitionKeyRanges.length * 3 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '0', '9')
+ }
+// val a5 = Array.tabulate(partitionKeyRanges.length) {
+// i => CosmosDBPartition(partitionKeyRanges.length * 4 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 't', '{')
+// }
+// val a6 = Array.tabulate(partitionKeyRanges.length) {
+// i => CosmosDBPartition(partitionKeyRanges.length * 5 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '0', '4')
+// }
+// val a7 = Array.tabulate(partitionKeyRanges.length) {
+// i => CosmosDBPartition(partitionKeyRanges.length * 6 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '4', '7')
+// }
+// val a8 = Array.tabulate(partitionKeyRanges.length) {
+// i => CosmosDBPartition(partitionKeyRanges.length * 7 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '7', ':')
+// }
+
+ logDebug(s"CosmosDBPartitioner: This CosmosDB has ${partitionKeyRanges.length} partitions")
+
+ (a1 ++ a2 ++ a3 ++ a4).toArray[Partition]
+ }
+ else {
+ Array.tabulate(partitionKeyRanges.length) {
+ i => CosmosDBPartition(i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'a', 'z')
+ }
}
}
- def computePartitions(config: Config,
- requiredColumns: Array[String] = Array(),
- filters: Array[Filter] = Array(),
- hadoopConfig: mutable.Map[String, String]): Array[Partition] = {
- val adlImport = config.get(CosmosDBConfig.adlAccountFqdn).isDefined
- var connection: CosmosDBConnection = new CosmosDBConnection(config)
- if (adlImport) {
- // ADL source
- val hdfsUtils = new HdfsUtils(hadoopConfig.toMap)
- val adlConnection: ADLConnection = ADLConnection(config)
- val adlFiles = adlConnection.getFiles
- val adlCheckpointPath = config.get(CosmosDBConfig.adlFileCheckpointPath)
- val adlCosmosDBFileStoreCollection = config.get(CosmosDBConfig.CosmosDBFileStoreCollection)
- val writingBatchId = config.get[String](CosmosDBConfig.WritingBatchId)
- val adlMaxFileCount = config.get(CosmosDBConfig.adlMaxFileCount)
- .getOrElse(CosmosDBConfig.DefaultAdlMaxFileCount.toString)
- .toInt
- logDebug(s"The Adl folder has ${adlFiles.size()} files")
- val partitions = new ListBuffer[ADLFilePartition]
- var partitionIndex = 0
- var i = 0
- while (i < adlFiles.size() && partitionIndex < adlMaxFileCount) {
- var processed = true
- if (adlCheckpointPath.isDefined) {
- processed = ADLConnection.isAdlFileProcessed(hdfsUtils, adlCheckpointPath.get, adlFiles.get(i), writingBatchId.get)
- } else if (adlCosmosDBFileStoreCollection.isDefined) {
- val dbName = config.get[String](CosmosDBConfig.Database).get
- val collectionLink = s"/dbs/$dbName/colls/${adlCosmosDBFileStoreCollection.get}"
- processed = ADLConnection.isAdlFileProcessed(connection, collectionLink, adlFiles.get(i), writingBatchId.get)
+ def computePartitions(config: Config,
+ requiredColumns: Array[String] = Array(),
+ filters: Array[Filter] = Array(),
+ hadoopConfig: mutable.Map[String, String]): Array[Partition] = {
+ val adlImport = config.get(CosmosDBConfig.adlAccountFqdn).isDefined
+ var connection: CosmosDBConnection = new CosmosDBConnection(config)
+ if (adlImport) {
+ // ADL source
+ val hdfsUtils = new HdfsUtils(hadoopConfig.toMap)
+ val adlConnection: ADLConnection = ADLConnection(config)
+ val adlFiles = adlConnection.getFiles
+ val adlCheckpointPath = config.get(CosmosDBConfig.adlFileCheckpointPath)
+ val adlCosmosDBFileStoreCollection = config.get(CosmosDBConfig.CosmosDBFileStoreCollection)
+ val writingBatchId = config.get[String](CosmosDBConfig.WritingBatchId)
+ val adlMaxFileCount = config.get(CosmosDBConfig.adlMaxFileCount)
+ .getOrElse(CosmosDBConfig.DefaultAdlMaxFileCount.toString)
+ .toInt
+ logDebug(s"The Adl folder has ${adlFiles.size()} files")
+ val partitions = new ListBuffer[ADLFilePartition]
+ var partitionIndex = 0
+ var i = 0
+ while (i < adlFiles.size() && partitionIndex < adlMaxFileCount) {
+ var processed = true
+ if (adlCheckpointPath.isDefined) {
+ processed = ADLConnection.isAdlFileProcessed(hdfsUtils, adlCheckpointPath.get, adlFiles.get(i), writingBatchId.get)
+ } else if (adlCosmosDBFileStoreCollection.isDefined) {
+ val dbName = config.get[String](CosmosDBConfig.Database).get
+ val collectionLink = s"/dbs/$dbName/colls/${adlCosmosDBFileStoreCollection.get}"
+ processed = ADLConnection.isAdlFileProcessed(connection, collectionLink, adlFiles.get(i), writingBatchId.get)
+ }
+ if (!processed) {
+ partitions += ADLFilePartition(partitionIndex, adlFiles.get(i))
+ partitionIndex += 1
+ }
+ i += 1
}
- if (!processed) {
- partitions += ADLFilePartition(partitionIndex, adlFiles.get(i))
- partitionIndex += 1
+ partitions.toArray
+ } else {
+ // CosmosDB source
+ var query: String = FilterConverter.createQueryString(requiredColumns, filters)
+ var partitionKeyRanges = connection.getAllPartitions(query)
+
+ if (config.get(CosmosDBConfig.RangeQuery).isDefined) {
+ val a1 = Array.tabulate(partitionKeyRanges.length) {
+ i => CosmosDBPartition(i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'a', 'i')
+ }
+ val a2 = Array.tabulate(partitionKeyRanges.length) {
+ i => CosmosDBPartition(partitionKeyRanges.length + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'i', 'p')
+ }
+ val a3 = Array.tabulate(partitionKeyRanges.length) {
+ i => CosmosDBPartition(partitionKeyRanges.length * 2 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'p', 'z')
+ }
+ val a4 = Array.tabulate(partitionKeyRanges.length) {
+ i => CosmosDBPartition(partitionKeyRanges.length * 3 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '0', '9')
+ }
+ // val a5 = Array.tabulate(partitionKeyRanges.length) {
+ // i => CosmosDBPartition(partitionKeyRanges.length * 4 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 't', '{')
+ // }
+ // val a6 = Array.tabulate(partitionKeyRanges.length) {
+ // i => CosmosDBPartition(partitionKeyRanges.length * 5 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '0', '4')
+ // }
+ // val a7 = Array.tabulate(partitionKeyRanges.length) {
+ // i => CosmosDBPartition(partitionKeyRanges.length * 6 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '4', '7')
+ // }
+ // val a8 = Array.tabulate(partitionKeyRanges.length) {
+ // i => CosmosDBPartition(partitionKeyRanges.length * 7 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '7', ':')
+ // }
+
+ logDebug(s"CosmosDBPartitioner: This CosmosDB has ${partitionKeyRanges.length} partitions")
+
+ (a1 ++ a2 ++ a3 ++ a4).toArray[Partition]
+ }
+ else {
+ Array.tabulate(partitionKeyRanges.length) {
+ i => CosmosDBPartition(i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'a', 'z')
+ }
}
- i += 1
- }
- partitions.toArray
- } else {
- // CosmosDB source
- var query: String = FilterConverter.createQueryString(requiredColumns, filters)
- var partitionKeyRanges = connection.getAllPartitions(query)
- logDebug(s"CosmosDBPartitioner: This CosmosDB has ${partitionKeyRanges.length} partitions")
- Array.tabulate(partitionKeyRanges.length) {
- i => CosmosDBPartition(i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt)
}
}
- }
-}
+}
\ No newline at end of file
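When rangequery is set, the partitioner repeats every Cosmos DB partition key range once per id bucket, so the RDD ends up with four times as many Spark partitions (partitionCount itself still reports only the number of key ranges). A sketch of that fan-out under the bucket boundaries hard-coded above; note that documents whose ids sort outside all four buckets (for example ids beginning with an uppercase letter) would not be covered, and an id exactly equal to a shared boundary such as "i" matches two buckets:

    // Sketch of the rangequery fan-out: one CosmosDBPartition per (bucket, key range),
    // indexed exactly as the a1..a4 arrays above (bucket * rangeCount + i).
    val buckets = Seq(('a', 'i'), ('i', 'p'), ('p', 'z'), ('0', '9'))

    def fanOut(rangeIds: Seq[Int]): Array[CosmosDBPartition] =
      buckets.zipWithIndex.flatMap { case ((lo, hi), b) =>
        rangeIds.zipWithIndex.map { case (rangeId, i) =>
          CosmosDBPartition(b * rangeIds.length + i, rangeIds.length, rangeId, lo, hi)
        }
      }.toArray

    // fanOut(Seq(0, 1)) yields 8 partitions: key ranges 0 and 1 for 'a'..'i',
    // then again for 'i'..'p', 'p'..'z', and '0'..'9'.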
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala
index 380cb72d..97fd22ef 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala
@@ -32,11 +32,17 @@ import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils
import com.microsoft.azure.cosmosdb.spark.{CosmosDBConnection, LoggingTrait}
import com.microsoft.azure.documentdb._
+import com.microsoft.azure.documentdb.bulkimport.DocumentBulkImporter
import org.apache.commons.lang3.StringUtils
import org.apache.spark._
import org.apache.spark.sql.sources.Filter
+import scala.concurrent.{Await, Future}
import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.concurrent.ExecutionContext.Implicits.global
+
+import scala.collection.JavaConversions._
object CosmosDBRDDIterator {
@@ -186,10 +192,89 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
.get[String](CosmosDBConfig.QueryCustom)
.getOrElse(FilterConverter.createQueryString(requiredColumns, filters))
logDebug(s"CosmosDBRDDIterator::LazyReader, convert to predicate: $queryString")
-
- if (queryString == FilterConverter.defaultQuery) {
+
+ val bulkRead = config
+ .get[String](CosmosDBConfig.BulkRead)
+ if(bulkRead.isDefined)
+ {
+ val maxBatchSize = config
+ .get[String](CosmosDBConfig.MaxBulkReadBatchCount)
+ .getOrElse(CosmosDBConfig.DefaultBulkReadBatchCount.toString)
+ .toInt
+
+ var collectionThroughput: Int = 0
+ collectionThroughput = connection.getCollectionThroughput
+ val importer: DocumentBulkImporter = connection.getDocumentBulkImporter(collectionThroughput)
+ importer.readDocuments(partition.partitionKeyRangeId.toString, maxBatchSize)
+ }
+ else if (queryString == FilterConverter.defaultQuery) {
// If there is no filters, read feed should be used
- connection.readDocuments(feedOpts)
+ val rangeQuery = config
+ .get[String](CosmosDBConfig.RangeQuery)
+
+ if(rangeQuery.isDefined) {
+
+ val rangeQueryString = s"SELECT * FROM data o WHERE o.id >= '${partition.start}' and o.id <= '${partition.end}'"
+// val queryString2 = "SELECT * FROM data o WHERE o.partitionKey >= 'e' and o.partitionKey <= 'i'"
+// val queryString3 = "SELECT * FROM data o WHERE o.partitionKey >= 'i' and o.partitionKey <= 'l'"
+// val queryString4 = "SELECT * FROM data o WHERE o.partitionKey >= 'l' and o.partitionKey <= 'p'"
+// val queryString5 = "SELECT * FROM data o WHERE o.partitionKey >= 'p' and o.partitionKey <= 't'"
+// val queryString6 = "SELECT * FROM data o WHERE o.partitionKey >= 't' and o.partitionKey <= '{'"
+// val queryString7 = "SELECT * FROM data o WHERE o.partitionKey >= '0' and o.partitionKey <= '4'"
+// val queryString8 = "SELECT * FROM data o WHERE o.partitionKey >= '4' and o.partitionKey <= '7'"
+// val queryString9 = "SELECT * FROM data o WHERE o.partitionKey >= '7' and o.partitionKey <= ':'"
+//
+//
+// val query1 = Future[Iterator[Document]] {
+// connection.queryDocuments(queryString1, feedOpts)
+// }
+// val query2 = Future[Iterator[Document]] {
+// connection.queryDocuments(queryString2, feedOpts)
+// }
+// val query3 = Future[Iterator[Document]] {
+// connection.queryDocuments(queryString3, feedOpts)
+// }
+// val query4 = Future[Iterator[Document]] {
+// connection.queryDocuments(queryString4, feedOpts)
+// }
+// val query5 = Future[Iterator[Document]] {
+// connection.queryDocuments(queryString5, feedOpts)
+// }
+// val query6 = Future[Iterator[Document]] {
+// connection.queryDocuments(queryString6, feedOpts)
+// }
+// val query7 = Future[Iterator[Document]] {
+// connection.queryDocuments(queryString7, feedOpts)
+// }
+// val query8 = Future[Iterator[Document]] {
+// connection.queryDocuments(queryString8, feedOpts)
+// }
+// val query9 = Future[Iterator[Document]] {
+// connection.queryDocuments(queryString9, feedOpts)
+// }
+//
+// val combinedFuture =
+// for {
+// r1 <- query1
+// r2 <- query2
+// r3 <- query3
+// r4 <- query4
+// r5 <- query5
+// r6 <- query6
+// r7 <- query7
+// r8 <- query8
+// r9 <- query9
+// } yield (r1, r2, r3, r4, r5, r6, r7, r8, r9)
+//
+// val (r1, r2, r3, r4, r5, r6, r7, r8, r9) = Await.result(combinedFuture, Duration.Inf)
+// r1 ++ r2 ++ r3 ++ r4 ++ r5 ++ r6 ++ r7 ++ r8 ++ r9
+ connection.queryDocuments(rangeQueryString, feedOpts)
+ }
+ else
+ {
+ connection.readDocuments(feedOpts)
+ }
+
} else {
connection.queryDocuments(queryString, feedOpts)
}
@@ -355,3 +440,5 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
}
}
}
+
+
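To summarise the new read dispatch: bulkread takes precedence and pulls each partition key range through DocumentBulkImporter.readDocuments; otherwise, if no pushed-down filters produced a query and rangequery is set, the iterator issues an id-range query bounded by the partition's start and end characters; the remaining paths (read feed, filter-derived query) are unchanged. The Future/Await/ExecutionContext imports only serve the commented-out parallel-query experiment. A small illustration of the SQL the rangequery path produces, one statement per bucket, using the template string added above:

    // Sketch only: the four id-range statements generated from the hard-coded buckets.
    val buckets = Seq(('a', 'i'), ('i', 'p'), ('p', 'z'), ('0', '9'))
    val rangeQueries = buckets.map { case (start, end) =>
      s"SELECT * FROM data o WHERE o.id >= '$start' and o.id <= '$end'"
    }
    // SELECT * FROM data o WHERE o.id >= 'a' and o.id <= 'i'
    // SELECT * FROM data o WHERE o.id >= 'i' and o.id <= 'p'
    // SELECT * FROM data o WHERE o.id >= 'p' and o.id <= 'z'
    // SELECT * FROM data o WHERE o.id >= '0' and o.id <= '9'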