from typing import Dict, List, Optional, Union
from ..data_model import *
from ..exceptions import *
from .service_client import ServiceClient
class ClusterClient(ServiceClient):
    """
    Accessed via :data:`Beaker.cluster <beaker.Beaker.cluster>`.
    """

    def get(self, cluster: str) -> Cluster:
        """
        Get information about the cluster.

        The argument is first tried verbatim (it could be a cluster ID). If the
        server answers with a 404, the argument is resolved as a cluster name via
        ``resolve_cluster_name()`` and the lookup is retried once.

        :param cluster: The cluster name or ID.

        :raises ClusterNotFound: If the cluster doesn't exist.
        :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
        :raises RequestException: Any other exception that can occur when contacting the
            Beaker server.
        """

        def _get(cluster_id: str) -> Cluster:
            return Cluster.from_json(
                self.request(
                    f"clusters/{cluster_id}",
                    exceptions_for_status={
                        404: ClusterNotFound(self._not_found_err_msg(cluster_id))
                    },
                ).json()
            )

        try:
            # Could be a cluster ID, so we try that first before trying to resolve the name.
            return _get(cluster)
        except ClusterNotFound:
            try:
                cluster_name = self.resolve_cluster_name(cluster)
                return _get(cluster_name)
            except (ValueError, OrganizationNotSet, ClusterNotFound):
                # If the name was invalid, we'll just raise the original error.
                pass
            # Bare ``raise`` re-raises the ClusterNotFound from the first attempt,
            # since the inner handler has already finished at this point.
            raise

    def create(
        self,
        name: str,
        max_size: int = 1,
        preemptible: bool = False,
        cpus: Optional[float] = None,
        gpus: int = 0,
        gpu_type: Optional[str] = None,
        memory: Optional[str] = None,
    ) -> Cluster:
        """
        Create a new Beaker cloud cluster.

        .. note::
            For creating on-premise clusters you should still use the `Beaker CLI
            <https://github.com/allenai/beaker>`_.

        :param name: The name to assign to the new cluster.
            If :data:`Config.default_org <beaker.Config.default_org>` is not set,
            the name should start with the name of an organization:
            "{organization}/{cluster_name}", e.g. "ai2/my-new-cluster".
        :param max_size: The maximum number of nodes the cluster can scale up to.
        :param preemptible: Use preemptible cloud machines for the nodes.
        :param cpus: The number of virtual CPU available to each node.
        :param gpus: The number of GPUs available to each node.
        :param gpu_type: The type of GPU available to each node.
        :param memory: The amount of memory available to each node, specified as a number
            with a unit suffix. E.g. "2.5GiB".

        :raises ValueError: If the cluster name or requested resources are invalid.
        :raises ClusterConflict: If a cluster by that name already exists.
        :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
        :raises RequestException: Any other exception that can occur when contacting the
            Beaker server.
        """
        # The resolved name has the form "{organization}/{cluster_name}".
        organization, cluster_name = self.resolve_cluster_name(name).split("/", 1)

        # At least one of the resource arguments has to be truthy.
        if not cpus and not gpus and not gpu_type and not memory:
            raise ValueError("Cloud clusters must specify at least 1 resource")

        return Cluster.from_json(
            self.request(
                f"clusters/{self.url_quote(organization)}",
                method="POST",
                data=ClusterSpec(
                    name=cluster_name,
                    capacity=max_size,
                    preemptible=preemptible,
                    spec=NodeResources(
                        cpu_count=cpus, gpu_count=gpus, gpu_type=gpu_type, memory=memory
                    ),
                ),
                exceptions_for_status={409: ClusterConflict(cluster_name)},
            ).json()
        )

    def update(
        self,
        cluster: Union[str, Cluster],
        max_size: Optional[int] = None,
        allow_preemptible: Optional[bool] = None,
    ) -> Cluster:
        """
        Modify a cluster.

        :param cluster: The cluster ID, full name, or object.
        :param max_size: The maximum number of nodes.
        :param allow_preemptible: Allow or disallow preemptible jobs.

        :raises ClusterNotFound: If the cluster doesn't exist.
        :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
        :raises RequestException: Any other exception that can occur when contacting the
            Beaker server.
        """
        cluster_name = self.resolve_cluster(cluster).full_name
        return Cluster.from_json(
            self.request(
                f"clusters/{cluster_name}",
                method="PATCH",
                data=ClusterPatch(
                    capacity=max_size, allow_preemptible_restriction_exceptions=allow_preemptible
                ),
                exceptions_for_status={404: ClusterNotFound(self._not_found_err_msg(cluster))},
            ).json()
        )

    def delete(self, cluster: Union[str, Cluster]):
        """
        Delete a cluster.

        :param cluster: The cluster ID, full name, or object.

        :raises ClusterNotFound: If the cluster doesn't exist.
        :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
        :raises RequestException: Any other exception that can occur when contacting the
            Beaker server.
        """
        cluster_name = self.resolve_cluster(cluster).full_name
        self.request(
            f"clusters/{cluster_name}",
            method="DELETE",
            exceptions_for_status={404: ClusterNotFound(self._not_found_err_msg(cluster))},
        )

    def list(self, org: Optional[Union[str, Organization]] = None) -> List[Cluster]:
        """
        List clusters under an organization.

        :param org: The organization name or object. If not specified,
            :data:`Beaker.config.default_org <beaker.Config.default_org>` is used.

        :raises OrganizationNotFound: If the organization doesn't exist.
        :raises OrganizationNotSet: If neither ``org`` nor
            :data:`Beaker.config.default_org <beaker.Config.default_org>` are set.
        :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
        :raises RequestException: Any other exception that can occur when contacting the
            Beaker server.
        """
        org_id = self.resolve_org(org).id
        return [
            Cluster.from_json(d)
            for d in self.request(
                f"clusters/{org_id}",
                method="GET",
                exceptions_for_status={404: OrganizationNotFound(org_id)},
            ).json()["data"]
        ]

    def nodes(self, cluster: Union[str, Cluster]) -> List[Node]:
        """
        List the nodes in a cluster.

        :param cluster: The cluster ID, full name, or object.

        :raises ClusterNotFound: If the cluster doesn't exist.
        :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
        :raises RequestException: Any other exception that can occur when contacting the
            Beaker server.
        """
        cluster_name = self.resolve_cluster(cluster).full_name
        return [
            Node.from_json(d)
            for d in self.request(
                f"clusters/{cluster_name}/nodes",
                method="GET",
                exceptions_for_status={404: ClusterNotFound(self._not_found_err_msg(cluster))},
            ).json()["data"]
        ]

    def utilization(self, cluster: Union[str, Cluster]) -> ClusterUtilization:
        """
        Get current utilization stats for each node in a cluster.

        The stats are derived from the cluster's node list and from the
        non-finalized jobs returned by ``self.beaker.job.list()`` for the cluster.

        :param cluster: The cluster ID, full name, or object.

        :raises ClusterNotFound: If the cluster doesn't exist.
        :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
        :raises RequestException: Any other exception that can occur when contacting the
            Beaker server.
        """
        cluster = self.resolve_cluster(cluster)
        nodes = self.nodes(cluster)

        # Cluster-wide counters.
        running_jobs = 0
        queued_jobs = 0
        running_preemptible_jobs = 0
        jobs: List[Job] = []

        # Per-node accumulators, keyed by node ID.
        node_to_util: Dict[str, Dict[str, Union[int, float]]] = {
            node.id: {
                "running_jobs": 0,
                "running_preemptible_jobs": 0,
                "gpus_used": 0,
                "cpus_used": 0.0,
            }
            for node in nodes
        }

        for job in self.beaker.job.list(cluster=cluster, finalized=False):
            if job.is_running:
                # A running job whose node is not in the node listing is skipped
                # entirely: it is neither counted nor added to ``jobs``.
                if job.node not in node_to_util:
                    continue
                running_jobs += 1
                if job.is_preemptible:
                    running_preemptible_jobs += 1
            elif job.is_queued:
                queued_jobs += 1

            jobs.append(job)

            if job.node is not None:
                # Only reachable with an unknown node for non-running jobs, since
                # running jobs were already filtered above.
                if job.node not in node_to_util:
                    continue  # unlikely

                # NOTE(review): the per-node "running_jobs" counter is bumped for any
                # job that has a node, whatever its state - confirm that is intended.
                node_util = node_to_util[job.node]
                node_util["running_jobs"] += 1
                if job.is_preemptible:
                    node_util["running_preemptible_jobs"] += 1
                if job.limits is not None:
                    if job.limits.gpus is not None:
                        node_util["gpus_used"] += len(job.limits.gpus)
                    if job.limits.cpu_count is not None:
                        node_util["cpus_used"] += job.limits.cpu_count

        node_utilizations = []
        for node in nodes:
            node_util = node_to_util[node.id]

            # Used amounts are capped at the node's limit and free amounts are floored
            # at zero; both stay ``None`` when the node reports no limit.
            gpu_count = node.limits.gpu_count
            gpus_used = None if gpu_count is None else int(min(gpu_count, node_util["gpus_used"]))
            gpus_free = (
                None if gpu_count is None else int(max(0, gpu_count - node_util["gpus_used"]))
            )

            # NOTE(review): int() truncates fractional CPU counts here - confirm
            # that whole-number CPU figures are what callers expect.
            cpu_count = node.limits.cpu_count
            cpus_used = None if cpu_count is None else int(min(cpu_count, node_util["cpus_used"]))
            cpus_free = (
                None if cpu_count is None else int(max(0, cpu_count - node_util["cpus_used"]))
            )

            node_utilizations.append(
                NodeUtilization(
                    id=node.id,
                    hostname=node.hostname,
                    limits=node.limits,
                    running_jobs=int(node_util["running_jobs"]),
                    running_preemptible_jobs=int(node_util["running_preemptible_jobs"]),
                    used=NodeResources(
                        gpu_count=gpus_used,
                        cpu_count=cpus_used,
                        gpu_type=node.limits.gpu_type,
                    ),
                    free=NodeResources(
                        gpu_count=gpus_free,
                        cpu_count=cpus_free,
                        gpu_type=node.limits.gpu_type,
                    ),
                    cordoned=node.cordoned is not None,
                )
            )

        return ClusterUtilization(
            cluster=cluster,
            running_jobs=running_jobs,
            queued_jobs=queued_jobs,
            running_preemptible_jobs=running_preemptible_jobs,
            nodes=tuple(node_utilizations),
            jobs=tuple(jobs),
        )

    def filter_available(
        self, resources: TaskResources, *clusters: Union[str, Cluster]
    ) -> List[ClusterUtilization]:
        """
        Filter out clusters that don't have enough available resources, returning
        a list of :class:`ClusterUtilization <beaker.data_model.cluster.ClusterUtilization>` for each
        cluster that has sufficient resources.

        This can be used, for example, to automatically find an on-premise cluster with enough
        free resources to run a particular task.

        The result is sorted by ``(queued_jobs, running_jobs)`` in ascending order.

        .. caution::
            This method is experimental and may change or be removed in future releases.

        :param resources: The requested resources.
        :param clusters: Clusters to inspect and filter.

        :raises ClusterNotFound: If one of the clusters doesn't exist.
        :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
        :raises RequestException: Any other exception that can occur when contacting the
            Beaker server.
        """
        import concurrent.futures

        def node_is_compat(node_shape: NodeResources) -> bool:
            # A requested count of 0/None places no constraint on the node.
            if resources.gpu_count and (
                node_shape.gpu_count is None or node_shape.gpu_count < resources.gpu_count
            ):
                return False
            if resources.cpu_count and (
                node_shape.cpu_count is None or node_shape.cpu_count < resources.cpu_count
            ):
                return False
            # TODO: check memory too
            return True

        def cluster_is_available(cluster_: Union[str, Cluster]) -> Optional[ClusterUtilization]:
            cluster: Cluster = self.resolve_cluster(cluster_)

            # Cheap rejection before fetching utilization: the declared node shape
            # can never satisfy the request.
            if cluster.node_shape is not None and not node_is_compat(cluster.node_shape):
                return None

            cluster_utilization = self.utilization(cluster)
            if cluster.autoscale and len(cluster_utilization.nodes) < cluster.capacity:
                # The cluster has fewer nodes than its capacity. Return the result
                # like every other path instead of mutating the shared output list
                # from a worker thread.
                return cluster_utilization

            for node_util in cluster_utilization.nodes:
                if not node_util.cordoned and node_is_compat(node_util.free):
                    return cluster_utilization

            return None

        available: List[ClusterUtilization] = []

        # Each cluster is inspected in its own worker; results are gathered here
        # in the calling thread as they complete.
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(cluster_is_available, cluster_) for cluster_ in clusters]
            for future in concurrent.futures.as_completed(futures):
                cluster_util = future.result()
                if cluster_util is not None:
                    available.append(cluster_util)

        return sorted(available, key=lambda util: (util.queued_jobs, util.running_jobs))

    def url(self, cluster: Union[str, Cluster]) -> str:
        """
        Get the URL for a cluster.

        :param cluster: The cluster ID, full name, or object.

        :raises ClusterNotFound: If the cluster doesn't exist.
        """
        cluster_name = self.resolve_cluster(cluster).full_name
        return f"{self.config.agent_address}/cl/{cluster_name}/details"

    def preempt_jobs(
        self, cluster: Union[str, Cluster], ignore_failures: bool = False
    ) -> List[Job]:
        """
        Preempt all preemptible jobs on the cluster.

        Only jobs that sit on one of the cluster's nodes, have an execution, are
        currently running or idle, and have preemptible priority are preempted.

        :param cluster: The cluster ID, full name, or object.
        :param ignore_failures: If ``True``, any jobs that fail to preempt will be ignored.

        :raises ClusterNotFound: If the cluster doesn't exist.
        :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
        :raises RequestException: Any other exception that can occur when contacting the
            Beaker server.
        """
        cluster = self.resolve_cluster(cluster)
        nodes = {n.id for n in self.nodes(cluster)}
        current_jobs = self.beaker.job.list(cluster=cluster, finalized=False)
        preempted_jobs = []
        for job in current_jobs:
            if job.node not in nodes:
                continue
            if job.execution is None:
                continue
            if job.status.current not in {CurrentJobStatus.running, CurrentJobStatus.idle}:
                continue
            if job.execution.spec.context.priority != Priority.preemptible:
                continue
            try:
                preempted_jobs.append(self.beaker.job.preempt(job))
            except BeakerPermissionsError:
                # Only permission errors are tolerated, and only on request.
                if ignore_failures:
                    self.logger.warning(
                        "Failed to preempt job '%s': insufficient permissions", job.id
                    )
                else:
                    raise
        return preempted_jobs

    def _not_found_err_msg(self, cluster: Union[str, Cluster]) -> str:
        """
        Build the message used for ``ClusterNotFound`` errors.

        :param cluster: The cluster ID or name as a string, or a cluster object
            (in which case its ``id`` is used in the message).
        """
        cluster_ref = cluster if isinstance(cluster, str) else cluster.id
        return (
            f"'{cluster_ref}': Make sure you're using a valid ID or *full* name of the cluster "
            f"(with the organization prefix, e.g. 'org/cluster_name')"
        )