Jobs, Actors and Workers
Now that you know how to properly set up a connection, let's focus on the creation of jobs and their processing.
Jobs
A Job is a data structure that describes what you want to be executed and how.
As you may want the execution to be periodic or retried in case of a failure, one Job doesn't necessarily translate to one message on the broker's side.
The only required parameter of a Job is name. It specifies which actor has to be called on the consumer side. Routing is done on the application level.
Simple job
It's extremely easy to create a job:
import asyncio
from repid import Connection, InMemoryMessageBroker, Job, Repid

app = Repid(Connection(InMemoryMessageBroker()))

async def main() -> None:
    async with app.magic():
        Job(name="my_awesome_job")

if __name__ == "__main__":
    asyncio.run(main())
Jobs and queues
To schedule an execution of a job, you have to call the enqueue method...
# code above is omitted

async with app.magic():
    j = Job(name="my_awesome_job")
    await j.enqueue()  # 💥 KeyError: 'default'

# code below is omitted
...but it will fail, because we haven't initialized the default queue. Let's fix it:
# code above is omitted

async with app.magic():
    j = Job(name="my_awesome_job")  # (1)
    await j.queue.declare()
    await j.enqueue()
    # ✅ Success!

# code below is omitted
- By default the queue is set to Queue(name="default").
You may also want to initialize queue(s) preemptively, e.g. in a setup function.
from repid import Queue

# code above is omitted

async def setup() -> None:
    async with app.magic():
        await Queue().declare()  # (1)
        await Queue(name="my_awesome_queue").declare()

# code below is omitted
- Will declare a queue with name set to default.
Tip
Usually brokers support re-declaring queues (re-declaring an already existing queue has no effect), meaning that you can declare a queue on every app startup to ensure the correct workflow of your application.
You can specify the queue argument either as a string or as a Queue object. In case of a string, it will be automatically converted to a Queue object during Job initialization.
my_awesome_job = Job(name="my_awesome_job", queue="non-default-queue")
another_job = Job(name="another_job", queue=Queue("another-queue"))
print(type(my_awesome_job.queue)) # repid.Queue
print(type(another_job.queue)) # repid.Queue
Job priority
You can specify the priority of a job using the PrioritiesT enum.
The default is set to PrioritiesT.MEDIUM. HIGH, MEDIUM & LOW levels are officially supported.
from repid import Job, PrioritiesT
# code above is omitted
Job("awesome_job_with_priority", priority=PrioritiesT.HIGH)
# code below is omitted
Job id and uniqueness
By default a Job doesn't have any id, and on every enqueue call the generated message is given a new id using uuid.uuid4().hex. Therefore, by default, a Job isn't considered unique.
You can check this with the is_unique flag.
If you specify the id_ argument, it will not be regenerated on every call of enqueue, and therefore the Job is considered unique.
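A minimal sketch, assuming is_unique is a readable attribute on the Job and that any fixed string can be passed as id_ (the value below is arbitrary):

# code above is omitted

regular_job = Job(name="my_awesome_job")
print(regular_job.is_unique)  # False - a new id is generated on every enqueue

unique_job = Job(name="my_awesome_job", id_="my-fixed-id")  # hypothetical id value
print(unique_job.is_unique)  # True - the id is not regenerated

# code below is omitted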
Warning
Some, although not all, brokers ensure that there are no messages with the same id in the queue.
Retries
If you want a Job to be retried in case of a failure during the execution, specify the retries argument. The default is set to 0, which means that the Job will only be executed once and put in a dead-letter queue in case of a failure.
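For example, a minimal sketch (the number of retries here is arbitrary):

# code above is omitted

Job("my_fragile_job", retries=3)  # allows the Job to be retried in case of a failure

# code below is omitted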
Timeout
You can also specify the maximum allowed execution time for a Job using the timeout argument.
The default is set to 10 minutes.
from datetime import timedelta
# code above is omitted
Job("very_fast_job", timeout=timedelta(seconds=5))
Job("very_slow_job", timeout=timedelta(hours=2))
# code below is omitted
Execution timeout is also a valid reason for a retry.
Tip
You should sensibly limit a Job's timeout, as it can be a deciding factor for rescheduling in case of a worker disconnection.
Actors
Actors are functions that are meant to be executed when a worker (== consumer) receives a message.
To create an actor, you first have to create a Router and then use Router.actor as a decorator for your function.
from repid import Router

my_router = Router()

@my_router.actor
async def my_awesome_function() -> None:
    print("Hello Repid!")
Note
The actor decorator does not change the decorated function in any way. The initial function remains callable as it was before.
Actor name
By default, the name of an actor is the same as the name of the function.
You can override it with the name argument.
from repid import Router

my_router = Router()

@my_router.actor(name="another_name")
async def my_awesome_function() -> None:
    print("Hello Repid!")
Actor queue
You can also specify which queue the actor should listen on using the queue argument.
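A minimal sketch, assuming the actor decorator's queue argument accepts a queue name as a string (mirroring the Job's queue argument):

from repid import Router

my_router = Router()

@my_router.actor(queue="my_awesome_queue")  # assumption: a string queue name is accepted here
async def my_awesome_function() -> None:
    print("Hello Repid!")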
Thread vs Process
If your function is synchronous, you can specify whether you want it to be run in a separate process instead of a thread by setting run_in_process to True.
Warning
If you are running on the emscripten platform (e.g. PyScript), the run_in_process setting has no effect.
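A minimal sketch of a synchronous actor, assuming run_in_process is passed directly to the actor decorator:

from repid import Router

my_router = Router()

@my_router.actor(run_in_process=True)  # assumption: the decorator accepts this flag
def my_cpu_heavy_function() -> None:  # synchronous on purpose
    print("Hello from a separate process!")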
Retry Policy
In case of a retry, the application will reschedule the message with a delay. The delay is calculated using a retry policy, which defaults to exponential backoff. You can override this behavior with the retry_policy argument.
from repid import Router, default_retry_policy_factory

my_router = Router()

@my_router.actor(
    retry_policy=default_retry_policy_factory(
        min_backoff=60,
        max_backoff=86400,
        multiplier=5,
        max_exponent=15,
    )
)
async def my_exceptional_function() -> None:
    raise Exception("Some random exception.")
...or you can use practically any function which conforms to RetryPolicyT.
from repid import Router

my_router = Router()

@my_router.actor(retry_policy=lambda retry_number=1: retry_number * 100)
async def my_exceptional_function() -> None:
    raise Exception("Some random exception.")
Workers
Workers are controllers that are responsible for receiving messages and executing the related actors.
Workers and Routers
Workers are also routers. You can assign actors directly to them.
import asyncio
from repid import Connection, InMemoryMessageBroker, Repid, Worker

app = Repid(Connection(InMemoryMessageBroker()))

async def main() -> None:
    async with app.magic():
        my_worker = Worker()

        @my_worker.actor
        async def my_func() -> None:
            print("Hello!")

if __name__ == "__main__":
    asyncio.run(main())
You can include actors from other routers into a worker.
Warning
Adding an actor with an already existing name will override the previously stored actor.
router = Router()
other_router = Router()
# code above is omitted
Worker(routers=[router, other_router])
# or
my_worker = Worker()
my_worker.include_router(router)
my_worker.include_router(other_router)
# code below is omitted
Running a Worker
To run a worker, simply call the run method.
The worker will be running until it receives a signal (SIGINT or SIGTERM, by default) or until it reaches messages_limit, which by default is set to infinity.
Tip
The Worker will declare all queues that it's aware of (via assigned actors) on the execution of the run method. To override this behavior, set the auto_declare flag to False.
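For example, if you declare queues yourself (as in the setup function shown earlier), you could disable automatic declaration; a sketch, assuming auto_declare is accepted by the Worker constructor:

# code above is omitted

my_worker = Worker(auto_declare=False)  # assumption: constructor argument

# code below is omitted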
import asyncio
from repid import Connection, InMemoryMessageBroker, Repid, Worker

app = Repid(Connection(InMemoryMessageBroker()))

async def main() -> None:
    async with app.magic(auto_disconnect=True):
        my_worker = Worker()
        await my_worker.run()

if __name__ == "__main__":
    asyncio.run(main())
After the worker receives a signal or reaches messages_limit, it will attempt a graceful shutdown.
This means that it will stop receiving new messages and wait for graceful_shutdown_time (default: 25 seconds) to let already running actors complete.
Note
The default graceful_shutdown_time is set to 25 seconds because the default Kubernetes timeout after sending SIGTERM and before sending SIGKILL is set to 30 seconds. A 5 second buffer is provided to ensure that every uncompleted task will be rejected before ungraceful termination.
Any actor which exceeds that period will be forced to stop and the corresponding message will be rejected, meaning that the message broker will be responsible for requeueing the message and the retry counter will not be increased.
from signal import SIGQUIT

# code above is omitted

Worker(
    graceful_shutdown_time=100.0,  # seconds
    messages_limit=10_000,
    handle_signals=[SIGQUIT],
)

# code below is omitted
Warning
If you are running on the emscripten platform (e.g. PyScript), signal handling is disabled by default, as it isn't supported by the platform.
After the Worker is done running, it will return an internal _Runner object, which you can use to retrieve information about the number of actor runs.
import asyncio
from repid import Connection, InMemoryMessageBroker, Repid, Worker

app = Repid(Connection(InMemoryMessageBroker()))

async def main() -> None:
    async with app.magic(auto_disconnect=True):
        my_worker = Worker()
        runner = await my_worker.run()
        print(f"Total actor runs: {runner.processed}")

if __name__ == "__main__":
    asyncio.run(main())
Recap
- Declare a Queue and enqueue a Job
- Create an Actor
- Assign it to a Worker directly or via a Router
- Run the Worker
import asyncio

import repid

app = repid.Repid(repid.Connection(repid.InMemoryMessageBroker()))

router = repid.Router()

@router.actor
async def awesome_hello() -> None:
    print("Hello from an actor!")

async def producer_side() -> None:
    async with app.magic():
        await repid.Queue().declare()
        await repid.Job("awesome_hello").enqueue()

async def consumer_side() -> None:
    async with app.magic(auto_disconnect=True):
        await repid.Worker(routers=[router], messages_limit=1).run()

async def main() -> None:
    await producer_side()
    await consumer_side()

if __name__ == "__main__":
    asyncio.run(main())