import getpass
import os
import time

from . import helper
from . import util
def connect(url, username, http_connection_timeout=10):
    """ Build an :class:`AccessClient` for the given Access web server

    The password is not passed as an argument; it is read interactively with
    :func:`getpass.getpass` after the configuration objects have been built.

    :param str url: Access web server URL
    :param str username: User to use for accessing APIs
    :param int http_connection_timeout: Maximum time (in seconds) to wait for establishing the HTTP connection to the
                                        Access web REST APIs
    :rtype: AccessClient
    :return: Access client object
    """
    # TODO: URL validation
    # The OIDC issuer lives under the access services context path of the server.
    issuer_url = url + util.access_services_context_path + util.oidc_service_context_path

    gateway_settings = util.APIGatewayConfig(url)
    services_settings = util.AccessServicesConfig("", "", 0, util.access_services_context_path)
    # The same issuer URL is passed for the first two OIDC arguments.
    oidc_settings = util.OIDCConfig(
        issuer_url,
        issuer_url,
        "ssh",
        "access-web",
        "ZS1hcHAtc2VjcmV0ZXhhbXBs",
        "openid profile email offline_access",
        False,
        "",
    )
    sdk_settings = util.AccessPySDKConfig(http_connection_timeout)
    full_config = util.AccessConfig(services_settings, gateway_settings, oidc_settings, sdk_settings)

    # Prompt only once the configuration has been assembled successfully.
    secret = getpass.getpass(prompt="Enter password for user '%s':" % username)
    return AccessClient(full_config, username, secret)
class AccessClient(object):
    """ Client used to talk to access web services. Instances are normally obtained from :func:`connect`

    The client can be used as a context manager; leaving the ``with`` block calls :meth:`close`.

    :param util.AccessConfig access_config: Access configuration
    :param str username: User to use for accessing APIs
    :param str password: Password of the user
    """

    def __init__(self, access_config, username, password):
        """ Create Access client object

        Stores the configuration, the credentials and a freshly created access token in module level
        variables of ``util``.

        :param util.AccessConfig access_config: Access configuration
        :param str username: User to use for accessing APIs
        :param str password: Password of the user
        """
        # The global configuration is set before the token is requested.
        util.ACCESS_CONFIG = access_config
        token = util.create_access_token_with_credentials(username, password)
        # Record the credentials and the token globally (assigned left to right).
        util.TOKEN_USER, util.TOKEN_USER_PASSWORD, util.ACCESS_TOKEN = username, password, token

    def submit_job(self, application_id, job_inputs, server_name=""):
        """ Submit a job to an application with the given inputs

        :param str application_id: Application to which job has to be submitted
        :param dict[str, str] job_inputs: Inputs to job according to application definition. All values should be
                                          provided in string format.
                                          Example: job_input = {"CORES":"1","JOB_NAME":"mbdsystemlvlopt",
                                          "MEMORY_PLACEMENT":"true",
                                          "PRIMARY_FILE":"/stage/mbdsystemlvlopt.fem",
                                          "INCLUDE_FILES":"/stage/dvgrids.fem;/stage/grids.fem",
                                          "SUBMISSION_DIRECTORY":"/stage/david/generate_data" .......}
        :param str server_name: Name of server to which job has to be submitted. If not mentioned job will be
                                submitted to the first server containing application 'application_id'
        :rtype: Job
        :return: Job object representing the job
        """
        submission = helper.submit_job(application_id, job_inputs, server_name=server_name)
        # The helper hands back the job id together with the server name to use for the job.
        new_job_id, target_server = submission
        return Job(new_job_id, target_server)

    def clean(self):
        """ Delete local access temporary directory

        :rtype: str
        :return: Local access temporary directory path
        """
        temp_dir = util.get_access_temp_dir_path()
        util.delete_dir_if_exist(temp_dir)
        return temp_dir

    def close(self):
        """ Close connection to access web server"""
        util.destroy_token()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        # Returning False lets any exception raised inside the ``with`` block propagate.
        return False
