0

首先是我的整个 EJB 文件:

package enkia.pulse.indexing.beans;

import enkia.pulse.core.Category;
import enkia.pulse.core.Product;
import enkia.pulse.core.Review;
import enkia.pulse.core.Twitter;
import enkia.utils.HarvestingConstants;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Resource;
import javax.ejb.SessionContext;
import javax.ejb.Stateless;
import javax.ejb.TimedObject;
import javax.ejb.Timer;
import javax.ejb.TimerService;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.Query;
import javax.transaction.UserTransaction;
import twitter4j.FilterQuery;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterException;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;

/**
 * Stateless session bean that tracks the tags stored in the {@code Twitter}
 * entities on the Twitter streaming API and stores every matching tweet as a
 * {@link Review} attached to a newly created {@link Product}.
 *
 * <p>{@link #initialize()} registers an hourly EJB timer; every expiry
 * ({@link #ejbTimeout(Timer)}) reloads the tags from the database and opens a
 * filtered stream for them.
 *
 * <p>NOTE(review): stream callbacks are delivered on twitter4j's own
 * dispatcher thread ({@code twitter4j.internal.async.ExecuteThread}), not on a
 * container thread. No container-managed transaction is active there, so
 * {@code EntityManager.persist} in {@link #insertStatus(Status)} fails with
 * {@code TransactionRequiredException} when it is reached from the listener.
 * The database work has to be handed back to the container (for example via a
 * message-driven bean) before this can work.
 *
 * <p>NOTE(review): every timer expiry opens a new {@code TwitterStream} and
 * nothing visible here closes the previous one - confirm whether streams pile
 * up over time.
 *
 * @author joshua
 */
