# Source code for aztk.models.toolkit

from aztk.error import InvalidModelError
from aztk.utils import constants, deprecate
from aztk.core.models import Model, fields

class ToolkitDefinition:
    """Describe one toolkit: the versions it offers and its environments.

    Args:
        versions: Collection of version identifiers available for the toolkit.
        environments: Mapping of environment name to its definition object.
    """

    def __init__(self, versions, environments):
        # Both values are stored as given (no copy, no validation).
        self.versions, self.environments = versions, environments

class ToolkitEnvironmentDefinition:
    """Describe one environment of a toolkit.

    Args:
        versions: Version identifiers available for the environment. Any falsy
            value (``None``, empty list) is replaced by ``[""]``.
        default: Version used when none is requested explicitly.
    """

    def __init__(self, versions=None, default=""):
        self.default = default
        # A falsy `versions` means "single, unnamed version".
        self.versions = versions if versions else [""]

# Registry of supported toolkits, keyed by toolkit name. Each entry lists the
# versions that may be requested and the environments available for it; every
# environment gets its own definition instance built with the defaults.
TOOLKIT_MAP = {
    "spark": ToolkitDefinition(
        versions=["1.6.3", "2.1.0", "2.2.0", "2.3.0"],
        environments={
            env_name: ToolkitEnvironmentDefinition()
            for env_name in ("base", "r", "miniconda", "anaconda")
        },
    ),
}


class Toolkit(Model):
    """
    Toolkit for a cluster. This will help pick the docker image needed.

    Args:
        software (str): Name of the toolkit (spark)
        version (str): Version of the toolkit
        environment (str): Which environment to use for this toolkit
        environment_version (str): If there are multiple versions for an environment you can specify which one
        docker_repo (str): Optional docker repo
    """

    software = fields.String()
    version = fields.String()
    environment = fields.String(default=None)
    environment_version = fields.String(default=None)
    docker_repo = fields.String(default=None)

    def __validate__(self):
        """Check the toolkit, version and environment against TOOLKIT_MAP.

        Raises:
            InvalidModelError: if the software, its version, the environment or
                the environment version is not listed in TOOLKIT_MAP.
        """
        if self.software not in TOOLKIT_MAP:
            raise InvalidModelError("Toolkit '{0}' is not in the list of allowed toolkits {1}".format(
                self.software, list(TOOLKIT_MAP.keys())))

        toolkit_def = TOOLKIT_MAP[self.software]

        if self.version not in toolkit_def.versions:
            raise InvalidModelError("Toolkit '{0}' with version '{1}' is not available. Use one of: {2}".format(
                self.software, self.version, toolkit_def.versions))

        # Allowed versions are full patch versions (e.g. "1.6.3"), so comparing
        # against the literal "1.6" could never match once the check above has
        # passed. Match the whole 1.6 release line instead.
        if self.version == "1.6" or self.version.startswith("1.6."):
            deprecate("0.9.0", "Spark version 1.6 is being deprecated for Aztk.", "Please use 2.1 and above.")

        if self.environment:
            if self.environment not in toolkit_def.environments:
                raise InvalidModelError("Environment '{0}' for toolkit '{1}' is not available. Use one of: {2}".format(
                    self.environment, self.software, list(toolkit_def.environments.keys())))

            env_def = toolkit_def.environments[self.environment]

            if self.environment_version and self.environment_version not in env_def.versions:
                raise InvalidModelError(
                    "Environment '{0}' version '{1}' for toolkit '{2}' is not available. Use one of: {3}".format(
                        self.environment, self.environment_version, self.software, env_def.versions))

    def get_docker_repo(self, gpu: bool):
        """Return the docker image reference for this toolkit.

        Args:
            gpu (bool): Whether to select the "gpu" flavour of the image tag.

        Returns:
            str: ``docker_repo`` unchanged when it is set, otherwise
            ``aztk/<software>:<tag>`` with the tag built by ``_get_docker_tag``.
        """
        if self.docker_repo:
            return self.docker_repo

        repo = "aztk/{0}".format(self.software)

        return "{repo}:{tag}".format(
            repo=repo,
            tag=self._get_docker_tag(gpu),
        )

    def _get_docker_tag(self, gpu: bool):
        """Build the image tag: dash-joined image version, toolkit, environment and flavour.

        Args:
            gpu (bool): Appends "gpu" when true, "base" otherwise.

        Returns:
            str: The tag, e.g. ``v<image version>-<software><version>[-<env><env version>]-<gpu|base>``.
        """
        environment = self.environment or "base"
        environment_def = self._get_environment_definition()
        # Fall back to "" so that a missing environment definition cannot put
        # the text "None" into the tag.
        environment_version = self.environment_version or (environment_def and environment_def.default) or ""

        parts = [
            "v{docker_image_version}".format(docker_image_version=constants.DOCKER_IMAGE_VERSION),
            "{toolkit}{version}".format(toolkit=self.software, version=self.version),
        ]
        # The environment segment is only added when one was explicitly requested.
        if self.environment:
            parts.append("{0}{1}".format(environment, environment_version))

        parts.append("gpu" if gpu else "base")

        return '-'.join(parts)

    def _get_environment_definition(self) -> ToolkitEnvironmentDefinition:
        """Look up the definition of the selected environment ("base" when unset).

        Returns:
            The matching ToolkitEnvironmentDefinition, or None when the toolkit
            or the environment is not present in TOOLKIT_MAP.
        """
        toolkit = TOOLKIT_MAP.get(self.software)

        if toolkit:
            return toolkit.environments.get(self.environment or "base")
        return None