0

我正在尝试使用 Scala Futures 将消息异步写入 Amazon Kinesis,以便对应用程序进行负载测试。

这段代码有效,我可以看到数据沿着我的管道移动,以及输出打印到控制台。

import com.amazonaws.services.kinesis.AmazonKinesisClient
import java.nio.CharBuffer
import java.nio.charset.Charset
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}    

object KinesisDummyDataProducer extends App {

  val kinesis = new AmazonKinesisClient(PipelineConfig.awsCredentials)
  println("Connected")

  lazy val encoder = Charset.forName("UTF-8").newEncoder()
  lazy val tz = TimeZone.getTimeZone("UTC")
  lazy val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'Z")
  df.setTimeZone(tz)

  (1 to args(0).toInt).map(int => send(int)).map(msg => println(msg))

  private def send(int: Int) = {
    val msg = "{\"event_name\":\"test\",\"timestamp\":\"%s\",\"int\":%s}".format(df.format(new Date()), int.toString)
    val bytes = encoder.encode(CharBuffer.wrap(msg))
    encoder.flush(bytes)
    kinesis.putRecord("PrimaryEventStream", bytes, "123")
    msg
  }
}

此代码适用于 Scala Futures。

import scala.concurrent.future
import scala.concurrent.ExecutionContext.Implicits.global

def doIt(x: Int) = {Thread.sleep(1000); x + 1}
(1 to 10).map(x => future{doIt(x)}).map(y => y.onSuccess({case x => println(x)}))

您会注意到,序列映射的语法几乎相同。但是,以下方法不起作用(即,它既不打印到控制台也不向我的管道发送数据)。

import com.amazonaws.services.kinesis.AmazonKinesisClient
import java.nio.CharBuffer
import java.nio.charset.Charset
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}
import scala.concurrent.future
import scala.concurrent.ExecutionContext.Implicits.global


object KinesisDummyDataProducer extends App {

  val kinesis = new AmazonKinesisClient(PipelineConfig.awsCredentials)
  println("Connected")

  lazy val encoder = Charset.forName("UTF-8").newEncoder()
  lazy val tz = TimeZone.getTimeZone("UTC")
  lazy val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'Z")
  df.setTimeZone(tz)

  (1 to args(0).toInt).map(int => future {send(int)}).map(f => f.onSuccess({case msg => println(msg)}))

  private def send(int: Int) = {
    val msg = "{\"event_name\":\"test\",\"timestamp\":\"%s\",\"int\":%s}".format(df.format(new Date()), int.toString)
    val bytes = encoder.encode(CharBuffer.wrap(msg))
    encoder.flush(bytes)
    kinesis.putRecord("PrimaryEventStream", bytes, "123")
    msg
  }
}

关于这个项目的更多说明。我正在使用 Maven 进行构建(从命令行),并且运行所有上述代码(也从命令行)只是花花公子。

我的问题是:为什么使用相同的语法我的函数“发送”似乎没有执行?

4

0 回答 0