diff --git a/pom.xml b/pom.xml
index c69107e..ad96eb6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -184,7 +184,7 @@
2.6.2
0.9.0
0.7.3
- <spark-streaming_2.10.version>2.1.0</spark-streaming_2.10.version>
+ <spark-streaming_2.11.version>2.4.4</spark-streaming_2.11.version>
3.18.1
1.18.20
1.3.2
@@ -1006,8 +1006,8 @@
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.10</artifactId>
- <version>${spark-streaming_2.10.version}</version>
+ <artifactId>spark-streaming_2.11</artifactId>
+ <version>${spark-streaming_2.11.version}</version>
<groupId>com.google.guava</groupId>
diff --git a/pulsar-spark/pom.xml b/pulsar-spark/pom.xml
index 3736168..cffc168 100644
--- a/pulsar-spark/pom.xml
+++ b/pulsar-spark/pom.xml
@@ -53,7 +53,7 @@
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.10</artifactId>
+ <artifactId>spark-streaming_2.11</artifactId>
<artifactId>jackson-annotations</artifactId>
@@ -69,6 +69,22 @@
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>4.3.0</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>add-source</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <args>
+ <arg>-target:jvm-1.8</arg>
+ </args>
+ </configuration>
+ </plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
diff --git a/pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkPulsarMessage.scala b/pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkPulsarMessage.scala
new file mode 100644
index 0000000..49292b5
--- /dev/null
+++ b/pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkPulsarMessage.scala
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.streaming.pulsar
+
+import org.apache.pulsar.client.api.MessageId
+
+// Wrapper class for the Pulsar Message, since Message itself is not Serializable
+case class SparkPulsarMessage(data: Array[Byte], key: String, messageId: MessageId, publishTime: Long,
+ eventTime: Long, topicName: String, properties: Map[String, String]) extends Serializable
+
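For context, a minimal sketch of how this wrapper is meant to be populated from a consumed Pulsar `Message[Array[Byte]]` (it mirrors the mapping the receiver below performs; the helper name is illustrative, not part of the patch):

```scala
import org.apache.pulsar.client.api.Message
import scala.collection.JavaConverters._

// Copy the needed fields out of the non-serializable Message so the record
// can be stored, replicated, and shipped around by Spark.
def toSparkPulsarMessage(msg: Message[Array[Byte]]): SparkPulsarMessage =
  SparkPulsarMessage(msg.getData, msg.getKey, msg.getMessageId, msg.getPublishTime,
    msg.getEventTime, msg.getTopicName, msg.getProperties.asScala.toMap)
```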
diff --git a/pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkStreamingReliablePulsarReceiver.scala b/pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkStreamingReliablePulsarReceiver.scala
new file mode 100644
index 0000000..d4a02ac
--- /dev/null
+++ b/pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkStreamingReliablePulsarReceiver.scala
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.streaming.pulsar
+
+import com.google.common.util.concurrent.RateLimiter
+import org.apache.pulsar.client.api._
+import org.apache.pulsar.client.impl.PulsarClientImpl
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Custom Spark receiver to pull messages from a Pulsar topic and push them into a reliable store.
+ * If backpressure is enabled, the message ingestion rate for this receiver is managed by Spark.
+ *
+ * The following Spark configurations can be used to control the rates and block size:
+ * spark.streaming.backpressure.enabled
+ * spark.streaming.backpressure.initialRate
+ * spark.streaming.receiver.maxRate
+ * spark.streaming.blockQueueSize: controls the block size
+ * spark.streaming.backpressure.pid.minRate
+ *
+ * See the Spark streaming configuration docs for more details:
+ * https://spark.apache.org/docs/latest/configuration.html#spark-streaming
+ */
+class SparkStreamingReliablePulsarReceiver(storageLevel: StorageLevel,
+ serviceUrl: String,
+ pulsarConf: Map[String, Any],
+ topics: java.util.List[String],
+ subscriptionName: String,
+ consumerName: String,
+ autoAcknowledge: Boolean = true,
+ rateMultiplierFactor: Double = 1.0)
+ extends Receiver[SparkPulsarMessage](storageLevel) {
+
+ // Block building and rate limiting parameters, read from the Spark configuration.
+ private val sparkConf = new SparkConf()
+ private val blockSize = sparkConf.getInt("spark.streaming.blockQueueSize", 1000)
+ private val blockIntervalMs = sparkConf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
+ private val maxRateLimit = sparkConf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
+ private val maxPollSize = blockSize
+ private val rateLimiter = RateLimiter.create(maxRateLimit.toDouble)
+ private val buffer = new ListBuffer[SparkPulsarMessage]
+
+ private var consumer: Consumer[Array[Byte]] = _
+ private var pulsarClient: PulsarClient = _
+ private var consumerThread: Thread = _
+ private var latestStorePushTime: Long = 0L
+
+ override def onStart(): Unit = {
+ try {
+ pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build()
+ consumer = pulsarClient.newConsumer()
+ .subscriptionName(subscriptionName)
+ .loadConf(pulsarConf.map(pay => pay._1 -> pay._2.asInstanceOf[AnyRef]))
+ .topics(topics).receiverQueueSize(maxPollSize).consumerName(consumerName)
+ .subscribe()
+ } catch {
+ case e: Exception =>
+ SparkStreamingReliablePulsarReceiver.LOG.error("Failed to start subscription : {}", e.getMessage)
+ e.printStackTrace()
+ restart("Restart a consumer")
+ }
+ latestStorePushTime = System.currentTimeMillis()
+
+ consumerThread = new Thread() {
+ override def run() {
+ receive()
+ }
+ }
+
+ consumerThread.start()
+
+ }
+
+ override def onStop(): Unit = try {
+ consumerThread.join(30000L)
+ if (consumer != null) consumer.close()
+ if (pulsarClient != null) pulsarClient.close()
+ } catch {
+ case e: PulsarClientException =>
+ SparkStreamingReliablePulsarReceiver.LOG.error("Failed to close client : {}", e.getMessage)
+ }
+
+ // Function that continuously keeps polling records from pulsar and store them.
+ def receive(): Unit = {
+ while(!isStopped()){
+ try {
+ // Update rate limit if necessary
+ updateRateLimit()
+
+ val messages = consumer.batchReceive()
+
+ if (messages != null && messages.size() > 0) {
+ buffer ++= messages.map(msg => {
+ SparkPulsarMessage(msg.getData, msg.getKey, msg.getMessageId, msg.getPublishTime, msg.getEventTime, msg.getTopicName, msg.getProperties.asScala.toMap)
+ })
+
+ }
+ }
+ catch {
+ case e: Exception => reportError("Failed to get messages", e)
+ }
+
+ try {
+
+ val timeSinceLastStorePush = System.currentTimeMillis() - latestStorePushTime
+
+ // Push messages to the Spark store if `blockIntervalMs` has passed or we have more messages than blockSize
+ if (timeSinceLastStorePush >= blockIntervalMs || buffer.length >= blockSize) {
+ // Materialize the groups before clearing the buffer: `grouped` is lazy over
+ // the underlying collection, so consuming it after clear() would lose messages.
+ val (completeBlocks, partialBlocks) = buffer.grouped(blockSize).toList
+ .partition(block => block.size == blockSize)
+
+ buffer.clear()
+
+ // Keep a trailing partial block buffered for the next push, unless it is all we have.
+ val blocksToStore = if (completeBlocks.nonEmpty) {
+ partialBlocks.foreach(buffer.appendAll)
+ completeBlocks
+ } else partialBlocks
+
+ for (block <- blocksToStore) {
+ val groupedMessages = block.toList
+ SparkStreamingReliablePulsarReceiver.LOG.debug("Pushing " + groupedMessages.size + " messages in store.")
+ rateLimiter.acquire(groupedMessages.size)
+ store(groupedMessages.toIterator)
+ if (autoAcknowledge)
+ consumer.acknowledge(groupedMessages.map(_.messageId))
+ }
+ latestStorePushTime = System.currentTimeMillis()
+ }
+
+ }
+ catch {
+ case e: Exception => reportError("Failed to store messages", e)
+ e.printStackTrace()
+ }
+ }
+ }
+
+ def updateRateLimit(): Unit = {
+ val newRateLimit = rateMultiplierFactor * supervisor.getCurrentRateLimit.min(maxRateLimit)
+ if (rateLimiter.getRate != newRateLimit) {
+ SparkStreamingReliablePulsarReceiver.LOG.info("New rate limit: " + newRateLimit)
+ rateLimiter.setRate(newRateLimit)
+ }
+ }
+
+}
+
+object SparkStreamingReliablePulsarReceiver {
+ private val LOG = LoggerFactory.getLogger(classOf[SparkStreamingReliablePulsarReceiver])
+}
\ No newline at end of file
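For reviewers, a minimal usage sketch of the new receiver, assuming the constructor shape shown above; the service URL, topic, and rate settings are illustrative, and `spark.streaming.receiver.writeAheadLog.enable` should be on for end-to-end reliability:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.pulsar.SparkStreamingReliablePulsarReceiver

object ReceiverExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("pulsar-receiver-example")
      .set("spark.streaming.backpressure.enabled", "true")
      .set("spark.streaming.receiver.maxRate", "5000")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(5))

    // The receiver emits SparkPulsarMessage records; payloads stay raw bytes.
    val stream = ssc.receiverStream(new SparkStreamingReliablePulsarReceiver(
      storageLevel = StorageLevel.MEMORY_AND_DISK_SER,
      serviceUrl = "pulsar://localhost:6650",
      pulsarConf = Map("subscriptionType" -> "Shared"),
      topics = java.util.Arrays.asList("persistent://public/default/my-topic"),
      subscriptionName = "spark-sub",
      consumerName = "spark-receiver-1"))

    stream.map(m => new String(m.data)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```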
diff --git a/tests/pulsar-spark-test/pom.xml b/tests/pulsar-spark-test/pom.xml
index 50b6d7d..a7faf10 100644
--- a/tests/pulsar-spark-test/pom.xml
+++ b/tests/pulsar-spark-test/pom.xml
@@ -57,7 +57,7 @@
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.10</artifactId>
+ <artifactId>spark-streaming_2.11</artifactId>
<scope>test</scope>