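My intention is to create cqlengine.models.Model instances in a separate process and send them to a database manager through a queue.

Unfortunately, I get a TypeError when a multiprocessing queue is used.

The model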

import uuid

from cassandra.cqlengine import columns
from cassandra.cqlengine.models import Model

class MyModel(Model):
    __keyspace__ = 'mykeyspace'

    class Meta:
        get_pk_field = 'uid'

    uid = columns.UUID(partition_key=True, default=uuid.uuid4)
    start = columns.TimeUUID(primary_key=True, clustering_order='desc')
    interrupt = columns.List(columns.Tuple(columns.TimeUUID(), columns.TimeUUID(), columns.Text()))
    stop = columns.TimeUUID()

Example

The following code connects to the Cassandra database, creates a model instance, and stores it in the database:

    import multiprocessing
    import uuid

    from cassandra.cqlengine import connection
    from vo.record import MyModel

    # Queue
    queue = multiprocessing.Queue()
    # Connect to the database
    connection.setup(('localhost',), 'mykeyspace')

    # Create a UUID for the record (named record_id so it does not shadow the uuid module)
    record_id = uuid.uuid1()
    # Prepare the new record
    wt = MyModel(start=record_id, uid=record_id)

    # Store it
    wt.save()

However, if I put the model into the queue and dequeue it later, save() raises a TypeError.

    queue.put(wt)
    item = queue.get()
    item.save()

Printing the model attribute _timeout for both instances shows two different object ids (as expected):

print wt._timeout
print item._timeout

Output:

<object object at 0x76d32508>
<object object at 0x76d32e28>

Traceback (most recent call last):

  File "test.py", line 25, in <module>
    item.save() 
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/models.py", line 731, in save
    if_exists=self._if_exists).save()
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/query.py", line 1437, in save
    return self.update()
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/query.py", line 1420, in update
    self._delete_null_columns(delete_conditionals)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/query.py", line 1369, in _delete_null_columns
    self._execute(ds)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/query.py", line 1334, in _execute
    results = _execute_statement(self.model, statement, self._consistency, self._timeout, connection=connection)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/query.py", line 1485, in _execute_statement
    return conn.execute(s, params, timeout=timeout, connection=connection)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/connection.py", line 286, in execute
    result = conn.session.execute(query, params, timeout=timeout)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 1998, in execute
    return self.execute_async(query, parameters, trace, custom_payload, timeout, execution_profile, paging_state).result()
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 2038, in execute_async
    future.send_request()
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 3365, in send_request
    self._start_timer()
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 3316, in _start_timer
    self._timer = self.session.cluster.connection_class.create_timer(self._time_remaining, self._on_timeout)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/io/asyncorereactor.py", line 289, in create_timer
    timer = Timer(timeout, callback)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/connection.py", line 1030, in __init__
    self.end = time.time() + timeout
TypeError: unsupported operand type(s) for +: 'float' and 'object'

How can I prevent this error?

1 Answer

The timeout must be set explicitly:

wt.timeout(0)

I can't explain why save() does not work with a multiprocessing queue.
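One plausible explanation, offered as an assumption rather than something confirmed here: the two `<object object at ...>` values printed in the question suggest that `_timeout` defaults to a bare `object()` sentinel meaning "not set", which the driver would recognise by identity. A multiprocessing queue pickles whatever is put on it, and unpickling a bare `object()` yields a new object, so the identity check would no longer match and the raw object would reach `time.time() + timeout`. The sketch below reproduces that effect in plain Python. `NOT_SET` and `resolve_timeout` are hypothetical stand-ins, not the driver's actual names.

```python
import pickle
import time

# Hypothetical stand-in for a module-level "no timeout given" sentinel.
NOT_SET = object()

def resolve_timeout(timeout, default=10.0):
    # The sentinel is recognised by identity, not by value.
    return default if timeout is NOT_SET else timeout

# A queue round trip pickles and unpickles the instance state.
state = {"_timeout": NOT_SET}
restored = pickle.loads(pickle.dumps(state))

print(resolve_timeout(state["_timeout"]))   # 10.0
print(restored["_timeout"] is NOT_SET)      # False

try:
    time.time() + resolve_timeout(restored["_timeout"])
except TypeError as exc:
    # Same kind of failure as in the traceback: float + object
    print("TypeError:", exc)
```

If this is indeed the cause, giving the instance a concrete numeric timeout replaces the sentinel with a value that survives pickling, which would be consistent with the fix above.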

Answered 2017-03-02T21:35:57.620