
** Read the update first, because the original implementation contains some mistaken assumptions

Background

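I have a problem that requires me to fork a process, because I am using JNI with a single-threaded R process. I also need a way to monitor memory and CPU, and forking seems to be the only real solution. You cannot make more than one R invocation per process. I have explicitly tried to get around this limitation, but I am fairly sure it is not possible because of the rinside setup method.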

Current implementation

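I am currently trying to fork a process, attach an RMI connection to it, and store these in a stack pool. The problem is that the registry.bind() method does not block as it should. When binding to the registry in the main process, the process blocks and waits for remote method calls, but when started from Runtime.getRuntime().exec(), the process does not block and exits. This causes my endpoint daemon to shut down, and I get socket errors when trying to communicate with the daemon. I am using the gfork library to fork my process, simply so that I can receive exceptions and the like when the forked process starts up.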

public class JRIDaemon  implements IROperationRemoteProvider, Serializable, Runnable {

    /**
     * Serialization Id
     */
    private static final long serialVersionUID = 2279972098306474322L;

    /**
     * Daemon logger
     */
    private static final Logger logger = Logger.getLogger(JRIDaemon.class.getName());

    /**
     * This is the executor service used to execute our job. The choice of
     * newSingleThreadExecutor is important because R is single-threaded and JRI
     * performs checks and will kill us if the thread is manipulated.
     */
    private static ExecutorService executorService = Executors.newSingleThreadExecutor();

    /**
     * This implementation uses the executor service to run the analytics
     * operation. The executor service is used because R is single-threaded and
     * cannot be called from outside.
     */
    private JRIExecutionTask callableOperation;

    /**
     * This is the unique id that can be used to fetch this daemon.
     */
    private final String daemonId;


    private JRIDaemon() {
        this(UUID.randomUUID().toString());
    }

    private JRIDaemon(String daemonId) {
        this.daemonId = daemonId;
    }


    private String getDaemonId() {
        return daemonId;
    }

    @Override
    public void run() {
        logger.info("Starting the jri daemon");

        System.out.println("Starting the jri daemon");
        try {
            IROperationRemoteProvider stub = (IROperationRemoteProvider) UnicastRemoteObject.exportObject(this, 0);

            Registry registry = LocateRegistry.getRegistry();
            registry.rebind(daemonId, stub);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("Exception occurred when initializing the rmi agent ", e);
        }
        System.out.println("Daemon is done");
        logger.fine("Exiting JRIDaemon#run");
    }

    /**
     * Close the connection to R services.
     * @throws NotBoundException 
     * @throws RemoteException 
     * @throws AccessException 
     */
    public void close() throws Exception {
        logger.info("Calling close !!!!!!!!!");
        //if (registry != null) {
        //    registry.unbind(daemonId);
        //}
        //System.exit(0);
    }

    /**
     * @see IROperationProvider#execute(IAnalyticsOperation, List, List)
     */
    @Override
    public Map<String, IMetric> execute(IAnalyticsOperation operation, List<IAnalyticsOperationInput> inputs, List<? extends IDataProvider> dataProvider) throws Exception {
        callableOperation = new JRIExecutionTask(inputs, operation, dataProvider);
        Future<Map<String, IMetric>> execution = executorService.submit((Callable<Map<String, IMetric>>) callableOperation);
        return execution.get();
    }

    /**
     * @see IROperationProvider#interrupt()
     * 
     *      TODO come to a solution on stopping and restarting the thread in the
     *      Rengine implementation.
     */
    @Override
    public void interrupt() {
        System.out.println("Calling interrupt on executor service");
        executorService.shutdown();
        // Can't do this yet because it causes a segfault in the task engine
        // process.
        // callableOperation.interrupt();
    }

    @Override
    public Boolean isAllGood() {
        return true;
    }

    @Override
    public void activate() {
    }

    @Override
    public void passivate() {

    }

    /**
     * This is here only for testing purposes.
     * @param args
     * @throws Exception
     */
    public static void main(String args[] ) throws Exception {
        IROperationRemoteProvider provider = create();
        Thread.sleep(10000);
        System.out.println(" ALL GOOD " + provider.isAllGood());

    }


    /**
     * This creates a daemon, initializes it, and returns the client that can be
     * used to talk to the server. The daemon is useless for the calling process as
     * it is a separate process and we use the client to communicate with the
     * jri daemon process.
     * 
     * @return
     */
    public static IROperationRemoteProvider create() throws Exception {
        LocateRegistry.createRegistry(1099);
        String daemonId = UUID.randomUUID().toString();

        JRIDaemon daemon = new JRIDaemon(daemonId);
        Fork<JRIDaemon, org.gfork.types.Void> forkedDaemon = new Fork<JRIDaemon, org.gfork.types.Void>(daemon);

        //forkedDaemon.setJvmOptions("-Djava.security.manager -Djava.security.policy=\"taskenginesecurity.policy\"");

        logger.info("Calling run task");
        forkedDaemon.addListener(new Listener<JRIDaemon, org.gfork.types.Void>() {

            @Override
            public void onFinish(Fork<JRIDaemon, Void> fork, boolean wasKilled) throws IllegalAccessException, InterruptedException {

                logger.info("Task is finished exit value -> " + fork.getExitValue() + " killed ->" + wasKilled);

            }

            @Override
            public void onError(Fork<JRIDaemon, Void> fork) throws IllegalAccessException, InterruptedException {
                logger.info("Error was " + fork.getStdErr());
            }

            @Override
            public void onException(Fork<JRIDaemon, Void> fork) throws IllegalAccessException, InterruptedException, IOException, ClassNotFoundException {
                logger.log(Level.SEVERE, "Error occurred in daemon", fork.getException());
            } 
        });

        Fork.setLoggingEnabled(true);

        forkedDaemon.execute();

        forkedDaemon.waitFor();

        logger.info("Standard out was " + forkedDaemon.getStdOut());

        if (forkedDaemon.isException()) {
            throw new RuntimeException("Unable to create Remote Provider ", forkedDaemon.getException());
        }

       //Thread.sleep(2000);

        Registry registry = LocateRegistry.getRegistry();

        IROperationRemoteProvider process = (IROperationRemoteProvider) registry.lookup(daemonId);

        return process;
    }
}

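I use the create method to create a new implementation of my analytics provider; when executed, the Fork class invokes run to spawn the new daemon. If I put this exact same code in a public static void main(String[] args), the process daemonizes and waits for RMI calls, but when it is executed through the fork operation it does not.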

Here is the gfork execute method; as you can see, it uses Runtime.exec:

/**
     * Starts a new java process which runs the task. 
     * The subprocess inherits the environment including class path and
     * system properties of the current process. The JVM is launched using
     * executable derived from standard system property 'java.home'.
     * <p>
     * Standard output (System.out) of the task can be read by {@link #getStdOut()} or
     * forwarded to a file, see {@link #setStdOutWriter(Writer)}.
     * The same is possible for Standard error (System.err), 
     * see {@link #getStdErr()} and {@link #setStdErrWriter(Writer)}.
     * 
     * @throws Exception
     */
    public synchronized void execute() throws Exception {
        if (isExecuting()) {
            throw new IllegalStateException(FORK_IS_ALREADY_EXECUTING);
        }
        exec = Runtime.getRuntime().exec(createCmdArray(), null, workingDir);

        taskStdOutReader = new BufferedReader(new InputStreamReader(exec.getInputStream()));
        taskErrorReader = new BufferedReader(new InputStreamReader(exec.getErrorStream()));
        readError();
        readStdOut();

        waitForFinishedThread = new Thread("jforkWaitForFinishedThread") {
            // needed to notify listeners after execution
            @Override
            public void run() {
                try {
                    waitFor();
                } catch (final Exception e) {
                    e.printStackTrace();
                    stdErrText.append(String.format("ERROR jforkListenerNotifier: %s%n", e.toString()));
                }
            }
        };
        waitForFinishedThread.start();
    }

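I have added sleep timers to monitor the process. It does start, and shortly afterwards it exits with no errors and a status of 0. I have verified that if there is a problem configuring RMI in the run method, an exception is returned. RMI appears to initialize correctly, but nothing blocks to keep the forked process from exiting. I have RTFM on Runtime.exec, but I have no idea what is causing it to exit. Any help would be appreciated.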
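For anyone who wants to observe the child's exit status and output without gfork, here is a minimal sketch using ProcessBuilder from the standard library. The class name ChildJvmSketch, the Result holder, and the runJava helper are hypothetical names introduced for illustration; they are not part of gfork or of the code above. It launches the JVM found under the java.home system property, in the same spirit as the gfork Javadoc describes.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChildJvmSketch {

    /** Hypothetical holder for what the parent observed about the child. */
    public static final class Result {
        public final int exitCode;
        public final String output;

        Result(int exitCode, String output) {
            this.exitCode = exitCode;
            this.output = output;
        }
    }

    /** Starts a child JVM with the given arguments and waits for it to exit. */
    public static Result runJava(String... jvmArgs) throws IOException, InterruptedException {
        // Locate the java launcher of the currently running JVM.
        String javaBin = System.getProperty("java.home")
                + File.separator + "bin" + File.separator + "java";

        List<String> command = new ArrayList<>();
        command.add(javaBin);
        command.addAll(Arrays.asList(jvmArgs));

        ProcessBuilder builder = new ProcessBuilder(command);
        // Merge stderr into stdout so a single reader drains both streams.
        builder.redirectErrorStream(true);
        Process child = builder.start();

        // Read until end of stream; this returns only once the child closes its output.
        StringBuilder output = new StringBuilder();
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(child.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                output.append(line).append(System.lineSeparator());
            }
        }

        int exitCode = child.waitFor();
        return new Result(exitCode, output.toString());
    }

    public static void main(String[] args) throws Exception {
        Result result = runJava("-version");
        System.out.println("exit code: " + result.exitCode);
        System.out.print(result.output);
    }
}
```

Note that this helper blocks until the child exits, so it suits a diagnostic run such as checking why a daemon quits early. For a daemon that is meant to stay up, the output would need to be drained on a separate thread instead.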

Update

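Thank you EJP; although your remarks were condescending, they were correct. I made the mistaken assumption that the bind call was blocking because the process did not die, but the real reason is that RMI creates a separate thread to handle RMI communication. That is what keeps the process alive.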

import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;


public class RunnableRMIDaemon implements Remote {


        public static void main(String args[]) throws InterruptedException {
            String daemonID = "123";
            System.out.println("STARTING");
            Registry registry;
            try {
                RunnableRMIDaemon daemon = new RunnableRMIDaemon();
                registry = LocateRegistry.getRegistry();
                final Remote stub = (Remote) UnicastRemoteObject.exportObject(daemon, 0);
                registry.rebind(daemonID, stub);


                Thread.sleep(1000);

            } catch (RemoteException e) {
                throw new RuntimeException("Remote Exception occurred while running " + e);
            } 
            System.out.println("ENDING");
        }
    }



import java.io.IOException;

public class ForkRMIDaemon {

    public static void main(String args[]) throws IOException, InterruptedException {
        System.out.println("Starting fork");
        Runtime.getRuntime().exec("java -cp . RunnableRMIDaemon");
        Thread.sleep(10000);
        System.out.println("Completed fork");
    }
}

When the first process dies, the process started by Runtime.getRuntime().exec() is still alive.

thanatos:testingrmifork chris$ java ForkRMIDaemon
Starting fork
Completed fork
tv-mini:testingrmifork chris$ ps -ef | grep java
  501 25499     1   0   0:00.10 ttys007    0:00.72 /usr/bin/java -cp . RunnableRMIDaemon
  501 25501 25413   0   0:00.00 ttys007    0:00.00 grep java
thanatos:testingrmifork chris$ 

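My investigation is not finished yet, but it seems that the simple gfork library is actually doing something to shut the process down on return. I looked through the gfork code but could not see where this might happen.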

Thanks EJP, and I apologize for the incorrect information. My guess is that gfork is doing some trickery, since it lets you invoke a method that is not main.

I had assumed that Java handled threads more like C pthreads, where I always had to write a while loop in main(), otherwise my threads would be killed when main exited. My mistake.
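The difference from the pthreads habit can be seen in a small sketch. In Java, a thread is non-daemon by default unless it inherits daemon status from its creator, and the JVM keeps running until every non-daemon thread has finished, even after main returns. The class name NonDaemonThreadSketch and the startWorker helper are hypothetical names used only for this illustration.

```java
public class NonDaemonThreadSketch {

    /** Starts a worker that sleeps for the given time; daemon status is explicit. */
    public static Thread startWorker(boolean daemon, long millis) {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and let the thread end.
                Thread.currentThread().interrupt();
            }
        }, "worker");
        // Must be set before start(); false means the JVM waits for this thread.
        worker.setDaemon(daemon);
        worker.start();
        return worker;
    }

    public static void main(String[] args) {
        Thread worker = startWorker(false, 2000);
        System.out.println("main is returning, worker daemon = " + worker.isDaemon());
        // No while loop is needed here: the JVM stays alive until the
        // non-daemon worker finishes, roughly two seconds after main returns.
    }
}
```

Passing true to startWorker instead would let the JVM exit as soon as main returns, which is closer to the behaviour assumed in the question.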

2 Answers

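The problem is that the registry.bind() method does not block as it should. When binding to the registry in the main process, the process blocks and waits for remote method calls.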

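No, it won't. This is fantasy. You made it up. There is nothing in the documentation that says anything of the kind. It is not a blocking call (apart from the moment during which it communicates with the registry); it does not "block and wait for remote method calls". It returns to your code. If you make up behaviour and the system does not exhibit it, you must not be surprised.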

This causes my endpoint daemon to shut down

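No, it doesn't. Your endpoint daemon is somehow causing itself to shut down. RMI starts non-daemon threads to handle incoming connections, so a JVM that has exported remote objects will not exit until those remote objects are unexported, either explicitly or via GC, or the application calls System.exit(). The way to prevent GC of remote objects is to store static references to them.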
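A minimal sketch of that lifecycle, assuming a hypothetical Ping remote interface and an ExportLifecycleSketch class that are not part of the question's code: the remote object is held in a static field so it cannot be collected, exported on an anonymous port, and later unexported explicitly so that the JVM is free to exit. No registry is involved here, to keep the example self-contained.

```java
import java.rmi.NoSuchObjectException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;

public class ExportLifecycleSketch {

    /** Hypothetical remote interface, for illustration only. */
    public interface Ping extends Remote {
        String ping() throws RemoteException;
    }

    static final class PingImpl implements Ping {
        @Override
        public String ping() {
            return "pong";
        }
    }

    // Static reference: keeps the exported object reachable so GC cannot unexport it.
    private static final PingImpl SERVICE = new PingImpl();

    /** Exports SERVICE on an anonymous port (0) and returns its stub. */
    public static Remote export() throws RemoteException {
        return UnicastRemoteObject.exportObject(SERVICE, 0);
    }

    /** Unexports SERVICE; returns false if it was not exported. */
    public static boolean unexport() {
        try {
            // force = true: unexport even if calls are pending or in progress.
            return UnicastRemoteObject.unexportObject(SERVICE, true);
        } catch (NoSuchObjectException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        Remote stub = export();
        System.out.println("stub implements Ping: " + (stub instanceof Ping));
        // If main returned at this point, the JVM would be expected to stay
        // alive because an exported remote object still exists.
        System.out.println("unexported: " + unexport());
        // With nothing exported any more, the JVM can exit when main returns.
    }
}
```

In a daemon like the one in the question, the export call would sit in run() and the unexport call in close(); that placement is a suggestion, not something the question's code currently does.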

I must say I don't understand why you are executing a child process if all you are going to do in the main process is wait for it.

answered 2013-02-05T22:04:45.453

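I came up with a semi-dirty way to do this that blocks indefinitely, but I still have to find a reliable way to shut down the forked daemon. In a normal environment, when I run it from unit tests, the process should receive a SIGKILL. I think I am almost there.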

@Override
public void run() {
    logger.info("Starting the jri daemon");

    Registry registry;
    try {
        registry = LocateRegistry.getRegistry();
        final IROperationRemoteProvider stub = (IROperationRemoteProvider) UnicastRemoteObject.exportObject(this, 0);
        registry.rebind(daemonId, stub);
    } catch (RemoteException e) {
        throw new RuntimeException("Remote Exception occurred while running " + e);
    }

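    // Block this thread so that run() does not return. `closed` is presumably a
    // boolean field declared elsewhere in JRIDaemon (not shown in this snippet).
    // waitObj is local, so nothing else can notify it; only an interrupt ends the wait.
    final Object waitObj = new Object();

    synchronized (waitObj) {
        while (!closed) {
            try {
                waitObj.wait();
            } catch (InterruptedException e) {
                closed = true;
            }
        }
    }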
    logger.fine("Exiting JRIDaemon#run");
}
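One possible way to make the shutdown reliable, sketched under the assumption that close() is the intended shutdown hook: replace the local wait object with a java.util.concurrent.CountDownLatch that close() releases. ShutdownLatchSketch is a hypothetical stand-in for the daemon; the RMI export and rebind are left out so the blocking logic can be run on its own.

```java
import java.util.concurrent.CountDownLatch;

public class ShutdownLatchSketch implements Runnable {

    // Released exactly once, when close() is called.
    private final CountDownLatch closed = new CountDownLatch(1);

    // Set when run() has returned; volatile so other threads see the update.
    private volatile boolean exited = false;

    @Override
    public void run() {
        // In the real daemon, exportObject(...) and registry.rebind(...) would go here.
        try {
            // Blocks until close() counts the latch down.
            closed.await();
        } catch (InterruptedException e) {
            // Treat an interrupt as a request to stop and keep the flag set.
            Thread.currentThread().interrupt();
        }
        // In the real daemon, unbinding and unexporting would go here.
        exited = true;
    }

    /** Asks run() to return; safe to call from any thread, including an RMI call. */
    public void close() {
        closed.countDown();
    }

    public boolean hasExited() {
        return exited;
    }

    public static void main(String[] args) throws InterruptedException {
        ShutdownLatchSketch daemon = new ShutdownLatchSketch();
        Thread runner = new Thread(daemon, "daemon-main");
        runner.start();

        daemon.close();
        runner.join(5000);
        System.out.println("run() returned: " + daemon.hasExited());
    }
}
```

Unlike the wait on a local object, the latch can be released from another thread, so a remote close() call could end the daemon without relying on a signal.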
answered 2013-02-05T21:40:15.100