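My intention is to create cqlengine.models.Model instances in a separate process and send them to a database manager through a multiprocessing queue.
Unfortunately, as soon as the queue is involved, I get a TypeError.
The model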
import uuid

from cassandra.cqlengine import columns
from cassandra.cqlengine.models import Model

class MyModel(Model):
    __keyspace__ = 'mykeyspace'

    class Meta:
        get_pk_field = 'uid'

    uid = columns.UUID(partition_key=True, default=uuid.uuid4)
    start = columns.TimeUUID(primary_key=True, clustering_order='desc')
    interrupt = columns.List(columns.Tuple(columns.TimeUUID(), columns.TimeUUID(), columns.Text()))
    stop = columns.TimeUUID()
Example
The following code connects to the Cassandra database, creates a model instance and stores it in the database:
import multiprocessing
import uuid

from cassandra.cqlengine import connection
from vo.record import MyModel

# Queue
queue = multiprocessing.Queue()
# Connect to the database
connection.setup(('localhost',), 'mykeyspace')
# Create a UUID for the record (named so that it does not shadow the uuid module)
record_id = uuid.uuid1()
# Prepare the new record
wt = MyModel(start=record_id, uid=record_id)
# Store it
wt.save()
However, if I put the model into the queue and dequeue it later, calling save() on the dequeued instance raises a TypeError.
queue.put(wt)
item = queue.get()
item.save()
Printing the model attribute _timeout shows two different object ids (as expected):
print wt._timeout
print item._timeout
Output:
<object object at 0x76d32508>
<object object at 0x76d32e28>
Traceback (most recent call last):
  File "test.py", line 25, in <module>
    item.save()
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/models.py", line 731, in save
    if_exists=self._if_exists).save()
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/query.py", line 1437, in save
    return self.update()
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/query.py", line 1420, in update
    self._delete_null_columns(delete_conditionals)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/query.py", line 1369, in _delete_null_columns
    self._execute(ds)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/query.py", line 1334, in _execute
    results = _execute_statement(self.model, statement, self._consistency, self._timeout, connection=connection)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/query.py", line 1485, in _execute_statement
    return conn.execute(s, params, timeout=timeout, connection=connection)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/connection.py", line 286, in execute
    result = conn.session.execute(query, params, timeout=timeout)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 1998, in execute
    return self.execute_async(query, parameters, trace, custom_payload, timeout, execution_profile, paging_state).result()
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 2038, in execute_async
    future.send_request()
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 3365, in send_request
    self._start_timer()
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 3316, in _start_timer
    self._timer = self.session.cluster.connection_class.create_timer(self._time_remaining, self._on_timeout)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/io/asyncorereactor.py", line 289, in create_timer
    timer = Timer(timeout, callback)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/connection.py", line 1030, in __init__
    self.end = time.time() + timeout
TypeError: unsupported operand type(s) for +: 'float' and 'object'
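
The two different object ids and the 'float' + 'object' error are consistent with _timeout holding a bare object() sentinel whose identity does not survive pickling, which is how multiprocessing.Queue transfers items. The following is a minimal sketch of that mechanism only, without Cassandra. NOT_SET, effective_timeout and the state dict are hypothetical stand-ins for illustration, not the driver's actual code, and whether cqlengine compares the sentinel by identity is an assumption here.

```python
import pickle
import time

# Hypothetical stand-in for a module-level "no timeout given" sentinel.
NOT_SET = object()


def effective_timeout(timeout, default=10.0):
    # Identity check: only the original sentinel object is recognised.
    return default if timeout is NOT_SET else timeout


# Hypothetical stand-in for the instance state that travels through the queue.
original_state = {"_timeout": NOT_SET}

# multiprocessing.Queue pickles items on put() and unpickles them on get();
# a plain pickle round trip reproduces the same effect in one process.
copied_state = pickle.loads(pickle.dumps(original_state))

# The round trip creates a brand-new object(), so the identity check fails.
same_sentinel = copied_state["_timeout"] is NOT_SET

# With the original sentinel the default is substituted and the sum works.
deadline_ok = time.time() + effective_timeout(original_state["_timeout"])

# With the copied sentinel a bare object() reaches the addition instead.
try:
    time.time() + effective_timeout(copied_state["_timeout"])
    failed = False
except TypeError:
    failed = True

print(same_sentinel)
print(failed)
```

If this is indeed the cause, one option that avoids the problem would be to send only plain column values (for example a dict of UUIDs) through the queue and construct the MyModel instance in the consuming process, so that the sentinel is created there rather than unpickled.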