From 3c5ee4e03b96e0ba2139d978c3833cba5005d0da Mon Sep 17 00:00:00 2001 From: Enrique Martin Date: Wed, 2 Aug 2023 10:31:52 +0200 Subject: [PATCH] Added multiprocessing functions --- .../extras/pandoraPlugintools/threads.py | 115 +++++++++++++++++- 1 file changed, 110 insertions(+), 5 deletions(-) diff --git a/pandora_server/extras/pandoraPlugintools/threads.py b/pandora_server/extras/pandoraPlugintools/threads.py index 83f967b4b3..93c36b1cf5 100644 --- a/pandora_server/extras/pandoraPlugintools/threads.py +++ b/pandora_server/extras/pandoraPlugintools/threads.py @@ -1,6 +1,15 @@ import sys from queue import Queue from threading import Thread +from multiprocessing import Pool, Manager + +#### +# Define multi-processing internal global variables. +######################################################################################### + +_manager = Manager() +_shared_dict = _manager.dict() +_shared_dict_lock = _manager.Lock() #### # Internal use only: Run a given function in a thread @@ -26,7 +35,8 @@ def _single_thread( def run_threads( max_threads: int = 1, function: callable = None, - items: list = [] + items: list = [], + print_errors: bool = False ) -> bool: """ Run a given function for given items list in a given number of threads @@ -74,8 +84,9 @@ def run_threads( q.join() - for error in errors: - print(error,file=sys.stderr) + if print_errors: + for error in errors: + print(error,file=sys.stderr) if len(errors) > 0: return False @@ -83,5 +94,99 @@ def run_threads( return True except Exception as e: - print("Error while running threads: "+str(e)+"\n",file=sys.stderr) - return False \ No newline at end of file + if print_errors: + print("Error while running threads: "+str(e)+"\n",file=sys.stderr) + return False + +#### +# Set a given value to a key in the internal shared dict. +# Used by all parallel processes. +######################################################################################### +def set_shared_dict_value( + key: str = None, + value = None + ): + """ + Set a given value to a key in the internal shared dict. + Used by all parallel processes. + """ + global _shared_dict + + if key is not None: + with _shared_dict_lock: + _shared_dict[key] = value + +#### +# Add a given value to a key in the internal shared dict. +# Used by all parallel processes. +######################################################################################### +def add_shared_dict_value( + key: str = None, + value = None + ): + """ + Add a given value to a key in the internal shared dict. + Used by all parallel processes. + """ + global _shared_dict + + if key is not None: + with _shared_dict_lock: + if key in _shared_dict: + _shared_dict[key] += value + else: + set_shared_dict_value(key, value) + +#### +# Get the value of a key in the internal shared dict. +# Used by all parallel processes. +######################################################################################### +def get_shared_dict_value( + key: str = None + ): + """ + Get the value of a key in the internal shared dict. + Used by all parallel processes. + """ + global _shared_dict + + with _shared_dict_lock: + if key in _shared_dict and key is not None: + return _shared_dict[key] + else: + return None + +#### +# Run a given function for given items list in a given number of processes +# Given function receives each item as first parameter +######################################################################################### +def run_processes( + max_processes: int = 1, + function: callable = None, + items: list = [], + print_errors: bool = False + ) -> bool: + """ + Run a given function for given items list in a given number of processes + """ + + # Assign processes + processes = max_processes + + if processes > len(items): + processes = len(items) + + if processes < 1: + processes = 1 + + # Run processes + with Pool(processes) as pool: + try: + pool.map(function, items) + result = True + except Exception as error: + if print_errors: + print(error,file=sys.stderr) + result = False + + return result \ No newline at end of file