Source code for access.client

import getpass
import os
import time

from . import helper
from . import util


def connect(url, username, http_connection_timeout=10):
    """
    Build an :class:`AccessClient` for the Access web server at ``url``.

    The password of ``username`` is requested interactively through
    :func:`getpass.getpass` before the client is created.

    :param str url: Access web server URL
    :param str username: User to use for accessing APIs
    :param int http_connection_timeout: Maximum time (in seconds) to wait for
        establishing HTTP connection to Access web REST APIs
    :rtype: AccessClient
    :return: Access client object
    """
    # TODO: URL validation
    # The OIDC issuer lives below the access services context of the server.
    issuer_url = (
        url
        + util.access_services_context_path
        + util.oidc_service_context_path
    )

    gateway_conf = util.APIGatewayConfig(url)
    services_conf = util.AccessServicesConfig(
        "", "", 0, util.access_services_context_path
    )
    # The same URL is passed for the first two OIDC arguments.
    oidc_conf = util.OIDCConfig(
        issuer_url,
        issuer_url,
        "ssh",
        "access-web",
        "ZS1hcHAtc2VjcmV0ZXhhbXBs",
        "openid profile email offline_access",
        False,
        "",
    )
    sdk_conf = util.AccessPySDKConfig(http_connection_timeout)

    combined_conf = util.AccessConfig(
        services_conf, gateway_conf, oidc_conf, sdk_conf
    )

    secret = getpass.getpass(prompt="Enter password for user '%s':" % username)
    return AccessClient(combined_conf, username, secret)
class AccessClient(object):
    """
    Client exposing the APIs used to communicate with access web services.

    Instances are normally obtained from :func:`connect`. The client can be
    used as a context manager: leaving the ``with`` block calls
    :meth:`close`, and exceptions raised inside the block are not suppressed.

    :param util.AccessConfig access_config: Access configuration
    :param str username: User to use for accessing APIs
    :param str password: Password of the user
    """

    def __init__(self, access_config, username, password):
        """
        Create the client and publish its state in the ``util`` module.

        The configuration is stored first, then an access token is created
        from the credentials, and finally the user, password and token are
        stored as module-level values of ``util``.

        :param util.AccessConfig access_config: Access configuration
        :param str username: User to use for accessing APIs
        :param str password: Password of the user
        """
        # The global configuration is set before the token is requested.
        util.ACCESS_CONFIG = access_config
        token = util.create_access_token_with_credentials(username, password)

        # Publish the credentials and the token globally.
        util.TOKEN_USER, util.TOKEN_USER_PASSWORD = username, password
        util.ACCESS_TOKEN = token

    def submit_job(self, application_id, job_inputs, server_name=""):
        """
        Submit a job to an application with the given inputs.

        :param str application_id: Application to which job has to be
            submitted
        :param dict[str, str] job_inputs: Inputs to job according to
            application definition. All values should be provided in string
            format. Example::

                job_input = {"CORES": "1", "JOB_NAME": "mbdsystemlvlopt",
                             "MEMORY_PLACEMENT": "true",
                             "PRIMARY_FILE": "/stage/mbdsystemlvlopt.fem",
                             "INCLUDE_FILES": "/stage/dvgrids.fem;/stage/grids.fem",
                             "SUBMISSION_DIRECTORY": "/stage/david/generate_data",
                             ...}
        :param str server_name: Name of server to which job has to be
            submitted. If not mentioned job will be submitted to the first
            server containing application 'application_id'
        :rtype: Job
        :return: Job object representing the job
        """
        submission = helper.submit_job(
            application_id, job_inputs, server_name=server_name
        )
        # The helper reports the job id together with the server name.
        new_job_id, target_server = submission
        return Job(new_job_id, target_server)

    def clean(self):
        """
        Delete the local access temporary directory.

        :rtype: str
        :return: Local access temporary directory path
        """
        temp_dir = util.get_access_temp_dir_path()
        util.delete_dir_if_exist(temp_dir)
        return temp_dir

    def close(self):
        """Close connection to access web server by destroying the token."""
        util.destroy_token()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        # Returning False lets any exception from the ``with`` body propagate.
        return False
