1
    // Order metrics: CSV on S3, parsed with the explicit revenue_schema.
    val ordersDF = spark.read
      .schema(revenue_schema)
      .format("csv")
      .load("s3://xxxx/fifa/pocs/smallMetrics.csv")

    // Product catalogue: JSON on S3, with the columns renamed positionally.
    val product_df = spark.read
      .json("s3://xxxx/fifa/pocs/smallCatalogue.json")
      .toDF("id", "product", "style_id")

    // Serialise the product column to a JSON string, keeping style_id alongside it.
    val product_json_df = product_df.select(
      $"style_id",
      to_json($"product").alias("product")
    )

    // JSON key inside the serialised product document -> output column name,
    // listed in output column order.
    val productFieldAliases = Seq(
      "brand" -> "brand",
      "gender" -> "gender",
      "article_type" -> "article_type",
      "business_unit" -> "business_unit",
      "season" -> "season",
      "season_code" -> "season_code",
      "brand_code" -> "brand_code",
      "style_catalogued_date" -> "style_catalogued_date",
      "base_colour" -> "base_colour",
      "image" -> "image",
      "image_array" -> "image_array",
      "MRP" -> "mrp",
      "attrs" -> "product_attributes"
    )

    // get_json_object yields each extracted value as a string column, so nested
    // values such as attrs travel downstream as JSON text rather than as a struct.
    val extractedProductColumns = productFieldAliases.map { case (jsonKey, columnName) =>
      get_json_object($"product", "$." + jsonKey).alias(columnName)
    }

    val product_final_df = product_json_df.select($"style_id" +: extractedProductColumns: _*)

    product_final_df.show(false)

    |style_id|brand          |gender|article_type|business_unit       |season|season_code|brand_code|style_catalogued_date|base_colour|image|image_array                         |mrp |product_attributes                                                                                                                                                                                                                                                           |
    +--------+---------------+------+------------+--------------------+------+-----------+----------+---------------------+-----------+-----+------------------------------------+----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |2024270 |Marks & Spencer|Women |Jeans       |International Brands|Fall  |FW17       |MKSP      |null                 |Khaki      |null |[null,null,null,null,null,null,null]|2299|{"ALL":"STYLES","Add-Ons":"NA","Brand Fit Name":"NA","Closure":"Button and Zip","Distress":"Clean Look","Fabric":"Cotton","Fade":"No Fade","Features":"NA","Fit":"Super Skinny Fit","Occasion":"Casual","Shade":"Dark","Waist Rise":"Mid-Rise","Waistband":"With belt loops"}|
    |2023709 |Bossini        |Boys  |Tshirts     |Kids Wear           |Fall  |FW17       |BILE      |null                 |NA         |null |[null,null,null,null,null,null,null]|599 |{"ALL":"STYLES","Fabric":"Polyester","Fabric Type":"Single jersey","Fit":"Regular Fit","Multipack Set":"Single","Neck":"Henley Neck","Pattern":"Solid","Pattern Coverage":"NA","Print or Pattern Type":"Solid","Sleeve Length":"Long Sleeves","Surface Styling":"NA"}        |
    |2024333 |Marks & Spencer|Women |Tops        |International Brands|Fall  |FW17       |MKSP      |null                 |null       |null |[null,null,null,null,null,null,null]|1999|{"ALL":"STYLES","Fabric":"Polyester","Neck":"Round Neck","Pattern":"Solid","Print or Pattern Type":"Solid","Sleeve Length":"Short Sleeves","Sleeve Styling":"Flared Sleeves","Surface Styling":"NA","Type":"Regular","Weave Type":"Knitted"}        

// Join order metrics with the extracted product columns on the shared style_id column.
val product_metrics_df = ordersDF.join(product_final_df, Seq("style_id"))
// Print the joined rows without truncating long cell values.
product_metrics_df.show(truncate = false)

+--------+--------+------+-------+--------+----------------+---------------+--------------+----------+-----------------+---------+---------------+------+------------+--------------------+------+-----------+----------+---------------------+-----------+-----+------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|style_id|date    |mrp   |revenue|quantity|product_discount|coupon_discount|total_discount|list_count|add_to_cart_count|pdp_count|brand          |gender|article_type|business_unit       |season|season_code|brand_code|style_catalogued_date|base_colour|image|image_array                         |product_attributes                                                                                                                                                                                                                                                           |
+--------+--------+------+-------+--------+----------------+---------------+--------------+----------+-----------------+---------+---------------+------+------------+--------------------+------+-----------+----------+---------------------+-----------+-----+------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2024270 |20170101|1000.0|1000.0 |1000    |1000.0          |1000.0         |1000.0        |1000      |2000             |2000     |Marks & Spencer|Women |Jeans       |International Brands|Fall  |FW17       |MKSP      |null                 |Khaki      |null |[null,null,null,null,null,null,null]|{"ALL":"STYLES","Add-Ons":"NA","Brand Fit Name":"NA","Closure":"Button and Zip","Distress":"Clean Look","Fabric":"Cotton","Fade":"No Fade","Features":"NA","Fit":"Super Skinny Fit","Occasion":"Casual","Shade":"Dark","Waist Rise":"Mid-Rise","Waistband":"With belt loops"}|
|2024333 |20170101|1000.0|1000.0 |1000    |1000.0          |1000.0         |1000.0        |1000      |2000             |2000     |Marks & Spencer|Women |Tops        |International Brands|Fall  |FW17       |MKSP      |null                 |null       |null |[null,null,null,null,null,null,null]|{"ALL":"STYLES","Fabric":"Polyester","Neck":"Round Neck","Pattern":"Solid","Print or Pattern Type":"Solid","Sleeve Length":"Short Sleeves","Sleeve Styling":"Flared Sleeves","Surface Styling":"NA","Type":"Regular","Weave Type":"Knitted"}                                 |
|2023709 |20170101|1000.0|1000.0 |1000    |1000.0          |1000.0         |1000.0        |1000      |2000             |2000     |Bossini        |Boys  |Tshirts     |Kids Wear           |Fall  |FW17       |BILE      |null                 |NA         |null |[null,null,null,null,null,null,null]|{"ALL":"STYLES","Fabric":"Polyester","Fabric Type":"Single jersey","Fit":"Regular Fit","Multipack Set":"Single","Neck":"Henley Neck","Pattern":"Solid","Pattern Coverage":"NA","Print or Pattern Type":"Solid","Sleeve Length":"Long Sleeves","Surface Styling":"NA"}        |
+--------+--------+------+-------+--------+----------------+---------------+--------------+----------+-----------------+---------+---------------+------+------------+--------------------+------+-----------+----------+---------------------+-----------+-----+------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+


