Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bring stopwatch functionality back to the Task class #30

Merged
merged 5 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ extend-ignore = E221,E211,E222,E202,F541,E201,E203,E125,E251
per-file-ignores =
repype/typing.py:F405

exclude = tests/*.py tests/textual/*.py
exclude = tests/*.py tests/textual/*.py docs/source/conf.py
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ examples/**/.sha.json
examples/**/.task.json
examples/**/data.dill.gz
examples/**/*.png
examples/**/times.csv

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
3 changes: 3 additions & 0 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,7 @@
('py:class', 'textual.widgets._progress_bar.ProgressBar'),
('py:class', 'textual.widgets._text_area.TextArea'),

# Pandas
('py:class', 'pandas.core.frame.DataFrame'),

]
9 changes: 9 additions & 0 deletions docs/source/repype.benchmark.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
repype.benchmark
================

.. automodule:: repype.benchmark
:members:
:undoc-members:
:show-inheritance:
:special-members:
:exclude-members: __dict__, __weakref__, __repr__, __annotations__, __init__, __hash__, __module__, __orig_bases__, __parameters__, __bound__, __constraints__, __contravariant__, __covariant__, __name__
1 change: 1 addition & 0 deletions docs/source/repype.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ repype
.. toctree::
repype.__main__
repype.batch
repype.benchmark
repype.cli
repype.config
repype.pipeline
Expand Down
115 changes: 115 additions & 0 deletions repype/benchmark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import pathlib

import pandas as pd

from repype.typing import (
Generic,
InputID,
Iterable,
PathLike,
Self,
Tuple,
TypeVar,
)

ValueType = TypeVar('ValueType')
"""
Represents the type of the benchmark values.
"""


class Benchmark(Generic[ValueType]):
    """
    Represents the benchmark data for a task.

    The data is a stage-by-input table of values (e.g. run times in seconds),
    persisted as a CSV file.

    Arguments:
        filepath: The path to the file where the benchmark data is persisted.

    Example:

        .. runblock:: pycon

            >>> import tempfile
            >>> from repype.benchmark import Benchmark
            >>>
            >>> with tempfile.TemporaryDirectory() as tmp_path:
            ...     benchmark = Benchmark[float](tmp_path + '/benchmark.csv')
            ...     benchmark['stage1', 'input-1'] = 10
            ...     benchmark.save()
            ...     del benchmark
            ...     benchmark = Benchmark[float](tmp_path + '/benchmark.csv')
            ...     print(benchmark['stage1', 'input-1'])
    """

    df: pd.DataFrame
    """
    The benchmark data dataframe (stages as rows, inputs as columns).
    """

    filepath: pathlib.Path
    """
    The path to the file where the benchmark data is persisted.
    """

    def __init__(self, filepath: PathLike):
        self.filepath = pathlib.Path(filepath)
        # Load previously persisted data if present, otherwise start empty
        if self.filepath.is_file():
            self.df = pd.read_csv(self.filepath, index_col = 0)
        else:
            self.df = pd.DataFrame()

    def set(self, other: Self) -> Self:
        """
        Set the benchmark data to a copy of the benchmark data of another instance.

        Note that :attr:`filepath` is left unchanged.
        """
        self.df = other.df.copy()
        return self

    def __getitem__(self, where: Tuple[str, InputID]) -> ValueType:
        """
        Get the benchmark value for a stage and an input.
        """
        stage_id, input_id = where
        return self.df.at[stage_id, input_id]

    def __setitem__(self, where: Tuple[str, InputID], value: ValueType) -> Self:
        """
        Set the benchmark `value` for a stage and an input.

        Missing rows/columns are created on demand.
        """
        stage_id, input_id = where
        self.df.at[stage_id, input_id] = value
        return self

    def retain(self, stage_ids: Iterable[str], input_ids: Iterable[InputID]) -> Self:
        """
        Retain only the benchmark data for the specified `stage_ids` and `input_ids`.

        IDs that are not present in the data are ignored. The relative order of
        the retained rows and columns is preserved.
        """

        # Keep only those `stage_ids` and `input_ids` that are present in the dataframe
        stage_ids = frozenset(stage_ids) & frozenset(self.df.index)
        input_ids = frozenset(input_ids) & frozenset(self.df.columns)

        # Ensure that the order of the `stage_ids` and `input_ids` is preserved
        stage_ids = sorted(stage_ids, key = lambda stage_id: self.df.index.get_loc(stage_id))
        input_ids = sorted(input_ids, key = lambda input_id: self.df.columns.get_loc(input_id))

        # Select the subset of the dataframe corresponding to the `stage_ids` and `input_ids`
        # (label-based row/column selection, equivalent to the former double transpose)
        self.df = self.df.loc[list(stage_ids), list(input_ids)]
        return self

    def save(self) -> None:
        """
        Persist the benchmark data to :attr:`filepath`.
        """
        self.df.to_csv(self.filepath)

    def __eq__(self, other: object) -> bool:
        """
        Check if the benchmark data is equal to that of another instance.

        Comparison with a non-:class:`Benchmark` object yields `NotImplemented`
        (so `==` falls back to the default behavior and evaluates to `False`).
        """
        # Guard the type first: the previous tuple-based `all(...)` evaluated
        # `other.df` eagerly, raising AttributeError for non-Benchmark objects.
        if not isinstance(other, Benchmark):
            return NotImplemented
        return self.df.equals(other.df)
