我正在使用 Neo4j REST Batch API 进行关系插入。我从多个实例进行插入,有时我会收到带有空消息的 BatchOperationFailedException。
经过调查,我写了一个小程序来重现这个问题。看起来 REST API 无法正确处理并发请求。
所以,我创建了一个控制台应用程序,它启动 10 个线程,5 个线程执行第一个查询,另外 5 个线程执行第二个查询。
第一个查询将关系添加到节点11,然后添加到节点12。第二个查询将关系添加到节点12,然后添加到节点11。
这样我就得到了错误。但是,如果两个查询按相同的顺序添加关系(都先添加到节点 11 再添加到节点 12,或者都先添加到节点 12 再添加到节点 11),则一切正常。
我收到这样的错误:
{
"message" : "",
"exception" : "BatchOperationFailedException",
"fullname" : "org.neo4j.server.rest.domain.BatchOperationFailedException",
"stacktrace" : [ "org.neo4j.server.rest.batch.NonStreamingBatchOperations.invoke(NonStreamingBatchOperations.java:63)", "org.neo4j.server.rest.batch.BatchOperations.performRequest(BatchOperations.java:188)", "org.neo4j.server.rest.batch.BatchOperations.parseAndPerform(BatchOperations.java:159)", "org.neo4j.server.rest.batch.NonStreamingBatchOperations.performBatchJobs(NonStreamingBatchOperations.java:48)", "org.neo4j.server.rest.web.BatchOperationService.batchProcess(BatchOperationService.java:117)", "org.neo4j.server.rest.web.BatchOperationService.performBatchOperations(BatchOperationService.java:71)", "java.lang.reflect.Method.invoke(Unknown Source)" ]
}
出现错误的查询:
1.
[{"id":1,"method":"POST","to":"/node","body":{"Label":" ","Type":"USER","CreationDate":1369404750,"UserId":"{UserId}"}},
{"id":2,"method":"POST","to":"{1}/relationships","body":{"to":"/node/11","type":"INTERACTS","data":{"Label":"Label1","CreationDate":1258421809}}},
{"id":3,"method":"POST","to":"{1}/relationships","body":{"to":"/node/12","type":"INTERACTS","data":{"Label":"Label2","CreationDate":1258421809}}}]
2.
[{"id":1,"method":"POST","to":"/node","body":{"Label":" ","Type":"USER","CreationDate":1369404750,"UserId":"{UserId}"}},
{"id":2,"method":"POST","to":"{1}/relationships","body":{"to":"/node/12","type":"INTERACTS","data":{"Label":"Label1","CreationDate":1258421809}}},
{"id":3,"method":"POST","to":"{1}/relationships","body":{"to":"/node/11","type":"INTERACTS","data":{"Label":"Label2","CreationDate":1258421809}}}]
和程序:
// Batch request template read from 1.txt in Main; InsertUsers1 replaces its "{UserId}" placeholder.
static string text1 = null;
// Batch request template read from 2.txt in Main; InsertUsers2 replaces its "{UserId}" placeholder.
static string text2 = null;
/// <summary>
/// Entry point of the repro: loads the two batch request templates, then starts
/// five threads running InsertUsers1 followed by five threads running InsertUsers2,
/// and waits for all of them to finish.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    // File.ReadAllText opens, reads and closes the file, so no reader is left undisposed.
    text1 = File.ReadAllText(@"C:\git\PerformanceTest\Perf\1.txt");
    text2 = File.ReadAllText(@"C:\git\PerformanceTest\Perf\2.txt");

    const int threadsPerQuery = 5;
    Thread[] workers = new Thread[threadsPerQuery * 2];

    for (int i = 0; i < threadsPerQuery; i++)
    {
        workers[i] = new Thread(InsertUsers1);
        workers[i].Start();
    }

    for (int i = 0; i < threadsPerQuery; i++)
    {
        workers[threadsPerQuery + i] = new Thread(InsertUsers2);
        workers[threadsPerQuery + i].Start();
    }

    // Join the workers rather than the current thread: a thread joining itself
    // blocks forever, so the process would never exit.
    foreach (Thread worker in workers)
    {
        worker.Join();
    }
}
/// <summary>
/// Sends the first batch template (text1) once to the Neo4j REST batch endpoint,
/// substituting a random number for the "{UserId}" placeholder. Failures are
/// reported through LogException and are not rethrown.
/// </summary>
private static void InsertUsers1()
{
    // Seed from a GUID instead of the clock: the worker threads start almost
    // simultaneously, so a tick-based seed can give several of them the same user id.
    Random r = new Random(Guid.NewGuid().GetHashCode());
    string newCommand = text1.Replace("{UserId}", r.Next(1000000, 100000000).ToString());

    var request = (HttpWebRequest)WebRequest.Create("http://localhost:7474/db/data/batch");
    request.Method = "POST";
    request.ContentType = "application/json; charset=utf-8";
    request.Accept = "application/json";
    request.AutomaticDecompression = DecompressionMethods.GZip;

    try
    {
        var bytes = Encoding.UTF8.GetBytes(newCommand);

        // Close the request stream before asking for the response so the body is fully sent.
        using (var stream = request.GetRequestStream())
        {
            stream.Write(bytes, 0, bytes.Length);
        }

        Console.WriteLine("1");

        // Dispose the response as well as its stream so the connection is released.
        using (var response = request.GetResponse())
        using (var responseStream = response.GetResponseStream())
        using (var reader = new StreamReader(responseStream))
        {
            // Drain the body; its content is not used by this repro.
            reader.ReadToEnd();
        }

        Thread.Sleep(r.Next(5));
    }
    catch (Exception e)
    {
        LogException("InsertBatch1", e);
        Console.WriteLine("e1");
    }
}
/// <summary>
/// Sends the second batch template (text2) once to the Neo4j REST batch endpoint,
/// substituting the fixed id "46239" for the "{UserId}" placeholder. Failures are
/// reported through LogException and are not rethrown.
/// </summary>
private static void InsertUsers2()
{
    // Only used for the short random pause below. Seeded from a GUID so that
    // threads started at the same instant do not share a sequence.
    Random r = new Random(Guid.NewGuid().GetHashCode());
    string newCommand = text2.Replace("{UserId}", "46239");

    var request = (HttpWebRequest)WebRequest.Create("http://localhost:7474/db/data/batch");
    request.Method = "POST";
    request.ContentType = "application/json; charset=utf-8";
    request.Accept = "application/json";
    request.AutomaticDecompression = DecompressionMethods.GZip;

    try
    {
        var bytes = Encoding.UTF8.GetBytes(newCommand);

        // Close the request stream before asking for the response so the body is fully sent.
        using (var stream = request.GetRequestStream())
        {
            stream.Write(bytes, 0, bytes.Length);
        }

        Console.WriteLine("2");

        // Dispose the response as well as its stream so the connection is released.
        using (var response = request.GetResponse())
        using (var responseStream = response.GetResponseStream())
        using (var reader = new StreamReader(responseStream))
        {
            // Drain the body; its content is not used by this repro.
            reader.ReadToEnd();
        }

        Thread.Sleep(r.Next(5));
    }
    catch (Exception e)
    {
        LogException("InsertBatch2", e);
        Console.WriteLine("e2");
    }
}
/// <summary>
/// Writes a request failure to the console. For a WebException that carries a
/// response, the response body is appended to the message; in every other case
/// (or if the body cannot be read) only the generic message is written.
/// </summary>
/// <param name="methodName">Name of the operation that failed; included in the output.</param>
/// <param name="ex">The exception that was caught.</param>
/// <param name="request">Unused; kept so existing callers keep compiling.</param>
private static void LogException(string methodName, Exception ex, string request = null)
{
    var webException = ex as WebException;

    // Response is null when no reply was received, so check it explicitly
    // instead of relying on a swallowed NullReferenceException.
    if (webException != null && webException.Response != null)
    {
        try
        {
            using (var responseStream = webException.Response.GetResponseStream())
            using (var reader = new StreamReader(responseStream))
            {
                string neoError = reader.ReadToEnd();
                Console.WriteLine("Neo4jContext." + methodName + " - Request error " + neoError);
                return;
            }
        }
        catch (Exception)
        {
            // Best effort: if the error body cannot be read, fall through to the generic message.
        }
    }

    Console.WriteLine("Neo4jContext." + methodName + " - Request error");
}
问题:有人遇到过同样的问题吗?你如何处理这个问题?我们如何在没有并发问题的情况下插入关系并同时具有良好的性能?
PS:有关批量插入的文档(http://docs.neo4j.org/chunked/milestone/batchinsert.html)中注明了批量插入不是线程安全的。但是我们使用的是 REST API(http://docs.neo4j.org/chunked/milestone/rest-api-batch-ops.html),而该文档中并没有提到并发问题,所以我认为一切都应该正常工作。
PS:我在 Neo4j 1.9 和 2.0 上测试过