import time
from collections import defaultdict
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Set, Union
from ..data_model import *
from ..exceptions import *
from .service_client import ServiceClient
if TYPE_CHECKING:
from rich.progress import Progress, TaskID
class JobClient(ServiceClient):
    """
    Accessed via :data:`Beaker.job <beaker.Beaker.job>`.
    """

    def get(self, job_id: str) -> Job:
        """
        Get information about a job.

        :param job_id: The ID of the Beaker job.

        :raises JobNotFound: If the job can't be found.
        :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
        :raises RequestException: Any other exception that can occur when contacting the
            Beaker server.
        """
        # A 404 from the server is surfaced to the caller as JobNotFound.
        response = self.request(
            f"jobs/{job_id}",
            exceptions_for_status={404: JobNotFound(job_id)},
        )
        return Job.from_json(response.json())
[docs] def list(
self,
cluster: Optional[Union[str, Cluster]] = None,
experiment: Optional[Union[str, Experiment]] = None,
finalized: bool = False,
kind: Optional[JobKind] = None,
node: Optional[Union[str, Node]] = None,
) -> List[Job]:
"""
List jobs.
:param cluster: List jobs on a cluster.
:param experiment: List jobs in an experiment.
:param finalized: List only finalized or non-finalized jobs.
:param kind: List jobs of a certain kind.
:param node: List jobs on a node.
.. important::
Either ``cluster``, ``experiment``, or ``node`` must be specified.
If ``node`` is specified, neither ``cluster`` nor ``experiment`` can be
specified.
:raises ValueError: If the arguments are invalid, e.g. both ``node`` and
``cluster`` are specified.
:raises ClusterNotFound: If the specified cluster doesn't exist.
:raises ExperimentNotFound: If the specified experiment doesn't exist.
:raises NodeNotFound: If the specified node doesn't exist.
"""
# Validate arguments.
if node is not None:
if cluster is not None:
raise ValueError("You cannot specify both 'node' and 'cluster'")
if experiment is not None:
raise ValueError("You cannot specify both 'node' and 'experiment'")
else:
if cluster is None and experiment is None:
raise ValueError("You must specify one of 'node', 'experiment', or 'cluster'")
jobs: List[Job] = []
# Build request options.
request_opts: Dict[str, Any] = {}
if cluster is not None:
cluster_id = (
cluster.id if isinstance(cluster, Cluster) else self.beaker.cluster.get(cluster).id
)
request_opts["cluster"] = cluster_id
if node is not None:
node_id = node.id if isinstance(node, Node) else self.beaker.node.get(node).id
request_opts["node"] = node_id
if experiment is not None:
exp_id = (
experiment.id
if isinstance(experiment, Experiment)
else self.beaker.experiment.get(experiment).id
)
request_opts["experiment"] = exp_id
if kind is not None:
request_opts["kind"] = kind.value
request_opts["finalized"] = finalized
# Gather jobs, page by page.
while True:
page = Jobs.from_json(self.request("jobs", method="GET", query=request_opts).json())
if page.data:
jobs.extend(page.data)
if not page.next and not page.next_cursor:
break
else:
request_opts["cursor"] = page.next or page.next_cursor
return jobs
[docs] def logs(
self,
job: Union[str, Job],
quiet: bool = False,
since: Optional[Union[str, datetime, timedelta]] = None,
) -> Generator[bytes, None, None]:
"""
Download the logs for a job.
Returns a generator with the streaming bytes from the download.
The generator should be exhausted, otherwise the logs downloaded will be incomplete.
.. seealso::
:meth:`Beaker.experiment.logs() <ExperimentClient.logs>`
.. seealso::
:meth:`follow()`
:param job: The Beaker job ID or object.
:param quiet: If ``True``, progress won't be displayed.
:param since: Only show logs since a particular time. Could be a :class:`~datetime.datetime` object
(naive datetimes will be treated as UTC), a timestamp string in the form of RFC 3339
(e.g. "2013-01-02T13:23:37Z"), or a relative time
(e.g. a :class:`~datetime.timedelta` or a string like "42m").
:raises JobNotFound: If the job can't be found.
:raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
:raises RequestException: Any other exception that can occur when contacting the
Beaker server.
"""
job_id = job.id if isinstance(job, Job) else job
opts = {}
if since is not None:
from ..util import format_since
opts["since"] = format_since(since)
response = self.request(
f"jobs/{job_id}/logs",
method="GET",
exceptions_for_status={404: JobNotFound(job_id)},
stream=True,
query=opts,
)
# TODO: currently beaker doesn't provide the Content-Length header, update this if they do.
# content_length = response.headers.get("Content-Length")
# total = int(content_length) if content_length is not None else None
from ..progress import get_logs_progress
with get_logs_progress(quiet) as progress:
task_id = progress.add_task("Downloading:")
total = 0
for chunk in response.iter_content(chunk_size=1024):
if chunk:
advance = len(chunk)
total += advance
progress.update(task_id, total=total + 1, advance=advance)
yield chunk
[docs] def metrics(self, job: Union[str, Job]) -> Optional[Dict[str, Any]]:
"""
Get the metrics from a job.
.. seealso::
:meth:`Beaker.experiment.metrics() <ExperimentClient.metrics>`
:param job: The Beaker job ID or object.
:raises JobNotFound: If the job can't be found.
:raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
:raises RequestException: Any other exception that can occur when contacting the
Beaker server.
"""
job_id = job.id if isinstance(job, Job) else job
return self.request(
f"jobs/{job_id}/results",
method="GET",
exceptions_for_status={404: JobNotFound(job_id)},
).json()["metrics"]
[docs] def results(self, job: Union[str, Job]) -> Optional[Dataset]:
"""
Get the results from a job.
.. seealso::
:meth:`Beaker.experiment.results() <ExperimentClient.results>`
:param job: The Beaker job ID or object.
:raises JobNotFound: If the job can't be found.
:raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
:raises RequestException: Any other exception that can occur when contacting the
Beaker server.
"""
job = job if isinstance(job, Job) else self.get(job)
if job.result is None or job.result.beaker is None:
return None
else:
try:
return self.beaker.dataset.get(job.result.beaker)
except DatasetNotFound:
return None
[docs] def finalize(self, job: Union[str, Job]) -> Job:
"""
Finalize a job.
:param job: The Beaker job ID or object.
:raises JobNotFound: If the job can't be found.
:raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
:raises RequestException: Any other exception that can occur when contacting the
Beaker server.
"""
job_id = job.id if isinstance(job, Job) else job
return Job.from_json(
self.request(
f"jobs/{job_id}",
method="PATCH",
exceptions_for_status={404: JobNotFound(job_id)},
data=JobPatch(status=JobStatusUpdate(finalized=True)),
).json()
)
[docs] def preempt(self, job: Union[str, Job]) -> Job:
"""
Preempt a job.
:param job: The Beaker job ID or object.
:raises JobNotFound: If the job can't be found.
:raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
:raises RequestException: Any other exception that can occur when contacting the
Beaker server.
"""
job_id = job.id if isinstance(job, Job) else job
return Job.from_json(
self.request(
f"jobs/{job_id}",
method="PATCH",
exceptions_for_status={404: JobNotFound(job_id)},
data=JobPatch(
status=JobStatusUpdate(
canceled=True,
canceled_code=CanceledCode.user_preemption,
canceled_for=f"Preempted by user '{self.beaker.account.name}'",
)
),
).json()
)
[docs] def stop(self, job: Union[str, Job]) -> Job:
"""
Stop a job.
:param job: The Beaker job ID or object.
:raises JobNotFound: If the job can't be found.
:raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
:raises RequestException: Any other exception that can occur when contacting the
Beaker server.
"""
job_id = job.id if isinstance(job, Job) else job
return Job.from_json(
self.request(
f"jobs/{job_id}",
method="PATCH",
exceptions_for_status={404: JobNotFound(job_id)},
data=JobPatch(
status=JobStatusUpdate(
canceled=True, canceled_for=f"Stopped by user '{self.beaker.account.name}'"
)
),
).json()
)
[docs] def wait_for(
self,
*jobs: Union[str, Job],
timeout: Optional[float] = None,
poll_interval: float = 1.0,
quiet: bool = False,
strict: bool = False,
) -> List[Job]:
"""
Wait for jobs to finalize, returning the completed jobs as a list in the same order
they were given as input.
.. caution::
This method is experimental and may change or be removed in future releases.
.. seealso::
:meth:`as_completed()`
.. seealso::
:meth:`follow()`
.. seealso::
:meth:`Beaker.experiment.wait_for() <ExperimentClient.wait_for>`
:param jobs: Job ID, name, or object.
:param timeout: Maximum amount of time to wait for (in seconds).
:param poll_interval: Time to wait between polling each job's status (in seconds).
:param quiet: If ``True``, progress won't be displayed.
:param strict: If ``True``, the exit code of each job will be checked, and a
:class:`~beaker.exceptions.JobFailedError` will be raised for non-zero exit codes.
:raises JobNotFound: If any job can't be found.
:raises JobTimeoutError: If the ``timeout`` expires.
:raises DuplicateJobError: If the same job is given as an argument more than once.
:raises JobFailedError: If ``strict=True`` and any job finishes with a non-zero exit code.
:raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
:raises RequestException: Any other exception that can occur when contacting the
Beaker server.
"""
job_id_to_position: Dict[str, int] = {}
jobs_to_wait_on: List[Job] = []
for i, job_ in enumerate(jobs):
job = job_ if isinstance(job_, Job) else self.get(job_)
jobs_to_wait_on.append(job)
if job.id in job_id_to_position:
raise DuplicateJobError(job.display_name)
job_id_to_position[job.id] = i
completed_jobs: List[Job] = list(
self.as_completed(
*jobs_to_wait_on,
timeout=timeout,
poll_interval=poll_interval,
quiet=quiet,
strict=strict,
)
)
return sorted(completed_jobs, key=lambda job: job_id_to_position[job.id])
[docs] def as_completed(
self,
*jobs: Union[str, Job],
timeout: Optional[float] = None,
poll_interval: float = 1.0,
quiet: bool = False,
strict: bool = False,
) -> Generator[Job, None, None]:
"""
Wait for jobs to finalize, returning an iterator that yields jobs as they complete.
.. caution::
This method is experimental and may change or be removed in future releases.
.. seealso::
:meth:`wait_for()`
.. seealso::
:meth:`follow()`
.. seealso::
:meth:`Beaker.experiment.as_completed() <ExperimentClient.as_completed>`
:param jobs: Job ID, name, or object.
:param timeout: Maximum amount of time to wait for (in seconds).
:param poll_interval: Time to wait between polling each job's status (in seconds).
:param quiet: If ``True``, progress won't be displayed.
:param strict: If ``True``, the exit code of each job will be checked, and a
:class:`~beaker.exceptions.JobFailedError` will be raised for non-zero exit codes.
:raises JobNotFound: If any job can't be found.
:raises JobTimeoutError: If the ``timeout`` expires.
:raises DuplicateJobError: If the same job is given as an argument more than once.
:raises JobFailedError: If ``strict=True`` and any job finishes with a non-zero exit code.
:raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
:raises RequestException: Any other exception that can occur when contacting the
Beaker server.
"""
yield from self._as_completed(
*jobs,
timeout=timeout,
poll_interval=poll_interval,
quiet=quiet,
strict=strict,
)
[docs] def follow(
self,
job: Union[str, Job],
timeout: Optional[float] = None,
strict: bool = False,
include_timestamps: bool = True,
) -> Generator[bytes, None, Job]:
"""
Follow a job live, creating a generator that produces log lines (as bytes) from the job
as they become available. The return value of the generator is the finalized
:class:`~beaker.data_model.job.Job` object.
.. seealso::
:meth:`logs()`
.. seealso::
:meth:`wait_for()`
.. seealso::
:meth:`as_completed()`
.. seealso::
:meth:`Beaker.experiment.follow() <ExperimentClient.follow>`
:param job: Job ID, name, or object.
:param timeout: Maximum amount of time to follow job for (in seconds).
:param strict: If ``True``, the exit code of each job will be checked, and a
:class:`~beaker.exceptions.JobFailedError` will be raised for non-zero exit codes.
:param include_timestamps: If ``True`` (the default) timestamps from the Beaker logs
will be included in the output.
:raises JobNotFound: If any job can't be found.
:raises JobTimeoutError: If the ``timeout`` expires.
:raises JobFailedError: If ``strict=True`` and any job finishes with a non-zero exit code.
:raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
:raises RequestException: Any other exception that can occur when contacting the
Beaker server.
:examples:
>>> job = beaker.experiment.latest_job(hello_world_experiment_name)
>>> for line in beaker.job.follow(job):
... # Every log line from Beaker starts with an RFC 3339 UTC timestamp
... # (e.g. '2021-12-07T19:30:24.637600011Z'). If we don't want to print
... # the timestamps we can split them off like this:
... line = line[line.find(b"Z ")+2:]
... print(line.decode(errors="ignore"), end="")
<BLANKLINE>
Hello from Docker!
This message shows that your installation appears to be working correctly.
<BLANKLINE>
...
"""
from ..util import log_and_wait, split_timestamp
if timeout is not None and timeout <= 0:
raise ValueError("'timeout' must be a positive number")
start = time.monotonic()
last_timestamp: Optional[str] = None
lines_for_timestamp: Dict[str, Set[bytes]] = defaultdict(set)
def get_line_to_yield(line: bytes) -> Optional[bytes]:
nonlocal last_timestamp, lines_for_timestamp
timestamp = split_timestamp(line)
if timestamp is not None and timestamp != last_timestamp:
last_timestamp = timestamp
if include_timestamps:
return line
else:
return line[len(timestamp) + 1 :]
elif timestamp is None and last_timestamp is not None:
if line not in lines_for_timestamp[last_timestamp]:
lines_for_timestamp[last_timestamp].add(line)
return line
return None
def pull_logs_since(updated_job: Job, final: bool = False):
retries = 0
while True:
try:
buffer = b""
for chunk in self.logs(updated_job, quiet=True, since=last_timestamp):
lines = (buffer + chunk).splitlines(keepends=True)
if chunk.endswith(b"\n"):
buffer = b""
elif lines:
# Last line in chunk is not a complete line.
lines, buffer = lines[:-1], lines[-1]
for line in lines:
line_to_yield = get_line_to_yield(line)
if line_to_yield is not None:
yield line_to_yield
if final and buffer:
line_to_yield = get_line_to_yield(buffer + b"\n")
if line_to_yield is not None:
yield line_to_yield
break
except RequestException as err:
if retries < self.beaker.MAX_RETRIES:
log_and_wait(retries, err)
retries += 1
else:
raise
updated_job: Job
while True:
updated_job = self.get(job.id if isinstance(job, Job) else job)
# Pull and yield log lines.
for line in pull_logs_since(updated_job):
yield line
# Check status of job, finish if job is no-longer running.
if updated_job.is_finalized:
break
# Check timeout if we're still waiting for job to complete.
if timeout is not None and time.monotonic() - start >= timeout:
raise JobTimeoutError(updated_job.id)
time.sleep(1.0)
for line in pull_logs_since(updated_job, final=True):
yield line
if strict:
updated_job.check()
return updated_job
def _as_completed(
self,
*jobs: Union[str, Job],
timeout: Optional[float] = None,
poll_interval: float = 1.0,
quiet: bool = False,
strict: bool = False,
_progress: Optional["Progress"] = None,
) -> Generator[Job, None, None]:
if timeout is not None and timeout <= 0:
raise ValueError("'timeout' must be a positive number")
exp_id_to_name: Dict[str, str] = {}
task_id_to_name: Dict[str, str] = {}
def display_name(j: Job) -> str:
if j.execution is None:
return f"[i]{j.id}[/]"
else:
if j.execution.experiment not in exp_id_to_name:
exp = self.beaker.experiment.get(j.execution.experiment)
exp_id_to_name[exp.id] = exp.name if exp.name is not None else exp.id
if j.execution.task not in task_id_to_name:
for task in self.beaker.experiment.tasks(j.execution.experiment):
if task.id not in task_id_to_name:
task_id_to_name[task.id] = (
task.name if task.name is not None else task.id
)
return (
f"[b cyan]{exp_id_to_name[j.execution.experiment]}[/] "
f"\N{rightwards arrow} [i]{task_id_to_name[j.execution.task]}[/]"
)
from ..progress import get_jobs_progress
job_ids: List[str] = []
start = time.monotonic()
owned_progress = _progress is None
progress = _progress or get_jobs_progress(quiet)
if owned_progress:
progress.start()
try:
job_id_to_progress_task: Dict[str, "TaskID"] = {}
for job_ in jobs:
job = job_ if isinstance(job_, Job) else self.get(job_)
job_ids.append(job.id)
if job.id in job_id_to_progress_task:
raise DuplicateJobError(job.id)
job_id_to_progress_task[job.id] = progress.add_task(f"{display_name(job)}:")
polls = 0
while True:
if not job_id_to_progress_task:
yield from []
return
polls += 1
# Poll each experiment and update the progress line.
for job_id in list(job_id_to_progress_task):
task_id = job_id_to_progress_task[job_id]
job = self.get(job_id)
if not job.is_finalized:
progress.update(task_id, total=polls + 1, advance=1)
else:
# Ensure job was successful if `strict==True`.
if strict:
job.check()
progress.update(
task_id,
total=polls + 1,
completed=polls + 1,
)
progress.stop_task(task_id)
del job_id_to_progress_task[job_id]
yield job
elapsed = time.monotonic() - start
if timeout is not None and elapsed >= timeout:
raise JobTimeoutError
time.sleep(poll_interval)
finally:
if owned_progress:
progress.stop()
[docs] def url(self, job: Union[str, Job]) -> str:
job_id = job.id if isinstance(job, Job) else job
return f"{self.config.agent_address}/job/{self.url_quote(job_id)}"