Source code for femagtools.multiproc


    Creating and managing multicore/multiprocessing jobs

    :authors: R. Tanner, N. Mauchle
import sys
import multiprocessing
import subprocess
import os
import logging
from .job import Job
import femagtools.config as cfg
    from subprocess import DEVNULL
except ImportError:
    DEVNULL = open(os.devnull, 'wb')

logger = logging.getLogger(__name__)

[docs]def run_femag(cmd, workdir, fslfile): """Start the femag command as subprocess. :internal: Args: cmd: The program (executable image) to be run workdir: The workdir where the calculation files are stored fslfile: The name of the start file (usually femag.fsl) """'FEMAG %s: %s', workdir, fslfile) with open(os.path.join(workdir, "femag.out"), "wb") as out, \ open(os.path.join(workdir, "femag.err"), "wb") as err: try: proc = subprocess.Popen(cmd + ['-b', fslfile], shell=False, stdin=DEVNULL, stdout=out, stderr=err, cwd=workdir) # wait proc.wait()"Finished pid: %d return %d",, proc.returncode) return proc.returncode except OSError as e: logger.error("Starting process failed: %s, Command: %s", e, cmd) raise"Finished pid: %d return %d",, proc.returncode) return proc.returncode
[docs]class Engine: """The MultiProc engine uses a pool of local calculation processes. This is more or less a decorator for the `Python multiprocessing Module <>`_ Args: cmd: the program (executable image) to be run (femag dc is used if None) process_count: number of processes (cpu_count() if None) """ def __init__(self, cmd=None, process_count=None): self.process_count = process_count if cmd: self.cmd = [cmd] else: self.cmd = [cfg.get_femag()] if not sys.platform.startswith('linux'): self.cmd.append('-m')
[docs] def create_job(self, workdir): """Create a FEMAG :py:class:`Job` Args: workdir: The workdir where the calculation files are stored Return: FEMAG :py:class:`Job` """ self.job = Job(workdir) return self.job
[docs] def submit(self): """Starts the FEMAG calculation(s) with the internal :py:meth:`multiproc.run_femag` function Return: length of started tasks """ self.pool = multiprocessing.Pool(self.process_count) self.tasks = [self.pool.apply_async(run_femag, args=(self.cmd,, t.fsl_file)) for t in self.job.tasks] self.pool.close() # used to free resources after calculations have finished. thomas.maier/OSWALD return len(self.tasks)
[docs] def join(self): """Wait until all calculations are finished Return: list of all calculations status (C = Ok, X = error) """ results = [task.get() for task in self.tasks] status = [] for t, r in zip(self.job.tasks, results): t.status = 'C' if r == 0 else 'X' status.append(t.status) return status
[docs] def terminate(self):"terminate Engine") self.pool.terminate()