首先是我的整个 EJB 文件:
package enkia.pulse.indexing.beans;
import enkia.pulse.core.Category;
import enkia.pulse.core.Product;
import enkia.pulse.core.Review;
import enkia.pulse.core.Twitter;
import enkia.utils.HarvestingConstants;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Resource;
import javax.ejb.SessionContext;
import javax.ejb.Stateless;
import javax.ejb.TimedObject;
import javax.ejb.Timer;
import javax.ejb.TimerService;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.Query;
import javax.transaction.UserTransaction;
import twitter4j.FilterQuery;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterException;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
/**
 * Stateless session bean that follows a set of Twitter topics through the
 * twitter4j streaming API and stores each matching tweet as a {@link Review}
 * attached to a freshly created {@link Product}.
 *
 * <p>The tracked topics come from the {@code Twitter.findAll} named query and
 * are refreshed on every timer expiry (hourly, see {@link #initialize()}).
 *
 * <p>NOTE(review): status callbacks are delivered on a twitter4j dispatcher
 * thread, not on a container thread. A direct call to
 * {@link #insertStatus(Status)} from the listener therefore runs without a
 * container-managed transaction, and the transaction-scoped entity manager
 * rejects {@code persist} with {@code TransactionRequiredException}. Routing
 * the call through the bean's business interface (or a separate transactional
 * bean) is the usual remedy - confirm against {@code TwitterBeanLocal}.
 *
 * @author joshua
 */
