Source code for femagtools.amazon

"""
    femagtools.amazon
    ~~~~~~~~~~~~~~~~

    Running FEMAG on Amazon Cloud EC2




    .. note: To use this engine you have to install the boto3 module from amazon
"""
import os
import threading
import time
import logging

import femagtools.job
from .config import Config

logger = logging.getLogger(__name__)

[docs]class MissingConfigurationException(Exception): def __init__(self, message): Exception.__init__(self, "Missing configuration: {}".format(message))
[docs]class Engine(object): config_class = Config default_config = { 'ENGINE': 'amazon', 'SERVER_LOCATION': 'eu-central-1', 'INSTANCE_TYPE': 't2.micro', 'ACL': 'authenticated-read', 'IMAGE_ID': 'ami-b0cc23df', 'FINISH_TASK_FILENAME': 'exit_code', 'COMPANY_NAME': 'femag' } """The Amazon Engine This engine uses the boto3 Python module to interact with the amazon ec2 and s3 services Args: buckets (:obj:`list`): Existing buckets with femag calculation files configfile (str): Filename of config file .. :note: If possible you should use the same location for all services """ def __init__(self, buckets=None, configfile='config.ini'): self.buckets = buckets self.job = None # Amazon file storage self.s3_resource = self._create_amazon_resource('s3') # Amazon Server administration self.ec2_resource = self._create_amazon_resource('ec2') # Create instance of config self.config = Config(self.default_config) self.config.from_ini_file(configfile) def _create_amazon_resource(self, resource): import boto3 return boto3.resource(resource) def _create_data_buckets(self): """Create unique S3 Buckets for calculation Args: ACL (str): ACL-Rules for Amazon """ # If buckets exsists map them with a folder if self.buckets: for idx, bucket in enumerate(self.buckets): self.job.tasks[idx].id = bucket['id'] self.job.tasks[idx].directory = bucket['folder'] return bucketConfiguration = {'LocationConstraint': self.config['SERVER_LOCATION']} # Create a bucket for every calculation for t in self.job.tasks: self.s3_resource.create_bucket(ACL=self.config['ACL'], Bucket=t.id, CreateBucketConfiguration=bucketConfiguration) logger.debug("Created buckets") def _upload_files_to_s3(self): """Upload all files to Amazon S3 for this calculation """ if self.buckets: logger.info("Files are already uploaded") return threads = [] for t in self.job.tasks: thread = threading.Thread(target=self._upload, args=(t, )) threads.append(thread) thread.start() logger.info("Uploading files: ") self._wait_for_threads_finished(threads, "Upload files") def _upload(self, task): """Upload thread for uploading one directory :internal: Args: task (py:class:`CloudTask`): The task which belongs to the uploading folder """ # Upload one single tar_file task.tar_file.close() name = os.path.basename(task.file) Body = open(task.file, 'rb') self.s3_resource.Object(task.id, name).put(Body=Body) def _wait_for_threads_finished(self, threads, operation): """Wait until all threads are finished :internal: Args: threads (:obj:`list`): List of threads to check if they are finished operation (str): Name of the operation to write a meaningful log message """ # Wait until all threads are not running while not all([t.isAlive() is False for t in threads]): time.sleep(5) # timer.cancel logger.info("{} is finished".format(operation)) def _start_instances(self): """Start all instances for the calculation """ # Prepare arguemtns for instance start param = {'MinCount': 1, 'MaxCount': 1 } if self.config.get('IMAGE_ID', None): param['ImageId'] = self.config['IMAGE_ID'] else: raise MissingConfigurationException('image_id') if self.config.get('INSTANCE_TYPE', None): param['InstanceType'] = self.config['INSTANCE_TYPE'] else: raise MissingConfigurationException('instance_type') if self.config.get('IAM_INSTANCE_PROFILE', None): param['IamInstanceProfile'] = {'Name': self.config['IAM_INSTANCE_PROFILE'] } if self.config.get('KEY_NAME', None): param['KeyName'] = self.config['KEY_NAME'] # Set security group id as list if self.config['SECURITY_GROUP_IDS']: param['SecurityGroupIds'] = [] for security_group in [s for s in self.config['SECURITY_GROUP_IDS'] if s]: param['SecurityGroupIds'].append(security_group) param['DryRun'] = self.config.get('DRY_RUN', False) threads = [] for idx, t in enumerate(self.job.tasks): thread = threading.Thread(target=self._start_instance, args=(param, t)) threads.append(thread) thread.start() self._wait_for_threads_finished(threads, "Start instances") def _start_instance(self, param, task): """Start one instance :internal: Args: task (Task): the task for calculation """ user_data = self._read_cloud_init(task.id) if user_data: param['UserData'] = user_data instances = self.ec2_resource.create_instances(**param) instance = instances[0] # We only started one instance logger.info("Instance started: {}".format(instance.id)) self._add_tag(task.id, instance.id) instance.wait_until_running() instance.load() # Reload the data to get public dns etc. logger.info("Instance {} is running: Public dns: {}".format(instance.id, instance.public_dns_name)) task.ec2_instance = instance.id def _add_tag(self, task_id, instance_id): """Add a tag to the instance :internal: Args: task_id (int): The task id (Same as the S3 Bucket name) instance_id (int): The instance_id to set the tag to the right instance """ tag = '{}-{}'.format(task_id, self.config.get('COMPANY_NAME', 'femag')) self.ec2_resource.create_tags(Resources=[instance_id], Tags=[{'Key': 'Name', 'Value': tag}]) def _read_cloud_init(self, bucket_name): """Read the cloud init file and if there is a line which starts with {{ENV}} then put all config options as environment variables. """ user_data = "" # Set all config options as environment variable if os.path.isfile(self.config.get('CLOUD_INIT', None)): with open(self.config['CLOUD_INIT'], 'rt') as f: for line in f: if line.startswith('{{ENV}}'): # Add config for key, value in sorted(self.config.items()): user_data += "export {}={}\n".format(key, value) # add other important stuff user_data += "export BUCKET_NAME={}\n".format(bucket_name) continue user_data += line return user_data def _join(self, timeout=20, filename='exit_code'): """Wait until all instances are finished with the calulation. :internal: Args: timeout (int): How long we wait between a check filename (str): What is the filename of the exit_code """ import botocore # For exception finished_tasks = [] client = self.s3_resource.meta.client while len(finished_tasks) < len(self.job.tasks): for t in [task for task in self.job.tasks if task not in finished_tasks]: try: client.get_object(Bucket=t.id, Key=filename) except botocore.exceptions.ClientError: # Instance not ready time.sleep(2) continue finished_tasks.append(t) logger.info("Calculation is finished for instance {}".format(t.id)) self.ec2_resource.instances.filter(InstanceIds=[t.ec2_instance]).terminate() time.sleep(timeout) logger.info("Calculations are finished") def _get_result_data_from_S3(self): """Get all the calculated files to the correct folder """ import boto3 client = self.s3_resource.meta.client transfer = boto3.s3.transfer.S3Transfer(client) for t in self.job.tasks: bucket = t.id folder = t.directory files = client.list_objects(Bucket=bucket)['Contents'] logger.debug("Starting new folder") for file in files: file_name = file['Key'] transfer.download_file(bucket, file_name, os.path.join("{}/{}".format(folder, file_name))) logger.debug("Downloaded file {}".format(file_name)) def _get_status_code(self, filename='exit_code'): """Get the status code from the caluclation Args: filename (str): Filename of exit_code """ status_code = [] for t in self.job.tasks: dir = "{}/{}".format(t.directory, filename) file = open(dir, 'r') status_code.append(file.read()) return status_code def _cleanup(self): threads = [] for t in self.job.tasks: thread = threading.Thread(target=self._delete_bucket, args=(t.id, )) threads.append(thread) thread.start() logger.info("Deleting buckets: ") self._wait_for_threads_finished(threads, "Deleting buckets") # Clean up volumes client = self.ec2_resource.meta.client volumes = client.describe_volumes(Filters=[{'Name': 'status', 'Values': ['available']}])['Volumes'] for v in volumes: client.delete_volume(VolumeId=v['VolumeId']) def _delete_bucket(self, bucket_name): bucket = self.s3_resource.Bucket(bucket_name) for key in bucket.objects.all(): key.delete() bucket.delete() # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - # FEMAG STUFF # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
[docs] def create_job(self, workdir): """Create a FEMAG :py:class:`CloudJob` Args: workdir (str): The workdir where the calculation files are stored Return: Cloud job (:class:`CloudJob`) """ self.job = femagtools.job.CloudJob(workdir) return self.job
[docs] def submit(self): """Starts the FEMAG calculation(s) on Amazon Return: length of started tasks (int) """ self._create_data_buckets() self._upload_files_to_s3() self._start_instances() return len(self.job.tasks)
[docs] def join( self ): """Wait until all calculations are finished Return: list of all calculations status (C = Ok, X = error) (:obj:`list`) """ status = [] # Wait until all tasks are finished self._join(timeout=20, filename=self.config['FINISH_TASK_FILENAME']) # get all files self._get_result_data_from_S3() # Remove buckets if cleanup is set if int(self.config.get('DELETE_BUCKETS', 0)): self._cleanup() status = self._get_status_code(filename=self.config['FINISH_TASK_FILENAME']) for t, r in zip(self.job.tasks, status): t.status = 'C' if int(r)==0 else 'X' return status