14 changes: 7 additions & 7 deletions repype/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ def process(
status: A status object to report the progress of the computations.

Returns:
Tuple ``(data, config, timings)``, where ``data`` is the *pipeline data object* comprising all final and
intermediate results, ``config`` are the finally used hyperparameters, and ``timings`` is a dictionary
containing the run time of each individual pipeline stage (in seconds).
Tuple `(data, config, times)`, where `data` is the *pipeline data object* comprising all final and
intermediate results, `config` are the finally used hyperparameters, and `times` is a dictionary with the
run times of each individual pipeline stage (in seconds).

Raises:
StageError: If an error occurs during the run of a pipeline stage.
Expand Down Expand Up @@ -224,20 +224,20 @@ def process(
ctrl = ProcessingControl(first_stage, last_stage)

# Run the stages of the pipeline
timings = {}
times = dict()
for stage in self.stages:
if ctrl.step(stage.id) or stage.id in extra_stages:
stage_config = config.get(stage.id, {})
try:
dt = stage.run(self, data, stage_config, status = status, **kwargs)
except: # noqa: E722
raise StageError(stage)
timings[stage.id] = dt
times[stage.id] = dt
else:
stage.skip(data, status = status, **kwargs)

# Return the pipeline data object, the final config, and timings
return data, config, timings
# Return the pipeline data object, the final config, and stage run times
return data, config, times

def get_extra_stages(
self,
Expand Down
42 changes: 38 additions & 4 deletions repype/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import mergedeep
import yaml

import repype.benchmark
import repype.config
import repype.pipeline
import repype.stage
Expand Down Expand Up @@ -239,6 +240,20 @@ def digest(self) -> Mapping[str, Any]:
with self.digest_task_filepath.open('r') as digest_task_file:
return frozendict.deepfreeze(json.load(digest_task_file))

@property
def times_filepath(self) -> pathlib.Path:
    """
    The path to the CSV file with the run times of the task completion.

    The path is resolved via :meth:`resolve_path`; the file itself is
    written when the benchmark data is saved in :meth:`store`.
    """
    return self.resolve_path('times.csv')

@property
def times(self) -> repype.benchmark.Benchmark[float]:
    """
    The run times of the task completion.

    Note that each access constructs a fresh
    :class:`repype.benchmark.Benchmark` instance, loading the persisted
    data from :attr:`times_filepath` (if that file exists).
    """
    return repype.benchmark.Benchmark[float](self.times_filepath)

@property
def parents(self) -> Iterator[Self]:
"""
Expand Down Expand Up @@ -455,14 +470,21 @@ def strip_marginals(self, pipeline: repype.pipeline.Pipeline, data_chunk: TaskDa
field: data_chunk[field] for field in data_chunk if field not in marginal_fields
}

def store(self, pipeline: repype.pipeline.Pipeline, data: TaskData, config: repype.config.Config) -> None:
def store(
self,
pipeline: repype.pipeline.Pipeline,
data: TaskData,
config: repype.config.Config,
times: repype.benchmark.Benchmark[float],
) -> None:
"""
Store the computed *task data object*.

Arguments:
pipeline: The pipeline used to compute `data`.
data: The *task data object*.
config: The hyperparameters used to compute `data`.
times: The run times of the pipeline stages.
"""
assert self.runnable
assert frozenset(data.keys()) == frozenset(self.input_ids)
Expand All @@ -488,6 +510,13 @@ def store(self, pipeline: repype.pipeline.Pipeline, data: TaskData, config: repy
with self.digest_sha_filepath.open('w') as digest_sha_file:
json.dump(hashes, digest_sha_file)

# Store the run times of the pipeline stages
assert (
times.filepath == self.times_filepath
), f'Benchmark file path mismatch: "{times.filepath}" != "{self.times_filepath}"'
times.retain((stage.id for stage in pipeline.stages), self.input_ids)
times.save()

def find_first_diverging_stage(
self,
pipeline: repype.pipeline.Pipeline,
Expand Down Expand Up @@ -611,13 +640,15 @@ def run(
pickup_info = self.find_pickup_task(pipeline, config)
if pickup_info['task'] is not None:
data = pickup_info['task'].load(pipeline)
times = self.times.set(pickup_info['task'].times)
first_stage = pickup_info['first_diverging_stage']
else:
pickup = False

# If there is no task to pick up from, run the pipeline from the beginning
if not pickup:
data = dict()
times = repype.benchmark.Benchmark[float](self.times_filepath)
first_stage = None

# Announce the status of the task
Expand Down Expand Up @@ -648,17 +679,20 @@ def run(

# Process the input
data_chunk = data.get(input_id, dict())
data_chunk, final_config, _ = pipeline.process(
data_chunk, final_config, times_chunk = pipeline.process(
input_id = input_id,
data = data_chunk,
config = input_config,
first_stage = first_stage.id if first_stage else None,
status = input_status,
)

if strip_marginals:
data_chunk = self.strip_marginals(pipeline, data_chunk)

# Update the times benchmark
for stage_id, time in times_chunk.items():
times[stage_id, input_id] = time

# Store the final configuration used for the input, if a corresponding scope is defined
if final_config and (final_config_filepath := pipeline.resolve('config', input_id)):
final_config_filepath.parent.mkdir(parents = True, exist_ok = True)
Expand All @@ -669,7 +703,7 @@ def run(

# Store the results for later pick up
repype.status.update(status, info = 'storing', intermediate = True)
self.store(pipeline, data, config)
self.store(pipeline, data, config, times)
repype.status.update(
status = status,
info = 'completed',
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ frozendict~=2.4
setuptools>=24.2.0
pip>=9.0.0
watchdog~=4.0.2
mergedeep~=1.3.4
mergedeep~=1.3.4
pandas>=2,<3
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def strip_raw(rst):
'pyyaml>=6.0.1',
'watchdog>=4.0.2',
'textual[syntax]==0.76.0',
'pandas>=2,<3',
],
description = 'Reproducible batch processing using pipelines for scientific computing.',
long_description = strip_raw(open('README.rst').read()),
Expand Down
Loading