Source code for beaker.services.experiment

import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Generator,
    List,
    Optional,
    Sequence,
    Set,
    Tuple,
    Union,
)

from ..aliases import PathOrStr
from ..data_model import *
from ..exceptions import *
from .service_client import ServiceClient

if TYPE_CHECKING:
    from rich.progress import TaskID


[docs]class ExperimentClient(ServiceClient): """ Accessed via :data:`Beaker.experiment <beaker.Beaker.experiment>`. """
[docs] def get(self, experiment: str) -> Experiment: """ Get info about an experiment. :param experiment: The experiment ID or name. :raises ExperimentNotFound: If the experiment can't be found. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. """ def _get(id: str) -> Experiment: return Experiment.from_json( self.request( f"experiments/{self.url_quote(id)}", exceptions_for_status={404: ExperimentNotFound(self._not_found_err_msg(id))}, ).json() ) try: # Could be an ID or full name, so we try that first. return _get(experiment) except ExperimentNotFound: if "/" not in experiment: # Try with adding the account name. try: return _get(f"{self.beaker.account.name}/{experiment}") except ExperimentNotFound: pass # Try searching the default workspace. if self.config.default_workspace is not None: matches = self.beaker.workspace.experiments(match=experiment, limit=1) if matches: return matches[0] raise
def _parse_create_args( self, *args, **kwargs ) -> Tuple[Union[ExperimentSpec, PathOrStr], Optional[str], Optional[Union[Workspace, str]]]: spec: Optional[Union[ExperimentSpec, PathOrStr]] = kwargs.pop("spec", None) name: Optional[str] = kwargs.pop("name", None) workspace: Optional[Union[Workspace, str]] = kwargs.pop("workspace", None) if len(args) == 3: if name is not None or spec is not None or workspace is not None: raise TypeError( "ExperimentClient.create() got an unexpected number of positional arguments" ) if ( isinstance(args[0], str) and isinstance(args[1], (ExperimentSpec, Path, str)) and isinstance( args[2], ( str, Workspace, ), ) ): name, spec, workspace = args elif ( isinstance(args[0], (ExperimentSpec, Path, str)) and isinstance(args[1], str) and isinstance( args[2], ( str, Workspace, ), ) ): spec, name, workspace = args else: raise TypeError("ExperimentClient.create() got an unexpected positional argument") elif len(args) == 2: if name is not None or spec is not None: raise TypeError( "ExperimentClient.create() got an unexpected number of positional arguments" ) if isinstance(args[0], str) and isinstance(args[1], (ExperimentSpec, Path, str)): name, spec = args elif isinstance(args[0], (ExperimentSpec, Path, str)) and isinstance(args[1], str): spec, name = args else: raise TypeError("ExperimentClient.create() got an unexpected positional argument") elif len(args) == 1: if spec is None: spec = args[0] elif name is None: name = args[0] else: raise TypeError("ExperimentClient.create() got an unexpected positional argument") elif len(args) != 0: raise TypeError( "ExperimentClient.create() got an unexpected number of positional arguments" ) elif spec is None: raise TypeError("ExperimentClient.create() is missing 1 required argument: 'spec'") if kwargs: raise TypeError( f"ExperimentClient.create() got unexpected keyword arguments {tuple(kwargs.keys())}" ) assert spec is not None return spec, name, workspace
[docs] def list( self, *, author: Optional[Union[str, Account]] = None, cluster: Optional[Union[str, Cluster]] = None, finalized: bool = False, node: Optional[Union[str, Node]] = None, ) -> List[Experiment]: """ List experiments. :param author: List only experiments by particular author. :param cluster: List experiments on a particular cluster. :param finalized: List only finalized or non-finalized experiments. :param node: List experiments on a particular node. .. important:: Either ``cluster``, ``author``, ``experiment``, or ``node`` must be specified. If ``node`` is specified, neither ``cluster`` nor ``experiment`` can be specified. :raises ValueError: If the arguments are invalid, e.g. both ``node`` and ``cluster`` are specified. :raises AccountNotFound: If the specified author doesn't exist. :raises ClusterNotFound: If the specified cluster doesn't exist. :raises NodeNotFound: If the specified node doesn't exist. """ jobs = self.beaker.job.list( author=author, cluster=cluster, finalized=finalized, kind=JobKind.execution, node=node ) experiments: List[Experiment] = [] exp_ids: Set[str] = set() for job in jobs: assert job.execution is not None if (exp_id := job.execution.experiment) not in exp_ids: exp_ids.add(exp_id) experiments.append(self.get(exp_id)) return experiments
[docs] def create( self, *args, **kwargs, ) -> Experiment: """ Create a new Beaker experiment with the given ``spec``. :param spec: The spec for the Beaker experiment. This can either be an :class:`~beaker.data_model.experiment_spec.ExperimentSpec` instance or the path to a YAML spec file. :type spec: :class:`~beaker.data_model.experiment_spec.ExperimentSpec` | :class:`~pathlib.Path` | :class:`str` :param name: An optional name to assign the experiment. Must be unique. :type name: :class:`str`, optional :param workspace: An optional workspace to create the experiment under. If not specified, :data:`Beaker.config.default_workspace <beaker.Config.default_workspace>` is used. :type workspace: :class:`~beaker.data_model.workspace.Workspace` | :class:`str`, optional :raises ValueError: If the name is invalid. :raises ExperimentConflict: If an experiment with the given name already exists. :raises WorkspaceNotSet: If neither ``workspace`` nor :data:`Beaker.config.default_workspace <beaker.Config.default_workspace>` are set. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. """ spec, name, workspace = self._parse_create_args(*args, **kwargs) # For backwards compatibility we parse out the arguments like this to allow for `create(name, spec)` # or just `create(spec)`. if name is not None: self.validate_beaker_name(name) if not isinstance(spec, ExperimentSpec): spec = ExperimentSpec.from_file(spec) spec.validate() json_spec = spec.to_json() workspace = self.resolve_workspace(workspace) self._validate_spec(spec, workspace) experiment_data = self.request( f"workspaces/{workspace.id}/experiments", method="POST", query=None if name is None else {"name": name}, data=json_spec, exceptions_for_status=None if name is None else {409: ExperimentConflict(name)}, ).json() return self.get(experiment_data["id"])
[docs] def spec(self, experiment: Union[str, Experiment]) -> ExperimentSpec: """ Get the :class:`spec <beaker.data_model.experiment_spec.ExperimentSpec>` of an experiment. :param experiment: The experiment ID, name, or object. :raises ExperimentNotFound: If the experiment can't be found. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. """ experiment_id = self.resolve_experiment(experiment).id spec_json = self.request( f"experiments/{self.url_quote(experiment_id)}/spec", query={"version": SpecVersion.v2.value}, headers={"Accept": "application/json"}, ).json() if "budget" not in spec_json: spec_json["budget"] = "" return ExperimentSpec.from_json(spec_json)
[docs] def stop(self, experiment: Union[str, Experiment]): """ Stop an experiment. :param experiment: The experiment ID, name, or object. :raises ExperimentNotFound: If the experiment can't be found. :raises ExperimentConflict: If the experiment was already stopped. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. """ experiment_id = self.resolve_experiment(experiment).id self.request( f"experiments/{self.url_quote(experiment_id)}/stop", method="PUT", exceptions_for_status={ 404: ExperimentNotFound(self._not_found_err_msg(experiment)), 409: ExperimentConflict("Experiment already stopped"), }, )
[docs] def resume(self, experiment: Union[str, Experiment]): """ Resume a preempted experiment. :param experiment: The experiment ID, name, or object. :raises ExperimentNotFound: If the experiment can't be found. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. """ experiment_id = self.resolve_experiment(experiment).id self.request( f"experiments/{self.url_quote(experiment_id)}/resume", method="POST", exceptions_for_status={404: ExperimentNotFound(self._not_found_err_msg(experiment))}, )
[docs] def delete(self, experiment: Union[str, Experiment], delete_results_datasets: bool = True): """ Delete an experiment. :param experiment: The experiment ID, name, or object. :param delete_results_datasets: Also delete the experiment's results datasets. :raises ExperimentNotFound: If the experiment can't be found. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. """ experiment = self.resolve_experiment(experiment) if delete_results_datasets: for task in self.tasks(experiment): for job in task.jobs: try: dataset = self.beaker.job.results(job) if dataset is not None: self.beaker.dataset.delete(dataset) except DatasetNotFound: pass self.request( f"experiments/{self.url_quote(experiment.id)}", method="DELETE", exceptions_for_status={404: ExperimentNotFound(self._not_found_err_msg(experiment.id))}, )
[docs] def rename(self, experiment: Union[str, Experiment], name: str) -> Experiment: """ Rename an experiment. :param experiment: The experiment ID, name, or object. :param name: The new name for the experiment. :raises ValueError: If the new name is invalid. :raises ExperimentNotFound: If the experiment can't be found. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. """ self.validate_beaker_name(name) experiment_id = self.resolve_experiment(experiment).id return Experiment.from_json( self.request( f"experiments/{self.url_quote(experiment_id)}", method="PATCH", data=ExperimentPatch(name=name), exceptions_for_status={ 404: ExperimentNotFound(self._not_found_err_msg(experiment)), 409: ExperimentConflict(name), }, ).json() )
[docs] def tasks(self, experiment: Union[str, Experiment]) -> Tasks: """ List the tasks in an experiment. :param experiment: The experiment ID, name, or object. :raises ExperimentNotFound: If the experiment can't be found. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. :examples: >>> task = beaker.experiment.tasks(hello_world_experiment_name)["main"] """ experiment_id = self.resolve_experiment(experiment).id tasks = [ Task.from_json(d) for d in self.request( f"experiments/{self.url_quote(experiment_id)}/tasks", method="GET", exceptions_for_status={ 404: ExperimentNotFound(self._not_found_err_msg(experiment)) }, ).json() ] return Tasks(tasks)
[docs] def logs( self, experiment: Union[str, Experiment], task: Optional[Union[str, Task]] = None, quiet: bool = False, since: Optional[Union[str, "datetime", "timedelta"]] = None, ) -> Generator[bytes, None, None]: """ Download the logs for an experiment. Returns a generator with the streaming bytes from the download. The generator should be exhausted, otherwise the logs downloaded will be incomplete. .. important:: When there are multiple jobs for the given experiment / task, the logs for the latest job will be returned. .. seealso:: :meth:`Beaker.job.logs() <JobClient.logs>` :param experiment: The experiment ID, name, or object. :param task: The task ID, name, or object of a specific task from the Beaker experiment to fetch logs for. Required if there are multiple tasks in the experiment. :param quiet: If ``True``, progress won't be displayed. :param since: Only show logs since a particular time. Could be a :class:`~datetime.datetime` object (naive datetimes will be treated as UTC), a timestamp string in the form of RFC 3339 (e.g. "2013-01-02T13:23:37Z"), or a :class:`~datetime.timedelta` (e.g. `timedelta(seconds=60)`, which will show you the logs beginning 60 seconds ago). :raises ValueError: The experiment has no tasks or jobs, or the experiment has multiple tasks but ``task`` is not specified. :raises TaskNotFound: If the given task doesn't exist. :raises ExperimentNotFound: If the experiment can't be found. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. """ exp = self.resolve_experiment(experiment) job = self.latest_job(exp, task=task, ensure_finalized=False) if job is None: if task is None: raise ValueError(f"Experiment {exp.id} has no jobs") else: raise ValueError( f"Experiment {exp.id} has no jobs for task " f"'{task if isinstance(task, str) else task.display_name}'" ) return self.beaker.job.logs(job.id, quiet=quiet, since=since)
[docs] def metrics( self, experiment: Union[str, Experiment], task: Optional[Union[str, Task]] = None ) -> Optional[Dict[str, Any]]: """ Get the metrics from a task in an experiment. .. important:: When there are multiple jobs for the given experiment / task, the metrics for the latest finalized job will be returned. .. seealso:: :meth:`Beaker.job.metrics() <JobClient.metrics>` :param experiment: The experiment ID, name, or object. :param task: The task ID, name, or object of a specific task from the Beaker experiment to fetch metrics for. Required if there are multiple tasks in the experiment. :raises ValueError: The experiment has no tasks, or the experiment has multiple tasks but ``task`` is not specified. :raises TaskNotFound: If the given task doesn't exist. :raises ExperimentNotFound: If the experiment can't be found. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. """ exp = self.resolve_experiment(experiment) job = self.latest_job(exp, task=task, ensure_finalized=True) return None if job is None else self.beaker.job.metrics(job.id)
[docs] def results( self, experiment: Union[str, Experiment], task: Optional[Union[str, Task]] = None ) -> Optional[Dataset]: """ Get the result dataset from a task in an experiment. .. important:: When there are multiple jobs for the given experiment / task, the results for dataset the latest finalized job will be returned. .. seealso:: :meth:`Beaker.job.results() <JobClient.results>` :param experiment: The experiment ID, name, or object. :param task: The task ID, name, or object of a specific task from the Beaker experiment to fetch results for. Required if there are multiple tasks in the experiment. :raises ValueError: The experiment has no tasks, or the experiment has multiple tasks but ``task`` is not specified. :raises TaskNotFound: If the given task doesn't exist. :raises ExperimentNotFound: If the experiment can't be found. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. """ exp = self.resolve_experiment(experiment) job = self.latest_job(exp, task=task, ensure_finalized=True) if job is None: return None else: return self.beaker.job.results(job)
[docs] def wait_for( self, *experiments: Union[str, Experiment], timeout: Optional[float] = None, poll_interval: float = 1.0, quiet: bool = False, strict: bool = False, ) -> List[Experiment]: """ Wait for experiments to finalize, returning the completed experiments as a list in the same order they were given as input. .. caution:: This method is experimental and may change or be removed in future releases. .. seealso:: :meth:`as_completed()` .. seealso:: :meth:`follow()` .. seealso:: :meth:`Beaker.job.wait_for() <JobClient.wait_for>` :param experiments: Experiment ID, name, or object. :param timeout: Maximum amount of time to wait for (in seconds). :param poll_interval: Time to wait between polling the experiment (in seconds). :param quiet: If ``True``, progress won't be displayed. :param strict: If ``True``, the exit code of each job will be checked, and a :class:`~beaker.exceptions.JobFailedError` will be raised for non-zero exit codes. :raises ExperimentNotFound: If any experiment can't be found. :raises JobTimeoutError: If the ``timeout`` expires. :raises DuplicateExperimentError: If the same experiment is given as an argument more than once. :raises JobFailedError: If ``strict=True`` and any job finishes with a non-zero exit code. :raises TaskStoppedError: If ``strict=True`` and a task is stopped before a corresponding job is initialized. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. """ exp_id_to_position: Dict[str, int] = {} exps_to_wait_on: List[Experiment] = [] for i, exp_ in enumerate(experiments): exp = exp_ if isinstance(exp_, Experiment) else self.get(exp_) exps_to_wait_on.append(exp) if exp.id in exp_id_to_position: raise DuplicateExperimentError(exp.display_name) exp_id_to_position[exp.id] = i completed_exps: List[Experiment] = list( self.as_completed( *exps_to_wait_on, timeout=timeout, poll_interval=poll_interval, quiet=quiet, strict=strict, ) ) return sorted(completed_exps, key=lambda exp: exp_id_to_position[exp.id])
[docs] def as_completed( self, *experiments: Union[str, Experiment], timeout: Optional[float] = None, poll_interval: float = 1.0, quiet: bool = False, strict: bool = False, ) -> Generator[Experiment, None, None]: """ Wait for experiments to finalize, returning an iterator that yields experiments as they complete. .. caution:: This method is experimental and may change or be removed in future releases. .. seealso:: :meth:`wait_for()` .. seealso:: :meth:`follow()` .. seealso:: :meth:`Beaker.job.as_completed() <JobClient.as_completed>` :param experiments: Experiment ID, name, or object. :param timeout: Maximum amount of time to wait for (in seconds). :param poll_interval: Time to wait between polling the experiment (in seconds). :param quiet: If ``True``, progress won't be displayed. :param strict: If ``True``, the exit code of each job will be checked, and a :class:`~beaker.exceptions.JobFailedError` will be raised for non-zero exit codes. :raises ExperimentNotFound: If any experiment can't be found. :raises JobTimeoutError: If the ``timeout`` expires. :raises DuplicateExperimentError: If the same experiment is given as an argument more than once. :raises JobFailedError: If ``strict=True`` and any job finishes with a non-zero exit code. :raises TaskStoppedError: If ``strict=True`` and a task is stopped before a corresponding job is initialized. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. """ if timeout is not None and timeout <= 0: raise ValueError("'timeout' must be a positive number") from ..progress import get_exps_and_jobs_progress start = time.monotonic() # Initialize progress trackers. live_display, experiments_progress, jobs_progress = get_exps_and_jobs_progress(quiet) # Keeps track of the IDs of each job for each task in each experiment. exp_to_task_to_job: Dict[str, Dict[str, Optional[str]]] = {} # Keeps track of the progress tracker "TaskID"s for each experiment. exp_to_progress_task: Dict[str, "TaskID"] = {} # Keeps track of experiments that haven't been returned yet. incomplete_exps: Set[str] = set() # Keeps track of jobs that have been finalized. finalized_jobs: Set[str] = set() # Keeps track of tasks that were stopped before a job was created. stopped_tasks: Set[str] = set() def num_completed_tasks(exp_id: str) -> int: return len( [ job_id for _, job_id in exp_to_task_to_job[exp_id].items() if job_id is not None and job_id in finalized_jobs ] ) def num_stopped_tasks(exp_id: str) -> int: return len( [ task_id for task_id in exp_to_task_to_job[exp_id].keys() if task_id in stopped_tasks ] ) def total_tasks(exp_id: str) -> int: return len(exp_to_task_to_job[exp_id]) def experiment_finalized(exp_id: str) -> bool: return (num_stopped_tasks(exp_id) + num_completed_tasks(exp_id)) == total_tasks(exp_id) def complete_experiment(exp_id: str) -> Experiment: incomplete_exps.remove(exp_id) experiments_progress.update(exp_to_progress_task[exp_id], completed=total_tasks(exp_id)) return self.get(exp_id) with live_display: # Populate progress trackers and state variables, also yielding # any experiments that are already complete. for e in experiments: experiment = self.get(e.id if isinstance(e, Experiment) else e) exp_id = experiment.id incomplete_exps.add(exp_id) # Ensure experiment is unique. if exp_id in exp_to_task_to_job: raise DuplicateExperimentError(experiment.display_name) # Get state of experiment. exp_to_task_to_job[exp_id] = {} tasks = self.tasks(experiment) for task in tasks: latest_job = self._latest_job(task.jobs) exp_to_task_to_job[exp_id][task.id] = ( None if latest_job is None else latest_job.id ) if not task.jobs and not task.schedulable: stopped_tasks.add(task.id) # Add to progress tracker. exp_to_progress_task[exp_id] = experiments_progress.add_task( experiment.display_name, total=total_tasks(exp_id), ) # Now wait for the incomplete experiments to finalize. while incomplete_exps: # Collect (registered) incomplete jobs and also yield any experiments # that have been finalized or stopped. incomplete_jobs: List[str] = [] for exp_id, task_to_job in exp_to_task_to_job.items(): if exp_id in incomplete_exps: if strict: for task_id in task_to_job: if task_id in stopped_tasks: raise TaskStoppedError(task_id) if not experiment_finalized(exp_id): for task_id, job_id in task_to_job.items(): if job_id is not None and job_id not in finalized_jobs: incomplete_jobs.append(job_id) else: # Experiment has just completed, yield it. yield complete_experiment(exp_id) # Check for timeout. elapsed = time.monotonic() - start if timeout is not None and elapsed >= timeout: raise JobTimeoutError if incomplete_jobs: # Wait for current stack of incomplete jobs to finalize. for job in self.beaker.job._as_completed( *incomplete_jobs, timeout=None if timeout is None else timeout - elapsed, poll_interval=poll_interval, quiet=quiet, _progress=jobs_progress, ): assert job.execution is not None exp_id = job.execution.experiment task_id = job.execution.task if job.was_preempted: # Job was preempted. Another job will start soon so we just stop # tracking this one. exp_to_task_to_job[exp_id][task_id] = None else: finalized_jobs.add(job.id) # Ensure job was successful if `strict==True`. if strict: job.check() # Update progress display. experiments_progress.advance(exp_to_progress_task[exp_id]) # Check if corresponding experiment is now finalized. if experiment_finalized(exp_id) and exp_id in incomplete_exps: # Experiment has just completed, yield it. yield complete_experiment(exp_id) else: # Wait for `poll_interval` to give Beaker a chance to register jobs. time.sleep(poll_interval) # Now check for jobs that haven't been registered yet. for exp_id, task_to_job in exp_to_task_to_job.items(): if experiment_finalized(exp_id): # Experiment already finalized, no need for anything. continue if all(job_id is not None for job_id in task_to_job.values()): # All tasks already have registered jobs. continue for task in self.tasks(exp_id): if task_to_job[task.id] is not None: continue if not task.jobs and not task.schedulable: # Task was stopped before a job was created. stopped_tasks.add(task.id) elif task.jobs: latest_job = self._latest_job(task.jobs) assert latest_job is not None task_to_job[task.id] = latest_job.id
[docs] def follow( self, experiment: Union[str, Experiment], task: Optional[Union[str, Task]] = None, timeout: Optional[float] = None, strict: bool = False, include_timestamps: bool = True, since: Optional[Union[str, datetime, timedelta]] = None, ) -> Generator[bytes, None, Experiment]: """ Follow an experiment live, creating a generator that produces log lines (as bytes) from the task's job as they become available. The return value of the generator is the final :class:`~beaker.data_model.experiment.Experiment` object. .. seealso:: :meth:`logs()` .. seealso:: :meth:`wait_for()` .. seealso:: :meth:`as_completed()` .. seealso:: :meth:`Beaker.job.follow() <JobClient.follow>` :param experiment: Experiment ID, name, or object. :param task: The task ID, name, or object of a specific task from the Beaker experiment to follow. Required if there are multiple tasks in the experiment. :param timeout: Maximum amount of time to wait for (in seconds). :param strict: If ``True``, the exit code of the job will be checked, and a :class:`~beaker.exceptions.JobFailedError` will be raised for non-zero exit codes. :param include_timestamps: If ``True`` (the default) timestamps from the Beaker logs will be included in the output. :param since: Only show logs since a particular time. Could be a :class:`~datetime.datetime` object (naive datetimes will be treated as UTC), a timestamp string in the form of RFC 3339 (e.g. "2013-01-02T13:23:37Z"), or a :class:`~datetime.timedelta` (e.g. `timedelta(seconds=60)`, which will show you the logs beginning 60 seconds ago). :raises ExperimentNotFound: If any experiment can't be found. :raises ValueError: The experiment has no tasks or jobs, or the experiment has multiple tasks but ``task`` is not specified. :raises TaskNotFound: If the given task doesn't exist. :raises JobTimeoutError: If the ``timeout`` expires. :raises JobFailedError: If ``strict=True`` and the task's job finishes with a non-zero exit code. :raises TaskStoppedError: If ``strict=True`` and a task is stopped before a corresponding job is initialized. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. :examples: >>> for line in beaker.experiment.follow(hello_world_experiment_name): ... # Every log line from Beaker starts with an RFC 3339 UTC timestamp ... # (e.g. '2021-12-07T19:30:24.637600011Z'). If we don't want to print ... # the timestamps we can split them off like this: ... line = line[line.find(b"Z ")+2:] ... print(line.decode(errors="ignore"), end="") <BLANKLINE> Hello from Docker! This message shows that your installation appears to be working correctly. <BLANKLINE> ... """ if timeout is not None and timeout <= 0: raise ValueError("'timeout' must be a positive number") start = time.monotonic() job: Optional[Job] = None while job is None: actual_task = self._task(experiment, task=task) if actual_task.jobs: job = self.latest_job(experiment, task=actual_task) elif not actual_task.schedulable: if strict: raise TaskStoppedError( task.id if isinstance(task, Task) else task, task=actual_task ) else: return self.get( experiment.id if isinstance(experiment, Experiment) else experiment ) if timeout is not None and time.monotonic() - start >= timeout: raise JobTimeoutError( "Job for task failed to initialize within '{timeout}' seconds" ) time.sleep(2.0) assert job is not None # for mypy yield from self.beaker.job.follow( job, strict=strict, include_timestamps=include_timestamps, since=since ) return self.get(experiment.id if isinstance(experiment, Experiment) else experiment)
[docs] def url( self, experiment: Union[str, Experiment], task: Optional[Union[str, Task]] = None ) -> str: """ Get the URL for an experiment. :param experiment: The experiment ID, name, or object. :param task: The task ID, name, or object of a specific task from the Beaker experiment to get the url for. :raises TaskNotFound: If the given task doesn't exist. :raises ExperimentNotFound: If the experiment can't be found. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. """ experiment = self.resolve_experiment(experiment) experiment_url = f"{self.config.agent_address}/ex/{self.url_quote(experiment.id)}" if task is None: return experiment_url else: task_id: str if isinstance(task, Task): task_id = task.id else: for t in self.tasks(experiment): if t.name == task or t.id == task: task_id = t.id break else: raise TaskNotFound(f"No task '{task}' in experiment {experiment.id}") return f"{experiment_url}/tasks/{task_id}"
[docs] def latest_job( self, experiment: Union[str, Experiment], task: Optional[Union[str, Task]] = None, ensure_finalized: bool = False, ) -> Optional[Job]: """ Get the latest job that ran for a task in an experiment. :param experiment: The experiment ID, name, or object. :param task: The take ID, name, or object. :param ensure_finalized: Consider only finalized jobs. :raises ValueError: The experiment has no tasks, or the experiment has multiple tasks but ``task`` is not specified. :raises TaskNotFound: If the given task doesn't exist. :raises ExperimentNotFound: If the experiment can't be found. :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur. :raises RequestException: Any other exception that can occur when contacting the Beaker server. """ return self._latest_job( self._task(experiment, task).jobs, ensure_finalized=ensure_finalized )
def _task( self, experiment: Union[str, Experiment], task: Optional[Union[str, Task]] = None ) -> Task: tasks = list(self.tasks(experiment)) exp_id = experiment if isinstance(experiment, str) else experiment.id if not tasks: raise ValueError(f"Experiment '{exp_id}' has no tasks") else: if task is None: if len(tasks) == 1: return tasks[0] else: raise ValueError( f"'task' required since experiment '{exp_id}' has multiple tasks" ) else: task_name_or_id = task.id if isinstance(task, Task) else task tasks = [t for t in tasks if t.name == task_name_or_id or t.id == task_name_or_id] if tasks: return tasks[0] else: raise TaskNotFound(f"No task '{task_name_or_id}' in experiment '{exp_id}'") def _not_found_err_msg(self, experiment: Union[str, Experiment]) -> str: experiment = experiment if isinstance(experiment, str) else experiment.id return ( f"'{experiment}': Make sure you're using a valid Beaker experiment ID or the " f"*full* name of the experiment (with the account prefix, e.g. 'username/experiment_name')" ) def _validate_spec(self, spec: ExperimentSpec, workspace: Workspace) -> None: for task in spec.tasks: # Make sure image exists. if task.image.beaker is not None: self.beaker.image.get(task.image.beaker) # Make sure all beaker data sources exist. for data_mount in task.datasets or []: source = data_mount.source if source.beaker is not None: self.beaker.dataset.get(source.beaker) if source.secret is not None: self.beaker.secret.get(source.secret, workspace=workspace) if source.result is not None: if source.result not in {t.name for t in spec.tasks}: raise ValueError( f"Data mount result source '{source.result}' not found in spec" ) # Make sure secrets in env variables exist. for env_var in task.env_vars or []: if env_var.secret is not None: self.beaker.secret.get(env_var.secret, workspace=workspace) # Make sure cluster exists. if task.context.cluster: self.beaker.cluster.get(task.context.cluster) def _latest_job(self, jobs: Sequence[Job], ensure_finalized: bool = False) -> Optional[Job]: if ensure_finalized: jobs = [ job for job in jobs if job.status.current == CurrentJobStatus.finalized and job.execution is not None ] if not jobs: return None return sorted(jobs, key=lambda job: job.status.created)[-1]