Azure Distributed Data Engineering Toolkit

Azure Distributed Data Engineering Toolkit (AZTK) is a Python CLI application for provisioning on-demand Spark on Docker clusters in Azure. It’s a cheap and easy way to get up and running with a Spark cluster, and a great tool for Spark users who want to experiment and start testing at scale.

This toolkit is built on top of Azure Batch but does not require any Azure Batch knowledge to use.

Getting Started

The minimum requirements to get started with this package are:

  • Python 3.5+, pip 9.0.1+
  • An Azure account
  • An Azure Batch account
  • An Azure Storage account

Cloning and installing the project

  1. Clone the repo

  2. Make sure you are running Python 3.5 or greater. If the default version on your machine is Python 2, make sure to run the following commands with pip3 instead of pip.

  3. Install aztk:

    pip install -e .
    
  4. Initialize your environment:

    Navigate to the directory you wish to use as your spark development environment, and run:

    aztk spark init
    

    This will create a .aztk folder with preset configuration files in your current working directory.

    If you would like to initialize your AZTK clusters with a specific development toolset, please pass one of the following flags:

    aztk spark init --python
    aztk spark init --R
    aztk spark init --scala
    aztk spark init --java
    

    If you wish to have global configuration files that will be read regardless of your current working directory, run:

    aztk spark init --global
    

    This will put default configuration files in your home directory, ~/. Please note that configuration files in your current working directory will take precedence over global configuration files in your home directory.

Setting up your accounts

Using the account setup script

A script to create and configure the Azure resources required to use aztk is provided. For more information and usage, see Getting Started Script

Manual resource creation

To finish setting up, you need to fill out your Azure Batch and Azure Storage secrets in .aztk/secrets.yaml. We also recommend entering your SSH key info in this file.

Please note that if you use SSH keys and have a non-standard SSH key file name or path, you will need to specify the location of your SSH public and private keys. To do so, set them as shown below:

# SSH keys used to create a user and connect to a server.
# The public key can either be the public key itself(ssh-rsa ...) or the path to the ssh key.
# The private key must be the path to the key.
ssh_pub_key: ~/.ssh/my-public-key.pub
ssh_priv_key: ~/.ssh/my-private-key
  1. Log into Azure. If you do not already have an Azure account, go to https://azure.microsoft.com/ to get started for free.

    Once you have one, simply log in and go to the Azure Portal to start creating your Azure Batch account and Azure Storage account.

Using AAD

To get the required keys for your Azure Active Directory (AAD) Service Principal, Azure Batch Account and Azure Storage Account, please follow these instructions. Note that this is the recommended path for use with AZTK, as some features require AAD and are disabled if using Shared Key authentication.

  1. Register an Azure Active Directory (AAD) Application
  • Navigate to Azure Active Directory by searching in “All Services”. Click “Properties” and record the value in the “Directory ID” field. This is your tenant ID.

  • Navigate to App Registrations by searching in “All Services”.

  • Click the “+ New application registration” option at the top left of the window. Fill in the necessary fields for the “Create” form. For “Application type” use “Web app/ API.”

  • Click on the newly created App to reveal more info. Record the Application ID (for use as Client ID). Then click “Settings”, then “Keys.” Create a new password using the provided form, ensure to copy and save the password as it will only be revealed once. This password is used as the credential in secrets.yaml.

  2. Create a Storage Account
  • Click the ‘+’ button at the top left of the screen and search for ‘Storage’. Select ‘Storage account - blob, file, table, queue’ and click ‘Create’

  • Fill in the form and create the Storage account.

  • Record the Storage account’s resource ID.

  • Give your AAD App “Contributor” permissions to your Storage Account. Click “Access Control (IAM)”, then “+ Add” at the top left. Fill in the “Add Permissions” form and save.

  3. Create a Batch Account
  • Click the ‘+’ button at the top left of the screen and search for ‘Compute’. Select ‘Batch’ and click ‘Create’

  • Fill in the form and create the Batch account.

  • Navigate to your newly created Batch Account and record its resource ID by clicking “Properties” and copying.

  • Give your AAD App “Contributor” permissions to your Batch Account. Click “Access Control (IAM)”, then “+ Add” at the top left. Fill in the “Add Permissions” form and save.

  4. Save your account credentials into the secrets.yaml file
  • Open the secrets.yaml file in the .aztk folder in your current working directory (if .aztk doesn’t exist, run aztk spark init). Fill in all of the fields as described below.
  • Fill in the service_principal block with your recorded values as shown below:
service_principal:
    tenant_id: <AAD Directory ID>
    client_id: <AAD App Application ID>
    credential: <AAD App Password>
    batch_account_resource_id: </batch/account/resource/id>
    storage_account_resource_id: </storage/account/resource/id>

Using Shared Keys

Please note that using Shared Keys prevents the use of certain AZTK features including Mixed Mode clusters and support for VNETs.

To get the required keys for Azure Batch and Azure Storage, please follow the below instructions:

  1. Create a Storage account
  • Click the ‘+’ button at the top left of the screen and search for ‘Storage’. Select ‘Storage account - blob, file, table, queue’ and click ‘Create’

  • Fill in the form and create the Storage account.

  2. Create a Batch account
  • Click the ‘+’ button at the top left of the screen and search for ‘Compute’. Select ‘Batch’ and click ‘Create’

  • Fill in the form and create the Batch account.

  3. Save your account credentials into the secrets.yaml file
  • Open the secrets.yaml file in the .aztk folder in your current working directory (if .aztk doesn’t exist, run aztk spark init). Fill in all of the fields as described below.
  • Go to the accounts in the Azure portal and copy and paste the account names, keys, and other needed information into the secrets file.

Storage account

For the Storage account, copy the name and one of the two keys.

Batch account

For the Batch account, copy the name, the URL, and one of the two keys.

Getting Started Script

The provided account setup script creates and configures all of the required Azure resources.

The script will create and configure the following resources:

  • Resource group
  • Storage account
  • Batch account
  • Azure Active Directory application and service principal

The script outputs all of the necessary information to use aztk; just copy the output into the .aztk/secrets.yaml file created when running aztk spark init.

Usage

Copy and paste the following into an Azure Cloud Shell:

wget -q https://raw.githubusercontent.com/Azure/aztk/v0.8.1/account_setup.sh &&
chmod 755 account_setup.sh &&
/bin/bash account_setup.sh

A series of prompts will appear, and you can set the values you desire for each field. Default values appear in brackets [] and will be used if no value is provided.

Azure Region [westus]:
Resource Group Name [aztk]:
Storage Account Name [aztkstorage]:
Batch Account Name [aztkbatch]:
Active Directory Application Name [aztkapplication]:
Active Directory Application Credential Name [aztk]:

Once the script has finished running you will see the following output:

service_principal:
    tenant_id: <AAD Directory ID>
    client_id: <AAD App Application ID>
    credential: <AAD App Password>
    batch_account_resource_id: </batch/account/resource/id>
    storage_account_resource_id: </storage/account/resource/id>

Copy the entire service_principal section into your .aztk/secrets.yaml. If you do not have a secrets.yaml file, you can create one in your current working directory by running aztk spark init.

Now you are ready to create your first aztk cluster. See Creating a Cluster.

Clusters

In the Azure Distributed Data Engineering Toolkit, a cluster is primarily designed to run Spark jobs. This document describes how to create a cluster to use for Spark jobs. Alternatively, for getting started and debugging, you can use the cluster in interactive mode, which allows you to log into the master node and interact with the cluster from there.

Creating a Cluster

Creating a Spark cluster only takes a few simple steps after which you will be able to SSH into the master node of the cluster and interact with Spark. You will be able to view the Spark Web UI, Spark Jobs UI, submit Spark jobs (with spark-submit), and even interact with Spark in a Jupyter notebook.

For the advanced user, please note that the default cluster settings are preconfigured in the .aztk/cluster.yaml file that is generated when you run aztk spark init. More information on cluster config here.

Commands

Create a Spark cluster:

aztk spark cluster create --id <your_cluster_id> --vm-size <vm_size_name> --size <number_of_nodes>

For example, to create a cluster of 4 Standard_A2 nodes called ‘spark’ you can run:

aztk spark cluster create --id spark --vm-size standard_a2 --size 4

You can find more information on VM sizes here. Please note that you must use the official SKU name when setting your VM size - they usually come in the form: “standard_d2_v2”.

Note: The cluster id (--id) can only contain alphanumeric characters including hyphens and underscores, and cannot contain more than 64 characters. Each cluster must have a unique cluster id.

By default, you cannot create clusters of more than 20 cores in total. Visit this page to request a core quota increase.

Low priority nodes

You can create your cluster with low-priority VMs at an 80% discount by using --size-low-pri instead of --size. Note that these are great for experimental use, but can be taken away at any time. We recommend against this option when doing long running jobs or for critical workloads.
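
For example, the four-node cluster from the earlier example could be created entirely with low-priority nodes:

aztk spark cluster create --id spark --vm-size standard_a2 --size-low-pri 4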

Mixed Mode

You can create clusters with a mix of low-priority and dedicated VMs to reach the optimal balance of price and availability. In Mixed Mode, your cluster will have both dedicated instances and low priority instances. To minimize the potential impact on your Spark workloads, the Spark master node will always be provisioned on one of the dedicated nodes, while each of the low priority nodes will be Spark workers.

Please note that to use Mixed Mode clusters, you need to authenticate using Azure Active Directory (AAD) by configuring the Service Principal in .aztk/secrets.yaml. You also need to create a Virtual Network (VNET) and provide the resource ID of a subnet within the VNET in your .aztk/cluster.yaml configuration file.
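
As a sketch (the keys are described in the cluster.yaml documentation below), a Mixed Mode cluster with 2 dedicated and 6 low priority nodes could be configured in .aztk/cluster.yaml like this:

vm_size: standard_a2
size: 2
size_low_priority: 6
subnet_id: /subscriptions/<subscription_id>/resourceGroups/<resource_group>/providers/Microsoft.Network/virtualNetworks/<vnet_name>/subnets/<subnet_name>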

Setting your Spark and/or Python versions

By default, the Azure Distributed Data Engineering Toolkit will use Spark v2.2.0 and Python v3.5.4. However, you can set your Spark and/or Python versions by configuring the base Docker image used by this package.

Listing clusters

You can list all clusters currently running in your account by running

aztk spark cluster list

Viewing a cluster

To view details about a particular cluster run:

aztk spark cluster get --id <your_cluster_id>

Note that the cluster is not fully usable until a master node has been selected and its state is idle.

For example, here the cluster ‘spark’ has 2 nodes, and node tvm-257509324_2-20170820t200959z is the master and ready to run a job.

Cluster         spark
------------------------------------------
State:          active
Node Size:      standard_a2
Nodes:          2
| Dedicated:    2
| Low priority: 0

Nodes                               | State           | IP:Port              | Master
------------------------------------|-----------------|----------------------|--------
tvm-257509324_1-20170820t200959z    | idle            | 40.83.254.90:50001   |
tvm-257509324_2-20170820t200959z    | idle            | 40.83.254.90:50000   | *

Deleting a cluster

To delete a cluster run:

aztk spark cluster delete --id <your_cluster_id>

Deleting a cluster also permanently deletes any data or logs associated with that cluster. If you wish to persist this data, use the --keep-logs flag.
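
For example, to delete the cluster ‘spark’ while keeping its logs:

aztk spark cluster delete --id spark --keep-logs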

To delete all clusters:

aztk spark cluster delete --id $(aztk spark cluster list -q)

Skip delete confirmation by using the --force flag.

You are charged for the cluster as long as the nodes are provisioned in your account. Make sure to delete any clusters you are not using to avoid unwanted costs.

Run a command on all nodes in the cluster

To run a command on all nodes in the cluster, run:

aztk spark cluster run --id <your_cluster_id> "<command>"

The command is executed through an SSH tunnel.
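
For example, to check available disk space on every node (any shell command may be substituted):

aztk spark cluster run --id spark "df -h"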

Run a command on a specific node in the cluster

To run a command on a single node in the cluster, run:

aztk spark cluster run --id <your_cluster_id> --node-id <your_node_id> "<command>"

This command is executed through an SSH tunnel. To get the id of the nodes in your cluster, run aztk spark cluster get --id <your_cluster_id>.

Copy a file to all nodes in the cluster

To securely copy a file to all nodes, run:

aztk spark cluster copy --id <your_cluster_id> --source-path </path/to/local/file> --dest-path </path/on/node>

The file will be securely copied to each node using SFTP.

Interactive Mode

All other interaction with the cluster is done via SSH and SSH tunneling. If you didn’t create a user during cluster create (aztk spark cluster create), the first step is to add a user to each node in the cluster.

Make sure that the .aztk/secrets.yaml file has your SSH key (or the path to the SSH key); it will automatically be used to make the SSH connection.

aztk spark cluster add-user --id spark --username admin

Alternatively, you can add the SSH key as a parameter when running the add-user command.

aztk spark cluster add-user --id spark --username admin --ssh-key <your_key_OR_path_to_key>

You can also use a password to create your user:

aztk spark cluster add-user --id spark --username admin --password <my_password>

Using a SSH key is the recommended method.

SSH and Port Forwarding

After a user has been created, SSH into the Spark container on the master node with:

aztk spark cluster ssh --id spark --username admin

If you would like to ssh into the host instead of the Spark container on it, run:

aztk spark cluster ssh --id spark --username admin --host

If you ssh into the host and wish to access the running Docker Spark environment, you can run the following:

sudo docker exec -it spark /bin/bash

Now that you’re in, you can change directory to your familiar $SPARK_HOME:

cd $SPARK_HOME

To view the SSH command being called, pass the --no-connect flag:

aztk spark cluster ssh --id spark --no-connect

Note that an SSH tunnel and shell will be opened with the default SSH client if one is present. Otherwise, a pure python SSH tunnel is created to forward the necessary ports. The pure python SSH tunnel will not open a shell.

Debugging your Spark Cluster

If your cluster is in an unknown or unusable state, you can debug by running:

aztk spark cluster debug --id <cluster-id> --output </path/to/output/directory/>

The debug utility will pull logs from all nodes in the cluster. The utility will check for:

  • free diskspace
  • docker image status
  • docker container status
  • docker container logs
  • docker container process status
  • aztk code & version
  • spark component logs (master, worker, shuffle service, history server, etc) from $SPARK_HOME/logs
  • spark application logs from $SPARK_HOME/work

Please be careful sharing the output of the debug command as secrets and application code are present in the output.

Interact with your Spark cluster

By default, the aztk spark cluster ssh command port forwards the Spark Web UI to localhost:8080, the Spark Jobs UI to localhost:4040, and the Spark History Server to localhost:18080. This can be configured in .aztk/ssh.yaml.

Custom scripts

Custom scripts are DEPRECATED. Use plugins instead.

Custom scripts allow for additional cluster setup steps when the cluster is being provisioned. This is useful if you want to install additional software, and if you need to modify the default cluster configuration for things such as modifying spark.conf, adding jars or downloading any files you need in the cluster.

You can specify the location of custom scripts on your local machine in .aztk/cluster.yaml. If you do not have a .aztk/ directory in your current working directory, run aztk spark init or see Getting Started. Note that the path can be absolute or relative to your current working directory.

The custom scripts can be configured to run on the Spark master only, the Spark workers only, or all nodes in the cluster (Please note that by default, the Spark master node is also a Spark worker). For example, the following custom script configuration will run 3 custom scripts in the order they are provided:

custom_scripts:
    - script: ./custom-scripts/simple.sh
      runOn: all-nodes
    - script: ./custom-scripts/master-only.sh
      runOn: master
    - script: ./custom-scripts/worker-only.sh
      runOn: worker

The first script, simple.sh, will run on all nodes and will be executed first. The next script, master-only.sh, will run only on nodes that are Spark masters, after simple.sh. The last script, worker-only.sh, will run last and only on nodes that are Spark workers.

Directories may also be provided in the custom_scripts section of .aztk/cluster.yaml.

custom_scripts:
    - script: /custom-scripts/
      runOn: all-nodes

The above configuration takes the absolute path /custom-scripts/ and uploads every file within it. These files will all be executed, although order of execution is not guaranteed. If your custom scripts have dependencies, specify the order by providing the full path to the file as seen in the first example.

Scripting considerations

  • The default OS is Ubuntu 16.04.
  • The scripts run on the specified nodes in the cluster after Spark has been installed.
  • The scripts execute in the order provided
  • If a script directory is provided, order of execution is not guaranteed
  • The environment variable $SPARK_HOME points to the root Spark directory.
  • The environment variable $IS_MASTER identifies if this is the node running the master role. The node running the master role also runs a worker role on it.
  • The Spark cluster is set up using Standalone Mode

Provided Custom Scripts

HDFS

A custom script to install HDFS (2.8.2) is provided at custom-scripts/hdfs.sh. This will install and provision HDFS for your cluster.

To enable HDFS, add this snippet to the custom_scripts section of your .aztk/cluster.yaml configuration file:

custom_scripts:
  - script: ./custom-scripts/hdfs.sh
    runOn: all-nodes

When SSHing into the cluster, you will have access to the Namenode UI at the default port 50070. This port can be changed in the ssh.yaml file in your .aztk/ directory, or by passing the --namenodeui flag to the aztk spark cluster ssh command.
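
For example, assuming the flag takes the local port to use, forwarding the Namenode UI to localhost:5000 would look like:

aztk spark cluster ssh --id spark --namenodeui 5000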

When enabled on the cluster, HDFS can be used to read or write data locally during program execution.

Docker

Azure Distributed Data Engineering Toolkit runs Spark on Docker.

Supported Azure Distributed Data Engineering Toolkit images are hosted publicly on Docker Hub.

By default, the aztk/spark:v0.1.0-spark2.3.0-base image will be used.

To select an image other than the default, you can set your Docker image at cluster creation time with the optional --docker-repo parameter:

aztk spark cluster create ... --docker-repo <name_of_docker_image_repo>

For example, to use Spark v1.6.3, you could run the following cluster create command:

aztk spark cluster create ... --docker-repo aztk/base:spark1.6.3

Using a custom Docker Image

You can build your own Docker image on top of or beneath one of our supported base images, OR you can modify the supported Dockerfiles and build your own image that way.

Once you have your Docker image built and hosted publicly, you can then use the --docker-repo parameter in your aztk spark cluster create command to point to it.

Using a custom Docker Image that is Privately Hosted

To use a private docker image you will need to provide a docker username and password that have access to the repository you want to use.

In .aztk/secrets.yaml, set up your docker config:

docker:
    username: <myusername>
    password: <mypassword>

If your private repository is not on Docker Hub (Azure Container Registry, for example), you can also provide the endpoint:

docker:
    username: <myusername>
    password: <mypassword>
    endpoint: <https://my-custom-docker-endpoint.com>

Building Your Own Docker Image

Building your own Docker Image provides more customization over your cluster’s environment. For some, this may look like installing specific, and even private, libraries that their Spark jobs require. For others, it may just be setting up a version of Spark, Python or R that fits their particular needs.

The Azure Distributed Data Engineering Toolkit supports custom Docker images. To guarantee that your Spark deployment works, we recommend that you build on top of one of our supported images.

To build your own image, you can either build on top of or beneath one of our supported images, OR you can modify one of the supported Dockerfiles to build your own.

Building on top

You can build on top of our images by referencing the aztk/spark image in the FROM keyword of your Dockerfile:

# Your custom Dockerfile

FROM aztk/spark:v0.1.0-spark2.3.0-base
...

Building beneath

To build beneath one of our images, modify one of our Dockerfiles so that the FROM keyword pulls from your Docker image’s location (as opposed to the default which is a base Ubuntu image):

# One of the Dockerfiles that AZTK supports
# Change the FROM statement to point to your hosted image repo

FROM my_username/my_repo:latest
...

Please note that for this method to work, your Docker image must have been built on Ubuntu.

Custom Docker Image Requirements

If you are building your own custom image and not building on top of a supported image, the following requirements are necessary.

Please make sure that the following environment variables are set:

  • AZTK_DOCKER_IMAGE_VERSION
  • JAVA_HOME
  • SPARK_HOME

You also need to make sure that PATH is correctly configured with $SPARK_HOME

  • PATH=$SPARK_HOME/bin:$PATH

By default, these are set as follows:

ENV JAVA_HOME /usr/lib/jvm/java-1.8.0-openjdk-amd64
ENV SPARK_HOME /home/spark-current
ENV PATH $SPARK_HOME/bin:$PATH

If you are using your own version of Spark, make sure it is symlinked to “/home/spark-current”. $SPARK_HOME must also point to “/home/spark-current”.
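
As a minimal sketch (the tarball name and version value are hypothetical), the relevant Dockerfile lines for bringing your own Spark build might look like:

# Unpack your own Spark build and expose it where AZTK expects it
ADD spark-2.3.0-bin-custom.tgz /opt/
RUN ln -s /opt/spark-2.3.0-bin-custom /home/spark-current
# Required environment variables (the version value is illustrative)
ENV AZTK_DOCKER_IMAGE_VERSION 0.1.0
ENV JAVA_HOME /usr/lib/jvm/java-1.8.0-openjdk-amd64
ENV SPARK_HOME /home/spark-current
ENV PATH $SPARK_HOME/bin:$PATH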

Hosting your Docker Image

By default, aztk assumes that your Docker images are publicly hosted on Docker Hub. However, we also support hosting your images privately.

See here to learn more about using privately hosted Docker Images.

Configuration Files

This section refers to the files in the directory .aztk that are generated from the aztk spark init command.

cluster.yaml

The core settings for a cluster are configured in the cluster.yaml file. Once you have set your desired values in .aztk/cluster.yaml, you can create a cluster using aztk spark cluster create.

This is the default cluster configuration:

# id: <id of the cluster to be created>
id: spark_cluster

# Toolkit configuration [Required] You can use `aztk toolkit` command to find which toolkits are available
toolkit:
  software: spark
  version: 2.2
  # environment: python
  # Optional version for the environment
  # environment_version:

  # Optional docker repository(To bring your custom docker image. Just specify the Toolkit software, version and environment if using default images)
  # docker_repo: <name of docker image repo (for more information, see https://github.com/Azure/aztk/blob/master/docs/12-docker-image.md)>


# vm_size: <vm-size, see available options here: https://azure.microsoft.com/pricing/details/batch//>
vm_size: standard_a2

# size: <number of dedicated nodes in the cluster, note that clusters must contain all dedicated or all low priority nodes>
size: 2

# size_low_priority: <number of low priority nodes in the cluster, mutually exclusive with size setting>

# username: <username for the linux user to be created> (optional)
username: spark

# Enable plugins
plugins:
  # - name: spark_ui_proxy
  # - name: jupyterlab
  # - name: jupyter
  # - name: hdfs
  # - name: rstudio_server

# Allow master node to also be a worker <true/false> (Default: true)
# worker_on_master: true

# Where do you want to run the driver <dedicated/master/any> (Default: dedicated if at least one dedicated node or any otherwise)
# scheduling_target: dedicated

# wait: <true/false>
wait: true

Running aztk spark cluster create will create a cluster of 2 Standard_A2 nodes called ‘spark_cluster’ with a linux user named ‘spark’. This is equivalent to running the command:

aztk spark cluster create --id spark_cluster --vm-size standard_a2 --size 2 --username spark --wait

NOTE: This assumes that your SSH-key is configured in the .aztk/secrets.yaml file.

ssh.yaml

This is the default ssh cluster configuration:

# username: <name of the user account to ssh into>
username: spark

# job_ui_port: <local port where the job ui is forwarded to>
job_ui_port: 4040

# job_history_ui_port: <local port where the job history ui is forwarded to>
job_history_ui_port: 18080

# web_ui_port: <local port where the spark master web ui is forwarded to>
web_ui_port: 8080

# jupyter_port: <local port where jupyter is forwarded to>
jupyter_port: 8888

# name_node_ui_port: <local port where the Name Node UI is forwarded to>
name_node_ui_port: 50070

# rstudio_server_port: <local port where rstudio server is forwarded to>
rstudio_server_port: 8787

# connect: <true/false, connect to spark master or print connection string (--no-connect)>
connect: true

Running the command aztk spark cluster ssh --id <cluster_id> will ssh into the master node of the Spark cluster. It will also forward the Spark Job UI to localhost:4040, the Spark master’s web UI to localhost:8080, and Jupyter to localhost:8888.

Note that all of the settings in ssh.yaml will be overridden by parameters passed on the command line.

Spark Configuration

The repository comes with default Spark configuration files which provision your Spark cluster just the same as you would locally. After running aztk spark init to initialize your working environment, you can view and edit these files at .aztk/spark-defaults.conf, .aztk/spark-env.sh and .aztk/core-site.xml. Please note that you can bring your own Spark configuration files by copying your spark-defaults.conf, spark-env.sh and core-site.xml into your .aztk/ directory.

If using aztk job submission, please note that both spark.shuffle.service.enabled and spark.dynamicAllocation.enabled must be set to true so that the number of executors registered with an application can scale as nodes in the job’s cluster come online.
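
In .aztk/spark-defaults.conf, this means ensuring both lines are present:

spark.shuffle.service.enabled    true
spark.dynamicAllocation.enabled  true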

The following settings available in spark-defaults.conf and spark-env.sh are not supported:

spark-env.sh:

  • SPARK_LOCAL_IP
  • SPARK_PUBLIC_DNS
  • SPARK_MASTER_HOST
  • SPARK_MASTER_PORT
  • SPARK_WORKER_PORT
  • SPARK_MASTER_WEBUI_PORT
  • Any options related to YARN client mode or Mesos

spark-defaults.conf:

  • spark.master

History Server

If you want to use Spark’s history server, please set the following values in your .aztk/spark-defaults.conf file:

spark.eventLog.enabled          true
spark.eventLog.dir              <path>
spark.history.fs.logDirectory   <path>

Please note that the path for spark.eventLog.dir and spark.history.fs.logDirectory should most likely match so that the history server reads the logs that each Spark job writes. Also note that while the paths can be local (file:/), it is recommended that the paths be accessible by every node in the cluster so that the history server, which runs on the Spark master node, has access to all application logs. HDFS, WASB, ADL, or any other Hadoop API compliant storage system may be used.

If using WASB, ADL or other cloud storage services, be sure to set your keys in .aztk/core-site.xml. For more information, see the Cloud Storage documentation.

Configuring Spark Storage

The Spark cluster can be configured to use different cloud supported storage offerings (such as Azure Storage Blobs, Azure Data Lake Storage, or any other supported Spark file system). More information can be found in the Cloud Storage documentation.

Placing JARS

Additional JAR files can be added to the cluster by simply adding them to the .aztk/jars directory. These JARS will automatically be added to Spark’s default JAR directory. In the case of a naming conflict, the file in .aztk/jars will overwrite the file in the cluster. Typically new JARS must be registered with Spark. To do this, either run the Spark Submit command with a path to the JARS

aztk spark cluster submit --id <my_cluster_id> --jars $SPARK_HOME/jars/my_jar_file_1.jar <my_application> <my_parameters>

Or update the .aztk/spark-defaults.conf file as shown below to have them registered for all Spark applications.

spark.jars $spark_home/jars/my_jar_file_1.jar,$spark_home/jars/my_jar_file_2.jar

Note: This tool automatically registers several JARS for default cloud storage in the spark-defaults.conf file. If you want to modify this file, simply append any additional JARS to the end of this list.

Azure Files

The ability to load a file share on the cluster is really useful when you want to be able to share data across all the nodes, and/or want that data to be persisted longer than the lifetime of the cluster. Azure Files provides a very easy way to mount a share into the cluster and have it accessible to all nodes. This is useful in cases where you have small data sets you want to process (less than 1GB) or have notebooks that you want to re-use between clusters.

Mounting an Azure Files share in the cluster only requires updating the cluster.yaml file at .aztk/cluster.yaml. For example, the following configuration will load two file shares into the cluster, one with my notebooks and one with a small data set that I have previously uploaded to Azure Files.

azure_files:
    - storage_account_name: STORAGE_ACCOUNT_NAME
      storage_account_key: STORAGE_ACCOUNT_KEY
      # Name of the file share in Azure Files
      file_share_path: data
      # Mount point on the node in the cluster
      mount_path: /mnt/data
    - storage_account_name: STORAGE_ACCOUNT_NAME
      storage_account_key: STORAGE_ACCOUNT_KEY
      # Name of the file share in Azure Files
      file_share_path: notebooks
      # Mount point on the node in the cluster
      mount_path: /mnt/notebooks

From the cluster, I can now access both of these file shares simply by navigating to /mnt/data or /mnt/notebooks, respectively.
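
Because the share is mounted at the same path on every node, a Spark job can read from it with an ordinary local path. For example (the file name is hypothetical):

# read a CSV from the mounted Azure Files share
dataframe = spark.read.csv('file:///mnt/data/my_dataset.csv')
dataframe.show(5)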

Plugins

Plugins are a successor to custom scripts and are the recommended way of running custom code on the cluster.

Plugins can either be one of the Aztk supported plugins or the path to a local file.

Supported Plugins

AZTK ships with a library of default plugins that enable auxiliary services to use with your Spark cluster.

Currently the following plugins are supported:

  • JupyterLab
  • Jupyter
  • HDFS
  • RStudioServer
  • TensorflowOnSpark
  • OpenBLAS
  • mvBLAS

Enable a plugin using the CLI

If you are using the aztk CLI and wish to enable a supported plugin, you need to update your .aztk/cluster.yaml configuration file.

Add or uncomment the plugins section and set the plugins you desire to enable as follows:

plugins:
    - name: jupyterlab
    - name: jupyter
    - name: hdfs
    - name: spark_ui_proxy
    - name: rstudio_server
      args:
        version: "1.1.383"

Enable a plugin using the SDK

If you are using the aztk SDK and wish to enable a supported plugin, you need to import the necessary plugins from the aztk.spark.models.plugins module and add them to your ClusterConfiguration object’s plugin list:

from aztk.spark.models.plugins import JupyterPlugin, HDFSPlugin
cluster_config = ClusterConfiguration(
  ...# Other config,
  plugins=[
    JupyterPlugin(),
    HDFSPlugin(),
  ]
)

Custom script plugin

This allows you to run your custom code on the cluster.

Run a custom script plugin with the CLI

Example

plugins:
    - script: path/to/my/script.sh
    - name: friendly-name
      script: path/to/my-other/script.sh
      target: host
      target_role: all-nodes

Options

  • script: Required. Path to the script you want to run.
  • name: Optional. Friendly name; by default, the name of the script file.
  • target: Optional. Where to run the plugin (default: spark-container). Can be spark-container or host.
  • target_role: Optional. The role of the node on which this script runs (default: master). Can be master, worker or all-nodes.

Submitting an Application

Submitting a job to your Spark cluster in this package mimics the experience of a typical standalone cluster. A spark job will be submitted to the system and run to completion.

Spark-Submit

The spark-submit experience is mostly the same as any regular Spark cluster with a few minor differences. You can take a look at aztk spark cluster --help for more detailed information and options.

Run a Spark job:

aztk spark cluster submit --id <name_of_spark_cluster> --name <name_of_spark_job> <executable> <executable_params>

For example, to run a local pi.py file on a Spark cluster, simply specify the local path of the file:

aztk spark cluster submit --id spark --name pipy examples/src/main/python/pi.py 100

To run a remotely hosted pi.py file on a Spark cluster, specify the remote path of the file and use the --remote flag:

aztk spark cluster submit --id spark --name pipy --remote wasbs://path@remote/pi.py 100

NOTE: The job name (--name) must be at least 3 characters long, can only contain alphanumeric characters including hyphens but excluding underscores, and cannot contain uppercase letters. Each job you submit must have a unique name.

Monitoring job

If you have set up an SSH tunnel with port forwarding, you can navigate to http://localhost:8080 and http://localhost:4040 to view the progress of the job using the Spark UI.

Getting output logs

The default setting when running a job is --wait. This will simply submit a job to the cluster and wait for the job to run. If you want to just submit the job and not wait, use the --no-wait flag and tail the logs manually:

aztk spark cluster submit --id spark --name pipy --no-wait examples/src/main/python/pi.py 1000
aztk spark cluster app-logs --id spark --name pipy --tail

Cloud storage

Cloud storage for Spark enables you to have a persisted storage system backed by a cloud provider. AZTK supports this by placing the appropriate storage JARs on the cluster and updating the core-site.xml file accordingly.

Azure Storage Blobs (WASB)

Pre-built into this package is native support for connecting your Spark cluster to Azure Blob Storage (aka WASB). The required WASB jars are automatically placed in the Spark cluster and the permissions are pulled from your core-site.xml file under .aztk/core-site.xml.

To connect to your Azure Storage account, make sure that the storage fields in your .aztk/core-site.xml file are properly filled out. This tool already has the basic template for using WASB in the .aztk/core-site.xml file. Simply uncomment the block in the “Azure Storage Blobs (WASB)” section and fill out the properties for MY_STORAGE_ACCOUNT_NAME, MY_STORAGE_ACCOUNT_SUFFIX and MY_STORAGE_ACCOUNT_KEY.

Once you have correctly filled out the .aztk/core-site.xml with your storage credentials, you will be able to access your storage accounts from your Spark job.

Reading and writing to and from Azure blobs is easily achieved by using the wasb syntax. For example, reading a csv file using Pyspark would be:

# read csv data into data
dataframe = spark.read.csv('wasbs://MY_CONTAINER@MY_STORAGE_ACCOUNT.blob.core.windows.net/MY_INPUT_DATA.csv')

# print off the first 5 rows
dataframe.show(5)

# write the csv back to storage
dataframe.write.csv('wasbs://MY_CONTAINER@MY_STORAGE_ACCOUNT.blob.core.windows.net/MY_OUTPUT_DATA.csv')

Azure Data Lake (ADL)

Pre-built into this package is native support for connecting your Spark cluster to Azure Data Lake (aka ADL). The required ADL jars are automatically placed in the Spark cluster and the permissions are pulled from your core-site.xml file under .aztk/core-site.xml.

To connect to your Azure Data Lake store, make sure that the storage fields in your .aztk/core-site.xml file are properly filled out. This tool already has the basic template for using ADL in the .aztk/core-site.xml file. Simply uncomment the block in the “ADL (Azure Data Lake) Configuration” section and fill out the properties for MY_AAD_TENANT_ID, MY_AAD_CLIENT_ID and MY_AAD_CREDENTIAL.

Once you have correctly filled out the .aztk/core-site.xml with your Azure Data Lake credentials, you will be able to access your ADL storage repositories from your Spark job.

Reading and writing to and from Azure Data Lake Storage is easily achieved by using the adl syntax. For example, reading a csv file using Pyspark would be:

# read csv data into data
dataframe = spark.read.csv('adl://MY_ADL_STORAGE_ACCOUNT.azuredatalakestore.net/MY_INPUT_DATA.csv')

# print off the first 5 rows
dataframe.show(5)

# write the csv back to storage
dataframe.write.csv('adl://MY_ADL_STORAGE_ACCOUNT.azuredatalakestore.net/MY_OUTPUT_DATA.csv')

Note: The implementation of the ADL connector is designed to always access ADLS through a secure channel, so there is no adls file system scheme name. You will always use adl. For more information please take a look at https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-use-data-lake-store.

Note: In order to use ADL you must first create an AAD application and give it permissions to your ADL Storage account. There is a good tutorial on how to create the required AAD security objects to use ADL here. Not shown in that tutorial is that, as a last step, you will need to grant the application you created permissions to your ADL Storage account.

Additional file system connectors

You can quickly add support for additional data repositories by adding the necessary JARs to your cluster and configuring the spark-defaults.conf and core-site.xml files accordingly.

Adding Jars

To add jar files to the cluster, simply add them to your local .aztk/jars directory. These will automatically get loaded into your cluster and placed under $SPARK_HOME/jars

Registering Jars

To register the jars, update the .aztk/spark-defaults.conf file and add the path to the jar file(s) to the spark.jars property

spark.jars $spark_home/jars/my_jar_file_1.jar,$spark_home/jars/my_jar_file_2.jar

Configuring the file system

Configuring the file system requires an update to the .aztk/core-site.xml file. Each file system is unique and requires a different setup in core-site.xml. In .aztk/core-site.xml, we have preloaded templates to add WASB and ADL.

GPU

Use GPUs to accelerate your Spark applications. When using a GPU enabled Azure VM, your Docker image will contain CUDA-8.0 and cuDNN-6.0 by default. See Docker Image for more information about the AZTK Docker images.

[NOTE: Azure does not have GPU enabled VMs in all regions. Please use this link to make sure that your Batch account is in a region that has GPU enabled VMs]

AZTK uses Nvidia-Docker to expose the VM’s GPU(s) inside the container. Nvidia drivers (ver. 384) are installed at runtime.

Tutorial

Create a cluster, specifying a GPU enabled VM:

aztk spark cluster create --id gpu-cluster --vm-size standard_nc6 --size 1

Submit an application to the cluster that will take advantage of the GPU:

aztk spark cluster submit --id gpu-cluster --name gpu-app ./examples/src/main/python/gpu/nubma_example.py

Installation Location

By default, CUDA is installed at /usr/local/cuda-8.0.

Jobs

In the Azure Distributed Data Engineering Toolkit, a Job is an entity that runs against an automatically provisioned and managed cluster. Jobs run a collection of Spark applications and persist the outputs.


Creating a Job

Creating a Job starts with defining the necessary properties in your .aztk/job.yaml file. Jobs have one or more applications to run as well as values that define the Cluster the applications will run on.

Job.yaml

Each Job has one or more applications given as a List in Job.yaml. Applications are defined using the following properties:

  applications:
    - name:
      application:
      application_args:
        -
      main_class:
      jars:
        -
      py_files:
        -
      files:
        -
      driver_java_options:
        -
      driver_library_path:
      driver_class_path:
      driver_memory:
      executor_memory:
      driver_cores:
      executor_cores:

Please note: the only required fields are name and application. All other fields may be removed or left blank.

NOTE: The Application name can only contain alphanumeric characters including hyphens and underscores, and cannot contain more than 64 characters. Each application must have a unique name.

Jobs also require a definition of the cluster on which the Applications will run. The following properties define a cluster:

  cluster_configuration:
    vm_size: <the Azure VM size>
    size: <the number of nodes in the Cluster>
    toolkit:
      software: spark
      version: 2.2
    subnet_id: <resource ID of a subnet to use (optional)>
    custom_scripts:
      - List
      - of
      - paths
      - to
      - custom
      - scripts

Please Note: For more information about Azure VM sizes, see Azure Batch Pricing. And for more information about Docker repositories see Docker.

The only required fields are vm_size and either size or size_low_priority, all other fields can be left blank or removed.

A Job definition may also include a default Spark Configuration. The following are the properties to define a Spark Configuration:

  spark_configuration:
    spark_defaults_conf: </path/to/your/spark-defaults.conf>
    spark_env_sh: </path/to/your/spark-env.sh>
    core_site_xml: </path/to/your/core-site.xml>

Please note: including a Spark Configuration is optional. Spark Configuration values defined as part of an application will take precedence over the values specified in these files.

Below we will define a simple, functioning job definition.

# Job Configuration

job:
  id: test-job
  cluster_configuration:
    vm_size: standard_f2
    size: 3

  applications:
    - name: pipy100
      application: /path/to/pi.py
      application_args:
        - 100
    - name: pipy200
      application: /path/to/pi.py
      application_args:
        - 200

Once submitted, this Job will run two applications, pipy100 and pipy200, on an automatically provisioned Cluster with 3 dedicated Standard_F2 Azure VMs. Immediately after both pipy100 and pipy200 have completed, the Cluster will be destroyed. Application logs will be persisted and available.

Commands

Submit a Spark Job:

aztk spark job submit --id <your_job_id> --configuration </path/to/job.yaml>

NOTE: The Job id (--id) can only contain alphanumeric characters including hyphens and underscores, and cannot contain more than 64 characters. Each Job must have a unique id.

Low priority nodes

You can create your Job with low-priority VMs at an 80% discount by specifying size_low_priority instead of size in the cluster_configuration of your job.yaml. Note that these are great for experimental use, but can be taken away at any time. We recommend against this option when doing long running jobs or for critical workloads.
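
For example, a cluster_configuration using only low priority nodes might look like this (a sketch based on the fields documented above):

cluster_configuration:
  vm_size: standard_f2
  size_low_priority: 3
  toolkit:
    software: spark
    version: 2.2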

Listing Jobs

You can list all Jobs currently running in your account by running

aztk spark job list

Viewing a Job

To view details about a particular Job, run:

aztk spark job get --id <your_job_id>

For example, here the Job ‘pipy’ has 2 applications, which have already completed.

Job             pipy
------------------------------------------
State:                              | completed
Transition Time:                    | 21:29PM 11/12/17

Applications                        | State          | Transition Time
------------------------------------|----------------|-----------------
pipy100                             | completed      | 21:25PM 11/12/17
pipy200                             | completed      | 21:24PM 11/12/17

Deleting a Job

To delete a Job run:

aztk spark job delete --id <your_job_id>

Deleting a Job also permanently deletes any data or logs associated with that Job. If you wish to persist this data, use the --keep-logs flag.

Jobs handle provisioning and destroying their own infrastructure, so you are only charged while the Job is active, that is, while your applications are running.

Stopping a Job

To stop a Job run:

aztk spark job stop --id <your_job_id>

Stopping a Job will end any currently running Applications and will prevent any new Applications from running.

Get information about a Job’s Application

To get information about a Job’s Application:

aztk spark job get-app --id <your_job_id> --name <your_application_name>

Getting a Job’s Application’s log

To get a job’s application logs:

aztk spark job get-app-logs --id <your_job_id> --name <your_application_name>

Stopping a Job’s Application

To stop an application that is running or going to run on a Job:

aztk spark job stop-app --id <your_job_id> --name <your_application_name>

Migration Guide

0.6.0 to 0.7.0

This guide will describe the steps needed to update a 0.6.0 aztk installation to 0.7.0.

Installation from pip

AZTK is now published on pip! If you installed from github previously, please reinstall.

To uninstall run:

pip3 uninstall aztk

The following command will get the latest release of aztk (please ensure you are using python3.5+):

pip3 install aztk

Or, you can install 0.7.0 specifically using:

pip3 install aztk==0.7.0

Configuration Files

A number of changes have been made that affect previously initialized aztk environments. To limit potential issues with previous versions, we recommend that you replace any existing .aztk directories; a shell sketch of the first few steps follows the list below.

  1. Backup your existing .aztk directory by renaming it to .aztk.old.
  2. Run aztk spark init to create a new .aztk directory
  3. Copy the values from .aztk.old/secrets.yaml to .aztk/secrets.yaml
  4. Update the new .aztk/cluster.yaml with values from .aztk.old/cluster.yaml if applicable. Please be aware of the new toolkit section that replaces docker_repo for supported images. Similarly for .aztk/job.yaml.
  5. Update the new defaults in .aztk/spark-defaults.conf, .aztk/core-site.xml and .aztk/spark-env.sh if applicable.
  6. Be sure to not copy over the .aztk.old/jars directory. All jars that were placed there by default have been moved onto the Docker image. You can add any custom jars you had by placing them in .aztk/jars/.
  7. Create your new 0.7.0 cluster!
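
A shell sketch of steps 1-3, run from your spark development directory:

mv .aztk .aztk.old                             # back up the existing configuration
aztk spark init                                # generate a fresh .aztk directory
cp .aztk.old/secrets.yaml .aztk/secrets.yaml   # carry over your secrets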

cluster.yaml

In cluster.yaml, the toolkit key has been added. It is used to select the default, supported Docker images. Please refer to the configuration file documentation.

Docker images

A major backwards-incompatible refactor of the Docker images has occurred. Previous Docker images will no longer work with 0.7.0. To update to a new supported docker image, you will need to update your .aztk/cluster.yaml configuration file with the toolkit block in place of docker_repo. If you do not do so, cluster creation will fail!
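
For example (the image name and toolkit values below are illustrative), a cluster.yaml that previously selected an image through docker_repo would now describe the toolkit instead:

# 0.6.0 style (no longer supported for the default images):
# docker_repo: <your 0.6.0 docker image>

# 0.7.0 style:
toolkit:
  software: spark
  version: 2.2
  environment: python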

Please refer to the configuration file documentation for more information on the toolkit in cluster.yaml.

Custom scripts deprecation and Plugins

Custom scripts have been deprecated in favor of Plugins. Plugins have a number of advantages, including the ability to execute on the host (and not in the Spark Docker container). A number of supported plugins are shipped with aztk; please refer to the plugin documentation to learn more.

SDK samples

Create the Spark client

You can get the values for this by either running the Getting Started script or using Batch Labs.

    import sys, os, time
    import aztk.spark
    from aztk.error import AztkError

    # set your secrets
    secrets_config = aztk.spark.models.SecretsConfiguration(
        service_principal=aztk.spark.models.ServicePrincipalConfiguration(
            tenant_id="<org>.onmicrosoft.com",
            client_id="",
            credential="",
            batch_account_resource_id="",
            storage_account_resource_id="",
        ),
        ssh_pub_key=""
    )


    # create a client
    client = aztk.spark.Client(secrets_config)

List available clusters

# list available clusters
clusters = client.list_clusters()

Create a new cluster

# define plugins to enable on the cluster
plugins = [
    aztk.spark.models.plugins.JupyterPlugin(),
]

# define spark configuration
spark_conf = aztk.spark.models.SparkConfiguration(
    spark_defaults_conf=os.path.join(ROOT_PATH, 'config', 'spark-defaults.conf'),
    spark_env_sh=os.path.join(ROOT_PATH, 'config', 'spark-env.sh'),
    core_site_xml=os.path.join(ROOT_PATH, 'config', 'core-site.xml'),
    jars=[os.path.join(ROOT_PATH, 'config', 'jars', jar) for jar in os.listdir(os.path.join(ROOT_PATH, 'config', 'jars'))]
)

# configure my cluster
cluster_config = aztk.spark.models.ClusterConfiguration(
    cluster_id="sdk-test",
    toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
    size_low_priority=2,
    vm_size="standard_f2",
    plugins=plugins,
    spark_configuration=spark_conf
)

# create a cluster, and wait until it is ready
try:
    cluster = client.create_cluster(cluster_config)
    cluster = client.wait_until_cluster_is_ready(cluster.id)
except AztkError as e:
    print(e.message)
    sys.exit()

Get an existing cluster

    cluster = client.get_cluster(cluster_config.cluster_id)

Run an application on the cluster


# create some apps to run
app1 = aztk.spark.models.ApplicationConfiguration(
    name="pipy1",
    application=os.path.join(ROOT_PATH, 'examples', 'src', 'main', 'python', 'pi.py'),
    application_args="10"
)

app2 = aztk.spark.models.ApplicationConfiguration(
    name="pipy2",
    application=os.path.join(ROOT_PATH, 'examples', 'src', 'main', 'python', 'pi.py'),
    application_args="20"
)

app3 = aztk.spark.models.ApplicationConfiguration(
    name="pipy3",
    application=os.path.join(ROOT_PATH, 'examples', 'src', 'main', 'python', 'pi.py'),
    application_args="30"
)

# submit an app and wait until it is finished running
client.submit(cluster.id, app1)
client.wait_until_application_done(cluster.id, app1.name)

# submit some other apps to the cluster in parallel
client.submit_all_applications(cluster.id, [app2, app3])

Get the logs of an application

# get logs for app, print to console
app1_logs = client.get_application_log(cluster_id=cluster_config.cluster_id, application_name=app1.name)
print(app1_logs.log)

Get status of app

status = client.get_application_status(cluster_config.cluster_id, app2.name)

Stream logs of app, print to console as it runs


current_bytes = 0
while True:
    app2_logs = client.get_application_log(
        cluster_id=cluster_config.cluster_id,
        application_name=app2.name,
        tail=True,
        current_bytes=current_bytes)

    print(app2_logs.log, end="")

    if app2_logs.application_state == 'completed':
        break
    current_bytes = app2_logs.total_bytes
    time.sleep(1)

# wait until all jobs finish, then delete the cluster
client.wait_until_applications_done(cluster.id)
client.delete_cluster(cluster.id)

Define a custom plugin

Full example


from aztk.spark.models.plugins import PluginConfiguration, PluginFile,PluginPort, PluginTarget, PluginTargetRole

cluster_config = ClusterConfiguration(
  ...# Other config,
  plugins=[
    PluginConfiguration(
        name="my-custom-plugin",
        files=[
            PluginFile("file.sh", "/my/local/path/to/file.sh"),
            PluginFile("data/one.json", "/my/local/path/to/data/one.json"),
            PluginFile("data/two.json", "/my/local/path/to/data/two.json"),
        ],
        execute="file.sh", # This must be one of the files defined in the file list and match the target path,
        env=dict(
            SOME_ENV_VAR="foo"
        ),
        args=["arg1"], # Those arguments are passed to your execute script
        ports=[
            PluginPort(internal="1234"),                # Internal only(For node communication for example)
            PluginPort(internal="2345", public=True),   # Open the port to the public(When ssh into). Used for UI for example
        ],

        # Pick where you want the plugin to run
        target=PluginTarget.Host,                       # The script will be run on the host. Default value is to run in the spark container
        target_role=PluginTargetRole.All,               # If the plugin should be run only on the master worker or all. You can use environment variables(See below to have different master/worker config)
    )
  ]
)

Parameters

PluginConfiguration

name required | string

Name of your plugin (this will be used to create a folder, so a simple name containing only letters, dashes, and underscores is recommended)

files required | List[PluginFile|PluginTextFile]

List of files to upload

execute required | str

Script to execute. This script must be defined in the files above and must match its remote path

args optional | List[str]

List of arguments to be passed to your execute scripts

env optional | dict

Dictionary of environment variables made available to the script (this can be used to pass arguments to your script instead of args)

ports optional | List[PluginPort]

List of ports to open if the script is running in a container. A port can also be marked public, and it will then be accessible when you SSH into the master node.

target | optional | PluginTarget

Define where the execute script should run. Potential values are PluginTarget.SparkContainer (default) and PluginTarget.Host.

target_role | optional | PluginTargetRole

Whether the plugin should run only on the master, only on the workers, or on all nodes. You can use environment variables to have different master/worker behavior (see below).

PluginFile

target required | str

Where the file should be dropped relative to the plugin working directory

local_path | required | str

Path to the local file you want to upload.

TextPluginFile

target | required | str

Where the file should be dropped relative to the plugin working directory

content | required | str | io.StringIO

The content of the file, provided as a str or io.StringIO.

PluginPort

internal | required | int

Internal port to open on the docker container

public | optional | bool

Whether the port should be opened publicly (default: False).

Environment variables available in the plugin

AZTK provides a few environment variables that can be used in your plugin script, as illustrated in the sketch after this list:

  • AZTK_IS_MASTER: Whether the plugin is running on the master node. Can be either true or false
  • AZTK_IS_WORKER: Whether a worker is set up on the current node (this may also be the master if worker_on_master is set to true). Can be either true or false
  • AZTK_MASTER_IP: Internal IP of the master node
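
For example, a hypothetical plugin script could use these variables to run a step only on the master node:

#!/bin/bash
# Illustrative plugin script: common setup runs everywhere,
# while one step runs only on the Spark master.
echo "Master ip is $AZTK_MASTER_IP"
if [ "$AZTK_IS_MASTER" = "true" ]; then
    echo "Running master-only setup"
fi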

Debug your plugin

When your plugin is not working as expected, there are a few things you can do to investigate the issue.

Check the logs. You can use either the debug tool or BatchLabs. Navigate to startup/wd/logs/plugins.

  • If you see a file named <your-plugin-name>.txt under that folder, it means that your plugin started correctly, and you can check this file to see what your execute script logged.

  • If this file doesn’t exist, the script was not run on this node. There could be multiple reasons for this:

    • If you want your plugin to run in the Spark container, check the startup/wd/logs/docker.log file for information.
    • If you want your plugin to run on the host, check startup/stdout.txt and startup/stderr.txt.

    The log may indicate that you picked the wrong target or target role for the plugin, which is why it is not running on this node.

aztk package

aztk.models package

class aztk.models.ClusterConfiguration(*args, **kwargs)[source]

Bases: aztk.core.models.model.Model

Cluster configuration model

Parameters:
  • cluster_id (str) – Id of the Aztk cluster
  • toolkit (aztk.models.Toolkit) – Toolkit to be used for this cluster
  • size (int) – Number of dedicated nodes for this cluster
  • size_low_priority (int) – Number of low priority nodes for this cluster
  • vm_size (str) – Azure VM size to be used for each node
  • subnet_id (str) – Full resource id of the subnet to be used (required for mixed mode clusters)
  • plugins (List[aztk.models.plugins.PluginConfiguration]) – List of plugins to be used
  • file_shares (List[aztk.models.FileShare]) – List of File shares to be used
  • user_configuration (aztk.models.UserConfiguration) – Configuration of the user to be created on the master node to ssh into.
mixed_mode() → bool[source]
Returns: whether the pool is using mixed mode (both dedicated and low priority nodes)
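
As a hedged sketch, a ClusterConfiguration could be built programmatically as follows; the cluster id, node counts, VM size, toolkit version, and user credentials are placeholder values:

from aztk.models import ClusterConfiguration, Toolkit, UserConfiguration

# Hypothetical 4-node Spark cluster definition; adjust the values to your account.
cluster_config = ClusterConfiguration(
    cluster_id="example-cluster",
    toolkit=Toolkit(software="spark", version="2.3.0"),
    size=4,                 # dedicated nodes
    size_low_priority=0,    # low priority nodes
    vm_size="standard_f2",
    user_configuration=UserConfiguration(username="spark", password="<my-password>"),
)

print(cluster_config.mixed_mode())  # False: no low priority nodes configured
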
class aztk.models.CustomScript(**kwargs)[source]

Bases: aztk.core.models.model.Model

class aztk.models.DockerConfiguration(**kwargs)[source]

Bases: aztk.core.models.model.Model

Configuration for connecting to private docker

Parameters:
  • endpoint (str) – Which docker endpoint to use. Defaults to Docker Hub.
  • username (str) – Docker endpoint username
  • password (str) – Docker endpoint password
class aztk.models.Enum[source]

Bases: object

Generic enumeration.

Derive from this class to define new enumerations.

class aztk.models.FileShare(**kwargs)[source]

Bases: aztk.core.models.model.Model

class aztk.models.Model(**kwargs)[source]

Bases: object

Base class for all aztk models

To implement model wide validation implement __validate__ method

__getstate__()[source]

For pickle serialization. This return the state of the model

__setstate__(state)[source]

For pickle serialization. This update the current model with the given state

validate()[source]

Validate the entire model

class aztk.models.PluginConfiguration(**kwargs)[source]

Bases: aztk.core.models.model.Model

Plugin manifest that should be returned in the main.py of your plugin

Args:
  • name: Name of the plugin. Used to reference the plugin
  • runOn: Where the plugin should run
  • execute: Path to the file to execute (this must match the target of one of the files)
  • files: List of files to upload
  • args: List of arguments to pass to the executing script
  • env: Dict of environment variables to pass to the script
class aztk.models.PluginFile(target: str = None, local_path: str = None)[source]

Bases: aztk.core.models.model.Model

Reference to a file for a plugin.

class aztk.models.PluginPort(**kwargs)[source]

Bases: aztk.core.models.model.Model

Definition for a port that should be opened on a node

Parameters:
  • internal – Port on the node
  • public – [Optional] Port available to the user. If none, no port will be opened to the user
  • name – [Optional] Name to differentiate ports if you have multiple

class aztk.models.PluginTarget[source]

Bases: enum.Enum

Where this plugin should run

class aztk.models.PluginTargetRole[source]

Bases: enum.Enum

An enumeration.

class aztk.models.PortForwardingSpecification(**kwargs)[source]

Bases: aztk.core.models.model.Model

class aztk.models.SchedulingTarget[source]

Bases: enum.Enum

Target where the task will get scheduled. For Spark this is where the driver will live.

Master = 'master'

Only the master is allowed to run tasks

Dedicated = 'dedicated'

Any dedicated node is allowed to run tasks (default)

Any = 'any'

Any node (not recommended if using low priority nodes)

class aztk.models.SecretsConfiguration(**kwargs)[source]

Bases: aztk.core.models.model.Model

class aztk.models.ServicePrincipalConfiguration(**kwargs)[source]

Bases: aztk.core.models.model.Model

Container class for AAD authentication

class aztk.models.SharedKeyConfiguration(**kwargs)[source]

Bases: aztk.core.models.model.Model

Container class for shared key authentication

class aztk.models.Software[source]

Bases: object

Enum with the list of available software

class aztk.models.TextPluginFile(target: str, content: Union[str, _io.StringIO])[source]

Bases: aztk.core.models.model.Model

Reference to a file for a plugin.

Args:
  • target (str): Where the file should be uploaded relative to the plugin working dir
  • content (str|io.StringIO): Content of the file. Can either be a string or a StringIO

class aztk.models.Toolkit(**kwargs)[source]

Bases: aztk.core.models.model.Model

Toolkit for a cluster. This will help pick the docker image needed

Parameters:
  • software (str) – Name of the toolkit (e.g. spark)
  • version (str) – Version of the toolkit
  • environment (str) – Which environment to use for this toolkit
  • environment_version (str) – If there are multiple versions for an environment, you can specify which one
  • docker_repo (str) – Optional docker repo
class aztk.models.Union[source]

Bases: typing.Final

Union type; Union[X, Y] means either X or Y.

To define a union, use e.g. Union[int, str]. Details:

  • The arguments must be types and there must be at least one.

  • None as an argument is a special case and is replaced by type(None).

  • Unions of unions are flattened, e.g.:

    Union[Union[int, str], float] == Union[int, str, float]
    
  • Unions of a single argument vanish, e.g.:

    Union[int] == int  # The constructor actually returns int
    
  • Redundant arguments are skipped, e.g.:

    Union[int, str, int] == Union[int, str]
    
  • When comparing unions, the argument order is ignored, e.g.:

    Union[int, str] == Union[str, int]
    
  • When two arguments have a subclass relationship, the least derived argument is kept, e.g.:

    class Employee: pass
    class Manager(Employee): pass
    Union[int, Employee, Manager] == Union[int, Employee]
    Union[Manager, int, Employee] == Union[int, Employee]
    Union[Employee, Manager] == Employee
    
  • Corollary: if Any is present it is the sole survivor, e.g.:

    Union[int, Any] == Any
    
  • Similar for object:

    Union[int, object] == object
    
  • To cut a tie: Union[object, Any] == Union[Any, object] == Any.

  • You cannot subclass or instantiate a union.

  • You cannot write Union[X][Y] (what would it mean?).

  • You can use Optional[X] as a shorthand for Union[X, None].

class aztk.models.UserConfiguration(**kwargs)[source]

Bases: aztk.core.models.model.Model

aztk.spark package

aztk.spark.models package

aztk.spark.models.plugins package
aztk.spark.models.plugins.AptGetPlugin(packages=None)[source]
aztk.spark.models.plugins.CondaPlugin(packages=None)[source]
class aztk.spark.models.plugins.HDFSPlugin[source]

Bases: aztk.models.plugins.plugin_configuration.PluginConfiguration

aztk.spark.models.plugins.JupyterLabPlugin()[source]
aztk.spark.models.plugins.JupyterPlugin()[source]
aztk.spark.models.plugins.NvBLASPlugin()[source]
aztk.spark.models.plugins.OpenBLASPlugin()[source]
aztk.spark.models.plugins.PipPlugin(packages=None)[source]
aztk.spark.models.plugins.RStudioServerPlugin(version='1.1.383')[source]
class aztk.spark.models.plugins.ResourceMonitorPlugin[source]

Bases: aztk.models.plugins.plugin_configuration.PluginConfiguration

class aztk.spark.models.plugins.SimplePlugin[source]

Bases: aztk.models.plugins.plugin_configuration.PluginConfiguration

class aztk.spark.models.plugins.SparkUIProxyPlugin[source]

Bases: aztk.models.plugins.plugin_configuration.PluginConfiguration

aztk.spark.models.plugins.TensorflowOnSparkPlugin()[source]
class aztk.spark.models.Application(cloud_task: azure.batch.models.cloud_task.CloudTask)[source]

Bases: object

class aztk.spark.models.ApplicationConfiguration(name=None, application=None, application_args=None, main_class=None, jars=None, py_files=None, files=None, driver_java_options=None, driver_library_path=None, driver_class_path=None, driver_memory=None, executor_memory=None, driver_cores=None, executor_cores=None, max_retry_count=None)[source]

Bases: object

class aztk.spark.models.ApplicationLog(name: str, cluster_id: str, log: str, total_bytes: int, application_state: azure.batch.models.batch_service_client_enums.TaskState, exit_code: int)[source]

Bases: object

class aztk.spark.models.Cluster(pool: azure.batch.models.cloud_pool.CloudPool = None, nodes: azure.batch.models.compute_node_paged.ComputeNodePaged = None)[source]

Bases: aztk.models.cluster.Cluster

is_pool_running_spark(pool: azure.batch.models.cloud_pool.CloudPool)[source]
class aztk.spark.models.ClusterConfiguration(*args, **kwargs)[source]

Bases: aztk.models.cluster_configuration.ClusterConfiguration

spark_configuration

Field is another model

Parameters:
  • model (aztk.core.models.Model) – Model object that the field should be
  • merge_strategy (ModelMergeStrategy) – When merging models how should the nested model be merged. Default: ModelMergeStrategy.merge
worker_on_master

Model Boolean field

class aztk.spark.models.CustomScript(**kwargs)[source]

Bases: aztk.models.custom_script.CustomScript

class aztk.spark.models.DockerConfiguration(**kwargs)[source]

Bases: aztk.models.secrets_configuration.DockerConfiguration

class aztk.spark.models.File(name: str, payload: _io.StringIO)[source]

Bases: aztk.models.file.File

class aztk.spark.models.FileShare(**kwargs)[source]

Bases: aztk.models.file_share.FileShare

class aztk.spark.models.Job(cloud_job_schedule: azure.batch.models.cloud_job_schedule.CloudJobSchedule, cloud_tasks: List[azure.batch.models.cloud_task.CloudTask] = None, pool: azure.batch.models.cloud_pool.CloudPool = None, nodes: azure.batch.models.compute_node_paged.ComputeNodePaged = None)[source]

Bases: object

class aztk.spark.models.JobConfiguration(id=None, applications=None, vm_size=None, custom_scripts=None, spark_configuration=None, toolkit=None, max_dedicated_nodes=0, max_low_pri_nodes=0, subnet_id=None, scheduling_target: aztk.models.scheduling_target.SchedulingTarget = None, worker_on_master=None)[source]

Bases: object

to_cluster_config()[source]
mixed_mode() → bool[source]
get_docker_repo() → str[source]
validate() → bool[source]

Validate the config in its current state. Raises: error if invalid
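
For illustration, a job could be assembled from these models as in the sketch below; the application path, node count, VM size, and toolkit version are placeholders:

from aztk.spark.models import (ApplicationConfiguration, JobConfiguration,
                               SchedulingTarget, SparkToolkit)

# Hypothetical job with a single Spark application, scheduled on dedicated nodes.
app = ApplicationConfiguration(
    name="pi-estimate",
    application="./examples/pi.py",  # path to your Spark application
)

job_config = JobConfiguration(
    id="example-job",
    applications=[app],
    vm_size="standard_f2",
    toolkit=SparkToolkit(version="2.3.0"),
    max_dedicated_nodes=3,
    scheduling_target=SchedulingTarget.Dedicated,
)

job_config.validate()  # raises an error if the configuration is invalid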

class aztk.spark.models.JobState[source]

Bases: object

complete = 'completed'
active = 'active'
completed = 'completed'
disabled = 'disabled'
terminating = 'terminating'
deleting = 'deleting'
class aztk.spark.models.List[source]

Bases: list, typing.MutableSequence

class aztk.spark.models.Model(**kwargs)[source]

Bases: object

Base class for all aztk models

To implement model wide validation implement __validate__ method

__getstate__()[source]

For pickle serialization. This return the state of the model

__setstate__(state)[source]

For pickle serialization. This update the current model with the given state

validate()[source]

Validate the entire model

merge(other)[source]
classmethod from_dict(val: dict)[source]
to_dict()[source]
class aztk.spark.models.PluginConfiguration(**kwargs)[source]

Bases: aztk.models.plugins.plugin_configuration.PluginConfiguration

class aztk.spark.models.PortForwardingSpecification(**kwargs)[source]

Bases: aztk.models.port_forward_specification.PortForwardingSpecification

class aztk.spark.models.RemoteLogin(ip_address, port)[source]

Bases: aztk.models.remote_login.RemoteLogin

class aztk.spark.models.SchedulingTarget[source]

Bases: enum.Enum

Target where the task will get scheduled. For Spark this is where the driver will live.

Master = 'master'

Only the master is allowed to run tasks

Dedicated = 'dedicated'

Any dedicated node is allowed to run tasks (default)

Any = 'any'

Any node (not recommended if using low priority nodes)

class aztk.spark.models.SecretsConfiguration(**kwargs)[source]

Bases: aztk.models.secrets_configuration.SecretsConfiguration

class aztk.spark.models.ServicePrincipalConfiguration(**kwargs)[source]

Bases: aztk.models.secrets_configuration.ServicePrincipalConfiguration

class aztk.spark.models.SharedKeyConfiguration(**kwargs)[source]

Bases: aztk.models.secrets_configuration.SharedKeyConfiguration

class aztk.spark.models.SparkConfiguration(*args, **kwargs)[source]

Bases: aztk.core.models.model.Model

spark_defaults_conf

Model String field

spark_env_sh

Model String field

core_site_xml

Model String field

jars

Field that should be a list

class aztk.spark.models.SparkToolkit(version: str, environment: str = None, environment_version: str = None)[source]

Bases: aztk.models.toolkit.Toolkit

class aztk.spark.models.UserConfiguration(**kwargs)[source]

Bases: aztk.models.user_configuration.UserConfiguration

class aztk.spark.models.VmImage(publisher, offer, sku)[source]

Bases: aztk.models.vm_image.VmImage

aztk.spark.client module

class aztk.spark.client.Client(secrets_config: aztk.models.secrets_configuration.SecretsConfiguration)[source]

Bases: aztk.client.Client

Aztk Spark Client. This is the main entry point for using aztk for spark (see the usage sketch after the method list below).

Parameters: secrets_config (aztk.spark.models.models.SecretsConfiguration) – Configuration with all the needed credentials
create_cluster(cluster_conf: aztk.spark.models.models.ClusterConfiguration, wait: bool = False)[source]

Create a new aztk spark cluster

Parameters:
  • cluster_conf (aztk.spark.models.models.ClusterConfiguration) – Configuration for the cluster to be created
  • wait (bool) – Whether to wait for the cluster to be ready before returning
Returns:

aztk.spark.models.Cluster

create_clusters_in_parallel(cluster_confs)[source]
delete_cluster(cluster_id: str, keep_logs: bool = False)[source]
get_cluster(cluster_id: str)[source]
list_clusters()[source]
get_remote_login_settings(cluster_id: str, node_id: str)[source]
submit(cluster_id: str, application: aztk.spark.models.models.ApplicationConfiguration, remote: bool = False, wait: bool = False)[source]
submit_all_applications(cluster_id: str, applications)[source]
wait_until_application_done(cluster_id: str, task_id: str)[source]
wait_until_applications_done(cluster_id: str)[source]
wait_until_cluster_is_ready(cluster_id: str)[source]
wait_until_all_clusters_are_ready(clusters: List[str])[source]
create_user(cluster_id: str, username: str, password: str = None, ssh_key: str = None) → str[source]
get_application_log(cluster_id: str, application_name: str, tail=False, current_bytes: int = 0)[source]
get_application_status(cluster_id: str, app_name: str)[source]
cluster_run(cluster_id: str, command: str, host=False, internal: bool = False, timeout=None)[source]
node_run(cluster_id: str, node_id: str, command: str, host=False, internal: bool = False, timeout=None)[source]
cluster_copy(cluster_id: str, source_path: str, destination_path: str, host: bool = False, internal: bool = False, timeout: int = None)[source]
cluster_download(cluster_id: str, source_path: str, destination_path: str = None, host: bool = False, internal: bool = False, timeout: int = None)[source]
cluster_ssh_into_master(cluster_id, node_id, username, ssh_key=None, password=None, port_forward_list=None, internal=False)[source]
submit_job(job_configuration: aztk.spark.models.models.JobConfiguration)[source]
list_jobs()[source]
list_applications(job_id)[source]
get_job(job_id)[source]
stop_job(job_id)[source]
delete_job(job_id: str, keep_logs: bool = False)[source]
get_application(job_id, application_name)[source]
get_job_application_log(job_id, application_name)[source]
stop_job_app(job_id, application_name)[source]
wait_until_job_finished(job_id)[source]
wait_until_all_jobs_finished(jobs)[source]
run_cluster_diagnostics(cluster_id, output_directory=None)[source]
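
Putting the client together with the models above, a typical session might look like the sketch below. The service principal field names mirror the service_principal block of secrets.yaml and should be treated as assumptions, as should the cluster and application values:

from aztk.spark.client import Client
from aztk.spark.models import (ApplicationConfiguration, ClusterConfiguration,
                               SecretsConfiguration, ServicePrincipalConfiguration,
                               SparkToolkit)

# Credentials (placeholders) -- normally these values come from .aztk/secrets.yaml
secrets_config = SecretsConfiguration(
    service_principal=ServicePrincipalConfiguration(
        tenant_id="<tenant-id>",
        client_id="<client-id>",
        credential="<credential>",
        batch_account_resource_id="<batch-account-resource-id>",
        storage_account_resource_id="<storage-account-resource-id>",
    ),
)

client = Client(secrets_config)

# Provision a small cluster and wait until it is ready
cluster_config = ClusterConfiguration(
    cluster_id="example-cluster",
    toolkit=SparkToolkit(version="2.3.0"),
    size=2,
    vm_size="standard_f2",
)
client.create_cluster(cluster_config, wait=True)

# Submit an application, wait for it to finish, and read its log
app = ApplicationConfiguration(name="pi-estimate", application="./examples/pi.py")
client.submit("example-cluster", app, wait=True)
log = client.get_application_log("example-cluster", "pi-estimate")
print(log.log)

client.delete_cluster("example-cluster", keep_logs=False)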

aztk.client module

class aztk.client.Client(secrets_config: aztk.models.secrets_configuration.SecretsConfiguration)[source]

Bases: object

get_cluster_config(cluster_id: str) → aztk.models.cluster_configuration.ClusterConfiguration[source]
create_cluster(cluster_conf, wait: bool = False)[source]
create_clusters_in_parallel(cluster_confs)[source]
delete_cluster(cluster_id: str)[source]
get_cluster(cluster_id: str)[source]
list_clusters()[source]
wait_until_cluster_is_ready(cluster_id)[source]
create_user(cluster_id: str, username: str, password: str = None, ssh_key: str = None) → str[source]
get_remote_login_settings(cluster_id, node_id)[source]
cluster_run(cluster_id, command)[source]
cluster_copy(cluster_id, source_path, destination_path)[source]
cluster_download(cluster_id, source_path, destination_path)[source]
submit_job(job)[source]

aztk.error module

Contains all errors used in Aztk. All errors should inherit from AztkError.

exception aztk.error.AztkError[source]

Bases: Exception

exception aztk.error.AztkAttributeError[source]

Bases: aztk.error.AztkError

exception aztk.error.ClusterNotReadyError[source]

Bases: aztk.error.AztkError

exception aztk.error.AzureApiInitError[source]

Bases: aztk.error.AztkError

exception aztk.error.InvalidPluginConfigurationError[source]

Bases: aztk.error.AztkError

exception aztk.error.InvalidModelError(message: str, model=None)[source]

Bases: aztk.error.AztkError

exception aztk.error.MissingRequiredAttributeError(message: str, model=None)[source]

Bases: aztk.error.InvalidModelError

exception aztk.error.InvalidCustomScriptError(message: str, model=None)[source]

Bases: aztk.error.InvalidModelError

exception aztk.error.InvalidPluginReferenceError(message: str, model=None)[source]

Bases: aztk.error.InvalidModelError

exception aztk.error.InvalidModelFieldError(message: str, model=None, field=None)[source]

Bases: aztk.error.InvalidModelError
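
Since every exception above derives from AztkError, callers can catch that base class around any client call. A minimal sketch (the cluster id is a placeholder):

from aztk.error import AztkError
from aztk.spark.client import Client


def try_get_cluster(client: Client, cluster_id: str):
    """Return the cluster if the call succeeds, otherwise report the aztk error."""
    try:
        return client.get_cluster(cluster_id)
    except AztkError as error:
        print("aztk operation failed:", error)
        return None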

Writing docs

Docs are located in the docs folder. We are using sphinx to generate the docs and then hosting them on readthedocs.

Start docs autobuild to test locally

sphinx-autobuild docs docs/_build/html --watch aztk

Open docs/_build/html/index.html

Publish the docs

Docs are published automatically to Read the Docs as soon as you push to master, under the latest tag. When you create a git tag, readthedocs can also build the docs for that tag.

Writing a model

Getting started

In aztk/models, create a new file named after your model, e.g. my_model.py

In aztk/models/__init__.py add from .my_model import MyModel

Create a new class MyModel that inherits from Model:

from aztk.core.models import Model, fields

class MyModel(Model):
    """
    MyModel is a sample model

    Args:
        input1 (str): This is the first input
    """

    input1 = fields.String()

    def __validate__(self):
        pass

Available fields types

Check aztk/core/models/fields.py for the sources

  • Field: Base field class
  • String: Field that validates it is given a string
  • Integer: Field that validates it is given an int
  • Float: Field that validates it is given a float
  • Boolean: Field that validates it is given a boolean
  • List: Field that validates it is given a list and can also automatically convert entries to the given model type
  • Model: Field that maps to another model. If passed a dict, it will automatically try to convert it to the Model type
  • Enum: Field whose value should be an enum. It will convert automatically to the enum if given the value.
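
As a sketch, a model mixing several of these field types might look like the following; the model and its field names are made up for illustration:

from aztk.core.models import Model, fields
from aztk.models import PluginConfiguration, SchedulingTarget


class ExampleConfiguration(Model):
    """Hypothetical model demonstrating the available field types."""

    name = fields.String()               # no default, so marked as required
    retries = fields.Integer(default=3)  # optional: has a default
    verbose = fields.Boolean(default=False)
    plugins = fields.List(PluginConfiguration)  # dict entries are converted to PluginConfiguration
    scheduling_target = fields.Enum(SchedulingTarget, default=SchedulingTarget.Dedicated)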

Add validation

The fields provide basic validation automatically. A field without a default will be marked as required.

To provide model-wide validation, implement a __validate__ method and raise an InvalidModelError if there are any problems with the values:

# Inside the MyModel class (requires: from aztk.error import InvalidModelError)
def __validate__(self):
    if 'secret' in self.input1:
        raise InvalidModelError("Input1 contains secrets")

Convert dict to model

When inheriting from Model, your class comes with a from_dict class method that converts a dict into an instance of the class, and a to_dict method for the reverse; see the sketch below.
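
For example, using the MyModel class defined above (a sketch; the dict keys mirror the model's fields):

data = {"input1": "hello"}

my_model = MyModel.from_dict(data)  # build the model from plain data (e.g. parsed yaml)
print(my_model.input1)              # "hello"
print(my_model.to_dict())           # back to a plain dict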

Tests

AZTK comes with a testing library that can be used for verification and debugging. Please note that some tests will provision and test real resources in Azure, and as a result, will cost money to run. See Integration Tests for more details.

Integration Tests

Integration tests use the credentials given in your .aztk/secrets.yaml file to spin up real Clusters and Jobs to verify the functionality of the library. Please note that these tests will cost money to run. All created Clusters and Jobs will be deleted when the test completes.

Since each integration test spins up a Cluster or Job, you may want to run the tests in parallel to reduce the time needed to complete the testing library:

pytest $path_to_repo_root -n <5>

Note: $path_to_repo_root represents the path to the root of the aztk repository, and is only required if you are running the tests from a different location.

Please note that the number passed to the -n flag (provided by the pytest-xdist plugin) determines the number of tests you wish to run in parallel. Parallelizing the tests will increase the number of CPU cores used at one time, so please verify that you have the available core quota in your Batch account.
