diff --git a/example/src/main/scala/com/vesoft/nebula/algorithm/DeepQueryTest.scala b/example/src/main/scala/com/vesoft/nebula/algorithm/DeepQueryTest.scala
deleted file mode 100644
index 3feaea5..0000000
--- a/example/src/main/scala/com/vesoft/nebula/algorithm/DeepQueryTest.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/* Copyright (c) 2022 vesoft inc. All rights reserved.
- *
- * This source code is licensed under Apache 2.0 License.
- */
-
-package com.vesoft.nebula.algorithm
-
-import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
-import com.facebook.thrift.protocol.TCompactProtocol
-import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
-import org.apache.log4j.Logger
-import org.apache.spark.SparkConf
-import org.apache.spark.graphx.{Edge, EdgeDirection, EdgeTriplet, Graph, Pregel, VertexId}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Encoder, SparkSession}
-
-import scala.collection.mutable
-
-object DeepQueryTest {
-  private val LOGGER = Logger.getLogger(this.getClass)
-
-  def main(args: Array[String]): Unit = {
-    val sparkConf = new SparkConf()
-      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-    val spark = SparkSession
-      .builder()
-      .config(sparkConf)
-      .getOrCreate()
-    val iter = args(0).toInt
-    val id   = args(1).toInt
-
-    query(spark, iter, id)
-  }
-
-  def readNebulaData(spark: SparkSession): DataFrame = {
-
-    val config =
-      NebulaConnectionConfig
-        .builder()
-        .withMetaAddress("192.168.15.5:9559")
-        .withTimeout(6000)
-        .withConenctionRetry(2)
-        .build()
-    val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
-      .builder()
-      .withSpace("twitter")
-      .withLabel("FOLLOW")
-      .withNoColumn(true)
-      .withLimit(20000)
-      .withPartitionNum(120)
-      .build()
-    val df: DataFrame =
-      spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
-    df
-  }
-
-  def deepQuery(df: DataFrame,
-                maxIterations: Int,
-                startId: Int): Graph[mutable.HashSet[Int], Double] = {
-    implicit val encoder: Encoder[Edge[Double]] = org.apache.spark.sql.Encoders.kryo[Edge[Double]]
-    val edges: RDD[Edge[Double]] = df
-      .map(row => {
-        Edge(row.get(0).toString.toLong, row.get(1).toString.toLong, 1.0)
-      })(encoder)
-      .rdd
-
-    val graph = Graph.fromEdges(edges, None)
-
-    val queryGraph = graph.mapVertices { (vid, _) =>
-      mutable.HashSet[Int](vid.toInt)
-    }
-    queryGraph.cache()
-    queryGraph.numVertices
-    queryGraph.numEdges
-    df.unpersist()
-
-    def sendMessage(edge: EdgeTriplet[mutable.HashSet[Int], Double])
-      : Iterator[(VertexId, mutable.HashSet[Int])] = {
-      val (smallSet, largeSet) = if (edge.srcAttr.size < edge.dstAttr.size) {
-        (edge.srcAttr, edge.dstAttr)
-      } else {
-        (edge.dstAttr, edge.srcAttr)
-      }
-
-      if (smallSet.size == maxIterations) {
-        Iterator.empty
-      } else {
-        val newNeighbors =
-          (for (id <- smallSet; neighbor <- largeSet if neighbor != id) yield neighbor)
-        Iterator((edge.dstId, newNeighbors))
-      }
-    }
-
-    val initialMessage = mutable.HashSet[Int]()
-
-    val pregelGraph = Pregel(queryGraph, initialMessage, maxIterations, EdgeDirection.Both)(
-      vprog = (id, attr, msg) => attr ++ msg,
-      sendMsg = sendMessage,
-      mergeMsg = (a, b) => {
-        val setResult = a ++ b
-        setResult
-      }
-    )
-    pregelGraph.cache()
-    pregelGraph.numVertices
-    pregelGraph.numEdges
-    queryGraph.unpersist()
-    pregelGraph
-  }
-
-  def query(spark: SparkSession, maxIter: Int, startId: Int): Unit = {
-    val start = System.currentTimeMillis()
-    val df    = readNebulaData(spark)
-    df.cache()
-    df.count()
-    println(s"read data cost time ${(System.currentTimeMillis() - start)}")
-
-    val startQuery = System.currentTimeMillis()
-    val graph      = deepQuery(df, maxIter, startId)
-
-    val endQuery = System.currentTimeMillis()
-    val num      = graph.vertices.filter(row => row._2.contains(startId)).count()
-    val end      = System.currentTimeMillis()
-    println(s"query cost: ${endQuery - startQuery}")
-    println(s"count: ${num}, cost: ${end - endQuery}")
-  }
-}
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala
index 4aa1812..c4e04ed 100644
--- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala
@@ -17,6 +17,8 @@ import com.vesoft.nebula.algorithm.config.{
   HanpConfig,
   JaccardConfig,
   KCoreConfig,
+  KNeighborsConfig,
+  KNeighborsParallelConfig,
   LPAConfig,
   LouvainConfig,
   Node2vecConfig,
@@ -36,6 +38,8 @@ import com.vesoft.nebula.algorithm.lib.{
   HanpAlgo,
   JaccardAlgo,
   KCoreAlgo,
+  KStepNeighbors,
+  KStepNeighborsParallel,
   LabelPropagationAlgo,
   LouvainAlgo,
   Node2vecAlgo,
@@ -83,7 +87,9 @@ object Main {
     val startTime = System.currentTimeMillis()
     // reader
-    val dataSet = createDataSource(sparkConfig.spark, configs, partitionNum)
+    val dataSet = createDataSource(sparkConfig.spark, configs, partitionNum)
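+    // cache the source data and force materialization with count(), so the
+    // timed read phase below reflects the real load cost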
+    dataSet.cache()
+    dataSet.count()
     val readTime = System.currentTimeMillis()

     // algorithm
@@ -218,6 +224,15 @@ object Main {
         val jaccardConfig = JaccardConfig.getJaccardConfig(configs)
         JaccardAlgo(spark, dataSet, jaccardConfig)
       }
+      case "kneighbors" => {
+        val kNeighborsConfig = KNeighborsConfig.getKNeighborConfig(configs)
+        KStepNeighbors(spark, dataSet, kNeighborsConfig)
+      }
+      case "kneighborsparallel" => {
+        val kNeighborsParallelConfig =
+          KNeighborsParallelConfig.getKNeighborParallelConfig(configs)
+        KStepNeighborsParallel(spark, dataSet, kNeighborsParallelConfig)
+      }
       case _ => throw new UnknownParameterException("unknown executeAlgo name.")
     }
   }
@@ -225,6 +240,9 @@ object Main {
   }

   private[this] def saveAlgoResult(algoResult: DataFrame, configs: Configs): Unit = {
+    if (algoResult == null) {
+      return
+    }
     val dataSink = configs.dataSourceSinkEntry.sink
     dataSink.toLowerCase match {
       case "nebula" => {
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/AlgoConfig.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/AlgoConfig.scala
index 9811185..2303aec 100644
--- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/AlgoConfig.scala
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/AlgoConfig.scala
@@ -341,6 +341,46 @@ object JaccardConfig {
   }
 }

+/**
+  * k-step neighbors
+  */
+case class KNeighborsConfig(steps: List[Int], startId: Long)
+
+object KNeighborsConfig {
+  var steps: List[Int] = _
+  var startId: Long    = _
+
+  def getKNeighborConfig(configs: Configs): KNeighborsConfig = {
+    val kNeighborConfig = configs.algorithmConfig.map
+    steps = kNeighborConfig("algorithm.kneighbors.steps").toString.split(",").map(_.toInt).toList
+    startId = kNeighborConfig("algorithm.kneighbors.startId").toString.toLong
+    KNeighborsConfig(steps, startId)
+  }
+}
+
+/**
+  * k-step neighbors for multiple start ids
+  */
+case class KNeighborsParallelConfig(steps: List[Int], startIds: List[Long])
+
+object KNeighborsParallelConfig {
+  var steps: List[Int]     = _
+  var startIds: List[Long] = _
+
+  def getKNeighborParallelConfig(configs: Configs): KNeighborsParallelConfig = {
+    val kNeighborParallelConfig = configs.algorithmConfig.map
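+    // steps and startIds are comma-separated values in the config file,
+    // e.g. steps: "1,2,3" and startIds: "1,2"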
+    steps = kNeighborParallelConfig("algorithm.kneighborsparallel.steps").toString
+      .split(",")
+      .map(_.toInt)
+      .toList
+    startIds = kNeighborParallelConfig("algorithm.kneighborsparallel.startIds").toString
+      .split(",")
+      .map(_.toLong)
+      .toList
+    KNeighborsParallelConfig(steps, startIds)
+  }
+}
+
 case class AlgoConfig(configs: Configs)

 object AlgoConfig {
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/KStepNeighbors.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/KStepNeighbors.scala
new file mode 100644
index 0000000..f059732
--- /dev/null
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/KStepNeighbors.scala
@@ -0,0 +1,75 @@
+/* Copyright (c) 2023 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+package com.vesoft.nebula.algorithm.lib
+
+import com.vesoft.nebula.algorithm.config.KNeighborsConfig
+import com.vesoft.nebula.algorithm.utils.NebulaUtil
+import org.apache.log4j.Logger
+import org.apache.spark.graphx.{EdgeDirection, EdgeTriplet, Graph, Pregel, VertexId}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+
+import scala.collection.mutable.ArrayBuffer
+
+object KStepNeighbors {
+  private val LOGGER = Logger.getLogger(this.getClass)
+
+  val ALGORITHM: String = "KStepNeighbors"
+
+  def apply(spark: SparkSession,
+            dataset: Dataset[Row],
+            kStepConfig: KNeighborsConfig): DataFrame = {
+    val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, false)
+    graph.persist()
+    graph.numVertices
+    graph.numEdges
+    dataset.unpersist(blocking = false)
+
+    execute(graph, kStepConfig.steps, kStepConfig.startId)
+    null
+  }
+
+  def execute(graph: Graph[None.type, Double], steps: List[Int], startId: Long): Unit = {
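+    // each vertex carries a Boolean "visited" flag: only the start vertex is
+    // true initially, and Pregel floods the flag outwards one step per iteration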
+    val queryGraph = graph.mapVertices { case (vid, _) => vid == startId }
+
+    val initialMessage = false
+
+    def sendMessage(edge: EdgeTriplet[Boolean, Double]): Iterator[(VertexId, Boolean)] = {
+      if (edge.srcAttr && !edge.dstAttr)
+        Iterator((edge.dstId, true))
+      else if (edge.dstAttr && !edge.srcAttr)
+        Iterator((edge.srcId, true))
+      else
+        Iterator.empty
+    }
+
+    val costs: ArrayBuffer[Long]  = new ArrayBuffer[Long](steps.size)
+    val counts: ArrayBuffer[Long] = new ArrayBuffer[Long](steps.size)
+
+    for (iter <- steps) {
+      LOGGER.info(s">>>>>>>>>>>>>>> query ${iter} steps for $startId >>>>>>>>>>>>>>> ")
+      val startQuery = System.currentTimeMillis()
+      val pregelGraph = Pregel(queryGraph, initialMessage, iter, EdgeDirection.Either)(
+        vprog = (id, attr, msg) => attr | msg,
+        sendMsg = sendMessage,
+        mergeMsg = (a, b) => a | b
+      )
+      val endQuery = System.currentTimeMillis()
+      val num      = pregelGraph.vertices.filter(row => row._2).count()
+      costs.append(endQuery - startQuery)
+      counts.append(num)
+    }
+
+    val timeCosts   = costs.toArray
+    val neighborNum = counts.toArray
+    for (i <- 1 to steps.size) {
+      print(s"query ${steps(i - 1)} step neighbors cost: ${timeCosts(i - 1)}, ")
+      println(s"neighbor number is : ${neighborNum(i - 1)} ")
+    }
+  }
+}
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/KStepNeighborsParallel.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/KStepNeighborsParallel.scala
new file mode 100644
index 0000000..dd75d97
--- /dev/null
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/KStepNeighborsParallel.scala
@@ -0,0 +1,98 @@
+/* Copyright (c) 2023 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+package com.vesoft.nebula.algorithm.lib
+
+import com.vesoft.nebula.algorithm.config.KNeighborsParallelConfig
+import com.vesoft.nebula.algorithm.utils.NebulaUtil
+import org.apache.log4j.Logger
+import org.apache.spark.graphx.{EdgeDirection, EdgeTriplet, Graph, Pregel, VertexId}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+object KStepNeighborsParallel {
+  private val LOGGER = Logger.getLogger(this.getClass)
+
+  val ALGORITHM: String = "KStepNeighborsParallel"
+
+  def apply(spark: SparkSession,
+            dataset: Dataset[Row],
+            kStepConfig: KNeighborsParallelConfig): DataFrame = {
+    val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, false)
+    graph.persist()
+    graph.numVertices
+    graph.numEdges
+    dataset.unpersist(blocking = false)
+
+    execute(graph, kStepConfig.steps, kStepConfig.startIds)
+    null
+  }
+
+  def execute(graph: Graph[None.type, Double], steps: List[Int], startIds: List[Long]): Unit = {
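+    // generalizes KStepNeighbors: every vertex carries a map keyed by start id,
+    // so a single Pregel run tracks the frontier of all start vertices at once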
+    val queryGraph = graph.mapVertices {
+      case (vid, _) =>
+        if (startIds.contains(vid)) Map[VertexId, Boolean](vid -> true)
+        else Map[VertexId, Boolean]()
+    }
+
+    val initialMessage = Map[VertexId, Boolean]()
+
+    def sendMessage(
+        edge: EdgeTriplet[Map[Long, Boolean], Double]): Iterator[(VertexId, Map[Long, Boolean])] = {
+      if (edge.srcAttr.equals(edge.dstAttr)) {
+        Iterator.empty
+      } else if (edge.srcAttr.isEmpty) {
+        Iterator((edge.srcId, edge.dstAttr))
+      } else if (edge.dstAttr.isEmpty) {
+        Iterator((edge.dstId, edge.srcAttr))
+      } else
+        Iterator((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr))
+    }
+
+    val costs: ArrayBuffer[Long] = new ArrayBuffer[Long](steps.size)
+    val nums: ArrayBuffer[mutable.Map[Long, Long]] =
+      new ArrayBuffer[mutable.Map[Long, Long]](steps.size)
+
+    for (iter <- steps) {
+      LOGGER.info(s">>>>>>>>>>>>>>> query ${iter} steps for $startIds >>>>>>>>>>>>>>> ")
+      val startQuery = System.currentTimeMillis()
+      val pregelGraph = Pregel(queryGraph, initialMessage, iter, EdgeDirection.Either)(
+        vprog = (id, attr, msg) => attr ++ msg,
+        sendMsg = sendMessage,
+        mergeMsg = (a, b) => a ++ b
+      )
+      val endQuery = System.currentTimeMillis()
+      val vertexCounts: mutable.Map[Long, Long] = new mutable.HashMap[Long, Long]()
+      for (id <- startIds) {
+        val num = pregelGraph.vertices.filter(row => row._2.contains(id)).count()
+        vertexCounts.put(id, num)
+      }
+      costs.append(endQuery - startQuery)
+      nums.append(vertexCounts)
+    }
+
+    val timeCosts   = costs.toArray
+    val neighborNum = nums.toArray
+    for (i <- 1 to steps.size) {
+      print(s"query ${steps(i - 1)} step neighbors cost: ${timeCosts(i - 1)}, ")
+      println(s"neighbor number is : ${printNeighbors(neighborNum(i - 1))} ")
+    }
+  }
+
+  def printNeighbors(result: mutable.Map[Long, Long]): String =
+    result.map { case (id, num) => s"$id:$num" }.mkString(";")
+}
diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/KStepNeighborsSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/KStepNeighborsSuite.scala
new file mode 100644
index 0000000..12bfa3f
--- /dev/null
+++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/KStepNeighborsSuite.scala
@@ -0,0 +1,22 @@
+/* Copyright (c) 2023 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+package scala.com.vesoft.nebula.algorithm.lib
+
+import com.vesoft.nebula.algorithm.config.KNeighborsConfig
+import com.vesoft.nebula.algorithm.lib.KStepNeighbors
+import org.apache.spark.sql.SparkSession
+import org.junit.Test
+
+class KStepNeighborsSuite {
+  @Test
+  def kStepNeighborsSuite(): Unit = {
+    val spark =
+      SparkSession.builder().master("local").config("spark.sql.shuffle.partitions", 5).getOrCreate()
+    val data        = spark.read.option("header", true).csv("src/test/resources/edge.csv")
+    val kStepConfig = new KNeighborsConfig(List(1, 2, 3), 1)
+    KStepNeighbors.apply(spark, data, kStepConfig)
+  }
+}
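Note: this patch adds no suite for KStepNeighborsParallel. A companion test could mirror KStepNeighborsSuite; the sketch below is illustrative only and not part of the diff, reusing the same sample edge file with arbitrary start ids List(1L, 2L).

/* Sketch of a companion test for the parallel variant; not part of this diff. */
package scala.com.vesoft.nebula.algorithm.lib

import com.vesoft.nebula.algorithm.config.KNeighborsParallelConfig
import com.vesoft.nebula.algorithm.lib.KStepNeighborsParallel
import org.apache.spark.sql.SparkSession
import org.junit.Test

class KStepNeighborsParallelSuite {
  @Test
  def kStepNeighborsParallelSuite(): Unit = {
    val spark =
      SparkSession.builder().master("local").config("spark.sql.shuffle.partitions", 5).getOrCreate()
    // reuse the same sample edge file as KStepNeighborsSuite
    val data        = spark.read.option("header", true).csv("src/test/resources/edge.csv")
    val kStepConfig = KNeighborsParallelConfig(List(1, 2, 3), List(1L, 2L))
    KStepNeighborsParallel.apply(spark, data, kStepConfig)
  }
}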