diff --git a/pom.xml b/pom.xml
index c69107e..ad96eb6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -184,7 +184,7 @@
     2.6.2
     0.9.0
     0.7.3
-    <spark-streaming_2.10.version>2.1.0</spark-streaming_2.10.version>
+    <spark-streaming_2.11.version>2.4.4</spark-streaming_2.11.version>
     3.18.1
     1.18.20
     1.3.2
@@ -1006,8 +1006,8 @@
       <dependency>
         <groupId>org.apache.spark</groupId>
-        <artifactId>spark-streaming_2.10</artifactId>
-        <version>${spark-streaming_2.10.version}</version>
+        <artifactId>spark-streaming_2.11</artifactId>
+        <version>${spark-streaming_2.11.version}</version>
       </dependency>
       <dependency>
         <groupId>com.google.guava</groupId>
diff --git a/pulsar-spark/pom.xml b/pulsar-spark/pom.xml
index 3736168..cffc168 100644
--- a/pulsar-spark/pom.xml
+++ b/pulsar-spark/pom.xml
@@ -53,7 +53,7 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_2.10</artifactId>
+      <artifactId>spark-streaming_2.11</artifactId>
      <exclusions>
        <exclusion>
          <artifactId>jackson-annotations</artifactId>
@@ -69,6 +69,22 @@
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <version>4.3.0</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>add-source</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <args>
+            <arg>-target:jvm-1.8</arg>
+          </args>
+        </configuration>
+      </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
diff --git a/pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkPulsarMessage.scala b/pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkPulsarMessage.scala
new file mode 100644
index 0000000..49292b5
--- /dev/null
+++ b/pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkPulsarMessage.scala
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.streaming.pulsar
+
+import org.apache.pulsar.client.api.MessageId
+
+// Wrapper class for the Pulsar Message, since Message is not Serializable
+case class SparkPulsarMessage(data: Array[Byte], key: String, messageId: MessageId, publishTime: Long, eventTime: Long, topicName: String, properties: Map[String, String]) extends Serializable
+
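Note on the wrapper above: org.apache.pulsar.client.api.Message is not Serializable, so the
receiver copies each consumed message into this plain case class before handing it to Spark.
A minimal sketch of that conversion, assuming the field mapping used by the receiver below
(the SparkPulsarMessageConversion object and its toSparkPulsarMessage helper are hypothetical,
not part of this patch):

    import org.apache.pulsar.client.api.Message
    import scala.collection.JavaConverters._

    object SparkPulsarMessageConversion {
      // Hypothetical helper: copies the fields of a consumed Pulsar message
      // into the Serializable wrapper defined above.
      def toSparkPulsarMessage(msg: Message[Array[Byte]]): SparkPulsarMessage =
        SparkPulsarMessage(
          data = msg.getData,             // payload bytes
          key = msg.getKey,               // partition key (may be null)
          messageId = msg.getMessageId,   // MessageId itself is Serializable
          publishTime = msg.getPublishTime,
          eventTime = msg.getEventTime,
          topicName = msg.getTopicName,
          properties = msg.getProperties.asScala.toMap)
    }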
diff --git a/pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkStreamingReliablePulsarReceiver.scala b/pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkStreamingReliablePulsarReceiver.scala
new file mode 100644
index 0000000..d4a02ac
--- /dev/null
+++ b/pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkStreamingReliablePulsarReceiver.scala
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.streaming.pulsar
+
+import com.google.common.util.concurrent.RateLimiter
+import org.apache.pulsar.client.api._
+import org.apache.pulsar.client.impl.PulsarClientImpl
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Custom Spark receiver that pulls messages from a Pulsar topic and pushes them into a reliable store.
+ * If backpressure is enabled, the message ingestion rate for this receiver is managed by Spark.
+ *
+ * The following Spark configurations can be used to control rates and block size:
+ *   spark.streaming.backpressure.enabled
+ *   spark.streaming.backpressure.initialRate
+ *   spark.streaming.receiver.maxRate
+ *   spark.streaming.blockQueueSize: controls block size
+ *   spark.streaming.backpressure.pid.minRate
+ *
+ * See the Spark streaming configurations doc.
+ */
+            pay._1 -> pay._2.asInstanceOf[AnyRef]))
+        .topics(topics).receiverQueueSize(maxPollSize).consumerName(consumerName)
+        .subscribe()
+    } catch {
+      case e: Exception =>
+        SparkStreamingReliablePulsarReceiver.LOG.error("Failed to start subscription : {}", e.getMessage)
+        e.printStackTrace()
+        restart("Restart a consumer")
+    }
+    latestStorePushTime = System.currentTimeMillis()
+
+    consumerThread = new Thread() {
+      override def run() {
+        receive()
+      }
+    }
+
+    consumerThread.start()
+  }
+
+  override def onStop(): Unit = try {
+    consumerThread.join(30000L)
+    if (consumer != null) consumer.close()
+    if (pulsarClient != null) pulsarClient.close()
+  } catch {
+    case e: PulsarClientException =>
+      SparkStreamingReliablePulsarReceiver.LOG.error("Failed to close client : {}", e.getMessage)
+  }
+
+  // Continuously polls records from Pulsar and stores them.
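+  // The loop below buffers consumed messages into blocks of at most `blockSize`
+  // and pushes a block to Spark's store either when it is full or when
+  // `blockIntervalMs` has elapsed, acquiring one rate-limiter permit per message
+  // so the backpressure-derived rate limit is honoured before each store call.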
+  def receive(): Unit = {
+    while (!isStopped()) {
+      try {
+        // Update rate limit if necessary
+        updateRateLimit
+
+        val messages = consumer.batchReceive()
+
+        if (messages != null && messages.size() > 0) {
+          buffer ++= messages.map(msg => {
+            SparkPulsarMessage(msg.getData, msg.getKey, msg.getMessageId, msg.getPublishTime,
+              msg.getEventTime, msg.getTopicName, msg.getProperties.asScala.toMap)
+          })
+        }
+      } catch {
+        case e: Exception => reportError("Failed to get messages", e)
+      }
+
+      try {
+        val timeSinceLastStorePush = System.currentTimeMillis() - latestStorePushTime
+
+        // Push messages to spark store if `blockIntervalMs` has passed or we have more messages than blockSize
+        if (timeSinceLastStorePush >= blockIntervalMs || buffer.length >= blockSize) {
+          val (completeBlocks, partialBlock) = buffer.grouped(blockSize)
+            .partition(block => block.size == blockSize)
+          val blocksToStore = if (completeBlocks.hasNext) completeBlocks else partialBlock
+
+          buffer.clear()
+
+          if (completeBlocks.hasNext && partialBlock.hasNext)
+            buffer.appendAll(partialBlock.next())
+
+          while (blocksToStore.hasNext) {
+            val groupedMessages = blocksToStore.next().toList
+            SparkStreamingReliablePulsarReceiver.LOG.debug("Pushing " + groupedMessages.size + " messages in store.")
+            rateLimiter.acquire(groupedMessages.size)
+            store(groupedMessages.toIterator)
+            if (autoAcknowledge)
+              consumer.acknowledge(groupedMessages.map(_.messageId))
+          }
+          latestStorePushTime = System.currentTimeMillis()
+        }
+      } catch {
+        case e: Exception =>
+          reportError("Failed to store messages", e)
+          e.printStackTrace()
+      }
+    }
+  }
+
+  def updateRateLimit(): Unit = {
+    val newRateLimit = rateMultiplierFactor * supervisor.getCurrentRateLimit.min(maxRateLimit)
+    if (rateLimiter.getRate != newRateLimit) {
+      SparkStreamingReliablePulsarReceiver.LOG.info("New rate limit: " + newRateLimit)
+      rateLimiter.setRate(newRateLimit)
+    }
+  }
+
+}
+
+object SparkStreamingReliablePulsarReceiver {
+  private val LOG = LoggerFactory.getLogger(classOf[SparkStreamingReliablePulsarReceiver])
+}
\ No newline at end of file
diff --git a/tests/pulsar-spark-test/pom.xml b/tests/pulsar-spark-test/pom.xml
index 50b6d7d..a7faf10 100644
--- a/tests/pulsar-spark-test/pom.xml
+++ b/tests/pulsar-spark-test/pom.xml
@@ -57,7 +57,7 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_2.10</artifactId>
+      <artifactId>spark-streaming_2.11</artifactId>
       <scope>test</scope>
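For reference, a driver wiring this receiver into a StreamingContext would look roughly like
the sketch below. This is an illustration under assumptions, not part of the patch: the
receiver's constructor arguments are elided in this diff (hence the `???` hole), and the two
rate-related settings are simply the knobs named in the class scaladoc above.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.pulsar.{SparkPulsarMessage, SparkStreamingReliablePulsarReceiver}
    import org.apache.spark.streaming.receiver.Receiver

    object PulsarReceiverDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("pulsar-spark-demo")
          .set("spark.streaming.backpressure.enabled", "true")
          .set("spark.streaming.receiver.maxRate", "5000")
        val ssc = new StreamingContext(conf, Seconds(5))

        // Construct the receiver here; its constructor is not shown in this diff.
        val receiver: Receiver[SparkPulsarMessage] = ???

        val stream = ssc.receiverStream(receiver)   // DStream[SparkPulsarMessage]
        stream.map(m => new String(m.data)).print() // decode payload bytes for display

        ssc.start()
        ssc.awaitTermination()
      }
    }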