0

这是我发布的第一个问题,如果我错过了一些信息和平庸的格式,我深表歉意。如果需要,我可以更新。

我将尝试添加尽可能多的细节。我有一个不太优化的 Spark 作业,它将 RDBMS 数据转换为 Neo4j 中的图形节点和关系。

去做这个。这是我遵循的步骤:

  1. 使用 spark sql 和连接创建非规范化数据帧“数据”。
  2. 'data' 中的 Foreach 行运行一个 graphInsert 函数,该函数执行以下操作:

    一个。读取
    b 行的内容。制定一个 Neo4j 密码查询(我们使用Merge命令,因此我们只有一个城市,例如在 Neo4j 中创建的芝加哥,而芝加哥将出现在 RDBMS 表中的多行中)
    c。连接到 neo4j
    d. 执行查询
    e. 断开与 Neo4j 的连接

这是我面临的问题列表。

  1. 插入很慢。

我知道合并查询比创建慢,但有没有另一种方法可以做到这一点,而不是为每条记录连接和断开连接?这是我的第一个代码草案,也许我正在努力如何使用一个连接从不同 Spark 工作节点上的多个线程插入。因此,为每条记录连接和断开连接。

  1. 该作业不可扩展。它仅在 1 个核心下运行良好。一旦我用 2 个 spark 核心运行作业,我就会突然得到 2 个同名的城市,即使我正在运行合并查询。例如,有 2 个芝加哥城市违反了 Merge 的使用。我假设合并功能类似于“如果不存在则创建”。

我不知道我的实现在 neo4j 部分或 spark 中是否错误。如果有人可以指导我查看任何有助于我以更好的规模实现这一点的文档,那将很有帮助,因为我有一个大的火花集群,我需要充分利用它来完成这项工作。

如果您有兴趣查看代码而不是算法。这是 scala 中的 graphInsert 实现:

class GraphInsert extends Serializable{
   var case_attributes = new Array[String](4)
   var city_attributes = new Array[String](2)
   var location_attributes = new Array[String](20)
   var incident_attributes = new Array[String](20)
   val prop = new Properties()
   prop.load(getClass().getResourceAsStream("/GraphInsertConnection.properties"))
   // properties Neo4j
   val url_neo4j = prop.getProperty("url_neo4j")
   val neo4j_user = prop.getProperty("neo4j_user")
   val neo4j_password = prop.getProperty("neo4j_password")


   def graphInsert(data : Row){  
      val query = "MERGE (d:CITY {name:city_attributes(0)})\n" +"MERGE (a:CASE { " + case_attributes(0)  + ":'" +data(11) + "'," +case_attributes(1)  + ":'" +data(13)  + "'," +case_attributes(2)  + ":'" +data(14) +"'}) \n" +"MERGE (b:INCIDENT { " + incident_attributes(0)  + ":" +data(0) + "," +incident_attributes(1)  + ":" +data(2)  + "," +incident_attributes(2)  + ":'" +data(3) +  "'," +incident_attributes(3)  + ":'" +data(8)+  "'," +incident_attributes(4)  + ":" +data(5) +  "," +incident_attributes(5)  + ":'" +data(4) +  "'," +incident_attributes(6)  + ":'" +data(6) +  "'," +incident_attributes(7)  + ":'" +data(1) +  "'," +incident_attributes(8)  + ":" +data(7)+"}) \n" +"MERGE (c:LOCATION { " + location_attributes(0)  + ":" +data(9) + "," +location_attributes(1)  + ":" +data(10)  + "," +location_attributes(2)  + ":'" +data(19) +  "'," +location_attributes(3)  + ":'" +data(20)+  "'," +location_attributes(4)  + ":" +data(18) +  "," +location_attributes(5)  + ":" +data(21) +  "," +location_attributes(6)  + ":'" +data(17) +  "'," +location_attributes(7)  + ":" +data(22) +  "," +location_attributes(8)  + ":" +data(23)+"}) \n" +"MERGE (a) - [r1:"+relation_case_incident+"]->(b)-[r2:"+relation_incident_location+"]->(c)-[r3:belongs_to]->(d);"
              println(query)
              try{
                      var con = DriverManager.getConnection(url_neo4j, neo4j_user, neo4j_password)
                          var stmt = con.createStatement()
                          var rs = stmt.executeQuery(query)
                          con.close()
              }catch{
              case ex: SQLException =>{
                  println(ex.getMessage)
              }
              }
  } 

def operations(sqlContext: SQLContext){
    ....
    #Get 'data' before this step
    city_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_city").map(x =>x.getString(5)).collect()
    case_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_case_number").map(x =>x.getString(5)).collect()
    location_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_location").map(x =>x.getString(5)).collect()
    incident_attributes= entity_metadata.filter(entity_metadata("source_name") === "tb_incident").map(x =>x.getString(5)).collect()

    data.foreach(graphInsert)

}

object GraphObject {
  def main(args: Array[String]) {  
      val conf = new SparkConf()
        .setAppName("GraphNeo4j")
        .setMaster("xyz")
        .set("spark.cores.max","2")
        .set("spark.executor.memory","10g")

      Logger.getLogger("org").setLevel(Level.ERROR)
      Logger.getLogger("akka").setLevel(Level.ERROR)
      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)
      val graph = new GraphInsert()
      graph.operations(sqlContext)

  }
}
4

2 回答 2

0

我已经完成了对流程的改进,但没有什么能像 Cypher 中的 LOAD 命令一样快。希望这对某人有所帮助:在执行此过程时使用foreachPartition而不是获得显着收益。foreach还使用密码添加定期提交。

于 2016-07-30T09:56:17.350 回答
0

无论你在闭包内写什么,即它需要在 Worker 上执行,都会被分发。您可以在这里阅读更多信息:http: //spark.apache.org/docs/latest/programming-guide.html#understanding-closures-a-nameclosureslinka

而且随着您增加核心数量,我认为它一定不会影响应用程序,因为如果您不指定它!然后它采取贪婪的方法!我希望这份文件有所帮助。

于 2016-06-23T18:14:03.087 回答