** 首先查看更新,因为原始实现包含一些错误假设
背景故事
我有一个必须分叉进程的问题,原因是我使用的是 jni 和单线程 R 进程。我还需要一种监控内存和 CPU 的方法,分叉似乎是唯一真正的解决方案。你不能实现每个进程有多个 R 调用,我已经明确地试图绕过这个限制,但我很确定这是不可能的,因为 rinside 设置方法。
当前实施
我目前正在尝试分叉一个进程并将一个 rmi 连接连接到它并将它们存储在一个堆叠池中。问题是 registry.bind() 方法没有按应有的方式阻塞。在主进程中绑定到注册表时,进程将阻塞并等待远程方法调用,但从 RunTime.getRuntime().exec() 启动时,进程不会阻塞并退出。这会导致我的端点守护程序关闭,并且在尝试与守护程序通信时收到套接字错误。我正在使用 gfork 库来分叉我的进程,只是为了能够在分叉进程启动时接收异常等。
public class JRIDaemon implements IROperationRemoteProvider, Serializable, Runnable {
/**
* Serialization Id
*/
private static final long serialVersionUID = 2279972098306474322L;
/**
* Daemon logger
*/
private static final Logger logger = Logger.getLogger(JRIDaemon.class.getName());
/**
* This is the exeuctor service used to execute our job, the option for
* newSingleThreadExecutor is important because R is single threaded and JRI
* puts check in and will kill us if the thread is manipulated.
*/
private static ExecutorService executorService = Executors.newSingleThreadExecutor();
/**
* This implemenation uses the exeuctor service to run the analytics
* operation. The executor service is used because R is single threaded and
* cannot be called from outside.
*/
private JRIExecutionTask callableOperation;
/**
* This is the unique id that can to fetch this daemon.
*/
private final String daemonId;
private JRIDaemon() {
this(UUID.randomUUID().toString());
}
private JRIDaemon(String daemonId) {
this.daemonId = daemonId;
}
private String getDaemonId() {
return daemonId;
}
@Override
public void run() {
logger.info("Starting the jri daemon");
System.out.println("Starting the jri daemon");
try {
IROperationRemoteProvider stub = (IROperationRemoteProvider) UnicastRemoteObject.exportObject(this, 0);
Registry registry = LocateRegistry.getRegistry();
registry.rebind(daemonId, stub);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("Exception occurred when initializing the rmi agent ", e);
}
System.out.println("Daemon is done");
logger.fine("Exiting JRIDaemon#run");
}
/**
* Close the connection to R services.
* @throws NotBoundException
* @throws RemoteException
* @throws AccessException
*/
public void close() throws Exception {
logger.info("Calling close !!!!!!!!!");
//if (registry != null) {
// registry.unbind(daemonId);
//}
//System.exit(0);
}
/**
* @see IROperationProvider#execute(IAnalyticsOperation, List, List)
*/
@Override
public Map<String, IMetric> execute(IAnalyticsOperation operation, List<IAnalyticsOperationInput> inputs, List<? extends IDataProvider> dataProvider) throws Exception {
callableOperation = new JRIExecutionTask(inputs, operation, dataProvider);
Future<Map<String, IMetric>> execution = executorService.submit((Callable<Map<String, IMetric>>) callableOperation);
return execution.get();
}
/**
* @see IROperationProvider#interrupt()
*
* TODO come to a solution on stopping and restarting the thread in the
* Rengine implementation.
*/
@Override
public void interrupt() {
System.out.println("Calling interrupt on executor service");
executorService.shutdown();
// Can't do this yet because it causes a segfault in the task engine
// process.
// callableOperation.interrupt();
}
@Override
public Boolean isAllGood() {
return true;
}
@Override
public void activate() {
}
@Override
public void passivate() {
}
/**
* This is here only for testing purposes.
* @param args
* @throws Exception
*/
public static void main(String args[] ) throws Exception {
IROperationRemoteProvider provider = create();
Thread.sleep(10000);
System.out.println(" ALL GOOD " + provider.isAllGood());
}
/**
* This creates a daemon and initializes returns the client that can be used
* to talk to the server. The daemon is useless for the calling process as
* it is a separate process and we use the client to communicate with the
* jri daemon process.
*
* @return
*/
public static IROperationRemoteProvider create() throws Exception {
LocateRegistry.createRegistry(1099);
String daemonId = UUID.randomUUID().toString();
JRIDaemon daemon = new JRIDaemon(daemonId);
Fork<JRIDaemon, org.gfork.types.Void> forkedDaemon = new Fork<JRIDaemon, org.gfork.types.Void>(daemon);
//forkedDaemon.setJvmOptions("-Djava.security.manager -Djava.security.policy=\"taskenginesecurity.policy\"");
logger.info("Calling run task");
forkedDaemon.addListener(new Listener<JRIDaemon, org.gfork.types.Void>() {
@Override
public void onFinish(Fork<JRIDaemon, Void> fork, boolean wasKilled) throws IllegalAccessException, InterruptedException {
logger.info("Task is finished exit value -> " + fork.getExitValue() + " killed ->" + wasKilled);
}
@Override
public void onError(Fork<JRIDaemon, Void> fork) throws IllegalAccessException, InterruptedException {
logger.info("Error was " + fork.getStdErr());
}
@Override
public void onException(Fork<JRIDaemon, Void> fork) throws IllegalAccessException, InterruptedException, IOException, ClassNotFoundException {
logger.log(Level.SEVERE, " Erorro occurred in daemon ", fork.getException());
}
});
Fork.setLoggingEnabled(true);
forkedDaemon.execute();
forkedDaemon.waitFor();
logger.info("Standard out was " + forkedDaemon.getStdOut());
if (forkedDaemon.isException()) {
throw new RuntimeException("Unble to create Remote Provider ", forkedDaemon.getException());
}
//Thread.sleep(2000);
Registry registry = LocateRegistry.getRegistry();
IROperationRemoteProvider process = (IROperationRemoteProvider) registry.lookup(daemonId);
return process;
}
}
我使用 create 方法来创建我的分析提供程序的新实现,Fork 类调用在执行时运行以生成新的守护程序。如果我将这个完全相同的代码放在 public static void main(String[] args) 中,该进程会进行守护进程并等待 rmi 调用,但是当通过 for 操作执行时它不会。
这是 Gfrork 执行方法,您可以看到它使用 Runtime.exec
/**
* Starts a new java process which runs the task.
* The subprocess inherits the environment including class path an
* system properties of the current process. The JVM is launched using
* executable derived from standard system property 'java.home'.
* <p>
* Standard output (System.out) of the task can be red by {@link #getStdOut()} or
* forwarded to a file, see {@link #setStdOutWriter(Writer)}.
* The same is possible for Standard error (System.err),
* see {@link #getStdErr()} and {@link #setStdErrWriter(Writer)}.
*
* @throws Exception
*/
public synchronized void execute() throws Exception {
if (isExecuting()) {
throw new IllegalStateException(FORK_IS_ALREADY_EXECUTING);
}
exec = Runtime.getRuntime().exec(createCmdArray(), null, workingDir);
taskStdOutReader = new BufferedReader(new InputStreamReader(exec.getInputStream()));
taskErrorReader = new BufferedReader(new InputStreamReader(exec.getErrorStream()));
readError();
readStdOut();
waitForFinishedThread = new Thread("jforkWaitForFinishedThread") {
// needed to notify listeners after execution
@Override
public void run() {
try {
waitFor();
} catch (final Exception e) {
e.printStackTrace();
stdErrText.append(String.format("ERROR jforkListenerNotifier: %s%n", e.toString()));
}
}
};
waitForFinishedThread.start();
}
我已经添加了睡眠计时器来监视进程,它确实开始了,不久之后它就退出了,没有错误,状态为 0。我已经验证,如果在 run 方法中配置 rmi 时出现问题,它将返回异常。RMI 似乎正在正确初始化,但不会阻塞,因此分叉的进程不会退出。我在 Runtime.exec 上有 RTFM,但不知道是什么导致它退出。任何帮助,将不胜感激。
更新
谢谢EJP,尽管您的言论居高临下,但它们是正确的。我做了一个错误的假设,即绑定被阻塞是因为进程没有死掉,但这是因为它创建了一个单独的线程来处理 rmi 通信。这就是使过程保持活力的原因。
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
public class RunnableRMIDaemon implements Remote {
public static void main(String args[]) throws InterruptedException {
String daemonID = "123";
System.out.println("STARTING");
Registry registry;
try {
RunnableRMIDaemon daemon = new RunnableRMIDaemon();
registry = LocateRegistry.getRegistry();
final Remote stub = (Remote) UnicastRemoteObject.exportObject(daemon, 0);
registry.rebind(daemonID, stub);
Thread.sleep(1000);
} catch (RemoteException e) {
throw new RuntimeException("Remote Exception occurred while running " + e);
}
System.out.println("ENDING");
}
}
import java.io.IOException;
public class ForkRMIDaemon {
public static void main(String args[]) throws IOException, InterruptedException {
System.out.println("Starting fork");
Runtime.getRuntime().exec("java -cp . RunnableRMIDaemon");
Thread.sleep(10000);
System.out.println("Completed fork");
}
}
当第一个进程死亡时, Runtime.getRuntime().exec() 进程仍然存在。
thanatos:testingrmifork chris$ java ForkRMIDaemon
Starting fork
Completed fork
tv-mini:testingrmifork chris$ ps -ef | grep java
501 25499 1 0 0:00.10 ttys007 0:00.72 /usr/bin/java -cp . RunnableRMIDaemon
501 25501 25413 0 0:00.00 ttys007 0:00.00 grep java
thanatos:testingrmifork chris$
我的调查尚未完成,但似乎简单的 gfork 库实际上正在做一些事情来关闭返回过程。我查看了 gfork 代码,但没有看到这可能发生在哪里。
感谢 EJP,我对不正确的信息表示感谢。我猜 gfork 正在做一些诡计,因为它允许你调用一个不是 main 的方法。
我假设 java 处理线程更像 c pthreads 并且我总是不得不在 main() 中创建一个 while 循环,否则我的线程将在 main 退出时被杀死。我的错