1.5. Parallel processing in Python
Having a multicore CPU, we certainly want to make use of it for parallel processing. This is easily done using the Joblib library. Its usage is explained below:
```python
from joblib import Parallel, delayed

# Note the use of parentheses
results = Parallel(n_jobs=8, prefer="threads")(
    delayed(func_name)(func_para1, func_para2)
    for i in range(i_start, i_stop, i_step))
```
The first part of the code, `Parallel(n_jobs=8, prefer="threads")`, selects the number of cores and a backend method for the parallelization. The second part, `(delayed()() for ...)`, has three sub-sections: the name of a function, its parameters, and the loop. We can also use nested loops:
```python
results = Parallel(n_jobs=8, prefer="threads")(
    delayed(func_name)(func_para1, func_para2)
    for i in range(i_start, i_stop, i_step)
    for j in range(j_start, j_stop, j_step))
```
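As a quick check that these pieces fit together, here is a minimal runnable sketch of the single-loop form, using a toy function `square` as a hypothetical stand-in for `func_name`:

```python
from joblib import Parallel, delayed

def square(x, offset):
    # Toy stand-in for func_name with two parameters
    return x * x + offset

results = Parallel(n_jobs=2, prefer="threads")(
    delayed(square)(i, 10) for i in range(0, 5, 1))
print(results)  # [10, 11, 14, 19, 26]
```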
Note that `results` is a list of the outputs of the function used. The order of the items in the list corresponds to how the loops are defined. The following examples will make things clearer.
Example to show the output order of nested loops:
```python
from joblib import Parallel, delayed

def print_order(i, j):
    print("i = {0}; j = {1} \n".format(i, j))
    return i, j

results = Parallel(n_jobs=4, prefer="threads")(
    delayed(print_order)(i, j)
    for i in range(0, 2, 1)
    for j in range(2, 4, 1))
print("Output = ", results)
```

```
i = 0; j = 2
i = 0; j = 3
i = 1; j = 2
i = 1; j = 3
Output =  [(0, 2), (0, 3), (1, 2), (1, 3)]
```

Example to show how to apply a smoothing filter to multiple images in parallel:
```python
import timeit
import multiprocessing as mp
import numpy as np
import scipy.ndimage as ndi
from joblib import Parallel, delayed

# Select number of cpu cores
ncore = 16
if ncore > mp.cpu_count():
    ncore = mp.cpu_count()

# Create data for testing
height, width = 3000, 5000
image = np.zeros((height, width), dtype=np.float32)
image[1000:2000, 1500:3500] = 1.0
n_slice = 16
data = np.moveaxis(np.asarray([i * image for i in range(n_slice)]), 0, 1)
print(data.shape)  # >>> (3000, 16, 5000)

# Using sequential computing for comparison
t0 = timeit.default_timer()
results = []
for i in range(n_slice):
    mat = ndi.gaussian_filter(data[:, i, :], (3, 5), 0)
    results.append(mat)
t1 = timeit.default_timer()
print("Time cost for sequential computing: ", t1 - t0)  # >>> 8.831482099999999

# Using parallel computing
t0 = timeit.default_timer()
results = Parallel(n_jobs=ncore, prefer="threads")(
    delayed(ndi.gaussian_filter)(data[:, i, :], (3, 5), 0)
    for i in range(n_slice))
t1 = timeit.default_timer()
print("Time cost for parallel computing: ", t1 - t0)  # >>> 0.8372323000000002

# As the output is a list, we have to convert it to a numpy array
# and rearrange the axes to get back the original shape
results = np.asarray(results)
print(results.shape)  # >>> (16, 3000, 5000)
results = np.moveaxis(results, 0, 1)
print(results.shape)  # >>> (3000, 16, 5000)
```

There are several options for the backend method. Depending on the problem and how the input data are used, their performance can differ significantly. In the above example, the "threads" option gives the best performance. Note that we can't use the above approaches for parallel reading or writing of data from/to an HDF file; that requires a different approach.
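As a rough sketch of how to switch between backends (the code below uses small dummy data; the comments describe general expectations, not measurements from this example):

```python
from joblib import Parallel, delayed
import numpy as np
import scipy.ndimage as ndi

data = np.ones((16, 300, 400), dtype=np.float32)

# Thread-based workers share memory, so there is no copying overhead.
# This suits NumPy/SciPy routines such as gaussian_filter, which release
# the GIL inside their compiled code.
results = Parallel(n_jobs=4, prefer="threads")(
    delayed(ndi.gaussian_filter)(data[i], (3, 5), 0) for i in range(16))

# Process-based workers (Joblib's default "loky" backend) avoid the GIL
# entirely but must pickle and copy the input data to each worker, which
# can dominate the runtime for large arrays.
results = Parallel(n_jobs=4, prefer="processes")(
    delayed(ndi.gaussian_filter)(data[i], (3, 5), 0) for i in range(16))
```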
Users can also refer to how Algotom uses Joblib for different use cases in its source code.