I want to perform kmeans clustering on some of our data in Accumulo. My first thought was to use the kmeans clustering in Apache Mahout, but I am struggling to connect the two without using temporary files. As far as I can tell, to use Mahout I would need to write the Accumulo data out as a series of vector files stored in HDFS, run Mahout's clustering over them, and then write the results back into Accumulo (the Mahout entry points all seem to take paths pointing to directories). I haven't tried it yet, but this seems like a performance nightmare. Is there a better way? Alternatively, is there another kmeans clustering library available that would connect to Accumulo more easily? I am currently looking at OpenCV, but other suggestions are welcome.
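
For reference, here is a minimal sketch of what that temporary-file pipeline would look like, assuming Mahout 0.9's KMeansDriver signature; the HDFS paths, the seed-cluster directory, and the toVector decoding (comma-separated doubles in each Value) are all hypothetical:

import java.util.Map;

import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.mahout.clustering.kmeans.KMeansDriver;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class MahoutTempFilePipeline
{
  // Step 1: dump the Accumulo data to a SequenceFile of Mahout vectors in HDFS.
  public static void dumpVectors(Scanner scanner, Configuration conf) throws Exception
  {
    FileSystem fs = FileSystem.get(conf);
    Path vectorFile = new Path("/tmp/kmeans/vectors/part-00000"); // the temporary file
    SequenceFile.Writer writer =
      SequenceFile.createWriter(fs, conf, vectorFile, Text.class, VectorWritable.class);
    for (Map.Entry<Key, Value> entry : scanner)
    {
      writer.append(entry.getKey().getRow(),
                    new VectorWritable(toVector(entry.getValue())));
    }
    writer.close();
  }

  // Hypothetical decoder: assumes each Value holds comma-separated doubles.
  private static Vector toVector(Value value)
  {
    String[] fields = value.toString().split(",");
    double[] coords = new double[fields.length];
    for (int i = 0; i < fields.length; i++)
      coords[i] = Double.parseDouble(fields[i]);
    return new DenseVector(coords);
  }

  // Step 2: run Mahout over the dumped vectors. The results land in still more
  // HDFS files, which would then have to be read back and written into Accumulo.
  public static void cluster(Configuration conf) throws Exception
  {
    KMeansDriver.run(conf,
                     new Path("/tmp/kmeans/vectors"),
                     new Path("/tmp/kmeans/initial-clusters"), // seed clusters
                     new Path("/tmp/kmeans/output"),
                     0.001,  // convergence delta
                     10,     // max iterations
                     true,   // classify the points after clustering
                     0.0,    // outlier removal threshold (0 = keep all points)
                     false); // run as MapReduce rather than sequentially
  }
}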

1 Answer

As @FuriousGeorge suggested, I looked into Apache Spark. This does indeed provide a way to perform kmeans clustering without using temporary files, like so:

import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import scala.Tuple2;

public class ClusterAccumuloData
{
  public static void main(String[] args)
  {
    JavaSparkContext sc = new JavaSparkContext("yarn-cluster",
                                               "JobName",
                                               "/spark/installation/directory",
                                               "/path/to/jar/file/containing/this/class");
    Configuration conf = new Configuration(); // As near as I can tell, this is all we need.
    Authorizations auths = new Authorizations("whatever_you_need");
    // Configure the input format (Accumulo 1.4-era static API; later releases
    // replaced these with setConnectorInfo and related methods on the Job).
    AccumuloInputFormat.setInputInfo(conf,
                                     "accumulo_user",
                                     "users_password".getBytes(),
                                     "accumulo_table_name",
                                     auths);
    AccumuloInputFormat.setZooKeeperInstance(conf, 
                                             "accumulo_instance_name",
                                             "zookeeper_server_1,zookeeper_server_2");
    // Calls to other AccumuloInputFormat functions (such as setRanges or addIterator)
    // that configure it to retrieve the data you wish to cluster.
    JavaPairRDD<Key, Value> accumuloRDD = sc.newAPIHadoopRDD(conf,
                                                             AccumuloInputFormat.class,
                                                             Key.class,
                                                             Value.class);
    JavaRDD<Vector> kmeansDataRDD =
      accumuloRDD.map(new Function<Tuple2<Key, Value>, Vector>()
                      {
                        public Vector call(Tuple2<Key, Value> accumuloData)
                        {
                          // Code which transforms accumuloData into either a
                          // DenseVector or a SparseVector, then returns that
                          // Vector (one hypothetical version is sketched below).
                        }
                      });
    // Train with k = 42 clusters, at most 14 iterations, and 37 runs
    // (the Spark 1.x signature: KMeans.train(data, k, maxIterations, runs)).
    KMeansModel kmm = KMeans.train(JavaRDD.toRDD(kmeansDataRDD), 42, 14, 37);
  }
}
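
For illustration, the elided call body might look like the following, again assuming (hypothetically) that each Value holds a comma-separated list of doubles:

    public Vector call(Tuple2<Key, Value> accumuloData)
    {
      // Hypothetical encoding: the Value is a comma-separated list of doubles.
      String[] fields = accumuloData._2().toString().split(",");
      double[] coords = new double[fields.length];
      for (int i = 0; i < fields.length; i++)
      {
        coords[i] = Double.parseDouble(fields[i]);
      }
      return Vectors.dense(coords); // org.apache.spark.mllib.linalg.Vectors
    }

The trained KMeansModel can then assign each point to a cluster via kmm.predict(vector), and those assignments can be written back into Accumulo with a BatchWriter, again with no intermediate files.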
answered 2014-09-25T22:05:23.147