Utilizing Python, FastAPI, and Redis Queue for Background Job Processing

Background job processing is a common requirement in many web applications. It allows you to execute long-running tasks in the background without blocking the main thread or the user’s request. In this tutorial, we will explore how to implement background job processing using Python, FastAPI, and Redis Queue (RQ) as the message queue.

What is Redis Queue (RQ)?
Redis Queue (RQ) is a simple Python library for queueing jobs and processing them in the background, with Redis acting as the message broker. Your application enqueues a job (a function call and its arguments) in Redis, and a separate worker process picks it up and runs it. RQ provides a small API for creating, enqueuing, and processing jobs.
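The producer/consumer split described above can be illustrated without Redis. The following is only an in-process analogy built on the standard library, not RQ's API: with RQ, the queue lives in Redis and the consumer is a separate worker process. The name run_demo is made up for this sketch.

```python
import queue
import threading


def run_demo(job_ids):
    """Process job_ids on a background thread and return the results in order."""
    jobs = queue.Queue()
    results = []

    def worker():
        # Consumer: block until a job arrives, then process it
        while True:
            job_id = jobs.get()
            if job_id is None:  # sentinel value tells the worker to stop
                break
            results.append(f"processed job {job_id}")

    thread = threading.Thread(target=worker)
    thread.start()

    # Producer: enqueue the jobs and return to other work immediately
    for job_id in job_ids:
        jobs.put(job_id)
    jobs.put(None)

    thread.join()
    return results
```

Here the code that calls jobs.put plays the role of the FastAPI application (the producer), and the worker function plays the role of the RQ worker (the consumer).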

Prerequisites
Before getting started, make sure you have the following installed on your machine:

  1. Python 3.x
  2. FastAPI
  3. Redis server
  4. Redis Queue (RQ) library

Setting up Redis
First, you need to have Redis installed and running on your machine. You can download Redis from https://redis.io/download and follow the installation instructions for your operating system.

Once Redis is installed, start the Redis server by running the following command in your terminal:

redis-server
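To confirm the server is reachable, running redis-cli ping should print PONG. The same check can be sketched in Python with only the standard library, assuming Redis listens on the default 127.0.0.1:6379 without authentication. The helper names is_pong and redis_is_up are made up for this example.

```python
import socket


def is_pong(reply: bytes) -> bool:
    """Return True if the bytes are Redis's simple-string reply to PING."""
    return reply.startswith(b"+PONG")


def redis_is_up(host: str = "127.0.0.1", port: int = 6379, timeout: float = 0.5) -> bool:
    """Send an inline PING command and report whether a Redis server answered."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            sock.sendall(b"PING\r\n")
            return is_pong(sock.recv(64))
    except OSError:
        # Connection refused, timed out, or host unreachable
        return False
```

A server that requires a password replies with an error instead of +PONG, so this sketch would report False in that case.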

Setting up FastAPI
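FastAPI is a modern web framework for building APIs with Python. Install it with pip, together with the Uvicorn server used to run the application later, the RQ library, and the Redis client:

pip install fastapi uvicorn rq redis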

Creating a FastAPI application
Now, let’s create a simple FastAPI application that will enqueue and process background jobs using Redis Queue.

Create a new Python file, app.py, and add the following code:

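from fastapi import FastAPI
from redis import Redis
from rq import Queue

app = FastAPI()

# Connect to Redis (localhost:6379 by default) and use RQ's default queue
redis_conn = Redis()
queue = Queue(connection=redis_conn)

# Background job function; the worker imports it by its module path (app.process_job)
def process_job(job_id):
    print(f"Processing job {job_id}")

# Route to enqueue a job
@app.post("/enqueue-job/{job_id}")
def enqueue_job(job_id: int):
    # Plain "def" lets FastAPI run this blocking Redis call in its thread pool
    job = queue.enqueue(process_job, job_id)
    # job.id is the ID RQ generates for the job, not the job_id path parameter
    return {"message": f"Job enqueued with id {job.id}"}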

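In this code snippet, we have created a FastAPI application with a route /enqueue-job/{job_id} that accepts a job ID as a path parameter and enqueues a background job using Redis Queue. Note that the ID returned in the response is the one RQ generates for the queued job, not the job_id from the URL.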
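The ID returned by the route can later be used to look up the job's state. The following is a minimal sketch, assuming RQ's Job.fetch and Job.get_status behave as in recent RQ releases; describe_job and fetch_job_summary are names made up for this example, and the RQ imports are kept inside the function so the module loads even where RQ is not installed.

```python
def describe_job(job) -> dict:
    """Summarise a job object that exposes .id and .get_status()."""
    status = job.get_status()
    # Newer RQ versions return an enum member, older ones a plain string
    return {"id": job.id, "status": getattr(status, "value", status)}


def fetch_job_summary(job_id: str, connection):
    """Look up a job in Redis; return None if RQ does not know the ID."""
    from rq.exceptions import NoSuchJobError
    from rq.job import Job

    try:
        job = Job.fetch(job_id, connection=connection)
    except NoSuchJobError:
        return None
    return describe_job(job)
```

In app.py, a GET route (for example a hypothetical /jobs/{job_id}) could return fetch_job_summary(job_id, redis_conn) and respond with a 404 when the result is None.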

Processing background jobs
To process the enqueued jobs, we need to create a separate worker process that listens for and processes the jobs in the queue.

Create a new Python file, worker.py, and add the following code:

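from redis import Redis
from rq import Queue, Worker

# Connect to Redis
redis_conn = Redis()
queue = Queue(connection=redis_conn)

# Start a worker that blocks and processes jobs from the queue
def run_worker():
    # Pass the connection explicitly; the rq.Connection context manager
    # is deprecated in newer RQ releases
    worker = Worker([queue], connection=redis_conn)
    worker.work()

if __name__ == "__main__":
    run_worker()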

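This code snippet creates a worker process that listens for jobs in the queue and runs them. The worker does not define process_job itself: each job records the function's import path (app.process_job), and the worker imports it from app.py, so start the worker from the directory that contains app.py.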
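Because the worker has to import the job function, it can be convenient to keep that function in a small module of its own, so the worker never needs to load the web application. A minimal sketch, assuming a hypothetical file named tasks.py next to app.py:

```python
# tasks.py (hypothetical file name): job functions shared by app.py and the worker


def process_job(job_id):
    """Stand-in for slow work; RQ stores the return value as the job's result."""
    message = f"Processing job {job_id}"
    print(message)
    return message
```

With this layout, app.py would use "from tasks import process_job" instead of defining the function itself, and the worker would resolve the job as tasks.process_job.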

Running the application
To run the FastAPI application and start enqueueing jobs, run the following command in your terminal:

uvicorn app:app --reload

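Open a new terminal window in the same directory and start the worker process by running the following command. RQ's default worker relies on os.fork(), which is not available on native Windows, so use WSL or a Unix-like system there: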

python worker.py

Now, you can enqueue jobs by sending a POST request to http://localhost:8000/enqueue-job/{job_id}, replacing {job_id} with an integer. The worker process picks up each enqueued job in the background and prints "Processing job ..." in its own terminal.
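From a shell, curl -X POST http://localhost:8000/enqueue-job/1 sends such a request. The same call can be sketched in Python with only the standard library, assuming the API runs on Uvicorn's default localhost:8000. The function names build_enqueue_request and enqueue are made up for this example.

```python
import json
import urllib.request


def build_enqueue_request(job_id: int, base_url: str = "http://localhost:8000") -> urllib.request.Request:
    """Build the POST request for the /enqueue-job/{job_id} route."""
    return urllib.request.Request(f"{base_url}/enqueue-job/{job_id}", method="POST")


def enqueue(job_id: int, base_url: str = "http://localhost:8000") -> dict:
    """Send the request and decode the JSON body returned by the API."""
    request = build_enqueue_request(job_id, base_url)
    with urllib.request.urlopen(request, timeout=5) as response:
        return json.load(response)
```

With both the API and the worker running, calling enqueue(1) should return the dictionary containing the "Job enqueued with id ..." message.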

Conclusion
In this tutorial, we learned how to implement background job processing using Python, FastAPI, and Redis Queue. By handing long-running tasks to a queue and a separate worker, the API can respond immediately instead of making the user wait for the task to finish. Experiment with different types of jobs and explore more advanced features of Redis Queue to build robust and scalable applications.

20 Comments
@manjit1305
1 month ago

I am using Windows 10. When I run "rq worker" I get this error:

child_pid = os.fork()
            ^^^^^^^
AttributeError: module 'os' has no attribute 'fork'

@marcgentner1322
1 month ago

Do I set up the queue on a different server than the producer? And what are the consumers? An application on a server, or? How do you host this full-time?

@abhishekbhosale3628
1 month ago

How do we start workers on different machines rather than on the same machine as the server?

@stestingtesting
1 month ago

I am getting "Waiting for background tasks to complete. (CTRL+C to force quit)" while sending bulk emails. Do you know the solution to this problem?

@maheshmmmec
1 month ago

Where would this function be executed, on the same compute where FastAPI is running?

@nisharohilla5918
1 month ago

Thank you for the video, sir. Is there a way to count the pending queues in Redis?

@MohammadDLitoo
1 month ago

Awesome tut… Is it possible for you to show us Celery, RabbitMQ, and gRPC in Python in the future?

@betterhumans1752
1 month ago

How can I have a background process that uses a UI so the user can alter settings?

@jimmyauris8412
1 month ago

Is there a guide for running this in production? Thanks 🎉

@contactkashif
1 month ago

Nice tutorial. I don't get which part was the consumer?

@stepkurniawan
1 month ago

We can clearly see the high quality you put into the video. Thanks!

@ammadkhan4687
1 month ago

Very beautifully explained. I am planning to implement this scenario to replace Azure Queue, as that is costing us more for the volume of queues deployed there. Any similar idea or video will be very much appreciated. Thank you for your effort.

@khanshehryar2258
1 month ago

Just looking like a WOW. I was just scared of this topic. You made it very easy.

@billluo9568
1 month ago

Easy to understand!

@sany2k8
1 month ago

Do you have the next part of it? Reading data from the Redis queue and processing it.

@murtazasingapurwala8044
1 month ago

Hey, actually I am receiving an error …
ValueError: time data '' does not match format '%Y-%m-%dT%H:%M:%SZ'
I am using Windows Subsystem for Linux and have installed the "python3-rq" library to run the command "rq worker <queue_name>"

@chikosan99
1 month ago

Very nice (:
So the job processing happens in the Redis server (worker) and not in the FastAPI server?!
What if my job has module dependencies?
Thanks

@Megalon11235
1 month ago

Put forward extremely simply.
Straightforward, to-the-point and no fluff!

11/10 would definitely recommend to everyone.

@KivySchool
1 month ago

Thanks for your high quality tutorial.

I am struggling to understand how to use `redis` to cache the response from some routes on FastAPI.
For example, I have a route GET /posts. This route takes 50~70ms to process and return a response.
How can I cache this response with redis, so that the next time someone queries GET /posts within the next 30 seconds, it returns a faster response?
And suppose someone makes a new post, how can I force the `cache` to refresh?

I tried to use this library "pip install fastapi-cache2", but haven't had success using redis with it. I would appreciate a tutorial from you on this, thanks!

@nirajgautam403
1 month ago

How can I get the status of the job continuously?