# -*- coding: utf-8 -*-
"""
femagtools.femag
~~~~~~~~~~~~~~~~
Running FEMAG
"""
import subprocess
import os
import glob
import logging
try:
import zmq
except ImportError:
pass
import sys
import json
import io
import femagtools.mcv
import femagtools.airgap as ag
import femagtools.fsl
import femagtools.ntib as ntib
import femagtools.config as cfg
import time
import platform
import re
from threading import Thread
logger = logging.getLogger(__name__)
BCHEXT = '.BATCH' if sys.platform.startswith('linux') else '.BCH' # win32
[docs]class FemagError(Exception):
pass
[docs]class BaseFemag(object):
def __init__(self, workdir, cmd, magnetizingCurves, magnets):
self.workdir = workdir
if cmd:
self.cmd = cmd
else:
self.cmd = cfg.get_femag()
if magnetizingCurves:
if isinstance(magnetizingCurves,
femagtools.mcv.MagnetizingCurve):
self.magnetizingCurves = magnetizingCurves
else:
self.magnetizingCurves = femagtools.mcv.MagnetizingCurve(
magnetizingCurves)
else:
self.magnetizingCurves = []
if magnets:
if isinstance(magnets, femagtools.Magnet):
self.magnets = magnets
else:
self.magnets = femagtools.Magnet(magnets)
else:
self.magnets = []
[docs] def copy_magnetizing_curves(self, model, dir=None):
"""extract mc names from model and write files into workdir or dir if given
Return:
list of extracted mc names (:obj:`list`)
"""
dest = dir if dir else self.workdir
return [self.magnetizingCurves.writefile(m, dest)
for m in model.set_magcurves(
self.magnetizingCurves, self.magnets)]
[docs] def create_fsl(self, pmMachine, operatingConditions):
"""create list of fsl commands"""
model = femagtools.MachineModel(pmMachine)
self.modelname = model.name
self.copy_magnetizing_curves(model)
builder = femagtools.fsl.Builder()
return builder.create(model, operatingConditions, self.magnets)
[docs] def get_log_value(self, pattern, modelname='FEMAG-FSL.log'):
result = []
pat = re.compile(pattern)
with open(os.path.join(self.workdir, 'FEMAG-FSL.log')) as f:
for line in f:
m = pat.search(line)
if m:
result.append(float(line.split(':')[-1].split()[0]))
return result
[docs] def read_bch(self, modelname=None):
"read most recent BCH/BATCH file and return result"
# read latest bch file if any
if not modelname:
modelname = self._get_modelname_from_log()
result = femagtools.bch.Reader()
bchfile_list = sorted(glob.glob(os.path.join(
self.workdir, modelname+'_[0-9][0-9][0-9]'+BCHEXT)))
if len(bchfile_list) > 0:
logger.info("Read BCH {}".format(bchfile_list[-1]))
with io.open(bchfile_list[-1], encoding='latin1',
errors='ignore') as f:
result.read(f)
return result
[docs] def read_los(self, modelname=None):
"read most recent LOS file and return result"
# read latest los file if any
if not modelname:
modelname = self._get_modelname_from_log()
losfile_list = sorted(glob.glob(os.path.join(
self.workdir, modelname+'_[0-9][0-9][0-9].LOS')))
if len(losfile_list) > 0:
return ntib.read_los(losfile_list[-1])
return dict()
[docs] def read_airgap_induction(self):
"""read airgap induction"""
return ag.read(os.path.join(self.workdir, 'bag.dat'))
def _get_modelname_from_log(self):
"""
Read the modelname from the Femag Log file
"""
with open(os.path.join(self.workdir, 'FEMAG-FSL.log')) as f:
for l in f:
if l.startswith('New model') or l.startswith('Load model'):
model = l.split()[2].replace('"', '').replace(',', '')
break
return model
[docs]class Femag(BaseFemag):
"""Invoke and control execution of FEMAG
Args:
workdir: name of working directory
cmd: name of femag program
magnetizingCurves: collection of lamination material curves
magnets: collection of magnet material
"""
def __init__(self, workdir, cmd=None,
magnetizingCurves=None, magnets=None):
super(self.__class__, self).__init__(workdir, cmd,
magnetizingCurves, magnets)
[docs] def run(self, filename, options=['-b'], fsl_args=[]):
"""invoke FEMAG in current workdir
Args:
filename: name of file to execute
options: list of FEMAG options
fsl_args: list of FSL argument options
Raises:
FemagError
"""
if self.cmd.startswith('wfemag') and \
'-b' in options and \
'-m' not in options:
options.insert(0, '-m')
args = [self.cmd] + options + [filename] + fsl_args
basename, ext = os.path.splitext(os.path.basename(filename))
outname = os.path.join(self.workdir, basename+'.out')
errname = os.path.join(self.workdir, basename+'.err')
with open(outname, 'w') as out, open(errname, 'w') as err:
logger.info('invoking %s', ' '.join(args))
proc = subprocess.Popen(
args,
stdout=out, stderr=err, cwd=self.workdir)
proc.wait()
errs = []
# print femag output
print(outname)
with io.open(outname, encoding='latin1', errors='ignore') as outfile:
for l in outfile:
print(l.strip())
if l.find('ERROR') > -1:
errs.append(l.strip())
rc = proc.returncode
logger.info("%s exited with returncode %d (num errs=%d)",
self.cmd, rc, len(errs))
if rc != 0 or errs:
with io.open(errname, encoding='latin1',
errors='ignore') as errfile:
for l in errfile:
errs.append(l.strip())
errs.insert(0, 'Exit code {}'.format(rc))
raise FemagError(errs)
[docs] def cleanup(self):
"removes all created files in workdir"
if not os.path.exists(self.workdir):
return
cleanfiles = ('*.B*CH', '*.I*7-*', '*.A*7-*',
'*.dat', '*.LOS', '*.svg', '*.png')
# '*.TMC','*.TMO', '*.PROT', '*.hxy'):
for p in cleanfiles:
for f in glob.glob(os.path.join(self.workdir, p)):
os.remove(f)
def __call__(self, pmMachine, operatingConditions,
options=['-b'], fsl_args=[]):
"""setup fsl file, run calculation and return BCH results"""
fslfile = 'femag.fsl'
with open(os.path.join(self.workdir, fslfile), 'w') as f:
f.write('\n'.join(self.create_fsl(pmMachine,
operatingConditions)))
if operatingConditions['calculationMode'] == "pm_sym_loss":
with open(os.path.join(self.workdir,
self.modelname+'.ntib'), 'w') as f:
f.write('\n'.join(ntib.create(
operatingConditions['speed'],
operatingConditions['current'],
operatingConditions['angl_i_up'])))
# TODO: add r1, m
self.run(fslfile, options, fsl_args)
if operatingConditions['calculationMode'] == "pm_sym_loss":
return self.read_los(self.modelname)
return self.read_bch(self.modelname)
[docs]class ZmqFemag(BaseFemag):
"""Invoke and control execution of FEMAG with ZeroMQ
Args:
workdir: name of working directory
cmd: name of femag program
"""
def __init__(self, workdir, port, host='localhost', cmd=None,
magnetizingCurves=None, magnets=None):
super(self.__class__, self).__init__(workdir, cmd,
magnetizingCurves, magnets)
self.host = host
self.port = port
self.request_socket = None
self.subscriber_socket = None
self.proc = None
self.reader = None
def __del__(self):
if self.request_socket:
self.request_socket.close()
if self.subscriber_socket:
self.subscriber_socket.close()
if self.reader:
self.reader.continue_loop = False
logger.info("Destructor ZmqFemag")
def __req_socket(self):
"""returns a new request client"""
if self.request_socket:
return self.request_socket
context = zmq.Context.instance()
self.request_socket = context.socket(zmq.REQ)
self.request_socket.connect('tcp://{0}:{1}'.format(
self.host, self.port))
self.request_socket.setsockopt(zmq.LINGER, 500)
return self.request_socket
def __sub_socket(self):
"""returns a subscriber client"""
if self.subscriber_socket:
return self.subscriber_socket
context = zmq.Context.instance()
self.subscriber_socket = context.socket(zmq.SUB)
self.subscriber_socket.connect(
'tcp://{0}:{1}'.format(
self.host, self.port+1))
self.subscriber_socket.setsockopt(zmq.SUBSCRIBE, b'')
self.subscriber_socket.RCVTIMEO = 900 # in milliseconds
return self.subscriber_socket
def __is_process_running(self, procId):
try:
import psutil
return psutil.pid_exists(procId)
except:
pass
# long version, self made
try:
if procId > 0:
if platform.system() == "Windows":
# if procId in psutil.get_pid_list():
proc = subprocess.Popen(["tasklist"],
stdout=subprocess.PIPE)
for l in proc.stdout:
ls = l.split()
try:
if str(procId) == ls[1]:
return True
except:
continue
else:
if not os.kill(procId, 0):
return True
except OSError as e:
# No such process
logger.info("OSError: '{}'\n".format(str(e)))
return False
except Exception:
# we cannot check processId
logger.info("Error: unknown\n")
return True
return False
def __is_running(self, timeout=1500):
"""check if FEMAG is running in ZMQ mode
Args:
timeout: The timeout (in milliseconds) to wait for a response
Return:
True if FEMAG is running, False otherwise
"""
try:
request_socket = self.__req_socket()
request_socket.send_string("FSL", flags=zmq.SNDMORE)
request_socket.send_string("testvar=0")
poller = zmq.Poller()
# use POLLIN for recv, POLLOUT for send
poller.register(request_socket, zmq.POLLIN)
if poller.poll(timeout): # ok, femag is running
logger.info('femag is running for %s',
self.request_socket.getsockopt_string(
zmq.IDENTITY))
request_socket.recv_multipart()
return True
except Exception as e:
logger.error(e)
self.request_socket.close()
self.request_socket = None
logger.debug("femag is not running")
return False
[docs] def send_fsl(self, fsl, callback=None, header='FSL'):
"""sends FSL commands in ZMQ mode and blocks until commands are processed
Args:
fsl: string of FSL commands
Return:
response
"""
logger.debug("Send fsl with fsl: {}, callback: {}, header: {}".format(
fsl, callback, header))
try:
# Start the reader thread to get information about the next calculation
if callback:
self.reader = FemagReadStream(self.__sub_socket(), callback)
self.reader.setDaemon(True)
self.reader.start()
request_socket = self.__req_socket()
request_socket.send_string(header, flags=zmq.SNDMORE)
logger.debug("Sent header")
request_socket.send_string(fsl)
logger.debug("Sent fsl wait for response")
response = request_socket.recv_multipart()
logger.debug("send_fsl["+fsl+"] done")
time.sleep(.5) # Be sure all messages are arrived over zmq
if callback:
self.reader.continue_loop = False
return [s.decode() for s in response]
except Exception as e:
logger.exception("send_fsl")
msg = str(e)
return ['{"status":"error", "message":"'+msg+'"}', '{}']
[docs] def run(self, options=['-b'], restart=False, procId=None): # noqa: C901
"""invokes FEMAG in current workdir and returns pid
Args:
options: list of FEMAG options
Raises:
FemagError
"""
args = [self.cmd] + options
if self.__is_running():
if restart:
logging.info("must restart")
self.quit(True)
# check if process really finished (mq_connection)
logger.info("procId: %s", procId)
if procId:
for t in range(200):
time.sleep(0.1)
if not self.__is_process_running(procId):
break
logger.info(
"femag (pid: '{}') not stopped yet".format(
procId))
logger.info("Stopped procId: %s", procId)
else:
try:
with open(os.path.join(self.workdir,
'femag.pid'), 'r') as pidfile:
procId = int(pidfile.readline())
except Exception:
pass
return procId
if self.request_socket:
self.request_socket.close()
self.request_socket = None
basename = str(self.port)
args.append(basename)
outname = os.path.join(self.workdir, basename+'.out')
errname = os.path.join(self.workdir, basename+'.err')
with open(outname, 'w') as out, open(errname, 'w') as err:
logger.info('invoking %s', ' '.join(args))
self.proc = subprocess.Popen(
args,
stdout=out, stderr=err, cwd=self.workdir)
# check if mq is ready for listening
for t in range(200):
time.sleep(0.1)
if self.__is_running():
logger.info("femag (pid: '{}') is listening".format(
self.proc.pid))
break
# write femag.pid
logger.info("ready %s", self.proc.pid)
with open(os.path.join(self.workdir, 'femag.pid'), 'w') as pidfile:
pidfile.write("{}\n".format(self.proc.pid))
return self.proc.pid
[docs] def quit(self, save_model=False):
"""terminates femag"""
if not self.__is_running():
logger.info("Femag already stopped")
if self.proc:
self.proc.wait()
self.proc = None
return
# send exit flags
self.__req_socket()
f = '\n'.join(['exit_on_end = true',
'exit_on_error = true'])
response = self.send_fsl(f)
# send quit command
try:
response = self.send_fsl('quit', header='CONTROL')
# recvflags=zmq.NOBLOCK)
except Exception as e:
logger.error("Femag Quit zmq message %s", e)
logger.info("Sent QUIT to femag %s", response)
# if query, send a answer
obj = json.loads(response[0])
logger.debug("status: {}".format(obj['status']))
if obj['status'] == 'Query':
logger.info('query: %s => %s',
obj['message'], 'saved' if save_model else 'not saved')
# Only send one msg
request_socket = self.__req_socket()
response = request_socket.send_string(
'Ok' if save_model else 'Cancel')
if self.proc:
self.proc.wait()
self.proc = None
return response
[docs] def stopStreamReader(self):
if self.reader:
logger.debug("stop stream reader")
self.reader.continue_loop = False
def __call__(self, pmMachine, operatingConditions):
"""setup fsl file, run calculation and return BCH results"""
self.send_fsl('\n'.join(self.create_fsl(pmMachine,
operatingConditions)))
return self.read_bch(self.modelname)
[docs]class FemagReadStream(Thread):
def __init__(self, sub_socket, callback):
Thread.__init__(self)
logger.debug("Initialize reader thread")
self.sub_socket = sub_socket
# TODO: use a function to handle calls without callback
self.callback = callback if callback else self.dummy_callback
self.continue_loop = True
[docs] def dummy_callback(self, data):
"""This dummy method is used when no callback is defined.
"""
logger.warn(
"No callback defined, fallback to dummy\n >{}".format(data))
[docs] def run(self):
"""Listen for messages from femag as long as the continue_loop is True
the sub_socket has a timeout to finish the loop and thread after
the continue_loop is set to False and no messages are arrived.
"""
logger.debug(
"Start thread with while condition: {}".format(self.continue_loop))
while self.continue_loop:
try:
response = self.sub_socket.recv_multipart()
# Sometimes femag send messages with only len = 1. These messages must be ignored
if len(response) < 2:
continue
# Call the callback function
self.callback([s.decode() for s in response])
# The subscriber_socket has a timeout of 900 mil sec. If no answer was arrived
# this exception is raised - Ingnore
except zmq.error.Again:
continue
# Any other exception is shown in the error log
except Exception as e:
logger.error("error in reading output from femag {}".format(e))
continue
logger.debug("Exit reader thread")