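I am trying to load data into and delete data from Cassandra using the Python driver. I have tried this both with Cassandra running in a Docker container and, after the Docker version gave me problems, with Cassandra running locally. Here is an example of what I am doing: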

from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent_with_args


class Controller(object):
    def __init__(self):
        # With no arguments, Cluster() connects to 127.0.0.1
        self.cluster = Cluster()
        self.session = self.cluster.connect('mykeyspace')

    def insert_into_cassandra(self, fname):
        query = 'INSERT INTO mytable (mykey, indexed_key) VALUES (?, ?)'
        prepared = self.session.prepare(query)
        prepared.consistency_level = ConsistencyLevel.QUORUM
        # params_generator (not shown) yields the bound values for each row read from fname
        params_gen = self.params_generator(fname)
        execute_concurrent_with_args(self.session, prepared, params_gen, concurrency=50)

    def delete_param_gen(self, results):
        # Yield the bound parameters for one DELETE per selected row
        for r in results:
            yield [r.mykey]

    def delete_by_index(self, value):
        # Look up the keys by the indexed column, then delete them concurrently
        query = "SELECT mykey FROM mytable WHERE indexed_key = %s"
        res = self.session.execute(query, (value,))
        delete_query = "DELETE FROM mytable WHERE mykey = ?"
        prepared = self.session.prepare(delete_query)
        prepared.consistency_level = ConsistencyLevel.QUORUM
        params_gen = self.delete_param_gen(res)
        execute_concurrent_with_args(self.session, prepared, params_gen, concurrency=50)

Nothing crazy. When loading or deleting data, I frequently see the following messages:

Sending options message heartbeat on idle connection (4422117360) 127.0.0.1
Heartbeat failed for connection (4422117360) to 127.0.0.1

Here are some logs from deleting data:

[2017-02-28 08:37:20,562] [DEBUG] [cassandra.connection] Defuncting connection (4422117360) to 127.0.0.1: errors=Connection heartbeat timeout after 30 seconds, last_host=127.0.0.1
[2017-02-28 08:37:20,563] [DEBUG] [cassandra.io.libevreactor] Closing connection (4422117360) to 127.0.0.1
[2017-02-28 08:37:20,563] [DEBUG] [cassandra.io.libevreactor] Closed socket to 127.0.0.1
[2017-02-28 08:37:20,564] [DEBUG] [cassandra.pool] Defunct or closed connection (4422117360) returned to pool, potentially marking host 127.0.0.1 as down
[2017-02-28 08:37:20,566] [DEBUG] [cassandra.pool] Replacing connection (4422117360) to 127.0.0.1
[2017-02-28 08:37:20,567] [DEBUG] [cassandra.connection] Defuncting connection (4426057600) to 127.0.0.1: errors=Connection heartbeat timeout after 30 seconds, last_host=127.0.0.1
[2017-02-28 08:37:20,567] [DEBUG] [cassandra.io.libevreactor] Closing connection (4426057600) to 127.0.0.1
[2017-02-28 08:37:20,567] [DEBUG] [cassandra.io.libevreactor] Closed socket to 127.0.0.1
[2017-02-28 08:37:20,568] [ERROR] [cassandra.cluster] Unexpected exception while handling result in ResponseFuture:
Traceback (most recent call last):
  File "cassandra/cluster.py", line 3536, in cassandra.cluster.ResponseFuture._set_result (cassandra/cluster.c:67556)
  File "cassandra/cluster.py", line 3711, in cassandra.cluster.ResponseFuture._set_final_result (cassandra/cluster.c:71769)
  File "cassandra/concurrent.py", line 154, in cassandra.concurrent._ConcurrentExecutor._on_success (cassandra/concurrent.c:3357)
  File "cassandra/concurrent.py", line 203, in cassandra.concurrent.ConcurrentExecutorListResults._put_result (cassandra/concurrent.c:5539)
  File "cassandra/concurrent.py", line 209, in cassandra.concurrent.ConcurrentExecutorListResults._put_result (cassandra/concurrent.c:5427)
  File "cassandra/concurrent.py", line 123, in cassandra.concurrent._ConcurrentExecutor._execute_next (cassandra/concurrent.c:2369)
  File "load_cassandra.py", line 148, in delete_param_gen
    for r in rows:
  File "cassandra/cluster.py", line 3991, in cassandra.cluster.ResultSet.next (cassandra/cluster.c:76025)
  File "cassandra/cluster.py", line 4006, in cassandra.cluster.ResultSet.fetch_next_page (cassandra/cluster.c:76193)
  File "cassandra/cluster.py", line 3781, in cassandra.cluster.ResponseFuture.result (cassandra/cluster.c:73073)
cassandra.cluster.NoHostAvailable: ('Unable to complete the operation against any hosts', {})

Here is some of what I see when inserting data:

[2017-02-28 16:50:25,594] [DEBUG] [cassandra.connection] Sending options message heartbeat on idle connection (140301574604448) 127.0.0.1
[2017-02-28 16:50:25,595] [DEBUG] [cassandra.cluster] [control connection] Attempting to reconnect
[2017-02-28 16:50:25,596] [DEBUG] [cassandra.cluster] [control connection] Opening new connection to 127.0.0.1
[2017-02-28 16:50:25,596] [DEBUG] [cassandra.connection] Not sending options message for new connection(140301347717016) to 127.0.0.1 because compression is disabled and a cql version was not specified
[2017-02-28 16:50:25,596] [DEBUG] [cassandra.connection] Sending StartupMessage on <AsyncoreConnection(140301347717016) 127.0.0.1:9042>
[2017-02-28 16:50:25,596] [DEBUG] [cassandra.connection] Sent StartupMessage on <AsyncoreConnection(140301347717016) 127.0.0.1:9042>
[2017-02-28 16:50:30,596] [DEBUG] [cassandra.io.asyncorereactor] Closing connection (140301347717016) to 127.0.0.1
[2017-02-28 16:50:30,596] [DEBUG] [cassandra.io.asyncorereactor] Closed socket to 127.0.0.1
[2017-02-28 16:50:30,596] [DEBUG] [cassandra.connection] Connection to 127.0.0.1 was closed during the startup handshake
[2017-02-28 16:50:30,597] [WARNING] [cassandra.cluster] [control connection] Error connecting to 127.0.0.1:
Traceback (most recent call last):
  File "cassandra/cluster.py", line 2623, in cassandra.cluster.ControlConnection._reconnect_internal (cassandra/cluster.c:47899)
  File "cassandra/cluster.py", line 2645, in cassandra.cluster.ControlConnection._try_connect (cassandra/cluster.c:48416)
  File "cassandra/cluster.py", line 1119, in cassandra.cluster.Cluster.connection_factory (cassandra/cluster.c:15085)
  File "cassandra/connection.py", line 333, in cassandra.connection.Connection.factory (cassandra/connection.c:5790)
cassandra.OperationTimedOut: errors=Timed out creating connection (5 seconds), last_host=None
[2017-02-28 16:50:39,309] [ERROR] [root] Exception inserting data into cassandra
Traceback (most recent call last):
  File "load_cassandra.py", line 54, in run
    controller.insert_into_cassandra(filename)
  File "extract_to_cassandra.py", line 141, in insert_into_cassandra
    for success, result in results:
  File "cassandra/concurrent.py", line 177, in _results (cassandra/concurrent.c:4856)
  File "cassandra/concurrent.py", line 186, in cassandra.concurrent.ConcurrentExecutorGenResults._results (cassandra/concurrent.c:4622)
  File "cassandra/concurrent.py", line 165, in cassandra.concurrent._ConcurrentExecutor._raise (cassandra/concurrent.c:3745)
cassandra.WriteTimeout: Error from server: code=1100 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out - received only 0 responses." info={'consistency': 'QUORUM', 'required_responses': 1, 'received_responses': 0}
[2017-02-28 16:50:39,465] [DEBUG] [cassandra.connection] Received options response on connection (140301574604448) from 127.0.0.1
[2017-02-28 16:50:39,466] [DEBUG] [cassandra.cluster] Shutting down Cluster Scheduler
[2017-02-28 16:50:39,467] [DEBUG] [cassandra.cluster] Shutting down control connection
[2017-02-28 16:50:39,467] [DEBUG] [cassandra.io.asyncorereactor] Closing connection (140301574604448) to 127.0.0.1
[2017-02-28 16:50:39,467] [DEBUG] [cassandra.io.asyncorereactor] Closed socket to 127.0.0.1
[2017-02-28 16:50:39,468] [DEBUG] [cassandra.pool] Defunct or closed connection (140301574604448) returned to pool, potentially marking host 127.0.0.1 as down

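I have played with the consistency level, even setting it to 1, but that did not help. Inserts tend to work better when Cassandra runs locally than when it runs in Docker, but they still time out. Deletes usually work for a few seconds and then hang or time out.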

Edit: here are the logs from Cassandra at the time of the failure:

INFO  18:39:11 MUTATION messages were dropped in last 5000 ms: 4 for internal timeout and 0 for cross node timeout. Mean internal dropped latency: 2933809 ms and Mean cross-node dropped latency: 0 ms
INFO  18:39:11 Pool Name                    Active   Pending      Completed   Blocked  All Time Blocked
INFO  18:39:11 MutationStage                    32        15            470         0                 0
INFO  18:39:11 ViewMutationStage                 0         0              0         0                 0
INFO  18:39:11 ReadStage                         0         0             59         0                 0
INFO  18:39:11 RequestResponseStage              0         0              0         0                 0
INFO  18:39:11 ReadRepairStage                   0         0              0         0                 0
INFO  18:39:11 CounterMutationStage              0         0              0         0                 0
INFO  18:39:11 MiscStage                         0         0              0         0                 0
INFO  18:39:11 CompactionExecutor                0         0           6399         0                 0
INFO  18:39:11 MemtableReclaimMemory             0         0             36         0                 0
INFO  18:39:11 PendingRangeCalculator            0         0              1         0                 0
INFO  18:39:11 GossipStage                       0         0              0         0                 0
INFO  18:39:11 SecondaryIndexManagement          0         0              0         0                 0
INFO  18:39:11 HintsDispatcher                   0         0              0         0                 0
INFO  18:39:11 MigrationStage                    0         0              2         0                 0
INFO  18:39:11 MemtablePostFlush                 0         0             62         0                 0
INFO  18:39:11 PerDiskMemtableFlushWriter_0         0         0             36         0                 0
INFO  18:39:11 ValidationExecutor                0         0              0         0                 0
INFO  18:39:11 Sampler                           0         0              0         0                 0
INFO  18:39:11 MemtableFlushWriter               0         0             36         0                 0
INFO  18:39:11 InternalResponseStage             0         0              0         0                 0
INFO  18:39:11 AntiEntropyStage                  0         0              0         0                 0
INFO  18:39:11 CacheCleanupExecutor              0         0              0         0                 0
INFO  18:39:11 Native-Transport-Requests        33         0            727         0                 0
INFO  18:39:11 CompactionManager                 0         0
INFO  18:39:11 MessagingService                n/a       0/0
INFO  18:39:11 Cache Type                     Size                 Capacity               KeysToSave
INFO  18:39:11 KeyCache                       1368                 51380224                      all
INFO  18:39:11 RowCache                          0                        0                      all
INFO  18:39:11 Table                       Memtable ops,data
INFO  18:39:11 system_distributed.parent_repair_history                 0,0
INFO  18:39:11 system_distributed.repair_history                 0,0
INFO  18:39:11 system_distributed.view_build_status                 0,0
INFO  18:39:11 system.compaction_history               1,231
INFO  18:39:11 system.hints                              0,0
INFO  18:39:11 system.schema_aggregates                  0,0
INFO  18:39:11 system.IndexInfo                          0,0
INFO  18:39:11 system.schema_columnfamilies                 0,0
INFO  18:39:11 system.schema_triggers                    0,0
INFO  18:39:11 system.size_estimates                 40,1255
INFO  18:39:11 system.schema_functions                   0,0
INFO  18:39:11 system.paxos                              0,0
INFO  18:39:11 system.views_builds_in_progress                 0,0
INFO  18:39:11 system.built_views                        0,0
INFO  18:39:11 system.peer_events                        0,0
INFO  18:39:11 system.range_xfers                        0,0
INFO  18:39:11 system.peers                              0,0
INFO  18:39:11 system.batches                            0,0
INFO  18:39:11 system.schema_keyspaces                   0,0
INFO  18:39:11 system.schema_usertypes                   0,0
INFO  18:39:11 system.local                              0,0
INFO  18:39:11 system.sstable_activity                 6,117
INFO  18:39:11 system.available_ranges                   0,0
INFO  18:39:11 system.batchlog                           0,0
INFO  18:39:11 system.schema_columns                     0,0
INFO  18:39:11 system_schema.columns                     0,0
INFO  18:39:11 system_schema.types                       0,0
INFO  18:39:11 system_schema.indexes                     0,0
INFO  18:39:11 system_schema.keyspaces                   0,0
INFO  18:39:11 system_schema.dropped_columns                 0,0
INFO  18:39:11 system_schema.aggregates                  0,0
INFO  18:39:11 system_schema.triggers                    0,0
INFO  18:39:11 system_schema.tables                      0,0
INFO  18:39:11 system_schema.views                       0,0
INFO  18:39:11 system_schema.functions                   0,0
INFO  18:39:11 system_auth.roles                         0,0
INFO  18:39:11 system_auth.role_members                  0,0
INFO  18:39:11 system_auth.resource_role_permissons_index                 0,0
INFO  18:39:11 system_auth.role_permissions                 0,0
INFO  18:39:11 mykeyspace.mytable                       430,27163514
INFO  18:39:11 system_traces.sessions                    0,0
INFO  18:39:11 system_traces.events                      0,0
INFO  18:39:13 ParNew GC in 261ms.  CMS Old Gen: 46106544 -> 74868512; Par Eden Space: 208895224 -> 0; Par Survivor Space: 16012448 -> 26083328

I also see messages like this:

Out of 29 commit log syncs over the past 248s with average duration of      1596.14ms, 1 have exceeded the configured commit interval by an average of 18231.00ms

1 Answer

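One thing you can try is reducing the idle_heartbeat_interval setting on the connection. It defaults to 30 seconds, but you can configure it when instantiating the Cluster class. In this example, I set it to 10 seconds: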

def __init__(self):
    self.cluster = Cluster(idle_heartbeat_interval=10)
    self.session = self.cluster.connect('mykeyspace')
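
The insert log in the question also reports "Timed out creating connection (5 seconds)", which matches the driver's default connect_timeout on Cluster. A minimal sketch that keeps both settings in one place, assuming the DataStax cassandra-driver package is installed. The helper names cluster_settings and open_session are hypothetical, and the values are placeholders to tune rather than recommendations:

```python
def cluster_settings(heartbeat_interval=10, connect_timeout=15):
    """Return keyword arguments for cassandra.cluster.Cluster.

    idle_heartbeat_interval: seconds between heartbeats on idle
    connections (driver default: 30).
    connect_timeout: seconds allowed for opening a connection
    (driver default: 5).
    """
    if heartbeat_interval < 0:
        raise ValueError("heartbeat_interval must not be negative")
    if connect_timeout <= 0:
        raise ValueError("connect_timeout must be positive")
    return {
        "idle_heartbeat_interval": heartbeat_interval,
        "connect_timeout": connect_timeout,
    }


def open_session(keyspace, **overrides):
    """Open a session with the settings above (hypothetical helper)."""
    # Imported lazily so cluster_settings() can be used and tested
    # on a machine without the driver installed.
    from cassandra.cluster import Cluster

    cluster = Cluster(**cluster_settings(**overrides))
    return cluster, cluster.connect(keyspace)
```

With this in place, the constructor in the question would call open_session('mykeyspace') and keep both returned objects. Longer timeouts only hide the symptom if the server itself is overloaded, so the Cassandra log is still worth watching.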

If that doesn't help, it may be time to check your data model for anti-patterns.
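
Separately from the data model, the delete traceback in the question shows delete_param_gen fetching the next result page (ResultSet.fetch_next_page) from inside the driver's own _on_success callback, so the page fetch may be blocking the code that handles responses. One thing worth trying is to read all keys first and then delete them in smaller groups with lower concurrency. A minimal sketch under those assumptions: collect_keys, chunked and delete_keys are hypothetical helpers, and the chunk size and concurrency are arbitrary starting points, not tuned values.

```python
from itertools import islice


def collect_keys(rows):
    """Materialize the whole result set before any delete is issued,
    so no page is fetched lazily while deletes are in flight."""
    return [[row.mykey] for row in rows]


def chunked(items, size):
    """Yield successive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def delete_keys(session, prepared_delete, rows, chunk_size=500, concurrency=10):
    """Delete chunk by chunk and return the exceptions of failed statements."""
    # Imported lazily so the helpers above work without the driver installed.
    from cassandra.concurrent import execute_concurrent_with_args

    failures = []
    for chunk in chunked(collect_keys(rows), chunk_size):
        # raise_on_first_error=False makes the driver report every outcome
        # as a (success, result_or_exception) pair instead of raising.
        results = execute_concurrent_with_args(
            session, prepared_delete, chunk,
            concurrency=concurrency, raise_on_first_error=False)
        failures.extend(result for success, result in results if not success)
    return failures
```

In delete_by_index, this would replace the final two lines with failures = delete_keys(self.session, prepared, res). Holding all keys in memory is the trade-off; for very large result sets the SELECT itself would need to be split up.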

answered 2017-03-01T14:53:21.613