Skip to content

Commit

Permalink
Merge #127
Browse files Browse the repository at this point in the history
127: fix: fixed for the issue (#123) r=myConsciousness a=myConsciousness

# 1. Description

<!-- Provide a description of what this PR is doing.
If you're modifying existing behavior, describe the existing behavior, how this PR is changing it,
and what motivated the change. If this is a breaking change, specify explicitly which APIs have been
changed. -->

## 1.1. Checklist

<!-- Before you create this PR confirm that it meets all requirements listed below by checking the
relevant checkboxes (`[x]`). This will ensure a smooth and quick review process. -->

- [x] The title of my PR starts with a [Conventional Commit] prefix (`fix:`, `feat:`, `docs:` etc).
- [x] I have read the [Contributor Guide] and followed the process outlined for submitting PRs.
- [x] I have updated/added tests for ALL new/updated/fixed functionality.
- [x] I have updated/added relevant documentation in `docs` and added dartdoc comments with `///`.
- [x] I have updated/added relevant examples in `examples`.

## 1.2. Breaking Change

<!-- Does your PR require batch.dart users to manually update their apps to accommodate your change?

If the PR is a breaking change this should be indicated with suffix "!"  (for example, `feat!:`, `fix!:`). See [Conventional Commit] for details.
-->

- [ ] Yes, this is a breaking change.
- [x] No, this is _not_ a breaking change.

## 1.3. Related Issues

<!-- Provide a list of issues related to this PR from the [issue database].
Indicate which of these issues are resolved or fixed by this PR, i.e. Fixes #xxxx* !-->

<!-- Links -->

[issue database]: https://github.com/batch-dart/batch.dart/issues
[contributor guide]: https://github.com/batch-dart/batch.dart/blob/main/CONTRIBUTING.md
[batch.dart style guide]: https://github.com/batch-dart/batch.dart/blob/main/STYLEGUIDE.md
[conventional commit]: https://conventionalcommits.org


Co-authored-by: myConsciousness <kato.shinya.dev@gmail.com>
  • Loading branch information
bors[bot] and myConsciousness committed Apr 11, 2022
2 parents c7513a9 + 81844af commit eab3014
Show file tree
Hide file tree
Showing 20 changed files with 148 additions and 243 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Allows parallel processing to use the main thread `ExecutionContext`. ([#121](https://github.com/batch-dart/batch.dart/issues/121))
- Removed `trace`, `debug`, `info`, `warn`, `error`, and `fatal` from convenient methods of logging feature. Make sure to access the logger from `log`. ([#125](https://github.com/batch-dart/batch.dart/issues/125)
- Changed specification regarding tasks to be set in `Step`. Under the new specification, there will always be only one task that can be set in a single step. ([#123](https://github.com/batch-dart/batch.dart/issues/123))

## v0.9.0

Expand Down
13 changes: 6 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ void main() => BatchApplication()
// Step phase
..nextStep(Step(name: 'Step')
// Task phase
..nextTask(DoSomethingTask()
..registerTask(DoSomethingTask()
),
),
)
Expand Down Expand Up @@ -162,7 +162,7 @@ void main() => BatchApplication()
// Step phase
..nextStep(Step(name: 'Step')
// Parallel task phase
..nextParallel(
..registerParallel(
Parallel(
name: 'Parallel Tasks',
tasks: [
Expand Down Expand Up @@ -308,14 +308,14 @@ Creating a branch for each event is very easy.
```dart
Step(name: 'Step')
// Assume that this task will change the branch status.
..nextTask(ChangeBranchStatusTask())
..registerTask(ChangeBranchStatusTask())
// Pass an event object to "to" argument that you want to execute when you enter this branch.
..branchOnSucceeded(to: Step(name: 'Step on succeeded')..nextTask(somethingTask))
..branchOnFailed(to: Step(name: 'Step on failed')..nextTask(somethingTask))
..branchOnSucceeded(to: Step(name: 'Step on succeeded')..registerTask(somethingTask))
..branchOnFailed(to: Step(name: 'Step on failed')..registerTask(somethingTask))
// Branches that are "branchOnCompleted" are always executed regardless of branch status.
..branchOnCompleted(to: Step(name: 'Step on completed'))..nextTask(somethingTask);
..branchOnCompleted(to: Step(name: 'Step on completed'))..registerTask(somethingTask);
```

And the conditional branching of `Batch.dart` is controlled by changing the `BranchStatus` of each `Execution`s that can be referenced from the `ExecutionContext`.
Expand All @@ -330,7 +330,6 @@ class ChangeBranchStatusTask extends Task<ChangeBranchStatusTask> {
// You can easily manage branch status through methods as below.
context.jobExecution!.branchToSucceeded();
context.stepExecution!.branchToFailed();
context.taskExecution!.branchToSucceeded();
}
}
```
Expand Down
50 changes: 11 additions & 39 deletions example/example.dart
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ Job get _testJob1 => Job(
skipConfig: SkipConfiguration(
skippableExceptions: [Exception()],
),
)
..nextTask(
)..registerTask(
RetryTask(
// You can define callbacks for each processing phase.
onStarted: (context) => log.info(
Expand All @@ -87,34 +86,23 @@ Job get _testJob1 => Job(
},
),
),
)
..nextTask(SayHelloTask())
..nextTask(SayWorldTask()),
),
)
..nextStep(
Step(name: 'Step2')
..nextTask(TestTask())
..nextTask(SayHelloTask())
..nextTask(SayWorldTask())
..registerTask(TestTask())
..branchOnSucceeded(
to: Step(name: 'Step3')
..nextTask(TestTask())
..nextTask(SayHelloTask())
..nextTask(SayWorldTask()),
to: Step(name: 'Step3')..registerTask(SayHelloTask()),
)
..branchOnFailed(
to: Step(name: 'Step4')
..nextTask(TestTask())
..nextTask(SayHelloTask())
..registerTask(SayHelloTask())
..branchOnCompleted(
to: Step(
name: 'Step6',
// You can set any preconditions to run Step.
precondition: () => false,
)
..nextTask(TestTask())
..nextTask(SayHelloTask())
..nextTask(SayWorldTask()),
)..registerTask(SayWorldTask()),
),
)
..branchOnCompleted(
Expand All @@ -125,10 +113,7 @@ Job get _testJob1 => Job(
.info('\n--------------- Step5 has started! ---------------'),
onCompleted: (context) => log.info(
'\n--------------- Step5 has completed! ---------------'),
)
..nextTask(TestTask())
..nextTask(SayHelloTask())
..nextTask(SayWorldTask()),
)..registerTask(SayHelloTask()),
),
);

Expand All @@ -142,18 +127,10 @@ Job get _testJob2 => Job(
Step(
name: 'Step1',
precondition: () => true,
)
..nextTask(SayHelloTask())
..nextTask(SayWorldTask()),
)..registerTask(SayWorldTask()),
)
..branchOnSucceeded(
to: Job(name: 'Job3')
..nextStep(
Step(name: 'Step1')
..nextTask(SayHelloTask())
..nextTask(SayWorldTask())
..shutdown(),
),
to: Job(name: 'Job3')..nextStep(Step(name: 'Step1')..shutdown()),
);

Job get _testJob4 => Job(
Expand All @@ -165,8 +142,7 @@ Job get _testJob4 => Job(
Step(
name: 'Parallel Step',
precondition: () => true,
)
..nextParallel(
)..registerParallel(
Parallel(
name: 'Parallel Tasks',
tasks: [
Expand All @@ -176,18 +152,14 @@ Job get _testJob4 => Job(
TestParallelTask(),
],
),
)
..nextTask(SayHelloTask())
..nextTask(SayWorldTask()),
),
);

class TestTask extends Task<TestTask> {
@override
void execute(ExecutionContext context) {
// This parameter is shared just in this job.
context.jobParameters['key'] = 'job_parameter';
// This parameter is shared just in this step.
context.stepParameters['key'] = 'step_parameter';

// You can use shared parameters in any places.
log.info(context.sharedParameters['key1']);
Expand Down
1 change: 1 addition & 0 deletions lib/batch.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export 'package:batch/src/job/event/parallel.dart';
export 'package:batch/src/job/task/parallel_task.dart';
export 'package:batch/src/job/context/execution_context.dart';
export 'package:batch/src/job/execution.dart';
export 'package:batch/src/job/execution_type.dart';
export 'package:batch/src/job/schedule/parser/cron_parser.dart';
export 'package:batch/src/job/config/skip_configuration.dart';
export 'package:batch/src/job/config/retry_configuration.dart';
Expand Down
16 changes: 2 additions & 14 deletions lib/src/diagnostics/boot_diagnostics.dart
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class _BootDiagnostics implements BootDiagnostics {
}

void _checkStepRecursively({required Job job, required Step step}) {
if (step.tasks.isEmpty) {
if (!step.hasTask) {
throw ArgumentError(
'The task or parallel to be launched is required.',
);
Expand All @@ -75,19 +75,7 @@ class _BootDiagnostics implements BootDiagnostics {
'You cannot set Skip and Retry at the same time in Step [name=${step.name}].');
}

if (step.tasks.isNotEmpty) {
for (final task in step.tasks) {
if (task.hasSkipPolicy && task.hasRetryPolicy) {
throw ArgumentError(
'You cannot set Skip and Retry at the same time in Task [name=${task.name}].');
}
}
}

final relation = NameRelation(
job: job.name,
step: step.name,
);
final relation = NameRelation(job: job.name, step: step.name);

if (_nameRelations.has(relation)) {
throw UniqueConstraintError(
Expand Down
24 changes: 21 additions & 3 deletions lib/src/job/context/context_support.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import 'package:batch/src/job/event/event.dart';
import 'package:batch/src/job/event/job.dart';
import 'package:batch/src/job/event/step.dart';
import 'package:batch/src/job/execution.dart';
import 'package:batch/src/job/execution_type.dart';
import 'package:batch/src/job/parameter/shared_parameters.dart';
import 'package:batch/src/job/process_status.dart';
import 'package:batch/src/log/logger_provider.dart';
Expand Down Expand Up @@ -42,7 +43,7 @@ abstract class ContextSupport<T extends Event<T>> {
} else {
context.taskExecution = _newExecution(name);
log.info(
'Task: [name=$name] launched with the following step parameters: ${context.stepParameters}');
'Task: [name=$name] launched with the following job parameters: ${context.jobParameters}');
}
}

Expand Down Expand Up @@ -75,20 +76,27 @@ abstract class ContextSupport<T extends Event<T>> {
} else {
context.taskExecution = _finishedExecution(status: status);
log.info(
'Task: [name=${context.taskExecution!.name}] finished with the following step parameters: ${context.stepParameters} and the status: [${status.name}]');
'Task: [name=${context.taskExecution!.name}] finished with the following job parameters: ${context.jobParameters} and the status: [${status.name}]');
}
}

dynamic _newExecution(final String name) {
final execution = Execution(name: name, startedAt: DateTime.now());
final execution = Execution(
type: _executionType,
name: name,
startedAt: DateTime.now(),
);

_executionStack.push(execution);

return execution;
}

dynamic _finishedExecution({required ProcessStatus? status}) {
final execution = _executionStack.pop();

return Execution(
type: _executionType,
name: execution.name,
status: status ?? ProcessStatus.completed,
startedAt: execution.startedAt,
Expand All @@ -97,6 +105,16 @@ abstract class ContextSupport<T extends Event<T>> {
);
}

ExecutionType get _executionType {
if (T == Job) {
return ExecutionType.job;
} else if (T == Step) {
return ExecutionType.step;
}

return ExecutionType.task;
}

/// Returns the branch status.
BranchStatus get branchStatus {
final execution = _executionStack.pop();
Expand Down
3 changes: 0 additions & 3 deletions lib/src/job/context/execution_context.dart
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,4 @@ class ExecutionContext {

/// The job parameters
final Parameters jobParameters = Parameters();

/// The step parameters
final Parameters stepParameters = Parameters();
}
23 changes: 13 additions & 10 deletions lib/src/job/event/step.dart
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,21 @@ class Step extends Event<Step> {
retryConfig: retryConfig,
);

/// The tasks include parallels
final List<dynamic> _tasks = [];
/// The task include parallel
dynamic _task;

/// Returns the copied tasks.
List<dynamic> get tasks => List.from(_tasks);
/// Returns the copied task.
dynamic get task => _task;

/// Adds next [Task].
void nextTask(final Task task) => _tasks.add(task);
/// Registers [Task].
void registerTask(final Task task) => _task = task;

/// Stores next [Parallel].
void nextParallel(final Parallel parallel) => _tasks.add(parallel);
/// Registers [Parallel].
void registerParallel(final Parallel parallel) => _task = parallel;

/// Add a task to shutdown this application.
void shutdown() => _tasks.add(ShutdownTask());
/// Registers a task to shutdown this application.
void shutdown() => _task = ShutdownTask();

/// Returns true if this step has a task, otherwise false.
bool get hasTask => _task != null;
}
15 changes: 15 additions & 0 deletions lib/src/job/event/task.dart
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,19 @@ abstract class Task<T extends Task<T>> extends Event<Task> {
BatchInstance.updateStatus(BatchStatus.shuttingDown);
log.warn('The shutdown command was notified by Task: [name=$name]');
}

@override
@Deprecated('not supported operation and always UnsupportedError throws')
void branchOnSucceeded({required Task to}) =>
throw UnsupportedError('Branch feature is not supported for task.');

@override
@Deprecated('not supported and always UnsupportedError throws')
void branchOnFailed({required Task to}) =>
throw UnsupportedError('Branch feature is not supported for task.');

@override
@Deprecated('not supported and always UnsupportedError throws')
void branchOnCompleted({required Task to}) =>
throw UnsupportedError('Branch feature is not supported for task.');
}
9 changes: 9 additions & 0 deletions lib/src/job/execution.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

// Project imports:
import 'package:batch/src/job/branch/branch_status.dart';
import 'package:batch/src/job/execution_type.dart';
import 'package:batch/src/job/process_status.dart';

class Execution {
/// Returns the new instance of [Execution].
Execution({
required this.type,
required this.name,
ProcessStatus status = ProcessStatus.running,
required this.startedAt,
Expand All @@ -18,6 +20,9 @@ class Execution {
_status = status,
_updatedAt = updatedAt;

/// The execution type
final ExecutionType type;

/// The name
final String name;

Expand Down Expand Up @@ -61,6 +66,10 @@ class Execution {
bool get isCompleted => _status == ProcessStatus.completed;

void _updateBranchStatus(final BranchStatus status) {
if (type == ExecutionType.task) {
throw UnsupportedError('Branches in Task execution are not supported.');
}

_branchStatus = status;
_updatedAt = DateTime.now();
}
Expand Down
9 changes: 9 additions & 0 deletions lib/src/job/execution_type.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright 2022 Kato Shinya. All rights reserved.
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided the conditions.

enum ExecutionType {
job,
step,
task,
}
Loading

0 comments on commit eab3014

Please sign in to comment.