From 01f25c13aadd7daaabc5205afa59ad370a881340 Mon Sep 17 00:00:00 2001
From: Leonid Kostrykin
Date: Mon, 2 Sep 2024 14:13:39 +0200
Subject: [PATCH 1/5] Add repype.benchmark and tests

---
 repype/benchmark.py     |  63 +++++++++++++++++++++++
 requirements.txt        |   3 +-
 setup.py                |   1 +
 tests/test_benchmark.py | 110 ++++++++++++++++++++++++++++++++++++++++
 4 files changed, 176 insertions(+), 1 deletion(-)
 create mode 100644 repype/benchmark.py
 create mode 100644 tests/test_benchmark.py

diff --git a/repype/benchmark.py b/repype/benchmark.py
new file mode 100644
index 0000000..f64f553
--- /dev/null
+++ b/repype/benchmark.py
@@ -0,0 +1,63 @@
+import pathlib
+
+import pandas as pd
+
+from repype.typing import (
+    Generic,
+    InputID,
+    Iterable,
+    PathLike,
+    Self,
+    Tuple,
+    TypeVar,
+)
+
+ValueType = TypeVar('ValueType')
+
+
+class Benchmark(Generic[ValueType]):
+
+    df: pd.DataFrame
+    """
+    """
+
+    filepath: pathlib.Path
+    """
+    """
+
+    def __init__(self, filepath: PathLike):
+        self.filepath = pathlib.Path(filepath)
+        if self.filepath.is_file():
+            self.df = pd.read_csv(self.filepath, index_col = 0)
+        else:
+            self.df = pd.DataFrame()
+
+    def set(self, other: Self) -> Self:
+        self.df = other.df.copy()
+        return self
+
+    def __getitem__(self, where: Tuple[str, InputID]) -> ValueType:
+        stage_id, input_id = where
+        return self.df.at[stage_id, input_id]
+
+    def __setitem__(self, where: Tuple[str, InputID], value: ValueType) -> Self:
+        stage_id, input_id = where
+        self.df.at[stage_id, input_id] = value
+        return self
+
+    def retain(self, stage_ids: Iterable[str], input_ids: Iterable[InputID]) -> Self:
+        stage_ids = list(stage_ids)
+        input_ids = list(input_ids)
+
+        # Keep only those `stage_ids` and `input_ids` that are present in the dataframe,
+        # ensure that the order of the stage_ids and input_ids is preserved
+        stage_ids = sorted(frozenset(stage_ids) & frozenset(self.df.index), key = lambda val: stage_ids.index(val))
+        input_ids = sorted(frozenset(input_ids) & frozenset(self.df.columns), key = lambda val: input_ids.index(val))
+
+        # Select the subset of the dataframe corresponding to the `stage_ids` and `input_ids`
+        self.df = self.df[list(input_ids)].transpose()[list(stage_ids)].transpose()
+        return self
+
+    def save(self) -> Self:
+        self.df.to_csv(self.filepath)
+        return self
diff --git a/requirements.txt b/requirements.txt
index ec9d1a0..fa50645 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,4 +8,5 @@ frozendict~=2.4
 setuptools>=24.2.0
 pip>=9.0.0
 watchdog~=4.0.2
-mergedeep~=1.3.4
\ No newline at end of file
+mergedeep~=1.3.4
+pandas>=2,<3
\ No newline at end of file
diff --git a/setup.py b/setup.py
index d965f7c..d62d8d4 100644
--- a/setup.py
+++ b/setup.py
@@ -52,6 +52,7 @@ def strip_raw(rst):
         'pyyaml>=6.0.1',
         'watchdog>=4.0.2',
         'textual[syntax]==0.76.0',
+        'pandas>=2,<3',
     ],
     description = 'Reproducible batch processing using pipelines for scientific computing.',
     long_description = strip_raw(open('README.rst').read()),
diff --git a/tests/test_benchmark.py b/tests/test_benchmark.py
new file mode 100644
index 0000000..f545501
--- /dev/null
+++ b/tests/test_benchmark.py
@@ -0,0 +1,110 @@
+import pathlib
+import tempfile
+import unittest
+
+import pandas as pd
+
+import repype.benchmark
+
+from . import testsuite
+
+
+class Benchmark__init__(unittest.TestCase):
+
+    @testsuite.with_temporary_paths(1)
+    def test__new__float(self, dirpath):
+        benchmark = repype.benchmark.Benchmark[float](dirpath / 'benchmark.csv')
+        pd.testing.assert_frame_equal(benchmark.df, pd.DataFrame())
+
+    @testsuite.with_temporary_paths(1)
+    def test__loaded__float(self, dirpath):
+        df = pd.DataFrame()
+        df.at['stage1', 'input-1'] = 10
+        df.to_csv(dirpath / 'benchmark.csv')
+        benchmark = repype.benchmark.Benchmark[float](dirpath / 'benchmark.csv')
+        pd.testing.assert_frame_equal(benchmark.df, df)
+
+
+class Benchmark__save(unittest.TestCase):
+
+    @testsuite.with_temporary_paths(1)
+    def test__float(self, dirpath):
+        benchmark = repype.benchmark.Benchmark[float](dirpath / 'benchmark.csv')
+        benchmark.df.at['stage1', 'input-1'] = 10
+        benchmark.save()
+        df = pd.read_csv(benchmark.filepath, index_col = 0)
+        pd.testing.assert_frame_equal(benchmark.df, df)
+
+
+class Benchmark_set(unittest.TestCase):
+
+    @testsuite.with_temporary_paths(1)
+    def test__float(self, dirpath):
+        benchmark1 = repype.benchmark.Benchmark[float](dirpath / 'benchmark1.csv')
+        benchmark2 = repype.benchmark.Benchmark[float](dirpath / 'benchmark2.csv')
+        benchmark1['stage1', 'input-1'] = 10
+        ret = benchmark2.set(benchmark1)
+        benchmark1['stage1', 'input-1'] = 0
+        self.assertIs(ret, benchmark2)
+        self.assertEqual(benchmark2['stage1', 'input-1'], 10.0)
+
+
+class Benchmark__setitem__(unittest.TestCase):
+
+    @testsuite.with_temporary_paths(1)
+    def test__new__float(self, dirpath):
+        benchmark = repype.benchmark.Benchmark[float](dirpath / 'benchmark.csv')
+        benchmark['stage1', 'input-1'] = 10
+        self.assertEqual(benchmark.df.at['stage1', 'input-1'], 10.0)
+
+
+class Benchmark__getitem__(unittest.TestCase):
+
+    @testsuite.with_temporary_paths(1)
+    def test__new__float(self, dirpath):
+        benchmark = repype.benchmark.Benchmark[float](dirpath / 'benchmark.csv')
+        benchmark.df.at['stage1', 'input-1'] = 10
+        self.assertEqual(benchmark['stage1', 'input-1'], 10.0)
+
+
+class Benchmark__retain__float(unittest.TestCase):
+
+    def setUp(self):
+        self.tempdir = tempfile.TemporaryDirectory()
+        self.dirpath = pathlib.Path(self.tempdir.name)
+        self.benchmark = repype.benchmark.Benchmark[float](self.dirpath / 'benchmark.csv')
+        self.benchmark.df.at['stage1', 'input-1'] = 10
+        self.benchmark.df.at['stage1', 'input-2'] = 20
+        self.benchmark.df.at['stage1', 'input-3'] = 30
+        self.benchmark.df.at['stage2', 'input-1'] = 40
+        self.benchmark.df.at['stage2', 'input-2'] = 50
+        self.benchmark.df.at['stage2', 'input-3'] = 60
+
+    def tearDown(self):
+        self.tempdir.cleanup()
+
+    def test_subset(self):
+        ret = self.benchmark.retain(['stage1'], ['input-1', 'input-2'])
+        self.assertIs(ret, self.benchmark)
+        pd.testing.assert_frame_equal(
+            self.benchmark.df,
+            pd.DataFrame(
+                {
+                    'input-1': {'stage1': 10.0},
+                    'input-2': {'stage1': 20.0},
+                }
+            ),
+        )
+
+    def test_unrelated(self):
+        ret = self.benchmark.retain(['stage1', 'stage3'], ['input-0', 'input-1', 'input-2'])
+        self.assertIs(ret, self.benchmark)
+        pd.testing.assert_frame_equal(
+            self.benchmark.df,
+            pd.DataFrame(
+                {
+                    'input-1': {'stage1': 10.0},
+                    'input-2': {'stage1': 20.0},
+                }
+            ),
+        )

From daa69411885b9064f0cc5d5bb0bc729b3a7c793f Mon Sep 17 00:00:00 2001
From: Leonid Kostrykin
Date: Mon, 2 Sep 2024 15:09:58 +0200
Subject: [PATCH 2/5] Fix bug in `Benchmark.retain`

---
 repype/benchmark.py     | 13 ++++++++-----
 tests/test_benchmark.py |  2 +-
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/repype/benchmark.py b/repype/benchmark.py
index f64f553..05759ac 100644
--- a/repype/benchmark.py
+++ b/repype/benchmark.py
@@ -46,13 +46,16 @@ def __setitem__(self, where: Tuple[str, InputID], value: ValueType) -> Self:
         return self

     def retain(self, stage_ids: Iterable[str], input_ids: Iterable[InputID]) -> Self:
-        stage_ids = list(stage_ids)
-        input_ids = list(input_ids)
+        """
+        """

         # Keep only those `stage_ids` and `input_ids` that are present in the dataframe,
-        # ensure that the order of the stage_ids and input_ids is preserved
-        stage_ids = sorted(frozenset(stage_ids) & frozenset(self.df.index), key = lambda val: stage_ids.index(val))
-        input_ids = sorted(frozenset(input_ids) & frozenset(self.df.columns), key = lambda val: input_ids.index(val))
+        stage_ids = frozenset(stage_ids) & frozenset(self.df.index)
+        input_ids = frozenset(input_ids) & frozenset(self.df.columns)
+
+        # Ensure that the order of the `stage_ids` and `input_ids` is preserved
+        stage_ids = sorted(stage_ids, key = lambda stage_id: self.df.index.get_loc(stage_id))
+        input_ids = sorted(input_ids, key = lambda input_id: self.df.columns.get_loc(input_id))

         # Select the subset of the dataframe corresponding to the `stage_ids` and `input_ids`
         self.df = self.df[list(input_ids)].transpose()[list(stage_ids)].transpose()
diff --git a/tests/test_benchmark.py b/tests/test_benchmark.py
index f545501..ee8159e 100644
--- a/tests/test_benchmark.py
+++ b/tests/test_benchmark.py
@@ -84,7 +84,7 @@ def tearDown(self):
         self.tempdir.cleanup()

     def test_subset(self):
-        ret = self.benchmark.retain(['stage1'], ['input-1', 'input-2'])
+        ret = self.benchmark.retain(['stage1'], ['input-2', 'input-1'])
         self.assertIs(ret, self.benchmark)
         pd.testing.assert_frame_equal(
             self.benchmark.df,

From 6de830be0a38c3ff794cf9869cd1a9ff6e1c4760 Mon Sep 17 00:00:00 2001
From: Leonid Kostrykin
Date: Mon, 2 Sep 2024 15:36:14 +0200
Subject: [PATCH 3/5] Add `Benchmark.__eq__`

---
 repype/benchmark.py     |  8 ++++++++
 tests/test_benchmark.py | 19 +++++++++++++++++++
 2 files changed, 27 insertions(+)

diff --git a/repype/benchmark.py b/repype/benchmark.py
index 05759ac..4a3fd9e 100644
--- a/repype/benchmark.py
+++ b/repype/benchmark.py
@@ -64,3 +64,11 @@ def retain(self, stage_ids: Iterable[str], input_ids: Iterable[InputID]) -> Self
     def save(self) -> Self:
         self.df.to_csv(self.filepath)
         return self
+
+    def __eq__(self, other: object) -> bool:
+        return all(
+            (
+                isinstance(other, Benchmark),
+                self.df.equals(other.df),
+            )
+        )
diff --git a/tests/test_benchmark.py b/tests/test_benchmark.py
index ee8159e..c3892fd 100644
--- a/tests/test_benchmark.py
+++ b/tests/test_benchmark.py
@@ -108,3 +108,22 @@ def test_unrelated(self):
                 }
             ),
         )
+
+
+class Benchmark__eq__(unittest.TestCase):
+
+    @testsuite.with_temporary_paths(1)
+    def test__equal__float(self, dirpath):
+        benchmark1 = repype.benchmark.Benchmark[float](dirpath / 'benchmark1.csv')
+        benchmark2 = repype.benchmark.Benchmark[float](dirpath / 'benchmark2.csv')
+        benchmark1['stage1', 'input-1'] = 10
+        benchmark2['stage1', 'input-1'] = 10
+        self.assertEqual(benchmark1, benchmark2)
+
+    @testsuite.with_temporary_paths(1)
+    def test__not_equal__float(self, dirpath):
+        benchmark1 = repype.benchmark.Benchmark[float](dirpath / 'benchmark1.csv')
+        benchmark2 = repype.benchmark.Benchmark[float](dirpath / 'benchmark2.csv')
+        benchmark1['stage1', 'input-1'] = 10
+        benchmark2['stage1', 'input-1'] = 20
+        self.assertNotEqual(benchmark1, benchmark2)
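
The three patches above give `repype.benchmark.Benchmark` its public surface: construction from a CSV
path, item access keyed by `(stage_id, input_id)` pairs, `set`, `retain`, `save`, and `__eq__`. The sketch
below is not part of the patch series; it only illustrates how those pieces combine, and the stage
identifiers, input identifiers, and values are placeholders chosen for this example.

    import pathlib
    import tempfile

    import repype.benchmark

    with tempfile.TemporaryDirectory() as tmpdir:
        filepath = pathlib.Path(tmpdir) / 'benchmark.csv'

        # Record one value per (stage, input) pair; `__setitem__` grows the underlying dataframe.
        benchmark = repype.benchmark.Benchmark[float](filepath)
        benchmark['stage1', 'input-1'] = 1.5
        benchmark['stage2', 'input-1'] = 2.5
        benchmark['stage2', 'input-2'] = 3.5

        # Keep only the listed stages and inputs (patch 2 keeps them in the dataframe's order), then persist.
        benchmark.retain(['stage1', 'stage2'], ['input-1'])
        benchmark.save()

        # A new instance constructed from the same path loads the persisted values again.
        restored = repype.benchmark.Benchmark[float](filepath)
        print(restored['stage2', 'input-1'])  # 2.5
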
+ """ + return repype.benchmark.Benchmark[float](self.times_filepath) + @property def parents(self) -> Iterator[Self]: """ @@ -455,7 +470,13 @@ def strip_marginals(self, pipeline: repype.pipeline.Pipeline, data_chunk: TaskDa field: data_chunk[field] for field in data_chunk if field not in marginal_fields } - def store(self, pipeline: repype.pipeline.Pipeline, data: TaskData, config: repype.config.Config) -> None: + def store( + self, + pipeline: repype.pipeline.Pipeline, + data: TaskData, + config: repype.config.Config, + times: repype.benchmark.Benchmark[float], + ) -> None: """ Store the computed *task data object*. @@ -463,6 +484,7 @@ def store(self, pipeline: repype.pipeline.Pipeline, data: TaskData, config: repy pipeline: The pipeline used to compute `data`. data: The *task data object*. config: The hyperparameters used to vcompute `data`. + times: The run times of the pipeline stages. """ assert self.runnable assert frozenset(data.keys()) == frozenset(self.input_ids) @@ -488,6 +510,13 @@ def store(self, pipeline: repype.pipeline.Pipeline, data: TaskData, config: repy with self.digest_sha_filepath.open('w') as digest_sha_file: json.dump(hashes, digest_sha_file) + # Store the run times of the pipeline stages + assert ( + times.filepath == self.times_filepath + ), f'Benchmark file path mismatch: "{times.filepath}" != "{self.times_filepath}"' + times.retain((stage.id for stage in pipeline.stages), self.input_ids) + times.save() + def find_first_diverging_stage( self, pipeline: repype.pipeline.Pipeline, @@ -611,6 +640,7 @@ def run( pickup_info = self.find_pickup_task(pipeline, config) if pickup_info['task'] is not None: data = pickup_info['task'].load(pipeline) + times = self.times.set(pickup_info['task'].times) first_stage = pickup_info['first_diverging_stage'] else: pickup = False @@ -618,6 +648,7 @@ def run( # If there is no task to pick up from, run the pipeline from the beginning if not pickup: data = dict() + times = repype.benchmark.Benchmark[float](self.times_filepath) first_stage = None # Announce the status of the task @@ -648,17 +679,20 @@ def run( # Process the input data_chunk = data.get(input_id, dict()) - data_chunk, final_config, _ = pipeline.process( + data_chunk, final_config, times_chunk = pipeline.process( input_id = input_id, data = data_chunk, config = input_config, first_stage = first_stage.id if first_stage else None, status = input_status, ) - if strip_marginals: data_chunk = self.strip_marginals(pipeline, data_chunk) + # Update the times benchmark + for stage_id, time in times_chunk.items(): + times[stage_id, input_id] = time + # Store the final configuration used for the input, if a corresponding scope is defined if final_config and (final_config_filepath := pipeline.resolve('config', input_id)): final_config_filepath.parent.mkdir(parents = True, exist_ok = True) @@ -669,7 +703,7 @@ def run( # Store the results for later pick up repype.status.update(status, info = 'storing', intermediate = True) - self.store(pipeline, data, config) + self.store(pipeline, data, config, times) repype.status.update( status = status, info = 'completed', diff --git a/tests/test_repype.py b/tests/test_repype.py index 2f6b734..283778b 100644 --- a/tests/test_repype.py +++ b/tests/test_repype.py @@ -11,6 +11,7 @@ import skimage import skimage.segmentation +import repype.benchmark import repype.cli import repype.pipeline import repype.stage @@ -188,4 +189,34 @@ def test(self): # Load and verify the segmentation result for `sigma=2` segmentation = skimage.io.imread(self.root_path / 'task' 
/ 'sigma=2' / 'seg' / 'B2--W00026--P00001--Z00000--T00000--dapi.tif.png') n_onjects = ndi.label(segmentation)[1] - self.assertEqual(n_onjects, 435) \ No newline at end of file + self.assertEqual(n_onjects, 435) + + # Load and verify the times for `sigma=1` + times1 = repype.benchmark.Benchmark(self.root_path / 'task' / 'times.csv') + self.assertEqual(times1.df.shape, (4, 1)) + self.assertGreater(float(times1['download', 'B2--W00026--P00001--Z00000--T00000--dapi.tif']), 0) + self.assertGreater(float(times1['unzip', 'B2--W00026--P00001--Z00000--T00000--dapi.tif']), 0) + self.assertGreater(float(times1['segmentation', 'B2--W00026--P00001--Z00000--T00000--dapi.tif']), 0) + self.assertGreater(float(times1['output', 'B2--W00026--P00001--Z00000--T00000--dapi.tif']), 0) + + # Load and verify the times for `sigma=2` + times2 = repype.benchmark.Benchmark(self.root_path / 'task' / 'sigma=2' / 'times.csv') + self.assertEqual(times1.df.shape, (4, 1)) + self.assertEqual( + times1['download', 'B2--W00026--P00001--Z00000--T00000--dapi.tif'], + times2['download', 'B2--W00026--P00001--Z00000--T00000--dapi.tif'], + ) + self.assertEqual( + times1['unzip', 'B2--W00026--P00001--Z00000--T00000--dapi.tif'], + times2['unzip', 'B2--W00026--P00001--Z00000--T00000--dapi.tif'], + ) + self.assertGreater(float(times2['segmentation', 'B2--W00026--P00001--Z00000--T00000--dapi.tif']), 0) + self.assertGreater(float(times2['output', 'B2--W00026--P00001--Z00000--T00000--dapi.tif']), 0) + self.assertNotEqual( + times1['segmentation', 'B2--W00026--P00001--Z00000--T00000--dapi.tif'], + times2['segmentation', 'B2--W00026--P00001--Z00000--T00000--dapi.tif'], + ) + self.assertNotEqual( + times1['output', 'B2--W00026--P00001--Z00000--T00000--dapi.tif'], + times2['output', 'B2--W00026--P00001--Z00000--T00000--dapi.tif'], + ) \ No newline at end of file diff --git a/tests/test_task.py b/tests/test_task.py index 95ad963..caa7744 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -12,6 +12,7 @@ import dill import yaml +import repype.benchmark import repype.pipeline import repype.status import repype.task @@ -713,7 +714,12 @@ def test(self, path): 'output3.1': 'value3.1', }, } - task.store(pipeline, data, config) + times = repype.benchmark.Benchmark[float](task.times_filepath) + times['stage1', 'file-0'] = 1.0 + times['stage2', 'file-0'] = 2.5 + times['stage3', 'file-0'] = 3.0 + times['stage4', 'file-0'] = 3.5 + task.store(pipeline, data, config, times) # Load the stored data with gzip.open(task.data_filepath, 'rb') as data_file: @@ -734,6 +740,7 @@ def test(self, path): }, ) self.assertEqual(task_digest, task.full_spec) + self.assertEqual(task.times, times) class Task__load(unittest.TestCase): @@ -1013,7 +1020,7 @@ def test_not_runnable(self, *args): self.task.run(self.config) def test_nothing_to_pickup(self, mock_create_pipeline, mock_load, mock_store): - mock_create_pipeline.return_value.process.return_value = (dict(), None, None) + mock_create_pipeline.return_value.process.return_value = (dict(), None, dict()) self.task.run(self.config) mock_load.assert_not_called() mock_create_pipeline.assert_called_once_with() @@ -1035,7 +1042,7 @@ def test_nothing_to_pickup(self, mock_create_pipeline, mock_load, mock_store): mock_store.assert_called_once() def test_with_pickup(self, mock_create_pipeline, mock_load, mock_store): - mock_create_pipeline.return_value.process.return_value = (dict(), None, None) + mock_create_pipeline.return_value.process.return_value = (dict(), None, dict()) mock_load.return_value = { 'file-0': dict(output = 
'value1'), 'file-1': dict(output = 'value2'), From 0ea08b8d6393bc8811c0b6a780bbcff4be247d40 Mon Sep 17 00:00:00 2001 From: Leonid Kostrykin Date: Mon, 2 Sep 2024 16:20:32 +0200 Subject: [PATCH 5/5] Add docstrings for `Benchmark` --- .flake8 | 2 +- docs/source/conf.py | 3 +++ docs/source/repype.benchmark.rst | 9 +++++++ docs/source/repype.rst | 1 + repype/benchmark.py | 45 ++++++++++++++++++++++++++++++-- 5 files changed, 57 insertions(+), 3 deletions(-) create mode 100644 docs/source/repype.benchmark.rst diff --git a/.flake8 b/.flake8 index 645288a..d4f266f 100644 --- a/.flake8 +++ b/.flake8 @@ -7,4 +7,4 @@ extend-ignore = E221,E211,E222,E202,F541,E201,E203,E125,E251 per-file-ignores = repype/typing.py:F405 -exclude = tests/*.py tests/textual/*.py \ No newline at end of file +exclude = tests/*.py tests/textual/*.py docs/source/conf.py \ No newline at end of file diff --git a/docs/source/conf.py b/docs/source/conf.py index 4b215c9..9896c4a 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -80,4 +80,7 @@ ('py:class', 'textual.widgets._progress_bar.ProgressBar'), ('py:class', 'textual.widgets._text_area.TextArea'), + # Pandas + ('py:class', 'pandas.core.frame.DataFrame'), + ] \ No newline at end of file diff --git a/docs/source/repype.benchmark.rst b/docs/source/repype.benchmark.rst new file mode 100644 index 0000000..d7e797a --- /dev/null +++ b/docs/source/repype.benchmark.rst @@ -0,0 +1,9 @@ +repype.benchmark +================ + +.. automodule:: repype.benchmark + :members: + :undoc-members: + :show-inheritance: + :special-members: + :exclude-members: __dict__, __weakref__, __repr__, __annotations__, __init__, __hash__, __module__, __orig_bases__, __parameters__, __bound__, __constraints__, __contravariant__, __covariant__, __name__ diff --git a/docs/source/repype.rst b/docs/source/repype.rst index 60c9f87..ca65182 100644 --- a/docs/source/repype.rst +++ b/docs/source/repype.rst @@ -4,6 +4,7 @@ repype .. toctree:: repype.__main__ repype.batch + repype.benchmark repype.cli repype.config repype.pipeline diff --git a/repype/benchmark.py b/repype/benchmark.py index 4a3fd9e..b3991e7 100644 --- a/repype/benchmark.py +++ b/repype/benchmark.py @@ -13,16 +13,42 @@ ) ValueType = TypeVar('ValueType') +""" +Represents the type of the benchmark values. +""" class Benchmark(Generic[ValueType]): + """ + Represents the benchmark data for a task. + + Arguments: + filepath: The path to the file where the benchmark data is persisted. + + Example: + + .. runblock:: pycon + + >>> import tempfile + >>> from repype.benchmark import Benchmark + >>> + >>> with tempfile.TemporaryDirectory() as tmp_path: + ... benchmark = Benchmark[float](tmp_path + '/benchmark.csv') + ... benchmark['stage1', 'input-1'] = 10 + ... benchmark.save() + ... del benchmark + ... benchmark = Benchmark[float](tmp_path + '/benchmark.csv') + ... print(benchmark['stage1', 'input-1']) + """ df: pd.DataFrame """ + The benchmark data dataframe. """ filepath: pathlib.Path """ + The path to the file where the benchmark data is persisted. """ def __init__(self, filepath: PathLike): @@ -33,20 +59,30 @@ def __init__(self, filepath: PathLike): self.df = pd.DataFrame() def set(self, other: Self) -> Self: + """ + Set the benchmark data to the benchmark data of another instance. + """ self.df = other.df.copy() return self def __getitem__(self, where: Tuple[str, InputID]) -> ValueType: + """ + Get the benchmark value for a stage and an input. 
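
Patch 4 above threads the per-stage run times returned by `Pipeline.process` into a `Benchmark[float]`
held by the task, which `Task.store` prunes via `retain` and persists as `times.csv`. The sketch below is
not part of the patch series; it mirrors that bookkeeping outside of `Task.run`, with placeholder stage
identifiers, input identifiers, and timing values, and with the CSV written to a temporary directory
rather than a task directory.

    import pathlib
    import tempfile

    import repype.benchmark

    with tempfile.TemporaryDirectory() as tmpdir:
        times = repype.benchmark.Benchmark[float](pathlib.Path(tmpdir) / 'times.csv')

        # One dict per processed input, mapping stage id to run time in seconds,
        # shaped like the `times_chunk` dicts returned by `Pipeline.process`.
        times_per_input = {
            'input-1': {'download': 0.8, 'segmentation': 12.3},
            'input-2': {'download': 0.7, 'segmentation': 11.9},
        }
        for input_id, times_chunk in times_per_input.items():
            for stage_id, time in times_chunk.items():
                times[stage_id, input_id] = time

        # Analogously to `Task.store`, drop stages and inputs that are no longer relevant
        # before persisting the benchmark to its CSV file.
        times.retain(['download', 'segmentation'], ['input-1', 'input-2'])
        times.save()
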
+ """ stage_id, input_id = where return self.df.at[stage_id, input_id] def __setitem__(self, where: Tuple[str, InputID], value: ValueType) -> Self: + """ + Set the benchmark `value` for a stage and an input. + """ stage_id, input_id = where self.df.at[stage_id, input_id] = value return self def retain(self, stage_ids: Iterable[str], input_ids: Iterable[InputID]) -> Self: """ + Retain only the benchmark data for the specified `stage_ids` and `input_ids`. """ # Keep only those `stage_ids` and `input_ids` that are present in the dataframe, @@ -61,11 +97,16 @@ def retain(self, stage_ids: Iterable[str], input_ids: Iterable[InputID]) -> Self self.df = self.df[list(input_ids)].transpose()[list(stage_ids)].transpose() return self - def save(self) -> Self: + def save(self) -> None: + """ + Persist the benchmark data to :attr:`filepath`. + """ self.df.to_csv(self.filepath) - return self def __eq__(self, other: object) -> bool: + """ + Check if the benchmark data is equal to another instance. + """ return all( ( isinstance(other, Benchmark),