有 500 个,不断增长DataFrames
,我想将(对于每个 DataFrame 独立)数据的操作提交到dask
. 我的主要问题是:可以dask
保存连续提交的数据,所以我可以submit
对所有提交的数据进行函数处理——而不仅仅是新提交的数据?
但是让我们通过一个例子来解释它:
创建一个dask_server.py
:
from dask.distributed import Client, LocalCluster
HOST = '127.0.0.1'
SCHEDULER_PORT = 8711
DASHBOARD_PORT = ':8710'
def run_cluster():
cluster = LocalCluster(dashboard_address=DASHBOARD_PORT, scheduler_port=SCHEDULER_PORT, n_workers=8)
print("DASK Cluster Dashboard = http://%s%s/status" % (HOST, DASHBOARD_PORT))
client = Client(cluster)
print(client)
print("Press Enter to quit ...")
input()
if __name__ == '__main__':
run_cluster()
现在我可以从 mymy_stream.py
和 start 连接到submit
数据gather
:
DASK_CLIENT_IP = '127.0.0.1'
dask_con_string = 'tcp://%s:%s' % (DASK_CLIENT_IP, DASK_CLIENT_PORT)
dask_client = Client(self.dask_con_string)
def my_dask_function(lines):
return lines['a'].mean() + lines['b'].mean
def async_stream_redis_to_d(max_chunk_size = 1000):
while 1:
# This is a redis queue, but can be any queueing/file-stream/syslog or whatever
lines = self.queue_IN.get(block=True, max_chunk_size=max_chunk_size)
futures = []
df = pd.DataFrame(data=lines, columns=['a','b','c'])
futures.append(dask_client.submit(my_dask_function, df))
result = self.dask_client.gather(futures)
print(result)
time sleep(0.1)
if __name__ == '__main__':
max_chunk_size = 1000
thread_stream_data_from_redis = threading.Thread(target=streamer.async_stream_redis_to_d, args=[max_chunk_size])
#thread_stream_data_from_redis.setDaemon(True)
thread_stream_data_from_redis.start()
# Lets go
这可以按预期工作,而且非常快!!!
但接下来,我想实际上是在计算发生之前append
的lines
第一个- 想知道这是否可能?因此,在我们的示例中,我想计算已提交的所有行,而不仅仅是最后提交的行。mean
问题/方法:
- 这种累积计算可行吗?
- 糟糕的替代方案 1:每次有新行到达时,我都会在本地缓存所有行并将
submit
所有数据缓存到集群 。这就像一个指数开销。试过了,可以,但是很慢! - 黄金选项:Python 程序 1 推送数据。可以将另一个客户端(来自另一个 python 程序)连接到该累积数据,并将分析逻辑从插入逻辑中移开。我认为发布的数据集是要走的路,但是是否适用于这种高速附加?