
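I am developing a data analytics application using Apache Flink and Neo4J (Community Edition).

In this application, a Flink sink has to save/update relationships in Neo4J.

Which is the best way to manage Neo4J sessions, and why?

First implementation: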

public class MySink extends RichSinkFunction<Link> {

  private DbConfiguration dbconfig;

  private Driver driver;

  @Override
  public void open(Configuration parameters) throws Exception {
    this.driver = Neo4JManager.open(this.dbconfig);
  }

  @Override
  public void close() throws Exception {
    this.driver.close();
  }

  @Override
  public void invoke(Link link) throws Exception {
    // One session per record; try-with-resources closes it even if saveLink throws.
    try (Session session = this.driver.session()) {
      Neo4JManager.saveLink(session, link);
    }
  }
}

Second implementation:

public class MySink extends RichSinkFunction<Link> {

  private DbConfiguration dbconfig;

  private Driver driver;

  private Session session;

  @Override
  public void open(Configuration parameters) throws Exception {
    this.driver = Neo4JManager.open(this.dbconfig);
    this.session = driver.session();
  }

  @Override
  public void close() throws Exception {
    this.session.close();
    this.driver.close();
  }

  @Override
  public void invoke(Link link) throws Exception {
    Neo4JManager.saveLink(this.session, link);
  }
}

Both implementations use the following helper functions:

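// Assumes the 1.x Java driver (package org.neo4j.driver.v1), as suggested by Config.build().
import org.neo4j.driver.v1.*;

import static org.neo4j.driver.v1.Values.parameters;

public class Neo4JManager {
  // Creates a driver with encryption disabled; dbconf.getHostname() is expected to return a bolt URI.
  public static Driver open(DbConfiguration dbconf) {
    AuthToken auth = AuthTokens.basic(dbconf.getUsername(), dbconf.getPassword());
    Config config = Config.build().withEncryptionLevel(Config.EncryptionLevel.NONE).toConfig();
    return GraphDatabase.driver(dbconf.getHostname(), auth, config);
  }

  // Creates two Person nodes joined by a FOLLOWS relationship, passing the ids as query parameters.
  public static void saveLink(Session session, Link link) {
    Value params = parameters("x", link.x, "y", link.y);
    session.run("CREATE (:Person {id: {x}})-[:FOLLOWS]->(:Person {id: {y}})", params);
  }
}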

Thank you.
