Efficient Messaging with FastAPI and Redis Queue in Python

In this tutorial, we will use FastAPI and Redis Queue to build a simple message queue system in Python. FastAPI is a modern, fast web framework for building APIs, based on standard Python type hints. Redis is an open-source, in-memory data structure store that can be used as a message broker. Redis Queue (RQ) is a simple Python library for queueing jobs and processing them in the background with workers.

To get started, make sure you have Python installed on your system. You can install FastAPI, the Uvicorn server that runs it, and RQ using pip (RQ pulls in the redis client library as a dependency):

pip install fastapi uvicorn
pip install rq

To install Redis, you can download and install it from the official website (https://redis.io/download) or use a Docker container:

docker run -d -p 6379:6379 --name redis redis
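
Before moving on, it can help to confirm that Redis is actually accepting connections. The following is a minimal sketch that uses only the Python standard library. The helper name redis_is_reachable is hypothetical, and the check assumes a Redis server without authentication on the default port, as started by the Docker command above.

```python
import socket


def redis_is_reachable(host="localhost", port=6379, timeout=1.0):
    """Return True if a server at host:port answers a Redis PING with +PONG."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            # Redis accepts plain-text "inline" commands terminated by CRLF
            sock.sendall(b"PING\r\n")
            return sock.recv(16).startswith(b"+PONG")
    except OSError:
        # Connection refused, timeout, unreachable host, and similar errors
        return False


if __name__ == "__main__":
    print("Redis reachable:", redis_is_reachable())
```

If this prints False, check that the container is running and that port 6379 is published before continuing.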

Now that we have all the necessary components installed, let’s create a simple FastAPI server with a Redis queue.

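RQ workers import the task function by its module path, so the function must live in an importable module rather than in the script you run directly (RQ rejects functions defined in __main__). Create a new Python file called tasks.py and add the task:

def task_function():
    # Add your task logic here
    print('Task is running')

Then create a file called app.py in the same directory and add the following code:

from fastapi import FastAPI
from redis import Redis
from rq import Queue

from tasks import task_function

app = FastAPI()

# Connect to the local Redis server and use RQ's default queue
redis_conn = Redis(host='localhost', port=6379)
queue = Queue(connection=redis_conn)

@app.get('/')
async def home():
    return {'message': 'Welcome to FastAPI and Redis Queue'}

# A plain def keeps the blocking Redis call off the event loop
@app.post('/task')
def create_task():
    # Enqueue the job; a separate worker process will execute it
    job = queue.enqueue(task_function)
    return {'message': 'Task created', 'job_id': job.id}

if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host='localhost', port=8000)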

In this code, we have defined two routes: / and /task. The / route returns a welcome message, and the /task route enqueues a task to be processed by the Redis queue.
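
Real tasks usually need some input. As a rough sketch, here is a hypothetical task called deliver_message that takes arguments and validates them. The function name, its parameters, and the returned dictionary are all assumptions made for illustration. Like any RQ task, it would need to live in a module that the worker can import.

```python
def deliver_message(recipient: str, body: str) -> dict:
    """Hypothetical task: validate the input and 'deliver' a message."""
    if not recipient:
        # An exception raised inside a task marks the RQ job as failed
        raise ValueError("recipient must not be empty")
    print(f"Delivering to {recipient}: {body}")
    # The return value is what RQ would store as the job's result
    return {"recipient": recipient, "length": len(body)}


# In the /task route this could be enqueued as follows (not executed here);
# RQ forwards the extra positional and keyword arguments to the function:
#   job = queue.enqueue(deliver_message, "alice", body="hello")
```

Keeping the task a plain function like this also makes it easy to call and test directly, without Redis or a worker running.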

Now let’s start the FastAPI server by running the following command:

python app.py

You should see a message saying "Uvicorn running on http://localhost:8000". You can now open a web browser and go to http://localhost:8000 to see the welcome message.

To process the tasks in the Redis queue, we need to start a worker. Create a new Python file called worker.py and add the following code:

from redis import Redis
from rq import Queue, Worker

redis_conn = Redis(host='localhost', port=6379)
queue = Queue(connection=redis_conn)

if __name__ == '__main__':
    # Pass the connection explicitly; the Connection context manager
    # is deprecated and no longer available in recent RQ releases
    worker = Worker([queue], connection=redis_conn)
    worker.work()

Now start the worker from the same directory as app.py, so that it can import the task function:

python worker.py

You should see a log line similar to "Listening on default". This means that the worker is connected to Redis and waiting for jobs on the default queue.

To test the message queue system, send a POST request to the /task route. You can do this using tools like cURL or Postman, or you can write a simple Python script that uses the requests library (install it with pip install requests):

import requests

response = requests.post('http://localhost:8000/task')
print(response.json())
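
If you would rather not install an extra package, the same request can be sent with the standard library. This is a minimal sketch; the helper name post_task and its base_url parameter are hypothetical, and it assumes the FastAPI server from app.py is listening on localhost:8000.

```python
import json
import urllib.request


def post_task(base_url="http://localhost:8000"):
    """Send an empty POST to /task and return the decoded JSON response."""
    request = urllib.request.Request(f"{base_url}/task", data=b"", method="POST")
    with urllib.request.urlopen(request, timeout=5) as response:
        return json.loads(response.read().decode("utf-8"))


# Usage, with the FastAPI server running:
#   print(post_task())
```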

You should see a message saying "Task created" along with a job ID. You can check the logs of the worker to see the message "Task is running" indicating that the task has been processed successfully.
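
The job ID in the response can also be used to look up the job later. The sketch below shows one possible way to turn a job into a status summary. The helper name job_status_payload and the /task/{job_id} route in the comments are assumptions and not part of the code above. The helper only reads the id, is_finished, is_failed and is_started attributes that RQ jobs provide, and it lumps every other state together as "pending".

```python
def job_status_payload(job) -> dict:
    """Build a JSON-friendly status summary from an RQ job-like object."""
    if job.is_finished:
        state = "finished"
    elif job.is_failed:
        state = "failed"
    elif job.is_started:
        state = "started"
    else:
        # Queued, deferred, scheduled and similar states are grouped together
        state = "pending"
    return {"job_id": job.id, "status": state}


# In app.py this could back a status route (an assumption, not executed here):
#   from rq.job import Job
#
#   @app.get('/task/{job_id}')
#   def task_status(job_id: str):
#       job = Job.fetch(job_id, connection=redis_conn)
#       return job_status_payload(job)
```

Note that Job.fetch raises an exception for an unknown job ID, so a real route would want to catch that and return a 404 response.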

Congratulations! You have now successfully created a message queue system using FastAPI and Redis Queue in Python. You can extend this tutorial by adding more complex tasks and error handling logic. Happy coding!