// Index the joined rows into Elasticsearch using the settings in elasticConf (defined elsewhere).
// NOTE(review): product_attributes is a string column at this point, so it is sent as a JSON string value.
product_metrics_df.saveToEs(elasticConf)

当 product_attributes 列被写入 ES 时,其中的双引号会被反斜杠转义,整个 JSON 被当作一个字符串:

product_attributes  "{\"ALL\":\"STYLES\",\"Add-Ons\":\"NA\",\"Brand Fit Name\":\"NA\",\"Closure\":\"Button and Zip\",\"Distress\":\"Clean Look\",\"Fabric\":\"Cotton\",\"Fade\":\"No Fade\",\"Features\":\"NA\",\"Fit\":\"Super Skinny Fit\",\"Occasion\":\"Casual\",\"Shade\":\"Dark\",\"Waist Rise\":\"Mid-Rise\",\"Waistband\":\"With belt loops\"}"

有什么办法可以避免 JSON 被反斜杠转义?由于转义后它不再是有效的 JSON 对象,ES 将其解释为单个 String 字段,product_attributes 下的那些键值对也就没有被单独索引。

我还将 DataFrame 写入 S3,以交叉检查 product_attributes 数据是否被转义,结果其中的 JSON 同样被反斜杠转义。

// Cross-check: dump the joined frame to S3 to inspect how product_attributes is serialised.
// NOTE(review): the output is written as JSON even though the target path ends in ".csv".
product_metrics_df.write.json("s3://xxxxx/fifa/pocs/output.csv")

ES 索引模板:https://pastebin.com/e4tmATHE

使用 Spark 和 Python 可以正常地将数据写入 ES,所以 ES 索引模板本身没有问题。

我尝试了另一种方法,我使用 json4s 库构建了 json,然后将 json 写入 ES,但这里也面临同样的问题

  /**
    * Renders one joined order/product row as a compact JSON document string.
    *
    * Fields are read by position, so the column order of the incoming row must
    * match the joined frame: style_id, date, mrp, ..., product_attributes.
    *
    * @param row a row of the joined order/product frame
    * @return the compact JSON rendering of the row
    */
  def convertRowToJSON(row: Row): String = {
    val json =
      ("style_id" -> row.getInt(0)) ~
        ("date" -> row.getInt(1)) ~
        ("mrp" -> row.getFloat(2)) ~
        ("revenue" -> row.getFloat(3)) ~
        ("quantity" -> row.getInt(4)) ~
        ("product_discount" -> row.getFloat(5)) ~
        ("coupon_discount" -> row.getFloat(6)) ~
        ("total_discount" -> row.getFloat(7)) ~
        ("list_count" -> row.getInt(8)) ~
        ("add_to_cart_count" -> row.getInt(9)) ~
        ("pdp_count" -> row.getInt(10)) ~
        ("brand" -> row.getString(11)) ~
        ("gender" -> row.getString(12)) ~
        ("article_type" -> row.getString(13)) ~
        ("business_unit" -> row.getString(14)) ~
        ("season" -> row.getString(15)) ~
        ("season_code" -> row.getString(16)) ~
        ("brand_code" -> row.getString(17)) ~
        ("style_catalogued_date" -> row.getString(18)) ~
        ("base_colour" -> row.getString(19)) ~
        ("image" -> row.getString(20)) ~
        ("image_array" -> row.getString(21)) ~
        // NOTE(review): read via getString, so this is emitted as a JSON string value
        // and its inner quotes are escaped on render; parsing it into a JValue first
        // would presumably embed it as a nested object - confirm before changing.
        ("product_attributes" -> row.getString(22))

    compact(render(json))
  }

// Alternative approach: redo the join and convert every row to a JSON string via convertRowToJSON.
val product_metrics_df = ordersDF.join(product_final_df,"style_id").map(convertRowToJSON)

JSON 准备好之后,我将 es.input.json 属性设置为 true 再次尝试,但没有成功。

也尝试过 saveJsonToEs 方法,同样没有成功:JSON 仍然被转义,并被视为单个对象。

// Drop to the underlying RDD and hand each element to Elasticsearch via saveJsonToEs,
// using the settings in elasticConf (defined elsewhere).
// NOTE(review): presumably each element is sent as an already-serialised JSON document - confirm.
product_metrics_df.rdd.saveJsonToEs(elasticConf)

谢谢

4

0 回答 0