我ray.put(large_2d_array)
用来存储一个大型 numpy 2D 布尔数组,然后在工作进程中,我从这个共享的 numpy 数组中取出一列并将其传递给 cython 函数,然后通过cdef cnp.npy_bool view = &sliced_array[0]
. 使用此视图,我可以修改底层缓冲区(将一些索引设置为 True)。
由于等离子存储对象是不可变的,因此我可能会面临哪些未知问题?
如果有人可以对此有所了解,我会很高兴。
代码模板:
import numpy as np
# ray init code
......
large_2d_array = np.zeros((6000000000, 205), dtype=bool, order='F')
shared_array = ray.put(large_2d_array)
# Call worker via ray remote here and pass the shared_array
......
......
@ray.remote(num_cpus=1)
def worker(large_2d_array, col_idx):
array_slice = large_2d_array[:, col_idx]
cython_function(array_slice)
#In file: cython_func.pyx
def cython_function(cnp.ndarray[cnp.npy_bool, ndim=1, mode='c'] sliced_array):
cdef cnp.npy_bool view = &sliced_array[0]
cdef int i
for i in range(100):
view[i] = True
return
PS:没有两个工人可以同时访问同一个切片。每个切片(在这种情况下为列)仅被访问一次并且仅被修改/写入一次。
尝试演员方法 这仍然不起作用,我想我没有做错什么。
import ray
import numpy as np
ray.init(num_cpus=4)
@ray.remote
class test:
def __init__(self, shape):
self.shape = shape
self.np_array = np.zeros(shape, dtype=bool)
def get_col_slice(self, col_idx):
return self.np_array[:, col_idx]
def get_array(self):
return self.np_array
def write_to_slice(self, col_idx, nrows):
self.np_array[:, col_idx] = [1] * nrows
@ray.remote
def write_to_alternate_slice(actor_handle, col_idx, nrows):
actor_handle.write_to_slice.remote(col_idx, nrows)
shape = (10, 20)
test_actor = test.remote(shape)
for i in range(10, 2):
write_to_alternate_slice.remote(test_actor, i, 10)
print(ray.get(test_actor.get_array.remote()))