I am trying to use [JCA resource adapters][1] to connect an MDB to Kafka. Below are the standalone-full.xml entries that configure the Kafka resource adapter, together with the related EJB MDB definition.

Note that the MDB is deployed in an EJB module embedded in an EAR.

I get the following error when deploying the application:

 22:43:53,073 ERROR [org.jboss.msc.service.fail] (MSC service thread 1-1) MSC000001: Failed to start service jboss.deployment.subunit."kafkatemplatereceiver.ear"."kafkatemplatereceiverEJB.jar".component.KafkaMDB.CREATE: org.jboss.msc.service.StartException in service jboss.deployment.subunit."kafkatemplatereceiver.ear"."kafkatemplatereceiverEJB.jar".component.KafkaMDB.CREATE: Failed to start service
        at org.jboss.msc@1.4.12.Final//org.jboss.msc.service.ServiceControllerImpl$StartTask.execute(ServiceControllerImpl.java:1731)
        at org.jboss.msc@1.4.12.Final//org.jboss.msc.service.ServiceControllerImpl$ControllerTask.run(ServiceControllerImpl.java:1559)
        at org.jboss.threads@2.4.0.Final//org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
        at org.jboss.threads@2.4.0.Final//org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:1990)
        at org.jboss.threads@2.4.0.Final//org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1486)
        at org.jboss.threads@2.4.0.Final//org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1363)
        at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: java.lang.IllegalStateException: WFLYEJB0383: No message listener of type fish.payara.cloud.connectors.kafka.api.KafkaListener found in resource adapter kafka
        at org.jboss.as.ejb3@21.0.2.Final//org.jboss.as.ejb3.component.messagedriven.MessageDrivenComponentCreateService.createActivationSpecs(MessageDrivenComponentCreateService.java:142)
        at org.jboss.as.ejb3@21.0.2.Final//org.jboss.as.ejb3.component.messagedriven.MessageDrivenComponentCreateService.createComponent(MessageDrivenComponentCreateService.java:105)
        at org.jboss.as.ee@21.0.2.Final//org.jboss.as.ee.component.BasicComponentCreateService.start(BasicComponentCreateService.java:86)
        at org.jboss.as.ejb3@21.0.2.Final//org.jboss.as.ejb3.component.messagedriven.MessageDrivenComponentCreateService.start(MessageDrivenComponentCreateService.java:93)
        at org.jboss.msc@1.4.12.Final//org.jboss.msc.service.ServiceControllerImpl$StartTask.startService(ServiceControllerImpl.java:1739)
        at org.jboss.msc@1.4.12.Final//org.jboss.msc.service.ServiceControllerImpl$StartTask.execute(ServiceControllerImpl.java:1701)
        ... 6 more
    
    22:43:53,076 ERROR [org.jboss.as.controller.management-operation] (External Management Request Threads -- 1) WFLYCTL0013: Operation ("add") failed - address: ([("deployment" => "kafkatemplatereceiver.ear")]) - failure description: {"WFLYCTL0080: Failed services" => {"jboss.deployment.subunit.\"kafkatemplatereceiver.ear\".\"kafkatemplatereceiverEJB.jar\".component.KafkaMDB.CREATE" => "Failed to start service
        Caused by: java.lang.IllegalStateException: WFLYEJB0383: No message listener of type fish.payara.cloud.connectors.kafka.api.KafkaListener found in resource adapter kafka"}}
    22:43:53,077 ERROR [org.jboss.as.server] (External Management Request Threads -- 1) WFLYSRV0021: Deploy of deployment "kafkatemplatereceiver.ear" was rolled back with the following failure message: 
    {"WFLYCTL0080: Failed services" => {"jboss.deployment.subunit.\"kafkatemplatereceiver.ear\".\"kafkatemplatereceiverEJB.jar\".component.KafkaMDB.CREATE" => "Failed to start service
        Caused by: java.lang.IllegalStateException: WFLYEJB0383: No message listener of type fish.payara.cloud.connectors.kafka.api.KafkaListener found in resource adapter kafka"}}

**standalone-full.xml**
 

     <subsystem xmlns="urn:jboss:domain:ejb3:8.0">
                <session-bean>
                    <stateless>
                        <bean-instance-pool-ref pool-name="slsb-strict-max-pool"/>
                    </stateless>
                    <stateful default-access-timeout="5000" cache-ref="simple" passivation-disabled-cache-ref="simple"/>
                    <singleton default-access-timeout="5000"/>
                </session-bean>
                <mdb>
        <!--<resource-adapter-ref resource-adapter-name="${ejb.resource-adapter-name:kafka-rar-0.6.0.rar}"/>-->
                    <resource-adapter-ref resource-adapter-name="kafka"/>
                    <bean-instance-pool-ref pool-name="mdb-strict-max-pool"/>
                </mdb> ...

**standalone-full.xml** (resource adapter configuration)

    <subsystem xmlns="urn:jboss:domain:resource-adapters:6.0">
        <resource-adapters>
            <resource-adapter id="kafka">
                <archive>
                    kafka-rar-0.6.0.rar
                </archive>
                <transaction-support>XATransaction</transaction-support>
                <connection-definitions>
                    <connection-definition class-name="fish.payara.cloud.connectors.kafka.outbound.KafkaManagedConnectionFactory" jndi-name="java:/KafkaConnectionFactory" enabled="true" pool-name="ConnectionFactory">
                        <xa-pool>
                            <min-pool-size>1</min-pool-size>
                            <initial-pool-size>1</initial-pool-size>
                            <max-pool-size>20</max-pool-size>
                            <prefill>false</prefill>
                            <is-same-rm-override>false</is-same-rm-override>
                        </xa-pool>
                    </connection-definition>
                </connection-definitions>
            </resource-adapter>
        </resource-adapters>
    </subsystem>

MDB class:

package com.zhun.euon.service;

import java.util.Date;
import javax.ejb.MessageDriven;
import javax.ejb.ActivationConfigProperty;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.jboss.ejb3.annotation.ResourceAdapter;

//import com.zhun.euon.service.serializer.Stringo;

import fish.payara.cloud.connectors.kafka.api.KafkaListener;
import fish.payara.cloud.connectors.kafka.api.OnRecord;

@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "clientId", propertyValue = "testClient"),
    @ActivationConfigProperty(propertyName = "groupIdConfig", propertyValue = "test-consumer-group"),
    @ActivationConfigProperty(propertyName = "topics", propertyValue = "zhun_core_data_service_topic"),
    @ActivationConfigProperty(propertyName = "bootstrapServersConfig", propertyValue = "192.168.0.105:9092"),
    @ActivationConfigProperty(propertyName = "autoCommitInterval", propertyValue = "100"),
    @ActivationConfigProperty(propertyName = "retryBackoff", propertyValue = "1000"),
    @ActivationConfigProperty(propertyName = "pollInterval", propertyValue = "1000"),
    // @ActivationConfigProperty(propertyName = "resourceAdapterName", propertyValue = "kafka-rar-0.6.0.rar"),
    // @ActivationConfigProperty(propertyName = "resourceAdapterMid", propertyValue = "kafka-rar-0.6.0.rar"),
    @ActivationConfigProperty(propertyName = "keyDeserializer", propertyValue = "org.apache.kafka.common.serialization.StringDeserializer"),
    @ActivationConfigProperty(propertyName = "valueDeserializer", propertyValue = "org.apache.kafka.common.serialization.StringDeserializer"),
})
@ResourceAdapter(value="kafka")
public class KafkaMDB implements KafkaListener {
     
    @OnRecord( topics={"zhun_core_data_service_topic"})
    public void getMessageTest(ConsumerRecord<String,String> record) {
        System.out.println("Got record on topic testing " + record + " at time " + new Date());
    }
} 
 

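I am not sure whether I am missing something, but I cannot figure out why the server looks for the listener interface inside the RAR file itself rather than in my EJB JAR.

[1]: https://github.com/payara/Cloud-Connectors/tree/master/Kafka/KafkaRAR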

1 Answer

I faced a similar problem. What finally solved it was adding a jboss-deployment-structure.xml file under webapp/WEB-INF/. It declares a dependency on the Kafka RAR that is already deployed to the JBoss server:

<jboss-deployment-structure>
    <deployment>
        <dependencies>
            <module name="deployment.kafka-rar-0.6.1-SNAPSHOT.rar" export="TRUE"/>
        </dependencies>
    </deployment>
</jboss-deployment-structure>

This is the tutorial I was following: http://www.mastertheboss.com/other/jboss-stuff/apache-kafka/getting-started-with-apache-kafka-and-wildfly
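
The question packages the MDB in an EJB JAR inside an EAR rather than in a WAR, so the descriptor would presumably go in META-INF/ at the root of the EAR and name the EJB module as a sub-deployment. The following is only a sketch under two assumptions: that the RAR is deployed under the name kafka-rar-0.6.0.rar (the archive named in the question's resource adapter configuration), and that the EJB module is called kafkatemplatereceiverEJB.jar (the name that appears in the error log). Adjust both to match the actual deployment.

```xml
<!-- Assumed location: META-INF/jboss-deployment-structure.xml inside kafkatemplatereceiver.ear -->
<jboss-deployment-structure>
    <!-- Sub-deployment name taken from the error log in the question -->
    <sub-deployment name="kafkatemplatereceiverEJB.jar">
        <dependencies>
            <!-- Assumes the RAR is deployed as kafka-rar-0.6.0.rar -->
            <module name="deployment.kafka-rar-0.6.0.rar" export="true"/>
        </dependencies>
    </sub-deployment>
</jboss-deployment-structure>
```

With a dependency like this in place, it is probably also worth checking that the EAR does not bundle its own copy of the connector API classes. A second copy of KafkaListener loaded by the application's class loader would likely not match the listener type that the adapter registers.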

Answered 2021-06-14T14:09:15.073