@Stateless
public class TwitterBean implements TwitterBeanLocal,
    TimedObject {

    /** Tags handed to the streaming filter; rebuilt on every timer expiry. */
    List<String> _twitterTopics;
    /** Maps each tracked tag to the category id of the row it came from. */
    Map<String,Integer> _tagCatRel;
    TimerService _timerService;
    Timer _timer;


    /** The session context needed to create the timer */
    @Resource
    private SessionContext _sc;

    @PersistenceContext(unitName=HarvestingConstants.PERSISTENCE_UNIT)
    EntityManager _entityManager;

    /** A logging object for formatted output to the server log. */
    private Logger _logger;


    /**
     * Constructs the logger
     */
    public TwitterBean(){
        _logger = Logger.getLogger(this.getClass().getName());
        _logger.log(Level.INFO,"Instantiating Twitter Bean");
    }

    /**
     * Creates empty tag collections and registers a timer that fires
     * immediately and then once per hour; each expiry is handled by
     * {@link #ejbTimeout(Timer)}.
     */
    public void initialize() {
        _logger.log(Level.INFO,"Initializing Twitter bean.");
        _twitterTopics = new LinkedList<String>();
        _tagCatRel = new HashMap<String,Integer>();
        _timerService = _sc.getTimerService();

        // initial delay 0 ms, then restart every hour
        _timer = _timerService.createTimer(0,1000*60*60,null);

        _logger.log(Level.INFO,"Starting Twitter timer");
    }

    /**
     * Timer callback: reloads the tracked tags and (re)opens the stream.
     *
     * @param timer the timer that expired (not used)
     */
    @Override
    public void ejbTimeout(Timer timer) {
        _logger.log(Level.INFO,"Running Twitter timer");
        findTopics();
        try {
            setupStream();
        } catch (TwitterException ex) {
            _logger.log(Level.SEVERE, "Could not set up the Twitter stream", ex);
        }

    }

    /**
     * Opens a filtered Twitter stream for the current tags and forwards every
     * received status to {@link #insertStatus(Status)}.
     *
     * @throws TwitterException declared for the twitter4j calls made here
     */
    private void setupStream() throws TwitterException{
        List<String> topics = _twitterTopics;
        if(topics == null || topics.isEmpty()){
            // A filter without any track term has nothing to match; do not connect.
            _logger.log(Level.INFO,"No Twitter tags to track; stream not started");
            return;
        }

        StatusListener listener = new StatusListener(){

            @Override
            public void onStatus(Status status) {
                insertStatus(status);
            }

            @Override
            public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
                //DO nothing
            }

            @Override
            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
               _logger.log(Level.INFO,"Track limitation notice: "+numberOfLimitedStatuses);
            }

            @Override
            public void onScrubGeo(long l, long l1) {
                _logger.log(Level.INFO,"Scrub GEO");
            }

            @Override
            public void onException(Exception ex) {
                // Send stream failures to the server log instead of stderr.
                _logger.log(Level.SEVERE,"Twitter stream error",ex);
            }


        };
        TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
        twitterStream.setOAuthConsumer("secret", "secret");
        twitterStream.setOAuthAccessToken(new AccessToken("secret","secret"));

        FilterQuery query = new FilterQuery();
        query = query.track(topics.toArray(new String[topics.size()]));

        twitterStream.addListener(listener);
        twitterStream.filter(query);
    }

    /**
     * Stores a tweet as a {@link Review}. The first tracked tag contained in
     * the tweet text (case-insensitive) selects the category; a new
     * {@link Product} of that category is persisted and the review refers to
     * it. Tweets that contain none of the tracked tags are ignored.
     *
     * <p>Requires an active transaction, see the class comment.
     *
     * @param status the received tweet
     */
    public void insertStatus(Status status){

            // Work on one snapshot so a concurrent tag reload cannot change the map mid-iteration.
            Map<String,Integer> tagCategories = _tagCatRel;
            if(tagCategories == null){
                return;
            }

            String text = status.getText().toLowerCase();
            boolean matched = false;
            Integer category = null;
            for(Map.Entry<String,Integer> entry : tagCategories.entrySet()){
                String tag = entry.getKey();
                // An empty tag is contained in every text and must never count as a match.
                if(tag.length() > 0 && text.contains(tag.toLowerCase())){
                    category = entry.getValue();
                    matched = true;
                    break;
                }
            }
            if(!matched){
                return;
            }

            Query q=_entityManager.createNamedQuery("Category.findByCategoryId");
            q.setParameter("categoryId",category);
            Category c = (Category) q.getSingleResult();
            Product p = new Product(c);

            _entityManager.persist(p);
            // Flush so the product id exists before it is copied below. Presumably
            // Product uses a database-generated key like Twitter does - confirm.
            // (The former merge(p) on the already managed instance had no effect.)
            _entityManager.flush();

            Review r = new Review();

            r.setReview(status.getText());
            r.setUrl("http://www.twitter.com/"+status.getUser().getScreenName()+"/statuses/"+status.getId());
            r.setProcessed(0);
            r.setDateCreated(status.getCreatedAt().getTime());
            r.setProductId(p.getProductId());
            _entityManager.persist(r);

            _logger.log(Level.INFO,"Added tweet:" + r.getReview());

    }


    /**
     * Reloads all {@code Twitter} rows and rebuilds both the tag list and the
     * tag-to-category map from their whitespace-separated tags.
     *
     * <p>Both collections are built locally and swapped in at the end, so a
     * bean instance on which {@link #initialize()} never ran works as well and
     * tags removed from the database stop being matched.
     */
    private void findTopics() {
        List<String> topics = new LinkedList<String>();
        Map<String,Integer> tagCategories = new HashMap<String,Integer>();
        StringBuilder all = new StringBuilder();

        Query twitterQuery=_entityManager.createNamedQuery("Twitter.findAll");
        for(Object t: twitterQuery.getResultList()){
            Twitter twitter=(Twitter) t;
            String tags = twitter.getTags();
            if(tags == null){
                continue;
            }

            for(String tag : tags.trim().split("\\s+")){
                if(tag.length() == 0){
                    // blank tags column
                    continue;
                }
                topics.add(tag);
                tagCategories.put(tag,twitter.getCategoryId());
                all.append(tag).append(", ");
            }
        }

        _twitterTopics = topics;
        _tagCatRel = tagCategories;
        _logger.log(Level.INFO,"Tracking: "+all);
    }
}

