{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<div class=\"align-center\">\n",
    "<a href=\"https://oumi.ai/\"><img src=\"https://oumi.ai/docs/en/latest/_static/logo/header_logo.png\" height=\"200\"></a>\n",
    "\n",
    "[![Documentation](https://img.shields.io/badge/Documentation-latest-blue.svg)](https://oumi.ai/docs/en/latest/index.html)\n",
    "[![Discord](https://img.shields.io/discord/1286348126797430814?label=Discord)](https://discord.gg/oumi)\n",
    "[![GitHub Repo stars](https://img.shields.io/github/stars/oumi-ai/oumi)](https://github.com/oumi-ai/oumi)\n",
    "<a target=\"_blank\" href=\"https://colab.research.google.com/github/oumi-ai/oumi/blob/main/notebooks/Oumi - Launching Jobs on Custom Clusters.ipynb\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>\n",
    "</div>\n",
    "\n",
    "👋 Welcome to Open Universal Machine Intelligence (Oumi)!\n",
    "\n",
    "🚀 Oumi is a fully open-source platform that streamlines the entire lifecycle of foundation models - from [data preparation](https://oumi.ai/docs/en/latest/resources/datasets/datasets.html) and [training](https://oumi.ai/docs/en/latest/user_guides/train/train.html) to [evaluation](https://oumi.ai/docs/en/latest/user_guides/evaluate/evaluate.html) and [deployment](https://oumi.ai/docs/en/latest/user_guides/launch/launch.html). Whether you're developing on a laptop, launching large scale experiments on a cluster, or deploying models in production, Oumi provides the tools and workflows you need.\n",
    "\n",
    "🤝 Make sure to join our [Discord community](https://discord.gg/oumi) to get help, share your experiences, and contribute to the project! If you are interested in joining one of the community's open-science efforts, check out our [open collaboration](https://oumi.ai/community) page.\n",
    "\n",
    "⭐ If you like Oumi and you would like to support it, please give it a star on [GitHub](https://github.com/oumi-ai/oumi)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Custom Clusters\n",
    "\n",
    "In this tutorial, we'll cover how you can launch Oumi jobs on custom clusters that are not supported out of the box.\n",
    "\n",
    "Specifically, this tutorial is geared towards individuals who have access to a compute cluster that's not hosted on a common cloud provider (e.g. University compute clusters).\n",
    "\n",
    "We'll cover the following topics:\n",
    "1. Prerequisites\n",
    "1. The Oumi Launcher Hierarchy\n",
    "1. Creating a CustomClient Class\n",
    "1. Creating a CustomCluster Class\n",
    "1. Creating a CustomCloud Class\n",
    "1. Registering Your CustomCloud\n",
    "1. Running a Job on Your Cloud"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Prerequisites\n",
    "## Oumi Installation\n",
    "\n",
    "First, let's install Oumi. You can find more detailed instructions [here](https://oumi.ai/docs/en/latest/get_started/installation.html). \n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%pip install oumi"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# The Oumi Launcher Hierarchy"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Preface\n",
    "Before diving into this tutorial, lets discuss the hierarchy of the Oumi Launcher. At this point, it's worth reading through our tutorial on [Running Jobs Remotely](https://github.com/oumi-ai/oumi/blob/main/notebooks/Oumi%20-%20Running%20Jobs%20Remotely.ipynb) to better understand the end-to-end flow of the launcher. Already read it? Great!\n",
    "\n",
    "### Overview\n",
    "At a high level, the Oumi Launcher is composed of 3 tiers of objects: `Clouds`, `Clusters`, and `Clients`. The Launcher holds an instance of each unique `Cloud`. These `Clouds`, in turn, are responsible for creating compute `Clusters`. And `Clusters` coordinate running jobs. All communication with remote APIs happens via the `Client`.\n",
    "\n",
    "#### Clouds\n",
    "A Cloud class must implement the [`BaseCloud`](https://github.com/oumi-ai/oumi/blob/main/src/oumi/core/types/base_cloud.py) abstract class. The Launcher will only create one instance of each Cloud, so it's important that a single Cloud object is capable of turning up and down multiple clusters.\n",
    "\n",
    "You can find several implementations of Clouds [here](https://github.com/oumi-ai/oumi/tree/main/src/oumi/launcher/clouds).\n",
    "\n",
    "#### Clusters\n",
    "A Cluster class must implement the [`BaseCluster`](https://github.com/oumi-ai/oumi/blob/main/src/oumi/core/types/base_cluster.py) abstract class. A cluster represents a single instance of hardware. For a custom clusters (such as having a single super computer), it may be the case that you only need 1 cluster to represent your hardware setup.\n",
    "\n",
    "You can find several implementations of Clusters [here](https://github.com/oumi-ai/oumi/tree/main/src/oumi/launcher/clusters).\n",
    "\n",
    "#### Clients\n",
    "Clients are a completely optional but highly encouraged class. Clients should encapsulate all logic that calls remote APIs related to your cloud. While this logic could be encapsulated with your Cluster and Cloud classes, having a dedicated class for this purpose greatly simplifies your Cloud and Cluster logic.\n",
    "\n",
    "You can find several implementations of Clients [here](https://github.com/oumi-ai/oumi/tree/main/src/oumi/launcher/clients)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Creating a CustomClient Class\n",
    "Let's get started by creating a client for our new cloud, `Foobar`. Let's create a simple client that randomly sets the state of the job on submission. It also supports canceling jobs, and turning down clusters:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import random\n",
    "from enum import Enum\n",
    "\n",
    "from oumi.core.configs import JobConfig\n",
    "from oumi.core.launcher import JobState, JobStatus\n",
    "\n",
    "\n",
    "class _JobState(Enum):\n",
    "    \"\"\"An enumeration of the possible states of a job.\"\"\"\n",
    "\n",
    "    QUEUED = \"QUEUED\"\n",
    "    RUNNING = \"RUNNING\"\n",
    "    COMPLETED = \"COMPLETED\"\n",
    "    FAILED = \"FAILED\"\n",
    "    CANCELED = \"CANCELED\"\n",
    "\n",
    "\n",
    "class CustomClient:\n",
    "    \"\"\"A client for running jobs locally in a subprocess.\"\"\"\n",
    "\n",
    "    def __init__(self):\n",
    "        \"\"\"Initializes a new instance of the CustomClient class.\"\"\"\n",
    "        self._jobs = []\n",
    "\n",
    "    def submit_job(self, job: JobConfig) -> JobStatus:\n",
    "        \"\"\"Pretends to run the specified job on this cluster.\"\"\"\n",
    "        job_id = str(len(self._jobs))\n",
    "        name = job.name if job.name else job_id\n",
    "        # Pick a random status\n",
    "        status = random.choice([state for state in _JobState])\n",
    "        job_status = JobStatus(\n",
    "            name=name,\n",
    "            id=job_id,\n",
    "            status=status.value,\n",
    "            cluster=\"\",\n",
    "            metadata=\"\",\n",
    "            done=False,\n",
    "            state=JobState.PENDING,\n",
    "        )\n",
    "        self._jobs.append(job_status)\n",
    "        return job_status\n",
    "\n",
    "    def list_jobs(self) -> list[JobStatus]:\n",
    "        \"\"\"Returns a list of job statuses.\"\"\"\n",
    "        return self._jobs\n",
    "\n",
    "    def get_job(self, job_id: str) -> JobStatus | None:\n",
    "        \"\"\"Gets the specified job's status.\n",
    "\n",
    "        Args:\n",
    "            job_id: The ID of the job to get.\n",
    "\n",
    "        Returns:\n",
    "            The job status if found, None otherwise.\n",
    "        \"\"\"\n",
    "        job_list = self.list_jobs()\n",
    "        for job in job_list:\n",
    "            if job.id == job_id:\n",
    "                return job\n",
    "        return None\n",
    "\n",
    "    def cancel(self, job_id) -> JobStatus | None:\n",
    "        \"\"\"Cancels the specified job.\n",
    "\n",
    "        Args:\n",
    "            job_id: The ID of the job to cancel.\n",
    "\n",
    "        Returns:\n",
    "            The job status if found, None otherwise.\n",
    "        \"\"\"\n",
    "        int_id = int(job_id)\n",
    "        if int_id > len(self._jobs):\n",
    "            return None\n",
    "        job_status = self._jobs[int_id]\n",
    "        job_status.status = _JobState.CANCELED.value\n",
    "        return job_status\n",
    "\n",
    "    def turndown_cluster(self, cluster_name: str):\n",
    "        \"\"\"Turns down the cluster.\"\"\"\n",
    "        print(f\"Turning down cluster {cluster_name}...\")\n",
    "        pass"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Creating a CustomCluster Class\n",
    "Now that we have a client that talk's to our API, we can use the Client to build a Cluster!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "from typing import Any\n",
    "\n",
    "from oumi.core.launcher import BaseCluster\n",
    "\n",
    "\n",
    "class CustomCluster(BaseCluster):\n",
    "    \"\"\"A custom cluster implementation.\"\"\"\n",
    "\n",
    "    def __init__(self, name: str, client: CustomClient) -> None:\n",
    "        \"\"\"Initializes a new instance of the CustomCluster class.\"\"\"\n",
    "        self._name = name\n",
    "        self._client = client\n",
    "\n",
    "    def __eq__(self, other: Any) -> bool:\n",
    "        \"\"\"Checks if two LocalClusters are equal.\"\"\"\n",
    "        if not isinstance(other, CustomCluster):\n",
    "            return False\n",
    "        return self.name() == other.name()\n",
    "\n",
    "    def name(self) -> str:\n",
    "        \"\"\"Gets the name of the cluster.\"\"\"\n",
    "        return self._name\n",
    "\n",
    "    def get_job(self, job_id: str) -> JobStatus | None:\n",
    "        \"\"\"Gets the jobs on this cluster if it exists, else returns None.\"\"\"\n",
    "        for job in self.get_jobs():\n",
    "            if job.id == job_id:\n",
    "                return job\n",
    "        return None\n",
    "\n",
    "    def get_jobs(self) -> list[JobStatus]:\n",
    "        \"\"\"Lists the jobs on this cluster.\"\"\"\n",
    "        jobs = self._client.list_jobs()\n",
    "        for job in jobs:\n",
    "            job.cluster = self._name\n",
    "        return jobs\n",
    "\n",
    "    def cancel_job(self, job_id: str) -> JobStatus:\n",
    "        \"\"\"Cancels the specified job on this cluster.\"\"\"\n",
    "        self._client.cancel(job_id)\n",
    "        job = self.get_job(job_id)\n",
    "        if job is None:\n",
    "            raise RuntimeError(f\"Job {job_id} not found.\")\n",
    "        return job\n",
    "\n",
    "    def run_job(self, job: JobConfig) -> JobStatus:\n",
    "        \"\"\"Runs the specified job on this cluster.\n",
    "\n",
    "        Args:\n",
    "            job: The job to run.\n",
    "\n",
    "        Returns:\n",
    "            The job status.\n",
    "        \"\"\"\n",
    "        job_status = self._client.submit_job(job)\n",
    "        job_status.cluster = self._name\n",
    "        return job_status\n",
    "\n",
    "    def down(self) -> None:\n",
    "        \"\"\"Cancel all jobs and turn down the cluster.\"\"\"\n",
    "        for job in self.get_jobs():\n",
    "            self.cancel_job(job.id)\n",
    "        self._client.turndown_cluster(self._name)\n",
    "\n",
    "    def stop(self) -> None:\n",
    "        \"\"\"Cancel all jobs and turn down the cluster.\"\"\"\n",
    "        self.down()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Creating a CustomCloud Class\n",
    "Let's create a CustomCloud to manage our clusters:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "from oumi.core.launcher import BaseCloud\n",
    "\n",
    "\n",
    "class CustomCloud(BaseCloud):\n",
    "    \"\"\"A resource pool for managing Local jobs.\"\"\"\n",
    "\n",
    "    # The default cluster name. Used when no cluster name is provided.\n",
    "    _DEFAULT_CLUSTER = \"custom\"\n",
    "\n",
    "    def __init__(self):\n",
    "        \"\"\"Initializes a new instance of the LocalCloud class.\"\"\"\n",
    "        # A mapping from cluster names to Local Cluster instances.\n",
    "        self._clusters = {}\n",
    "\n",
    "    def _get_or_create_cluster(self, name: str) -> CustomCluster:\n",
    "        \"\"\"Gets the cluster with the specified name, or creates one if it doesn't exist.\n",
    "\n",
    "        Args:\n",
    "            name: The name of the cluster.\n",
    "\n",
    "        Returns:\n",
    "            LocalCluster: The cluster instance.\n",
    "        \"\"\"\n",
    "        if name not in self._clusters:\n",
    "            self._clusters[name] = CustomCluster(name, CustomClient())\n",
    "        return self._clusters[name]\n",
    "\n",
    "    def up_cluster(self, job: JobConfig, name: str | None) -> JobStatus:\n",
    "        \"\"\"Creates a cluster and starts the provided Job.\"\"\"\n",
    "        # The default cluster.\n",
    "        cluster_name = name or self._DEFAULT_CLUSTER\n",
    "        cluster = self._get_or_create_cluster(cluster_name)\n",
    "        job_status = cluster.run_job(job)\n",
    "        if not job_status:\n",
    "            raise RuntimeError(\"Failed to start job.\")\n",
    "        return job_status\n",
    "\n",
    "    def get_cluster(self, name) -> BaseCluster | None:\n",
    "        \"\"\"Gets the cluster with the specified name, or None if not found.\"\"\"\n",
    "        clusters = self.list_clusters()\n",
    "        for cluster in clusters:\n",
    "            if cluster.name() == name:\n",
    "                return cluster\n",
    "        return None\n",
    "\n",
    "    def list_clusters(self) -> list[BaseCluster]:\n",
    "        \"\"\"Lists the active clusters on this cloud.\"\"\"\n",
    "        return list(self._clusters.values())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now all that's left to do is register your CustomCloud!"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Registering Your CustomCloud\n",
    "By implementing the BaseCloud class, you are now ready to register your cloud with Oumi. First, let's take a look at the clouds that are already registered:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['local', 'polaris', 'runpod', 'gcp', 'lambda', 'aws', 'azure']\n"
     ]
    }
   ],
   "source": [
    "import oumi.launcher as launcher\n",
    "\n",
    "print(launcher.which_clouds())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can register your cloud by implementing a builder method. This method must take no arguments and must return a new instance of your CustomCloud:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "from oumi.core.registry import register_cloud_builder\n",
    "\n",
    "\n",
    "@register_cloud_builder(\"custom\")\n",
    "def Local_cloud_builder() -> CustomCloud:\n",
    "    \"\"\"Builds a LocalCloud instance.\"\"\"\n",
    "    return CustomCloud()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's take another look at our registered clouds now:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['local', 'polaris', 'runpod', 'gcp', 'lambda', 'aws', 'azure', 'custom']\n"
     ]
    }
   ],
   "source": [
    "print(launcher.which_clouds())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Great, our CustomCloud is there!"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Running a Job on Your Cloud\n",
    "\n",
    "Let's take our new Cloud for a spin:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "JobStatus(name='test', id='0', status='FAILED', cluster='first_cluster', metadata='', done=False)\n",
      "JobStatus(name='test', id='0', status='RUNNING', cluster='second_cluster', metadata='', done=False)\n",
      "Canceling the first job...\n",
      "JobStatus(name='test', id='0', status='CANCELED', cluster='first_cluster', metadata='', done=False)\n"
     ]
    }
   ],
   "source": [
    "job = launcher.JobConfig(name=\"test\")\n",
    "job.resources.cloud = \"custom\"\n",
    "\n",
    "first_cluster, job_status = launcher.up(job, \"first_cluster\")\n",
    "print(job_status)\n",
    "second_cluster, second_job_status = launcher.up(job, \"second_cluster\")\n",
    "print(second_job_status)\n",
    "\n",
    "print(\"Canceling the first job...\")\n",
    "print(launcher.cancel(job_status.id, job.resources.cloud, job_status.cluster))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "And now let's turn down our clusters:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Turning down cluster first_cluster...\n",
      "Cluster first_cluster is down. Listing jobs...\n",
      "[JobStatus(name='test', id='0', status='CANCELED', cluster='first_cluster', metadata='', done=False)]\n",
      "Turning down cluster second_cluster...\n",
      "Cluster second_cluster is down. Listing jobs...\n",
      "[JobStatus(name='test', id='0', status='CANCELED', cluster='second_cluster', metadata='', done=False)]\n"
     ]
    }
   ],
   "source": [
    "for cluster in launcher.get_cloud(\"custom\").list_clusters():\n",
    "    cluster.down()\n",
    "    print(f\"Cluster {cluster.name()} is down. Listing jobs...\")\n",
    "    print(cluster.get_jobs())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 🧭 What's Next?\n",
    "\n",
    "Congrats on finishing this notebook! Feel free to check out our other [notebooks](https://github.com/oumi-ai/oumi/tree/main/notebooks) in the [Oumi GitHub](https://github.com/oumi-ai/oumi), and give us a star! You can also join the Oumi community over on [Discord](https://discord.gg/oumi).\n",
    "\n",
    "📰 Want to keep up with news from Oumi? Subscribe to our [Substack](https://blog.oumi.ai/) and [Youtube](https://www.youtube.com/@Oumi_AI)!\n",
    "\n",
    "⚡ Interested in building custom AI in hours, not months? Apply to get [early access](https://oumi.ai/contact?utm_source=oumi_oss_tutorial_custom_cluster) to the Oumi Platform, or [chat with us](https://oumi.ai/book?utm_source=oumi_oss_tutorial_custom_cluster) to learn more!"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "oumi",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.11"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
