5

有 500 个,不断增长DataFrames,我想将(对于每个 DataFrame 独立)数据的操作提交到dask. 我的主要问题是:可以dask保存连续提交的数据,所以我可以submit对所有提交的数据进行函数处理——而不仅仅是新提交的数据?

但是让我们通过一个例子来解释它:

创建一个dask_server.py

from dask.distributed import Client, LocalCluster
HOST = '127.0.0.1'
SCHEDULER_PORT = 8711
DASHBOARD_PORT = ':8710'

def run_cluster():
    cluster = LocalCluster(dashboard_address=DASHBOARD_PORT, scheduler_port=SCHEDULER_PORT, n_workers=8)
    print("DASK Cluster Dashboard = http://%s%s/status" % (HOST, DASHBOARD_PORT))
    client = Client(cluster)
    print(client)
    print("Press Enter to quit ...")
    input()

if __name__ == '__main__':
    run_cluster()

现在我可以从 mymy_stream.py和 start 连接到submit数据gather

DASK_CLIENT_IP = '127.0.0.1'
dask_con_string = 'tcp://%s:%s' % (DASK_CLIENT_IP, DASK_CLIENT_PORT)
dask_client = Client(self.dask_con_string)

def my_dask_function(lines):
    return lines['a'].mean() + lines['b'].mean

def async_stream_redis_to_d(max_chunk_size = 1000):
    while 1:

        # This is a redis queue, but can be any queueing/file-stream/syslog or whatever
        lines = self.queue_IN.get(block=True, max_chunk_size=max_chunk_size)

        futures = []
        df = pd.DataFrame(data=lines, columns=['a','b','c'])
        futures.append(dask_client.submit(my_dask_function, df))

        result = self.dask_client.gather(futures)
        print(result)

        time sleep(0.1)

if __name__ == '__main__':
    max_chunk_size = 1000
    thread_stream_data_from_redis = threading.Thread(target=streamer.async_stream_redis_to_d, args=[max_chunk_size])
    #thread_stream_data_from_redis.setDaemon(True)
    thread_stream_data_from_redis.start()
    # Lets go

这可以按预期工作,而且非常快!!!

但接下来,我想实际上是在计算发生之前appendlines第一个- 想知道这是否可能?因此,在我们的示例中,我想计算已提交的所有行,而不仅仅是最后提交行。mean

问题/方法:

  1. 这种累积计算可行吗?
  2. 糟糕的替代方案 1:每次有新行到达时,我都会在本地缓存所有行并将submit 所有数据缓存到集群 。这就像一个指数开销。试过了,可以,但是很慢!
  3. 黄金选项:Python 程序 1 推送数据。可以将另一个客户端(来自另一个 python 程序)连接到该累积数据,并将分析逻辑从插入逻辑中移开。我认为发布的数据集是要走的路,但是是否适用于这种高速附加?

可能相关:分布式变量Actors Worker

4

1 回答 1

1

将期货列表分配给已发布的数据集对我来说似乎很理想。这相对便宜(一切都是元数据),您将在几毫秒内保持最新

client.datasets["x"] = list_of_futures

def worker_function(...):
    futures = get_client().datasets["x"]
    data = get_client.gather(futures)
    ... work with data

正如您提到的,还有其他系统,例如 PubSub 或 Actors。从您所说的来看,我怀疑 Futures + Published 数据集更简单,更实用。

于 2020-05-23T18:12:25.933 回答