1

我无法通过 JMS 接收 HCatalog 的通知。我写了一个简单的生产者/消费者程序,ActiveMQ 服务正在后台运行,并且我能够通过 ActiveMQ 发送简单的文本消息。但是调用 “markPartitionForEvent()” 之后,事件并没有被传递到消费者的 “onMessage()” 回调。我参考了以下链接:https://cwiki.apache.org/confluence/display/Hive/HCatalog+Notification

请指导

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TException;
import javax.jms.MessageConsumer;
import javax.management.*;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.PartitionEventType;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hcatalog.common.HCatConstants;
import java.util.Properties;
import javax.jms.*;
import javax.jdo.*;
import javax.naming.*;
import java.io.*;
import java.io.InputStreamReader;
import java.util.*;
import javax.jms.MessageConsumer;
import org.slf4j.LoggerFactory;
import org.datanucleus.api.jdo.*;
import javax.jdo.metadata.JDOMetadata;
import org.datanucleus.store.rdbms.RDBMSStoreManager;
import org.datanucleus.properties.PropertyStore;
import org.datanucleus.store.AbstractStoreManager;
import org.apache.hadoop.hive.metastore.api.Table;

class Consumer implements MessageListener
{
    public void start(){
        try
        {   
            HiveConf hiveConf;
            ActiveMQConnectionFactory connFac = new ActiveMQConnectionFactory("tcp://localhost:61616");
            Connection conn = connFac.createConnection();
            conn.start();

            hiveConf = new HiveConf(Consumer.class);
            HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf);

            Table table = msc.getTable("mydb","myTbl");
            table.getParameters().put(HCatConstants.HCAT_MSGBUS_TOPIC_NAME, "TOPIC"+ ".HCATALOG");
            System.out.println("TABLE = " + table.toString());
            Map<String,String> map = table.getParameters();
            System.out.println("MAP= " + map.toString());
            String fetchTopic = map.get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);

            System.out.println("fetchTopic = " + fetchTopic);


            String topicName = msc.getTable("mydb",
                    "mytbl").getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
            System.out.println("TOPICNAME = " + topicName);

            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            if (session == null)
                System.out.println("NULL");
            System.out.println(session.toString());

            Destination hcatTopic = session.createTopic(fetchTopic);
            MessageConsumer consumer = session.createConsumer(hcatTopic);
            consumer.setMessageListener(this);
        }
        catch(Exception e){
            System.out.println("ERROR");
            e.printStackTrace();
        }
    }
    @Override
        public void onMessage(Message message){
            try
            {
                if(message.getStringProperty(HCatConstants.HCAT_EVENT).equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)){
                    MapMessage mapMsg = (MapMessage)message;
                    Enumeration<String> keys = mapMsg.getMapNames();

                    while(keys.hasMoreElements())
                    {
                        String key = keys.nextElement();
                        System.out.println(key + " : " + mapMsg.getString(key));
                    }
                    System.out.println("Message: "+message);
                }
            }
            catch(Exception e){
                System.out.println("ERROR");
            }

        }
};

/**
 * Thread that marks one partition of table {@code mydb.mytbl} as
 * {@code LOAD_DONE} once per second until the thread is interrupted or a
 * metastore call fails.
 */
class Producer extends Thread
{
    private HiveConf hiveConf;

    public Producer()
    {
    }

    /**
     * Opens a metastore client and repeatedly calls
     * {@code markPartitionForEvent} for the partition {@code date=20110714}.
     *
     * <p>Failures are reported on standard output together with their stack
     * trace. An interrupt ends the loop and leaves the interrupt flag set.
     * The metastore client is closed on every exit path.
     */
    @Override
    public void run() {
        HiveMetaStoreClient msc = null;
        try
        {
            hiveConf = new HiveConf(this.getClass());
            msc = new HiveMetaStoreClient(hiveConf);

            // A partition spec holds one value per partition key. The original
            // code put three values under "date", so only the last one was
            // ever sent; that effective value is kept here.
            Map<String,String> partMap = new HashMap<String, String>();
            partMap.put("date","20110714");

            while (!Thread.currentThread().isInterrupted())
            {
                msc.markPartitionForEvent("mydb", "mytbl", partMap, PartitionEventType.LOAD_DONE);
                Thread.sleep(1000);
            }
        }
        catch(InterruptedException e){
            // Restore the flag so code further up can see the interruption.
            Thread.currentThread().interrupt();
        }
        catch(Exception e){
            System.out.println("ERROR");
            e.printStackTrace();
        }
        finally{
            if (msc != null) {
                msc.close();
            }
        }
    }

}
4

0 回答 0