0

我对将大型集合插入 cassandra 数据库的最快方法有点困惑。我读到我不应该使用批量插入,因为它是为原子性而创建的。甚至 Cassandra 也为我提供了使用异步写入来提高性能的信息。我使用了没有'batch'关键字的最快插入代码:

var cluster = Cluster.Builder()
 .AddContactPoint(“127.0.0.1")
 .Build();
 var session = cluster.Connect();

 //Save off the prepared statement you’re going to use
 var statement = session.Prepare (“INSERT INTO tester.users (userID, firstName, lastName) VALUES (?,?,?)”);

 var tasks = new List<Task>();
 for (int i = 0; i < 1000; i++) 
 {

 //please bind with whatever actually useful data you’re importing
 var bind = statement.Bind (i, “John”, “Tester”);
 var resultSetFuture = session.ExecuteAsync (bind);
 tasks.Add (resultSetFuture);
 }

 Task.WaitAll(tasks.ToArray());
 cluster.Shutdown();

来自:https
://medium.com/@foundev/cassandra-batch-loading-without-the-batch-keyword-40f00e35e23e 但它仍然比我使用的批处理选项慢得多。我当前的代码如下所示:

IList<Movie> moviesList = Movie.CreateMoviesCollectionForCassandra(collectionEntriesNumber);
            var preparedStatements = new List<PreparedStatement>();
            foreach (var statement in preparedStatements)
            {
                statement.SetConsistencyLevel(ConsistencyLevel.One);
            }
            var statementBinding = new BatchStatement();
            statementBinding.SetBatchType(BatchType.Unlogged);

        for (int i = 0; i < collectionEntriesNumber; i++)
        {
            preparedStatements.Add(Session.Prepare("INSERT INTO Movies (id, title, description, year, genres, rating, originallanguage, productioncountry, votingsnumber, director) VALUES (?,?,?,?,?,?,?,?,?,?)"));
        }
        for (int i = 0; i < collectionEntriesNumber; i++)
        {
            statementBinding.Add(preparedStatements[i].Bind(moviesList[i].Id, moviesList[i].Title,
                moviesList[i].Description, moviesList[i].Year, moviesList[i].Genres, moviesList[i].Rating,
                moviesList[i].OriginalLanguage, moviesList[i].ProductionCountry, moviesList[i].VotingsNumber,
                new Director(moviesList[0].Director.Id, moviesList[i].Director.Firstname,
                    moviesList[i].Director.Lastname, moviesList[i].Director.Age)));
        }
        watch.Start();
        Session.ExecuteAsync(statementBinding);
        watch.Stop();

它确实工作得更快,但我只能插入约 2500 个准备好的语句,不能再插入了,我想测量大约 100000 个对象插入的时间。

我的代码正确吗?也许我应该增加插入阈值?请解释一下我的正确方法。

4

1 回答 1

0

请记住,您应该准备一次并重复使用它PreparedStatement来绑定到不同的参数。

如果您针对同一分区,您可以使用小批量,否则您应该使用单个请求。

使用单个请求时,您可以并行调度执行,并使用信号量限制未完成请求的数量。

就像是:

    public async Task<long> Execute(
        IStatement[] statements, int parallelism, int maxOutstandingRequests)
    {
        var semaphore = new SemaphoreSlim(maxOutstandingRequests);
        var tasks = new Task<RowSet>[statements.Length];
        var chunkSize = statements.Length / parallelism;
        if (chunkSize == 0)
        {
            chunkSize = 1;
        }
        var statementLength = statements.Length;
        var launchTasks = new Task[parallelism + 1];
        var watch = new Stopwatch();
        watch.Start();
        for (var i = 0; i < parallelism + 1; i++)
        {
            var startIndex = i * chunkSize;
            //start to launch in parallel
            launchTasks[i] = Task.Run(async () =>
            {
                for (var j = 0; j < chunkSize; j++)
                {
                    var index = startIndex + j;
                    if (index >= statementLength)
                    {
                        break;
                    }
                    await semaphore.WaitAsync();
                    var t = _session.ExecuteAsync(statements[index]);
                    tasks[index] = t;
                    var rs = await t;
                    semaphore.Release();
                }
            });
        }
        await Task.WhenAll(launchTasks);
        await Task.WhenAll(tasks);
        watch.Stop();
        return watch.ElapsedMilliseconds;
    }
于 2015-11-06T11:18:15.130 回答