Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: make job safe, add JobController #453

Merged
merged 8 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,19 @@ The `ZeebeWorker` class gets jobs from the gateway and runs them.
```python
import asyncio

from pyzeebe import ZeebeWorker, Job, create_insecure_channel
from pyzeebe import ZeebeWorker, Job, JobController, create_insecure_channel


channel = create_insecure_channel(hostname="localhost", port=26500) # Create grpc channel
worker = ZeebeWorker(channel) # Create a zeebe worker


async def on_error(exception: Exception, job: Job):
async def on_error(exception: Exception, job: Job, job_controller: JobController):
"""
on_error will be called when the task fails
"""
print(exception)
await job.set_error_status(f"Failed to handle job {job}. Error: {str(exception)}")
await job_controller.set_error_status(job, f"Failed to handle job {job}. Error: {str(exception)}")


@worker.task(task_type="example", exception_handler=on_error)
Expand Down
2 changes: 0 additions & 2 deletions docs/errors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ All ``pyzeebe`` errors inherit from :py:class:`PyZeebeError`

.. autoexception:: pyzeebe.errors.NoVariableNameGivenError

.. autoexception:: pyzeebe.errors.NoZeebeAdapterError

.. autoexception:: pyzeebe.errors.DuplicateTaskTypeError

.. autoexception:: pyzeebe.errors.ActivateJobsRequestInvalidError
Expand Down
6 changes: 6 additions & 0 deletions docs/worker_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ This means that all methods that :py:class:`ZeebeTaskRouter` has will also appea
.. autoclass:: pyzeebe.Job
:members:
:undoc-members:
:member-order: bysource


.. autoclass:: pyzeebe.JobController
:members:
:undoc-members:


.. autoclass:: pyzeebe.JobStatus
Expand Down
14 changes: 4 additions & 10 deletions docs/worker_tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ To add an exception handler to a task:

.. code-block:: python

from pyzeebe import Job
from pyzeebe import Job, JobController


async def my_exception_handler(exception: Exception, job: Job) -> None:
async def my_exception_handler(exception: Exception, job: Job, job_controller: JobController) -> None:
print(exception)
await job.set_failure_status(message=str(exception))
await job_controller.set_failure_status(job, message=str(exception))


@worker.task(task_type="my_task", exception_handler=my_exception_handler)
Expand All @@ -81,7 +81,7 @@ To add an exception handler to a task:

Now every time ``my_task`` is called (and then fails), ``my_exception_handler`` is called.

*What does job.set_failure_status do?*
*What does job_controller.set_failure_status do?*

This tells Zeebe that the job failed. The job will then be retried (if configured in process definition).

Expand Down Expand Up @@ -153,9 +153,3 @@ Example:
async def my_task(job: Job):
print(job.process_instance_key)
return {**job.custom_headers}

.. note::

Do not set the status for the job (set_success_status, set_failure_status or set_error_status) inside the task.
This will cause pyzeebe to raise an :py:class:`pyzeebe.errors.ActivateJobsRequestInvalidError`.

5 changes: 0 additions & 5 deletions docs/zeebe_adapter_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ Zeebe GRPC Responses
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.DeployProcessResponse
:members:
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.DeployResourceResponse
:members:
:undoc-members:
Expand Down
7 changes: 4 additions & 3 deletions examples/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
create_secure_channel,
)
from pyzeebe.errors import BusinessError
from pyzeebe.job.job import JobController


# Use decorators to add functionality before and after tasks. These will not fail the task
Expand Down Expand Up @@ -68,7 +69,7 @@ async def add_one(x: int) -> int:
return x + 1


# The default exception handler will call job.set_error_status
# The default exception handler will call job_controller.set_error_status
# on raised BusinessError, and propagate its error_code
# so the specific business error can be caught in the Zeebe process
@worker.task(task_type="business_exception_task")
Expand All @@ -77,10 +78,10 @@ def exception_task():


# Define a custom exception_handler for a task like so:
async def example_exception_handler(exception: Exception, job: Job) -> None:
async def example_exception_handler(exception: Exception, job: Job, job_controller: JobController) -> None:
print(exception)
print(job)
await job.set_failure_status(f"Failed to run task {job.type}. Reason: {exception}")
await job_controller.set_failure_status(job, f"Failed to run task {job.type}. Reason: {exception}")


@worker.task(task_type="exception_task", exception_handler=example_exception_handler)
Expand Down
3 changes: 2 additions & 1 deletion pyzeebe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from pyzeebe.credentials.base import CredentialsABC
from pyzeebe.credentials.camunda_identity import CamundaIdentityCredentials
from pyzeebe.credentials.plugins import AuthMetadataPlugin
from pyzeebe.job.job import Job
from pyzeebe.job.job import Job, JobController
from pyzeebe.job.job_status import JobStatus
from pyzeebe.task.exception_handler import ExceptionHandler, default_exception_handler
from pyzeebe.task.task_config import TaskConfig
Expand All @@ -23,6 +23,7 @@
"ZeebeClient",
"SyncZeebeClient",
"Job",
"JobController",
"JobStatus",
"ExceptionHandler",
"TaskConfig",
Expand Down
4 changes: 0 additions & 4 deletions pyzeebe/errors/pyzeebe_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ def __init__(self, task_type: str):
self.task_type = task_type


class NoZeebeAdapterError(PyZeebeError):
pass


class DuplicateTaskTypeError(PyZeebeError):
def __init__(self, task_type: str):
super().__init__(f"Task with type {task_type} already exists")
Expand Down
6 changes: 3 additions & 3 deletions pyzeebe/grpc_internals/zeebe_job_adapter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import logging
import types
from typing import AsyncGenerator, Iterable, Optional

import grpc
Expand Down Expand Up @@ -68,13 +69,12 @@ def _create_job_from_raw_job(self, response: ActivatedJob) -> Job:
process_definition_key=response.processDefinitionKey,
element_id=response.elementId,
element_instance_key=response.elementInstanceKey,
custom_headers=json.loads(response.customHeaders),
custom_headers=types.MappingProxyType(json.loads(response.customHeaders)),
worker=response.worker,
retries=response.retries,
deadline=response.deadline,
variables=json.loads(response.variables),
variables=types.MappingProxyType(json.loads(response.variables)),
tenant_id=response.tenantId,
zeebe_adapter=self, # type: ignore[arg-type]
)

async def complete_job(self, job_key: int, variables: Variables) -> CompleteJobResponse:
Expand Down
4 changes: 2 additions & 2 deletions pyzeebe/grpc_internals/zeebe_process_adapter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import os
from typing import Any, Callable, Dict, Iterable, List, NoReturn, Optional, Union
from typing import Callable, Dict, Iterable, List, NoReturn, Optional, Union

import aiofiles
import grpc
Expand Down Expand Up @@ -99,7 +99,7 @@ async def create_process_instance_with_result(
)

async def _create_process_errors(
self, grpc_error: grpc.aio.AioRpcError, bpmn_process_id: str, version: int, variables: Dict[str, Any]
self, grpc_error: grpc.aio.AioRpcError, bpmn_process_id: str, version: int, variables: Variables
) -> NoReturn:
if is_error_status(grpc_error, grpc.StatusCode.NOT_FOUND):
raise ProcessDefinitionNotFoundError(bpmn_process_id=bpmn_process_id, version=version) from grpc_error
Expand Down
105 changes: 34 additions & 71 deletions pyzeebe/job/job.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
import copy
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, Optional
from typing import TYPE_CHECKING, Optional

from pyzeebe.errors import NoZeebeAdapterError
from pyzeebe.job.job_status import JobStatus
from pyzeebe.types import Variables
from pyzeebe.types import Headers, Variables

if TYPE_CHECKING:
from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter


@dataclass
@dataclass(frozen=True)
class Job:
key: int
type: str
Expand All @@ -20,47 +18,46 @@ class Job:
process_definition_key: int
element_id: str
element_instance_key: int
custom_headers: Dict[str, Any]
custom_headers: Headers
worker: str
retries: int
deadline: int
variables: Variables
tenant_id: Optional[str] = None
status: JobStatus = JobStatus.Running
zeebe_adapter: Optional["ZeebeAdapter"] = None

def _set_status(self, value: JobStatus) -> None:
object.__setattr__(self, "status", value)

def __eq__(self, other: object) -> bool:
if not isinstance(other, Job):
return NotImplemented
return self.key == other.key


class JobController:
def __init__(self, job: Job, zeebe_adapter: "ZeebeAdapter") -> None:
self._job = job
self._zeebe_adapter = zeebe_adapter

async def set_running_after_decorators_status(self) -> None:
"""
RunningAfterDecorators status means that the task has been completed as intended and the after decorators will now run.

Raises:
NoZeebeAdapterError: If the job does not have a configured ZeebeAdapter
ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests)
ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable
ZeebeInternalError: If Zeebe experiences an internal error

"""
if self.zeebe_adapter:
self.status = JobStatus.RunningAfterDecorators
else:
raise NoZeebeAdapterError()
self._job._set_status(JobStatus.RunningAfterDecorators)

async def set_success_status(self) -> None:
async def set_success_status(self, variables: Optional[Variables] = None) -> None:
"""
Success status means that the job has been completed as intended.

Raises:
NoZeebeAdapterError: If the job does not have a configured ZeebeAdapter
ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests)
ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable
ZeebeInternalError: If Zeebe experiences an internal error

"""
if self.zeebe_adapter:
self.status = JobStatus.Completed
await self.zeebe_adapter.complete_job(job_key=self.key, variables=self.variables)
else:
raise NoZeebeAdapterError()
self._job._set_status(JobStatus.Completed)
await self._zeebe_adapter.complete_job(job_key=self._job.key, variables=variables or {})

async def set_failure_status(
self,
Expand All @@ -79,23 +76,19 @@ async def set_failure_status(
the local scope of the job's associated task. Must be JSONable. New in Zeebe 8.2.

Raises:
NoZeebeAdapterError: If the job does not have a configured ZeebeAdapter
ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests)
ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable
ZeebeInternalError: If Zeebe experiences an internal error

"""
if self.zeebe_adapter:
self.status = JobStatus.Failed
await self.zeebe_adapter.fail_job(
job_key=self.key,
retries=self.retries - 1,
message=message,
retry_back_off_ms=retry_back_off_ms,
variables=variables or {},
)
else:
raise NoZeebeAdapterError()
self._job._set_status(JobStatus.Failed)
await self._zeebe_adapter.fail_job(
job_key=self._job.key,
retries=self._job.retries - 1,
message=message,
retry_back_off_ms=retry_back_off_ms,
variables=variables or {},
)

async def set_error_status(
self,
Expand All @@ -115,42 +108,12 @@ async def set_error_status(
the local scope of the job's associated task. Must be JSONable. New in Zeebe 8.2.

Raises:
NoZeebeAdapterError: If the job does not have a configured ZeebeAdapter
ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests)
ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable
ZeebeInternalError: If Zeebe experiences an internal error

"""
if self.zeebe_adapter:
self.status = JobStatus.ErrorThrown
await self.zeebe_adapter.throw_error(
job_key=self.key, message=message, error_code=error_code, variables=variables or {}
)
else:
raise NoZeebeAdapterError()

def __eq__(self, other: object) -> bool:
if not isinstance(other, Job):
raise NotImplementedError()
return self.key == other.key


def create_copy(job: Job) -> Job:
return Job(
job.key,
job.type,
job.process_instance_key,
job.bpmn_process_id,
job.process_definition_version,
job.process_definition_key,
job.element_id,
job.element_instance_key,
copy.deepcopy(job.custom_headers),
job.worker,
job.retries,
job.deadline,
copy.deepcopy(job.variables),
job.tenant_id,
job.status,
job.zeebe_adapter,
)
self._job._set_status(JobStatus.ErrorThrown)
await self._zeebe_adapter.throw_error(
job_key=self._job.key, message=message, error_code=error_code, variables=variables or {}
)
10 changes: 5 additions & 5 deletions pyzeebe/task/exception_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
from typing import Awaitable, Callable

from pyzeebe.errors.pyzeebe_errors import BusinessError
from pyzeebe.job.job import Job
from pyzeebe.job.job import Job, JobController

logger = logging.getLogger(__name__)

ExceptionHandler = Callable[[Exception, Job], Awaitable[None]]
ExceptionHandler = Callable[[Exception, Job, JobController], Awaitable[None]]


async def default_exception_handler(e: Exception, job: Job) -> None:
async def default_exception_handler(e: Exception, job: Job, job_controller: JobController) -> None:
logger.warning("Task type: %s - failed job %s. Error: %s.", job.type, job, e)
if isinstance(e, BusinessError):
await job.set_error_status(f"Failed job. Recoverable error: {e}", error_code=e.error_code)
await job_controller.set_error_status(f"Failed job. Recoverable error: {e}", error_code=e.error_code)
else:
await job.set_failure_status(f"Failed job. Error: {e}")
await job_controller.set_failure_status(f"Failed job. Error: {e}")
Loading