使用 spark-notebook 更新累积表。使用accumulo 文档和accumulo 示例代码中指定的方法。以下是我在笔记本中逐字记录的内容,以及回复:
val clientRqrdTble = new ClientOnRequiredTable
val bwConfig = new BatchWriterConfig
val batchWriter = connector.createBatchWriter("batchtestY", bwConfig);
clientRqrdTble: org.apache.accumulo.core.cli.ClientOnRequiredTable = org.apache.accumulo.core.cli.ClientOnRequiredTable@6c6a18ed bwConfig: org.apache.accumulo.core.client.BatchWriterConfig = [maxMemory=52428800, maxLatency=120000, maxWriteThreads=3,超时=9223372036854775807] batchWriter: org.apache.accumulo.core.client.BatchWriter = org.apache.accumulo.core.client.impl.BatchWriterImpl@298aa736
val rowIdS = rddX2_first._1.split(" ")(0)
rowIdS:字符串 = row_0736460000
val mutation = new Mutation(new Text(rowIdS))
突变:org.apache.accumulo.core.data.Mutation = org.apache.accumulo.core.data.Mutation@0
mutation.put(
new Text("foo"),
new Text("1"),
new ColumnVisibility("exampleVis"),
new Value(new String("CHEWBACCA!").getBytes) )
java.lang.IllegalStateException:在 org.apache.accumulo.core.data.Mutation.put( Mutation.java:163) 在 org.apache.accumulo.core.data.Mutation.put(Mutation.java:211)
我深入研究了代码,发现罪魁祸首是一个 if-catch,它正在检查 UnsynchronizedBuffer.Writer 缓冲区是否为空。行号不会对齐,因为这是与 1.6 accumulo-core jar 中的版本略有不同的版本 - 我已经查看了两者,但在这种情况下,不同之处并不重要。据我所知,该对象是在执行该方法之前创建的,并且没有被转储。
所以要么我在代码中遗漏了一些东西,要么有其他东西出现了。你们中有人知道可能导致这种行为的原因吗?
更新一
我已经使用 scala 控制台并通过直接的 java 1.8 执行了以下代码。它在 scala 中失败,但在 Java 中失败。在这一点上,我认为这是一个 Accumulo 问题。因此,我将打开一个 bug 票并深入挖掘源代码。如果我想出一个解决方案,我会在这里发布。
下面是Java形式的代码。那里有一些额外的东西,因为我想确保我可以连接到我使用 accumulo 批处理编写器示例创建的表:
import java.util.Map.Entry;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.client.*;
import org.apache.accumulo.core.client.mapred.*;
import org.apache.accumulo.core.cli.ClientOnRequiredTable;
import org.apache.accumulo.core.cli.ClientOnRequiredTable.*;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.conf.Configured.*;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.Text;
public class App {
public static void main( String[] args ) throws
AccumuloException,
AccumuloSecurityException,
TableNotFoundException {
// connect to accumulo using a scanner
// print first ten rows of a given table
String instanceNameS = "accumulo";
String zooServersS = "localhost:2181";
Instance instance = new ZooKeeperInstance(instanceNameS, zooServersS);
Connector connector =
instance.getConnector( "root", new PasswordToken("password"));
Authorizations auths = new Authorizations("exampleVis");
Scanner scanner = connector.createScanner("batchtestY", auths);
scanner.setRange(new Range("row_0000000001", "row_0000000010"));
for(Entry<Key, Value> entry : scanner) {
System.out.println(entry.getKey() + " is " + entry.getValue());
}
// stage up connection info objects for serialization
ClientOnRequiredTable clientRqrdTble = new ClientOnRequiredTable();
BatchWriterConfig bwConfig = new BatchWriterConfig();
BatchWriter batchWriter =
connector.createBatchWriter("batchtestY", bwConfig);
// create mutation object
Mutation mutation = new Mutation(new Text("row_0000000001"));
// populate mutation object
// -->THIS IS WHAT'S FAILING IN SCALA<--
mutation.put(
new Text("foo"),
new Text("1"),
new ColumnVisibility("exampleVis"),
new Value(new String("CHEWBACCA!").getBytes()) );
}
}
更新二
已为此问题创建了Accumulo 错误票证。他们的目标是在 v1.7.0 中修复这个问题。在那之前,我在下面提供的解决方案是一种功能解决方法。