我正在尝试执行多进程以从 Cassandra 中提取数据。但是,我面临着这个问题。我想使用我的 Cassandra 我的 cassandra_db 类提供的多进程将其拉取为单个键或多个键
from cassandra.cluster import Cluster
import cassandra
import pandas as pd
import numpy as np
from datetime import datetime
import sys
import os
from threading import Event
import itertools
from multiprocessing import Pool
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.query import tuple_factory
ip_address = '127.0.0.1'
class cassandra_db(object):
concurrency = 2 # chosen to match the default in execute_concurrent_with_args
def __init__(self,process_count=None):
self.pool = Pool(processes=process_count, initializer=self._setup)
@classmethod
def _setup(cls):
cls.session = Cluster([ip_address]).connect(keyspace='test')
cls.session.row_factory = pandas_factory
cls.prepared = cls.session.prepare('SELECT * FROM tr_test WHERE key=?')
def close_pool(self):
self.pool.close()
self.pool.join()
def get_results(self, params):
try:
xrange
except NameError:
xrange = range
params = list(params)
print("-----> ",params)
print("-----+>",self.concurrency)
self.pool.map(_multiprocess_get, (params[n:n + self.concurrency] for n in xrange(0, len(params), self.concurrency)))
@classmethod
def _results_from_concurrent(cls, params):
return execute_concurrent_with_args(cls.session, cls.prepared, params)
def _multiprocess_get(params):
return cassandra_db._results_from_concurrent(params)
我的呼叫班
import os
import pandas as pd
import sys
relative_path='/home/anji'
sys.path.append(os.path.join(relative_path ,'commons','Database Operations'))
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra_db import cassandra_db
from cassandra.policies import ConstantReconnectionPolicy
processes =2
con_db = cassandra_db(processes)
keys=[(1,),(2,)]
df = con_db.get_results(keys)
print("Result",df.head())
错误:
multiprocessing.pool.MaybeEncodingError: Error sending result: '[[ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x7fa93658bbe0>), ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x7fa936a2e0f0>)]]'. Reason: 'PicklingError("Can't pickle <class 'importlib._bootstrap.ExecutionResult'>: attribute lookup ExecutionResult on importlib._bootstrap failed",)'
我试图执行 2 个键,但面临问题。有谁能帮我解决这个问题