from typing import TYPE_CHECKING, Dict, Optional, Union, cast
from docker.models.images import Image as DockerImage
from ..data_model import *
from ..exceptions import *
from .service_client import ServiceClient
if TYPE_CHECKING:
from rich.progress import TaskID
[docs]class ImageClient(ServiceClient):
"""
Accessed via :data:`Beaker.image <beaker.Beaker.image>`.
"""
[docs] def get(self, image: str) -> Image:
"""
Get info about an image on Beaker.
:param image: The Beaker image ID or name.
:raises ImageNotFound: If the image can't be found on Beaker.
:raises RequestException: Any other exception that can occur when contacting the
Beaker server.
"""
def _get(id: str) -> Image:
return Image.from_json(
self.request(
f"images/{self.url_quote(id)}",
exceptions_for_status={404: ImageNotFound(self._not_found_err_msg(id))},
).json()
)
try:
# Could be an ID or full name, so we try that first.
return _get(image)
except ImageNotFound:
if "/" not in image:
# Try with adding the account name.
try:
return _get(f"{self.beaker.account.name}/{image}")
except ImageNotFound:
pass
# Try searching the default workspace.
if self.config.default_workspace is not None:
matches = self.beaker.workspace.images(match=image, limit=1)
if matches:
return matches[0]
raise
[docs] def create(
self,
name: str,
image_tag: str,
workspace: Optional[Union[Workspace, str]] = None,
description: Optional[str] = None,
quiet: bool = False,
commit: bool = True,
) -> Image:
"""
Upload a Docker image to Beaker.
:param name: The name to assign to the image on Beaker.
:param image_tag: The tag of the local image you're uploading.
:param workspace: The workspace to upload the image to. If not specified,
:data:`Beaker.config.default_workspace <beaker.Config.default_workspace>` is used.
:param description: Text description of the image.
:param quiet: If ``True``, progress won't be displayed.
:param commit: Whether to commit the image after successful upload.
:raises ValueError: If the image name is invalid.
:raises ImageConflict: If an image with the given name already exists.
:raises WorkspaceNotSet: If neither ``workspace`` nor
:data:`Beaker.config.default_workspace <beaker.Config.default_workspace>` are set.
:raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
:raises RequestException: Any other exception that can occur when contacting the
Beaker server.
"""
self.validate_beaker_name(name)
workspace = self.resolve_workspace(workspace)
# Get local Docker image object.
image = cast(DockerImage, self.docker.images.get(image_tag))
# Create new image on Beaker.
image_id = self.request(
"images",
method="POST",
data=ImageSpec(
workspace=workspace.id,
image_id=image.id,
image_tag=image_tag,
description=description,
),
query={"name": name},
exceptions_for_status={409: ImageConflict(name)},
).json()["id"]
# Get the repo data for the Beaker image.
repo = ImageRepo.from_json(
self.request(f"images/{image_id}/repository", query={"upload": True}).json()
)
# Tag the local image with the new tag for the Beaker image.
image.tag(repo.image_tag)
# Push the image to Beaker.
from ..progress import get_image_upload_progress
with get_image_upload_progress(quiet) as progress:
layer_id_to_task: Dict[str, "TaskID"] = {}
for layer_state_data in self.docker.api.push(
repo.image_tag,
stream=True,
decode=True,
auth_config={
"username": repo.auth.user,
"password": repo.auth.password,
"server_address": repo.auth.server_address,
},
):
if "id" not in layer_state_data or "status" not in layer_state_data:
continue
layer_state = DockerLayerUploadState.from_json(layer_state_data)
# Get progress task ID for layer, initializing if it doesn't already exist.
task_id: "TaskID"
if layer_state.id not in layer_id_to_task:
task_id = progress.add_task(layer_state.id, start=True, total=1)
layer_id_to_task[layer_state.id] = task_id
else:
task_id = layer_id_to_task[layer_state.id]
# Update task progress description.
progress.update(
task_id, description=f"{layer_state.id}: {layer_state.status.title()}"
)
# Update task progress total and completed.
if (
layer_state.progress_detail.total is not None
and layer_state.progress_detail.current is not None
):
progress.update(
task_id,
total=layer_state.progress_detail.total,
completed=layer_state.progress_detail.current,
)
elif layer_state.status in {
DockerLayerUploadStatus.preparing,
DockerLayerUploadStatus.waiting,
}:
progress.update(
task_id,
total=1,
completed=0,
)
elif layer_state.status in {
DockerLayerUploadStatus.pushed,
DockerLayerUploadStatus.already_exists,
}:
progress.update(
task_id,
total=1,
completed=1,
)
if commit:
return self.commit(image_id)
else:
return self.get(image_id)
[docs] def commit(self, image: Union[str, Image]) -> Image:
"""
Commit an image.
:param image: The Beaker image ID, name, or object.
:raises ImageNotFound: If the image can't be found on Beaker.
:raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
:raises RequestException: Any other exception that can occur when contacting the
Beaker server.
"""
image_id = self.resolve_image(image).id
return Image.from_json(
self.request(
f"images/{image_id}",
method="PATCH",
data=ImagePatch(commit=True),
exceptions_for_status={404: ImageNotFound(self._not_found_err_msg(image))},
).json()
)
[docs] def delete(self, image: Union[str, Image]):
"""
Delete an image on Beaker.
:param image: The Beaker image ID, name, or object.
:raises ImageNotFound: If the image can't be found on Beaker.
:raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
:raises RequestException: Any other exception that can occur when contacting the
Beaker server.
"""
image_id = self.resolve_image(image).id
self.request(
f"images/{self.url_quote(image_id)}",
method="DELETE",
exceptions_for_status={404: ImageNotFound(self._not_found_err_msg(image))},
)
[docs] def rename(self, image: Union[str, Image], name: str) -> Image:
"""
Rename an image on Beaker.
:param image: The Beaker image ID, name, or object.
:param name: The new name for the image.
:raises ImageNotFound: If the image can't be found on Beaker.
:raises ValueError: If the image name is invalid.
:raises ImageConflict: If an image with the given name already exists.
:raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
:raises RequestException: Any other exception that can occur when contacting the
Beaker server.
"""
self.validate_beaker_name(name)
image_id = self.resolve_image(image).id
return Image.from_json(
self.request(
f"images/{image_id}",
method="PATCH",
data=ImagePatch(name=name),
exceptions_for_status={404: ImageNotFound(self._not_found_err_msg(image))},
).json()
)
[docs] def pull(self, image: Union[str, Image], quiet: bool = False) -> DockerImage:
"""
Pull an image from Beaker.
.. important::
This method returns a Docker :class:`~docker.models.images.Image`, not
a Beaker :class:`~beaker.data_model.image.Image`.
:param image: The Beaker image ID, name, or object.
:param quiet: If ``True``, progress won't be displayed.
:raises ImageNotFound: If the image can't be found on Beaker.
:raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
:raises RequestException: Any other exception that can occur when contacting the
Beaker server.
"""
image_id = self.resolve_image(image).id
repo = ImageRepo.from_json(self.request(f"images/{image_id}/repository").json())
from ..progress import get_image_download_progress
with get_image_download_progress(quiet) as progress:
layer_id_to_task: Dict[str, "TaskID"] = {}
for layer_state_data in self.docker.api.pull(
repo.image_tag,
stream=True,
decode=True,
auth_config={
"username": repo.auth.user,
"password": repo.auth.password,
"server_address": repo.auth.server_address,
},
):
if "id" not in layer_state_data or "status" not in layer_state_data:
continue
if layer_state_data["status"].lower().startswith("pulling "):
continue
layer_state = DockerLayerDownloadState.from_json(layer_state_data)
# Get progress task ID for layer, initializing if it doesn't already exist.
task_id: "TaskID"
if layer_state.id not in layer_id_to_task:
task_id = progress.add_task(layer_state.id, start=True, total=1)
layer_id_to_task[layer_state.id] = task_id
else:
task_id = layer_id_to_task[layer_state.id]
# Update task progress description.
progress.update(
task_id, description=f"{layer_state.id}: {layer_state.status.title()}"
)
# Update task progress total and completed.
if (
layer_state.progress_detail.total is not None
and layer_state.progress_detail.current is not None
):
progress.update(
task_id,
total=layer_state.progress_detail.total,
completed=layer_state.progress_detail.current,
)
elif layer_state.status in {
DockerLayerDownloadStatus.waiting,
DockerLayerDownloadStatus.extracting,
DockerLayerDownloadStatus.verifying_checksum,
}:
progress.update(
task_id,
total=1,
completed=0,
)
elif layer_state.status in {
DockerLayerDownloadStatus.download_complete,
DockerLayerDownloadStatus.pull_complete,
DockerLayerDownloadStatus.already_exists,
}:
progress.update(
task_id,
total=1,
completed=1,
)
local_image = cast(DockerImage, self.docker.images.get(repo.image_tag))
return local_image
[docs] def url(self, image: Union[str, Image]) -> str:
"""
Get the URL for an image.
:param image: The Beaker image ID, name, or object.
:raises ImageNotFound: If the image can't be found on Beaker.
"""
image_id = self.resolve_image(image).id
return f"{self.config.agent_address}/im/{self.url_quote(image_id)}"
def _not_found_err_msg(self, image: Union[str, Image]) -> str:
image = image if isinstance(image, str) else image.id
return (
f"'{image}': Make sure you're using a valid Beaker image ID or the "
f"*full* name of the image (with the account prefix, e.g. 'username/image_name')"
)