-1

我刚开始学习将 AWS 的 golang sdk 用于 kinesis 数据流。在 Suzuken 的帖子之后,我创建了一个脚本来从 aws 中的数据流中获取记录,其生产者是 cloudwatch 日志:

func suzukenGetRecords() {
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        log.Fatal(err)
    }

    // Create an Amazon S3 service client
    kc := kinesis.NewFromConfig(cfg)

    // Get the stream
    streams, err := kc.DescribeStream(context.TODO(), &kinesis.DescribeStreamInput{
        StreamName: aws.String("aws-go-starter"),
    })
    if err != nil {
        log.Fatal(err)
    }

    putOutput, err := kc.PutRecord(context.TODO(), &kinesis.PutRecordInput{
        Data:         []byte("hoge"),
        StreamName:   aws.String("aws-go-starter"),
        PartitionKey: aws.String("key1"),
    })
    if err != nil {
        panic(err)
    }
    fmt.Printf("%v\n", putOutput)

    // retrieve iterator
    iteratorOutput, err := kc.GetShardIterator(context.TODO(), &kinesis.GetShardIteratorInput{
        ShardId:                aws.String(*streams.StreamDescription.Shards[0].ShardId),
        ShardIteratorType:      "TRIM_HORIZON",
        StreamName:             aws.String("aws-go-starter"),
        StartingSequenceNumber: new(string),
        Timestamp:              &time.Time{},
    })
    if err != nil {
        log.Panic(err)
    }

    records, err := kc.GetRecords(context.TODO(), &kinesis.GetRecordsInput{
        ShardIterator: iteratorOutput.ShardIterator,
    })
    if err != nil {
        panic(err)
    }
    fmt.Printf("%v\n", records)

}

我以为我会得到手动输入到 cloudwatch 日志中的内容,例如“test1”、“test2”或“testtest”,但我得到了一堆数字:

2022/02/07 18:48:25 GetRecords Data: [31 139 8 0 0 0 0 0 0 0 93 143 77 107 2 49 16 134 255 75 206 10 153 124 76 38 222 22 186 245 212 211 122 43 82 162 134 37 224 110 150 36 86 138 248 223 59 149 122 241 250 60 243 190 51 115 19 83 172 53 140 113 247 179 68 177 17 111 221 174 251 250 232 135 161 219 246 98 37 242 117 142 133 49 121 141 202 42 139 142 44 227 115 30 183 37 95 22 54 225 90 215 99 94 215 22 74 227 209 135 27 90 137 97 98 249 79 129 113 189 28 234 177 164 165 165 60 191 167 51 211 42 54 159 175 241 253 35 223 127 199 185 253 233 155 72 39 174 209 136 72 202 104 231 201 32 73 13 206 146 113 64 30 189 51 142 164 67 235 1 180 84 164 128 180 145 222 27 75 210 16 175 109 137 255 107 97 226 83 1 141 81 40 61 120 112 110 245 252 155 235 79 161 138 251 254 254 11 46 196 173 166 11 1 0 0]
2022/02/07 18:48:25 GetRecords Data: [31 139 8 0 0 0 0 0 0 0 93 143 61 111 131 48 16 134 255 203 205 137 100 31 248 176 217 144 66 51 117 34 91 137 42 39 181 144 165 128 145 237 52 170 16 255 189 71 218 46 29 238 67 207 123 159 11 140 46 37 59 184 211 215 236 160 134 67 115 106 222 95 219 174 107 142 45 236 32 60 38 23 25 107 83 16 42 84 84 105 197 248 22 134 99 12 247 153 21 251 72 251 33 236 83 182 49 115 233 83 235 114 116 118 100 241 151 74 198 233 126 73 215 232 231 236 195 244 226 111 76 19 212 111 255 219 207 207 254 246 211 77 121 147 23 240 31 60 166 32 34 141 37 161 16 5 153 202 20 12 80 34 59 69 12 140 212 108 146 176 82 164 133 48 85 169 81 242 161 196 107 179 231 255 178 29 249 84 73 101 137 36 5 146 214 184 251 251 155 199 47 125 223 131 189 92 183 80 255 228 155 95 97 61 175 223 196 138 122 129 32 1 0 0]

我从另一篇文章中看到,我可以在数据之前使用 []byte,但它仍然不起作用。

