4

I need to establish a connection from Spark Streaming to Neo4j graph database.The RDDs are of type((is,I),(am,Hello)(sam,happy)....). I need to establish a edge between each pair of words in Neo4j.

In Spark Streaming documentation I found

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

to the push to the data to an external database.

I am doing this in Scala. I am little confused about how to go about? I found AnormCypher and Neo4jScala wrapper. Can I use these to get work done? If so, how can I do that? If not, all there any better alternatives?

Thank you all....

4

2 回答 2

2

我用AnormCypher做了一个实验

像这样:

implicit val connection = Neo4jREST.setServer("localhost", 7474, "/db/data/")
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(FILE, 4).cache()
val count = logData
  .flatMap( _.split(" "))
  .map( w =>
    Cypher("CREATE(:Word {text:{text}})")
      .on( "text" -> w ).execute()
   ).filter( _ ).count()

Neo4j 2.2.x 具有出色的并发写入性能,您可以在 Spark 中使用。因此,您可以写入 Neo4j 的并发线程越多越好。如果您可以在每个请求中批量处理 100 到 1000 个语句,那就更好了。

于 2015-06-25T20:50:51.600 回答
1

看看 MazeRunner ( http://www.kennybastani.com/2014/11/using-apache-spark-and-neo4j-for-big.html ),因为它会给你一些想法。

于 2015-06-25T20:43:55.057 回答