I'm using a SingleColumnValueFilter to return the list of rows I want to delete:
SingleColumnValueFilter fileTimestampFilter = new SingleColumnValueFilter(
Bytes.toBytes("a"),
Bytes.toBytes("date"),
CompareFilter.CompareOp.GREATER,
Bytes.toBytes("20140101000000")
);
Then, for each returned row, I create a Delete object and delete the column:
Delete delete = new Delete(Bytes.toBytes(rowKey));
delete.deleteColumn(Bytes.toBytes("a"), Bytes.toBytes("date"));
htable.delete(delete);
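One thing worth checking here (an assumption on my part, not a confirmed cause): by default a SingleColumnValueFilter lets a row through when the tested column is missing from it, unless setFilterIfMissing(true) has been called on the filter. If that applies, a row whose a:date cell I have just deleted would still match as long as it has other cells. The sketch below is a plain-Java model of that rule, not HBase code; the class name, the "a:other" column and the sample rows are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the filter rule; none of these types are HBase classes.
final class MissingColumnFilterSketch {

    // Models a single-column GREATER filter. filterIfMissing mirrors the
    // HBase flag of the same name, which defaults to false.
    static boolean passes(Map<String, String> row, String column,
                          String threshold, boolean filterIfMissing) {
        String value = row.get(column);
        if (value == null) {
            // Column absent: the row is kept unless filterIfMissing is set.
            return !filterIfMissing;
        }
        // String comparison stands in for HBase's byte-wise comparison;
        // the two agree for the ASCII digit strings used here.
        return value.compareTo(threshold) > 0;
    }

    // Returns the keys of all rows that pass the filter, in key order.
    static List<String> scan(Map<String, Map<String, String>> table, String column,
                             String threshold, boolean filterIfMissing) {
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, Map<String, String>> entry : table.entrySet()) {
            if (passes(entry.getValue(), column, threshold, filterIfMissing)) {
                keys.add(entry.getKey());
            }
        }
        return keys;
    }

    // Hypothetical data: each row has a:date plus one unrelated column.
    static Map<String, Map<String, String>> sampleTable() {
        Map<String, Map<String, String>> table = new TreeMap<>();
        Map<String, String> row1 = new HashMap<>();
        row1.put("a:date", "20140201000000");
        row1.put("a:other", "x");
        Map<String, String> row2 = new HashMap<>();
        row2.put("a:date", "20130101000000");
        row2.put("a:other", "y");
        table.put("row1", row1);
        table.put("row2", row2);
        return table;
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> table = sampleTable();
        String threshold = "20140101000000";
        System.out.println("first run:  " + scan(table, "a:date", threshold, false));
        table.get("row1").remove("a:date"); // stands in for the Delete above
        System.out.println("second run: " + scan(table, "a:date", threshold, false));
        System.out.println("with filterIfMissing: " + scan(table, "a:date", threshold, true));
    }
}
```

If this is what is happening, calling fileTimestampFilter.setFilterIfMissing(true) before the scan would be the corresponding change in the real code, but I have not verified that against my table.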
The retrieval code is:
private List<String> getRecordsToDelete(long maxResultSize)
{
ResultScanner rs = null;
HTableInterface table = null;
List<String> keyList = new ArrayList<String>();
try
{
log.debug("Retrieving records");
HbaseConnection hbaseConnectionConfig = myConfig.getHbaseConnection();
Configuration configuration = getHbaseConfiguration(hbaseConnectionConfig);
table = new HTable(configuration, "mytable");
FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL);
Filter filter = HbaseDao.getFilter();
list.addFilter(filter);
list.addFilter(new PageFilter(maxResultSize));
Scan scan = new Scan();
scan.setFilter(list);
//scan.setMaxResultSize(maxResultSize);
//scan.setCaching(1);
//scan.setCacheBlocks(false);
//log.debug("Scan raw? = " + scan.isRaw());
//scan.setRaw(false);
rs = table.getScanner(scan);
Iterator<Result> iterator = rs.iterator();
while (iterator.hasNext())
{
Result result = iterator.next();
String key = Bytes.toString(result.getRow());
log.debug("**************** f key = " + key); //the same keys are always added here
keyList.add(key);
}
log.debug("Done processing retrieval of records to delete Size = " + keyList.size());
}
catch (Exception ex)
{
log.error("Unable to process retrieval of records.", ex);
}
finally
{
try
{
// close the scanner before the table it was opened on
if (rs != null)
{
rs.close();
}
if (table != null)
{
table.close();
}
}
catch (IOException ioEx)
{
//nothing to recover here, just log the failure
log.error(ioEx);
}
}
return keyList;
}
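This task is scheduled, and when it runs again it retrieves the same rows. I know that HBase only marks cells as deleted and that they are physically removed only after a major compaction. However, if I query the row through the hbase shell between task runs, the column has definitely been deleted. Why does my Scan return the same rows on subsequent runs of this task?
Thanks in advance!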