I am trying to read from Kinesis with Flink (actually running kinesalite as a local mock). Here is my consumer configuration:
val consumerConfig = new Properties()
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "x")
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "x")
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
val kinesis = env
  .addSource(new FlinkKinesisConsumer[String](
    inputStreamName, new SimpleStringSchema(), consumerConfig))
  .print()
But when I put a record on the stream:
aws --endpoint-url=http://localhost:4567 kinesis put-record --data "hello" --partition-key 0 --stream-name ExampleInputStream
I get the following error:
org.apache.flink.kinesis.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Current token (VALUE_STRING) not VALUE_EMBEDDED_OBJECT, can not access as binary
at [Source: (org.apache.flink.kinesis.shaded.com.amazonaws.event.ResponseProgressInputStream); line: -1, column: 304]
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1201)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1147)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2809)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2776)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2765)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:1292)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1263)
at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:247)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:401)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:244)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.core.JsonParseException: Current token (VALUE_STRING) not VALUE_EMBEDDED_OBJECT, can not access as binary
at [Source: (org.apache.flink.kinesis.shaded.com.amazonaws.event.ResponseProgressInputStream); line: -1, column: 304]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712)
at com.fasterxml.jackson.dataformat.cbor.CBORParser.getBinaryValue(CBORParser.java:1660)
at com.fasterxml.jackson.core.JsonParser.getBinaryValue(JsonParser.java:1484)
at org.apache.flink.kinesis.shaded.com.amazonaws.transform.SimpleTypeCborUnmarshallers$ByteBufferCborUnmarshaller.unmarshall(SimpleTypeCborUnmarshallers.java:198)
at org.apache.flink.kinesis.shaded.com.amazonaws.transform.SimpleTypeCborUnmarshallers$ByteBufferCborUnmarshaller.unmarshall(SimpleTypeCborUnmarshallers.java:196)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.transform.RecordJsonUnmarshaller.unmarshall(RecordJsonUnmarshaller.java:61)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.transform.RecordJsonUnmarshaller.unmarshall(RecordJsonUnmarshaller.java:29)
at org.apache.flink.kinesis.shaded.com.amazonaws.transform.ListUnmarshaller.unmarshallJsonToList(ListUnmarshaller.java:92)
at org.apache.flink.kinesis.shaded.com.amazonaws.transform.ListUnmarshaller.unmarshall(ListUnmarshaller.java:46)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.transform.GetRecordsResultJsonUnmarshaller.unmarshall(GetRecordsResultJsonUnmarshaller.java:53)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.transform.GetRecordsResultJsonUnmarshaller.unmarshall(GetRecordsResultJsonUnmarshaller.java:29)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:118)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:43)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:69)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1714)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleSuccessResponse(AmazonHttpClient.java:1434)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1356)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
... 20 more