还有我的persistence.xml:

<?xml version="1.0" encoding="UTF-8"?>
<persistence version="1.0" xmlns="http://java.sun.com/xml/ns/persistence"   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_1_0.xsd">
  <!-- JTA persistence unit "PulsePU": transactions are supplied by the container. -->
  <persistence-unit name="PulsePU" transaction-type="JTA">
    <!-- JNDI name of the data source used by this unit. -->
    <jta-data-source>pulseEJB</jta-data-source>
    <!-- Managed entity classes, listed explicitly. -->
    <class>enkia.pulse.core.Category</class>
    <class>enkia.pulse.core.Department</class>
    <class>enkia.pulse.core.Feature</class>
    <class>enkia.pulse.core.Product</class>
    <class>enkia.pulse.core.Review</class>
    <class>enkia.pulse.core.ReviewSnippet</class>
    <class>enkia.pulse.core.Sentiment</class>
    <class>enkia.pulse.core.SentimentReview</class>
    <class>enkia.pulse.core.Twitter</class>
    <!-- Classes not listed above are not part of this unit. -->
    <exclude-unlisted-classes>true</exclude-unlisted-classes>
    <properties/>
  </persistence-unit>
</persistence>

最后是我的 sun-resource.xml:

 <?xml version="1.0" encoding="UTF-8"?>
  <!DOCTYPE resources PUBLIC "-//Sun Microsystems, Inc.//DTD Application Server 9.0 Resource Definitions //EN" "http://www.sun.com/software/appserver/dtds/sun-resources_1_3.dtd">
<resources>
<jdbc-resource enabled="true" jndi-name="pulseEJB" object-type="user" pool-name="mysqlPool"/>
<jdbc-connection-pool allow-non-component-callers="false" associate-with-thread="false" connection-creation-retry-attempts="0" connection-creation-retry-interval-in-seconds="10" connection-leak-reclaim="false" connection-leak-timeout-in-seconds="0" connection-validation-method="auto-commit" datasource-classname="com.mysql.jdbc.jdbc2.optional.MysqlDataSource" fail-all-connections="false" idle-timeout-in-seconds="300" is-connection-validation-required="false" is-isolation-level-guaranteed="true" lazy-connection-association="false" lazy-connection-enlistment="false" match-connections="false" max-connection-usage-count="0" max-pool-size="32" max-wait-time-in-millis="60000" name="mysqlPool" non-transactional-connections="false" pool-resize-quantity="2" res-type="javax.sql.DataSource" statement-timeout-in-seconds="-1" steady-pool-size="8" validate-atmost-once-period-in-seconds="0" wrap-jdbc-objects="false">
<property name="serverName" value="endpoint"/>
<property name="portNumber" value="3306"/>
<property name="databaseName" value="pulse"/>
<property name="User" value="user"/>
<property name="Password" value="password"/>
<property name="URL" value="jdbc:mysql://endpoint/pulse"/>
<property name="driverClass" value="com.mysql.jdbc.Driver"/>

我正在使用 Netbeans。

这个 EJB 位于一个 Web 项目中。我还有另一个 EJB,其配置和实例化方式与这个完全相同,它在容器管理的事务下工作正常。我也尝试过干脆放弃容器管理的事务、改用 UserTransaction,但 merge 出了问题,并最终引出许多意想不到的问题,例如在执行 "_entityManager.persist(p); _entityManager.merge(p);" 之后访问 p 时出现 NPE。

我已经没有头绪了,如果有人能建议该从哪里入手查找这两个 EJB 之间的差异,我将不胜感激。

我还注意到,NetBeans 在有问题的 EJB 项目中,为我的两个实体类在 "ap-source-output" 目录下生成了源代码,而在正常工作的 EJB 项目中则没有生成。

