Source code for beaker.services.group

from typing import Generator, List, Optional, Union

from ..data_model import *
from ..exceptions import *
from .service_client import ServiceClient


[docs]class GroupClient(ServiceClient): """ Accessed via :data:`Beaker.group <beaker.Beaker.group>`. """
[docs] def get(self, group: str) -> Group: """ Get info about a group. :param group: The group ID or name. :raises GroupNotFound: If the group can't be found. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. """ def _get(id: str) -> Group: return Group.from_json( self.request( f"groups/{self.url_quote(id)}", exceptions_for_status={404: GroupNotFound(self._not_found_err_msg(id))}, ).json() ) try: # Could be an ID or full name, so we try that first. return _get(group) except GroupNotFound: if "/" not in group: # Try with adding the account name. try: return _get(f"{self.beaker.account.name}/{group}") except GroupNotFound: pass raise
[docs] def create( self, name: str, *experiments: Union[str, Experiment], description: Optional[str] = None, workspace: Optional[Union[Workspace, str]] = None, ) -> Group: """ :param name: The name to assign the group. :param experiments: Experiments to add to the group. :param description: Group description. :param workspace: The workspace to create the group under. If not specified, :data:`Beaker.config.default_workspace <beaker.Config.default_workspace>` is used. :raises ValueError: If the name is invalid. :raises GroupConflict: If a group with the given name already exists. :raises ExperimentNotFound: If any of the given experiments don't exist. :raises WorkspaceNotSet: If neither ``workspace`` nor :data:`Beaker.config.default_workspace <beaker.Config.default_workspace>` are set. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. """ self.validate_beaker_name(name) workspace = self.resolve_workspace(workspace) exp_ids: List[str] = list( set([self.resolve_experiment(experiment).id for experiment in experiments]) ) group_data = self.request( "groups", method="POST", data=GroupSpec( name=name, description=description, workspace=workspace.full_name, experiments=exp_ids, ), exceptions_for_status={409: GroupConflict(name)}, ).json() return self.get(group_data["id"])
[docs] def delete(self, group: Union[str, Group]): """ Delete a group. :param group: The group ID, name, or object. :raises GroupNotFound: If the group can't be found. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. """ group_id = self.resolve_group(group).id self.request( f"groups/{self.url_quote(group_id)}", method="DELETE", exceptions_for_status={404: GroupNotFound(self._not_found_err_msg(group))}, )
[docs] def rename(self, group: Union[str, Group], name: str) -> Group: """ Rename a group. :param group: The group ID, name, or object. :param name: The new name for the group. :raises ValueError: If the new name is invalid. :raises GroupNotFound: If the group can't be found. :raises GroupConflict: If a group by that name already exists. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. """ self.validate_beaker_name(name) group_id = self.resolve_group(group).id return Group.from_json( self.request( f"groups/{self.url_quote(group_id)}", method="PATCH", data=GroupPatch(name=name), exceptions_for_status={ 404: GroupNotFound(self._not_found_err_msg(group)), 409: GroupConflict(name), }, ).json() )
[docs] def add_experiments(self, group: Union[str, Group], *experiments: Union[str, Experiment]): """ Add experiments to a group. :param group: The group ID, name, or object. :param experiments: Experiments to add to the group. :raises GroupNotFound: If the group can't be found. :raises ExperimentNotFound: If any of the given experiments don't exist. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. """ group_id = self.resolve_group(group).id exp_ids: List[str] = list( set([self.resolve_experiment(experiment).id for experiment in experiments]) ) self.request( f"groups/{self.url_quote(group_id)}", method="PATCH", data=GroupPatch(add_experiments=exp_ids), exceptions_for_status={404: GroupNotFound(self._not_found_err_msg(group))}, )
[docs] def remove_experiments(self, group: Union[str, Group], *experiments: Union[str, Experiment]): """ Remove experiments from a group. :param group: The group ID, name, or object. :param experiments: Experiments to remove from the group. :raises GroupNotFound: If the group can't be found. :raises ExperimentNotFound: If any of the given experiments don't exist. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. """ group_id = self.resolve_group(group).id exp_ids: List[str] = list( set([self.resolve_experiment(experiment).id for experiment in experiments]) ) self.request( f"groups/{self.url_quote(group_id)}", method="PATCH", data=GroupPatch(remove_experiments=exp_ids), exceptions_for_status={404: GroupNotFound(self._not_found_err_msg(group))}, )
[docs] def list_experiments(self, group: Union[str, Group]) -> List[Experiment]: """ List experiments in a group. :param group: The group ID, name, or object. :raises GroupNotFound: If the group can't be found. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. """ group_id = self.resolve_group(group).id exp_ids = self.request( f"groups/{self.url_quote(group_id)}/experiments", method="GET", exceptions_for_status={404: GroupNotFound(self._not_found_err_msg(group))}, ).json() # TODO: make these requests concurrently. return [self.beaker.experiment.get(exp_id) for exp_id in exp_ids or []]
[docs] def export_experiments( self, group: Union[str, Group], quiet: bool = False ) -> Generator[bytes, None, None]: """ Export all experiments and metrics in a group as a CSV. Returns a generator that should be exhausted to get the complete file. :param group: The group ID, name, or object. :param quiet: If ``True``, progress won't be displayed. :raises GroupNotFound: If the group can't be found. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. """ group_id = self.resolve_group(group).id resp = self.request( f"groups/{self.url_quote(group_id)}/export.csv", method="GET", exceptions_for_status={404: GroupNotFound(self._not_found_err_msg(group))}, stream=True, ).iter_content(chunk_size=1024) from ..progress import get_group_experiments_progress with get_group_experiments_progress(quiet) as progress: task_id = progress.add_task("Downloading:") total = 0 for chunk in resp: if chunk: advance = len(chunk) total += advance progress.update(task_id, total=total + 1, advance=advance) yield chunk
[docs] def url(self, group: Union[str, Group]) -> str: """ Get the URL for a group. :param group: The group ID, name, or object. :raises GroupNotFound: If the group can't be found. """ group_id = self.resolve_group(group).id return f"{self.config.agent_address}/gr/{self.url_quote(group_id)}/compare"
def _not_found_err_msg(self, group: Union[str, Group]) -> str: group = group if isinstance(group, str) else group.id return ( f"'{group}': Make sure you're using a valid Beaker group ID or the " f"*full* name of the group (with the account prefix, e.g. 'username/group_name')" )