Source code for harmless.parallelize

import multiprocessing as mp
import psutil

__all__ = ["calc_threads", "run_parallel"]


[docs] def calc_threads(pad=0.4): """Compute the total number of threads / processes to launch based on, (i) the total number of physical cores on a node (ii) the padding provided :param pad: padding, used as <cpu cores> * <pad>, defaults to 0.4 :type pad: float, optional :return: Total number of threads to be launched :rtype: int """ nthreads = int(psutil.cpu_count(logical=False) * pad) return nthreads
[docs] def run_parallel(function, files, nthreads): """Calls ``function`` for each element of the iterator ``files``. ``nthreads`` sets the number of worker processes spawned at a time. :param function: The function that is to be executed over each element of the iterator :type function: function :param files: The iterator that contains the data to be looped over :type files: iterator object :param nthreads: Number of worker processes to be spawned at a time. :type nthreads: int """ pool = mp.Pool(nthreads) pool.map_async(function, files).get(720000) pool.close() pool.join()