我正在尝试使用 [JCA resoruce-adapters][1] 来使用 anMDB
连接到kafka
. 以下是standalone-full.xml
配置kafka的条目resoruce adapter
和相关ejb-mdb
定义:
请注意,我已将 部署mdb
为ejbModule
嵌入在ear
部署应用程序时出现以下错误
22:43:53,073 ERROR [org.jboss.msc.service.fail] (MSC service thread 1-1) MSC000001: Failed to start service jboss.deployment.subunit."kafkatemplatereceiver.ear"."kafkatemplatereceiverEJB.jar".component.KafkaMDB.CREATE: org.jboss.msc.service.StartException in service jboss.deployment.subunit."kafkatemplatereceiver.ear"."kafkatemplatereceiverEJB.jar".component.KafkaMDB.CREATE: Failed to start service
at org.jboss.msc@1.4.12.Final//org.jboss.msc.service.ServiceControllerImpl$StartTask.execute(ServiceControllerImpl.java:1731)
at org.jboss.msc@1.4.12.Final//org.jboss.msc.service.ServiceControllerImpl$ControllerTask.run(ServiceControllerImpl.java:1559)
at org.jboss.threads@2.4.0.Final//org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
at org.jboss.threads@2.4.0.Final//org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:1990)
at org.jboss.threads@2.4.0.Final//org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1486)
at org.jboss.threads@2.4.0.Final//org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1363)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalStateException: WFLYEJB0383: No message listener of type fish.payara.cloud.connectors.kafka.api.KafkaListener found in resource adapter kafka
at org.jboss.as.ejb3@21.0.2.Final//org.jboss.as.ejb3.component.messagedriven.MessageDrivenComponentCreateService.createActivationSpecs(MessageDrivenComponentCreateService.java:142)
at org.jboss.as.ejb3@21.0.2.Final//org.jboss.as.ejb3.component.messagedriven.MessageDrivenComponentCreateService.createComponent(MessageDrivenComponentCreateService.java:105)
at org.jboss.as.ee@21.0.2.Final//org.jboss.as.ee.component.BasicComponentCreateService.start(BasicComponentCreateService.java:86)
at org.jboss.as.ejb3@21.0.2.Final//org.jboss.as.ejb3.component.messagedriven.MessageDrivenComponentCreateService.start(MessageDrivenComponentCreateService.java:93)
at org.jboss.msc@1.4.12.Final//org.jboss.msc.service.ServiceControllerImpl$StartTask.startService(ServiceControllerImpl.java:1739)
at org.jboss.msc@1.4.12.Final//org.jboss.msc.service.ServiceControllerImpl$StartTask.execute(ServiceControllerImpl.java:1701)
... 6 more
22:43:53,076 ERROR [org.jboss.as.controller.management-operation] (External Management Request Threads -- 1) WFLYCTL0013: Operation ("add") failed - address: ([("deployment" => "kafkatemplatereceiver.ear")]) - failure description: {"WFLYCTL0080: Failed services" => {"jboss.deployment.subunit.\"kafkatemplatereceiver.ear\".\"kafkatemplatereceiverEJB.jar\".component.KafkaMDB.CREATE" => "Failed to start service
Caused by: java.lang.IllegalStateException: WFLYEJB0383: No message listener of type fish.payara.cloud.connectors.kafka.api.KafkaListener found in resource adapter kafka"}}
22:43:53,077 ERROR [org.jboss.as.server] (External Management Request Threads -- 1) WFLYSRV0021: Deploy of deployment "kafkatemplatereceiver.ear" was rolled back with the following failure message:
{"WFLYCTL0080: Failed services" => {"jboss.deployment.subunit.\"kafkatemplatereceiver.ear\".\"kafkatemplatereceiverEJB.jar\".component.KafkaMDB.CREATE" => "Failed to start service
Caused by: java.lang.IllegalStateException: WFLYEJB0383: No message listener of type fish.payara.cloud.connectors.kafka.api.KafkaListener found in resource adapter kafka"}}
**standalone-full.xml**
<subsystem xmlns="urn:jboss:domain:ejb3:8.0">
<session-bean>
<stateless>
<bean-instance-pool-ref pool-name="slsb-strict-max-pool"/>
</stateless>
<stateful default-access-timeout="5000" cache-ref="simple" passivation-disabled-cache-ref="simple"/>
<singleton default-access-timeout="5000"/>
</session-bean>
<mdb>
<!--<resource-adapter-ref resource-adapter-name="${ejb.resource-adapter-name:kafka-rar-0.6.0.rar}"/>-->
<resource-adapter-ref resource-adapter-name="kafka"/>
<bean-instance-pool-ref pool-name="mdb-strict-max-pool"/>
</mdb> ...
Standalone-full.xml 资源适配器配置
<subsystem xmlns="urn:jboss:domain:resource-adapters:6.0">
<resource-adapters>
<resource-adapter id="kafka">
<archive>
kafka-rar-0.6.0.rar
</archive>
<transaction-support>XATransaction</transaction-support>
<connection-definitions>
<connection-definition class-name="fish.payara.cloud.connectors.kafka.outbound.KafkaManagedConnectionFactory" jndi-name="java:/KafkaConnectionFactory" enabled="true" pool-name="ConnectionFactory">
<xa-pool>
<min-pool-size>1</min-pool-size>
<initial-pool-size>1</initial-pool-size>
<max-pool-size>20</max-pool-size>
<prefill>false</prefill>
<is-same-rm-override>false</is-same-rm-override>
</xa-pool>
</connection-definition>
</connection-definitions>
</resource-adapter>
</resource-adapters>
</subsystem>
MDB 类:
package com.zhun.euon.service;
import java.util.Date;
import javax.ejb.MessageDriven;
import javax.ejb.ActivationConfigProperty;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.jboss.ejb3.annotation.ResourceAdapter;
//import com.zhun.euon.service.serializer.Stringo;
import fish.payara.cloud.connectors.kafka.api.KafkaListener;
import fish.payara.cloud.connectors.kafka.api.OnRecord;
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "clientId", propertyValue = "testClient"),
@ActivationConfigProperty(propertyName = "groupIdConfig", propertyValue = "test-consumer-group"),
@ActivationConfigProperty(propertyName = "topics", propertyValue = "zhun_core_data_service_topic"),
@ActivationConfigProperty(propertyName = "bootstrapServersConfig", propertyValue = "192.168.0.105:9092"),
@ActivationConfigProperty(propertyName = "autoCommitInterval", propertyValue = "100"),
@ActivationConfigProperty(propertyName = "retryBackoff", propertyValue = "1000"),
@ActivationConfigProperty(propertyName = "pollInterval", propertyValue = "1000") ,
// @ActivationConfigProperty(propertyName="resourceAdapterName",propertyValue="kafka-rar-0.6.0.rar"),
// @ActivationConfigProperty(propertyName="resourceAdapterMid",propertyValue="kafka-rar-0.6.0.rar"),
@ActivationConfigProperty(propertyName = "keyDeserializer", propertyValue = "org.apache.kafka.common.serialization.StringDeserializer"),
@ActivationConfigProperty(propertyName = "valueDeserializer", propertyValue = "org.apache.kafka.common.serialization.StringDeserializer"),
})
@ResourceAdapter(value="kafka")
public class KafkaMDB implements KafkaListener {
@OnRecord( topics={"zhun_core_data_service_topic"})
public void getMessageTest(ConsumerRecord<String,String> record) {
System.out.println("Got record on topic testing " + record + "at time"+ new Date());
}
}
我不确定我是否遗漏了一些东西,但是我无法弄清楚为什么服务器会在rar
文件本身而不是我的文件中寻找监听器的接口实现EJB- Jar
[1]:https ://github.com /payara/Cloud-Connectors/tree/master/Kafka/KafkaRAR