0

I'm getting events from Kafka and storing into Cassandra. Parsing json which contains fields eventID, sessionID, timestamp, userID to create columns for Cassandra table which looks like this:

cassandra@cqlsh> CREATE TABLE mydata.events (
   ...     "event_date" date,
   ...     "eventID" text,
   ...     "userID" text,
   ...     timestamp timeuuid,
   ...     "sessionID" text,
   ...     "fullJson" text,
   ...     PRIMARY KEY ("event_date", timestamp, "sessionID")

and in code:

case class cassandraFormat(
                       eventID: String, 
                       sessionID: String,
                       timeuuid: UUID, // timestamp as timeuuid
                       userID: String,
                       event_date: LocalDate, // YYYY-MM-dd format
                       fullJson: String // full json from Kafka
                     )

I need to add timestamp column as timeuuid. Since I'm parsing from json, extracted all values from header and created columns in this fashion:

 val allJson = rdd.
            map(x => {
              implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
              //use serialization default to format a Map to JSON
              (x, Serialization.write(x))
            }).
            filter(x => x._1 isDefinedAt "header").
            map(x => (x._1("header"), x._2)).
            filter(x => (x._1 isDefinedAt "userID") &&
              (x._1 isDefinedAt "eventID") &&
              (x._1 isDefinedAt "sessionID") &&
              (x._1 isDefinedAt "timestamp").
            map(x => cassFormat(x._1("eventID").toString,
              x._1("sessionID").toString,
              com.datastax.driver.core.utils.UUIDs.startOf(x._1("timestamp").toString.toLong),
              x._1("userID").toString,
              com.datastax.driver.core.LocalDate.fromMillisSinceEpoch(x._1("timestamp").toString.toLong),
              x._2))

This part:

com.datastax.driver.core.utils.UUIDs.startOf(x._1("timestamp").toString.toLong)

is generating Error

java.lang.NumberFormatException: For input string: "2019-05-09T09:00:52.553+0000" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)

Even tried: java.util.UUID.fromString(x._1("timestamp").toString, also generating same Error. How to properly cast/convert timestamp as timeuuid and insert into Cassandra via spark job

4

3 回答 3

2

我已经用 UDF 解决了这个问题。

import com.datastax.driver.core.utils.UUIDs
import org.apache.spark.sql.functions.udf
 
val toTimeuuid: java.sql.Timestamp => String = x => UUIDs.startOf(x.getTime()).toString()
val fromTimeuuid: String => java.sql.Timestamp = x => new java.sql.Timestamp(UUIDs.unixTimestamp(java.util.UUID.fromString(x)))
 
val toTimeuuidUDF = udf(toTimeuuid)
val fromTimeuuidUDF = udf(fromTimeuuid)
于 2020-07-22T06:48:08.780 回答
0

您有一个不是数字的字符串,并且您正在尝试使用toLong. 因此例外。

看着这个,看起来你可以使用这个方法获得一个基于某个时间戳的 UUID:

public static UUID getTimeUUID(long when)

您将不得不将字符串解析为 aDateTime或 anInstant然后将该 DateTime/ Instant 的毫秒数传递给getTimeUUID

于 2019-05-09T13:52:26.243 回答
0

我设法做到了,将timestamp格式转换为dateTimemillis,然后生成uuid

val dateTimePattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
val dateFormatter = DateTimeFormatter.ofPattern(dateTimePattern)

val allJson = rdd.
              map(x => {
                implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
                //use serialization default to format a Map to JSON
                (x, Serialization.write(x))
              }).
              filter(x => x._1 isDefinedAt "header").
              map(x => (x._1("header"), x._2)).
              filter(x => (x._1 isDefinedAt "userID") &&
                (x._1 isDefinedAt "eventID") &&
                (x._1 isDefinedAt "sessionID") &&
                (x._1 isDefinedAt "timestamp").
              map(x => {
                var millis: Long  = System.currentTimeMillis() // if timestamp format is invalid, put current timestamp instead
                try {
                  val dateStr: String = x._1("timestamp").asInstanceOf[String]
                  // timestamp from event json
                  // create DateTime from Timestamp string
                  val dateTime: ZonedDateTime = ZonedDateTime.parse(dateStr, dateFormatter)
                  // create millis from DateTime
                  millis = dateTime.toInstant.toEpochMilli
                } catch {
                  case e: Exception =>
                    e.printStackTrace()
                }
                // generate timeuuid
                val uuid = new UUID(UUIDs.startOf(millis).getMostSignificantBits, random.nextLong)
                // generate eventDate
                val eventDate = com.datastax.driver.core.LocalDate.fromMillisSinceEpoch(millis)
                cassFormat(x._1("eventID").toString,
                  x._1("sessionID").toString,
                  uuid,
                  x._1("userID").toString,
                  eventDate,
                  x._2)
              })
            allJson.saveToCassandra(CASSANDRA_KEYSPACE_NAME, CASSANDRA_EVENTS_TABLE)
        }
      })

timestampcassandra 中的列现在看起来像:58976340-7313-11e9-910d-60dce7513b94

于 2019-05-10T11:13:34.640 回答