So my general question is: is it possible to have an Accumulo BatchScanner pull back only the first result for each range I give it?

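Now some details about my use case, since there may be a better way to approach this anyway. I have data representing messages from different systems. There can be different types of messages. My users want to be able to ask the system questions such as "give me the most recent message of a particular type, as of a particular time, for all of these systems".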

My table layout looks like this:

  rowid: system_name, family: message_type, qualifier: masked_timestamp, value: message_text

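The idea is that the user gives me a list of systems they care about, a message type, and a timestamp. I use a masked timestamp so that the table sorts the most recent entries first. That way, when I scan starting from a timestamp, the first result is the most recent one at or before that time. I am using a BatchScanner because each query has multiple systems to search. Can I make the BatchScanner fetch only the first result for each Range? I can't specify an exact key, because the most recent message may not match the datetime the user gave.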
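For illustration, here is roughly how the masked-timestamp ordering behaves. This is only a sketch: it assumes the mask is `Long.MAX_VALUE - timestamp`, zero-padded so that string order matches numeric order (the exact masking is not spelled out above), and it uses a plain `TreeMap` in place of one system/type column in the table. The class and method names are hypothetical.

```java
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Accumulo: models one system/type column
// whose qualifiers are masked timestamps.
class MaskedTimestampSketch {

  // Assumed masking: subtract from Long.MAX_VALUE so newer timestamps give
  // smaller values, then zero-pad to 19 digits so lexicographic order
  // matches numeric order.
  static String mask(long epochMillis) {
    return String.format(Locale.ROOT, "%019d", Long.MAX_VALUE - epochMillis);
  }

  // Returns the newest message written at or before queryTime, or null if
  // there is none. ceilingEntry plays the role of "seek to the range start
  // and take the first entry".
  static String latestAtOrBefore(TreeMap<String, String> messagesByMaskedTs, long queryTime) {
    Map.Entry<String, String> e = messagesByMaskedTs.ceilingEntry(mask(queryTime));
    return e == null ? null : e.getValue();
  }

  public static void main(String[] args) {
    TreeMap<String, String> column = new TreeMap<>();
    column.put(mask(1000L), "old");
    column.put(mask(2000L), "newer");
    column.put(mask(3000L), "newest");

    // The query time 2500 does not match any stored timestamp, so the first
    // entry at or after mask(2500) is the message written at 2000.
    System.out.println(latestAtOrBefore(column, 2500L));
  }
}
```

This is why only the first entry of each range matters: everything after it in the range is older than the message the user asked for.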

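Currently, I'm using the BatchScanner and ignoring everything but the first result for each key. It works for now, but it seems wasteful to pull all the data for a given system/type back over the network when I only care about the first result per system/type.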

Edit

I tried using FirstEntryInRowIterator:

@Test
public void testFirstEntryIterator() throws Exception
{
    Connector connector = new MockInstance("inst").getConnector("user", new PasswordToken("password"));
    connector.tableOperations().create("testing");

    BatchWriter writer = writer(connector, "testing");
    writer.addMutation(mutation("row", "fam", "qual1", "val1"));
    writer.addMutation(mutation("row", "fam", "qual2", "val2"));
    writer.addMutation(mutation("row", "fam", "qual3", "val3"));
    writer.close();

    Scanner scanner = connector.createScanner("testing", new Authorizations());
    scanner.addScanIterator(new IteratorSetting(50, FirstEntryInRowIterator.class));

    Key begin = new Key("row", "fam", "qual2");
    scanner.setRange(new Range(begin, begin.followingKey(PartialKey.ROW_COLFAM_COLQUAL)));

    int numResults = 0;
    for (Map.Entry<Key, Value> entry : scanner)
    {
        Assert.assertEquals("qual2", entry.getKey().getColumnQualifier().toString());
        numResults++;
    }

    Assert.assertEquals(1, numResults);
}

My goal is for the returned entry to be ("row", "fam", "qual2", "val2"), but I get 0 results. It seems as if the Iterator is being applied before the Range? I haven't dug into this yet.

1 Answer

This sounds like a good use case for one of Accumulo's SortedKeyValueIterators, specifically the FirstEntryInRowIterator (included in the accumulo-core artifact).

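Create an IteratorSetting with the FirstEntryInRowIterator and add it to your BatchScanner. It will return the first key/value in each system_name row and then stop, which saves the client the overhead of receiving and discarding all the other results.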

A quick modification to the FirstEntryInRowIterator might get you what you want:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.core.iterators;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.Text;

public class FirstEntryInRangeIterator extends SkippingIterator implements OptionDescriber {

  // options
  static final String NUM_SCANS_STRING_NAME = "scansBeforeSeek";

  // iterator predecessor seek options to pass through
  private Range latestRange;
  private Collection<ByteSequence> latestColumnFamilies;
  private boolean latestInclusive;

  // private fields
  private Text lastRowFound;
  private int numscans;

  /**
   * convenience method to set the option to optimize the frequency of scans vs. seeks
   */
  public static void setNumScansBeforeSeek(IteratorSetting cfg, int num) {
    cfg.addOption(NUM_SCANS_STRING_NAME, Integer.toString(num));
  }

  // this must be public for OptionsDescriber
  public FirstEntryInRangeIterator() {
    super();
  }

  public FirstEntryInRangeIterator(FirstEntryInRangeIterator other, IteratorEnvironment env) {
    super();
    setSource(other.getSource().deepCopy(env));
  }

  @Override
  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
    return new FirstEntryInRangeIterator(this, env);
  }

  @Override
  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
    super.init(source, options, env);
    String o = options.get(NUM_SCANS_STRING_NAME);
    numscans = o == null ? 10 : Integer.parseInt(o);
  }

  // this is only ever called immediately after getting "next" entry
  @Override
  protected void consume() throws IOException {
    if (finished == true || lastRowFound == null)
      return;
    int count = 0;
    while (getSource().hasTop() && lastRowFound.equals(getSource().getTopKey().getRow())) {

      // try to efficiently jump to the next matching key
      if (count < numscans) {
        ++count;
        getSource().next(); // scan
      } else {
        // too many scans, just seek
        count = 0;

        // determine where to seek to, but don't go beyond the user-specified range
        Key nextKey = getSource().getTopKey().followingKey(PartialKey.ROW);
        if (!latestRange.afterEndKey(nextKey))
          getSource().seek(new Range(nextKey, true, latestRange.getEndKey(), latestRange.isEndKeyInclusive()), latestColumnFamilies, latestInclusive);
        else {
          finished = true;
          break;
        }
      }
    }
    lastRowFound = getSource().hasTop() ? getSource().getTopKey().getRow(lastRowFound) : null;
  }

  private boolean finished = true;

  @Override
  public boolean hasTop() {
    return !finished && getSource().hasTop();
  }

  @Override
  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
    // save parameters for future internal seeks
    latestRange = range;
    latestColumnFamilies = columnFamilies;
    latestInclusive = inclusive;
    lastRowFound = null;

    super.seek(range, columnFamilies, inclusive);
    finished = false;

    if (getSource().hasTop()) {
      lastRowFound = getSource().getTopKey().getRow();
      if (range.beforeStartKey(getSource().getTopKey()))
        consume();
    }
  }

  @Override
  public IteratorOptions describeOptions() {
    String name = "firstEntry";
    String desc = "Only allows iteration over the first entry per range";
    HashMap<String,String> namedOptions = new HashMap<String,String>();
    namedOptions.put(NUM_SCANS_STRING_NAME, "Number of scans to try before seeking [10]");
    return new IteratorOptions(name, desc, namedOptions, null);
  }

  @Override
  public boolean validateOptions(Map<String,String> options) {
    try {
      String o = options.get(NUM_SCANS_STRING_NAME);
      if (o != null)
        Integer.parseInt(o);
    } catch (Exception e) {
      throw new IllegalArgumentException("bad integer " + NUM_SCANS_STRING_NAME + ":" + options.get(NUM_SCANS_STRING_NAME), e);
    }
    return true;
  }

}
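
To illustrate the difference between "first entry in the row" and "first entry in the range", here is a small stand-alone model that uses a `TreeMap` in place of the sorted table. It is only a sketch and does not call Accumulo: the `row|qualifier` key encoding and the `firstInRow` / `firstInRange` helpers are hypothetical, the column family is left out for brevity, and the row-based behaviour reflects my reading of the stock iterator (it looks at the first entry of the row regardless of where the range starts), so treat that part as an assumption.

```java
import java.util.Map;
import java.util.TreeMap;

// Stand-alone model: a TreeMap plays the sorted table and keys are
// "row|qualifier" strings. None of this is Accumulo API.
class FirstEntrySketch {

  // Row-based behaviour (assumed): take the first entry of the row and
  // return it only if it falls inside [start, end); otherwise return null.
  static String firstInRow(TreeMap<String, String> table, String start, String end) {
    String rowPrefix = start.substring(0, start.indexOf('|') + 1);
    Map.Entry<String, String> e = table.ceilingEntry(rowPrefix);
    if (e == null || !e.getKey().startsWith(rowPrefix)) {
      return null;
    }
    boolean inRange = e.getKey().compareTo(start) >= 0 && e.getKey().compareTo(end) < 0;
    return inRange ? e.getValue() : null;
  }

  // Range-based behaviour: seek straight to the range start and take the
  // first entry that is still inside [start, end).
  static String firstInRange(TreeMap<String, String> table, String start, String end) {
    Map.Entry<String, String> e = table.ceilingEntry(start);
    return (e != null && e.getKey().compareTo(end) < 0) ? e.getValue() : null;
  }

  public static void main(String[] args) {
    TreeMap<String, String> table = new TreeMap<>();
    table.put("row|qual1", "val1");
    table.put("row|qual2", "val2");
    table.put("row|qual3", "val3");

    // Range covering exactly the qual2 entry, like the test in the question.
    String start = "row|qual2";
    String end = "row|qual2\0";

    System.out.println(firstInRow(table, start, end));   // first entry of the row is qual1, outside the range
    System.out.println(firstInRange(table, start, end)); // first entry inside the range
  }
}
```

In this model the row-based lookup finds nothing for the range from the question, because the row's first entry lies before the range start, while the range-based lookup finds the qual2 entry.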
answered 2013-10-10T21:15:17.623