这是我发布的第一个问题,如果我错过了一些信息和平庸的格式,我深表歉意。如果需要,我可以更新。
我将尝试添加尽可能多的细节。我有一个不太优化的 Spark 作业,它将 RDBMS 数据转换为 Neo4j 中的图形节点和关系。
去做这个。这是我遵循的步骤:
- 使用 spark sql 和连接创建非规范化数据帧“数据”。
'data' 中的 Foreach 行运行一个 graphInsert 函数,该函数执行以下操作:
一个。读取
b 行的内容。制定一个 Neo4j 密码查询(我们使用Merge命令,因此我们只有一个城市,例如在 Neo4j 中创建的芝加哥,而芝加哥将出现在 RDBMS 表中的多行中)
c。连接到 neo4j
d. 执行查询
e. 断开与 Neo4j 的连接
这是我面临的问题列表。
- 插入很慢。
我知道合并查询比创建慢,但有没有另一种方法可以做到这一点,而不是为每条记录连接和断开连接?这是我的第一个代码草案,也许我正在努力如何使用一个连接从不同 Spark 工作节点上的多个线程插入。因此,为每条记录连接和断开连接。
- 该作业不可扩展。它仅在 1 个核心下运行良好。一旦我用 2 个 spark 核心运行作业,我就会突然得到 2 个同名的城市,即使我正在运行合并查询。例如,有 2 个芝加哥城市违反了 Merge 的使用。我假设合并功能类似于“如果不存在则创建”。
我不知道我的实现在 neo4j 部分或 spark 中是否错误。如果有人可以指导我查看任何有助于我以更好的规模实现这一点的文档,那将很有帮助,因为我有一个大的火花集群,我需要充分利用它来完成这项工作。
如果您有兴趣查看代码而不是算法。这是 scala 中的 graphInsert 实现:
class GraphInsert extends Serializable{
var case_attributes = new Array[String](4)
var city_attributes = new Array[String](2)
var location_attributes = new Array[String](20)
var incident_attributes = new Array[String](20)
val prop = new Properties()
prop.load(getClass().getResourceAsStream("/GraphInsertConnection.properties"))
// properties Neo4j
val url_neo4j = prop.getProperty("url_neo4j")
val neo4j_user = prop.getProperty("neo4j_user")
val neo4j_password = prop.getProperty("neo4j_password")
def graphInsert(data : Row){
val query = "MERGE (d:CITY {name:city_attributes(0)})\n" +"MERGE (a:CASE { " + case_attributes(0) + ":'" +data(11) + "'," +case_attributes(1) + ":'" +data(13) + "'," +case_attributes(2) + ":'" +data(14) +"'}) \n" +"MERGE (b:INCIDENT { " + incident_attributes(0) + ":" +data(0) + "," +incident_attributes(1) + ":" +data(2) + "," +incident_attributes(2) + ":'" +data(3) + "'," +incident_attributes(3) + ":'" +data(8)+ "'," +incident_attributes(4) + ":" +data(5) + "," +incident_attributes(5) + ":'" +data(4) + "'," +incident_attributes(6) + ":'" +data(6) + "'," +incident_attributes(7) + ":'" +data(1) + "'," +incident_attributes(8) + ":" +data(7)+"}) \n" +"MERGE (c:LOCATION { " + location_attributes(0) + ":" +data(9) + "," +location_attributes(1) + ":" +data(10) + "," +location_attributes(2) + ":'" +data(19) + "'," +location_attributes(3) + ":'" +data(20)+ "'," +location_attributes(4) + ":" +data(18) + "," +location_attributes(5) + ":" +data(21) + "," +location_attributes(6) + ":'" +data(17) + "'," +location_attributes(7) + ":" +data(22) + "," +location_attributes(8) + ":" +data(23)+"}) \n" +"MERGE (a) - [r1:"+relation_case_incident+"]->(b)-[r2:"+relation_incident_location+"]->(c)-[r3:belongs_to]->(d);"
println(query)
try{
var con = DriverManager.getConnection(url_neo4j, neo4j_user, neo4j_password)
var stmt = con.createStatement()
var rs = stmt.executeQuery(query)
con.close()
}catch{
case ex: SQLException =>{
println(ex.getMessage)
}
}
}
def operations(sqlContext: SQLContext){
....
#Get 'data' before this step
city_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_city").map(x =>x.getString(5)).collect()
case_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_case_number").map(x =>x.getString(5)).collect()
location_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_location").map(x =>x.getString(5)).collect()
incident_attributes= entity_metadata.filter(entity_metadata("source_name") === "tb_incident").map(x =>x.getString(5)).collect()
data.foreach(graphInsert)
}
object GraphObject {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("GraphNeo4j")
.setMaster("xyz")
.set("spark.cores.max","2")
.set("spark.executor.memory","10g")
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val graph = new GraphInsert()
graph.operations(sqlContext)
}
}