Shared-Memory parallelization in Python

Parallelization in Python#

NOTE

All the code used in this article is available at https://github.com/XerxesViper/Mem-shared-multiproc

Parallelization is the process of breaking a task into smaller subtasks and executing them simultaneously, which reduces the overall execution time of the program. It is particularly useful for tasks that are either CPU-bound (requiring significant computation) or I/O-bound (file reading, network requests, or database operations).

Parallelization in Python comes in two flavours: multithreading and multiprocessing. To understand this distinction, we first need to understand the difference between a thread and a process:

  • Thread - A thread is the smallest unit of processing that can be scheduled by an operating system. Multiple threads can exist within the same process, sharing the same memory space.
  • Process - A process is an independent execution unit that has its own memory space and system resources. It does not share memory with other processes.
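
The memory difference between the two can be seen in a few lines. The following is a minimal sketch, not part of the original project: the names `counter`, `increment`, `run_in_thread`, and `run_in_process` are made up for illustration. A thread changes the counter of the process that started it, while a child process only changes its own copy.

```python
import multiprocessing as mp
import threading

# Module-level state: it lives in the memory of whichever process imports this file.
counter = {"value": 0}


def increment():
    """Add one to the counter visible to the caller's process."""
    counter["value"] += 1


def run_in_thread():
    """Run increment() in a thread; the thread shares our memory."""
    t = threading.Thread(target=increment)
    t.start()
    t.join()
    return counter["value"]


def run_in_process():
    """Run increment() in a child process; the child works on its own copy."""
    p = mp.Process(target=increment)
    p.start()
    p.join()
    return counter["value"]


if __name__ == "__main__":
    print("after thread: ", run_in_thread())   # the parent's counter went up by one
    print("after process:", run_in_process())  # the parent's counter is unchanged
```

The second call returns the same value as the first because the increment happened in the child's private memory and was lost when the child exited.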

Now that we understand the difference between a thread and a process, we can examine what multithreading and multiprocessing are:

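  • Multithreading - Multithreading in Python refers to the concurrent execution of multiple threads within a single process. The threads share the process's memory and are scheduled concurrently, so different parts of a program can make progress at overlapping times; whether they truly execute simultaneously depends on the interpreter, as discussed in the next section.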
  • Multiprocessing - Multiprocessing in Python refers to the ability to run multiple processes concurrently, each with its own memory space, to perform tasks in parallel. This isolation helps avoid issues like data corruption that can occur when multiple threads share the same memory.

Multithreading#

Multithreading is not the main focus of this article; however, it’s still good to understand the benefits we can get from simple multithreading in Python. It is a powerful yet simple tool that can decrease the execution time of your code with straightforward modifications. It is useful when a program needs to perform multiple tasks at once, such as handling multiple network connections, performing background tasks, or maintaining a responsive user interface. However, CPython (the most common Python implementation) has a Global Interpreter Lock (GIL) that allows only one thread to execute Python bytecode at a time. This means that multithreading in Python is not suitable for CPU-bound tasks, as the GIL becomes a bottleneck. Nevertheless, it remains effective for I/O-bound tasks, where threads spend most of their time waiting and release the GIL while they do.
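
To make the I/O-bound case concrete, here is a small sketch using the standard threading module. It is an illustration under assumptions, not code from the article's repository: `fake_io_task` is a hypothetical stand-in that simulates waiting on a network or disk with `time.sleep`, and the helper names are invented for this example.

```python
import threading
import time


def fake_io_task(delay):
    """Stand-in for a network or disk call; time.sleep releases the GIL while waiting."""
    time.sleep(delay)
    return delay


def run_sequential(delays):
    """Run the tasks one after another in the calling thread."""
    return [fake_io_task(d) for d in delays]


def run_threaded(delays):
    """Run each task in its own thread so the waits overlap."""
    results = [None] * len(delays)

    def worker(index, delay):
        results[index] = fake_io_task(delay)

    threads = [threading.Thread(target=worker, args=(i, d)) for i, d in enumerate(delays)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


def timed(func, *args):
    """Return (result, elapsed_seconds) for a single call."""
    start = time.perf_counter()
    result = func(*args)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    delays = [0.2] * 5
    _, t_seq = timed(run_sequential, delays)
    _, t_thr = timed(run_threaded, delays)
    print(f"sequential: {t_seq:.2f} s, threaded: {t_thr:.2f} s")
```

The sequential version waits for every delay in turn, while the threaded version waits for them all at once, so its total time is close to the longest single delay.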

Multithreading in Python can be achieved in many ways. The easiest are:

  • Python’s standard threading library
  • The external Joblib library

Example of parallelization using the threading library:

import threading

# Function to compute the sum of squares in a given range
def compute_sum_of_squares(start, end, results, index):
    total = sum(x * x for x in range(start, end))
    results[index] = total

# Number of threads
num_threads = 6

# Results array to store the sum of squares for each thread
results = [0] * num_threads

# Create and start threads
threads = []
for i in range(num_threads):
    start = i * 1000000
    end = (i + 1) * 1000000
    thread = threading.Thread(target=compute_sum_of_squares, args=(start, end, results, i))
    threads.append(thread)
    thread.start()

# Wait for all threads to complete
for thread in threads:
    thread.join()

# Sum up the results from all threads
total_sum_of_squares = sum(results)
print(f"Total sum of squares: {total_sum_of_squares}")

The same example, written with the Joblib library:

from joblib import Parallel, delayed

# Function to compute the sum of squares for a range of numbers
def compute_sum_of_squares(start, end):
    return sum(x * x for x in range(start, end))

# Number of jobs (equivalent to number of threads)
num_jobs = 6

# Define the ranges for each job
ranges = [(i * 1000000, (i + 1) * 1000000) for i in range(num_jobs)]

# Use joblib to parallelize the computation using threads
results = Parallel(n_jobs=num_jobs, backend='threading')(delayed(compute_sum_of_squares)(start, end) for start, end in ranges)

# Sum up the results from all jobs
total_sum_of_squares = sum(results)
print(f"Total sum of squares: {total_sum_of_squares}")

Output of the two versions:

Total sum of squares: 71999982000001000000
Total time for threading: 0.5573787000030279 sec

Total sum of squares: 71999982000001000000
Total time for joblib: 0.5454119000060018 sec

As you can see above, both examples produce the same result and take almost the same time. The joblib syntax is cleaner because it handles the creation and joining of threads for you, though this adds a bit of overhead. You can get quite far with simple threading, but be aware that a poor implementation can make a task slower rather than faster, and that unsynchronized access to shared data can lead to race conditions, which produce incorrect results or even crashes such as segmentation faults. Alas, that is beyond the scope of this article… let’s move on to the fun part.
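
Without going into a full treatment, the usual guard against a race condition is a lock around the shared update. The sketch below is illustrative only; `SafeCounter` and `hammer` are hypothetical names, not part of the threading library or of the article's code.

```python
import threading


class SafeCounter:
    """A counter whose read-modify-write step is protected by a lock."""

    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def increment(self):
        # Only one thread at a time may execute the update below.
        with self._lock:
            self.value += 1


def hammer(counter, n_threads, n_increments):
    """Increment the counter from several threads and return the final value."""

    def work():
        for _ in range(n_increments):
            counter.increment()

    threads = [threading.Thread(target=work) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.value


if __name__ == "__main__":
    print(hammer(SafeCounter(), n_threads=4, n_increments=10_000))
```

With the lock in place the final value always equals the number of threads times the number of increments. Removing the lock makes the update non-atomic, so some increments may be lost depending on the interpreter version and thread scheduling.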


Multiprocessing#

Now we come to the bread and butter of this article: multiprocessing! As mentioned earlier, multiprocessing in Python means running multiple processes concurrently to perform tasks in parallel. Unlike threading, which runs multiple threads within one process and one shared memory space, multiprocessing creates separate processes, each with its own Python interpreter and memory space. Because each process has its own interpreter, the GIL of one process does not block the others, and the isolation helps avoid issues like data corruption that can occur when multiple threads share the same memory.
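
For comparison with the threading examples, the same sum-of-squares computation can be spread over processes with the standard library's `multiprocessing.Pool`. This is a sketch under assumptions: the helper names `make_ranges` and `parallel_sum_of_squares` are invented for illustration, and no timing is claimed for it.

```python
import multiprocessing as mp


def compute_sum_of_squares(bounds):
    """Sum x*x over the half-open range [start, end)."""
    start, end = bounds
    return sum(x * x for x in range(start, end))


def make_ranges(num_chunks, chunk_size):
    """Split [0, num_chunks * chunk_size) into equal consecutive ranges."""
    return [(i * chunk_size, (i + 1) * chunk_size) for i in range(num_chunks)]


def parallel_sum_of_squares(num_processes, chunk_size):
    """Give each worker process one range and add up the partial sums."""
    ranges = make_ranges(num_processes, chunk_size)
    with mp.Pool(processes=num_processes) as pool:
        partial_sums = pool.map(compute_sum_of_squares, ranges)
    return sum(partial_sums)


# The guard is required so that child processes can import this file safely.
if __name__ == "__main__":
    total = parallel_sum_of_squares(num_processes=6, chunk_size=1_000_000)
    print(f"Total sum of squares: {total}")
```

With 6 chunks of 1,000,000 numbers this computes the same total as the threading and joblib versions, 71999982000001000000, but each chunk runs in its own interpreter and is therefore not serialized by the GIL.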

Since each process runs in its own memory space, one process cannot directly read or modify the memory of another. This is by design: it prevents data corruption and improves security. However, there are situations where processes must share data for the program to work correctly. Python’s built-in multiprocessing library provides mechanisms for sharing memory between processes so they can exchange data safely. Another option for numeric ctypes data is a multiprocessing wrapper over NumPy, which exposes the shared buffer as a NumPy array and keeps the speed benefits of array vectorization. We will be using such a wrapper (MemorySharedNumpyArray from mp_shared_array) for the example in this blog. Below is a simple illustration of how shared memory works in Python:

+-------------------+       +-------------------+
|    Process 1      |       |    Process 2      |
|                   |       |                   |
| Private Memory    |       | Private Memory    |
| (Local variables, |       | (Local variables, |
| independent data) |       | independent data) |
+-------------------+       +-------------------+
         |                           |
         |                           |
         v                           v
        +-------------------------------+
        |         Shared Memory         |
        |                               |
        | - Shared variables (e.g.,     |
        |   multiprocessing.Value)      |
        | - Shared arrays (e.g.,        |
        |   multiprocessing.Array)      |
        | - Shared objects (e.g.,       |
        |   lists, dicts via            |
        |   multiprocessing.Manager)    |
        |                               |
        | * Accessible by both          |
        |   processes                   |
        | * Managed to avoid            |
        |   data corruption             |
        +-------------------------------+
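
The first two entries in the diagram can be tried with the standard library alone. The following is a minimal sketch, separate from the project code below; the function names `worker` and `run` are hypothetical. Each process adds its index to a shared `multiprocessing.Value` and writes one slot of a shared `multiprocessing.Array`.

```python
import multiprocessing as mp


def worker(index, total, squares):
    """Update the shared integer under its lock and fill one array slot."""
    with total.get_lock():           # the += below is a read-modify-write
        total.value += index
    squares[index] = index * index   # each worker writes only its own slot


def run(num_workers):
    """Start one process per index and return the shared results."""
    total = mp.Value("i", 0)                 # shared C int, initial value 0
    squares = mp.Array("d", num_workers)     # shared C doubles, zero-initialized
    procs = [mp.Process(target=worker, args=(i, total, squares)) for i in range(num_workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return total.value, squares[:]


if __name__ == "__main__":
    print(run(4))
```

Unlike the ordinary Python objects in each process's private memory, the `Value` and `Array` live in a shared block, so the parent sees what the children wrote after they exit.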

Now that we know memory can be shared between processes, let’s see by example how to implement it. Below is a small project, a file integrity checker written in Python, that uses shared-memory parallel processing to speed up comparing the files in a directory against a baseline. It calculates metadata for each file (a truncated MD5 hash, the file size, and the modification timestamp) and checks for changes, new files, or deletions. It splits the workload across multiple processes, making it much faster than a sequential single-process loop, especially for large directories. The shared memory array stores the baseline metadata (hash, size, and timestamp) so that all worker processes can read it simultaneously without each holding its own copy. If you want to test the project on your own data, feel free to use the code from https://github.com/XerxesViper/Mem-shared-multiproc.

Now let’s get into the code and results:

import os
import json
import time
import hashlib
import numpy as np
import multiprocessing as mp
from queue import Empty
from mp_shared_array import MemorySharedNumpyArray
from tqdm import tqdm


def load_baseline_metadata(json_file, shared_array):
    with open(json_file, "r") as f:
        baseline_metadata = json.load(f)

    np_array = shared_array.get_numpy_handle()
    shared_array.get_lock().acquire()  # lock the shared memory array while writing to it

    for i, entry in enumerate(baseline_metadata):
        np_array[i, 0] = entry["hash"]
        np_array[i, 1] = entry["size"]
        np_array[i, 2] = entry["timestamp"]

    shared_array.get_lock().release()  # release the lock once the data is written


def calculate_file_metadata(file_path, hash_algorithm="md5"):
    try:
        hash_func = hashlib.new(hash_algorithm)
        with open(file_path, "rb") as f:
            while chunk := f.read(8192):  # hash the file in 8 KiB chunks
                hash_func.update(chunk)
        file_hash = hash_func.hexdigest()
        # Keep only the low 64 bits of the digest (the "truncated" hash)
        hash_numeric = int(file_hash, 16) & ((1 << 64) - 1)
        file_size = os.path.getsize(file_path)
        file_timestamp = os.path.getmtime(file_path)
        return {"hash": hash_numeric, "size": file_size, "timestamp": file_timestamp}
    except FileNotFoundError:
        return None


def producer_process(directory, queue, stop_event, buffer_size):
    """
    Producer process to read files from the directory and load them into the queue.

    Args:
        directory (str): Path to the directory to scan.
        queue (mp.Queue): Queue to store file paths.
        stop_event (mp.Event): Event to signal when the producer is done.
        buffer_size (int): Maximum size of the queue.

    Returns: None
    """
    for root, _, files in os.walk(directory):
        for file in files:
            file_path = os.path.join(root, file)
            queue.put(file_path)  # Add file path to the queue
            while queue.qsize() >= buffer_size:  # Wait if the queue is full
                time.sleep(0.01)  # Sleep briefly instead of spinning on the CPU
    stop_event.set()  # Signal that the producer is done


def worker_process(queue, shared_array, progress_counter, stop_event, alert_list):
    """
    Worker process to calculate file metadata and compare with baseline.

    Args:
        queue (mp.Queue): Queue containing file paths to process.
        shared_array (MemorySharedNumpyArray): Shared memory array for baseline metadata.
        progress_counter (mp.Value): Shared counter for progress tracking.
        stop_event (mp.Event): Event to signal when the producer is done.
        alert_list (mp.Manager().list): Shared list to store alerts.

    Returns: None
    """
    while not (queue.empty() and stop_event.is_set()):
        try:
            # Use a timeout so the worker cannot block forever if another
            # worker takes the last item after the check above.
            file_path = queue.get(timeout=0.1)
        except Empty:
            continue  # Nothing available yet; re-check the exit condition

        if file_path == "STOP":
            break

        try:
            current_metadata = calculate_file_metadata(file_path)

            if current_metadata is None:
                with progress_counter.get_lock():
                    progress_counter.value += 1
                continue

            # Compare with baseline
            np_array = shared_array.get_numpy_handle()
            shared_array.get_lock().acquire()
            baseline_hashes = np_array[:, 0]
            baseline_sizes = np_array[:, 1]
            baseline_timestamps = np_array[:, 2]
            shared_array.get_lock().release()

            if current_metadata["hash"] not in baseline_hashes:
                alert_list.append(f"New file detected: {file_path}")
            else:
                idx = np.where(baseline_hashes == current_metadata["hash"])[0][0]
                if current_metadata["size"] != baseline_sizes[idx]:
                    alert_list.append(f"File size changed: {file_path}")
                if current_metadata["timestamp"] != baseline_timestamps[idx]:
                    alert_list.append(f"File timestamp changed: {file_path}")

            with progress_counter.get_lock():
                progress_counter.value += 1
        except Exception as e:
            alert_list.append(f"Error processing file {file_path}: {e}")
            with progress_counter.get_lock():
                progress_counter.value += 1


def main():
    directory_to_scan = r"E:\Images"
    baseline_json = "file_metadata_truncated.json"
    num_processes = max(1, mp.cpu_count() - 2)  # always keep at least one worker
    buffer_size = 100

    num_files = sum(len(files) for _, _, files in os.walk(directory_to_scan))
    print(f"Total files in directory: {num_files}")

    shared_array = MemorySharedNumpyArray(
        dtype=np.float64, shape=(num_files, 3), sampling=1, lock=True
    )

    print("Loading baseline metadata...")
    load_baseline_metadata(baseline_json, shared_array)
    print("Baseline metadata loaded.")

    file_queue = mp.Queue()
    stop_event = mp.Event()
    progress_counter = mp.Value("i", 0)

    # Shared list for alerts
    manager = mp.Manager()
    alert_list = manager.list()

    # Start the producer process
    producer = mp.Process(target=producer_process, args=(directory_to_scan, file_queue, stop_event, buffer_size))
    producer.start()

    # Create a progress bar for file processing
    progress_bar = tqdm(total=num_files, desc="Processing Files", position=0)

    # Start worker processes
    processes = []
    for _ in range(num_processes):
        p = mp.Process(target=worker_process, args=(file_queue, shared_array, progress_counter, stop_event, alert_list))
        processes.append(p)
        p.start()

    # Update the progress bar in the main process; stop if all workers have exited
    while progress_counter.value < num_files and any(p.is_alive() for p in processes):
        progress_bar.n = progress_counter.value
        progress_bar.refresh()
        time.sleep(0.1)  # avoid a busy loop in the main process

    # Wait for all processes to finish
    for p in processes:
        p.join()

    producer.join()
    progress_bar.n = progress_counter.value  # show the final count
    progress_bar.refresh()
    progress_bar.close()

    # Print all alerts at the end
    print("\n--- Alerts ---")
    for alert in alert_list:
        print(alert)

    print("File comparison complete.")


if __name__ == "__main__":
    main()

The code is organized into the following functions:

  • load_baseline_metadata: Loads the metadata of all files from a JSON file into the shared array. These are precalculated hashes for a baseline comparison (see https://github.com/XerxesViper/Mem-shared-multiproc/blob/main/create_file_metadata.py).
  • calculate_file_metadata: Calculates the truncated hash, size, and timestamp of a file to compare against the baseline.
  • producer_process: Walks the directory and puts file paths into a multiprocessing queue for all worker processes to consume. Because the directory walk runs in its own process, workers spend less time waiting for the next file path, which speeds up the program.
  • worker_process: Calls the calculate_file_metadata function and compares the output against the baseline values to detect changes. Multiple worker processes are spawned to share the load and speed up the run.
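
For readers who want to try the checker without opening the repository, a baseline file in the shape that load_baseline_metadata reads (a JSON list of entries with "hash", "size", and "timestamp" keys) could be produced roughly as follows. This is a hedged sketch and not the repository's create_file_metadata.py; the function names `file_metadata`, `build_baseline`, and `save_baseline` are assumptions made for this example.

```python
import hashlib
import json
import os


def file_metadata(file_path):
    """Return the same three fields the checker compares for one file."""
    hash_func = hashlib.new("md5")
    with open(file_path, "rb") as f:
        while chunk := f.read(8192):
            hash_func.update(chunk)
    return {
        # Low 64 bits of the MD5 digest, as in calculate_file_metadata
        "hash": int(hash_func.hexdigest(), 16) & ((1 << 64) - 1),
        "size": os.path.getsize(file_path),
        "timestamp": os.path.getmtime(file_path),
    }


def build_baseline(directory):
    """Walk the directory tree and collect one metadata entry per file."""
    entries = []
    for root, _, files in os.walk(directory):
        for name in sorted(files):
            entries.append(file_metadata(os.path.join(root, name)))
    return entries


def save_baseline(directory, json_file):
    """Write the baseline as a JSON list and return the number of entries."""
    baseline = build_baseline(directory)
    with open(json_file, "w") as f:
        json.dump(baseline, f)
    return len(baseline)
```

A call such as `save_baseline(r"E:\Images", "file_metadata_truncated.json")` would then create the file that main() expects, assuming the baseline is written outside the scanned directory.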

So after running the code on a directory we get:

python .\file_integrity_parallel.py 

Processes spawned = 10

Total files in directory: 1734
Loading baseline metadata...
Baseline metadata loaded.

Processing Files: 100%|██████████████████████████████████████████▉| 1732/1734 [00:08<00:00, 210.61it/s]


--- Alerts ---
File timestamp changed: E:\Images\destiny-rise-of-iron-8160x4320-expansion-ps4-xbox-one-4k-8k-hd-2583.jpg
File timestamp changed: E:\Images\thunderjaw-7000x4181-aloy-horizon-zero-dawn-london-4k-8k-6536.jpg     

New file detected: E:\Images\tom_clancys_the_division_2-7680x4320.jpg

File comparison complete.

Here you can see that comparing ~1700 files takes about 8 seconds. The output also shows that two files have changed timestamps and that a new file has been added to the directory. Similarly, when we run the linear code we get:

python .\file_integrity_linear.py   

Loading baseline metadata...
Baseline metadata loaded.
Processing files...

Processing Files: 100%|████████████████████████████████████████████| 1734/1734 [00:27<00:00, 63.78it/s]

--- Alerts ---
File timestamp changed: E:\Images\destiny-rise-of-iron-8160x4320-expansion-ps4-xbox-one-4k-8k-hd-2583.jpg
File timestamp changed: E:\Images\thunderjaw-7000x4181-aloy-horizon-zero-dawn-london-4k-8k-6536.jpg     

New file detected: E:\Images\tom_clancys_the_division_2-7680x4320.jpg

File comparison complete.

We can see that the linear code produces the same results; however, it takes ~27 seconds, roughly 3.4 times as long, to complete the same task. The parallel version spawns 10 worker processes that share the work, reducing the execution time of the program. The number 10 comes from the mp.cpu_count() - 2 formula, which is a decent starting point but not necessarily the optimal one. To find the optimal number of processes, we need to understand the data we will be scanning and the structure and core configuration of the CPU. However, this is beyond the scope of this article.
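
As a rough way to read these numbers, speedup and parallel efficiency can be computed from the two wall-clock times reported above (about 27 s linear, about 8 s parallel, 10 worker processes). The helper names below are made up for this illustration, and the inputs are the rounded figures from the progress bars, so the results are approximate.

```python
def speedup(t_serial, t_parallel):
    """How many times faster the parallel run is than the serial run."""
    return t_serial / t_parallel


def efficiency(t_serial, t_parallel, num_processes):
    """Fraction of the ideal (linear) speedup actually achieved."""
    return speedup(t_serial, t_parallel) / num_processes


if __name__ == "__main__":
    t_serial, t_parallel, workers = 27.0, 8.0, 10
    print(f"speedup:    {speedup(t_serial, t_parallel):.2f}x")
    print(f"efficiency: {efficiency(t_serial, t_parallel, workers):.0%}")
```

A speedup well below the number of processes is expected here, since the workers share one disk and pay for process startup, queue traffic, and locking. This is also why tuning the process count for the data and hardware at hand can pay off.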

This shows that with multiprocessing and memory sharing, we can make Python programs much faster and more efficient without too much hassle.

Shared-Memory parallelization in Python
http://xerxesviper.fyi/posts/python/parallelization-with-python/
Author: Xerxes Viper
Published at: 2025-03-23