Added multiprocessing functions

This commit is contained in:
Enrique Martin 2023-08-02 10:31:52 +02:00
parent 27bab031c3
commit 3c5ee4e03b
1 changed files with 110 additions and 5 deletions

View File

@ -1,6 +1,15 @@
import sys
from queue import Queue
from threading import Thread
from multiprocessing import Pool, Manager
####
# Define multi-processing internal global variables.
#########################################################################################
_manager = Manager()
_shared_dict = _manager.dict()
_shared_dict_lock = _manager.Lock()
####
# Internal use only: Run a given function in a thread
@ -26,7 +35,8 @@ def _single_thread(
def run_threads(
max_threads: int = 1,
function: callable = None,
items: list = []
items: list = [],
print_errors: bool = False
) -> bool:
"""
Run a given function for given items list in a given number of threads
@ -74,8 +84,9 @@ def run_threads(
q.join()
for error in errors:
print(error,file=sys.stderr)
if print_errors:
for error in errors:
print(error,file=sys.stderr)
if len(errors) > 0:
return False
@ -83,5 +94,99 @@ def run_threads(
return True
except Exception as e:
print("Error while running threads: "+str(e)+"\n",file=sys.stderr)
return False
if print_errors:
print("Error while running threads: "+str(e)+"\n",file=sys.stderr)
return False
####
# Set a given value to a key in the internal shared dict.
# Used by all parallel processes.
#########################################################################################
def set_shared_dict_value(
key: str = None,
value = None
):
"""
Set a given value to a key in the internal shared dict.
Used by all parallel processes.
"""
global _shared_dict
if key is not None:
with _shared_dict_lock:
_shared_dict[key] = value
####
# Add a given value to a key in the internal shared dict.
# Used by all parallel processes.
#########################################################################################
def add_shared_dict_value(
key: str = None,
value = None
):
"""
Add a given value to a key in the internal shared dict.
Used by all parallel processes.
"""
global _shared_dict
if key is not None:
with _shared_dict_lock:
if key in _shared_dict:
_shared_dict[key] += value
else:
set_shared_dict_value(key, value)
####
# Get the value of a key in the internal shared dict.
# Used by all parallel processes.
#########################################################################################
def get_shared_dict_value(
key: str = None
):
"""
Get the value of a key in the internal shared dict.
Used by all parallel processes.
"""
global _shared_dict
with _shared_dict_lock:
if key in _shared_dict and key is not None:
return _shared_dict[key]
else:
return None
####
# Run a given function for given items list in a given number of processes
# Given function receives each item as first parameter
#########################################################################################
def run_processes(
max_processes: int = 1,
function: callable = None,
items: list = [],
print_errors: bool = False
) -> bool:
"""
Run a given function for given items list in a given number of processes
"""
# Assign processes
processes = max_processes
if processes > len(items):
processes = len(items)
if processes < 1:
processes = 1
# Run processes
with Pool(processes) as pool:
try:
pool.map(function, items)
result = True
except Exception as error:
if print_errors:
print(error,file=sys.stderr)
result = False
return result