每当你persist
or compute
一个 Dask 集合,数据发送到调度程序,然后从调度程序发送到工作人员。如果您想绕过在调度程序上存储数据,那么您将必须学习如何移动数据 with scatter.
您有三个选择:
- 不要在客户端计算机上加载数据
- 先分散再分块
- 分块然后分散
不要在客户端计算机上加载数据
最好的方法是将加载数据作为计算的一部分,而不是在本地进行。
Before
x = load_array_from_file(fn) # load into a local numpy array
x = da.from_array(x, chunks=(100, 100, 100)) # split with dask
x = x.persist()
After
x = dask.delayed(load_array_from_file)(fn)
x = da.from_delayed(x, shape=(1000, 1000, 1000), dtype=float)
x = x.rechunk((100, 100, 100))
x = x.persist()
有关创建 dask 数组的更多信息如下:http://dask.pydata.org/en/latest/array-creation.html
先分散再分块
您可以将 numpy 数组直接分散给工作人员
future = client.scatter(x)
x = da.from_delayed(future, shape=x.shape, dtype=x.dtype)
x = x.rechunk((100, 100, 100))
x = x.persist()
这会将您的数据直接移动到工作人员,然后从那里分块。这很好,因为它绕过了调度程序。但是,如果您的工作人员开始出现故障,您现在将面临数据丢失的风险。仅当您处于大规模并行系统中时这才重要。
这也有点低效,因为所有数据都集中在一个工作人员上,而不是分散开来。你可能会打电话client.rebalance
或继续阅读。
成块然后分散
您可以使用本地调度程序在本地对数据进行分块,然后分散到集群中。
x = da.from_array(x, chunks=(100, 100, 100))
x = x.persist(get=dask.threaded.get) # chunk locally
futures = client.scatter(dict(x.dask)) # scatter chunks
x.dask = x # re-attach scattered futures as task graph
留在当地
或者,您可以继续在本地使用 dask,无论是使用线程调度程序,还是仅使用本地进程的分布式调度程序。
client = Client(processes=False)
这将停止本地进程、调度程序和工作线程之间不必要的数据复制。它们现在都在您的同一个本地进程中。
也可以看看:如何在分布式Dask中高效提交带有大参数的任务?对于这个答案的基于任务的版本