0

我正在通过宁静将卡夫卡流推入德鲁伊。kafka版本是0.9.1,宁静是0.8,德鲁伊是0.10。当没有消息产生时,宁静开始很好,但是当生产者发送消息时,我会得到这样的 JsonMappingException:

ava.lang.IllegalArgumentException: Can not deserialize instance of java.util.ArrayList out of VALUE_STRING token
 at [Source: N/A; line: -1, column: -1]
    at com.fasterxml.jackson.databind.ObjectMapper._convert(ObjectMapper.java:2774) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
    at com.fasterxml.jackson.databind.ObjectMapper.convertValue(ObjectMapper.java:2700) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
    at com.metamx.tranquility.druid.DruidBeams$.makeFireDepartment(DruidBeams.scala:406) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.druid.DruidBeams$.fromConfigInternal(DruidBeams.scala:291) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.druid.DruidBeams$.fromConfig(DruidBeams.scala:199) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.kafka.KafkaBeamUtils$.createTranquilizer(KafkaBeamUtils.scala:40) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.kafka.KafkaBeamUtils.createTranquilizer(KafkaBeamUtils.scala) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.<init>(TranquilityEventWriter.java:64) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.kafka.writer.WriterController.createWriter(WriterController.java:171) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.kafka.writer.WriterController.getWriter(WriterController.java:98) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.kafka.KafkaConsumer$2.run(KafkaConsumer.java:231) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_67]
    at java.util.concurrent.FutureTask.run(FutureTask.java:262) [na:1.7.0_67]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_67]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_67]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]

我的 kafka.json 是:

{
  "dataSources" : {
     "stock-index-topic" : {
      "spec" : {
        "dataSchema" : {
          "dataSource" : "stock-index-topic",
          "parser" : {
            "type" : "string",
            "parseSpec" : {
              "timestampSpec" : {
                "column" : "timestamp",
                "format" : "auto"
              },
              "dimensionsSpec" : {
                "dimensions" : ["code","name","acronym","market","tradeVolume","totalValueTraded","preClosePx","openPrice","highPrice","lowPrice","tradePrice","closePx","timestamp"],
                "dimensionExclusions" : [
                  "timestamp",
                  "value"
                ]
              },
              "format" : "json"
            }
          },
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "DAY",
            "queryGranularity" : "none",
            "intervals":"no"
          },
          "metricsSpec" : [
            {
              "name" : "firstPrice",
              "type" : "doubleFirst",
              "fieldName" : "tradePrice"
            },{
              "name" : "lastPrice",
              "type" : "doubleLast",
              "fieldName" : "tradePrice"
            }, {
              "name" : "minPrice",
              "type" : "doubleMin",
              "fieldName" : "tradePrice"
            }, {
              "name" : "maxPrice",
              "type" : "doubleMax",
              "fieldName" : "tradePrice"
            }
          ]
        },
        "ioConfig" : {
          "type" : "realtime"
        },
        "tuningConfig" : {
          "type" : "realtime",
          "maxRowsInMemory" : "100000",
          "intermediatePersistPeriod" : "PT10M",
          "windowPeriod" : "PT10M"
        }
      },
      "properties" : {
        "task.partitions" : "1",
        "task.replicants" : "1",
        "topicPattern" : "stock-index-topic"
      }
    }
  },
  "properties" : {
    "zookeeper.connect" : "localhost:2181",
    "druid.discovery.curator.path" : "/druid/discovery",
    "druid.selectors.indexing.serviceName" : "druid/overlord",
    "commit.periodMillis" : "15000",
    "consumer.numThreads" : "2",
    "kafka.zookeeper.connect" : "localhost:2181",
    "kafka.group.id" : "tranquility-kafka"
  }
}

我使用 kafka-console-consumer 来获取数据,它看起来像

{"code": "399982", "name": "500等权", "acronym": "500DQ", "market": "102", "tradeVolume": 0, "totalValueTraded": 0.0, "preClosePx": 0.0, "openPrice": 0.0, "highPrice": 0.0, "lowPrice": 0.0, "tradePrice": 7184.7142, "closePx": 0.0, "timestamp": "2017-05-16T09:06:39.000+08:00"}

知道为什么吗?谢谢。

4

1 回答 1

0
"metricsSpec" : [
            {
              "name" : "firstPrice",
              "type" : "doubleFirst",
              "fieldName" : "tradePrice"
            },{
              "name" : "lastPrice",
              "type" : "doubleLast",
              "fieldName" : "tradePrice"
            }, {
              "name" : "minPrice",
              "type" : "doubleMin",
              "fieldName" : "tradePrice"
            }, {
              "name" : "maxPrice",
              "type" : "doubleMax",
              "fieldName" : "tradePrice"
            }
          ]
        },

这是错误的。文档说:First 和 Last 聚合器不能用于摄取规范,只能作为查询的一部分指定。这样,问题就解决了。

于 2017-05-17T03:30:23.197 回答