I am unable to receive notifications from HCatalog over JMS. I wrote a simple producer/consumer program. The ActiveMQ broker is running in the background, and I am able to send and receive plain text messages through ActiveMQ. However, "markPartitionForEvent()" never results in an event reaching the consumer's "onMessage()" callback. I followed this link: https://cwiki.apache.org/confluence/display/Hive/HCatalog+Notification
Please advise.
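For reference, the plain text-message check that does work against the same broker is roughly the following (a minimal sketch; the topic name "TOPIC.TEST" is just a placeholder, the broker URL is the same one used below):

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class PlainJmsCheck {
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection conn = factory.createConnection();
        conn.start();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination topic = session.createTopic("TOPIC.TEST");

        // Subscribe before publishing, so the non-durable subscriber sees the message.
        MessageConsumer consumer = session.createConsumer(topic);
        MessageProducer producer = session.createProducer(topic);
        producer.send(session.createTextMessage("hello"));

        TextMessage received = (TextMessage) consumer.receive(5000);
        System.out.println("Received: " + (received == null ? "nothing" : received.getText()));
        conn.close();
    }
}

The HCatalog notification path below is what does not work.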
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.PartitionEventType;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hcatalog.common.HCatConstants;
class Consumer implements MessageListener
{
    public void start() {
        try
        {
            ActiveMQConnectionFactory connFac = new ActiveMQConnectionFactory("tcp://localhost:61616");
            Connection conn = connFac.createConnection();
            conn.start();

            HiveConf hiveConf = new HiveConf(Consumer.class);
            HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf);

            // Set the topic name on the local copy of the table only; nothing is
            // written back to the metastore here.
            Table table = msc.getTable("mydb", "mytbl");
            table.getParameters().put(HCatConstants.HCAT_MSGBUS_TOPIC_NAME, "TOPIC" + ".HCATALOG");
            System.out.println("TABLE = " + table.toString());

            Map<String, String> map = table.getParameters();
            System.out.println("MAP = " + map.toString());

            String fetchTopic = map.get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
            System.out.println("fetchTopic = " + fetchTopic);

            // Re-read the parameter from the metastore for comparison.
            String topicName = msc.getTable("mydb", "mytbl")
                    .getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
            System.out.println("TOPICNAME = " + topicName);

            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            if (session == null)
                System.out.println("NULL");
            System.out.println(session.toString());

            // Subscribe to the topic and register this object as the listener.
            Destination hcatTopic = session.createTopic(fetchTopic);
            MessageConsumer consumer = session.createConsumer(hcatTopic);
            consumer.setMessageListener(this);
        }
        catch (Exception e) {
            System.out.println("ERROR");
            e.printStackTrace();
        }
    }

    @Override
    public void onMessage(Message message) {
        try
        {
            if (message.getStringProperty(HCatConstants.HCAT_EVENT)
                    .equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) {
                MapMessage mapMsg = (MapMessage) message;
                Enumeration<String> keys = mapMsg.getMapNames();
                while (keys.hasMoreElements())
                {
                    String key = keys.nextElement();
                    System.out.println(key + " : " + mapMsg.getString(key));
                }
                System.out.println("Message: " + message);
            }
        }
        catch (Exception e) {
            System.out.println("ERROR");
            e.printStackTrace();
        }
    }
}
class Producer extends Thread
{
    private HiveConf hiveConf;

    public Producer()
    {
    }

    @Override
    public void run() {
        try
        {
            hiveConf = new HiveConf(this.getClass());
            HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf);

            // Note: repeated put() calls with the same key overwrite each other,
            // so this map ends up holding only date=20110714.
            HashMap<String, String> partMap = new HashMap<String, String>();
            partMap.put("date", "20110711");
            partMap.put("date", "20110712");
            partMap.put("date", "20110714");

            while (true)
            {
                // Mark the partition as LOAD_DONE once a second; the metastore's
                // notification listener is expected to publish a JMS message for this.
                msc.markPartitionForEvent("mydb", "mytbl", partMap, PartitionEventType.LOAD_DONE);
                Thread.sleep(1000);
            }
        }
        catch (Exception e) {
            System.out.println("ERROR");
            e.printStackTrace();
        }
    }
}
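The two classes are started from a small driver along these lines (the actual main class is not shown above; this is a minimal sketch that just wires them together and keeps the JVM alive):

public class NotificationTest {
    public static void main(String[] args) throws Exception {
        // Register the JMS listener first, then start marking partitions.
        Consumer consumer = new Consumer();
        consumer.start();

        Producer producer = new Producer();
        producer.start();

        // The producer loops forever, so joining on it keeps the process,
        // the JMS connection, and the listener alive.
        producer.join();
    }
}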