我不明白为什么会生成这些代码,生成的代码如下:

package enkia.pulse.core;

import javax.annotation.Generated;
import javax.persistence.metamodel.SingularAttribute;
import javax.persistence.metamodel.StaticMetamodel;

/**
 * JPA static metamodel of the {@link Twitter} entity: one typed attribute
 * handle per persistent field. According to its {@code @Generated} annotation
 * this class is produced by EclipseLink, so it is not meant to be edited by
 * hand.
 */
@Generated(value="EclipseLink-2.2.0.v20110202-r8913", date="2012-08-08T23:09:05")
@StaticMetamodel(Twitter.class)
public class Twitter_ { 

    public static volatile SingularAttribute<Twitter, Integer> id;
    public static volatile SingularAttribute<Twitter, String> tags;
    public static volatile SingularAttribute<Twitter, Integer> categoryId;
    public static volatile SingularAttribute<Twitter, Long> lastStatus;

}

package enkia.pulse.core;

import enkia.pulse.core.Category;
import javax.annotation.Generated;
import javax.persistence.metamodel.SingularAttribute;
import javax.persistence.metamodel.StaticMetamodel;

/**
 * JPA static metamodel of the {@link Product} entity: one typed attribute
 * handle per persistent field. According to its {@code @Generated} annotation
 * this class is produced by EclipseLink, so it is not meant to be edited by
 * hand.
 */
@Generated(value="EclipseLink-2.2.0.v20110202-r8913", date="2012-08-08T23:41:31")
@StaticMetamodel(Product.class)
public class Product_ { 

    public static volatile SingularAttribute<Product, String> productBrand;
    public static volatile SingularAttribute<Product, Category> category;
    public static volatile SingularAttribute<Product, String> model;
    public static volatile SingularAttribute<Product, byte[]> image;
    public static volatile SingularAttribute<Product, String> productName;
    public static volatile SingularAttribute<Product, String> imageURL;
    public static volatile SingularAttribute<Product, String> specifications;
    public static volatile SingularAttribute<Product, Integer> productId;
    public static volatile SingularAttribute<Product, String> partNumber;

}

顺便把实体类文件也贴出来:

/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package enkia.pulse.core;

import java.io.Serializable;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.Table;

/**
 * JPA entity mapped to the {@code twitter} table. A row carries a category
 * id, a tags string and a {@code lastStatus} value; the primary key is an
 * identity column. Equality and hashing are based on the id only.
 *
 * @author fbarrow
 */
@Entity
@Table(name = "twitter")
@NamedQueries({
    @NamedQuery(name = "Twitter.findAll", query = "SELECT t FROM Twitter t"),
    @NamedQuery(name = "Twitter.findById", query = "SELECT t FROM Twitter t WHERE t.id = :id"),
    @NamedQuery(name = "Twitter.findByCategoryId", query = "SELECT t FROM Twitter t WHERE t.categoryId = :categoryId"),
    @NamedQuery(name = "Twitter.findByTags", query = "SELECT t FROM Twitter t WHERE t.tags = :tags"),
    @NamedQuery(name = "Twitter.findByLastStatus", query = "SELECT t FROM Twitter t WHERE t.lastStatus = :lastStatus")})
