Background job processing is a common requirement in many web applications. It allows you to execute long-running tasks in the background without blocking the main thread or the user’s request. In this tutorial, we will explore how to implement background job processing using Python, FastAPI, and Redis Queue (RQ) as the message queue.
What is Redis Queue (RQ)?
Redis Queue (RQ) is a Python library for queueing jobs and processing them in the background, with Redis acting as the message broker. Its API is small: you create a queue, enqueue a function call as a job, and run one or more workers that pick up and execute those jobs.
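To make the producer / queue / worker pattern concrete before Redis enters the picture, here is a minimal standard-library sketch. It is only an analogy: the in-memory `jobs` queue stands in for Redis, the thread stands in for the separate worker process, and the names `worker_loop` and `run_demo` are hypothetical helpers that do not appear in the tutorial.

```python
import queue
import threading

# In-memory stand-in for the Redis-backed queue (illustration only).
jobs = queue.Queue()
results = []


def process_job(job_id):
    """Same shape as the tutorial's job function, but records its output."""
    results.append(f"Processing job {job_id}")


def worker_loop():
    """Consumer: pull (function, args) pairs until a None sentinel arrives."""
    while True:
        item = jobs.get()
        if item is None:
            break
        func, args = item
        func(*args)


def run_demo(job_ids):
    """Producer: enqueue jobs, then wait for the worker thread to drain them."""
    results.clear()
    consumer = threading.Thread(target=worker_loop)
    consumer.start()
    for job_id in job_ids:
        jobs.put((process_job, (job_id,)))  # returns immediately
    jobs.put(None)  # tell the worker to stop
    consumer.join()
    return list(results)


if __name__ == "__main__":
    print(run_demo([1, 2, 3]))
```

The producer only calls `jobs.put(...)` and moves on; all the work happens in the consumer. With RQ the same split holds, except that the queue lives in Redis and the consumer is a separate process that can run on another machine.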
Prerequisites
Before getting started, make sure you have the following installed on your machine:
- Python 3.x
- FastAPI
- Redis server
- Redis Queue (RQ) library
Setting up Redis
First, you need to have Redis installed and running on your machine. You can download Redis from https://redis.io/download and follow the installation instructions for your operating system.
Once Redis is installed, start the Redis server by running the following command in your terminal:
redis-server
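If you want to confirm that the server is reachable before moving on, a small standard-library check along these lines can help. It is a sketch under assumptions: Redis is listening on its default port 6379 on localhost with no password, and `redis_is_up` is a hypothetical helper name, not part of Redis or RQ.

```python
import socket


def redis_is_up(host="127.0.0.1", port=6379, timeout=1.0):
    """Return True if something at host:port answers PING with +PONG."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            sock.sendall(b"PING\r\n")  # Redis accepts simple inline commands
            return sock.recv(16).startswith(b"+PONG")
    except OSError:
        # Connection refused, timed out, or host unreachable.
        return False


if __name__ == "__main__":
    print("Redis reachable:", redis_is_up())
```

A server that requires authentication will answer with an error instead of `+PONG`, so this check reports `False` in that case even though the server is running.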
Setting up FastAPI
FastAPI is a modern web framework for building APIs with Python. Install it with pip, together with Uvicorn (the ASGI server used to run the application later) and the RQ library:
pip install fastapi uvicorn rq
Creating a FastAPI application
Now, let’s create a simple FastAPI application that will enqueue and process background jobs using Redis Queue.
Create a new Python file, app.py, and add the following code:
from fastapi import FastAPI
from redis import Redis
from rq import Queue

app = FastAPI()

# Connect to Redis (defaults to localhost:6379)
redis_conn = Redis()
queue = Queue(connection=redis_conn)

# Background job function; the worker imports it as app.process_job
def process_job(job_id):
    print(f"Processing job {job_id}")

# Route to enqueue a job
@app.post("/enqueue-job/{job_id}")
async def enqueue_job(job_id: int):
    job = queue.enqueue(process_job, job_id)
    return {"message": f"Job enqueued with id {job.id}"}
In this code snippet, we have created a FastAPI application with a route /enqueue-job/{job_id} that accepts a job ID as a parameter and enqueues a background job using Redis Queue.
Processing background jobs
To process the enqueued jobs, we need to create a separate worker process that listens for and processes the jobs in the queue.
Create a new Python file, worker.py, and add the following code:
from redis import Redis
from rq import Queue, Worker

# Connect to Redis
redis_conn = Redis()
queue = Queue(connection=redis_conn)

# Worker process
def run_worker():
    # Pass the connection explicitly; the rq.Connection context manager
    # is deprecated in newer RQ releases.
    worker = Worker([queue], connection=redis_conn)
    worker.work()

if __name__ == "__main__":
    run_worker()
This code snippet creates a worker process that listens for jobs in the queue and runs them by calling the function process_job defined in app.py. The worker imports that function by name, so start it from the directory that contains app.py.
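One way to picture why the worker can run process_job even though worker.py never imports it: a queued job carries the function's import path (here that would be `app.process_job`) plus its arguments, and the worker imports the function on demand. The sketch below illustrates the idea with the standard library only. It is not RQ's actual code, and `resolve` and the `stored_job` dictionary are hypothetical stand-ins.

```python
import importlib


def resolve(dotted_path):
    """Import 'package.module.attr' and return the attribute it names."""
    module_name, _, attr_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# Stand-in for a stored job: a function path plus its arguments.
stored_job = {"func": "os.path.basename", "args": ("/tmp/report.csv",)}

func = resolve(stored_job["func"])
result = func(*stored_job["args"])
print(result)
```

This is also why every module a job depends on has to be importable on the machine where the worker runs, not just on the machine running FastAPI.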
Running the application
To run the FastAPI application and start enqueueing jobs, run the following command in your terminal:
uvicorn app:app --reload
Open a new terminal window and start the worker process by running the following command:
python worker.py
Now, you can enqueue jobs by sending a POST request to http://localhost:8000/enqueue-job/{job_id}. The worker process will process the enqueued jobs in the background.
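As a rough sketch of what such a request could look like from Python, the snippet below uses only the standard library. It assumes the application is running on Uvicorn's default address, http://localhost:8000. The helper names `build_enqueue_request` and `enqueue` are hypothetical and not part of FastAPI or RQ.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # assumption: Uvicorn's default host and port


def build_enqueue_request(job_id, base_url=BASE_URL):
    """Build (but do not send) the POST request for /enqueue-job/{job_id}."""
    url = f"{base_url}/enqueue-job/{int(job_id)}"
    return urllib.request.Request(url, method="POST")


def enqueue(job_id, base_url=BASE_URL):
    """Send the request and return the decoded JSON body."""
    with urllib.request.urlopen(build_enqueue_request(job_id, base_url)) as resp:
        return json.load(resp)
```

With both the server and the worker running, calling `enqueue(42)` should return the JSON message from the route, and the worker's terminal should show the output of process_job shortly afterwards.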
Conclusion
In this tutorial, we learned how to implement background job processing using Python, FastAPI, and Redis Queue. By moving long-running tasks onto a queue such as RQ, the web process can respond to the user immediately while a separate worker does the slow work. Experiment with different types of jobs and explore more advanced features of Redis Queue to build robust and scalable applications.
I am using Windows 10. When I run "rq worker" I get this error:
child_pid = os.fork()
^^^^^^^
AttributeError: module 'os' has no attribute 'fork'
Do I set up the queue on another server than the producer? And what are the consumers? An application on a server, or? How do you host this full-time?
How do we start workers on different machines rather than same machine as the server ?
I am getting "Waiting for background tasks to complete. (CTRL+C to force quit)" while sending bulk emails. Do you know the solution to this problem?
Where would this function be executed, on the same compute where fast api is running?
Thank you for the video Sir, Is there a way to count the pending queues in redis?
awesome tut… is it possible for you to show us celery rabbit mq grpc in python in future ?
How can I have a background process that uses a UI so the user can alter settings?
Is there a guide to running this in production? Thanks 🎉
Nice tutorial. I don't get which part was the consumer?
We can clearly see the effort you put into this high-quality video. Thanks!
Very beautifully explained. I am planning to implement this scenario to replace azure queue. As that is costing us more for volume of Queues deployed there. Any similar idea or video will be very much appreciated. Thank you for your effort.
Just looking like a WOW. I was just scared of this topic. You made it very easy.
Easy to understand!
Do you have the next part of it? Reading data from the Redis queue and processing it.
Hey, actually I am receiving an error …
ValueError: time data '' does not match format '%Y-%m-%dT%H:%M:%SZ'
I am using windows subsystem for linux and have installed "python3-rq" library to run command "rq worker <queue_name>"
Very nice (:
So the job processing happens in the Redis server (worker) and not in the FastAPI server?!
What if my job has module dependencies?
Thanks
Put forward extremely simply.
Straightforward, to-the-point and no fluff!
11/10 would definitely recommend to everyone.
Thanks for your high quality tutorial.
I am struggling to understand how to use `redis` to cache the response from some routes on FastAPI.
For example, I have a route GET /posts. This route takes 50~70ms to process and return a response.
How can I cache this response with redis, so that the next time someone queries GET /posts in the next 30 seconds, it returns a faster response?
And suppose someone makes a new post, how can I force the `cache` to refresh?
I tried to use this library "pip install fastapi-cache2", but haven't had success using redis with it. I would appreciate a tutorial from you on this, thanks!
How can I get the status of the job continuously?