Skip to content

Commit

Permalink
Merge pull request #30 from kostrykin/dev/benchmark
Browse files Browse the repository at this point in the history
Bring stopwatch functionality back to the `Task` class
  • Loading branch information
kostrykin committed Sep 2, 2024
2 parents c156e69 + 0ea08b8 commit 1570c23
Show file tree
Hide file tree
Showing 13 changed files with 349 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ extend-ignore = E221,E211,E222,E202,F541,E201,E203,E125,E251
per-file-ignores =
repype/typing.py:F405

exclude = tests/*.py tests/textual/*.py
exclude = tests/*.py tests/textual/*.py docs/source/conf.py
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ examples/**/.sha.json
examples/**/.task.json
examples/**/data.dill.gz
examples/**/*.png
examples/**/times.csv

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
3 changes: 3 additions & 0 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,7 @@
('py:class', 'textual.widgets._progress_bar.ProgressBar'),
('py:class', 'textual.widgets._text_area.TextArea'),

# Pandas
('py:class', 'pandas.core.frame.DataFrame'),

]
9 changes: 9 additions & 0 deletions docs/source/repype.benchmark.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
repype.benchmark
================

.. automodule:: repype.benchmark
:members:
:undoc-members:
:show-inheritance:
:special-members:
:exclude-members: __dict__, __weakref__, __repr__, __annotations__, __init__, __hash__, __module__, __orig_bases__, __parameters__, __bound__, __constraints__, __contravariant__, __covariant__, __name__
1 change: 1 addition & 0 deletions docs/source/repype.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ repype
.. toctree::
repype.__main__
repype.batch
repype.benchmark
repype.cli
repype.config
repype.pipeline
Expand Down
115 changes: 115 additions & 0 deletions repype/benchmark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import pathlib

import pandas as pd

from repype.typing import (
Generic,
InputID,
Iterable,
PathLike,
Self,
Tuple,
TypeVar,
)

ValueType = TypeVar('ValueType')
"""
Represents the type of the benchmark values.
"""


class Benchmark(Generic[ValueType]):
    """
    Represents the benchmark data for a task.

    Arguments:
        filepath: The path to the file where the benchmark data is persisted.

    Example:

        .. runblock:: pycon

            >>> import tempfile
            >>> from repype.benchmark import Benchmark
            >>>
            >>> with tempfile.TemporaryDirectory() as tmp_path:
            ...     benchmark = Benchmark[float](tmp_path + '/benchmark.csv')
            ...     benchmark['stage1', 'input-1'] = 10
            ...     benchmark.save()
            ...     del benchmark
            ...     benchmark = Benchmark[float](tmp_path + '/benchmark.csv')
            ...     print(benchmark['stage1', 'input-1'])
    """

    df: pd.DataFrame
    """
    The benchmark data dataframe.

    Stage identifiers are the row index, input identifiers are the columns.
    """

    filepath: pathlib.Path
    """
    The path to the file where the benchmark data is persisted.
    """

    def __init__(self, filepath: PathLike):
        self.filepath = pathlib.Path(filepath)
        # Load previously persisted benchmark data if it exists, otherwise start empty
        if self.filepath.is_file():
            self.df = pd.read_csv(self.filepath, index_col = 0)
        else:
            self.df = pd.DataFrame()

    def set(self, other: Self) -> Self:
        """
        Set the benchmark data to a copy of the benchmark data of another instance.

        Returns:
            This instance.
        """
        self.df = other.df.copy()
        return self

    def __getitem__(self, where: Tuple[str, InputID]) -> ValueType:
        """
        Get the benchmark value for a stage and an input.

        Arguments:
            where: A ``(stage_id, input_id)`` pair.
        """
        stage_id, input_id = where
        return self.df.at[stage_id, input_id]

    def __setitem__(self, where: Tuple[str, InputID], value: ValueType) -> Self:
        """
        Set the benchmark `value` for a stage and an input.

        Arguments:
            where: A ``(stage_id, input_id)`` pair.
            value: The benchmark value to store.

        Returns:
            This instance.
        """
        stage_id, input_id = where
        self.df.at[stage_id, input_id] = value
        return self

    def retain(self, stage_ids: Iterable[str], input_ids: Iterable[InputID]) -> Self:
        """
        Retain only the benchmark data for the specified `stage_ids` and `input_ids`.

        Identifiers which are not present in the benchmark data are ignored.

        Returns:
            This instance.
        """

        # Keep only those `stage_ids` and `input_ids` that are present in the dataframe
        stage_ids = frozenset(stage_ids) & frozenset(self.df.index)
        input_ids = frozenset(input_ids) & frozenset(self.df.columns)

        # Ensure that the original order of the `stage_ids` and `input_ids` is preserved
        stage_ids = sorted(stage_ids, key = lambda stage_id: self.df.index.get_loc(stage_id))
        input_ids = sorted(input_ids, key = lambda input_id: self.df.columns.get_loc(input_id))

        # Select the subset of the dataframe corresponding to the `stage_ids` and `input_ids`.
        # Using `.loc` instead of a double transpose avoids dtype upcasting on mixed-dtype frames.
        self.df = self.df.loc[list(stage_ids), list(input_ids)]
        return self

    def save(self) -> None:
        """
        Persist the benchmark data to :attr:`filepath`.
        """
        self.df.to_csv(self.filepath)

    def __eq__(self, other: object) -> bool:
        """
        Check if the benchmark data is equal to that of another :class:`Benchmark` instance.
        """
        # Defer to the reflected operation for foreign types instead of raising
        # an `AttributeError` when accessing `other.df`
        if not isinstance(other, Benchmark):
            return NotImplemented
        return self.df.equals(other.df)
