1

在使用 CQLEngine python 库更改 Cassandra 中的模式后,我试图将行插入表中。在更改之前,模型如下所示:

class MetricsByDevice(Model):
    device = columns.Text(primary_key=True, partition_key=True)
    datetime = columns.DateTime(primary_key=True, clustering_order="DESC")

    load_power = columns.Double()
    inverter_power = columns.Double()

我已将架构更改为此,添加了四列(DSO、节点、公园和商业化):

class MetricsByDevice(Model):
    device = columns.Text(primary_key=True, partition_key=True)
    datetime = columns.DateTime(primary_key=True, clustering_order="DESC")

    DSO = columns.Text(index=True, default='DSO_1'),
    node = columns.Text(index=True, default='Node_1'),
    park = columns.Integer(index=True, default=6),
    commercializer = columns.Text(index=True, default='Commercializer_1'),

    load_power = columns.Double()
    inverter_power = columns.Double()

然后,我将表格与包含该行的脚本同步

sync_table(MetricsByDate)

我检查了数据库并创建了四列。现有行的这些字段的值为 NULL(如预期的那样)。

然后我修改了负责插入批处理行的脚本,包括与新字段对应的值。看起来像:

        batch = BatchQuery()
        for idx, message in enumerate(consumer):

            data = message.value
            ts_to_insert = dateutil.parser.parse(data['timestamp'])

            filters = get_filters(message.partition_key)

            MetricsByDate.batch(batch).create(
                device=device,
                date=str(ts_to_insert.date()),
                time=str(ts_to_insert.time()),
                created_at=now,
                DSO=str(filters['DSO']),
                node=str(filters['node']),
                park=int(filters['park']),
                commercializer=str(filters['commercializer']),
                load_power=data['loadPower'],
                inverter_power=data['inverterPower'],
            )

            if idx % 100 == 0: # Insert every 100 messages

                batch.execute()

                # Reset batch
                batch = BatchQuery()

我已经检查过与新字段对应的值不是 None 并且具有正确的类型。尽管如此,它正确插入了所有行,但新字段中的值在 Cassandra 中为 NULL。

批量插入不会返回任何错误。我不知道我是否遗漏了什么,或者我是否需要做一个额外的步骤来更新架构。我一直在查看文档,但找不到任何有用的东西。

有什么我做错了吗?

编辑

在 Alex Ott 的建议下,我一一插入了这些行。将代码更改为:

for idx, message in enumerate(consumer):

            data = message.value
            ts_to_insert = dateutil.parser.parse(data['timestamp'])

            filters = get_filters(message.partition_key)

            metrics_by_date = MetricsByDate(
                device=device,
                date=str(ts_to_insert.date()),
                time=str(ts_to_insert.time()),
                created_at=now,
                DSO=str(filters['DSO']),
                node=str(filters['node']),
                park=int(filters['park']),
                commercializer=str(filters['commercializer']),
                load_power=data['loadPower'],
                inverter_power=data['inverterPower'],
            )

            metrics_by_date.save()

如果在执行该行之前metrics_by_date.save()我添加了这些打印语句:

print(metrics_by_date.DSO)
print(metrics_by_date.park)
print(metrics_by_date.load_power)
print(metrics_by_date.device)
print(metrics_by_date.date)

输出是:

(<cassandra.cqlengine.columns.Text object at 0x7ff0b492a670>,)
(<cassandra.cqlengine.columns.Integer object at 0x7ff0b492d190>,)
256.99
SQ3-3.2.3.1-70-17444
2020-04-22

在新的字段中,我得到了一个 cassandra 对象,但在其他字段中,我得到了它们的值。这可能是一个线索,因为它继续在新列中插入 NULL。

4

1 回答 1

2

最后我得到了它。

在模型定义中这是愚蠢的,出于未知的原因,我在分隔字段而不是换行符中添加了逗号......因此将模型定义更正为:

class MetricsByDevice(Model):
    device = columns.Text(primary_key=True, partition_key=True)
    datetime = columns.DateTime(primary_key=True, clustering_order="DESC")

    DSO = columns.Text(index=True, default='DSO_1')
    node = columns.Text(index=True, default='Node_1')
    park = columns.Integer(index=True, default=6)
    commercializer = columns.Text(index=True, default='Commercializer_1')

    load_power = columns.Double()
    inverter_power = columns.Double()

有用!!

于 2020-04-22T13:46:02.460 回答