我正在编写集成测试用例来发送和接收来自 Amazon SQS 的消息。为此,我正在从 image 运行 docker 容器s12v/elasticmq:0.13.2
。为了启动容器,我使用的是 Spotify docker 客户端。
依赖关系: -
"com.whisk" %% "docker-testkit-scalatest" % "0.9.9" % "test",
"com.whisk" %% "docker-testkit-impl-spotify" % "0.9.9" % "test"
特征DockerSqsService
代码看起来像
import com.spotify.docker.client.{DefaultDockerClient, DockerClient}
import com.whisk.docker.DockerReadyChecker.LogLineContains
import com.whisk.docker._
import com.whisk.docker.impl.spotify.SpotifyDockerFactory
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterAll, Suite}
import scala.concurrent.duration._
trait DockerSqsService
extends Suite
with DockerKit
with BeforeAndAfterAll
with ScalaFutures
with PatienceConfig {
val defaultSqsPort = 9324
val sqsTopicArn = "sqs-arn"
val awsAccessKey = "access-key"
val awsSecretKey = "secret-key"
val sqsContainer: DockerContainer = DockerContainer("s12v/elasticmq:0.13.2")
.withPorts(defaultSqsPort -> None)
.withReadyChecker(
LogLineContains("SQS server has started")
.looped(15, 1.second)
)
val sqsPort: Int = {
sqsContainer
.getPorts()
.map(_.apply(defaultSqsPort))
.futureValue
}
val sqsEndpoint = s"http://localhost:$sqsPort"
private val client: DockerClient = DefaultDockerClient.fromEnv().build()
override implicit val dockerFactory: DockerFactory = new SpotifyDockerFactory(client)
override lazy val containerManager = new DockerContainerManager(dockerContainers, dockerFactory.createExecutor())
abstract override def dockerContainers: List[DockerContainer] =
sqsContainer :: super.dockerContainers
override def beforeAll(): Unit = {
super.beforeAll()
}
}
然后将此特征与包含所有测试配置和 AWS 设置的基本特征混合。BaseSetup
特点是
import java.net.URI
import akka.actor.ActorSystem
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.sqs.{AmazonSQSAsync, AmazonSQSAsyncClientBuilder}
import com.github.matsluni.akkahttpspi.AkkaHttpClient
import fileuploader.gateway.{AWSUtils, SqsClient}
import org.mockito.Mockito.verifyNoMoreInteractions
import org.scalamock.scalatest.MockFactory
import org.scalatest.concurrent.Eventually
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterEach, Inside}
import org.slf4j.LoggerFactory
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.http.async.SdkAsyncHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.utils.AttributeMap
trait BaseSetup extends AnyFlatSpec
with MockFactory
with Matchers
with Inside
with Eventually
with BeforeAndAfterEach
with DockerSqsService {
private val logger = LoggerFactory.getLogger(getClass)
implicit val actorSystem: ActorSystem = ActorSystem("sqs-client-actor-system")
val httpClient: SdkAsyncHttpClient = AkkaHttpClient
.builder()
.buildWithDefaults(AttributeMap.empty())
val credentialsProvider: StaticCredentialsProvider = StaticCredentialsProvider.create(
AwsBasicCredentials.create(awsAccessKey, awsSecretKey)
)
implicit val asyncSqsClient: SqsAsyncClient =
SqsAsyncClient
.builder()
.credentialsProvider(credentialsProvider)
.region(Region.EU_CENTRAL_1)
.httpClient(httpClient)
.overrideConfiguration(AWSUtils.overrideConfig)
.build()
val sqsClient = new SqsClient(sqsTopicArn)
// Client to create queue in SQS
val sqsAsyncClient: AmazonSQSAsync = {
val sqsClientBuilder: AmazonSQSAsyncClientBuilder = AmazonSQSAsyncClientBuilder.standard()
val awsStaticCredentialsProvider = new AWSStaticCredentialsProvider(
new BasicAWSCredentials(awsAccessKey, awsSecretKey)
)
sqsClientBuilder.withCredentials(
awsStaticCredentialsProvider
)
sqsClientBuilder.withEndpointConfiguration(
new EndpointConfiguration(sqsEndpoint, Region.EU_CENTRAL_1.toString)
)
sqsClientBuilder.build()
}
override def beforeAll(): Unit = {
initialize()
}
override def afterEach(): Unit = {
verifyNoMoreInteractions(sqsClient)
}
override def afterAll(): Unit = {
actorSystem.terminate()
}
protected[fileuploader] def createSqsQueue(sqsAsync: AmazonSQSAsync, queueName: String): String = {
val url = new URI(sqsAsync.createQueue(queueName).getQueueUrl)
val sqsEndpointUrl = s"$sqsEndpoint${url.getPath}"
logger.info(sqsEndpointUrl)
sqsEndpointUrl
}
protected[fileuploader] def initialize(): Unit = {
createSqsQueue(sqsAsyncClient, "file_uploader")
}
}
最终,这个 BaseSetup 特征与我的规范文件混淆了,即
class SqsClientBase extends BaseSetup {
"SqsClient" should "restart if it fails to connect to SQS" in {
//given
//when
//then
}
}
所以,我现在尝试运行这个规范,在任何测试用例运行之前,我得到一个错误
An exception or error caused a run to abort.
java.lang.NullPointerException
at fileuploader.util.DockerSqsService.containerManager(DockerSqsService.scala:43)
at fileuploader.util.DockerSqsService.containerManager$(DockerSqsService.scala:43)
at fileuploader.gateway.SqsClientBase.containerManager$lzycompute(SqsClientBase.scala:6)
at fileuploader.gateway.SqsClientBase.containerManager(SqsClientBase.scala:6)
at com.whisk.docker.DockerKit.getContainerState(DockerKit.scala:36)
at com.whisk.docker.DockerKit.getContainerState$(DockerKit.scala:35)
at fileuploader.gateway.SqsClientBase.getContainerState(SqsClientBase.scala:6)
at com.whisk.docker.DockerKit.containerToState(DockerKit.scala:40)
at com.whisk.docker.DockerKit.containerToState$(DockerKit.scala:39)
at fileuploader.gateway.SqsClientBase.containerToState(SqsClientBase.scala:6)
at fileuploader.util.DockerSqsService.$init$(DockerSqsService.scala:32)
at fileuploader.gateway.SqsClientBase.<init>(SqsClientBase.scala:6)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at org.scalatest.tools.Runner$.genSuiteConfig(Runner.scala:1402)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$8(Runner.scala:1199)
at scala.collection.immutable.List.map(List.scala:246)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1198)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
at org.scalatest.tools.Runner$.run(Runner.scala:798)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:133)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)
我真的不知道如何解决这个问题。这里的任何指示都会非常有帮助。TIA