class Job(object):
    """ This class represents the job and provides functions to perform operations on this job. It is created using
    method :meth:`AccessClient.submit_job`

    :param str job_id: ID of the job
    :param str server_name: Name of the server to which job belongs to
    """

    # Job states treated as "job is over" by wait() and download_file().
    _FINISHED_STATES = ("F", "C")

    def __init__(self, job_id, server_name):
        """ Create job object

        :param str job_id: ID of the job
        :param str server_name: name of the server to which job belongs to
        :raises util.APIException: If ``job_id`` or ``server_name`` is empty
        """
        if not job_id:
            raise util.APIException(util.ERROR_CODE_JOBID_EMPTY, "job id is empty")
        if not server_name:
            raise util.APIException(util.ERROR_CODE_SERVER_NAME_EMPTY, "server name is empty")
        self.job_id = job_id
        self.server_name = server_name

    def status(self):
        """ Get current status of the job

        Job state can be:

        - B : (Job arrays only) Job array is begun, meaning that at least one subjob has started
        - D : (Access Desktop only) Job is downloading files from head node to local machine
        - E : Job is exiting after having run
        - F : Job is finished. Job might be completed execution, failed during execution, or terminated
        - H : Job is held. A job is put into a held state by the server or by a user or administrator. A job stays
          in a held state until it is released by a user or administrator
        - M : Job was moved to another server
        - Q : Job is queued, eligible to run or be routed
        - R : Job is running
        - S : Job is suspended by scheduler. A job is put into the suspended state when a higher priority job needs
          the resources
        - T : Job is in transition to or from a server
        - U : (Access Desktop only) Job is uploading files from local machine to head node
        - W : Job is waiting for its requested execution time to be reached, or job is delayed due to stagein failure
        - X : (Subjobs only) Subjob is finished(expired.)

        :rtype: str
        :return: Job's current status
        """
        return helper.get_job_status(self.job_id, self.server_name)

    def wait(self, poll_interval=2):
        """ Wait for job to finish

        The job status is requested repeatedly until it is 'F' or 'C'. Between two requests the calling thread
        sleeps for ``poll_interval`` seconds so that the server is not flooded with status requests.

        :param float poll_interval: Seconds to sleep between two status requests
        :rtype: Job
        :return: Current job object
        """
        while helper.get_job_status(self.job_id, self.server_name) not in self._FINISHED_STATES:
            time.sleep(poll_interval)
        return self

    def terminate(self):
        """ Terminate a running job

        :rtype: Job
        :return: Current job object
        """
        helper.terminate_job(self.job_id, self.server_name)
        return self

    def delete(self, delete_job_output_directory=False):
        """ Remove job completely irrespective of its state(running, queue, finished etc.)

        :param bool delete_job_output_directory: If True, after deleting job it's output directory will also be deleted
        :rtype: Job
        :return: Current job object
        """
        helper.delete_job(self.job_id, self.server_name, delete_job_output_directory)
        return self

    def download_file(self, file_name, dst_dir_path=""):
        """ Once job is finished, using this function a file from job output directory can be downloaded

        :param str file_name: Name of the file to be downloaded
        :param str dst_dir_path: Path of the directory where file should be downloaded. If not specified it will be
                                 downloaded to job's local temporary directory.
        :rtype: str
        :return: Path of the downloaded file
        :raises util.APIException: If ``file_name`` is empty, the job is not in state 'F' or 'C', the job details
                                   contain no 'outputFiles' path, or the destination directory cannot be created
        """
        if not file_name or not file_name.strip():
            raise util.APIException(util.ERROR_CODE_OPERATION_FAILED, "Name of the file to download is empty")

        job_details_dict = helper.get_job_details(self.job_id, self.server_name)
        job_status = util.get_job_status_from_job_details(job_details_dict)
        if job_status not in self._FINISHED_STATES:
            raise util.APIException(util.ERROR_CODE_INVALID_OPERATION_FOR_JOB_STATE,
                                    "Job should be completed but its in state '%s'" % job_status)

        job_output_dir_path = job_details_dict.get('outputFiles')
        if not job_output_dir_path:
            raise util.APIException(util.ERROR_CODE_JOB_DETAILS_EMPTY,
                                    "GetJobs API did not return 'outputFiles' path")
        # A value starting with "*@" carries a prefix up to the first ':'; keep only what follows it.
        if job_output_dir_path.startswith("*@"):
            job_output_dir_path = job_output_dir_path[job_output_dir_path.find(":") + 1:]
        src_file_path = job_output_dir_path + util.linux_path_seperator + file_name

        if not dst_dir_path:
            dst_dir_path = util.get_job_local_temp_dir_path(self.job_id)
        if not os.path.exists(dst_dir_path):
            try:
                os.makedirs(dst_dir_path)
            except Exception as err:
                raise util.APIException(util.ERROR_CODE_OPERATION_FAILED,
                                        "Creating destination directory '%s' failed: %s" % (dst_dir_path, err))

        # NOTE(review): the local destination path is also joined with the linux separator - confirm this is
        # intended on non-linux clients.
        dst_file_path = dst_dir_path + util.linux_path_seperator + file_name
        helper.download_file(src_file_path, dst_file_path, server_name=self.server_name)
        return dst_file_path

    def clean(self):
        """ Delete job's local temporary directory

        :rtype: Job
        :return: Current job object
        """
        job_temp_dir_path = util.get_job_local_temp_dir_path(self.job_id)
        util.delete_dir_if_exist(job_temp_dir_path)
        return self

    def __str__(self):
        return "Job[Job ID: %s, Server Name: %s]" % (self.job_id, self.server_name)