4

当使用 Cassandra 推荐的 RandomPartitioner(或 Murmur3Partitioner)时,不可能对键进行有意义的范围查询,因为行使用键的 md5 散列分布在集群周围。这些哈希称为“令牌”。

尽管如此,通过为每个计算工作者分配一系列令牌来在许多计算工作者之间拆分一个大表将非常有用。使用 CQL3,似乎可以直接针对 tokens 发出查询,但是以下 python不起作用...编辑:在切换到针对 cassandra 数据库的最新版本(doh!)进行测试后工作,并且还更新每个语法以下注释:

## use python cql module
import cql

## If running against an old version of Cassandra, this raises: 
## TApplicationException: Invalid method name: 'set_cql_version'
conn = cql.connect('localhost', cql_version='3.0.2')

cursor = conn.cursor()

try:
    ## remove the previous attempt to make this work
    cursor.execute('DROP KEYSPACE test;')
except Exception, exc:
    print exc

## make a keyspace and a simple table
cursor.execute("CREATE KEYSPACE test WITH strategy_class = 'SimpleStrategy' AND strategy_options:replication_factor = 1;")
cursor.execute("USE test;")
cursor.execute('CREATE TABLE data (k int PRIMARY KEY, v varchar);')

## put some data in the table -- must use single quotes around literals, not double quotes                                                                                                                                   
cursor.execute("INSERT INTO data (k, v) VALUES (0, 'a');")
cursor.execute("INSERT INTO data (k, v) VALUES (1, 'b');")
cursor.execute("INSERT INTO data (k, v) VALUES (2, 'c');")
cursor.execute("INSERT INTO data (k, v) VALUES (3, 'd');")

## split up the full range of tokens.
## Suppose there are 2**k workers:
k = 3 # --> eight workers
token_sub_range = 2**(127 - k)
worker_num = 2 # for example
start_token =    worker_num  * token_sub_range
end_token = (1 + worker_num) * token_sub_range

## put single quotes around the token strings
cql3_command = "SELECT k, v FROM data WHERE token(k) >= '%d' AND token(k) < '%d';" % (start_token, end_token)
print cql3_command

## this fails with "ProgrammingError: Bad Request: line 1:28 no viable alternative at input 'token'"
cursor.execute(cql3_command)

for row in cursor:
    print row

cursor.close()
conn.close()

理想情况下,我希望使用 pycassa 来完成这项工作,因为我更喜欢它的 Pythonic 界面。

有一个更好的方法吗?

4

2 回答 2

1

我已更新问题以包含答案。

于 2013-04-22T22:42:56.427 回答
0

它不是 CQL3,但这是一个简单的程序,可以直接使用 Thrift 接口读取 localhost 拥有的所有(腌制)数据。这可以用来构建一个简单的以 Cassandra 作为后端的 map/reduce 引擎。每个节点都会运行这样的东西来映射()属于自己的数据,因此不会产生数据检索的网络开销。然后将结果传送回单独节点上的 reduce() 阶段。

显然,这不适用于 Cassandra1.2+ 中的 vnode。我现在使用索引方法来允许 map() 覆盖较小的本地数据子集并支持 vnode。

#!/usr/bin/env python2.7

import sys
import socket
import cPickle as pickle
from thrift import Thrift
from thrift.transport import TTransport
from thrift.transport import TSocket
from pycassa.cassandra import Cassandra
from pycassa.cassandra.ttypes import *
import time
import pprint

def main():
    jobname = sys.argv[1]
    pp = pprint.PrettyPrinter(indent=2)

    (client, transport) = connect("localhost")

    # Determine local IP address
    ip = socket.gethostbyname(socket.gethostname())

    # Set up query
    keyspace = "data"
    column_parent = ColumnParent(column_family=foo)

    try:
        # Find range of tokens for which this node is first replica
        for tokenrange in client.describe_ring(keyspace):
            if tokenrange.endpoints[0] == ip:
                start_token=tokenrange.start_token
                end_token=tokenrange.end_token
                break

        # Set kesypace
        client.set_keyspace(keyspace)

        # Query for all data owned by this node
        slice_range = SliceRange(start="", finish="")
        predicate = SlicePredicate(slice_range=slice_range)
        keyrange = KeyRange(start_token=start_token, end_token=end_token, count=10000)
        t0 = time.time()
        ptime = 0
        keycount = 0
        start=""
        for keyslice in client.get_range_slices(column_parent, predicate, keyrange, ConsistencyLevel.ONE):
            keycount += 1
            for col in keyslice.columns:
                pt0 = time.time()
                data = pickle.loads(col.column.value)
                ptime += time.time() - pt0
    except Thrift.TException, tx:
        print 'Thrift: %s' % tx.message
    finally:
        disconnect(transport)

    t1 = time.time() - t0
    print "Read data for %d tasks in: %.2gs" %(keycount, t1)
    print "Job unpickling time: %.2gs" %ptime
    print "Unpickling percentage: %.2f%%" %(ptime/t1*100)

def connect(host):
    """ 
    Connect to cassandra instance on given host.
    Returns: Cassandra.Client object
    """
    socket = TSocket.TSocket(host, 9160)
    transport = TTransport.TFramedTransport(socket)
    protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
    transport.open()
    client = Cassandra.Client(protocol) 
    return (client, transport)

def disconnect(transport):
    """ 
    Disconnect from cassandra instance
    """
    transport.close()

if __name__ == '__main__':
    main()
于 2013-04-22T23:24:51.110 回答