有人可以帮我吗?

- - 更新 - -

哦!难怪!我遵循的教程实际上有解组字节切片的行但是当我使用以下代码时:

for _, d := range records.Records {
    m := make(map[string]interface{})
    err := json.Unmarshal([]byte(d.Data), &m)
    if err != nil {
        log.Println(err)
        continue
    }
    log.Printf("GetRecords Data: %v\n", m)
    // log.Printf("GetRecords Data: %v\n", []byte(d.Data))
}

它返回以下消息,甚至不是错误:

2022/02/07 20:17:24 invalid character '\x1f' looking for beginning of value

如何解决这个无效问题?

- - 更新 - -

我验证了 aws cli 正在工作,我可以在使用 base64 解码后看到输出,如下所示:

echo -n "H4sIAAAAAAAAADWPPU8DMQyG/0vmDrETO063kzg6MV03VKGjRFVQ70NJCkJV/zuGwujHfmy/VzOlWsdT2n+tyWzNQ7fvXp76Yeh2vdmY5XNORbFEx0hIHIQUn5fTriyXVTvv4zHdydBKGqc/BMrq5bUeS15bXubHfG6pVLN9vhuHX6X/SHP7gVeT39R0zCwRKDITIlmnhY8iICTOktgQfLCCEAEV+8BkGQD1WMsapI2T/gTsPUawbNHR5j+grm86QeZ2uH0DFU6ZLPYAAAA=" | base64 -d | zcat

但是,当我运行 go 源代码时,我仍然收到相同的2022/02/07 23:00:03 无效字符 '\x1f' 寻找值消息的开头。

这是我运行的完整代码:

package main

import (
    "context"
    "encoding/json"
    "log"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/kinesis"
)

func main() {
    kinesisGetRecords()
}

func kinesisGetRecords() {
    // Load the Shared AWS Configuration (~/.aws/config)
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        log.Fatal(err)
    }

    // Create an Kinesis service client
    kc := kinesis.NewFromConfig(cfg)

    // Define stream name
    streamName := aws.String("jace")

    // Get the stream
    streams, err := kc.DescribeStream(context.TODO(), &kinesis.DescribeStreamInput{
        StreamName: streamName,
    })
    if err != nil {
        log.Fatal(err)
    }

    // Retrieve iterator
    iteratorOutput, err := kc.GetShardIterator(context.TODO(), &kinesis.GetShardIteratorInput{
        ShardId:           aws.String(*streams.StreamDescription.Shards[0].ShardId),
        ShardIteratorType: "TRIM_HORIZON",
        StreamName:        streamName,
    })
    if err != nil {
        log.Panic(err)
    }

    // Display records
    shardIterator := iteratorOutput.ShardIterator
    var a *string

    // get data using infinity looping
    // we will attempt to consume data every 1 secons, if no data, nothing will be happen
    for {
        // get records use shard iterator for making request
        records, err := kc.GetRecords(context.TODO(), &kinesis.GetRecordsInput{
            ShardIterator: shardIterator,
        })

        // if error, wait until 1 seconds and continue the looping process
        if err != nil {
            time.Sleep(1000 * time.Millisecond)
            continue
        }

        // process the data
        if len(records.Records) > 0 {
            for _, d := range records.Records {
                m := make(map[string]interface{})
                err := json.Unmarshal([]byte(d.Data), &m)
                if err != nil {
                    log.Println(err)
                    continue
                }
                log.Printf("GetRecords Data: %v\n", m)
            }
        } else if records.NextShardIterator == a || shardIterator == records.NextShardIterator || err != nil {
            log.Printf("GetRecords ERROR: %v\n", err)
            break
        }
        shardIterator = records.NextShardIterator
        time.Sleep(1000 * time.Millisecond)
    }
}

有人可以帮忙吗?

4

1 回答 1

0

你的问题不清楚..

您显示的输出看起来像几个字节切片 ([]byte)。对于许多存储“记录”的 AWS 服务(例如 Kinesis、SQS),记录通常是 JSON 字符串的字节切片。该字节切片可以 json.Unmarshal()'d 转换为适当的类型,可以是已定义的类型(例如:包含 S3 实体的 SQS 事件)或您定义的类型(您在 Kinesis 中放置的任何内容的 JSON 模式)

于 2022-02-08T00:10:11.910 回答