Quickstart Guide
Before we start
To follow this guide you will need a virtual environment with repid installed.
We will use in-memory brokers, but feel free to swap them for any others; it should just work.
Consumer
Let's start by creating a simple pseudo-async function that counts the length of a string:
import asyncio


async def string_length(the_string: str) -> int:
    await asyncio.sleep(1)  # simulate some asynchronous work
    print(the_string)
    return len(the_string)
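Before wiring the function into repid, it can help to confirm that it behaves like any other coroutine. The following is a minimal sketch using only the standard library; the `main` wrapper and the `result` variable are illustrative names, not part of the guide's application.

```python
import asyncio


async def string_length(the_string: str) -> int:
    await asyncio.sleep(1)  # simulate some asynchronous work
    print(the_string)
    return len(the_string)


async def main() -> int:
    # Await the coroutine directly; no broker or worker is involved yet.
    return await string_length("Hello world!")


result = asyncio.run(main())
print(result)  # 12
```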
Now we have to create a router so that repid knows the string_length actor exists.
import asyncio

import repid

router = repid.Router()


@router.actor
async def string_length(the_string: str) -> int:
    await asyncio.sleep(1)
    print(the_string)
    return len(the_string)
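To build some intuition for what registering an actor means, here is a rough sketch of the general decorator-registry pattern. `ToyRouter` is a hypothetical class written for this illustration only; it is not repid's implementation and says nothing about how repid.Router works internally.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Actor = Callable[..., Awaitable[Any]]


class ToyRouter:
    """Hypothetical stand-in: maps actor names to coroutine functions."""

    def __init__(self) -> None:
        self.actors: Dict[str, Actor] = {}

    def actor(self, fn: Actor) -> Actor:
        # Register under the function's own name and return it unchanged,
        # so the decorated function can still be called directly.
        self.actors[fn.__name__] = fn
        return fn


toy_router = ToyRouter()


@toy_router.actor
async def string_length(the_string: str) -> int:
    return len(the_string)


# Look the actor up by name, as a consumer might do when a job arrives.
found = toy_router.actors["string_length"]
length = asyncio.run(found(the_string="Hello world!"))
print(length)  # 12
```

The point of the pattern is that the decorator records the function by name at import time, so something else can later find it using nothing but a string.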
And let's finish our application by specifying a connection and creating a worker.
import asyncio

import repid

app = repid.Repid(repid.Connection(repid.InMemoryMessageBroker()))  # (1)

router = repid.Router()


@router.actor
async def string_length(the_string: str) -> int:
    await asyncio.sleep(1)
    print(the_string)
    return len(the_string)


async def main() -> None:  # (2)
    async with app.magic():  # (3)
        worker = repid.Worker(routers=[router])  # (4)
        await worker.run()  # (5)


if __name__ == "__main__":
    asyncio.run(main())
1. Create a repid app with the in-memory message broker.
2. The main function, which will execute our async code.
3. Inside this context manager, every object will be provided with the connection that is attached to the repid app we've created.
4. Create an instance of a worker. Don't forget to specify our router!
5. Run the worker until it receives a SIGINT (Ctrl+C) or a SIGTERM.
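The idea of running until a SIGINT or SIGTERM arrives can be sketched with plain asyncio. This is only an illustration of the general technique, not how repid's worker is implemented. It assumes a Unix event loop running in the main thread (`add_signal_handler` is not available on Windows), and `run_until_signalled` is a hypothetical name. For demonstration purposes the process sends SIGTERM to itself.

```python
import asyncio
import os
import signal


async def run_until_signalled() -> str:
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    received = []

    def handle(signame: str) -> None:
        # Record which signal arrived and wake up the waiting coroutine.
        received.append(signame)
        stop.set()

    signals = (signal.SIGINT, signal.SIGTERM)
    for sig in signals:
        loop.add_signal_handler(sig, handle, sig.name)

    # Demonstration only: send SIGTERM to this process after 0.1 seconds.
    loop.call_later(0.1, os.kill, os.getpid(), signal.SIGTERM)

    await stop.wait()  # a real worker would be consuming messages here

    for sig in signals:
        loop.remove_signal_handler(sig)
    return received[0]


stopped_by = asyncio.run(run_until_signalled())
print(stopped_by)
```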
Producer
Let's enqueue a job!
example.py
import asyncio

import repid

app = repid.Repid(repid.Connection(repid.InMemoryMessageBroker()))

router = repid.Router()


@router.actor
async def string_length(the_string: str) -> int:
    await asyncio.sleep(1)
    print(the_string)
    return len(the_string)


async def main() -> None:
    async with app.magic():
        hello_job = repid.Job(
            "string_length",  # (1)
            args=dict(the_string="Hello world!"),  # (2)
        )
        await hello_job.queue.declare()  # (3)
        await hello_job.enqueue()
        worker = repid.Worker(routers=[router])
        await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
1. The name of the job will be used to route it to the similarly named actor.
2. A dictionary maps the arguments' names to their values. The dictionary will be encoded with the json module by default.
3. You only need to declare a queue once. Ideally, you would do that on application startup.
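As a rough illustration of what encoding the arguments with the json module implies, the sketch below round-trips the same dictionary using only the standard library. It shows stdlib behaviour, not repid's exact wire format, which is not described here. The practical consequence is that argument values need to be JSON-serializable.

```python
import json

args = dict(the_string="Hello world!")

# Encode the arguments with the standard json module.
payload = json.dumps(args)
print(payload)  # {"the_string": "Hello world!"}

# On the consuming side, the payload decodes back into keyword arguments.
decoded = json.loads(payload)


def string_length(the_string: str) -> int:
    return len(the_string)


print(string_length(**decoded))  # 12
```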
Running example.py will enqueue a job to the default queue, which the worker will then consume and route to the string_length function with the argument the_string set to "Hello world!".
After running the script, you should receive: