3 min read

Hint at a better parallelization of groupby in Pandas

Intro

Applying a function to a large number of groups can take a lot of time without parallelization. However, directly parallelizing over the groups when the number of groups is very large and the function applied to each of them is rather fast might lead to a worse result than no parallelization at all.

Parallelizing every group creates one chunk of data per group. Each chunk needs to be transferred to a core in order to be processed, and transferring chunks costs time. We want to create the minimal number of chunks, and each chunk must contain all the data needed by its groups. Then we apply the grouping operation on these chunks. However, we don't want to create exactly one chunk per core, because some cores are faster than others; faster cores must be able to process more chunks and slower cores fewer. I personally use a rule of thumb of 3 chunks per core.
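To make this concrete, here is a minimal sketch of the chunk assignment (the column names mirror the full example below; the modulo trick assumes integer group keys, otherwise hashing the key works the same way):

from multiprocessing import cpu_count
import numpy as np
import pandas as pd

# Roughly 3 chunks per core; all rows of a given user_id land in the same chunk,
# so each chunk can be processed independently of the others.
n_chunks = cpu_count() * 3
df = pd.DataFrame({"user_id": np.random.randint(5000, size=500000)})
df["data_chunk"] = df["user_id"] % n_chunks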

Code chunks

Solution example and benchmark

DISCLAIMER: This is example code and needs to be adapted to your own use case.

Note: Sorry for using only one code chunk, it's currently not possible to do otherwise. See here.

from multiprocessing import Pool, cpu_count
import pandas as pd
import numpy as np
import timeit
import time
#import dask
#import dask.dataframe as dd
def applyParallel(dfGrouped, func):
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    return pd.concat(ret_list)
# Create a DataFrame for a minimal example
df = pd.DataFrame()
# 5000 users with approx 100 values each
df["user_id"] = np.random.randint(5000, size=500000)
# Generate 500000 random integer values
df["value"] = np.random.randint(30, size=500000)
# Create data_chunk based on modulo of user_id
df["data_chunk"] = df["user_id"].mod(cpu_count() * 3)
# An unoptimised, compute-intensive function I want to apply to each group
def group_function(group):
    # Inverse cumulative sum
    group["inv_sum"] = group.iloc[::-1]['value'].cumsum()[::-1].shift(-1).fillna(0)
    return group
def func_group_apply(df):
    return df.groupby("user_id").apply(group_function)
    
start = time.time()
normal = df.groupby("user_id").apply(group_function)
end = time.time()
print("Execution time :" + str(end - start))
## Execution time :9.957386016845703
start = time.time()
parallel = applyParallel(df.groupby("user_id"), group_function)
end = time.time()
print("Execution time :" + str(end - start))
## Execution time :8.90306806564331
start = time.time()
parallel_chunk = applyParallel(df.groupby("data_chunk"), func_group_apply)
end = time.time()
print("Execution time :" + str(end - start))
## Execution time :5.269551038742065
# Let's check that we get the same results and that the parallel computation didn't introduce any problem
# Sorting on the index is part of the test, not of the computation, therefore I don't include it in the execution time
normal = normal.sort_index()
parallel = parallel.sort_index()
parallel_chunk = parallel_chunk.sort_index()
#parallel_dask = parallel_dask.sort_index()
# Check we have same results
print(normal.equals(parallel))
## True
print(normal.equals(parallel_chunk))
## True
#print(normal.equals(parallel_dask))

Setup : Intel(R) Core(TM) i5-5287U CPU @ 2.90GHz (2c/4t)

Or the %timeit results:

print("Not working yet")
#%timeit df.groupby("user_id").apply(group_function)
#%timeit applyParallel(df.groupby("user_id"), group_function)
#%timeit applyParallel(df.groupby("data_chunk"), func_group_apply)
# 1 loop, best of 3: 6.79 s per loop
# 1 loop, best of 3: 5.61 s per loop
# 1 loop, best of 3: 3.23 s per loop
## Not working yet

The chunk parallelization clearly yields better results than the other two solutions.

Dask implementation

At the time of the test, I wasn't able to make the Dask implementation work.
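For the record, what I was trying looked roughly like the sketch below (untested end to end on my setup; the npartitions value and the meta argument are my assumptions about how dask.dataframe should be fed):

import dask.dataframe as dd

# Partition the pandas DataFrame, then run the same per-user function
# through Dask's groupby-apply and bring the result back to pandas.
ddf = dd.from_pandas(df, npartitions=cpu_count() * 3)
parallel_dask = (
    ddf.groupby("user_id")
       .apply(group_function, meta=df.head(0).assign(inv_sum=0.0))
       .compute()
)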

Going further

def func_group_apply(df):
    return df.groupby("user_id").apply(group_function)

The above function doesn’t take group_function as an argument, neighter the grouping columns. However at some point we would like that our function take several inputs as stated in this thread and might help us.

def applyParallel(dfGrouped, func):
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    return pd.concat(ret_list)

Ideally, adding a col_to_group argument would make applyParallel(...) more generic, for example as in the sketch below.
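A possible generic version, reusing the func_group_apply sketch above, could look like this (applyParallel_generic, col_to_chunk and col_to_group are names I made up for the example):

from functools import partial
from multiprocessing import Pool, cpu_count
import pandas as pd

def applyParallel_generic(df, col_to_chunk, col_to_group, func):
    # Split the DataFrame into chunks, then let each worker run the real
    # groupby on its chunk; faster cores simply pick up more chunks.
    worker = partial(func_group_apply, col_to_group=col_to_group, func=func)
    with Pool(cpu_count()) as p:
        ret_list = p.map(worker, [chunk for name, chunk in df.groupby(col_to_chunk)])
    return pd.concat(ret_list)

# With the example above:
# parallel_chunk = applyParallel_generic(df, "data_chunk", "user_id", group_function)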