1

我正在用 Spark/Scala 编写一个应用程序,我需要在其中计算一列的指数移动平均值。

EMA_t = (price_t * 0.4) + (EMA_t-1 * 0.6)

我面临的问题是我需要同一列的先前计算的值(EMA_t-1)。通过 mySQL,这可以通过使用 MODEL 或通过创建一个 EMA 列来实现,然后您可以更新每行的行,但我已经尝试过了,并且既不能使用 Spark SQL 也不能使用 Hive 上下文......有什么办法我可以访问这个 EMA_t-1?

我的数据如下所示:

timestamp price    
15:31 132.3 
15:32 132.48 
15:33 132.76 
15:34 132.66
15:35 132.71 
15:36 132.52
15:37 132.63
15:38 132.575
15:39 132.57

所以我需要添加一个新列,其中我的第一个值只是第一行的价格,然后我需要使用以前的值:EMA_t = (price_t * 0.4) + (EMA_t-1 * 0.6) 来计算该列中的以下行。我的 EMA 列必须是:

EMA
132.3
132.372
132.5272
132.58032
132.632192
132.5873152
132.6043891
132.5926335
132.5835801

我目前正在尝试使用 Spark SQL 和 Hive 来做到这一点,但如果可以用另一种方式来做到这一点,这将同样受欢迎!我还想知道如何使用 Spark Streaming 做到这一点。我的数据在数据框中,我使用的是 Spark 1.4.1。

非常感谢您提供的任何帮助!

4

2 回答 2

1

要回答您的问题:

我面临的问题是我需要同一列的先前计算的值(EMA_t-1)

我认为您需要两个功能:Window 和 Lag。(在计算 EMA 时,为了方便起见,我还将 null 值设为零)

my_window = Window.orderBy("timestamp")

df.withColumn("price_lag_1",when(lag(col("price"),1).over(my_window).isNull,lit(0)).otherwise(lag(col("price"),1).over(my_window)))

我也是 Spark Scala 的新手,我想看看我是否可以定义一个 UDF 来做指数平均。但是现在一个明显的走动将是手动添加所有滞后列( 0.4 * lag0 + 0.4*0.6*lag1 + 0.4 * 0.6^2*lag2 ...)像这样

df.withColumn("ema_price", 
price * lit(0.4) * Math.pow(0.6,0) + 
lag(col("price"),1).over(my_window) * 0.4 * Math.pow(0.6,1) +
lag(col("price"),2).over(my_window) * 0.4 * Math.pow(0.6,2)  + .... )

我忽略了 when.otherwise 以使其更清楚。这种方法现在对我有用..

- - 更新 - -

def emaFunc (y: org.apache.spark.sql.Column, group: org.apache.spark.sql.Column, order: org.apache.spark.sql.Column, beta: Double, lookBack: Int) : org.apache.spark.sql.Column = {
  val ema_window = Window.partitionBy(group).orderBy(order)
  var i = 1
  var result = y
  while (i < lookBack){
    result =  result + lit(1) * ( when(lag(y,i).over(ema_window).isNull,lit(0)).otherwise(lag(y,i).over(ema_window)) * beta * Math.pow((1-beta),i) 
    - when(lag(y,i).over(ema_window).isNull,lit(0)).otherwise(y * beta * Math.pow((1-beta),i))   )
    i = i + 1
  }
  return result } 

通过使用此功能,您应该能够获得价格的 EMA,例如..

df.withColumn("one",lit(1))
  .withColumn("ema_price", emaFunc('price,'one,'timestamp,0.1,10)

这将回顾 10 天并计算 beta=0.1 的估计 EMA。“一”列只是一个占位符,因为您没有分组列。

于 2018-08-14T17:07:05.123 回答
-1

您应该能够使用 1.4 中引入的 Spark 窗口函数来执行此操作:https ://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

w = Window().partitionBy().orderBy(col("timestamp")) df.select("*", lag("price").over(w).alias("ema"))

这将为您选择最后一个价格,以便您可以对其进行计算

于 2017-09-17T17:14:40.153 回答