0

我是 python 新手。

我正在尝试Hbase thrift client using thrift。我在网上得到了一些代码,我只是修改它以使用最新版本,thrift但是当我运行代码时,它只是退出,没有启动线程。

这是代码。

import json, traceback, sys, datetime, time, logging, threading, random
import logging.handlers

import thrift
sys.path.append('gen-py')


from thrift.transport.TSocket import TSocket
from thrift.transport.TTransport import TBufferedTransport
from thrift.protocol import TBinaryProtocol
from hbase import THBaseService



gWritenItems = 0
gStartT = 0
gEndT = 0

recordsPerBatch = 300 #reports per client per day
columns = 3

#config
concurrent = 10
records = 60000#6000000 #6 million
bytesPerRecord = 1024



mylock = threading.RLock()
class writeThread(threading.Thread):
    def __init__(self, threadname, RecordsThreadwillwrite):
        threading.Thread.__init__(self, name = threadname)
        bytesPerColumn = int(bytesPerRecord/columns) - 11 #suppose 3 columns

        self.columnvalue = "value_" + "x"*bytesPerColumn + "_endv"
        self.tbwBatch = int (RecordsThreadwillwrite / recordsPerBatch)

        self.transport = TBufferedTransport(TSocket('pnq-adongrevm1', 5151), 40960)
        self.transport.open()
        protocol = TBinaryProtocol.TBinaryProtocol(self.transport)

        self.client = THBaseService.Client(protocol)
        self.table = "example"

    def run(self):
        print "+%s start" % (self.getName())
        global gEndT
        global gWritenItems           

        threadWritenItem = 0
        for loopidx in xrange(0, self.tbwBatch):            
            self.write_hbase() #write                                           
            threadWritenItem += recordsPerBatch   

        mylock.acquire()
        gEndT = time.time()  
        gWritenItems += threadWritenItem
        print "%s done, %s seconds past, %d reocrds saved" % (self.getName(), gEndT-gStartT, gWritenItems)
        mylock.release()
        self.transport.close()                   

    def write_hbase(self): #write 50 rowkyes, and  3 column families in each rowkey
        print self.getName(), "Start write"
        batchmutations = []
        for i in xrange(0, recordsPerBatch): # write to db, 300 items together
            mutations = []
            rowkey = "RK_%s_%s" % (random.random(), time.time())       
            for ii in xrange(0, columns):
                mutations.append(THBaseService.TPut(row=rowkey, columnValues=[TColumnValue(family="f1", qualifier="%s"%ii, value=self.columnvalue)]))
        self.client.putMultiple(self.table,mutations)        



itemsPerThread = int(records / concurrent)
for threadid in xrange(0, concurrent):    
    gStartT = time.time()
    t = writeThread("Thread_%s" % threadid, itemsPerThread)
    t.start();
print "%d thread created, each thread will write %d records" % (concurrent, itemsPerThread)

我只是收到一条消息10 thread created, each thread will write 6000 records

4

1 回答 1

2

是的,这是因为你没有等待线程完成它们的工作,所以主线程就退出了。尝试这个:

itemsPerThread = int(records / concurrent)
threads = []
for threadid in xrange(0, concurrent):    
    gStartT = time.time()
    t = writeThread("Thread_%s" % threadid, itemsPerThread)
    t.start();
    threads.append(t)

# wait until all finish the job
for t in threads:
    t.join()

编辑哈,我不认为我在这里,因为你没有将你的线程标记为守护进程。即使没有加入它也应该工作。但是看看这段代码:

class CustomThread(threading.Thread):
    def run(self):
        print "test"

for x in xrange(0, 10):
    t = CustomThread()
    t.start()

print "test"无论如何,它总是会达到线。print "+%s start" % (self.getName())因此,在您的代码中,无论如何它都应该始终达到。你确定它不起作用?:)

如果没有,那么只有两种可能:

  1. 您的方法中存在阻塞操作和/或异常__init__。但后来它无法达到最终印刷;
  2. concurrent变量是0出于某种原因(这与最终打印不一致)。
于 2013-03-29T14:20:58.117 回答