14 changes: 7 additions & 7 deletions repype/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ def process(
status: A status object to report the progress of the computations.
Returns:
Tuple ``(data, config, timings)``, where ``data`` is the *pipeline data object* comprising all final and
intermediate results, ``config`` are the finally used hyperparameters, and ``timings`` is a dictionary
containing the run time of each individual pipeline stage (in seconds).
Tuple `(data, config, times)`, where `data` is the *pipeline data object* comprising all final and
intermediate results, `config` are the finally used hyperparameters, and `times` is a dictionary with the
run times of each individual pipeline stage (in seconds).
Raises:
StageError: If an error occurs during the run of a pipeline stage.
Expand Down Expand Up @@ -224,20 +224,20 @@ def process(
ctrl = ProcessingControl(first_stage, last_stage)

# Run the stages of the pipeline
timings = {}
times = dict()
for stage in self.stages:
if ctrl.step(stage.id) or stage.id in extra_stages:
stage_config = config.get(stage.id, {})
try:
dt = stage.run(self, data, stage_config, status = status, **kwargs)
except: # noqa: E722
raise StageError(stage)
timings[stage.id] = dt
times[stage.id] = dt
else:
stage.skip(data, status = status, **kwargs)

# Return the pipeline data object, the final config, and timings
return data, config, timings
# Return the pipeline data object, the final config, and stage run times
return data, config, times

def get_extra_stages(
self,
Expand Down
42 changes: 38 additions & 4 deletions repype/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import mergedeep
import yaml

import repype.benchmark
import repype.config
import repype.pipeline
import repype.stage
Expand Down Expand Up @@ -239,6 +240,20 @@ def digest(self) -> Mapping[str, Any]:
with self.digest_task_filepath.open('r') as digest_task_file:
return frozendict.deepfreeze(json.load(digest_task_file))

@property
def times_filepath(self) -> pathlib.Path:
    """
    The path to the CSV file with the run times of the task completion.

    The path is obtained via :meth:`resolve_path` for the fixed filename ``times.csv``.
    """
    return self.resolve_path('times.csv')

@property
def times(self) -> repype.benchmark.Benchmark[float]:
    """
    The run times of the task completion.

    A fresh :class:`repype.benchmark.Benchmark` instance is created on each access,
    backed by :attr:`times_filepath` (previously persisted data is loaded from there,
    if the file exists).
    """
    return repype.benchmark.Benchmark[float](self.times_filepath)

@property
def parents(self) -> Iterator[Self]:
"""
Expand Down Expand Up @@ -455,14 +470,21 @@ def strip_marginals(self, pipeline: repype.pipeline.Pipeline, data_chunk: TaskDa
field: data_chunk[field] for field in data_chunk if field not in marginal_fields
}

def store(self, pipeline: repype.pipeline.Pipeline, data: TaskData, config: repype.config.Config) -> None:
def store(
self,
pipeline: repype.pipeline.Pipeline,
data: TaskData,
config: repype.config.Config,
times: repype.benchmark.Benchmark[float],
) -> None:
"""
Store the computed *task data object*.
Arguments:
pipeline: The pipeline used to compute `data`.
data: The *task data object*.
config: The hyperparameters used to compute `data`.
times: The run times of the pipeline stages.
"""
assert self.runnable
assert frozenset(data.keys()) == frozenset(self.input_ids)
Expand All @@ -488,6 +510,13 @@ def store(self, pipeline: repype.pipeline.Pipeline, data: TaskData, config: repy
with self.digest_sha_filepath.open('w') as digest_sha_file:
json.dump(hashes, digest_sha_file)

# Store the run times of the pipeline stages
assert (
times.filepath == self.times_filepath
), f'Benchmark file path mismatch: "{times.filepath}" != "{self.times_filepath}"'
times.retain((stage.id for stage in pipeline.stages), self.input_ids)
times.save()

def find_first_diverging_stage(
self,
pipeline: repype.pipeline.Pipeline,
Expand Down Expand Up @@ -611,13 +640,15 @@ def run(
pickup_info = self.find_pickup_task(pipeline, config)
if pickup_info['task'] is not None:
data = pickup_info['task'].load(pipeline)
times = self.times.set(pickup_info['task'].times)
first_stage = pickup_info['first_diverging_stage']
else:
pickup = False

# If there is no task to pick up from, run the pipeline from the beginning
if not pickup:
data = dict()
times = repype.benchmark.Benchmark[float](self.times_filepath)
first_stage = None

# Announce the status of the task
Expand Down Expand Up @@ -648,17 +679,20 @@ def run(

# Process the input
data_chunk = data.get(input_id, dict())
data_chunk, final_config, _ = pipeline.process(
data_chunk, final_config, times_chunk = pipeline.process(
input_id = input_id,
data = data_chunk,
config = input_config,
first_stage = first_stage.id if first_stage else None,
status = input_status,
)

if strip_marginals:
data_chunk = self.strip_marginals(pipeline, data_chunk)

# Update the times benchmark
for stage_id, time in times_chunk.items():
times[stage_id, input_id] = time

# Store the final configuration used for the input, if a corresponding scope is defined
if final_config and (final_config_filepath := pipeline.resolve('config', input_id)):
final_config_filepath.parent.mkdir(parents = True, exist_ok = True)
Expand All @@ -669,7 +703,7 @@ def run(

# Store the results for later pick up
repype.status.update(status, info = 'storing', intermediate = True)
self.store(pipeline, data, config)
self.store(pipeline, data, config, times)
repype.status.update(
status = status,
info = 'completed',
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ frozendict~=2.4
setuptools>=24.2.0
pip>=9.0.0
watchdog~=4.0.2
mergedeep~=1.3.4
mergedeep~=1.3.4
pandas>=2,<3
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def strip_raw(rst):
'pyyaml>=6.0.1',
'watchdog>=4.0.2',
'textual[syntax]==0.76.0',
'pandas>=2,<3',
],
description = 'Reproducible batch processing using pipelines for scientific computing.',
long_description = strip_raw(open('README.rst').read()),
Expand Down
Loading

0 comments on commit 1570c23

Please sign in to comment.