Source code for aztk.spark.client.client

from typing import List

import azure.batch.models.batch_error as batch_error

from aztk import error
from aztk import models as base_models
from aztk.client import CoreClient
from aztk.spark import models
from aztk.spark.client.cluster import ClusterOperations
from aztk.spark.client.job import JobOperations
from aztk.spark.helpers import job_submission as job_submit_helper
from aztk.spark.utils import util
from aztk.utils import deprecate, deprecated, helpers


class Client(CoreClient):
    """The client used to create and manage Spark clusters.

    Args:
        secrets_configuration (:obj:`aztk.spark.models.SecretsConfiguration`, optional):
            credentials used to build the client context. Ignored when the
            deprecated ``secrets_config`` keyword argument is supplied with a truthy value.
        **kwargs: only ``secrets_config`` (deprecated alias of ``secrets_configuration``)
            is read; any other keyword argument is ignored.

    Attributes:
        cluster (:obj:`aztk.spark.client.cluster.ClusterOperations`): Cluster operations.
        job (:obj:`aztk.spark.client.job.JobOperations`): Job operations.
    """

    def __init__(self, secrets_configuration: models.SecretsConfiguration = None, **kwargs):
        super().__init__()
        # A truthy legacy ``secrets_config`` wins over ``secrets_configuration`` so that
        # callers still using the old key keep working (with a deprecation warning).
        if kwargs.get("secrets_config"):
            deprecate(
                version="0.10.0",
                message="secrets_config key is deprecated in secrets.yaml",
                advice="Please use secrets_configuration key instead.",
            )
            context = self._get_context(kwargs.get("secrets_config"))
        else:
            context = self._get_context(secrets_configuration)
        self.cluster = ClusterOperations(context)
        self.job = JobOperations(context)

    # ALL THE FOLLOWING METHODS ARE DEPRECATED AND WILL BE REMOVED IN 0.10.0

    @deprecated("0.10.0")
    def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool = False):
        """Create a cluster. Deprecated: use ``self.cluster.create``.

        Returns:
            whatever ``self.cluster.create`` returns.
        """
        return self.cluster.create(cluster_configuration=cluster_conf, wait=wait)

    @deprecated("0.10.0")
    def create_clusters_in_parallel(self, cluster_confs):    # NOT IMPLEMENTED
        """Create each cluster in ``cluster_confs``.

        Despite the name, the clusters are created one after another, and nothing is returned.
        """
        for cluster_conf in cluster_confs:
            self.cluster.create(cluster_conf)

    @deprecated("0.10.0")
    def delete_cluster(self, cluster_id: str, keep_logs: bool = False):
        """Delete a cluster. Deprecated: use ``self.cluster.delete``."""
        return self.cluster.delete(id=cluster_id, keep_logs=keep_logs)

    @deprecated("0.10.0")
    def get_cluster(self, cluster_id: str):
        """Get a cluster by id. Deprecated: use ``self.cluster.get``."""
        return self.cluster.get(id=cluster_id)

    @deprecated("0.10.0")
    def list_clusters(self):
        """List clusters. Deprecated: use ``self.cluster.list``."""
        return self.cluster.list()

    @deprecated("0.10.0")
    def get_remote_login_settings(self, cluster_id: str, node_id: str):
        """Get remote login settings for a node. Deprecated: use ``self.cluster.get_remote_login_settings``."""
        return self.cluster.get_remote_login_settings(cluster_id, node_id)

    @deprecated("0.10.0")
    def submit(self, cluster_id: str, application: models.ApplicationConfiguration, remote: bool = False, wait: bool = False):
        """Submit an application to a cluster. Deprecated: use ``self.cluster.submit``."""
        return self.cluster.submit(id=cluster_id, application=application, remote=remote, wait=wait)

    @deprecated("0.10.0")
    def submit_all_applications(self, cluster_id: str, applications):    # NOT IMPLEMENTED
        """Submit each application in ``applications`` to ``cluster_id``, one at a time; returns nothing."""
        for application in applications:
            self.cluster.submit(cluster_id, application)

    @deprecated("0.10.0")
    def wait_until_application_done(self, cluster_id: str, task_id: str):    # NOT IMPLEMENTED
        """Block until task ``task_id`` in job ``cluster_id`` completes.

        Raises:
            :obj:`aztk.error.AztkError`: wrapping any ``BatchErrorException`` (kept as ``__cause__``).
        """
        try:
            helpers.wait_for_task_to_complete(job_id=cluster_id, task_id=task_id, batch_client=self.batch_client)
        except batch_error.BatchErrorException as e:
            raise error.AztkError(helpers.format_batch_exception(e)) from e

    @deprecated("0.10.0")
    def wait_until_applications_done(self, cluster_id: str):    # NOT IMPLEMENTED
        """Block until all tasks in job ``cluster_id`` complete.

        Raises:
            :obj:`aztk.error.AztkError`: wrapping any ``BatchErrorException`` (kept as ``__cause__``).
        """
        try:
            helpers.wait_for_tasks_to_complete(job_id=cluster_id, batch_client=self.batch_client)
        except batch_error.BatchErrorException as e:
            raise error.AztkError(helpers.format_batch_exception(e)) from e

    @deprecated("0.10.0")
    def wait_until_cluster_is_ready(self, cluster_id: str):    # NOT IMPLEMENTED
        """Wait for the cluster master, then return the cluster built from its pool and nodes.

        Returns:
            :obj:`aztk.spark.models.Cluster`: built from the Batch pool and its compute nodes.

        Raises:
            :obj:`aztk.error.AztkError`: wrapping any ``BatchErrorException`` (kept as ``__cause__``).
        """
        try:
            util.wait_for_master_to_be_ready(self.cluster._core_cluster_operations, self.cluster, cluster_id)
            pool = self.batch_client.pool.get(cluster_id)
            nodes = self.batch_client.compute_node.list(pool_id=cluster_id)
            return models.Cluster(base_models.Cluster(pool, nodes))
        except batch_error.BatchErrorException as e:
            raise error.AztkError(helpers.format_batch_exception(e)) from e

    @deprecated("0.10.0")
    def wait_until_all_clusters_are_ready(self, clusters: List[str]):    # NOT IMPLEMENTED
        """Call :meth:`wait_until_cluster_is_ready` for each cluster id, in order; returns nothing."""
        for cluster_id in clusters:
            self.wait_until_cluster_is_ready(cluster_id)

    @deprecated("0.10.0")
    def create_user(self, cluster_id: str, username: str, password: str = None, ssh_key: str = None) -> str:
        """Create a user on a cluster. Deprecated: use ``self.cluster.create_user``."""
        return self.cluster.create_user(id=cluster_id, username=username, password=password, ssh_key=ssh_key)

    @deprecated("0.10.0")
    def get_application_log(self, cluster_id: str, application_name: str, tail=False, current_bytes: int = 0):
        """Get an application's log. Deprecated: use ``self.cluster.get_application_log``."""
        return self.cluster.get_application_log(
            id=cluster_id, application_name=application_name, tail=tail, current_bytes=current_bytes)

    @deprecated("0.10.0")
    def get_application_status(self, cluster_id: str, app_name: str):
        """Get an application's status. Deprecated: use ``self.cluster.get_application_status``."""
        return self.cluster.get_application_status(id=cluster_id, application_name=app_name)

    @deprecated("0.10.0")
    def cluster_run(self, cluster_id: str, command: str, host=False, internal: bool = False, timeout=None):
        """Run ``command`` on cluster ``cluster_id``. Deprecated: use ``self.cluster.run``.

        ``timeout`` is forwarded to ``self.cluster.run``, as :meth:`node_run` already does.
        It used to be accepted and silently dropped.
        """
        return self.cluster.run(id=cluster_id, command=command, host=host, internal=internal, timeout=timeout)

    @deprecated("0.10.0")
    def node_run(self, cluster_id: str, node_id: str, command: str, host=False, internal: bool = False, timeout=None):
        """Run ``command`` on a single node. Deprecated: use ``self.cluster.node_run``."""
        return self.cluster.node_run(
            id=cluster_id, node_id=node_id, command=command, host=host, internal=internal, timeout=timeout)

    @deprecated("0.10.0")
    def cluster_copy(
            self,
            cluster_id: str,
            source_path: str,
            destination_path: str,
            host: bool = False,
            internal: bool = False,
            timeout: int = None,
    ):
        """Copy a file to the cluster. Deprecated: use ``self.cluster.copy``."""
        return self.cluster.copy(
            id=cluster_id,
            source_path=source_path,
            destination_path=destination_path,
            host=host,
            internal=internal,
            timeout=timeout,
        )

    @deprecated("0.10.0")
    def cluster_download(
            self,
            cluster_id: str,
            source_path: str,
            destination_path: str = None,
            host: bool = False,
            internal: bool = False,
            timeout: int = None,
    ):
        """Download a file from the cluster. Deprecated: use ``self.cluster.download``."""
        return self.cluster.download(
            id=cluster_id,
            source_path=source_path,
            destination_path=destination_path,
            host=host,
            internal=internal,
            timeout=timeout,
        )

    @deprecated("0.10.0")
    def cluster_ssh_into_master(self,
                                cluster_id,
                                node_id,
                                username,
                                ssh_key=None,
                                password=None,
                                port_forward_list=None,
                                internal=False):
        """SSH into ``node_id`` of the cluster via the core cluster operations' ``ssh_into_node``."""
        return self.cluster._core_cluster_operations.ssh_into_node(cluster_id, node_id, username, ssh_key, password,
                                                                   port_forward_list, internal)

    # --- job submission ---

    @deprecated("0.10.0")
    def submit_job(self, job_configuration: models.JobConfiguration):
        """Submit a job. Deprecated: use ``self.job.submit``."""
        return self.job.submit(job_configuration)

    @deprecated("0.10.0")
    def list_jobs(self):
        """List jobs. Deprecated: use ``self.job.list``."""
        return self.job.list()

    @deprecated("0.10.0")
    def list_applications(self, job_id):
        """List a job's applications. Deprecated: use ``self.job.list_applications``."""
        return self.job.list_applications(job_id)

    @deprecated("0.10.0")
    def get_job(self, job_id):
        """Get a job by id. Deprecated: use ``self.job.get``."""
        return self.job.get(job_id)

    @deprecated("0.10.0")
    def stop_job(self, job_id):
        """Stop a job. Deprecated: use ``self.job.stop``."""
        return self.job.stop(job_id)

    @deprecated("0.10.0")
    def delete_job(self, job_id: str, keep_logs: bool = False):
        """Delete a job. Deprecated: use ``self.job.delete``."""
        return self.job.delete(job_id, keep_logs)

    @deprecated("0.10.0")
    def get_application(self, job_id, application_name):
        """Get a job application. Deprecated: use ``self.job.get_application``."""
        return self.job.get_application(job_id, application_name)

    @deprecated("0.10.0")
    def get_job_application_log(self, job_id, application_name):
        """Get a job application's log. Deprecated: use ``self.job.get_application_log``."""
        return self.job.get_application_log(job_id, application_name)

    @deprecated("0.10.0")
    def stop_job_app(self, job_id, application_name):    # NOT IMPLEMENTED
        """Stop one application of a job via ``job_submit_helper.stop_app``.

        NOTE(review): this passes the Client itself as the helper's first argument;
        presumably the helper only needs ``batch_client`` from it. Confirm against the helper.

        Raises:
            :obj:`aztk.error.AztkError`: wrapping any ``BatchErrorException`` (kept as ``__cause__``).
        """
        try:
            return job_submit_helper.stop_app(self, job_id, application_name)
        except batch_error.BatchErrorException as e:
            raise error.AztkError(helpers.format_batch_exception(e)) from e

    @deprecated("0.10.0")
    def wait_until_job_finished(self, job_id):
        """Block until the job finishes (via ``self.job.wait``).

        Raises:
            :obj:`aztk.error.AztkError`: wrapping any ``BatchErrorException`` (kept as ``__cause__``).
        """
        try:
            self.job.wait(job_id)
        except batch_error.BatchErrorException as e:
            raise error.AztkError(helpers.format_batch_exception(e)) from e

    @deprecated("0.10.0")
    def wait_until_all_jobs_finished(self, jobs):    # NOT IMPLEMENTED
        """Call :meth:`wait_until_job_finished` for each job id, in order; returns nothing."""
        for job in jobs:
            self.wait_until_job_finished(job)

    @deprecated("0.10.0")
    def run_cluster_diagnostics(self, cluster_id, output_directory=None):
        """Run cluster diagnostics. Deprecated: use ``self.cluster.diagnostics``."""
        return self.cluster.diagnostics(cluster_id, output_directory)