@Stateless
public class TwitterBean implements TwitterBeanLocal,
        TimedObject {

    /** Tags currently submitted to the streaming filter. */
    List<String> _twitterTopics;
    /** Maps each tracked tag to the id of the category it was configured for. */
    Map<String,Integer> _tagCatRel;
    TimerService _timerService;
    Timer _timer;
    /** The session context needed to create the timer */
    @Resource
    private SessionContext _sc;
    @PersistenceContext(unitName=HarvestingConstants.PERSISTENCE_UNIT)
    EntityManager _entityManager;
    /** A logging object for formatted output to the server log. */
    private Logger _logger;
    private int errors;
    /**
     * Stream opened by the most recent timer run on this instance, kept so the
     * next run can stop it before opening a replacement.
     * NOTE(review): this is per-instance state on a pooled stateless bean; it
     * only helps when the same instance serves consecutive timeouts.
     */
    private TwitterStream _twitterStream;

    /**
     * Constructs the logger
     */
    public TwitterBean(){
        _logger = Logger.getLogger(this.getClass().getName());
        _logger.log(Level.INFO,"Instantiating Twitter Bean");
    }

    /**
     * Prepares the (initially empty) topic collections and registers an
     * interval timer that fires immediately and then once per hour; each
     * expiry reloads the topics and restarts the stream.
     */
    public void initialize() {
        _logger.log(Level.INFO,"Initializing Twitter bean.");
        _twitterTopics = new LinkedList<String>();
        _tagCatRel = new HashMap<String,Integer>();
        _timerService = _sc.getTimerService();
        _timer = _timerService.createTimer(0,1000*60*60,null); //restart every hour
        _logger.log(Level.INFO,"Starting Twitter timer");
    }

    /**
     * Timer callback: reloads the tracked topics and (re)opens the stream.
     *
     * @param timer the timer that expired (unused)
     */
    public void ejbTimeout(Timer timer) {
        _logger.log(Level.INFO,"Running Twitter timer");
        findTopics();
        try {
            setupStream();
        } catch (TwitterException ex) {
            Logger.getLogger(TwitterBean.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    /**
     * Opens a filtered Twitter stream for the current topics and wires a
     * listener that forwards every received status to
     * {@link #insertStatus(Status)}.
     *
     * @throws TwitterException declared for callers; propagated from twitter4j
     */
    private void setupStream() throws TwitterException{
        // Stop the stream left over from the previous run; otherwise every
        // timer expiry would leave one more consumer thread running.
        if (_twitterStream != null) {
            _twitterStream.cleanUp();
            _twitterStream = null;
        }
        // A filter stream without any track term has nothing to match.
        if (_twitterTopics == null || _twitterTopics.isEmpty()) {
            _logger.log(Level.WARNING,"No Twitter topics configured; stream not started");
            return;
        }
        StatusListener listener = new StatusListener(){
            @Override
            public void onStatus(Status status) {
                insertStatus(status);
            }
            @Override
            public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
                //DO nothing
            }
            @Override
            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
                _logger.log(Level.INFO,"Track limitation notice: "+numberOfLimitedStatuses);
            }
            @Override
            public void onScrubGeo(long l, long l1) {
                _logger.log(Level.INFO,"Scrub GEO");
            }
            @Override
            public void onException(Exception ex) {
                // Route stream errors to the server log instead of stderr.
                _logger.log(Level.SEVERE,"Twitter stream error",ex);
            }
        };
        TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
        twitterStream.setOAuthConsumer("secret", "secret");
        twitterStream.setOAuthAccessToken(new AccessToken("secret","secret"));
        FilterQuery query = new FilterQuery();
        query = query.track(_twitterTopics.toArray(new String[_twitterTopics.size()]));
        twitterStream.addListener(listener);
        twitterStream.filter(query);
        _twitterStream = twitterStream;
    }

    /**
     * Stores a tweet as a {@link Review} when its text contains one of the
     * tracked tags (case-insensitive substring match; the first matching tag
     * in map iteration order decides the category).
     *
     * <p>Must run inside an active JTA transaction, because it persists and
     * flushes through the container-managed entity manager.
     *
     * @param status the received tweet
     */
    public void insertStatus(Status status){
        // Snapshot the reference: findTopics() publishes a new map on refresh.
        Map<String,Integer> tagCategories = _tagCatRel;
        String text = status.getText();
        if (tagCategories == null || text == null) {
            return;
        }
        String lowerText = text.toLowerCase();
        String foundTag = null;
        for (String tag : tagCategories.keySet()) {
            if (lowerText.contains(tag.toLowerCase())) {
                foundTag = tag;
                break;
            }
        }
        if (foundTag == null) {
            return;
        }
        Integer category = tagCategories.get(foundTag);
        Query q=_entityManager.createNamedQuery("Category.findByCategoryId");
        q.setParameter("categoryId",category);
        Category c = (Category) q.getSingleResult();
        Product p = new Product(c);
        _entityManager.persist(p);
        // persist() already makes p managed, so a merge() adds nothing. The
        // generated key is only guaranteed to be assigned once the insert has
        // been flushed, and it is read right below.
        _entityManager.flush();
        Review r = new Review();
        r.setReview(text);
        r.setUrl("http://www.twitter.com/"+status.getUser().getScreenName()+"/statuses/"+status.getId());
        r.setProcessed(0);
        r.setDateCreated(status.getCreatedAt().getTime());
        r.setProductId(p.getProductId());
        _entityManager.persist(r);
        _logger.log(Level.INFO,"Added tweet:" + r.getReview());
    }

    /**
     * Reloads the tracked tags and the tag-to-category map from the
     * {@code Twitter.findAll} named query. Each row's {@code tags} value is
     * split on single spaces; rows without tags and empty tokens are ignored.
     */
    private void findTopics() {
        // Build into locals and publish at the end, so the listener thread
        // never iterates a map that is being modified and stale tags from an
        // earlier run are dropped.
        List<String> topics = new LinkedList<String>();
        Map<String,Integer> tagCategories = new HashMap<String,Integer>();
        StringBuilder all = new StringBuilder();
        Query twitterQuery=_entityManager.createNamedQuery("Twitter.findAll");
        for(Object t: twitterQuery.getResultList()){
            Twitter twitter=(Twitter) t;
            String tags = twitter.getTags();
            if (tags == null) {
                continue;
            }
            for(String tag : tags.split(" ")){
                // Consecutive spaces yield empty tokens; an empty tag would
                // match every tweet through String.contains("").
                if (tag.length() == 0) {
                    continue;
                }
                topics.add(tag);
                all.append(tag).append(", ");
                tagCategories.put(tag,twitter.getCategoryId());
            }
        }
        _twitterTopics = topics;
        _tagCatRel = tagCategories;
        _logger.log(Level.INFO,"Tracking: "+all);
    }
}
还有我的persistence.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- JPA deployment descriptor: declares one persistence unit and the entity classes it manages. -->
<persistence version="1.0" xmlns="http://java.sun.com/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_1_0.xsd">
<!-- Unit "PulsePU" uses JTA transactions, so persistence operations need an active JTA transaction. -->
<persistence-unit name="PulsePU" transaction-type="JTA">
<!-- JNDI name of the JTA data source backing this unit. -->
<jta-data-source>pulseEJB</jta-data-source>
<!-- Explicit list of managed entity classes. -->
<class>enkia.pulse.core.Category</class>
<class>enkia.pulse.core.Department</class>
<class>enkia.pulse.core.Feature</class>
<class>enkia.pulse.core.Product</class>
<class>enkia.pulse.core.Review</class>
<class>enkia.pulse.core.ReviewSnippet</class>
<class>enkia.pulse.core.Sentiment</class>
<class>enkia.pulse.core.SentimentReview</class>
<class>enkia.pulse.core.Twitter</class>
<!-- Only the classes listed above belong to the unit; annotated classes found by scanning are ignored. -->
<exclude-unlisted-classes>true</exclude-unlisted-classes>
<!-- No provider-specific properties are set. -->
<properties/>
</persistence-unit>
</persistence>
最后是我的 sun-resources.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE resources PUBLIC "-//Sun Microsystems, Inc.//DTD Application Server 9.0 Resource Definitions //EN" "http://www.sun.com/software/appserver/dtds/sun-resources_1_3.dtd">
<resources>
<jdbc-resource enabled="true" jndi-name="pulseEJB" object-type="user" pool-name="mysqlPool"/>
<jdbc-connection-pool allow-non-component-callers="false" associate-with-thread="false" connection-creation-retry-attempts="0" connection-creation-retry-interval-in-seconds="10" connection-leak-reclaim="false" connection-leak-timeout-in-seconds="0" connection-validation-method="auto-commit" datasource-classname="com.mysql.jdbc.jdbc2.optional.MysqlDataSource" fail-all-connections="false" idle-timeout-in-seconds="300" is-connection-validation-required="false" is-isolation-level-guaranteed="true" lazy-connection-association="false" lazy-connection-enlistment="false" match-connections="false" max-connection-usage-count="0" max-pool-size="32" max-wait-time-in-millis="60000" name="mysqlPool" non-transactional-connections="false" pool-resize-quantity="2" res-type="javax.sql.DataSource" statement-timeout-in-seconds="-1" steady-pool-size="8" validate-atmost-once-period-in-seconds="0" wrap-jdbc-objects="false">
<property name="serverName" value="endpoint"/>
<property name="portNumber" value="3306"/>
<property name="databaseName" value="pulse"/>
<property name="User" value="user"/>
<property name="Password" value="password"/>
<property name="URL" value="jdbc:mysql://endpoint/pulse"/>
<property name="driverClass" value="com.mysql.jdbc.Driver"/>
</jdbc-connection-pool>
</resources>
我正在使用 NetBeans。
我的 EJB 位于一个 Web 项目中。我还有另一个 EJB,其配置和实例化方式与这个相同,而它在容器管理的事务下可以正常工作。我也试过干脆放弃容器管理的事务、改用 UserTransaction,但 merge 出了问题,并引出了许多意想不到的问题:在执行 "_entityManager.persist(p); _entityManager.merge(p);" 之后,访问 p 时出现了 NPE。
任何关于在哪里寻找两个 EJB 之间差异的建议都值得赞赏,因为我没有想法。
我还注意到,NetBeans 在有问题的 EJB 项目中,为我的两个实体类在 "ap-source-output" 目录下生成了源文件,而在正常工作的 EJB 项目中却没有。
我不明白为什么会生成这些代码,生成的代码如下:
package enkia.pulse.core;
import javax.annotation.Generated;
import javax.persistence.metamodel.SingularAttribute;
import javax.persistence.metamodel.StaticMetamodel;
/**
 * Static JPA metamodel for {@link Twitter}: one typed attribute handle per
 * persistent field. The handles carry no initializer in this class.
 */
@Generated(value="EclipseLink-2.2.0.v20110202-r8913", date="2012-08-08T23:09:05")
@StaticMetamodel(Twitter.class)
public class Twitter_ {

    // Integer-typed attributes.
    public static volatile SingularAttribute<Twitter, Integer> id;
    public static volatile SingularAttribute<Twitter, Integer> categoryId;

    // Remaining attributes.
    public static volatile SingularAttribute<Twitter, String> tags;
    public static volatile SingularAttribute<Twitter, Long> lastStatus;
}
和
package enkia.pulse.core;
import enkia.pulse.core.Category;
import javax.annotation.Generated;
import javax.persistence.metamodel.SingularAttribute;
import javax.persistence.metamodel.StaticMetamodel;
/**
 * Static JPA metamodel for {@link Product}: one typed attribute handle per
 * persistent field. The handles carry no initializer in this class.
 */
@Generated(value="EclipseLink-2.2.0.v20110202-r8913", date="2012-08-08T23:41:31")
@StaticMetamodel(Product.class)
public class Product_ {

    // Identifier and association.
    public static volatile SingularAttribute<Product, Integer> productId;
    public static volatile SingularAttribute<Product, Category> category;

    // String-typed attributes.
    public static volatile SingularAttribute<Product, String> productName;
    public static volatile SingularAttribute<Product, String> productBrand;
    public static volatile SingularAttribute<Product, String> model;
    public static volatile SingularAttribute<Product, String> partNumber;
    public static volatile SingularAttribute<Product, String> specifications;
    public static volatile SingularAttribute<Product, String> imageURL;

    // Binary image data.
    public static volatile SingularAttribute<Product, byte[]> image;
}
既然如此,我也把实体文件贴出来:
/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package enkia.pulse.core;
import java.io.Serializable;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.Table;
/**
 * JPA entity mapped to the {@code twitter} table. Each row holds a
 * space-separated tag string, the id of the category those tags belong to,
 * and a {@code lastStatus} value.
 *
 * @author fbarrow
 */
@Entity
@Table(name = "twitter")
@NamedQueries({
    @NamedQuery(name = "Twitter.findAll", query = "SELECT t FROM Twitter t"),
    @NamedQuery(name = "Twitter.findById", query = "SELECT t FROM Twitter t WHERE t.id = :id"),
    @NamedQuery(name = "Twitter.findByCategoryId", query = "SELECT t FROM Twitter t WHERE t.categoryId = :categoryId"),
    @NamedQuery(name = "Twitter.findByTags", query = "SELECT t FROM Twitter t WHERE t.tags = :tags"),
    @NamedQuery(name = "Twitter.findByLastStatus", query = "SELECT t FROM Twitter t WHERE t.lastStatus = :lastStatus")})
public class Twitter implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Primary key, generated by the database identity column. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "id")
    private Integer id;

    @Column(name = "categoryId")
    private Integer categoryId;

    @Column(name = "tags")
    private String tags;

    @Column(name = "lastStatus")
    private Long lastStatus;

    /** No-argument constructor. */
    public Twitter() {
    }

    /**
     * Creates an instance carrying only its primary key.
     *
     * @param id the primary key
     */
    public Twitter(Integer id) {
        this.id = id;
    }

    /**
     * Creates a fully populated instance.
     *
     * @param id         the primary key
     * @param categoryId id of the associated category
     * @param tags       the tag string
     * @param lastStatus the last status value
     */
    public Twitter(Integer id, Integer categoryId, String tags, Long lastStatus) {
        this(id);
        this.categoryId = categoryId;
        this.tags = tags;
        this.lastStatus = lastStatus;
    }

    public Integer getId() {
        return this.id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public Integer getCategoryId() {
        return this.categoryId;
    }

    public void setCategoryId(Integer categoryId) {
        this.categoryId = categoryId;
    }

    public String getTags() {
        return this.tags;
    }

    public void setTags(String tags) {
        this.tags = tags;
    }

    public Long getLastStatus() {
        return this.lastStatus;
    }

    public void setLastStatus(Long lastStatus) {
        this.lastStatus = lastStatus;
    }

    /** Hash of the primary key, or 0 while the key is unset. */
    @Override
    public int hashCode() {
        return (id == null) ? 0 : id.hashCode();
    }

    /**
     * Two instances are equal when their primary keys are equal; two
     * instances with unset keys also compare equal.
     */
    @Override
    public boolean equals(Object object) {
        // TODO: Warning - this method won't work in the case the id fields are not set
        if (object instanceof Twitter) {
            Twitter that = (Twitter) object;
            return (id == null) ? that.id == null : id.equals(that.id);
        }
        return false;
    }

    @Override
    public String toString() {
        return "enkia.pulse.core.Twitter[ id=" + id + " ]";
    }
}
错误:
SEVERE: javax.persistence.TransactionRequiredException
at com.sun.enterprise.container.common.impl.EntityManagerWrapper.doTxRequiredCheck(EntityManagerWrapper.java:163)
at com.sun.enterprise.container.common.impl.EntityManagerWrapper.doTransactionScopedTxCheck(EntityManagerWrapper.java:145)
at com.sun.enterprise.container.common.impl.EntityManagerWrapper.persist(EntityManagerWrapper.java:263)
at enkia.pulse.indexing.beans.TwitterBean.insertStatus(TwitterBean.java:154)
at enkia.pulse.indexing.beans.TwitterBean$1.onStatus(TwitterBean.java:99)
at twitter4j.StatusStreamImpl.onStatus(StatusStreamImpl.java:78)
at twitter4j.AbstractStreamImplementation$1.run(AbstractStreamImplementation.java:107)
at twitter4j.internal.async.ExecuteThread.run(DispatcherImpl.java:114)