Pool#
`Pool` is an object that automatically creates a specified number of worker processes and applies a given function in those processes.
import numpy as np
from math import sqrt
from random import random
from multiprocessing import Pool
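To illustrate the idea, here is a minimal sketch (the `square` function is hypothetical and not used later; this top-level style assumes a fork-based start method, as on Linux):

```python
from multiprocessing import Pool

def square(n):
    # worker function executed in a separate process
    return n * n

# note: on platforms that use the "spawn" start method (Windows, macOS),
# Pool creation must be wrapped in an `if __name__ == "__main__":` guard
with Pool(processes=2) as pool:
    squares = pool.map(square, [1, 2, 3, 4])
print(squares)  # [1, 4, 9, 16]
```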
Examples and baselines#
We will consider a common task for data scientists: computing distances between objects. We have an array of objects `X`, and we need to compute the distance between each object in the array and a target object `x`.
The next cell creates the required objects:
sample_size = 1_000_000
features_size = 10
X = np.random.normal(size=(sample_size, features_size))
x = np.random.normal(size=features_size)
The following cell implements an operation that we’ll use to calculate distances between objects.
def compute_distances(X: np.ndarray, a: np.ndarray) -> list:
    '''
    Compute the Euclidean distances between all objects in an
    array and a specified object.

    Its purpose is to demonstrate the benefits of using multiprocessing.
    The function intentionally avoids numpy operations, as they have
    their own internal optimizations; with them, there would be no
    significant performance gain from multiprocessing.

    Parameters
    ----------
    X : np.ndarray
        (number of objects, object dimensionality)
        array of objects for which we need distances
    a : np.ndarray
        (object dimensionality,)
        object to which we compare all objects from X
    '''
    return [
        sqrt(sum((a_v - b_v) ** 2 for a_v, b_v in zip(a, b)))
        for b in X
    ]
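As a quick sanity check, the pure-Python implementation can be compared with numpy's vectorized norm on a tiny array (a self-contained sketch that repeats the function definition):

```python
import numpy as np
from math import sqrt

def compute_distances(X, a):
    # same pure-Python implementation as above
    return [
        sqrt(sum((a_v - b_v) ** 2 for a_v, b_v in zip(a, b)))
        for b in X
    ]

X_small = np.array([[3.0, 4.0], [0.0, 0.0]])
a = np.array([0.0, 0.0])

manual = compute_distances(X_small, a)
vectorized = np.linalg.norm(X_small - a, axis=1)
print(manual)  # [5.0, 0.0]
```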
First, let's estimate how long it takes to compute all the distances in a single process.
%%timeit -n 1
compute_distances(X, x)
7.67 s ± 131 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
map#
`map` is the simplest `Pool` method: you specify which function to apply and provide an iterable of arguments, one element per task.
def wrapper(X):
    '''
    Wraps compute_distances so that each process can receive a
    different chunk of X while comparing against the same x.
    '''
    return compute_distances(X, x)
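An alternative to a module-level wrapper that closes over a global is `functools.partial`, which binds the fixed argument explicitly; partial objects are picklable as long as the underlying function is. A sketch with a hypothetical `compute_row_sums` function:

```python
from functools import partial
from multiprocessing import Pool

def compute_row_sums(X, offset):
    # hypothetical stand-in for compute_distances: any function
    # whose second argument stays fixed across all chunks
    return [sum(row) + offset for row in X]

chunks = [[[1, 2], [3, 4]], [[5, 6]]]
# partial binds offset=10, so pool.map only sends each chunk
bound = partial(compute_row_sums, offset=10)
with Pool(processes=2) as pool:
    results = pool.map(bound, chunks)
print(results)  # [[13, 17], [21]]
```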
Here is an example of use: the original array of objects is split into chunks, and `wrapper` is applied to each chunk.
%%timeit -n 1
split = np.array_split(X, 5)
pool = Pool(processes=5)
pool.map(wrapper, split)
pool.close()
pool.join()
2.9 s ± 53.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Results order#
With `pool.map`, the results are always returned in the order of the input iterable, regardless of which process finishes first.
To demonstrate this, we created an example where the first-submitted tasks take significantly longer to complete. To clearly determine which tasks finish earlier, we included print statements, and each task returns a value that unambiguously identifies it in the result list.
def wrapper(it_number):
    print(f"{it_number} iterations started \n", end="")
    # busy work: generate it_number random values
    [random() for i in range(it_number)]
    print(f"{it_number} iterations finished \n", end="")
    return it_number
pool = Pool(processes=5)
results = pool.map(wrapper, [100000, 100, 10])
pool.close()
pool.join()
print(f"results : {results}")
100000 iterations started
100 iterations started
10 iterations started
100 iterations finished
10 iterations finished
100000 iterations finished
results : [100000, 100, 10]
So even though the task with 100000 iterations finished last, its result comes first in the returned list.
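By contrast, `Pool.imap_unordered` yields results in completion order rather than input order. A small sketch (the `slow_identity` function is hypothetical; sleep times make larger inputs finish later):

```python
import time
from multiprocessing import Pool

def slow_identity(delay):
    # sleeps proportionally to the input, so larger values finish later
    time.sleep(delay / 10)
    return delay

with Pool(processes=3) as pool:
    ordered = pool.map(slow_identity, [3, 2, 1])
    unordered = list(pool.imap_unordered(slow_identity, [3, 2, 1]))
print(ordered)    # always [3, 2, 1]: input order
print(unordered)  # typically [1, 2, 3]: completion order
```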