1

我想将个性化用于我的应用推荐模型。获取我的当前应用分析数据。如本文档中所述,我已连接 pinpoint 以借助 kinesis firehose 获取数据。

但是当我将 kinesis data firehose 连接到 pinpoint 时。

查明

我的 pinpoint 将数据发送到 kinesis。但是输出与我想要的不同。

运动设置:

运动设置

和我得到的输出。

输出

有没有其他方法可以解决发送数据以从精确定位到个性化以启动活动。活动开始后,我可以根据文档通过活动发送数据。

4

1 回答 1

1

由于 Pinpoint 事件的形状和内容不同于 Personalize 所需的交互格式(作为交互 CSV批量导入或通过 PutEvents API增量导入),因此需要进行一些转换才能将这些事件转换为正确的格式. 您提到的解决方案使用定期批量导入,通过使用 Athena 将保存在 S3(通过 Kinesis Firehose)中的事件数据提取并格式化为 Personalize 预期的 CSV 格式,然后导入 Personalize。您可以在此处的解决方案的 CloudFormation 模板中找到 Athena 命名查询。

WITH evs AS (
    SELECT
    client.client_id as endpoint_id,
    attributes.campaign_id as campaign_id,
    event_type,
    arrival_timestamp
    FROM event
    WHERE
    (
        ${InteractionsQueryDateScope} > 0
        AND arrival_timestamp >= date_add('day', -1, CURRENT_DATE)
        AND arrival_timestamp < CURRENT_DATE
    ) OR (
        ${InteractionsQueryDateScope} = -1
    )
    AND
    event_type != '_custom.recommender'
),
recs AS (
    SELECT
    attributes.personalize_user_id as personalize_user_id,
    client.client_id as endpoint_id,
    attributes.campaign_id as campaign_id,
    attributes.item_id as item_id,
    event_type,
    arrival_timestamp
    FROM event
    WHERE
    (
        ${InteractionsQueryDateScope} > 0
        AND arrival_timestamp >= date_add('day', -1, CURRENT_DATE)
        AND arrival_timestamp < CURRENT_DATE
    ) OR (
        ${InteractionsQueryDateScope} = -1
    )
    AND
    event_type = '_custom.recommender'
)
SELECT
    r.personalize_user_id as USER_ID,
    r.item_id AS ITEM_ID,
    b.event_type AS EVENT_TYPE,
    v.EVENT_VALUE,
    CAST(to_unixtime(b.arrival_timestamp) AS BIGINT) AS TIMESTAMP
FROM endpoint_export a
INNER JOIN recs r
    ON a.id = r.endpoint_id
INNER JOIN evs b
    ON a.id = b.endpoint_id AND r.campaign_id = b.campaign_id
INNER JOIN event_value v
    ON b.event_type = v.event_type

以下是在 Glue 数据目录中创建表的方式。

CREATE EXTERNAL TABLE IF NOT EXISTS `${PinpointEventDatabase}`.event (
    client struct<client_id:string>,
    attributes struct<campaign_id:string, item_id:string, personalize_user_id:string>,
    event_type string,
    arrival_timestamp timestamp
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
    'serialization.format' = '1'
) LOCATION 's3://${DataS3Bucket}/events/'
TBLPROPERTIES ('has_encrypted_data'='false');

CREATE EXTERNAL TABLE IF NOT EXISTS `${PinpointEventDatabase}`.endpoint_export (
    id string,
    channeltype string,
    address string,
    endpointstatus string,
    optout string,
    effectivedate string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
    'serialization.format' = '1'
) LOCATION 's3://${DataS3Bucket}/endpoint_exports/'
TBLPROPERTIES ('has_encrypted_data'='false');

CREATE EXTERNAL TABLE IF NOT EXISTS `${PinpointEventDatabase}`.event_value (
    event_type string,
    event_value double
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
    'serialization.format' = ',',
    'field.delim' = ','
) LOCATION 's3://${DataS3Bucket}/event_values/'
TBLPROPERTIES ('has_encrypted_data'='false', 'skip.header.line.count'='1');

Kinesis Firehose/S3/Athena 的另一种方法是编写一个 Lambda 函数,该函数直接从 Kinesis 数据流中使用 Pinpoint 事件,将 Lambda 中的 Pinpoint 事件转换为对 Personalize 事件跟踪器的 PutEvents API 调用,然后创建一个一旦你积累了足够的交互数据,解决方案、解决方案版本和活动。

Personalize 的 Pinpoint 事件所需的最少字段是USER_IDITEM_IDTIMESTAMPUSER_ID很可能是 Pinpoint 端点 ( client.client_id),很ITEM_ID可能通过 Pinpoint 作为属性 ( attributes.item_id) 传递,并且TIMESTAMP将是arrival_timestamp。您还可以将 Pinpointevent_type用作EVENT_TYPEPersonalize 中的一个。上面的 SQL 语句向您展示了如何使用 Athena 完成此操作,但您也可以在 Lambda 的代码中为您从 Kinesis 数据流中消耗的每个小批量事件执行此操作。

于 2021-10-02T18:59:18.993 回答