0

我正在编写集成测试用例来发送和接收来自 Amazon SQS 的消息。为此,我正在从 image 运行 docker 容器s12v/elasticmq:0.13.2。为了启动容器,我使用的是 Spotify docker 客户端。

依赖关系: -

    "com.whisk" %% "docker-testkit-scalatest" % "0.9.9" % "test",
    "com.whisk" %% "docker-testkit-impl-spotify" % "0.9.9" % "test"

特征DockerSqsService代码看起来像

import com.spotify.docker.client.{DefaultDockerClient, DockerClient}
import com.whisk.docker.DockerReadyChecker.LogLineContains
import com.whisk.docker._
import com.whisk.docker.impl.spotify.SpotifyDockerFactory
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterAll, Suite}

import scala.concurrent.duration._

trait DockerSqsService
  extends Suite
    with DockerKit
    with BeforeAndAfterAll
    with ScalaFutures
    with PatienceConfig {

  val defaultSqsPort = 9324
  val sqsTopicArn = "sqs-arn"
  val awsAccessKey = "access-key"
  val awsSecretKey = "secret-key"

  val sqsContainer: DockerContainer = DockerContainer("s12v/elasticmq:0.13.2")
    .withPorts(defaultSqsPort -> None)
    .withReadyChecker(
      LogLineContains("SQS server has started")
        .looped(15, 1.second)
    )

  val sqsPort: Int = {
    sqsContainer
      .getPorts()
      .map(_.apply(defaultSqsPort))
      .futureValue
  }
  val sqsEndpoint = s"http://localhost:$sqsPort"

  private val client: DockerClient = DefaultDockerClient.fromEnv().build()

  override implicit val dockerFactory: DockerFactory = new SpotifyDockerFactory(client)

  override lazy val containerManager = new DockerContainerManager(dockerContainers, dockerFactory.createExecutor())

  abstract override def dockerContainers: List[DockerContainer] =
    sqsContainer :: super.dockerContainers

  override def beforeAll(): Unit = {
    super.beforeAll()
  }
}

然后将此特征与包含所有测试配置和 AWS 设置的基本特征混合。BaseSetup特点是

import java.net.URI

import akka.actor.ActorSystem
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.sqs.{AmazonSQSAsync, AmazonSQSAsyncClientBuilder}
import com.github.matsluni.akkahttpspi.AkkaHttpClient
import fileuploader.gateway.{AWSUtils, SqsClient}
import org.mockito.Mockito.verifyNoMoreInteractions
import org.scalamock.scalatest.MockFactory
import org.scalatest.concurrent.Eventually
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterEach, Inside}
import org.slf4j.LoggerFactory
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.http.async.SdkAsyncHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.utils.AttributeMap

trait BaseSetup extends AnyFlatSpec
  with MockFactory
  with Matchers
  with Inside
  with Eventually
  with BeforeAndAfterEach
  with DockerSqsService {

  private val logger = LoggerFactory.getLogger(getClass)
  implicit val actorSystem: ActorSystem = ActorSystem("sqs-client-actor-system")
  val httpClient: SdkAsyncHttpClient = AkkaHttpClient
    .builder()
    .buildWithDefaults(AttributeMap.empty())
  val credentialsProvider: StaticCredentialsProvider = StaticCredentialsProvider.create(
    AwsBasicCredentials.create(awsAccessKey, awsSecretKey)
  )
  implicit val asyncSqsClient: SqsAsyncClient =
    SqsAsyncClient
      .builder()
      .credentialsProvider(credentialsProvider)
      .region(Region.EU_CENTRAL_1)
      .httpClient(httpClient)
      .overrideConfiguration(AWSUtils.overrideConfig)
      .build()

  val sqsClient = new SqsClient(sqsTopicArn)

  // Client to create queue in SQS
  val sqsAsyncClient: AmazonSQSAsync = {
    val sqsClientBuilder: AmazonSQSAsyncClientBuilder = AmazonSQSAsyncClientBuilder.standard()
    val awsStaticCredentialsProvider = new AWSStaticCredentialsProvider(
      new BasicAWSCredentials(awsAccessKey, awsSecretKey)
    )
    sqsClientBuilder.withCredentials(
      awsStaticCredentialsProvider
    )

    sqsClientBuilder.withEndpointConfiguration(
      new EndpointConfiguration(sqsEndpoint, Region.EU_CENTRAL_1.toString)
    )
    sqsClientBuilder.build()
  }

  override def beforeAll(): Unit = {
    initialize()
  }

  override def afterEach(): Unit = {
    verifyNoMoreInteractions(sqsClient)
  }

  override def afterAll(): Unit = {
    actorSystem.terminate()
  }

  protected[fileuploader] def createSqsQueue(sqsAsync: AmazonSQSAsync, queueName: String): String = {
    val url = new URI(sqsAsync.createQueue(queueName).getQueueUrl)
    val sqsEndpointUrl = s"$sqsEndpoint${url.getPath}"
    logger.info(sqsEndpointUrl)
    sqsEndpointUrl
  }

  protected[fileuploader] def initialize(): Unit = {
    createSqsQueue(sqsAsyncClient, "file_uploader")
  }
}

最终,这个 BaseSetup 特征与我的规范文件混淆了,即

class SqsClientBase extends BaseSetup {

  "SqsClient" should "restart if it fails to connect to SQS" in {
    //given
    //when
    //then
  }
}

所以,我现在尝试运行这个规范,在任何测试用例运行之前,我得到一个错误

An exception or error caused a run to abort. 
java.lang.NullPointerException
    at fileuploader.util.DockerSqsService.containerManager(DockerSqsService.scala:43)
    at fileuploader.util.DockerSqsService.containerManager$(DockerSqsService.scala:43)
    at fileuploader.gateway.SqsClientBase.containerManager$lzycompute(SqsClientBase.scala:6)
    at fileuploader.gateway.SqsClientBase.containerManager(SqsClientBase.scala:6)
    at com.whisk.docker.DockerKit.getContainerState(DockerKit.scala:36)
    at com.whisk.docker.DockerKit.getContainerState$(DockerKit.scala:35)
    at fileuploader.gateway.SqsClientBase.getContainerState(SqsClientBase.scala:6)
    at com.whisk.docker.DockerKit.containerToState(DockerKit.scala:40)
    at com.whisk.docker.DockerKit.containerToState$(DockerKit.scala:39)
    at fileuploader.gateway.SqsClientBase.containerToState(SqsClientBase.scala:6)
    at fileuploader.util.DockerSqsService.$init$(DockerSqsService.scala:32)
    at fileuploader.gateway.SqsClientBase.<init>(SqsClientBase.scala:6)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.Class.newInstance(Class.java:442)
    at org.scalatest.tools.Runner$.genSuiteConfig(Runner.scala:1402)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$8(Runner.scala:1199)
    at scala.collection.immutable.List.map(List.scala:246)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1198)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
    at org.scalatest.tools.Runner$.run(Runner.scala:798)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:133)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)

我真的不知道如何解决这个问题。这里的任何指示都会非常有帮助。TIA

4

0 回答 0