public class Twitter implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Primary key, generated by the database (identity column). */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "id")
    private Integer id;

    /** Value of the {@code categoryId} column. */
    @Column(name = "categoryId")
    private Integer categoryId;

    /** Value of the {@code tags} column. */
    @Column(name = "tags")
    private String tags;

    /** Value of the {@code lastStatus} column. */
    @Column(name = "lastStatus")
    private Long lastStatus;

    /** Creates an entity with all fields unset. */
    public Twitter() {
    }

    /**
     * Creates an entity that only has its key set.
     *
     * @param id the primary key
     */
    public Twitter(Integer id) {
        this.id = id;
    }

    /**
     * Creates a fully populated entity.
     *
     * @param id         the primary key
     * @param categoryId the category id
     * @param tags       the tags string
     * @param lastStatus the last-status value
     */
    public Twitter(Integer id, Integer categoryId, String tags, Long lastStatus) {
        this(id);
        this.categoryId = categoryId;
        this.tags = tags;
        this.lastStatus = lastStatus;
    }

    /** @return the primary key, possibly {@code null} */
    public Integer getId() {
        return this.id;
    }

    /** @param id the primary key to set */
    public void setId(Integer id) {
        this.id = id;
    }

    /** @return the category id */
    public Integer getCategoryId() {
        return this.categoryId;
    }

    /** @param categoryId the category id to set */
    public void setCategoryId(Integer categoryId) {
        this.categoryId = categoryId;
    }

    /** @return the tags string */
    public String getTags() {
        return this.tags;
    }

    /** @param tags the tags string to set */
    public void setTags(String tags) {
        this.tags = tags;
    }

    /** @return the last-status value */
    public Long getLastStatus() {
        return this.lastStatus;
    }

    /** @param lastStatus the last-status value to set */
    public void setLastStatus(Long lastStatus) {
        this.lastStatus = lastStatus;
    }

    /**
     * @return the hash code of the id, or 0 while the id is unset
     */
    @Override
    public int hashCode() {
        return id == null ? 0 : id.hashCode();
    }

    /**
     * Compares by id only. Note that two instances whose ids are both unset
     * are therefore considered equal.
     *
     * @param object the object to compare with
     * @return {@code true} if {@code object} is a {@code Twitter} with the same id
     */
    @Override
    public boolean equals(Object object) {
        if (!(object instanceof Twitter)) {
            return false;
        }
        Integer otherId = ((Twitter) object).id;
        return id == null ? otherId == null : id.equals(otherId);
    }

    /** @return a short description containing the id */
    @Override
    public String toString() {
        return "enkia.pulse.core.Twitter[ id=" + id + " ]";
    }

}

错误:

SEVERE: javax.persistence.TransactionRequiredException
    at com.sun.enterprise.container.common.impl.EntityManagerWrapper.doTxRequiredCheck(EntityManagerWrapper.java:163)
    at com.sun.enterprise.container.common.impl.EntityManagerWrapper.doTransactionScopedTxCheck(EntityManagerWrapper.java:145)
    at com.sun.enterprise.container.common.impl.EntityManagerWrapper.persist(EntityManagerWrapper.java:263)
    at enkia.pulse.indexing.beans.TwitterBean.insertStatus(TwitterBean.java:154)
    at enkia.pulse.indexing.beans.TwitterBean$1.onStatus(TwitterBean.java:99)
    at twitter4j.StatusStreamImpl.onStatus(StatusStreamImpl.java:78)
    at twitter4j.AbstractStreamImplementation$1.run(AbstractStreamImplementation.java:107)
    at twitter4j.internal.async.ExecuteThread.run(DispatcherImpl.java:114)   
4

1 回答 1

2

用 Timer 创建的 TwitterStream,是否运行在独立的线程中?

计时器调用 setupStream,后者创建一个侦听器,并把它绑定到通过 TwitterStreamFactory 获得的 twitterStream 上。TwitterStream 的代码这里没有贴出,但从上下文来看,它是在异步地运行代码:

twitter4j.internal.async.ExecuteThread

它出现在您的堆栈跟踪中,就在异常的下方。我敢打赌,这段代码运行在自行管理的线程上,而这些线程并不在容器的上下文中运行——在这种模式下,访问容器资源以及与容器交互都得不到任何保证(这正是 Java EE 强烈建议不要自行创建和管理线程的原因)。

具体来说,该代码不在容器管理的事务中运行。

您可以考虑让 Timer Service 通过 MDB(消息驱动 Bean)来启动后台任务,这样该任务就能在容器的正确上下文中、相对于您的 EJB 异步地运行。

于 2012-08-09T06:19:04.997 回答