class Job(object):
    """
    This class represents the job and provides functions to perform
    operations on this job. It is created using method
    :meth:`AccessClient.submit_job`

    :param str job_id: ID of the job
    :param str server_name: Name of the server to which job belongs to
    """

    # States treated as "job is done" by wait() and download_file().
    _FINISHED_STATES = ("F", "C")

    def __init__(self, job_id, server_name):
        """
        Create job object

        :param str job_id: ID of the job
        :param str server_name: name of the server to which job belongs to
        :raises util.APIException: If ``job_id`` or ``server_name`` is empty
        """
        if not job_id:
            raise util.APIException(util.ERROR_CODE_JOBID_EMPTY,
                                    "job id is empty")
        if not server_name:
            raise util.APIException(util.ERROR_CODE_SERVER_NAME_EMPTY,
                                    "server name is empty")
        self.job_id = job_id
        self.server_name = server_name

    def status(self):
        """
        Get current status of the job

        Job state can be:

        - B : (Job arrays only) Job array is begun, meaning that at least
          one subjob has started
        - D : (Access Desktop only) Job is downloading files from head node
          to local machine
        - E : Job is exiting after having run
        - F : Job is finished. Job might have completed execution, failed
          during execution, or been terminated
        - H : Job is held. A job is put into a held state by the server or
          by a user or administrator. A job stays in a held state until it
          is released by a user or administrator
        - M : Job was moved to another server
        - Q : Job is queued, eligible to run or be routed
        - R : Job is running
        - S : Job is suspended by scheduler. A job is put into the suspended
          state when a higher priority job needs the resources
        - T : Job is in transition to or from a server
        - U : (Access Desktop only) Job is uploading files from local
          machine to head node
        - W : Job is waiting for its requested execution time to be reached,
          or job is delayed due to stagein failure
        - X : (Subjobs only) Subjob is finished (expired.)

        :rtype: str
        :return: Job's current status
        """
        return helper.get_job_status(self.job_id, self.server_name)

    def wait(self, poll_interval=2):
        """
        Wait for job to finish

        The job status is polled until it is ``"F"`` or ``"C"``. The first
        poll happens immediately; after every poll that finds the job still
        unfinished the call sleeps for ``poll_interval`` seconds, so the
        server is not queried in a tight loop.

        :param float poll_interval: Seconds to sleep between two status
            polls. Passed to :func:`time.sleep`; ``0`` polls back to back.
        :rtype: Job
        :return: Current job object
        """
        while True:
            job_status = helper.get_job_status(self.job_id, self.server_name)
            if job_status in self._FINISHED_STATES:
                return self
            time.sleep(poll_interval)

    def terminate(self):
        """
        Terminate a running job

        :rtype: Job
        :return: Current job object
        """
        helper.terminate_job(self.job_id, self.server_name)
        return self

    def delete(self, delete_job_output_directory=False):
        """
        Remove job completely irrespective of its state (running, queue,
        finished etc.)

        :param bool delete_job_output_directory: If True, after deleting job
            its output directory will also be deleted
        :rtype: Job
        :return: Current job object
        """
        helper.delete_job(self.job_id, self.server_name,
                          delete_job_output_directory)
        return self

    def download_file(self, file_name, dst_dir_path=""):
        """
        Once job is finished, using this function a file from job output
        directory can be downloaded

        :param str file_name: Name of the file to be downloaded
        :param str dst_dir_path: Path of the directory where file should be
            downloaded. If not specified it will be downloaded to job's
            local temporary directory. The directory is created when it does
            not exist yet.
        :rtype: str
        :return: Path of the downloaded file
        :raises util.APIException: If ``file_name`` is empty or blank, the
            job is not in state ``"F"`` or ``"C"``, the job details carry no
            ``outputFiles`` entry, or the destination directory cannot be
            created
        """
        if not file_name or not file_name.strip():
            raise util.APIException(util.ERROR_CODE_OPERATION_FAILED,
                                    "Name of the file to download is empty")

        job_details_dict = helper.get_job_details(self.job_id,
                                                  self.server_name)
        job_status = util.get_job_status_from_job_details(job_details_dict)
        if job_status not in self._FINISHED_STATES:
            raise util.APIException(
                util.ERROR_CODE_INVALID_OPERATION_FOR_JOB_STATE,
                "Job should be completed but its in state '%s'" % job_status)

        job_output_dir_path = job_details_dict.get('outputFiles')
        if not job_output_dir_path:
            raise util.APIException(
                util.ERROR_CODE_JOB_DETAILS_EMPTY,
                "GetJobs API did not return 'outputFiles' path")
        if job_output_dir_path.startswith("*@"):
            # Keep only what follows the first ':' of a "*@...:" prefixed
            # value (presumably a host prefix -- TODO confirm).
            job_output_dir_path = job_output_dir_path[
                job_output_dir_path.find(":") + 1:]
        src_file_path = (job_output_dir_path + util.linux_path_seperator
                         + file_name)

        if not dst_dir_path:
            dst_dir_path = util.get_job_local_temp_dir_path(self.job_id)
        if not os.path.exists(dst_dir_path):
            try:
                os.makedirs(dst_dir_path)
            except Exception as err:
                raise util.APIException(
                    util.ERROR_CODE_OPERATION_FAILED,
                    "Creating destination directory '%s' failed: %s"
                    % (dst_dir_path, err))

        dst_file_path = dst_dir_path + util.linux_path_seperator + file_name
        helper.download_file(src_file_path, dst_file_path,
                             server_name=self.server_name)
        return dst_file_path

    def clean(self):
        """
        Delete job's local temporary directory

        :rtype: Job
        :return: Current job object
        """
        job_temp_dir_path = util.get_job_local_temp_dir_path(self.job_id)
        util.delete_dir_if_exist(job_temp_dir_path)
        return self

    def __str__(self):
        return "Job[Job ID: %s, Server Name: %s]" % (self.job_id,
                                                     self.server_name)