0

我正在使用 Neo4j REST Batch API 进行关系插入。我从多个实例进行插入,有时我会收到带有空消息的 BatchOperationFailedException。

经过调查,我写了一个小程序,它显示了一个问题。似乎其余 API 无法处理并发问题。

所以,我创建了一个控制台应用程序,它启动 10 个线程,5 个线程执行第一个查询,另外 5 个线程执行第二个查询。

第一个查询将关系添加到节点11,然后添加到节点12。第二个查询将关系添加到节点12,然后添加到节点11

而且,我得到了错误。但是:如果两个查询将具有相同的顺序(都将关系添加到 11,然后添加到 12,或者都将关系添加到 12,然后添加到 11)一切正常。

我收到这样的错误:

{
  "message" : "",
  "exception" : "BatchOperationFailedException",
  "fullname" : "org.neo4j.server.rest.domain.BatchOperationFailedException",
  "stacktrace" : [ "org.neo4j.server.rest.batch.NonStreamingBatchOperations.invoke(NonStreamingBatchOperations.java:63)", "org.neo4j.server.rest.batch.BatchOperations.performRequest(BatchOperations.java:188)", "org.neo4j.server.rest.batch.BatchOperations.parseAndPerform(BatchOperations.java:159)", "org.neo4j.server.rest.batch.NonStreamingBatchOperations.performBatchJobs(NonStreamingBatchOperations.java:48)", "org.neo4j.server.rest.web.BatchOperationService.batchProcess(BatchOperationService.java:117)", "org.neo4j.server.rest.web.BatchOperationService.performBatchOperations(BatchOperationService.java:71)", "java.lang.reflect.Method.invoke(Unknown Source)" ]
}

出现错误的查询:

1.

[{"id":1,"method":"POST","to":"/node","body":{"Label":" ","Type":"USER","CreationDate":1369404750,"UserId":"{UserId}"}},

{"id":2,"method":"POST","to":"{1}/relationships","body":{"to":"/node/11","type":"INTERACTS","data":{"Label":"Label1","CreationDate":1258421809}}},

{"id":3,"method":"POST","to":"{1}/relationships","body":{"to":"/node/12","type":"INTERACTS","data":{"Label":"Label2","CreationDate":1258421809}}}]

2.

[{"id":1,"method":"POST","to":"/node","body":{"Label":" ","Type":"USER","CreationDate":1369404750,"UserId":"{UserId}"}},

{"id":2,"method":"POST","to":"{1}/relationships","body":{"to":"/node/12","type":"INTERACTS","data":{"Label":"Label1","CreationDate":1258421809}}},

{"id":3,"method":"POST","to":"{1}/relationships","body":{"to":"/node/11","type":"INTERACTS","data":{"Label":"Label2","CreationDate":1258421809}}}]

和程序:

    static string text1 = null;
    static string text2 = null;

    static void Main(string[] args)
    {
        StreamReader sr1 = new StreamReader(@"C:\git\PerformanceTest\Perf\1.txt");
        StreamReader sr2 = new StreamReader(@"C:\git\PerformanceTest\Perf\2.txt");
        text1 = sr1.ReadToEnd();
        text2 = sr2.ReadToEnd();
        for (int i = 0; i < 5; i++)
        {
            Thread thread2 = new Thread(InsertUsers1);
            thread2.Start();
        }

        for (int i = 0; i < 5; i++)
        {
            Thread thread2 = new Thread(InsertUsers2);
            thread2.Start();
        }

        Thread.CurrentThread.Join();
    }


    private static void InsertUsers1()
    {
        //while (true)
        //{
            Random r = new Random((int)DateTime.Now.Ticks);
            string newCommand = text1.Replace("{UserId}", r.Next(1000000, 100000000).ToString());

            var request = (HttpWebRequest)WebRequest.Create("http://localhost:7474/db/data/batch");
            request.Method = "POST";
            request.ContentType = "application/json; charset=utf-8";
            request.Accept = "application/json";
            request.AutomaticDecompression = DecompressionMethods.GZip;
            //request.Timeout = _timeout;
            try
            {
                using (var stream = request.GetRequestStream())
                {
                    var bytes = Encoding.UTF8.GetBytes(newCommand);
                    stream.Write(bytes, 0, bytes.Length);
                    string queryResult = null;
                    Console.WriteLine("1");
                    using (var responseStream = request.GetResponse().GetResponseStream())
                    {
                        using (var reader = new StreamReader(responseStream))
                        {
                            queryResult = reader.ReadToEnd();
                        }
                    }
                }

                Thread.Sleep(r.Next(5));
            }
            catch (Exception e)
            {
                LogException("InsertBatch1", e);
                Console.WriteLine("e1");
            }
        //}
    }

    private static void InsertUsers2()
    {
        //while (true)
        //{
            Random r = new Random((int)DateTime.Now.Ticks);
            string newCommand = text2.Replace("{UserId}", "46239");

            var request = (HttpWebRequest)WebRequest.Create("http://localhost:7474/db/data/batch");
            request.Method = "POST";
            request.ContentType = "application/json; charset=utf-8";
            request.Accept = "application/json";
            request.AutomaticDecompression = DecompressionMethods.GZip;
            //request.Timeout = _timeout;
            try
            {
                using (var stream = request.GetRequestStream())
                {
                    var bytes = Encoding.UTF8.GetBytes(newCommand);
                    stream.Write(bytes, 0, bytes.Length);
                    string queryResult = null;
                    //Thread.Sleep(10);
                    Console.WriteLine("2");
                    using (var responseStream = request.GetResponse().GetResponseStream())
                    {
                        using (var reader = new StreamReader(responseStream))
                        {
                            queryResult = reader.ReadToEnd();
                        }
                    }
                }

                Thread.Sleep(r.Next(5));
            }
            catch (Exception e)
            {
                LogException("InsertBatch2", e);
                Console.WriteLine("e2");
            }
        //}
    }

    private static void LogException(string methodName, Exception ex, string request = null)
    {
        if (ex is WebException)
        {
            try
            {
                string neoError = new StreamReader((ex as WebException).Response.GetResponseStream()).ReadToEnd();
                Console.WriteLine("Neo4jContext." + methodName + " - Request error " + neoError);
            }
            catch
            {
                Console.WriteLine("Neo4jContext." + methodName + " - Request error");
            }
        }
        else
        {
            Console.WriteLine("Neo4jContext." + methodName + " - Request error");
        }
    }

问题:有人遇到过同样的问题吗?你如何处理这个问题?我们如何在没有并发问题的情况下插入关系并同时具有良好的性能?

PS:在有关批量插入的文档中(http://docs.neo4j.org/chunked/milestone/batchinsert.html),请注意批量插入不是线程安全的。但是我们使用 REST API(http://docs.neo4j.org/chunked/milestone/rest-api-batch-ops.html)并且没有关于并发问题的说法,我认为一切都应该正常工作。

PS:我在 Neo4j 1.9 和 2.0 上测试过

4

1 回答 1

1

我认为这是一个僵局,异常应该这么说。

你能检查data/logs一下data/graph.db/messages.log它们是否包含死锁异常吗?

在死锁时,您通常会重试失败的操作。

于 2013-05-27T12:45:17.543 回答