在使用 CQLEngine python 库更改 Cassandra 中的模式后,我试图将行插入表中。在更改之前,模型如下所示:
class MetricsByDevice(Model):
device = columns.Text(primary_key=True, partition_key=True)
datetime = columns.DateTime(primary_key=True, clustering_order="DESC")
load_power = columns.Double()
inverter_power = columns.Double()
我已将架构更改为此,添加了四列(DSO、节点、公园和商业化):
class MetricsByDevice(Model):
device = columns.Text(primary_key=True, partition_key=True)
datetime = columns.DateTime(primary_key=True, clustering_order="DESC")
DSO = columns.Text(index=True, default='DSO_1'),
node = columns.Text(index=True, default='Node_1'),
park = columns.Integer(index=True, default=6),
commercializer = columns.Text(index=True, default='Commercializer_1'),
load_power = columns.Double()
inverter_power = columns.Double()
然后,我将表格与包含该行的脚本同步
sync_table(MetricsByDate)
我检查了数据库并创建了四列。现有行的这些字段的值为 NULL(如预期的那样)。
然后我修改了负责插入批处理行的脚本,包括与新字段对应的值。看起来像:
batch = BatchQuery()
for idx, message in enumerate(consumer):
data = message.value
ts_to_insert = dateutil.parser.parse(data['timestamp'])
filters = get_filters(message.partition_key)
MetricsByDate.batch(batch).create(
device=device,
date=str(ts_to_insert.date()),
time=str(ts_to_insert.time()),
created_at=now,
DSO=str(filters['DSO']),
node=str(filters['node']),
park=int(filters['park']),
commercializer=str(filters['commercializer']),
load_power=data['loadPower'],
inverter_power=data['inverterPower'],
)
if idx % 100 == 0: # Insert every 100 messages
batch.execute()
# Reset batch
batch = BatchQuery()
我已经检查过与新字段对应的值不是 None 并且具有正确的类型。尽管如此,它正确插入了所有行,但新字段中的值在 Cassandra 中为 NULL。
批量插入不会返回任何错误。我不知道我是否遗漏了什么,或者我是否需要做一个额外的步骤来更新架构。我一直在查看文档,但找不到任何有用的东西。
有什么我做错了吗?
编辑
在 Alex Ott 的建议下,我一一插入了这些行。将代码更改为:
for idx, message in enumerate(consumer):
data = message.value
ts_to_insert = dateutil.parser.parse(data['timestamp'])
filters = get_filters(message.partition_key)
metrics_by_date = MetricsByDate(
device=device,
date=str(ts_to_insert.date()),
time=str(ts_to_insert.time()),
created_at=now,
DSO=str(filters['DSO']),
node=str(filters['node']),
park=int(filters['park']),
commercializer=str(filters['commercializer']),
load_power=data['loadPower'],
inverter_power=data['inverterPower'],
)
metrics_by_date.save()
如果在执行该行之前metrics_by_date.save()
我添加了这些打印语句:
print(metrics_by_date.DSO)
print(metrics_by_date.park)
print(metrics_by_date.load_power)
print(metrics_by_date.device)
print(metrics_by_date.date)
输出是:
(<cassandra.cqlengine.columns.Text object at 0x7ff0b492a670>,)
(<cassandra.cqlengine.columns.Integer object at 0x7ff0b492d190>,)
256.99
SQ3-3.2.3.1-70-17444
2020-04-22
在新的字段中,我得到了一个 cassandra 对象,但在其他字段中,我得到了它们的值。这可能是一个线索,因为它继续在新列中插入 NULL。