Source code for aztk.spark.client

from typing import List

import azure.batch.models.batch_error as batch_error

import aztk
from aztk import error
from aztk.client import Client as BaseClient
from aztk.internal.cluster_data import NodeData
from aztk.spark import models
from aztk.spark.helpers import create_cluster as create_cluster_helper
from aztk.spark.helpers import get_log as get_log_helper
from aztk.spark.helpers import job_submission as job_submit_helper
from aztk.spark.helpers import submit as cluster_submit_helper
from aztk.spark.helpers import cluster_diagnostic_helper
from aztk.spark.utils import util
from aztk.utils import helpers


[docs]class Client(BaseClient): """ Aztk Spark Client This is the main entry point for using aztk for spark Args: secrets_config(aztk.spark.models.models.SecretsConfiguration): Configuration with all the needed credentials """
[docs] def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool = False): """ Create a new aztk spark cluster Args: cluster_conf(aztk.spark.models.models.ClusterConfiguration): Configuration for the the cluster to be created wait(bool): If you should wait for the cluster to be ready before returning Returns: aztk.spark.models.Cluster """ cluster_conf = _apply_default_for_cluster_config(cluster_conf) cluster_conf.validate() cluster_data = self._get_cluster_data(cluster_conf.cluster_id) try: zip_resource_files = None node_data = NodeData(cluster_conf).add_core().done() zip_resource_files = cluster_data.upload_node_data(node_data).to_resource_file() start_task = create_cluster_helper.generate_cluster_start_task(self, zip_resource_files, cluster_conf.cluster_id, cluster_conf.gpu_enabled(), cluster_conf.get_docker_repo(), cluster_conf.file_shares, cluster_conf.plugins, cluster_conf.mixed_mode(), cluster_conf.worker_on_master) software_metadata_key = "spark" vm_image = models.VmImage( publisher='Canonical', offer='UbuntuServer', sku='16.04') cluster = self.__create_pool_and_job( cluster_conf, software_metadata_key, start_task, vm_image) # Wait for the master to be ready if wait: util.wait_for_master_to_be_ready(self, cluster.id) cluster = self.get_cluster(cluster.id) return cluster except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def create_clusters_in_parallel(self, cluster_confs): for cluster_conf in cluster_confs: self.create_cluster(cluster_conf)
[docs] def delete_cluster(self, cluster_id: str, keep_logs: bool = False): try: return self.__delete_pool_and_job(cluster_id, keep_logs) except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def get_cluster(self, cluster_id: str): try: pool, nodes = self.__get_pool_details(cluster_id) return models.Cluster(pool, nodes) except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def list_clusters(self): try: return [models.Cluster(pool) for pool in self.__list_clusters(aztk.models.Software.spark)] except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def get_remote_login_settings(self, cluster_id: str, node_id: str): try: return self.__get_remote_login_settings(cluster_id, node_id) except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def submit(self, cluster_id: str, application: models.ApplicationConfiguration, remote: bool = False, wait: bool = False): try: cluster_submit_helper.submit_application(self, cluster_id, application, remote, wait) except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def submit_all_applications(self, cluster_id: str, applications): for application in applications: self.submit(cluster_id, application)
[docs] def wait_until_application_done(self, cluster_id: str, task_id: str): try: helpers.wait_for_task_to_complete(job_id=cluster_id, task_id=task_id, batch_client=self.batch_client) except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def wait_until_applications_done(self, cluster_id: str): try: helpers.wait_for_tasks_to_complete(job_id=cluster_id, batch_client=self.batch_client) except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def wait_until_cluster_is_ready(self, cluster_id: str): try: util.wait_for_master_to_be_ready(self, cluster_id) pool = self.batch_client.pool.get(cluster_id) nodes = self.batch_client.compute_node.list(pool_id=cluster_id) return models.Cluster(pool, nodes) except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def wait_until_all_clusters_are_ready(self, clusters: List[str]): for cluster_id in clusters: self.wait_until_cluster_is_ready(cluster_id)
[docs] def create_user(self, cluster_id: str, username: str, password: str = None, ssh_key: str = None) -> str: try: cluster = self.get_cluster(cluster_id) master_node_id = cluster.master_node_id if not master_node_id: raise error.ClusterNotReadyError("The master has not yet been picked, a user cannot be added.") self.__create_user_on_pool(username, cluster.id, cluster.nodes, ssh_key, password) except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def get_application_log(self, cluster_id: str, application_name: str, tail=False, current_bytes: int = 0): try: return get_log_helper.get_log(self.batch_client, self.blob_client, cluster_id, application_name, tail, current_bytes) except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def get_application_status(self, cluster_id: str, app_name: str): try: task = self.batch_client.task.get(cluster_id, app_name) return task.state._value_ except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def cluster_run(self, cluster_id: str, command: str, host=False, internal: bool = False, timeout=None): try: return self.__cluster_run(cluster_id, command, internal, container_name='spark' if not host else None, timeout=timeout) except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def node_run(self, cluster_id: str, node_id: str, command: str, host=False, internal: bool = False, timeout=None): try: return self.__node_run(cluster_id, node_id, command, internal, container_name='spark' if not host else None, timeout=timeout) except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def cluster_copy(self, cluster_id: str, source_path: str, destination_path: str, host: bool = False, internal: bool = False, timeout=None): try: container_name = None if host else 'spark' return self.__cluster_copy(cluster_id, source_path, destination_path, container_name=container_name, get=False, internal=internal, timeout=timeout) except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def cluster_download(self, cluster_id: str, source_path: str, destination_path: str, host: bool = False, internal: bool = False, timeout=None): try: container_name = None if host else 'spark' return self.__cluster_copy(cluster_id, source_path, destination_path, container_name=container_name, get=True, internal=internal, timeout=timeout) except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def cluster_ssh_into_master(self, cluster_id, node_id, username, ssh_key=None, password=None, port_forward_list=None, internal=False): try: self.__ssh_into_node(cluster_id, node_id, username, ssh_key, password, port_forward_list, internal) except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
''' job submission '''
[docs] def submit_job(self, job_configuration: models.JobConfiguration): try: job_configuration = _apply_default_for_job_config(job_configuration) job_configuration.validate() cluster_data = self._get_cluster_data(job_configuration.id) node_data = NodeData(job_configuration.to_cluster_config()).add_core().done() zip_resource_files = cluster_data.upload_node_data(node_data).to_resource_file() start_task = create_cluster_helper.generate_cluster_start_task(self, zip_resource_files, job_configuration.id, job_configuration.gpu_enabled, job_configuration.get_docker_repo(), mixed_mode=job_configuration.mixed_mode(), worker_on_master=job_configuration.worker_on_master) application_tasks = [] for application in job_configuration.applications: application_tasks.append( (application, cluster_submit_helper.generate_task(self, job_configuration.id, application)) ) job_manager_task = job_submit_helper.generate_task(self, job_configuration, application_tasks) software_metadata_key = "spark" vm_image = models.VmImage( publisher='Canonical', offer='UbuntuServer', sku='16.04') autoscale_formula = "$TargetDedicatedNodes = {0}; " \ "$TargetLowPriorityNodes = {1}".format( job_configuration.max_dedicated_nodes, job_configuration.max_low_pri_nodes) job = self.__submit_job( job_configuration=job_configuration, start_task=start_task, job_manager_task=job_manager_task, autoscale_formula=autoscale_formula, software_metadata_key=software_metadata_key, vm_image_model=vm_image, application_metadata='\n'.join(application.name for application in (job_configuration.applications or []))) return models.Job(job) except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def list_jobs(self): try: return [models.Job(cloud_job_schedule) for cloud_job_schedule in job_submit_helper.list_jobs(self)] except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def list_applications(self, job_id): try: applications = job_submit_helper.list_applications(self, job_id) for item in applications: if applications[item]: applications[item] = models.Application(applications[item]) return applications except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def get_job(self, job_id): try: job, apps, pool, nodes = job_submit_helper.get_job(self, job_id) return models.Job(job, apps, pool, nodes) except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def stop_job(self, job_id): try: return job_submit_helper.stop(self, job_id) except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def delete_job(self, job_id: str, keep_logs: bool = False): try: return job_submit_helper.delete(self, job_id, keep_logs) except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def get_application(self, job_id, application_name): try: return models.Application(job_submit_helper.get_application(self, job_id, application_name)) except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def get_job_application_log(self, job_id, application_name): try: return job_submit_helper.get_application_log(self, job_id, application_name) except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def stop_job_app(self, job_id, application_name): try: return job_submit_helper.stop_app(self, job_id, application_name) except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def wait_until_job_finished(self, job_id): try: job_submit_helper.wait_until_job_finished(self, job_id) except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
[docs] def wait_until_all_jobs_finished(self, jobs): for job in jobs: self.wait_until_job_finished(job)
[docs] def run_cluster_diagnostics(self, cluster_id, output_directory): try: output = cluster_diagnostic_helper.run(self, cluster_id, output_directory) return output except batch_error.BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e))
def _default_scheduling_target(vm_count: int): if vm_count == 0: return models.SchedulingTarget.Any else: return models.SchedulingTarget.Dedicated def _apply_default_for_cluster_config(configuration: models.ClusterConfiguration): cluster_conf = models.ClusterConfiguration() cluster_conf.merge(configuration) if cluster_conf.scheduling_target is None: cluster_conf.scheduling_target = _default_scheduling_target(cluster_conf.size) return cluster_conf def _apply_default_for_job_config(job_conf: models.JobConfiguration): if job_conf.scheduling_target is None: job_conf.scheduling_target = _default_scheduling_target(job_conf.max_dedicated_nodes) return job_conf