from aztk import models
from aztk.internal import cluster_data
from .helpers import (create_user_on_cluster, create_user_on_node, delete_user_on_cluster, delete_user_on_node,
generate_user_on_cluster, generate_user_on_node, get_application_log, get_recent_job,
get_remote_login_settings, get_task_state, list_tasks, node_run, run, ssh_into_node, task_table)
[docs]class BaseOperations:
"""Base operations that all other operations have as an attribute
Attributes:
batch_client (:obj:`azure.batch.batch_service_client.BatchServiceClient`): Client used to interact with the
Azure Batch service.
blob_client (:obj:`azure.storage.blob.BlockBlobService`): Client used to interact with the Azure Storage
Blob service.
secrets_configuration (:obj:`aztk.models.SecretsConfiguration`):
Model that holds AZTK secrets used to authenticate with Azure and the clusters.
"""
def __init__(self, context):
self.batch_client = context["batch_client"]
self.blob_client = context["blob_client"]
self.table_service = context["table_service"]
self.secrets_configuration = context["secrets_configuration"]
[docs] def get_cluster_configuration(self, id: str) -> models.ClusterConfiguration:
"""Open an ssh tunnel to a node
Args:
id (:obj:`str`): the id of the cluster the node is in
node_id (:obj:`str`): the id of the node to open the ssh tunnel to
username (:obj:`str`): the username to authenticate the ssh session
ssh_key (:obj:`str`, optional): ssh public key to create the user with, must use ssh_key
or password. Defaults to None.
password (:obj:`str`, optional): password for the user, must use ssh_key or password. Defaults to None.
port_forward_list (:obj:`List[PortForwardingSpecification`, optional): list of PortForwardingSpecifications.
The defined ports will be forwarded to the client.
internal (:obj:`bool`, optional): if True, this will connect to the node using its internal IP.
Only use this if running within the same VNET as the cluster. Defaults to False.
Returns:
:obj:`aztk.models.ClusterConfiguration`: Object representing the cluster's configuration
"""
return self.get_cluster_data(id).read_cluster_config()
[docs] def get_cluster_data(self, id: str) -> cluster_data.ClusterData:
"""Gets the ClusterData object to manage data related to the given cluster
Args:
id (:obj:`str`): the id of the cluster to get
Returns:
:obj:`aztk.models.ClusterData`: Object used to manage the data and storage functions for a cluster
"""
return cluster_data.ClusterData(self.blob_client, id)
[docs] def ssh_into_node(self, id, node_id, username, ssh_key=None, password=None, port_forward_list=None, internal=False):
"""Open an ssh tunnel to a node
Args:
id (:obj:`str`): the id of the cluster the node is in
node_id (:obj:`str`): the id of the node to open the ssh tunnel to
username (:obj:`str`): the username to authenticate the ssh session
ssh_key (:obj:`str`, optional): ssh public key to create the user with, must use ssh_key or password.
Defaults to None.
password (:obj:`str`, optional): password for the user, must use ssh_key or password. Defaults to None.
port_forward_list (:obj:`List[PortForwardingSpecification`, optional): list of PortForwardingSpecifications.
The defined ports will be forwarded to the client.
internal (:obj:`bool`, optional): if True, this will connect to the node using its internal IP.
Only use this if running within the same VNET as the cluster. Defaults to False.
Returns:
:obj:`None`
"""
ssh_into_node.ssh_into_node(self, id, node_id, username, ssh_key, password, port_forward_list, internal)
[docs] def create_user_on_node(self, id, node_id, username, ssh_key=None, password=None):
"""Create a user on a node
Args:
id (:obj:`str`): id of the cluster to create the user on.
node_id (:obj:`str`): id of the node in the cluster to create the user on.
username (:obj:`str`): name of the user to create.
ssh_key (:obj:`str`, optional): ssh public key to create the user with, must use ssh_key or password.
password (:obj:`str`, optional): password for the user, must use ssh_key or password.
Returns:
:obj:`None`
"""
return create_user_on_node.create_user_on_node(self, id, node_id, username, ssh_key, password)
# TODO: remove nodes as param
[docs] def create_user_on_cluster(self, id, nodes, username, ssh_pub_key=None, password=None):
"""Create a user on every node in the cluster
Args:
username (:obj:`str`): name of the user to create.
id (:obj:`str`): id of the cluster to create the user on.
nodes (:obj:`List[ComputeNode]`): list of nodes to create the user on
ssh_key (:obj:`str`, optional): ssh public key to create the user with, must use ssh_key or password.
Defaults to None.
password (:obj:`str`, optional): password for the user, must use ssh_key or password. Defaults to None.
Returns:
:obj:`None`
"""
return create_user_on_cluster.create_user_on_cluster(self, id, nodes, username, ssh_pub_key, password)
[docs] def generate_user_on_node(self, id, node_id):
"""Create a user with an autogenerated username and ssh_key on the given node.
Args:
id (:obj:`str`): the id of the cluster to generate the user on.
node_id (:obj:`str`): the id of the node in the cluster to generate the user on.
Returns:
:obj:`tuple`: A tuple of the form (username: :obj:`str`, ssh_key: :obj:`Cryptodome.PublicKey.RSA`)
"""
return generate_user_on_node.generate_user_on_node(self, id, node_id)
# TODO: remove nodes as param
[docs] def generate_user_on_cluster(self, id, nodes):
"""Create a user with an autogenerated username and ssh_key on the cluster
Args:
id (:obj:`str`): the id of the cluster to generate the user on.
node_id (:obj:`str`): the id of the node in the cluster to generate the user on.
Returns:
:obj:`tuple`: A tuple of the form (username: :obj:`str`, ssh_key: :obj:`Cryptodome.PublicKey.RSA`)
"""
return generate_user_on_cluster.generate_user_on_cluster(self, id, nodes)
[docs] def delete_user_on_node(self, id: str, node_id: str, username: str) -> str:
"""Delete a user on a node
Args:
id (:obj:`str`): the id of the cluster to delete the user on.
node_id (:obj:`str`): the id of the node in the cluster to delete the user on.
username (:obj:`str`): the name of the user to delete.
Returns:
:obj:`None`
"""
return delete_user_on_node.delete_user(self, id, node_id, username)
# TODO: remove nodes as param
[docs] def delete_user_on_cluster(self, username, id, nodes):
"""Delete a user on every node in the cluster
Args:
id (:obj:`str`): the id of the cluster to delete the user on.
node_id (:obj:`str`): the id of the node in the cluster to delete the user on.
username (:obj:`str`): the name of the user to delete.
Returns:
:obj:`None`
"""
return delete_user_on_cluster.delete_user_on_cluster(self, username, id, nodes)
[docs] def node_run(self, id, node_id, command, internal, container_name=None, timeout=None, block=True):
"""Run a bash command on the given node
Args:
id (:obj:`str`): the id of the cluster to run the command on.
node_id (:obj:`str`): the id of the node in the cluster to run the command on.
command (:obj:`str`): the bash command to execute on the node.
internal (:obj:`bool`): if True, this will connect to the node using its internal IP.
Only use this if running within the same VNET as the cluster. Defaults to False.
container_name=None (:obj:`str`, optional): the name of the container to run the command in.
If None, the command will run on the host VM. Defaults to None.
timeout=None (:obj:`str`, optional): The timeout in seconds for establishing a connection to the node.
Defaults to None.
block=True (:obj:`bool`, optional): If True, the command blocks until execution is complete.
Returns:
:obj:`aztk.models.NodeOutput`: object containing the output of the run command
"""
return node_run.node_run(self, id, node_id, command, internal, container_name, timeout, block)
[docs] def get_remote_login_settings(self, id: str, node_id: str):
"""Get the remote login information for a node in a cluster
Args:
id (:obj:`str`): the id of the cluster the node is in
node_id (:obj:`str`): the id of the node in the cluster
Returns:
:obj:`aztk.models.RemoteLogin`: Object that contains the ip address and port combination to login to a node
"""
return get_remote_login_settings.get_remote_login_settings(self, id, node_id)
[docs] def run(self, id, command, internal, container_name=None, timeout=None):
"""Run a bash command on every node in the cluster
Args:
id (:obj:`str`): the id of the cluster to run the command on.
command (:obj:`str`): the bash command to execute on the node.
internal (:obj:`bool`): if true, this will connect to the node using its internal IP.
Only use this if running within the same VNET as the cluster. Defaults to False.
container_name=None (:obj:`str`, optional): the name of the container to run the command in.
If None, the command will run on the host VM. Defaults to None.
timeout=None (:obj:`str`, optional): The timeout in seconds for establishing a connection to the node.
Defaults to None.
Returns:
:obj:`List[azkt.models.NodeOutput]`: list of NodeOutput objects containing the output of the run command
"""
return run.cluster_run(self, id, command, internal, container_name, timeout)
[docs] def get_application_log(self, id: str, application_name: str, tail=False, current_bytes: int = 0):
"""Get the log for a running or completed application
Args:
id (:obj:`str`): the id of the cluster to run the command on.
application_name (:obj:`str`): str
tail (:obj:`bool`, optional): If True, get the remaining bytes after current_bytes.
Otherwise, the whole log will be retrieved. Only use this if streaming the log as it is being written.
Defaults to False.
current_bytes (:obj:`int`): Specifies the last seen byte, so only the bytes after current_bytes
are retrieved. Only useful is streaming the log as it is being written. Only used if tail is True.
Returns:
:obj:`aztk.models.ApplicationLog`: a model representing the output of the application.
"""
return get_application_log.get_application_log(self, id, application_name, tail, current_bytes)
[docs] def create_task_table(self, id: str):
"""Create an Azure Table Storage to track tasks
Args:
id (:obj:`str`): the id of the cluster
"""
return task_table.create_task_table(self.table_service, id)
[docs] def list_task_table_entries(self, id):
"""list tasks in a storage table
Args:
id (:obj:`str`): the id of the cluster
Returns:
:obj:`[aztk.models.Task]`: a list of models representing all entries in the Task table
"""
return task_table.list_task_table_entries(self.table_service, id)
[docs] def get_task_from_table(self, id, task_id):
"""Create a storage table to track tasks
Args:
id (:obj:`str`): the id of the cluster
Returns:
:obj:`[aztk.models.Task]`: the task with id task_id from the cluster's storage table
"""
return task_table.get_task_from_table(self.table_service, id, task_id)
[docs] def insert_task_into_task_table(self, id, task):
"""Insert a task into the table
Args:
id (:obj:`str`): the id of the cluster
Returns:
:obj:`aztk.models.Task`: a model representing an entry in the Task table
"""
return task_table.insert_task_into_task_table(self.table_service, id, task)
[docs] def update_task_in_task_table(self, id, task):
"""Update a task in the table
Args:
id (:obj:`str`): the id of the cluster
Returns:
:obj:`aztk.models.Task`: a model representing an entry in the Task table
"""
return task_table.update_task_in_task_table(self.table_service, id, task)
[docs] def delete_task_table(self, id):
"""Delete the table that tracks tasks
Args:
id (:obj:`str`): the id of the cluster
Returns:
:obj:`bool`: if True, the deletion was successful
"""
return task_table.delete_task_table(self.table_service, id)
[docs] def list_tasks(self, id):
"""list tasks in a storage table
Args:
id (:obj:`str`): the id of the cluster
Returns:
:obj:`[aztk.models.Task]`: a list of models representing all entries in the Task table
"""
return list_tasks.list_tasks(self, id)
[docs] def get_recent_job(self, id):
"""Get the most recently run job in an Azure Batch job schedule
Args:
id (:obj:`str`): the id of the job schedule
Returns:
:obj:`[azure.batch.models.Job]`: the most recently run job on the job schedule
"""
return get_recent_job.get_recent_job(self, id)
[docs] def get_task_state(self, id: str, task_name: str):
"""Get the status of a submitted task
Args:
id (:obj:`str`): the name of the cluster the task was submitted to
task_name (:obj:`str`): the name of the task to get
Returns:
:obj:`str`: the status state of the task
"""
return get_task_state.get_task_state(self, id, task_name)
[docs] def list_batch_tasks(self, id: str):
"""Get the status of a submitted task
Args:
id (:obj:`str`): the name of the cluster the task was submitted to
Returns:
:obj:`[aztk.models.Task]`: list of aztk tasks
"""
return task_table.list_batch_tasks(self.batch_client, id)
[docs] def get_batch_task(self, id: str, task_id: str):
"""Get the status of a submitted task
Args:
id (:obj:`str`): the name of the cluster the task was submitted to
task_id (:obj:`str`): the name of the task to get
Returns:
:obj:`aztk.models.Task`: aztk Task representing the Batch Task
"""
return task_table.get_batch_task(self.batch_client, id, task_id)