3

我正在尝试在 Spark 流中为 WSMQ 数据源实现客户接收器。我按照此处提供的示例进行操作。

后来我在这个 Github 存储库中进行了示例。

我遇到了三个问题:

1:错误(程序运行一段时间后出现此错误)

java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
    at java.lang.Thread.run(Thread.java:745)
  1. 即使我在创建会话时使用了此代码,该程序也不会从 WSMQ 中删除消息

    MQQueueSession qSession = (MQQueueSession) qCon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    
  2. 我需要Receiver在 Custom Receiver Spark API 上实现一个可靠的解释。它说:

    要实现可靠的接收器,您必须使用 store(multiple-records) 来存储数据。这种风格的 store 是一个阻塞调用,只有在所有给定的记录都存储在 Spark 中之后才会返回。如果接收方配置的存储级别使用复制(默认启用),则此调用在复制完成后返回。因此,它确保数据被可靠地存储,并且接收器现在可以适当地确认源。这确保了当接收器在复制数据的过程中失败时不会导致任何数据——缓冲的数据将不会被确认,因此稍后将由源重新发送。

我不知道如何处理存储(多条记录)?

我无法弄清楚为什么会发生这些错误以及如何实现可靠的Receiver.

这是代码:

public class JavaConnector extends Receiver<String> {

    String host = null;
    int port = -1;
    String qm=null;
    String qn=null;
    String channel=null;
    transient Gson gson=new Gson();
    transient MQQueueConnection qCon= null;
    String topic=null;

    Enumeration enumeration =null;
    private static MQQueueReceiver receiver = null;


    public JavaConnector(String host , int port, String qm, String channel, String qn) {
        super(StorageLevel.MEMORY_ONLY_2());
        this.host = host;
        this.port = port;
        this.qm=qm;
        this.qn=qn;
        this.channel=channel;


    }

    public void onStart()  {
        // Start the thread that receives data over a connection
        new Thread()  {
            @Override public void run() {
                try {
                    initConnection();
                    receive();
                }
                catch (JMSException ex)
                {
                    ex.printStackTrace();
                }
                catch (Exception ex)
                {
                    ex.printStackTrace();
                }
            }
        }.start();
    }

    public void onStop() {

        // There is nothing much to do as the thread calling receive()
        // is designed to stop by itself isStopped() returns false

    }

    /** Create a MQ connection and receive data until receiver is stopped */
    private void receive() throws InterruptedException {
        System.out.print("Started receiving messages from MQ");


        try {

            JMSTextMessage receivedMessage= null;
            int cnt =0;

            //JMSTextMessage receivedMessage = (JMSTextMessage) receiver.receive(10000);

            boolean flag=false;
            while (!isStopped() && enumeration.hasMoreElements()&&cnt<50 )
            {

                receivedMessage= (JMSTextMessage) enumeration.nextElement();
                receivedMessage.acknowledge();
                String userInput = receivedMessage.getText();

                    ArrayList<String> list = new ArrayList<String>();
                    list.add(userInput);
                    Iterator<String> itr = list.iterator();
                    store(itr);
                cnt++;

            }
            /*while (!isStopped() && receivedMessage !=null)
            {

               // receivedMessage= (JMSTextMessage) enumeration.nextElement();
                String userInput = receivedMessage.getText();

                store(userInput);
        receivedMessage.acknowledge();

            }*/

            // Restart in an attempt to connect again when server is active again
            //restart("Trying to connect again");

            stop("No More Messages To read !");
            qCon.close();
            System.out.println("Queue Connection is Closed");

        }
        catch(Exception e)
        {      Thread.sleep(100);
            System.out.println("WRONG"+e.toString());
            e.printStackTrace();
            restart("Trying to connect again");
        }
        catch(Throwable t) {
            Thread.sleep(100);
            System.out.println("WRONG-1"+t.toString());
            // restart if there is any other error
            restart("Error receiving data", t);
        }



    }

    public void initConnection() throws JMSException,InterruptedException {
        try {
            MQQueueConnectionFactory conFactory = new MQQueueConnectionFactory();
            conFactory.setHostName(host);
            conFactory.setPort(port);
            conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
            conFactory.setQueueManager(qm);
            conFactory.setChannel(channel);
            conFactory.setMsgBatchSize(100);


            qCon = (MQQueueConnection) conFactory.createQueueConnection();
            MQQueueSession qSession = (MQQueueSession) qCon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            MQQueue queue = (MQQueue) qSession.createQueue(qn);
            MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);
            qCon.start();
            //receiver = (MQQueueReceiver) qSession.createReceiver(queue);
            enumeration= browser.getEnumeration();


        } catch (Exception e) {
            Thread.sleep(1000);
        }
    }

    @Override
    public StorageLevel storageLevel() {
        return StorageLevel.MEMORY_ONLY_2();
    }
4

1 回答 1

1

最后我能够解决这个问题。解决方案 1:蒸汽上下文尝试写入 Kafka,因为 kafka 已关闭并且它给了我 IO 错误。那是我的愚蠢。:)

解决方案 2:我应该使用 MessageListener,QueueBrowser 用于读取消息,它实际上并不消耗消息。

于 2016-02-05T19:06:07.867 回答