For testing, I have managed to run an embedded standalone Pulsar server and client, and I can also send and receive messages. However, what I actually want is to integration-test functions (classes implementing org.apache.pulsar.functions.api.Function). How can I register functions in the embedded setup?
package kic.data.stream.pulsar
import groovy.util.logging.Log
import org.apache.pulsar.PulsarStandalone
import org.apache.pulsar.PulsarStandaloneBuilder
import org.apache.pulsar.broker.PulsarService
import org.apache.pulsar.broker.ServiceConfiguration
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.ConsumerEventListener
import org.apache.pulsar.client.api.Message
import org.apache.pulsar.client.api.Producer
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.client.api.SubscriptionType
import spock.lang.Specification
import java.util.concurrent.TimeUnit
/**
 * Spock specification that starts an embedded standalone Pulsar instance once
 * for the whole spec and round-trips a batch of string messages through a
 * single topic using one producer and one exclusive consumer.
 */
@Log
class PulsarEmbeddedTest extends Specification {

    /** Topic used for the produce/consume round trip. */
    static final String TOPIC = "hello"

    /** Number of messages published and then expected back. */
    static final int NUM_OF_MESSAGES = 100

    /** Embedded standalone instance shared by every feature in this spec. */
    static PulsarStandalone standalone

    /** Service object the client reads its broker service URL from. */
    static PulsarService pulsarService

    /**
     * Builds and starts the embedded standalone instance from the
     * {@code broker.conf} classpath resource.
     */
    def setupSpec() {
        def configUrl = ClassLoader.getSystemResource("broker.conf")
        // Fail with a readable message instead of an NPE on toURI().
        assert configUrl != null : "broker.conf not found on the classpath"
        def configFile = new File(configUrl.toURI()).getAbsolutePath()

        def conf = new ServiceConfiguration(clusterName: "test-cluster", zookeeperServers: "localhost:2184")
        log.info("${PulsarStandalone.properties}")

        standalone = PulsarStandaloneBuilder.instance()
                .withConfig(conf)
                .withNoStreamStorage(true)
                .build()
        standalone.configFile = configFile
        standalone.start()

        // NOTE(review): this creates a second PulsarService that is never
        // started; it is only used to derive the broker service URL from
        // `conf`. Confirm the URL matches the broker started above.
        pulsarService = new PulsarService(conf)
    }

    /**
     * Publishes NUM_OF_MESSAGES strings to TOPIC and verifies that the
     * consumer receives exactly those payloads, in publishing order.
     */
    def test() {
        given: "a client with one producer and one exclusive consumer on the topic"
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(pulsarService.brokerServiceUrl)
                .build()
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic(TOPIC)
                .enableBatching(false)
                .create()
        // No message listener is registered: the original passed an undefined
        // symbol here, and a consumer with a listener may not call receive().
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic(TOPIC)
                .subscriptionName("test-subs-1")
                .ackTimeout(10, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe()
        List<String> expected = (1..NUM_OF_MESSAGES).collect { "Hello_" + it }
        List<String> received = []

        when: "all messages are published and then read back"
        expected.each { producer.send(it) }
        for (int i = 1; i <= NUM_OF_MESSAGES; ++i) {
            // Waits at most one second; yields null when nothing arrived in time.
            Message<String> message = consumer.receive(1, TimeUnit.SECONDS)
            assert message != null : "Timed out waiting for message ${i} of ${NUM_OF_MESSAGES}"
            println("Message received : ${message.messageId}:${message.value}")
            consumer.acknowledge(message)
            received << message.value
        }

        then: "every published payload came back in order"
        received == expected

        cleanup: "release client resources even when the feature fails"
        producer?.close()
        consumer?.close()
        client?.close()
    }

    /** Shuts the embedded standalone instance down, if it was ever created. */
    def cleanupSpec() {
        // Null-safe so a failure in setupSpec is not masked by an NPE here.
        standalone?.close()
    }
}