Source code for jammy.utils.process

import threading
import time
from subprocess import PIPE, Popen

import psutil

from jammy.logging import get_logger

__all__ = ["run_simple_command", "BashRunner"]


[docs]def run_simple_command(cmd: str): cmd = cmd.split(" ") with Popen(cmd, stdout=PIPE, stderr=PIPE) as prc: stdout, stderr = prc.communicate() stdout = stdout.decode("utf-8") stderr = stderr.decode("utf-8") prc.terminate() return stdout, stderr
class ThreadSafeDict(dict): def __init__(self, *p_arg, **n_arg): dict.__init__(self, *p_arg, **n_arg) self._lock = threading.Lock() def __enter__(self): self._lock.acquire() return self def __exit__(self, type_p, value, traceback): del type_p, value, traceback self._lock.release() def _get_mem(): return psutil.virtual_memory().available / 1024 / 1024 / 1024 logger = get_logger()
[docs]class BashRunner: # pylint: disable=too-many-instance-attributes def __init__( self, notifier=None, interval_cpu_check: float = 1.0, cpu_usage_thres: float = 92, seconds_numthread_wait: int = 10, mem_avail_thres: float = 0.9, seconds_resouece_wait: int = 2, ): self.notifier = notifier self.interval_cpu_check = interval_cpu_check self.cpu_usage_thres = cpu_usage_thres self.mem_avail_thres = mem_avail_thres self.seconds_numthread_wait = seconds_numthread_wait self.seconds_resouece_wait = seconds_resouece_wait self.complete_record = [] self.active_task = ThreadSafeDict() self.error_task = {} self.num_total_task = 0
[docs] def proc_event(self, cur_process): finished_num = len(self.complete_record) + len(self.error_task) return f"\nPID:{cur_process.pid}. Active:{len(self.active_task)}.\ Finished: {finished_num}/{self.num_total_task}"
def _execute_single_task(self, str_cmd): logger.debug("Checking states") while not (self._check_cpu() and self._check_mem()): time.sleep(self.seconds_resouece_wait) with Popen(str_cmd, shell=True) as process: # , stdout=PIPE, stderr=PIPE) return process def _check_single_task(self, cmd, task_process): poll_state = task_process.poll() if poll_state is None: return False elif poll_state == 0: if cmd not in self.complete_record: self._print_finish(cmd, task_process) else: if cmd not in self.error_task: self._print_error(cmd, task_process) return True
[docs] def reset(self, batch_cmd): self.num_total_task = len(batch_cmd) self.complete_record.clear() self.active_task.clear() self.error_task.clear()
[docs] def execute_tasks(self, batch_cmd, num_thread=0, msg=None): self.reset(batch_cmd) if num_thread == 1: self.queue_tasks(batch_cmd, msg) else: thread_check = threading.Thread(target=self._check_process) thread_check.start() for task in batch_cmd: self.check_num_thread(num_thread) process = self._execute_single_task(task) self._print_start(task, process) with self.active_task as active_task: active_task[task] = process thread_check.join() self._send_msg(msg)
[docs] def queue_tasks(self, batch_cmd, msg=None): for task in batch_cmd: with Popen(task, shell=True) as process: self._print_start(task, process) process.communicate() if process.poll() == 0: self._print_finish(task, process) else: self._print_error(task, process) self._print_complete() self._send_msg(msg)
def _send_msg(self, msg=None): if self.notifier is None: return if msg == 0: return else: if len(self.error_task) == 0: msg += "\n All tasks success!" else: for cmd, error_code in self.error_task.items(): msg += f"\nError: {error_code:03d}\t{cmd}" self.notifier.notify(msg)
[docs] def check_num_thread(self, num_thread): if num_thread > 0: while len(self.active_task) >= num_thread: time.sleep(self.seconds_numthread_wait)
[docs] def kill_all(self): for cmd, process in self.active_task.items(): process.kill() logger.info(f"Kill {cmd}")
def _check_process(self): logger.debug("Start monitor thread") is_exit = False while not is_exit: delete_active = [] with self.active_task as active_task: for cmd, task_process in active_task.items(): if self._check_single_task(cmd, task_process): delete_active.append(cmd) for key in delete_active: del self.active_task[key] time.sleep(5) if len(self.complete_record) + len(self.error_task) == self.num_total_task: self._print_complete() is_exit = True logger.debug("Exit check thread") def _check_cpu(self): logger.debug(f"cpu {psutil.cpu_percent(interval=self.interval_cpu_check)}") return self.cpu_usage_thres > psutil.cpu_percent( interval=self.interval_cpu_check ) def _check_mem(self): logger.debug(f"mem {_get_mem()}") return self.mem_avail_thres < _get_mem() def _print_finish(self, cmd, task_process): self.complete_record.append(cmd) logger.info(f"Finish {cmd}! {self.proc_event(task_process)}") def _print_error(self, cmd, task_process): poll_state = task_process.poll() self.error_task[cmd] = poll_state logger.critical( f"Error:\t{poll_state:03d}! {self.proc_event(task_process)}\n{cmd}" ) def _print_start(self, cmd, task_process): logger.info(f"Fire {cmd}.{self.proc_event(task_process)}") def _print_complete(self): logger.success( f"Complete tasks: {len(self.complete_record)}\tError tasks: {len(self.error_task)}" )