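I created a stream from a topic in KSQL. The stream has the fields shown below. I can query individual fields, for example: select category from fakeData119;
How can I get a single item from the map field, for example status?
The data from the source is: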
ProducerRecord(topic=fake-data-119, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value={"deviceId": 16, "category": "visibility sensors", "timeStamp": "Tue Jun 19 10:11:10 CEST 2018", "deviceProperties": {"visibility": "72", "status": "true"}}, timestamp=null)
ProducerRecord(topic=fake-data-119, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value={"deviceId": 6, "category": "fans", "timeStamp": "Tue Jun 19 10:11:11 CEST 2018", "deviceProperties": {"temperature": "22", "rotationSense": "1", "status": "false", "frequency": "56"}}, timestamp=null)
ProducerRecord(topic=fake-data-119, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value={"deviceId": 23, "category": "air quality monitors", "timeStamp": "Tue Jun 19 10:11:12 CEST 2018", "deviceProperties": {"coPpm": "136", "status": "false", "Co2Ppm": "450"}}, timestamp=null)
I am using the statement below to create the stream:
CREATE STREAM fakeData119 WITH (KAFKA_TOPIC='fake-data-119', VALUE_FORMAT='AVRO');
Field | Type
---------------------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
DEVICEID | INTEGER
CATEGORY | VARCHAR(STRING)
TIMESTAMP | VARCHAR(STRING)
DEVICEPROPERTIES | MAP[VARCHAR(STRING),VARCHAR(STRING)]
---------------------------------------------------------
ksql> select * from fakeData119;
1529394182864 | null | 6 | fans | Tue Jun 19 09:43:02 CEST 2018 | {temperature=36, rotationSense=1, status=false, frequency=72}
1529394183869 | null | 5 | fans | Tue Jun 19 09:43:03 CEST 2018 | {temperature=23, rotationSense=1, status=true, frequency=76}
1529394184872 | null | 16 | visibility sensors | Tue Jun 19 09:43:04 CEST 2018 | {visibility=14, status=true}
1529394185875 | null | 25 | air quality monitors | Tue Jun 19 09:43:05 CEST 2018 | {coPpm=280, status=false, Co2Ppm=170}
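To make the goal concrete, here is a minimal sketch of the kind of query I am after. It assumes KSQL's bracket syntax for map lookups works on the DEVICEPROPERTIES column of the fakeData119 stream described above; the STATUS alias is only illustrative.

```sql
-- Assumes the stream fakeData119 from the CREATE STREAM statement above,
-- where DEVICEPROPERTIES is MAP[VARCHAR(STRING),VARCHAR(STRING)].
-- The bracket lookup returns the value stored under the 'status' key.
SELECT deviceId, category, deviceProperties['status'] AS status
FROM fakeData119;
```

Because the map values are declared as VARCHAR, the status would come back as the string 'true' or 'false' rather than as a boolean.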