I want to perform k-means clustering on some of our data in Accumulo. My first thought was to use the k-means clustering in Apache Mahout, but I'm having trouble connecting the two without using intermediate files. As far as I can tell, to use Mahout I would need to write the Accumulo data out as a series of vector files stored in HDFS, cluster them with Mahout, and then write the results back to Accumulo (the Mahout entry points all seem to take paths to directories). Although I haven't tried it yet, this looks like a performance nightmare. Is there a better way? Alternatively, is there another k-means clustering library available that connects to Accumulo more easily? I'm currently looking at opencv, but other suggestions are welcome.
Viewed 453 times
1 Answer
0
As @FuriousGeorge suggested, I looked into Apache Spark. It does indeed provide a way to perform k-means clustering without intermediate files, like so:
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;

import scala.Tuple2;

public class ClusterAccumuloData
{
    public static void main(String[] args)
    {
        JavaSparkContext sc = new JavaSparkContext("yarn-cluster",
                                                   "JobName",
                                                   "/spark/installation/directory",
                                                   "/path/to/jar/file/containing/this/class");
        Configuration conf = new Configuration(); // As near as I can tell, this is all we need.
        Authorizations auths = new Authorizations("whatever_you_need");
        // Configure the Accumulo connection: user, password, table, and authorizations.
        AccumuloInputFormat.setInputInfo(conf,
                                         "accumulo_user",
                                         "users_password".getBytes(),
                                         "accumulo_table_name",
                                         auths);
        AccumuloInputFormat.setZooKeeperInstance(conf,
                                                 "accumulo_instance_name",
                                                 "zookeeper_server_1,zookeeper_server_2");
        // Calls to other AccumuloInputFormat functions (such as setRanges or addIterator)
        // that configure it to retrieve the data you wish to cluster.
        JavaPairRDD<Key, Value> accumuloRDD = sc.newAPIHadoopRDD(conf,
                                                                 AccumuloInputFormat.class,
                                                                 Key.class,
                                                                 Value.class);
        JavaRDD<Vector> kmeansDataRDD =
            accumuloRDD.map(new Function<Tuple2<Key, Value>, Vector>()
            {
                public Vector call(Tuple2<Key, Value> accumuloData)
                {
                    // Code which transforms accumuloData into either a
                    // DenseVector or a SparseVector, then returns that Vector.
                }
            });
        // Arguments: input data, k (number of clusters), maxIterations, runs.
        KMeansModel kmm = KMeans.train(JavaRDD.toRDD(kmeansDataRDD), 42, 14, 37);
    }
}
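The body of `call` above is left as a comment because it depends on how your table encodes features. As a minimal sketch, assuming (hypothetically; the question doesn't specify an encoding) that each Accumulo `Value` holds a UTF-8, comma-separated list of doubles such as `"1.0,2.5,3.0"`, the parsing step could look like:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: parses an Accumulo Value's bytes into a double[],
// assuming the bytes are a UTF-8, comma-separated list of doubles.
public class ValueToCoords
{
    public static double[] parse(byte[] valueBytes)
    {
        String[] fields = new String(valueBytes, StandardCharsets.UTF_8).split(",");
        double[] coords = new double[fields.length];
        for (int i = 0; i < fields.length; i++)
        {
            coords[i] = Double.parseDouble(fields[i].trim());
        }
        return coords;
    }
}
```

Inside the real `call` method you would then wrap the result with `Vectors.dense(coords)` (from `org.apache.spark.mllib.linalg.Vectors`) to get the `Vector` that MLlib's `KMeans.train` expects.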
answered 2014-09-25T22:05:23.147