Azure Distributed Data Engineering Toolkit¶
Azure Distributed Data Engineering Toolkit (AZTK) is a python CLI application for provisioning on-demand Spark on Docker clusters in Azure. It’s a cheap and easy way to get up and running with a Spark cluster, and a great tool for Spark users who want to experiment and start testing at scale.
This toolkit is built on top of Azure Batch but does not require any Azure Batch knowledge to use.
Getting Started¶
The minimum requirements to get started with this package are:
- Python 3.5+, pip 9.0.1+
- An Azure account
- An Azure Batch account
- An Azure Storage account
Cloning and installing the project¶
Clone the repo.
Make sure you are running Python 3.5 or greater. If the default version on your machine is Python 2, make sure to run the following commands with pip3 instead of pip.
Install aztk:
pip install -e .
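Putting those steps together, a typical install from source (assuming you want an editable install run from the repository root) looks like:
git clone https://github.com/Azure/aztk.git
cd aztk
pip install -e .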
Initialize your environment:
Navigate to the directory you wish to use as your spark development environment, and run:
aztk spark init
This will create a .aztk folder with preset configuration files in your current working directory.
If you would like to initialize your AZTK clusters with a specific development toolset, please pass one of the following flags:
aztk spark init --python
aztk spark init --R
aztk spark init --scala
aztk spark init --java
If you wish to have global configuration files that will be read regardless of your current working directory, run:
aztk spark init --global
This will put default configuration files in your home directory, ~/. Please note that configuration files in your current working directory will take precedence over global configuration files in your home directory.
Setting up your accounts¶
Using the account setup script¶
A script to create and configure the Azure resources required to use aztk is provided. For more information and usage, see Getting Started Script.
Manual resource creation¶
To finish setting up, you need to fill out your Azure Batch and Azure Storage secrets in .aztk/secrets.yaml. We’d also recommend that you enter SSH key info in this file too.
Please note that if you use ssh keys and have a non-standard ssh key file name or path, you will need to specify the location of your ssh public and private keys. To do so, set them as shown below:
default:
# SSH keys used to create a user and connect to a server.
# The public key can either be the public key itself(ssh-rsa ...) or the path to the ssh key.
# The private key must be the path to the key.
ssh_pub_key: ~/.ssh/my-public-key.pub
ssh_priv_key: ~/.ssh/my-private-key
Log into Azure. If you do not already have an Azure account, go to https://azure.microsoft.com/ to get started for free today.
Once you have one, simply log in and go to the Azure Portal to start creating your Azure Batch account and Azure Storage account.
Using AAD¶
To get the required keys for your Azure Active Directory (AAD) Service Principal, Azure Batch Account and Azure Storage Account, please follow these instructions. Note that this is the recommended path for use with AZTK, as some features require AAD and are disabled if using Shared Key authentication.
- Register an Azure Active Directory (AAD) Application
- Navigate to Azure Active Directory by searching in “All Services”. Click “Properties” and record the value in the “Directory ID” field. This is your tenant ID.
- Navigate to App Registrations by searching in “All Services”.
- Click the “+ New application registration” option at the top left of the window. Fill in the necessary fields for the “Create” form. For “Application type” use “Web app/ API.”
- Click on the newly created App to reveal more info. Record the Application ID (for use as Client ID). Then click “Settings”, then “Keys.” Create a new password using the provided form; be sure to copy and save the password, as it will only be revealed once. This password is used as the credential in secrets.yaml.
- Create a Storage Account
- Click the ‘+’ button at the top left of the screen and search for ‘Storage’. Select ‘Storage account - blob, file, table, queue’ and click ‘Create’
- Fill in the form and create the Storage account.
- Record the Storage account’s resource ID.
- Give your AAD App “Contributor” permissions to your Storage Account. Click “Access Control (IAM)”, then “+ Add” at the top left. Fill in the “Add Permissions” form and save.
- Create a Batch Account
- Click the ‘+’ button at the top left of the screen and search for ‘Compute’. Select ‘Batch’ and click ‘Create’
- Fill in the form and create the Batch account.
- Navigate to your newly created Batch Account and record its resource ID by clicking “Properties” and copying.
- Give your AAD App “Contributor” permissions to your Batch Account. Click “Access Control (IAM)”, then “+ Add” at the top left. Fill in the “Add Permissions” form and save.
- Save your account credentials into the secrets.yaml file
- Open the secrets.yaml file in the .aztk folder in your current working directory (if .aztk doesn’t exist, run aztk spark init). Fill in all of the fields as described below.
- Fill in the Service principal block with your recorded values as shown below:
service_principal:
tenant_id: <AAD Directory ID>
client_id: <AAD App Application ID>
credential: <AAD App Password>
batch_account_resource_id: </batch/account/resource/id>
storage_account_resource_id: </storage/account/resource/id>
Next Steps¶
Getting Started Script¶
The provided account setup script creates and configures all of the required Azure resources.
The script will create and configure the following resources:
- Resource group
- Storage account
- Batch account
- Azure Active Directory application and service principal
The script outputs all of the necessary information to use aztk; just copy the output into the .aztk/secrets.yaml file created when running aztk spark init.
Usage¶
Copy and paste the following into an Azure Cloud Shell:
wget -q https://raw.githubusercontent.com/Azure/aztk/v0.7.0/account_setup.sh &&
chmod 755 account_setup.sh &&
/bin/bash account_setup.sh
A series of prompts will appear, and you can set the values you desire for each field. Default values appear in brackets [] and will be used if no value is provided.
Azure Region [westus]:
Resource Group Name [aztk]:
Storage Account Name [aztkstorage]:
Batch Account Name [aztkbatch]:
Active Directory Application Name [aztkapplication]:
Active Directory Application Credential Name [aztk]:
Once the script has finished running you will see the following output:
service_principal:
tenant_id: <AAD Directory ID>
client_id: <AAD App Application ID>
credential: <AAD App Password>
batch_account_resource_id: </batch/account/resource/id>
storage_account_resource_id: </storage/account/resource/id>
Copy the entire service_principal section into your .aztk/secrets.yaml. If you do not have a secrets.yaml file, you can create one in your current working directory by running aztk spark init.
Now you are ready to create your first aztk cluster. See Creating a Cluster.
Clusters¶
In the Azure Distributed Data Engineering Toolkit, a cluster is primarily designed to run Spark jobs. This document describes how to create a cluster to use for Spark jobs. Alternatively, for getting started and debugging, you can also use the cluster in interactive mode, which allows you to log into the master node and interact with the cluster from there.
Creating a Cluster¶
Creating a Spark cluster only takes a few simple steps after which you will be able to SSH into the master node of the cluster and interact with Spark. You will be able to view the Spark Web UI, Spark Jobs UI, submit Spark jobs (with spark-submit), and even interact with Spark in a Jupyter notebook.
For the advanced user, please note that the default cluster settings are preconfigured in the .aztk/cluster.yaml file that is generated when you run aztk spark init. More information on cluster config here.
Commands¶
Create a Spark cluster:
aztk spark cluster create --id <your_cluster_id> --vm-size <vm_size_name> --size <number_of_nodes>
For example, to create a cluster of 4 Standard_A2 nodes called ‘spark’ you can run:
aztk spark cluster create --id spark --vm-size standard_a2 --size 4
You can find more information on VM sizes here. Please note that you must use the official SKU name when setting your VM size - they usually come in the form: “standard_d2_v2”.
Note: The cluster id (--id) can only contain alphanumeric characters including hyphens and underscores, and cannot contain more than 64 characters. Each cluster must have a unique cluster id.
By default, you cannot create clusters of more than 20 cores in total. Visit this page to request a core quota increase.
Low priority nodes¶
You can create your cluster with low-priority VMs at an 80% discount by using --size-low-pri instead of --size. Note that these are great for experimental use, but can be taken away at any time. We recommend against this option when doing long running jobs or for critical workloads.
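For example, the following command (illustrative values) would create a cluster of 4 low-priority Standard_A2 nodes:
aztk spark cluster create --id spark --vm-size standard_a2 --size-low-pri 4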
Mixed Mode¶
You can create clusters with a mix of low-priority and dedicated VMs to reach the optimal balance of price and availability. In Mixed Mode, your cluster will have both dedicated instances and low priority instances. To minimize the potential impact on your Spark workloads, the Spark master node will always be provisioned on one of the dedicated nodes, while each of the low priority nodes will be a Spark worker.
Please note that to use Mixed Mode clusters, you need to authenticate using Azure Active Directory (AAD) by configuring the Service Principal in .aztk/secrets.yaml. You also need to create a Virtual Network (VNET), and provide the resource ID of a Subnet within the VNET in your .aztk/cluster.yaml configuration file.
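As an illustrative sketch, and assuming the subnet is already configured in .aztk/cluster.yaml as described above, a Mixed Mode cluster with 2 dedicated nodes and 8 low-priority nodes would be created by passing both size flags:
aztk spark cluster create --id spark --vm-size standard_a2 --size 2 --size-low-pri 8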
Setting your Spark and/or Python versions¶
By default, the Azure Distributed Data Engineering Toolkit will use Spark v2.2.0 and Python v3.5.4. However, you can set your Spark and/or Python versions by configuring the base Docker image used by this package.
Listing clusters¶
You can list all clusters currently running in your account by running
aztk spark cluster list
Viewing a cluster¶
To view details about a particular cluster run:
aztk spark cluster get --id <your_cluster_id>
Note that the cluster is not fully usable until a master node has been selected and its state is idle.
For example, here cluster 'spark' has 2 nodes and node tvm-257509324_2-20170820t200959z is the master and ready to run a job.
Cluster spark
------------------------------------------
State: active
Node Size: standard_a2
Nodes: 2
| Dedicated: 2
| Low priority: 0
Nodes | State | IP:Port | Master
------------------------------------|-----------------|----------------------|--------
tvm-257509324_1-20170820t200959z | idle | 40.83.254.90:50001 |
tvm-257509324_2-20170820t200959z | idle | 40.83.254.90:50000 | *
Deleting a cluster¶
To delete a cluster run:
aztk spark cluster delete --id <your_cluster_id>
Deleting a cluster also permanently deletes any data or logs associated with that cluster. If you wish to persist this data, use the --keep-logs flag.
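For example, to delete the cluster 'spark' while keeping its logs:
aztk spark cluster delete --id spark --keep-logs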
You are charged for the cluster as long as the nodes are provisioned in your account. Make sure to delete any clusters you are not using to avoid unwanted costs.
Run a command on all nodes in the cluster¶
To run a command on all nodes in the cluster, run:
aztk spark cluster run --id <your_cluster_id> "<command>"
The command is executed through an SSH tunnel.
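For example, an illustrative sanity check that prints each node's free disk space:
aztk spark cluster run --id spark "df -h"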
Copy a file to all nodes in the cluster¶
To securely copy a file to all nodes, run:
aztk spark cluster copy --id <your_cluster_id> --source-path </path/to/local/file> --dest-path </path/on/node>
The file will be securely copied to each node using SFTP.
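For example, with illustrative paths:
aztk spark cluster copy --id spark --source-path ./data/mydata.csv --dest-path /tmp/mydata.csv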
Interactive Mode¶
All other interaction with the cluster is done via SSH and SSH tunneling. If you didn’t create a user during cluster create (aztk spark cluster create), the first step is to add a user to the master node.
Make sure that the .aztk/secrets.yaml file has your SSH key (or path to the SSH key), and it will automatically use it to make the SSH connection.
aztk spark cluster add-user --id spark --username admin
Alternatively, you can add the SSH key as a parameter when running the add-user command.
aztk spark cluster add-user --id spark --username admin --ssh-key <your_key_OR_path_to_key>
You can also use a password to create your user:
aztk spark cluster add-user --id spark --username admin --password <my_password>
Using an SSH key is the recommended method.
SSH and Port Forwarding¶
After a user has been created, SSH into the Spark container on the master node with:
aztk spark cluster ssh --id spark --username admin
If you would like to ssh into the host instead of the Spark container on it, run:
aztk spark cluster ssh --id spark --username admin --host
If you ssh into the host and wish to access the running Docker Spark environment, you can run the following:
sudo docker exec -it spark /bin/bash
Now that you’re in, you can change directory to your familiar $SPARK_HOME
cd $SPARK_HOME
Debugging your Spark Cluster¶
If your cluster is in an unknown or unusable state, you can debug by running:
aztk spark cluster debug --id <cluster-id> --output </path/to/output/directory/>
The debug utility will pull logs from all nodes in the cluster. The utility will check for:
- free disk space
- docker image status
- docker container status
- docker container logs
- docker container process status
- aztk code & version
- spark component logs (master, worker, shuffle service, history server, etc) from $SPARK_HOME/logs
- spark application logs from $SPARK_HOME/work
Please be careful sharing the output of the debug command, as secrets and application code are present in the output.
Interact with your Spark cluster¶
By default, the aztk spark cluster ssh command port forwards the Spark Web UI to localhost:8080, the Spark Jobs UI to localhost:4040, and the Spark History Server to localhost:18080. This can be configured in .aztk/ssh.yaml.
Custom scripts¶
Custom scripts are DEPRECATED. Use plugins instead.
Custom scripts allow for additional cluster setup steps when the cluster is being provisioned. This is useful if you want to install additional software, and if you need to modify the default cluster configuration for things such as modifying spark.conf, adding jars or downloading any files you need in the cluster.
You can specify the location of custom scripts on your local machine in .aztk/cluster.yaml. If you do not have a .aztk/ directory in your current working directory, run aztk spark init or see Getting Started. Note that the path can be absolute or relative to your current working directory.
The custom scripts can be configured to run on the Spark master only, the Spark workers only, or all nodes in the cluster (Please note that by default, the Spark master node is also a Spark worker). For example, the following custom script configuration will run 3 custom scripts in the order they are provided:
custom_scripts:
- script: ./custom-scripts/simple.sh
runOn: all-nodes
- script: ./custom-scripts/master-only.sh
runOn: master
- script: ./custom-scripts/worker-only.sh
runOn: worker
The first script, simple.sh, will run on all nodes and will be executed first. The next script, master-only.sh will run only on nodes that are Spark masters and after simple.sh. The next script, worker-only.sh, will run last and only on nodes that are Spark workers.
Directories may also be provided in the custom_scripts section of .aztk/cluster.yaml.
custom_scripts:
- script: /custom-scripts/
runOn: all-nodes
The above configuration takes the absolute path /custom-scripts/ and uploads every file within it. These files will all be executed, although the order of execution is not guaranteed. If your custom scripts have dependencies, specify the order by providing the full path to the file as seen in the first example.
Scripting considerations¶
- The default OS is Ubuntu 16.04.
- The scripts run on the specified nodes in the cluster after Spark has been installed.
- The scripts execute in the order provided
- If a script directory is provided, order of execution is not guaranteed
- The environment variable $SPARK_HOME points to the root Spark directory.
- The environment variable $IS_MASTER identifies if this is the node running the master role. The node running the master role also runs a worker role on it.
- The Spark cluster is set up using Standalone Mode
Provided Custom Scripts¶
HDFS¶
A custom script to install HDFS (2.8.2) is provided at custom-scripts/hdfs.sh. This will install and provision HDFS for your cluster.
To enable HDFS, add this snippet to the custom_scripts section of your .aztk/cluster.yaml configuration file:
custom_scripts:
- script: ./custom-scripts/hdfs.sh
runOn: all-nodes
When SSHing into the cluster, you will have access to the Namenode UI at the default port 50070. This port can be changed in the ssh.yaml file in your .aztk/ directory, or by passing the --namenodeui flag to the aztk spark cluster ssh command.
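For example, assuming the --namenodeui flag accepts the local port to forward (an assumption; check aztk spark cluster ssh --help for the exact usage), forwarding the Namenode UI might look like:
aztk spark cluster ssh --id spark --username admin --namenodeui 50070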
When enabled on the cluster, HDFS can be used to read or write data locally during program execution.
Docker¶
Azure Distributed Data Engineering Toolkit runs Spark on Docker.
Supported Azure Distributed Data Engineering Toolkit images are hosted publicly on Docker Hub.
By default, the aztk/spark:v0.1.0-spark2.3.0-base image will be used.
To select an image other than the default, you can set your Docker image at cluster creation time with the optional --docker-repo parameter:
aztk spark cluster create ... --docker-repo <name_of_docker_image_repo>
For example, if you wanted to use Spark v1.6.3, you could run the following cluster create command:
aztk spark cluster create ... --docker-repo aztk/base:spark1.6.3
Using a custom Docker Image¶
You can build your own Docker image on top or beneath one of our supported base images OR you can modify the supported Dockerfiles and build your own image that way.
Once you have your Docker image built and hosted publicly, you can then use the --docker-repo parameter in your aztk spark cluster create command to point to it.
Using a custom Docker Image that is Privately Hosted¶
To use a private docker image you will need to provide a docker username and password that have access to the repository you want to use.
In .aztk/secrets.yaml, set up your Docker configuration:
docker:
username: <myusername>
password: <mypassword>
If your private repository is not on Docker Hub (Azure Container Registry, for example), you can provide the endpoint here too:
docker:
username: <myusername>
password: <mypassword>
endpoint: <https://my-custom-docker-endpoint.com>
Building Your Own Docker Image¶
Building your own Docker Image provides more customization over your cluster’s environment. For some, this may look like installing specific, and even private, libraries that their Spark jobs require. For others, it may just be setting up a version of Spark, Python or R that fits their particular needs.
The Azure Distributed Data Engineering Toolkit supports custom Docker images. To guarantee that your Spark deployment works, we recommend that you build on top of one of our supported images.
To build your own image, you can either build on top of or beneath one of our supported images, OR you can just modify one of the supported Dockerfiles to build your own.
Building on top¶
You can build on top of our images by referencing the aztk/spark image in the FROM keyword of your Dockerfile:
# Your custom Dockerfile
FROM aztk/spark:v0.1.0-spark2.3.0-base
...
Building beneath¶
To build beneath one of our images, modify one of our Dockerfiles so that the FROM keyword pulls from your Docker image’s location (as opposed to the default which is a base Ubuntu image):
# One of the Dockerfiles that AZTK supports
# Change the FROM statement to point to your hosted image repo
FROM my_username/my_repo:latest
...
Please note that for this method to work, your Docker image must have been built on Ubuntu.
Custom Docker Image Requirements¶
If you are building your own custom image and not building on top of a supported image, the following requirements are necessary.
Please make sure that the following environment variables are set:
- AZTK_DOCKER_IMAGE_VERSION
- JAVA_HOME
- SPARK_HOME
You also need to make sure that PATH is correctly configured with $SPARK_HOME
- PATH=$SPARK_HOME/bin:$PATH
By default, these are set as follows:
ENV JAVA_HOME /usr/lib/jvm/java-1.8.0-openjdk-amd64
ENV SPARK_HOME /home/spark-current
ENV PATH $SPARK_HOME/bin:$PATH
If you are using your own version of Spark, make sure that it is symlinked by “/home/spark-current”. $SPARK_HOME must also point to “/home/spark-current”.
Configuration Files¶
This section refers to the files in the directory .aztk that are generated from the aztk spark init command.
cluster.yaml¶
The core settings for a cluster are configured in the cluster.yaml file. Once you have set your desired values in .aztk/cluster.yaml, you can create a cluster using aztk spark cluster create.
This is the default cluster configuration:
# id: <id of the cluster to be created>
id: spark_cluster
# Toolkit configuration [Required] You can use the `aztk toolkit` command to find which toolkits are available
toolkit:
software: spark
version: 2.2
# environment: python
# Optional version for the environment
# environment_version:
# Optional docker repository (to bring your custom docker image, just specify the Toolkit software, version and environment if using default images)
# docker_repo: <name of docker image repo (for more information, see https://github.com/Azure/aztk/blob/master/docs/12-docker-image.md)>
# vm_size: <vm-size, see available options here: https://azure.microsoft.com/pricing/details/batch//>
vm_size: standard_a2
# size: <number of dedicated nodes in the cluster, note that clusters must contain all dedicated or all low priority nodes>
size: 2
# size_low_pri: <number of low priority nodes in the cluster, mutually exclusive with size setting>
# username: <username for the linux user to be created> (optional)
username: spark
# Enable plugins
plugins:
# - name: spark_ui_proxy
# - name: jupyterlab
# - name: jupyter
# - name: hdfs
# - name: rstudio_server
# Allow master node to also be a worker <true/false> (Default: true)
# worker_on_master: true
# wait: <true/false>
wait: true
Running aztk spark cluster create will create a cluster of 2 Standard_A2 nodes called 'spark_cluster' with a linux user named 'spark'. This is equivalent to running the command:
aztk spark cluster create --id spark_cluster --vm-size standard_a2 --size 2 --username spark --wait
NOTE: This assumes that your SSH-key is configured in the .aztk/secrets.yaml file.
ssh.yaml¶
This is the default ssh cluster configuration:
# username: <name of the user account to ssh into>
username: spark
# job_ui_port: <local port where the job ui is forwarded to>
job_ui_port: 4040
# job_history_ui_port: <local port where the job history ui is forwarded to>
job_history_ui_port: 18080
# web_ui_port: <local port where the spark master web ui is forwarded to>
web_ui_port: 8080
# jupyter_port: <local port where jupyter is forwarded to>
jupyter_port: 8888
# name_node_ui_port: <local port where the Name Node UI is forwarded to>
name_node_ui_port: 50070
# rstudio_server_port: <local port where rstudio server is forwarded to>
rstudio_server_port: 8787
# connect: <true/false, connect to spark master or print connection string (--no-connect)>
connect: true
Running the command aztk spark cluster ssh --id <cluster_id> will ssh into the master node of the Spark cluster. It will also forward the Spark Job UI to localhost:4040, the Spark master's web UI to localhost:8080, and Jupyter to localhost:8888.
Note that all of the settings in ssh.yaml will be overridden by parameters passed on the command line.
Spark Configuration¶
The repository comes with default Spark configuration files which provision your Spark cluster just the same as you would locally. After running aztk spark init to initialize your working environment, you can view and edit these files at .aztk/spark-defaults.conf, .aztk/spark-env.sh and .aztk/core-site.xml. Please note that you can bring your own Spark configuration files by copying your spark-defaults.conf, spark-env.sh and core-site.xml into your .aztk/ directory.
If using aztk job submission, please note that both spark.shuffle.service.enabled and spark.dynamicAllocation.enabled must be set to true so that the number of executors registered with an application can scale as nodes in the job's cluster come online.
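For example, add the following two lines (standard Spark property names) to your .aztk/spark-defaults.conf:
spark.shuffle.service.enabled true
spark.dynamicAllocation.enabled true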
The following settings available in spark-defaults.conf and spark-env.sh are not supported:
spark-env.sh:
- SPARK_LOCAL_IP
- SPARK_PUBLIC_DNS
- SPARK_MASTER_HOST
- SPARK_MASTER_PORT
- SPARK_WORKER_PORT
- SPARK_MASTER_WEBUI_PORT
- Any options related to YARN client mode or Mesos
spark-defaults.conf:
- spark.master
History Server¶
If you want to use Spark's history server, please set the following values in your .aztk/spark-defaults.conf file:
spark.eventLog.enabled true
spark.eventLog.dir <path>
spark.history.fs.logDirectory <path>
Please note that the paths for spark.eventLog.dir and spark.history.fs.logDirectory should most likely match so that the history server reads the logs that each Spark job writes. Also note that while the paths can be local (file:/), it is recommended that they be accessible by every node in the cluster so that the history server, which runs on the Spark master node, has access to all application logs. HDFS, WASB, ADL, or any other Hadoop API compliant storage system may be used.
If using WASB, ADL or other cloud storage services, be sure to set your keys in .aztk/core-site.xml. For more information, see the Cloud Storage documentation.
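For example, a minimal sketch (MY_CONTAINER and MY_STORAGE_ACCOUNT are placeholders; substitute your own WASB container and account) that points both properties at the same location:
spark.eventLog.enabled true
spark.eventLog.dir wasbs://MY_CONTAINER@MY_STORAGE_ACCOUNT.blob.core.windows.net/logs
spark.history.fs.logDirectory wasbs://MY_CONTAINER@MY_STORAGE_ACCOUNT.blob.core.windows.net/logs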
Configuring Spark Storage¶
The Spark cluster can be configured to use different cloud supported storage offerings (such as Azure Storage Blobs, Azure Data Lake Storage, or any other supported Spark file system). More information can be found in the Cloud Storage documentation.
Placing JARS¶
Additional JAR files can be added to the cluster by simply adding them to the .aztk/jars directory. These JARS will automatically be added to Spark’s default JAR directory. In the case of a naming conflict, the file in .aztk/jars will overwrite the file in the cluster. Typically new JARS must be registered with Spark. To do this, either run the Spark Submit command with a path to the JARS
aztk spark cluster submit --id <my_cluster_id> --jars $SPARK_HOME/jars/my_jar_file_1.jar <my_application> <my_parameters>
Or update the .aztk/spark-defaults.conf file as shown below to have it registered for all Spark applications.
spark.jars $spark_home/jars/my_jar_file_1.jar,$spark_home/jars/my_jar_file_2.jar
Note: This tool automatically registers several JARS for default cloud storage in the spark-defaults.conf file. If you want to modify this file, simply append any additional JARS to the end of this list.
Next Steps¶
Azure Files¶
The ability to load a file share on the cluster is really useful when you want to be able to share data across all the nodes, and/or want that data to be persisted longer than the lifetime of the cluster. Azure Files provides a very easy way to mount a share into the cluster and have it accessible to all nodes. This is useful in cases where you have small data sets you want to process (less than 1GB) or have notebooks that you want to re-use between clusters.
Mounting an Azure Files share in the cluster only requires updating the cluster.yaml file at .aztk/cluster.yaml. For example, the following configuration will load two file shares into the cluster, one with my notebooks and one with a small data set that I have previously uploaded to Azure Files.
azure_files:
- storage_account_name: STORAGE_ACCOUNT_NAME
storage_account_key: STORAGE_ACCOUNT_KEY
# Name of the file share in Azure Files
file_share_path: data
# Mount point on the node in the cluster
mount_path: /mnt/data
- storage_account_name: STORAGE_ACCOUNT_NAME
storage_account_key: STORAGE_ACCOUNT_KEY
# Name of the file share in Azure Files
file_share_path: notebooks
# Mount point on the node in the cluster
mount_path: /mnt/notebooks
From the cluster, I can now access both of these file shares directly by navigating to /mnt/data or /mnt/notebooks, respectively.
Plugins¶
Plugins are a successor to custom scripts and are the recommended way of running custom code on the cluster.
Plugins can either be one of the Aztk supported plugins or the path to a local file.
Supported Plugins¶
AZTK ships with a library of default plugins that enable auxiliary services to use with your Spark cluster.
Currently the following plugins are supported:
- JupyterLab
- Jupyter
- HDFS
- RStudioServer
- Spark UI Proxy
Enable a plugin using the CLI¶
If you are using the aztk CLI and wish to enable a supported plugin, you need to update your .aztk/cluster.yaml configuration file.
Add or uncomment the plugins section and set the plugins you desire to enable as follows:
plugins:
- name: jupyterlab
- name: jupyter
- name: hdfs
- name: spark_ui_proxy
- name: rstudio_server
args:
version: "1.1.383"
Enable a plugin using the SDK¶
If you are using the aztk SDK and wish to enable a supported plugin, you need to import the necessary plugins from the aztk.spark.models.plugins module and add them to your ClusterConfiguration object's plugin list:
from aztk.spark.models import ClusterConfiguration
from aztk.spark.models.plugins import JupyterPlugin, RStudioServerPlugin, HDFSPlugin
cluster_config = ClusterConfiguration(
...# Other config,
plugins=[
JupyterPlugin(),
RStudioServerPlugin(version="1.1.383"),
HDFSPlugin(),
]
)
Custom script plugin¶
This allows you to run your custom code on the cluster
Run a custom script plugin with the CLI¶
Example¶
plugins:
- script: path/to/my/script.sh
- name: friendly-name
script: path/to/my-other/script.sh
target: host
target_role: all-nodes
Options¶
- script: Required. Path to the script you want to run.
- name: Optional. Friendly name; by default this will be the name of the script file.
- target: Optional. Target on which to run the plugin (default: spark-container). Can be spark-container or host.
- target_role: Optional. The role of the node where this script runs (default: master). Can be master, worker or all-nodes.
Submitting an Application¶
Submitting a job to your Spark cluster in this package mimics the experience of a typical standalone cluster. A spark job will be submitted to the system and run to completion.
Spark-Submit¶
The spark-submit experience is mostly the same as any regular Spark cluster with a few minor differences. You can take a look at aztk spark cluster --help for more detailed information and options.
Run a Spark job:
aztk spark cluster submit --id <name_of_spark_cluster> --name <name_of_spark_job> <executable> <executable_params>
For example, run a local pi.py file on a Spark cluster
aztk spark cluster submit --id spark --name pipy examples/src/main/python/pi.py 100
NOTE: The job name (--name) must be at least 3 characters long, can only contain alphanumeric characters including hyphens but excluding underscores, and cannot contain uppercase letters. Each job you submit must have a unique name.
Monitoring job¶
If you have set up an SSH tunnel with port forwarding, you can navigate to http://localhost:8080 and http://localhost:4040 to view the progress of the job using the Spark UI.
Getting output logs¶
The default setting when running a job is --wait. This will simply submit a job to the cluster and wait for the job to run. If you want to just submit the job and not wait, use the --no-wait flag and tail the logs manually:
aztk spark cluster submit --id spark --name pipy --no-wait examples/src/main/python/pi.py 1000
aztk spark cluster app-logs --id spark --name pipy --tail
Cloud storage¶
Cloud storage for Spark enables you to have a persisted storage system backed by a cloud provider. Spark supports this by placing the appropriate storage jars and updating the core-site.xml file accordingly.
Azure Storage Blobs (WASB)¶
Pre-built into this package is native support for connecting your Spark cluster to Azure Blob Storage (aka WASB). The required WASB jars are automatically placed in the Spark cluster and the permissions are pulled from your core-site.xml file under .aztk/core-site.xml.
To connect to your Azure Storage account, make sure that the storage fields in your .aztk/core-site.xml file are properly filled out. This tool already has the basic template for using WASB filled out in the .aztk/core-site.xml file. Simply uncomment the relevant block in the “Azure Storage Blobs (WASB)” section and fill out the properties for MY_STORAGE_ACCOUNT_NAME, MY_STORAGE_ACCOUNT_SUFFIX and MY_STORAGE_ACCOUNT_KEY.
Once you have correctly filled out the .aztk/core-site.xml with your storage credentials, you will be able to access your storage accounts from your Spark job.
Reading and writing to and from Azure blobs is easily achieved by using the wasb syntax. For example, reading a csv file using PySpark would be:
# read csv data into data
dataframe = spark.read.csv('wasbs://MY_CONTAINER@MY_STORAGE_ACCOUNT.blob.core.windows.net/MY_INPUT_DATA.csv')
# print off the first 5 rows
dataframe.show(5)
# write the csv back to storage
dataframe.write.csv('wasbs://MY_CONTAINER@MY_STORAGE_ACCOUNT.blob.core.windows.net/MY_OUTPUT_DATA.csv')
Azure Data Lake (ADL)¶
Pre-built into this package is native support for connecting your Spark cluster to Azure Data Lake (aka ADL). The required ADL jars are automatically placed in the Spark cluster and the permissions are pulled from your core-site.xml file under .aztk/core-site.xml.
To connect to your Azure Data Lake store, make sure that the storage fields in your .aztk/core-site.xml file are properly filled out. This tool already has the basic template for using ADL filled out in the .aztk/core-site.xml file. Simply uncomment the relevant block in the “ADL (Azure Data Lake) Configuration” section and fill out the properties for MY_AAD_TENANT_ID, MY_AAD_CLIENT_ID and MY_AAD_CREDENTIAL.
Once you have correctly filled out the .aztk/core-site.xml with your Azure Data Lake credentials, you will be able to access your ADL storage repositories from your Spark job.
Reading and writing to and from Azure Data Lake Storage is easily achieved by using the adl syntax. For example, reading a csv file using PySpark would be:
# read csv data into data
dataframe = spark.read.csv('adl://MY_ADL_STORAGE_ACCOUNT.azuredatalakestore.net/MY_INPUT_DATA.csv')
# print off the first 5 rows
dataframe.show(5)
# write the csv back to storage
dataframe.write.csv('adl://MY_ADL_STORAGE_ACCOUNT.azuredatalakestore.net/MY_OUTPUT_DATA.csv')
Note: The implementation of the ADL connector is designed to always access ADLS through a secure channel, so there is no adls file system scheme name. You will always use adl. For more information please take a look at https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-use-data-lake-store.
Note: In order to use ADL you must first create an AAD application and give it permissions to your ADL Storage account. There is a good tutorial on how to create the required AAD security objects to use ADL here. Not shown in this tutorial is that, as a last step, you will need to give the application you created permissions to your ADL Storage account.
Additional file system connectors¶
You can quickly add support for additional data repositories by adding the necessary JARS to your cluster and configuring the spark-defaults.conf and core-site.xml files accordingly.
Adding Jars¶
To add jar files to the cluster, simply add them to your local .aztk/jars directory. These will automatically get loaded into your cluster and placed under $SPARK_HOME/jars
Registering Jars¶
To register the jars, update the .aztk/spark-defaults.conf file and add the path to the jar file(s) to the spark.jars property
spark.jars $spark_home/jars/my_jar_file_1.jar,$spark_home/jars/my_jar_file_2.jar
Configuring the file system¶
Configuring the file system requires an update to the .aztk/core-site.xml file. Each file system is unique and requires a different setup in core-site.xml. In .aztk/core-site.xml, we have preloaded templates to add WASB and ADL.
GPU¶
Use GPUs to accelerate your Spark applications. When using a GPU-enabled Azure VM, your docker image will contain CUDA 8.0 and cuDNN 6.0 by default. See Docker Image for more information about the AZTK Docker images.
[NOTE: Azure does not have GPU enabled VMs in all regions. Please use this link to make sure that your Batch account is in a region that has GPU enabled VMs]
AZTK uses Nvidia-Docker to expose the VM’s GPU(s) inside the container. Nvidia drivers (ver. 384) are installed at runtime.
Tutorial¶
Create a cluster specifying a GPU enabled VM
aztk spark cluster create --id gpu-cluster --vm-size standard_nc6 --size 1
Submit an application to the cluster that will take advantage of the GPU:
aztk spark cluster submit --id gpu-cluster --name gpu-app ./examples/src/main/python/gpu/nubma_example.py
Installation Location¶
By default, CUDA is installed at /usr/local/cuda-8.0
.
Jobs¶
In the Azure Distributed Data Engineering Toolkit, a Job is an entity that runs against an automatically provisioned and managed cluster. Jobs run a collection of Spark applications and persist the outputs.
Creating a Job¶
Creating a Job starts with defining the necessary properties in your .aztk/job.yaml file. Jobs have one or more applications to run as well as values that define the Cluster the applications will run on.
Job.yaml¶
Each Job has one or more applications given as a List in Job.yaml. Applications are defined using the following properties:
applications:
- name:
application:
application_args:
-
main_class:
jars:
-
py_files:
-
files:
-
driver_java_options:
-
driver_library_path:
driver_class_path:
driver_memory:
executor_memory:
driver_cores:
executor_cores:
Please note: the only required fields are name and application. All other fields may be removed or left blank.
NOTE: The Application name can only contain alphanumeric characters including hyphens and underscores, and cannot contain more than 64 characters. Each application must have a unique name.
Jobs also require a definition of the cluster on which the Applications will run. The following properties define a cluster:
cluster_configuration:
vm_size: <the Azure VM size>
size: <the number of nodes in the Cluster>
toolkit:
software: spark
version: 2.2
subnet_id: <resource ID of a subnet to use (optional)>
custom_scripts:
- List
- of
- paths
- to
- custom
- scripts
Please Note: For more information about Azure VM sizes, see Azure Batch Pricing. And for more information about Docker repositories see Docker.
The only required fields are vm_size and either size or size_low_pri, all other fields can be left blank or removed.
A Job definition may also include a default Spark Configuration. The following are the properties to define a Spark Configuration:
spark_configuration:
spark_defaults_conf: </path/to/your/spark-defaults.conf>
spark_env_sh: </path/to/your/spark-env.sh>
core_site_xml: </path/to/your/core-site.xml>
Please note: including a Spark Configuration is optional. Spark Configuration values defined as part of an application will take precedence over the values specified in these files.
Below we will define a simple, functioning job definition.
# Job Configuration
job:
id: test-job
cluster_configuration:
vm_size: standard_f2
size: 3
applications:
- name: pipy100
application: /path/to/pi.py
application_args:
- 100
- name: pipy200
application: /path/to/pi.py
application_args:
- 200
Once submitted, this Job will run two applications, pipy100 and pipy200, on an automatically provisioned Cluster with 3 dedicated Standard_f2 size Azure VMs. Immediately after both pipy100 and pipy200 have completed, the Cluster will be destroyed. Application logs will be persisted and available.
Commands¶
Submit a Spark Job:
aztk spark job submit --id <your_job_id> --configuration </path/to/job.yaml>
NOTE: The Job id (--id) can only contain alphanumeric characters including hyphens and underscores, and cannot contain more than 64 characters. Each Job must have a unique id.
Low priority nodes¶
You can create your Job with low-priority VMs at an 80% discount by using --size-low-pri instead of --size. Note that these are great for experimental use, but can be taken away at any time. We recommend against this option when doing long running jobs or for critical workloads.
Listing Jobs¶
You can list all Jobs currently running in your account by running
aztk spark job list
Viewing a Job¶
To view details about a particular Job, run:
aztk spark job get --id <your_job_id>
For example, here Job 'pipy' has 2 applications which have already completed.
Job pipy
------------------------------------------
State: | completed
Transition Time: | 21:29PM 11/12/17
Applications | State | Transition Time
------------------------------------|----------------|-----------------
pipy100 | completed | 21:25PM 11/12/17
pipy200 | completed | 21:24PM 11/12/17
Deleting a Job¶
To delete a Job run:
aztk spark job delete --id <your_job_id>
Deleting a Job also permanently deletes any data or logs associated with that cluster. If you wish to persist this data, use the --keep-logs flag.
You are only charged for the Job while it is active. Jobs handle provisioning and destroying infrastructure, so you are only charged for the time that your applications are running.
Stopping a Job¶
To stop a Job run:
aztk spark job stop --id <your_job_id>
Stopping a Job will end any currently running Applications and will prevent any new Applications from running.
Get information about a Job’s Application¶
To get information about a Job’s Application:
aztk spark job get-app --id <your_job_id> --name <your_application_name>
Getting a Job’s Application’s log¶
To get a job’s application logs:
aztk spark job get-app-logs --id <your_job_id> --name <your_application_name>
Stopping a Job’s Application¶
To stop an application that is running or going to run on a Job:
aztk spark job stop-app --id <your_job_id> --name <your_application_name>
Migration Guide¶
0.6.0 to 0.7.0¶
This guide will describe the steps needed to update a 0.6.0 aztk installation to 0.7.0.
Installation from pip¶
AZTK is now published on pip! If you installed from github previously, please reinstall.
To uninstall run:
pip3 uninstall aztk
The following command will get the latest release of aztk (please ensure you are using python3.5+):
pip3 install aztk
Or, you can install 0.7.0 specifically using:
pip3 install aztk==0.7.0
Configuration Files¶
A number of changes have been made that affect previously init'ed aztk environments. To limit potential issues with previous versions, we recommend that you replace any existing .aztk directories by following the steps below (a shell sketch of the first few steps follows the list).
- Backup your existing .aztk directory by renaming it to .aztk.old.
- Run aztk spark init to create a new .aztk directory.
- Copy the values from .aztk.old/secrets.yaml to .aztk/secrets.yaml.
- Update the new .aztk/cluster.yaml with values from .aztk.old/cluster.yaml if applicable. Please be aware of the new toolkit section that replaces docker_repo for supported images. Similarly for .aztk/job.yaml.
- Update the new defaults in .aztk/spark-defaults.conf, .aztk/core-site.xml and .aztk/spark-env.sh if applicable.
- Be sure not to copy over the .aztk.old/jars directory. All jars that were placed there by default have been moved onto the Docker image. You can add any custom jars you had by placing them in .aztk/jars/.
- Create your new 0.7.0 cluster!
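A shell sketch of the first few steps above (assuming your working directory contains the existing .aztk folder):
mv .aztk .aztk.old
aztk spark init
cp .aztk.old/secrets.yaml .aztk/secrets.yaml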
cluster.yaml¶
In cluster.yaml, the toolkit key has been added. It is used to select the default, supported Docker images. Please refer to the configuration file documentation.
Docker images¶
A major backwards-incompatible refactor of the Docker images has occurred. Previous Docker images will no longer work with 0.7.0. To update to a new supported docker image, you will need to update your .aztk/cluster.yaml configuration file with the toolkit block in place of docker_repo. If you do not do so, cluster creation will fail!
Please refer to the configuration file documentation for more information on the toolkit block in cluster.yaml.
Custom scripts deprecation and Plugins¶
Custom scripts have been deprecated in favor of Plugins. Plugins have a number of advantages, including the ability to execute on the host (and not in the Spark Docker container). A number of supported plugins are shipped with aztk; please refer to the plugin documentation to learn more.
SDK samples¶
Create the Spark client¶
You can get the values for this by either running the Getting Started script or using Batch Labs
import sys, os, time
import aztk.spark
from aztk.error import AztkError
# set your secrets
secrets_config = aztk.spark.models.SecretsConfiguration(
service_principal=aztk.spark.models.ServicePrincipalConfiguration(
tenant_id="<org>.onmicrosoft.com",
client_id="",
credential="",
batch_account_resource_id="",
storage_account_resource_id="",
),
ssh_pub_key=""
)
# create a client
client = aztk.spark.Client(secrets_config)
List available clusters¶
# list available clusters
clusters = client.list_clusters()
Create a new cluster¶
# define plugins to enable on the cluster
plugins = [
aztk.spark.models.plugins.JupyterPlugin(),
]
# ROOT_PATH is assumed to point to the root of your project (adjust to wherever your config and examples live)
ROOT_PATH = os.path.dirname(os.path.abspath(__file__))
# define spark configuration
spark_conf = aztk.spark.models.SparkConfiguration(
spark_defaults_conf=os.path.join(ROOT_PATH, 'config', 'spark-defaults.conf'),
spark_env_sh=os.path.join(ROOT_PATH, 'config', 'spark-env.sh'),
core_site_xml=os.path.join(ROOT_PATH, 'config', 'core-site.xml'),
jars=[os.path.join(ROOT_PATH, 'config', 'jars', jar) for jar in os.listdir(os.path.join(ROOT_PATH, 'config', 'jars'))]
)
# configure my cluster
cluster_config = aztk.spark.models.ClusterConfiguration(
cluster_id="sdk-test",
vm_low_pri_count=2,
vm_size="standard_f2",
plugins=plugins,
spark_configuration=spark_conf
)
# create a cluster, and wait until it is ready
try:
cluster = client.create_cluster(cluster_config)
cluster = client.wait_until_cluster_is_ready(cluster.id)
except AztkError as e:
print(e.message)
sys.exit()
Get an existing cluster¶
cluster = client.get_cluster(cluster_config.cluster_id)
Run an application on the cluster¶
# create some apps to run
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy1",
application=os.path.join(ROOT_PATH, 'examples', 'src', 'main', 'python', 'pi.py'),
application_args="10"
)
app2 = aztk.spark.models.ApplicationConfiguration(
name="pipy2",
application=os.path.join(ROOT_PATH, 'examples', 'src', 'main', 'python', 'pi.py'),
application_args="20"
)
app3 = aztk.spark.models.ApplicationConfiguration(
name="pipy3",
application=os.path.join(ROOT_PATH, 'examples', 'src', 'main', 'python', 'pi.py'),
application_args="30"
)
# submit an app and wait until it is finished running
client.submit(cluster.id, app1)
client.wait_until_application_done(cluster.id, app1.name)
# submit some other apps to the cluster in parallel
client.submit_all_applications(cluster.id, [app2, app3])
Get the logs of an application¶
# get logs for app, print to console
app1_logs = client.get_application_log(cluster_id=cluster_config.cluster_id, application_name=app1.name)
print(app1_logs.log)
Get status of app¶
status = client.get_application_status(cluster_config.cluster_id, app2.name)
Stream logs of app, print to console as it runs¶
current_bytes = 0
while True:
app2_logs = client.get_application_log(
cluster_id=cluster_config.cluster_id,
application_name=app2.name,
tail=True,
current_bytes=current_bytes)
print(app2_logs.log, end="")
if app2_logs.application_state == 'completed':
break
current_bytes = app2_logs.total_bytes
time.sleep(1)
# wait until all jobs finish, then delete the cluster
client.wait_until_applications_done(cluster.id)
client.delete_cluster(cluster.id)
Define a custom plugin¶
Full example¶
from aztk.spark.models import ClusterConfiguration
from aztk.spark.models.plugins import PluginConfiguration, PluginFile, PluginPort, PluginTarget, PluginTargetRole
cluster_config = ClusterConfiguration(
...# Other config,
plugins=[
PluginConfiguration(
name="my-custom-plugin",
files=[
PluginFile("file.sh", "/my/local/path/to/file.sh"),
PluginFile("data/one.json", "/my/local/path/to/data/one.json"),
PluginFile("data/two.json", "/my/local/path/to/data/two.json"),
],
execute="file.sh", # This must be one of the files defined in the file list and match the target path,
env=dict(
SOME_ENV_VAR="foo"
),
args=["arg1"], # Those arguments are passed to your execute script
ports=[
PluginPort(internal="1234"), # Internal only(For node communication for example)
PluginPort(internal="2345", public=True), # Open the port to the public(When ssh into). Used for UI for example
],
# Pick where you want the plugin to run
target=PluginTarget.Host, # The script will be run on the host. Default value is to run in the spark container
target_role=PluginTargetRole.All, # Whether the plugin should run only on the master, only on workers, or on all nodes. You can use environment variables (see below) to have different master/worker config
)
]
)
Parameters¶
PluginConfiguration¶
- name (required | string): Name of your plugin (this will be used for creating a folder, so it is recommended to use a simple name with only letters, dashes and underscores).
- files (required | List[PluginFile|PluginTextFile]): List of files to upload.
- execute (required | str): Script to execute. This script must be defined in the files above and must match its remote path.
- args (optional | List[str]): List of arguments to be passed to your execute script.
- env (optional | dict): Environment variables to access in the script (this can be used to pass arguments to your script instead of args).
- ports (optional | List[PluginPort]): List of ports to open if the script is running in a container. A port can also be marked public, and it will then be accessible when SSHing into the master node.
- target (optional | PluginTarget): Defines where the execute script should run. Potential values are PluginTarget.SparkContainer (default) and PluginTarget.Host.
- target_role (optional | PluginTargetRole): Whether the plugin should run only on the master, only on workers, or on all nodes. You can use environment variables (see below) to have different master/worker config.
PluginFile¶
- target (required | str): Where the file should be dropped relative to the plugin working directory.
- local_path (required | str): Path to the local file you want to upload (this could come from the plugin parameters).
Environment variables available in the plugin¶
AZTK provides a few environment variables that can be used in your plugin script (a short example follows this list):
- AZTK_IS_MASTER: whether the plugin is running on the master node. Can be either true or false.
- AZTK_IS_WORKER: whether a worker is set up on the current node (this might also be a master if you have worker_on_master set to true). Can be either true or false.
- AZTK_MASTER_IP: internal IP of the master node.
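For example, a minimal plugin script sketch that branches on these variables (the echo lines are purely illustrative):
#!/bin/bash
# Run master-specific setup only on the node holding the master role
if [ "$AZTK_IS_MASTER" = "true" ]; then
    echo "Master-side setup; master ip is $AZTK_MASTER_IP"
fi
# Run worker setup wherever a worker is running (possibly also the master)
if [ "$AZTK_IS_WORKER" = "true" ]; then
    echo "Worker-side setup"
fi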
Debug your plugin¶
When your plugin is not working as expected, there are a few things you can do to investigate issues.
Check the logs; you can either use the debug tool or BatchLabs:
- Navigate to startup/wd/logs/plugins.
- If you see a file named <your-plugin-name>.txt under that folder, it means that your plugin started correctly and you can check this file to see what your execute script logged.
- If this file doesn't exist, the script was not run on this node. There could be multiple reasons for this:
  - If you want your plugin to run in the Spark container, check the startup/wd/logs/docker.log file for information.
  - If you want your plugin to run on the host, check startup/stdout.txt and startup/stderr.txt.
  - The logs could mention that you picked the wrong target or target role for that plugin, which is why it is not running on this node.
aztk package¶
aztk.models package¶
aztk.models.models module¶
- class aztk.models.models.CustomScript(name: str = None, script=None, run_on=None)[source]. Bases: object
- class aztk.models.models.UserConfiguration(username: str, ssh_key: str = None, password: str = None)[source]. Bases: aztk.internal.configuration_base.ConfigurationBase
- class aztk.models.models.ClusterConfiguration(toolkit: aztk.models.toolkit.Toolkit = None, custom_scripts: List[aztk.models.models.CustomScript] = None, file_shares: List[aztk.models.models.FileShare] = None, cluster_id: str = None, vm_count=0, vm_low_pri_count=0, vm_size=None, subnet_id=None, plugins: List[aztk.models.plugins.plugin_configuration.PluginConfiguration] = None, user_configuration: aztk.models.models.UserConfiguration = None)[source]. Bases: aztk.internal.configuration_base.ConfigurationBase. Cluster configuration model. Parameters: toolkit –
- class aztk.models.models.ServicePrincipalConfiguration(tenant_id: str = None, client_id: str = None, credential: str = None, batch_account_resource_id: str = None, storage_account_resource_id: str = None)[source]. Bases: aztk.internal.configuration_base.ConfigurationBase. Container class for AAD authentication.
- Bases: aztk.internal.configuration_base.ConfigurationBase. Container class for shared key authentication. Validate the config at its current state. Raises: Error if invalid.
- class aztk.models.models.DockerConfiguration(endpoint=None, username=None, password=None)[source]. Bases: aztk.internal.configuration_base.ConfigurationBase
- class aztk.models.models.SecretsConfiguration(service_principal=None, shared_key=None, docker=None, ssh_pub_key=None, ssh_priv_key=None)[source]. Bases: aztk.internal.configuration_base.ConfigurationBase
aztk.spark package¶
aztk.spark.models package¶
aztk.spark.models.plugins package¶
- class aztk.spark.models.plugins.HDFSPlugin[source]. Bases: aztk.models.plugins.plugin_configuration.PluginConfiguration
- class aztk.spark.models.plugins.JupyterLabPlugin[source]. Bases: aztk.models.plugins.plugin_configuration.PluginConfiguration
- class aztk.spark.models.plugins.JupyterPlugin[source]. Bases: aztk.models.plugins.plugin_configuration.PluginConfiguration
- class aztk.spark.models.plugins.RStudioServerPlugin(version='1.1.383')[source]. Bases: aztk.models.plugins.plugin_configuration.PluginConfiguration
- class aztk.spark.models.plugins.ResourceMonitorPlugin[source]. Bases: aztk.models.plugins.plugin_configuration.PluginConfiguration
- class aztk.spark.models.Application(cloud_task: azure.batch.models.cloud_task.CloudTask)[source]. Bases: object
- class aztk.spark.models.ApplicationConfiguration(name=None, application=None, application_args=None, main_class=None, jars=None, py_files=None, files=None, driver_java_options=None, driver_library_path=None, driver_class_path=None, driver_memory=None, executor_memory=None, driver_cores=None, executor_cores=None, max_retry_count=None)[source]. Bases: object
- class aztk.spark.models.ApplicationLog(name: str, cluster_id: str, log: str, total_bytes: int, application_state: azure.batch.models.batch_service_client_enums.TaskState, exit_code: int)[source]. Bases: object
- class aztk.spark.models.Cluster(pool: azure.batch.models.cloud_pool.CloudPool = None, nodes: azure.batch.models.compute_node_paged.ComputeNodePaged = None)[source]. Bases: aztk.models.models.Cluster
- class aztk.spark.models.ClusterConfiguration(custom_scripts: List[aztk.spark.models.models.CustomScript] = None, file_shares: List[aztk.spark.models.models.FileShare] = None, cluster_id: str = None, vm_count=0, vm_low_pri_count=0, vm_size=None, subnet_id=None, toolkit: aztk.spark.models.models.SparkToolkit = None, user_configuration: aztk.spark.models.models.UserConfiguration = None, spark_configuration: aztk.spark.models.models.SparkConfiguration = None, worker_on_master: bool = None)[source]
- class aztk.spark.models.File(name: str, payload: _io.StringIO)[source]. Bases: aztk.models.models.File
- Bases: aztk.models.models.FileShare
- class aztk.spark.models.Job(cloud_job_schedule: azure.batch.models.cloud_job_schedule.CloudJobSchedule, cloud_tasks: List[azure.batch.models.cloud_task.CloudTask] = None, pool: azure.batch.models.cloud_pool.CloudPool = None, nodes: azure.batch.models.compute_node_paged.ComputeNodePaged = None)[source]. Bases: object
- class aztk.spark.models.JobConfiguration(id, applications, vm_size, custom_scripts=None, spark_configuration=None, toolkit=None, max_dedicated_nodes=0, max_low_pri_nodes=0, subnet_id=None, worker_on_master=None)[source]. Bases: object
- class aztk.spark.models.JobState[source]. Bases: object. Attributes: complete = 'completed', active = 'active', completed = 'completed', disabled = 'disabled', terminating = 'terminating', deleting = 'deleting'
- class aztk.spark.models.List[source]. Bases: list, typing.MutableSequence
- class aztk.spark.models.PluginConfiguration(name: str, ports: List[aztk.models.plugins.plugin_configuration.PluginPort] = None, files: List[aztk.models.plugins.plugin_file.PluginFile] = None, execute: str = None, args=None, env=None, target_role: aztk.models.plugins.plugin_configuration.PluginTargetRole = None, target: aztk.models.plugins.plugin_configuration.PluginTarget = None)[source]. Bases: aztk.models.plugins.plugin_configuration.PluginConfiguration
- class aztk.spark.models.SecretsConfiguration(service_principal=None, shared_key=None, docker=None, ssh_pub_key=None, ssh_priv_key=None)[source]
- class aztk.spark.models.ServicePrincipalConfiguration(tenant_id: str = None, client_id: str = None, credential: str = None, batch_account_resource_id: str = None, storage_account_resource_id: str = None)[source]
- class aztk.spark.models.SparkConfiguration(spark_defaults_conf=None, spark_env_sh=None, core_site_xml=None, jars=None)[source]. Bases: object
- class aztk.spark.models.SparkToolkit(version: str, environment: str = None, environment_version: str = None)[source]. Bases: aztk.models.toolkit.Toolkit
- class aztk.spark.models.UserConfiguration(username: str, ssh_key: str = None, password: str = None)[source]
- class aztk.spark.models.VmImage(publisher, offer, sku)[source]. Bases: aztk.models.models.VmImage
aztk.spark.client module¶
- class aztk.spark.client.Client(secrets_config)[source]. Bases: aztk.client.Client. Aztk Spark Client: this is the main entry point for using aztk for Spark. Parameters: secrets_config (aztk.spark.models.models.SecretsConfiguration) – Configuration with all the needed credentials.
  - create_cluster(configuration: aztk.spark.models.models.ClusterConfiguration, wait: bool = False)[source]: Create a new aztk spark cluster. Parameters: cluster_conf (aztk.spark.models.models.ClusterConfiguration) – Configuration for the cluster to be created; wait (bool) – If you should wait for the cluster to be ready before returning. Returns: aztk.spark.models.Cluster
  - submit(cluster_id: str, application: aztk.spark.models.models.ApplicationConfiguration, wait: bool = False)[source]
  - create_user(cluster_id: str, username: str, password: str = None, ssh_key: str = None) → str[source]
  - get_application_log(cluster_id: str, application_name: str, tail=False, current_bytes: int = 0)[source]
  - cluster_run(cluster_id: str, command: str, host=False, internal: bool = False, timeout=None)[source]
  - cluster_copy(cluster_id: str, source_path: str, destination_path: str, host: bool = False, internal: bool = False, timeout=None)[source]
aztk.client module¶
aztk.error module¶
Contains all errors used in Aztk. All errors should inherit from AztkError.
- exception aztk.error.ClusterNotReadyError[source]. Bases: aztk.error.AztkError
- exception aztk.error.AzureApiInitError[source]. Bases: aztk.error.AztkError
- exception aztk.error.InvalidPluginConfigurationError[source]. Bases: aztk.error.AztkError
- exception aztk.error.InvalidModelError[source]. Bases: aztk.error.AztkError
- exception aztk.error.MissingRequiredAttributeError[source]. Bases: aztk.error.InvalidModelError
- exception aztk.error.InvalidCustomScriptError[source]. Bases: aztk.error.InvalidModelError
- exception aztk.error.InvalidPluginReferenceError[source]. Bases: aztk.error.InvalidModelError
Writing docs¶
Docs are located in the docs folder. We are using sphinx to generate the docs and then hosting them on readthedocs.
Start docs autobuild to test locally¶
sphinx-autobuild docs docs/_build/html --watch aztk
Open docs/_build/index.html
Publish the docs¶
Docs should be published automatically to Read the Docs under the latest tag as soon as you push to master. When you create a git tag, readthedocs can also build the docs for that tag.
Writing a model¶
Getting started¶
- In aztk/models, create a new file named after your model, e.g. my_model.py.
- In aztk/models/__init__.py, add from .my_model import MyModel.
- Create a new class MyModel that inherits from ConfigurationBase:
from aztk.internal import ConfigurationBase
class MyModel(ConfigurationBase):
"""
MyModel is a sample model
Args:
input1 (str): This is the first input
"""
def __init__(self, input1: str):
self.input1 = input1
def validate(self):
pass
Add validation¶
In validate, do any kind of checks and raise an InvalidModelError if there are any problems with the values.
Validate required¶
To validate required attributes, call the parent _validate_required method. The method takes a list of attributes which should not be None:
def validate(self) -> bool:
self._validate_required(["input1"])
Custom validation¶
def validate(self) -> bool:
if "foo" in self.input1:
raise InvalidModelError("foo cannot be in input1")
Convert dict to model¶
When inheriting from ConfigurationBase, your model comes with a from_dict class method which converts a dict into an instance of the class.
It works great for simple cases where values are simple types (str, int, etc.). If, however, you need to process the values, you can override the _from_dict method.
Important: Do not override the from_dict method, as it handles errors and displays them nicely.
@classmethod
def _from_dict(cls, args: dict):
if "input1" in args:
args["input1"] = MyInput1Model.from_dict(args["input1"])
return super()._from_dict(args)
Tests¶
AZTK comes with a testing library that can be used for verification, and debugging. Please note that some tests will provision and test real resources in Azure, and as a result, will cost money to run. See Integration Tests for more details.
Integration Tests¶
Integration tests use the credentials given in your .aztk/secrets.yaml file to spin up real Clusters and Jobs to verify the functionality of the library. Please note that these tests will cost money to run. All created Clusters and Jobs will be deleted when the test completes.
Since each integration test spins up a Cluster or Job, you may want to run the tests in parallel to reduce the time needed to complete the testing library:
pytest $path_to_repo_root -n <5>
Note: $path_to_repo_root represents the path to the root of the aztk repository, and is only required if you are running the tests from a different location.
Please note that the number passed to the -n flag determines the number of tests you wish to run in parallel. Parallelizing the tests will increase the number of CPU cores used at one time, so please verify that you have the available core quota in your Batch account.