Source code for aztk.spark.client.cluster.operations

from aztk.client.cluster import CoreClusterOperations
from aztk.spark import models
from aztk.spark.client.base import SparkBaseOperations

from .helpers import (
    copy,
    create,
    create_user,
    delete,
    diagnostics,
    download,
    get,
    get_application_log,
    get_application_status,
    get_configuration,
    get_remote_login_settings,
    list,
    node_run,
    run,
    ssh_into_master,
    submit,
    wait,
)


class ClusterOperations(SparkBaseOperations):
    """Spark ClusterOperations object

    Attributes:
        _core_cluster_operations (:obj:`aztk.client.cluster.CoreClusterOperations`):
        # _spark_base_cluster_operations (:obj:`aztk.spark.client.cluster.CoreClusterOperations`):
    """

    def __init__(self, context):
        self._core_cluster_operations = CoreClusterOperations(context)
        # self._spark_base_cluster_operations = SparkBaseOperations()

    def create(self, cluster_configuration: models.ClusterConfiguration, wait: bool = False):
        """Create a cluster.

        Args:
            cluster_configuration (:obj:`ClusterConfiguration`): Configuration for the cluster to be created.
            wait (:obj:`bool`): if True, this function blocks until cluster creation has finished. Defaults to False.

        Returns:
            :obj:`aztk.spark.models.Cluster`: A Cluster object representing the state and configuration
                of the cluster.
        """
        return create.create_cluster(self._core_cluster_operations, self, cluster_configuration, wait)

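    # A minimal usage sketch (not part of this module). It assumes `client` is an
    # authenticated aztk.spark.Client, so that `client.cluster` is this
    # ClusterOperations object; the cluster id, size, VM size, and toolkit values
    # are illustrative assumptions only:
    #
    #   cluster_config = models.ClusterConfiguration(
    #       cluster_id="example-cluster",
    #       size=2,
    #       vm_size="standard_f2",
    #       toolkit=models.SparkToolkit(version="2.3"),
    #   )
    #   cluster = client.cluster.create(cluster_config, wait=True)
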
    def delete(self, id: str, keep_logs: bool = False):
        """Delete a cluster.

        Args:
            id (:obj:`str`): the id of the cluster to delete.
            keep_logs (:obj:`bool`): If True, the logs related to this cluster in Azure Storage are not deleted.
                Defaults to False.

        Returns:
            :obj:`bool`: True if the deletion process was successful.
        """
        return delete.delete_cluster(self._core_cluster_operations, id, keep_logs)

    def get(self, id: str):
        """Get details about the state of a cluster.

        Args:
            id (:obj:`str`): the id of the cluster to get.

        Returns:
            :obj:`aztk.spark.models.Cluster`: A Cluster object representing the state and configuration
                of the cluster.
        """
        return get.get_cluster(self._core_cluster_operations, id)

    def list(self):
        """List all clusters.

        Returns:
            :obj:`List[aztk.spark.models.Cluster]`: List of Cluster objects, each representing the state
                and configuration of the cluster.
        """
        return list.list_clusters(self._core_cluster_operations)

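    # Sketch: enumerate clusters and fetch one by id (assumes `client` as in the
    # sketch above; `cluster.id` follows the aztk.spark.models.Cluster model):
    #
    #   for cluster in client.cluster.list():
    #       print(cluster.id)
    #   details = client.cluster.get("example-cluster")
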
    def submit(self, id: str, application: models.ApplicationConfiguration, remote: bool = False, wait: bool = False):
        """Submit an application to a cluster.

        Args:
            id (:obj:`str`): the id of the cluster to submit the application to.
            application (:obj:`aztk.spark.models.ApplicationConfiguration`): Application definition.
            remote (:obj:`bool`): If True, the application file will not be uploaded; it is assumed to already be
                reachable by the cluster. This is useful when your application is stored in a mounted Azure File
                Share and not on the client. Defaults to False.
            wait (:obj:`bool`, optional): If True, this function blocks until the application has completed.
                Defaults to False.

        Returns:
            :obj:`None`
        """
        return submit.submit(self._core_cluster_operations, self, id, application, remote, wait)

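    # Sketch: submit an application and block until it completes (assumes `client`
    # as above; the application name and file path are placeholders):
    #
    #   app = models.ApplicationConfiguration(
    #       name="pi",
    #       application="/path/to/pi.py",
    #       application_args=["100"],
    #   )
    #   client.cluster.submit("example-cluster", app, wait=True)
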
    def create_user(self, id: str, username: str, password: str = None, ssh_key: str = None):
        """Create a user on every node in the cluster.

        Args:
            id (:obj:`str`): the id of the cluster to create the user on.
            username (:obj:`str`): name of the user to create.
            password (:obj:`str`, optional): password for the user; either ssh_key or password must be provided.
                Defaults to None.
            ssh_key (:obj:`str`, optional): ssh public key to create the user with; either ssh_key or password
                must be provided. Defaults to None.

        Returns:
            :obj:`None`
        """
        return create_user.create_user(self._core_cluster_operations, self, id, username, ssh_key, password)

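    # Sketch: create a user with an SSH public key read from a local file (assumes
    # `client` as above; the key path and username are placeholders):
    #
    #   with open("/path/to/id_rsa.pub") as f:
    #       client.cluster.create_user("example-cluster", "spark-user", ssh_key=f.read())
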
    def get_application_status(self, id: str, application_name: str):
        """Get the status of a submitted application.

        Args:
            id (:obj:`str`): the id of the cluster the application was submitted to.
            application_name (:obj:`str`): the name of the application to get the status of.

        Returns:
            :obj:`str`: the status of the application
        """
        return get_application_status.get_application_status(self._core_cluster_operations, id, application_name)

    def run(self, id: str, command: str, host=False, internal: bool = False, timeout=None):
        """Run a bash command on every node in the cluster.

        Args:
            id (:obj:`str`): the id of the cluster to run the command on.
            command (:obj:`str`): the bash command to execute on the node.
            host (:obj:`bool`, optional): if True, the command is run on the host VM rather than in the
                Spark container. Defaults to False.
            internal (:obj:`bool`): if True, this will connect to the node using its internal IP.
                Only use this if running within the same VNET as the cluster. Defaults to False.
            timeout (:obj:`int`, optional): The timeout in seconds for establishing a connection to the node.
                Defaults to None.

        Returns:
            :obj:`List[aztk.spark.models.NodeOutput]`: list of NodeOutput objects containing the output of
                the run command
        """
        return run.cluster_run(self._core_cluster_operations, id, command, host, internal, timeout)

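    # Sketch: run a command on every node and print each node's output (assumes
    # `client` as above; NodeOutput attribute names follow aztk.spark.models):
    #
    #   outputs = client.cluster.run("example-cluster", "hostname")
    #   for node_output in outputs:
    #       print(node_output.id, node_output.output)
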
    def node_run(self, id: str, node_id: str, command: str, host=False, internal: bool = False, timeout=None):
        """Run a bash command on the given node.

        Args:
            id (:obj:`str`): the id of the cluster to run the command on.
            node_id (:obj:`str`): the id of the node in the cluster to run the command on.
            command (:obj:`str`): the bash command to execute on the node.
            host (:obj:`bool`, optional): if True, the command is run on the host VM rather than in the
                Spark container. Defaults to False.
            internal (:obj:`bool`): if True, this will connect to the node using its internal IP.
                Only use this if running within the same VNET as the cluster. Defaults to False.
            timeout (:obj:`int`, optional): The timeout in seconds for establishing a connection to the node.
                Defaults to None.

        Returns:
            :obj:`aztk.spark.models.NodeOutput`: object containing the output of the run command
        """
        return node_run.node_run(self._core_cluster_operations, id, node_id, command, host, internal, timeout)

    def copy(
            self,
            id: str,
            source_path: str,
            destination_path: str,
            host: bool = False,
            internal: bool = False,
            timeout: int = None,
    ):
        """Copy a file to every node in a cluster.

        Args:
            id (:obj:`str`): the id of the cluster to copy files to.
            source_path (:obj:`str`): the local path of the file to copy.
            destination_path (:obj:`str`): the path on each node the file is copied to.
            host (:obj:`bool`, optional): if True, the copy operation occurs on the host VM rather than in the
                Spark container. Defaults to False.
            internal (:obj:`bool`, optional): if True, this will connect to the node using its internal IP.
                Only use this if running within the same VNET as the cluster. Defaults to False.
            timeout (:obj:`int`, optional): The timeout in seconds for establishing a connection to the node.
                Defaults to None.

        Returns:
            :obj:`List[aztk.spark.models.NodeOutput]`: A list of NodeOutput objects representing the output
                of the copy command.
        """
        return copy.cluster_copy(self._core_cluster_operations, id, source_path, destination_path, host, internal,
                                 timeout)

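    # Sketch: copy a local file to the same path on every node's host VM (assumes
    # `client` as above; the paths are placeholders):
    #
    #   client.cluster.copy(
    #       "example-cluster",
    #       source_path="/local/data.csv",
    #       destination_path="/mnt/data.csv",
    #       host=True,
    #   )
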
    def download(
            self,
            id: str,
            source_path: str,
            destination_path: str = None,
            host: bool = False,
            internal: bool = False,
            timeout: int = None,
    ):
        """Download a file from every node in a cluster.

        Args:
            id (:obj:`str`): the id of the cluster to download files from.
            source_path (:obj:`str`): the path of the file to copy from.
            destination_path (:obj:`str`, optional): the local directory path where the output should be written.
                If None, a SpooledTemporaryFile will be returned in the NodeOutput object, else the file will be
                written to this path. Defaults to None.
            host (:obj:`bool`, optional): if True, the copy operation occurs on the host VM rather than in the
                Spark container. Defaults to False.
            internal (:obj:`bool`, optional): if True, this will connect to the node using its internal IP.
                Only use this if running within the same VNET as the cluster. Defaults to False.
            timeout (:obj:`int`, optional): The timeout in seconds for establishing a connection to the node.
                Defaults to None.

        Returns:
            :obj:`List[aztk.spark.models.NodeOutput]`: A list of NodeOutput objects representing the output
                of the download command.
        """
        return download.cluster_download(self._core_cluster_operations, id, source_path, destination_path, host,
                                         internal, timeout)

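    # Sketch: download a file from every node into a local directory (assumes
    # `client` as above; the paths are placeholders):
    #
    #   results = client.cluster.download(
    #       "example-cluster",
    #       source_path="/mnt/data.csv",
    #       destination_path="./downloads/",
    #   )
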
    def diagnostics(self, id, output_directory: str = None, brief: bool = False):
        """Run diagnostics on every node in the cluster and download the results.

        Args:
            id (:obj:`str`): the id of the cluster to run diagnostics on.
            output_directory (:obj:`str`, optional): the local directory path where the output should be written.
                If None, a SpooledTemporaryFile will be returned in the NodeOutput object, else the files will be
                written to this path. Defaults to None.
            brief (:obj:`bool`, optional): if True, only a reduced set of diagnostic data is collected.
                Defaults to False.

        Returns:
            :obj:`List[aztk.spark.models.NodeOutput]`: A list of NodeOutput objects representing the output
                of the diagnostic scripts.
        """
        return diagnostics.run_cluster_diagnostics(self, id, output_directory, brief)

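    # Sketch: collect diagnostic output from every node into a local directory
    # (assumes `client` as above; the output path is a placeholder):
    #
    #   client.cluster.diagnostics("example-cluster", output_directory="./diagnostics")
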
    def get_application_log(self, id: str, application_name: str, tail=False, current_bytes: int = 0):
        """Get the log for a running or completed application.

        Args:
            id (:obj:`str`): the id of the cluster the application was submitted to.
            application_name (:obj:`str`): the name of the application to get the log of.
            tail (:obj:`bool`, optional): If True, get the remaining bytes after current_bytes.
                Otherwise, the whole log will be retrieved. Only use this if streaming the log as it is being
                written. Defaults to False.
            current_bytes (:obj:`int`): Specifies the last seen byte, so only the bytes after current_bytes are
                retrieved. Only useful if streaming the log as it is being written. Only used if tail is True.

        Returns:
            :obj:`aztk.spark.models.ApplicationLog`: a model representing the output of the application.
        """
        return get_application_log.get_application_log(self._core_cluster_operations, id, application_name, tail,
                                                       current_bytes)

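    # Sketch: poll for new log bytes to stream a running application's log
    # (assumes `client` as above; the ApplicationLog attributes `log`,
    # `total_bytes`, and `application_state` are assumptions about the model):
    #
    #   current_bytes = 0
    #   while True:
    #       log = client.cluster.get_application_log(
    #           "example-cluster", "pi", tail=True, current_bytes=current_bytes)
    #       print(log.log, end="")
    #       if log.application_state == "completed":
    #           break
    #       current_bytes = log.total_bytes
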
    def get_remote_login_settings(self, id: str, node_id: str):
        """Get the remote login information for a node in a cluster.

        Args:
            id (:obj:`str`): the id of the cluster the node is in.
            node_id (:obj:`str`): the id of the node in the cluster.

        Returns:
            :obj:`aztk.spark.models.RemoteLogin`: Object that contains the ip address and port combination
                to login to a node
        """
        return get_remote_login_settings.get_remote_login_settings(self._core_cluster_operations, id, node_id)

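    # Sketch: look up the public IP and port used to reach a node (assumes `client`
    # as above and that node ids come from client.cluster.get(...).nodes):
    #
    #   settings = client.cluster.get_remote_login_settings("example-cluster", node_id)
    #   print(settings.ip_address, settings.port)
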
    def wait(self, id: str, application_name: str):
        """Wait until the application has completed.

        Args:
            id (:obj:`str`): the id of the cluster the application was submitted to.
            application_name (:obj:`str`): the name of the application to wait for.

        Returns:
            :obj:`None`
        """
        return wait.wait_for_application_to_complete(self._core_cluster_operations, id, application_name)

    def get_configuration(self, id: str):
        """Get the initial configuration of the cluster.

        Args:
            id (:obj:`str`): the id of the cluster.

        Returns:
            :obj:`aztk.spark.models.ClusterConfiguration`
        """
        return get_configuration.get_configuration(self._core_cluster_operations, id)

    def ssh_into_master(self, id, username, ssh_key=None, password=None, port_forward_list=None, internal=False):
        """Open an SSH tunnel to the Spark master node and forward the specified ports.

        Args:
            id (:obj:`str`): the id of the cluster.
            username (:obj:`str`): the name of the user to open the ssh session with.
            ssh_key (:obj:`str`, optional): the ssh_key to authenticate the ssh user with.
                Must specify either `ssh_key` or `password`.
            password (:obj:`str`, optional): the password to authenticate the ssh user with.
                Must specify either `password` or `ssh_key`.
            port_forward_list (:obj:`List[aztk.spark.models.PortForwardingSpecification]`, optional):
                List of the ports to forward. Defaults to None.
            internal (:obj:`bool`, optional): if True, this will connect to the node using its internal IP.
                Only use this if running within the same VNET as the cluster. Defaults to False.
        """
        return ssh_into_master.ssh_into_master(self, self._core_cluster_operations, id, username, ssh_key, password,
                                               port_forward_list, internal)

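    # Sketch: forward the Spark Web UI (port 8080 on the master) to localhost
    # (assumes `client` as above; the user, key path, and ports are placeholders):
    #
    #   client.cluster.ssh_into_master(
    #       "example-cluster",
    #       "spark-user",
    #       ssh_key="/path/to/id_rsa",
    #       port_forward_list=[
    #           models.PortForwardingSpecification(remote_port=8080, local_port=8080),
    #       ],
    #   )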