1

For testing I have managed to run an embedded standalone pulsar server and client. I also can send and receive messages. However I actually want to (integration-)test functions (implementing org.apache.pulsar.functions.api.Function). How can I register functions in the embedded setup?

package kic.data.stream.pulsar

import groovy.util.logging.Log
import org.apache.pulsar.PulsarStandalone
import org.apache.pulsar.PulsarStandaloneBuilder
import org.apache.pulsar.broker.PulsarService
import org.apache.pulsar.broker.ServiceConfiguration
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.ConsumerEventListener
import org.apache.pulsar.client.api.Message
import org.apache.pulsar.client.api.Producer
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.client.api.SubscriptionType
import spock.lang.Specification

import java.util.concurrent.TimeUnit

@Log
class PulsarEmbeddedTest extends Specification {

    static final String TOPIC = "hello";
    static final int NUM_OF_MESSAGES = 100;
    static PulsarStandalone standalone
    static PulsarService pulsarService

    def setupSpec() {
        def configFile = new File(ClassLoader.getSystemResource("broker.conf").toURI()).getAbsolutePath()
        def conf = new ServiceConfiguration(clusterName: "test-cluster", zookeeperServers: "localhost:2184")
        log.info("${PulsarStandalone.properties}")
        standalone = PulsarStandaloneBuilder.instance()
                                            .withConfig(conf)
                                            .withNoStreamStorage(true)
                                            .build()
        standalone.configFile = configFile
        standalone.start()
        pulsarService = new PulsarService(conf)
    }

    def test() {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(pulsarService.brokerServiceUrl)
                .build()

        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic(TOPIC)
                .enableBatching(false)
                .create()

        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic(TOPIC)
                //.subscriptionInitialPosition()
                .subscriptionName("test-subs-1")
                .ackTimeout(10, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Exclusive)
                .messageListener(Mesa)
                .subscribe()



        for (int i = 1; i <= NUM_OF_MESSAGES; ++i) {
            producer.send("Hello_" + i)
        }


        Message<String> message
        for (int i = 1; i <= NUM_OF_MESSAGES; ++i) {
            // This calls blocks until a message is available.
            message = consumer.receive(1, TimeUnit.SECONDS)
            //log.info("Message received : ${message.getValue()}")
            println("Message received : ${message.messageId}:${message.value}")

            consumer.acknowledge(message)
        }

        producer.close()
        consumer.close()
        client.close()

        expect:
        1==1

    }

    def cleanupSpec() {
        standalone.close()
    }

}
4

1 回答 1

2

您应该能够通过 Pulsar Admin API 创建 Pulsar 函数,就像创建普通 Pulsar 集群一样,例如

PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:8080").build();

FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setName("exclamation");
functionConfig.setInputs(Collections.singleton("input"));
functionConfig.setClassName(ExclamationFunction.class.getName());
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setOutput("output");
functionConfig.setJar("/tmp/my-jar.jar")

pulsarAdmin.functions().createFunction(functionConfig, functionConfig.getJar());

Apache Pulsar 项目中也有不少用于测试 Pulsar 功能的集成测试。有基于 docker 的真正集成测试,也有单进程“集成”测试。以下是您可以参考的单进程“集成”测试示例:

https://github.com/apache/pulsar/blob/master/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java

于 2019-06-09T18:22:26.217 回答