Introduction
Redis (Remote Dictionary Server) is an open-source, in-memory data structure store that can serve as a database, cache, message broker, and streaming engine. It is known for exceptional performance, sub-millisecond latency, and a rich set of data structures.
This guide covers:
- Redis Fundamentals: Core concepts and data structures
- Use Cases: Real-world applications and patterns
- Deployment: Step-by-step setup and configuration
- Best Practices: Performance, reliability, and optimization
- Practical Examples: Code samples and deployment scripts
What is Redis?
Redis is an in-memory data structure store that offers:
- High Performance: Sub-millisecond latency
- Data Structures: Strings, Lists, Sets, Sorted Sets, Hashes, Bitmaps, Streams
- Persistence: Optional disk persistence (RDB, AOF)
- Scalability: Horizontal scaling with Redis Cluster
- Rich Features: Pub/Sub, Lua scripting, transactions, streams
- Versatility: Cache, database, message broker, session store
Key Concepts
Key-Value Store: Redis stores data as key-value pairs where keys are strings and values can be various data structures.
In-Memory: Data is stored in RAM for fast access, with optional persistence to disk.
Data Structures: Redis supports strings, lists, sets, sorted sets, hashes, bitmaps, hyperloglogs, streams, and geospatial indexes.
Expiration: Keys can have time-to-live (TTL) for automatic expiration.
Atomic Operations: Individual Redis commands execute atomically (the server processes commands on a single thread); grouping multiple commands atomically requires transactions (MULTI/EXEC) or Lua scripts.
Common Use Cases
1. Caching
Cache frequently accessed data to reduce database load and improve response times.
Use Cases:
- Database query caching
- API response caching
- Session caching
- Content caching
Example:
import redis
import json
import hashlib

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def get_user_cached(user_id):
    """Get user from cache or database."""
    cache_key = f"user:{user_id}"
    # Try cache first
    cached_user = redis_client.get(cache_key)
    if cached_user:
        return json.loads(cached_user)
    # Cache miss - fetch from database (fetch_user_from_db is app-specific)
    user = fetch_user_from_db(user_id)
    # Store in cache with TTL
    redis_client.setex(cache_key, 3600, json.dumps(user))  # 1 hour TTL
    return user

def make_api_cache_key(endpoint, params):
    """Build a stable cache key; sort_keys makes dict serialization deterministic."""
    params_hash = hashlib.md5(json.dumps(params, sort_keys=True).encode()).hexdigest()
    return f"api:{endpoint}:{params_hash}"

def cache_api_response(endpoint, params, response, ttl=300):
    """Cache API response."""
    redis_client.setex(make_api_cache_key(endpoint, params), ttl, json.dumps(response))

def get_cached_api_response(endpoint, params):
    """Get cached API response."""
    cached = redis_client.get(make_api_cache_key(endpoint, params))
    if cached:
        return json.loads(cached)
    return None
2. Session Storage
Store user sessions in Redis for distributed applications.
Use Cases:
- Web application sessions
- User authentication tokens
- Shopping cart data
- Temporary user data
Example:
import redis
import uuid
import json
from datetime import datetime, timedelta

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def create_session(user_id, session_data):
    """Create user session."""
    session_id = str(uuid.uuid4())
    session_key = f"session:{session_id}"
    session_data['user_id'] = user_id
    session_data['created_at'] = datetime.now().isoformat()
    # Store session with 24 hour TTL (setex also accepts a timedelta)
    redis_client.setex(session_key, timedelta(hours=24), json.dumps(session_data))
    return session_id

def get_session(session_id):
    """Get session data."""
    session_data = redis_client.get(f"session:{session_id}")
    if session_data:
        return json.loads(session_data)
    return None

def update_session(session_id, updates):
    """Update session data, preserving the remaining TTL."""
    session_key = f"session:{session_id}"
    session_data = get_session(session_id)
    if session_data:
        session_data.update(updates)
        ttl = redis_client.ttl(session_key)
        if ttl > 0:  # key may have expired (or lost its TTL) in the meantime
            redis_client.setex(session_key, ttl, json.dumps(session_data))

def delete_session(session_id):
    """Delete session."""
    redis_client.delete(f"session:{session_id}")
3. Rate Limiting
Implement rate limiting to prevent abuse and ensure fair usage.
Use Cases:
- API rate limiting
- Login attempt limiting
- Request throttling
- Feature access control
Example:
import redis
import time

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def rate_limit(user_id, limit=100, window=3600):
    """Rate limiting using a fixed window counter."""
    key = f"rate_limit:{user_id}"
    current = redis_client.incr(key)
    if current == 1:
        # Set expiration on first request
        redis_client.expire(key, window)
    if current > limit:
        return False, 0  # Exceeded limit
    return True, limit - current  # Within limit

def rate_limit_sliding_window(user_id, limit=100, window=60):
    """Rate limiting using a sliding window log (sorted set of timestamps)."""
    key = f"rate_limit:{user_id}"
    now = time.time()
    # Remove entries older than the window
    redis_client.zremrangebyscore(key, 0, now - window)
    # Count requests still inside the window
    current = redis_client.zcard(key)
    if current >= limit:
        return False, 0  # Exceeded limit
    # Record the current request
    redis_client.zadd(key, {str(now): now})
    redis_client.expire(key, window)
    return True, limit - current - 1

# Usage
allowed, remaining = rate_limit('user123', limit=100, window=3600)
if not allowed:
    print("Rate limit exceeded")  # in a web handler, respond with HTTP 429
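The fixed-window counter resets abruptly at window boundaries, and the sliding-window log above does a non-atomic read-then-write (a production variant would wrap the ZREMRANGEBYSCORE/ZCARD/ZADD sequence in a Lua script). A third common design is the token bucket. Below is a minimal in-process sketch of just the bucket math (class and parameter names are mine, not from the example above); the `tokens`/`last_refill` state could equally live in a Redis hash:

```python
import time

class TokenBucket:
    """Allow bursts up to `capacity`, refilling at `rate_per_sec` tokens/second."""

    def __init__(self, capacity, rate_per_sec, clock=time.monotonic):
        self.capacity = capacity
        self.rate = rate_per_sec
        self.tokens = float(capacity)
        self.clock = clock  # injectable for testing
        self.last_refill = clock()

    def allow(self, cost=1):
        """Refill based on elapsed time, then try to spend `cost` tokens."""
        now = self.clock()
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```

Unlike the fixed window, a token bucket never allows `2 * limit` requests to slip through around a window boundary.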
4. Real-Time Leaderboards
Build real-time leaderboards using sorted sets.
Use Cases:
- Game leaderboards
- Ranking systems
- Top N queries
- Score tracking
Example:
import redis

# decode_responses=True returns member names as str instead of bytes
redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

def update_score(user_id, score):
    """Update user score."""
    redis_client.zadd('leaderboard', {user_id: score})

def get_rank(user_id):
    """Get user rank (0-indexed; 0 is the highest score)."""
    return redis_client.zrevrank('leaderboard', user_id)

def get_top_users(limit=10):
    """Get top N users."""
    return redis_client.zrevrange('leaderboard', 0, limit - 1, withscores=True)

def get_user_score(user_id):
    """Get user score."""
    return redis_client.zscore('leaderboard', user_id)

def get_users_around(user_id, count=5):
    """Get users ranked around a specific user."""
    rank = get_rank(user_id)
    if rank is None:
        return []
    start = max(0, rank - count)
    return redis_client.zrevrange('leaderboard', start, rank + count, withscores=True)

# Usage
update_score('user1', 1000)
update_score('user2', 1500)
update_score('user3', 800)

for user_id, score in get_top_users(10):
    print(f"{user_id}: {score}")
5. Pub/Sub Messaging
Implement publish-subscribe messaging for real-time communication.
Use Cases:
- Real-time notifications
- Event broadcasting
- Chat applications
- Live updates
Example:
import redis
import threading
import json

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def publish_message(channel, message):
    """Publish message to channel."""
    redis_client.publish(channel, json.dumps(message))

def subscribe_to_channel(channel, callback):
    """Subscribe to channel and process messages (blocking loop)."""
    pubsub = redis_client.pubsub()  # each subscriber needs its own PubSub object
    pubsub.subscribe(channel)
    for message in pubsub.listen():
        if message['type'] == 'message':
            callback(channel, json.loads(message['data']))

def subscribe_to_pattern(pattern, callback):
    """Subscribe to channels matching pattern."""
    pubsub = redis_client.pubsub()
    pubsub.psubscribe(pattern)
    for message in pubsub.listen():
        if message['type'] == 'pmessage':
            channel = message['channel'].decode()
            callback(channel, json.loads(message['data']))

# Usage
def handle_notification(channel, data):
    print(f"Received on {channel}: {data}")

# Subscribe in a background thread
thread = threading.Thread(
    target=subscribe_to_channel,
    args=('notifications', handle_notification),
    daemon=True
)
thread.start()

# Publish message
publish_message('notifications', {
    'type': 'user_mention',
    'user_id': 'user123',
    'message': 'You were mentioned!'
})
6. Job Queue
Implement job queues using Redis lists.
Use Cases:
- Task queues
- Background job processing
- Email sending
- Image processing
Example:
import redis
import json
import uuid
from datetime import datetime

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def enqueue_job(queue_name, job_data):
    """Add job to queue."""
    job = {
        'id': str(uuid.uuid4()),
        'data': job_data,
        'created_at': datetime.now().isoformat()
    }
    redis_client.lpush(queue_name, json.dumps(job))
    return job['id']

def dequeue_job(queue_name, timeout=0):
    """Get job from queue (blocking when timeout > 0)."""
    if timeout > 0:
        result = redis_client.brpop(queue_name, timeout=timeout)
        if result:
            return json.loads(result[1])
    else:
        result = redis_client.rpop(queue_name)
        if result:
            return json.loads(result)
    return None

def process_jobs(queue_name):
    """Process jobs from queue (process_job / handle_failed_job are app-specific)."""
    while True:
        job = dequeue_job(queue_name, timeout=5)
        if job:
            try:
                process_job(job)
            except Exception as e:
                handle_failed_job(job, e)
        # brpop already waited up to 5 seconds, so no extra sleep is needed

# Usage
job_id = enqueue_job('email_queue', {
    'to': 'user@example.com',
    'subject': 'Welcome',
    'body': 'Welcome to our service!'
})
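The `handle_failed_job` hook above is left app-specific; one common pattern is to park failed jobs with an exponential backoff and re-enqueue them when due. A hedged sketch (the function names and the `:retries` key suffix are my assumptions, not part of the example above):

```python
import json
import time

def retry_delay(attempt, base=2.0, cap=300.0):
    """Exponential backoff: base * 2**attempt seconds, capped at `cap`."""
    return min(cap, base * (2 ** attempt))

def schedule_retry(redis_client, queue_name, job, attempt):
    """Park a failed job in a sorted set scored by the time it should run again.

    A worker would periodically ZRANGEBYSCORE this set for due jobs,
    LPUSH them back onto the main queue, and ZREM them from the retry set.
    """
    run_at = time.time() + retry_delay(attempt)
    redis_client.zadd(f"{queue_name}:retries", {json.dumps(job): run_at})
```

The cap prevents a repeatedly failing job from backing off indefinitely; after a fixed number of attempts, jobs are typically moved to a dead-letter list instead.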
7. Distributed Locks
Implement distributed locks for coordination across multiple processes.
Use Cases:
- Preventing race conditions
- Resource locking
- Critical section protection
- Distributed coordination
Example:
import redis
import time
import uuid

redis_client = redis.Redis(host='localhost', port=6379, db=0)

class DistributedLock:
    def __init__(self, redis_client, lock_key, timeout=10):
        self.redis_client = redis_client
        self.lock_key = lock_key
        self.timeout = timeout  # used as both the lock TTL and the acquire deadline
        self.identifier = str(uuid.uuid4())

    def acquire(self):
        """Try to acquire the lock until the deadline passes."""
        deadline = time.time() + self.timeout
        while time.time() < deadline:
            if self.redis_client.set(
                self.lock_key,
                self.identifier,
                nx=True,  # only set if the key does not exist
                ex=self.timeout  # auto-expire so a crashed holder can't deadlock others
            ):
                return True
            time.sleep(0.01)  # small delay before retrying
        return False

    def release(self):
        """Release the lock only if we still own it (atomic via Lua)."""
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        self.redis_client.eval(lua_script, 1, self.lock_key, self.identifier)

    def __enter__(self):
        if not self.acquire():
            raise TimeoutError("Failed to acquire lock")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

# Usage
lock = DistributedLock(redis_client, 'resource_lock', timeout=10)
with lock:
    process_resource()  # critical section (app-specific)
8. Counting and Analytics
Track counts and metrics in real-time.
Use Cases:
- Page view counting
- Click tracking
- User activity metrics
- Real-time analytics
Example:
import redis
from datetime import datetime, timedelta

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def increment_counter(key):
    """Increment counter."""
    redis_client.incr(key)

def get_counter(key):
    """Get counter value."""
    return int(redis_client.get(key) or 0)

def track_page_view(page_id):
    """Track page view (per-page daily counter, kept for 30 days)."""
    today = datetime.now().strftime('%Y-%m-%d')
    key = f"page_views:{page_id}:{today}"
    redis_client.incr(key)
    redis_client.expire(key, timedelta(days=30))

def get_page_views(page_id, date=None):
    """Get page views for a date (defaults to today)."""
    if date is None:
        date = datetime.now().strftime('%Y-%m-%d')
    return get_counter(f"page_views:{page_id}:{date}")

def track_user_activity(user_id, activity_type):
    """Track user activity (per-user daily counter, kept for 7 days)."""
    today = datetime.now().strftime('%Y-%m-%d')
    key = f"activity:{user_id}:{activity_type}:{today}"
    redis_client.incr(key)
    redis_client.expire(key, timedelta(days=7))
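Because the counters above are sharded by day, aggregating over a period means reading several keys; MGET fetches them all in one round trip. A small sketch of that aggregation (helper names are mine, matching the `page_views:{page_id}:{date}` key pattern used above):

```python
from datetime import date, timedelta

def page_view_keys(page_id, end_date, days):
    """Keys for the last `days` daily counters, newest first."""
    return [
        f"page_views:{page_id}:{(end_date - timedelta(days=i)).isoformat()}"
        for i in range(days)
    ]

def get_views_last_week(redis_client, page_id, today):
    """Sum the last 7 daily counters with a single MGET round trip."""
    values = redis_client.mget(page_view_keys(page_id, today, 7))
    # Days with no traffic have no key, so MGET returns None for them
    return sum(int(v) for v in values if v is not None)
```

Summing in the client keeps each day's counter independently expirable; an alternative is maintaining a separate rolling-total key, at the cost of two INCRs per view.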
Deployment Guide
Prerequisites
- Linux/macOS/Windows: Redis runs on all major platforms
- Memory: Minimum 512MB RAM (more recommended)
- Disk Space: For persistence (RDB/AOF files)
Step 1: Install Redis
Linux (Ubuntu/Debian):
sudo apt update
sudo apt install redis-server
# Start Redis
sudo systemctl start redis-server
sudo systemctl enable redis-server
# Verify installation
redis-cli ping
macOS:
brew install redis
# Start Redis
brew services start redis
# Verify installation
redis-cli ping
Docker:
# Run Redis container
docker run -d \
--name redis \
-p 6379:6379 \
redis:latest
# Verify
docker exec -it redis redis-cli ping
From Source:
wget https://download.redis.io/redis-stable.tar.gz
tar xzf redis-stable.tar.gz
cd redis-stable
make
sudo make install
Step 2: Configure Redis
Edit Redis configuration:
sudo nano /etc/redis/redis.conf
Key Configuration Options:
# Network
bind 127.0.0.1 # Listen on localhost (change for remote access)
port 6379
# Memory
maxmemory 2gb
maxmemory-policy allkeys-lru # Eviction policy
# Persistence
save 900 1 # Save after 900 sec if at least 1 key changed
save 300 10 # Save after 300 sec if at least 10 keys changed
save 60 10000 # Save after 60 sec if at least 10000 keys changed
# AOF (Append Only File)
appendonly yes
appendfsync everysec
# Security
requirepass your_password_here
# Logging
loglevel notice
logfile /var/log/redis/redis-server.log
Restart Redis:
sudo systemctl restart redis-server
Step 3: Install Python Client
Install redis-py:
pip install redis
Install hiredis (faster parser):
pip install hiredis
Step 4: Connect to Redis
Basic Connection:
import redis
# Connect to local Redis
redis_client = redis.Redis(
    host='localhost',
    port=6379,
    db=0,
    decode_responses=True  # automatically decode responses to str
)
# Test connection
redis_client.ping()
Connection with Password:
redis_client = redis.Redis(
    host='localhost',
    port=6379,
    password='your_password',
    decode_responses=True
)
Connection Pool:
import redis
# Create connection pool
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    max_connections=50,
    decode_responses=True
)
redis_client = redis.Redis(connection_pool=pool)
Step 5: Basic Operations
String Operations:
# Set value
redis_client.set('key', 'value')
# Get value
value = redis_client.get('key')
# Set with expiration
redis_client.setex('key', 3600, 'value') # Expires in 1 hour
# Increment
redis_client.incr('counter')
redis_client.incrby('counter', 5)
# Check existence
exists = redis_client.exists('key')
Hash Operations:
# Set hash field
redis_client.hset('user:123', 'name', 'John')
redis_client.hset('user:123', 'email', 'john@example.com')
# Get hash field
name = redis_client.hget('user:123', 'name')
# Get all hash fields
user_data = redis_client.hgetall('user:123')
# Set multiple fields (HMSET is deprecated; use HSET with a mapping)
redis_client.hset('user:123', mapping={
    'name': 'John',
    'email': 'john@example.com',
    'age': 30
})
List Operations:
# Push to list
redis_client.lpush('tasks', 'task1')
redis_client.rpush('tasks', 'task2')
# Pop from list
task = redis_client.lpop('tasks')
task = redis_client.rpop('tasks')
# Get list length
length = redis_client.llen('tasks')
# Get list range
tasks = redis_client.lrange('tasks', 0, -1)
Set Operations:
# Add to set
redis_client.sadd('tags', 'python', 'redis', 'database')
# Check membership
is_member = redis_client.sismember('tags', 'python')
# Get all members
tags = redis_client.smembers('tags')
# Set operations
redis_client.sunion('set1', 'set2')
redis_client.sinter('set1', 'set2')
redis_client.sdiff('set1', 'set2')
Sorted Set Operations:
# Add to sorted set
redis_client.zadd('leaderboard', {'user1': 100, 'user2': 200})
# Get rank
rank = redis_client.zrevrank('leaderboard', 'user1')
# Get top N
top_users = redis_client.zrevrange('leaderboard', 0, 9, withscores=True)
# Get score
score = redis_client.zscore('leaderboard', 'user1')
Step 6: Redis Cluster Setup (Optional)
For production, set up Redis Cluster:
# Create cluster configuration
mkdir redis-cluster
cd redis-cluster
# Create 6 nodes (3 masters, 3 replicas)
for port in 7000 7001 7002 7003 7004 7005; do
mkdir $port
cat > $port/redis.conf <<EOF
port $port
cluster-enabled yes
cluster-config-file nodes-$port.conf
cluster-node-timeout 5000
appendonly yes
EOF
done
# Start nodes
for port in 7000 7001 7002 7003 7004 7005; do
redis-server $port/redis.conf &
done
# Create cluster
redis-cli --cluster create \
127.0.0.1:7000 \
127.0.0.1:7001 \
127.0.0.1:7002 \
127.0.0.1:7003 \
127.0.0.1:7004 \
127.0.0.1:7005 \
--cluster-replicas 1
Connect to Cluster:
from redis.cluster import RedisCluster, ClusterNode

# redis-py 4+ takes ClusterNode objects as startup nodes
startup_nodes = [
    ClusterNode("127.0.0.1", 7000),
    ClusterNode("127.0.0.1", 7001),
    ClusterNode("127.0.0.1", 7002)
]

redis_client = RedisCluster(
    startup_nodes=startup_nodes,
    decode_responses=True
)
Best Practices
1. Connection Pooling
Always use connection pools for production applications.
import redis
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    max_connections=50,
    decode_responses=True
)
redis_client = redis.Redis(connection_pool=pool)
2. Key Naming Conventions
Use consistent key naming patterns.
# Good patterns
"user:{user_id}:profile"
"session:{session_id}"
"cache:{resource_type}:{resource_id}"
"counter:{metric_name}:{date}"
# Avoid
"user123" # No namespace
"temp_data" # Unclear purpose
3. Expiration Strategy
Set appropriate TTLs for cached data.
# Short TTL for frequently changing data
redis_client.setex('cache_key', 300, data) # 5 minutes
# Longer TTL for stable data
redis_client.setex('cache_key', 3600, data) # 1 hour
# Very long TTL for rarely changing data
redis_client.setex('cache_key', 86400, data) # 24 hours
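One caveat: if many keys are written with the same TTL at once (say, after a cold start), they also all expire at once, which can stampede the backing store. A common mitigation is adding a little random jitter to each TTL; a minimal sketch (the function name and `spread` parameter are mine):

```python
import random

def jittered_ttl(base_ttl, spread=0.1, rng=random.random):
    """Return base_ttl scaled by a random factor in [1 - spread, 1 + spread].

    Co-created keys then expire at slightly different times instead of together.
    """
    factor = 1 + spread * (2 * rng() - 1)
    return max(1, int(base_ttl * factor))

# e.g. redis_client.setex('cache_key', jittered_ttl(3600), data)
```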
4. Memory Management
Configure memory limits and eviction policies.
# redis.conf
maxmemory 2gb
maxmemory-policy allkeys-lru # Evict least recently used keys
Eviction Policies:
- noeviction: Don’t evict; writes return errors when memory is full
- allkeys-lru: Evict least recently used keys
- volatile-lru: Evict least recently used keys that have an expiration
- allkeys-lfu: Evict least frequently used keys
- volatile-lfu: Evict least frequently used keys that have an expiration
- allkeys-random: Evict random keys
- volatile-random: Evict random keys that have an expiration
- volatile-ttl: Evict keys with the shortest TTL
5. Persistence Configuration
Choose appropriate persistence strategy.
RDB (Snapshot):
save 900 1 # Save if 1 key changed in 900 seconds
save 300 10 # Save if 10 keys changed in 300 seconds
save 60 10000 # Save if 10000 keys changed in 60 seconds
AOF (Append Only File):
appendonly yes
appendfsync everysec # Sync every second (balanced)
# appendfsync always # Sync on every write (safest, slowest)
# appendfsync no # Let OS decide (fastest, less safe)
6. Pipeline Operations
Use pipelines for multiple operations.
# Without pipeline (multiple round trips)
redis_client.set('key1', 'value1')
redis_client.set('key2', 'value2')
redis_client.set('key3', 'value3')
# With pipeline (single round trip)
pipe = redis_client.pipeline()
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.set('key3', 'value3')
pipe.execute()
7. Error Handling
Handle Redis errors gracefully.
import redis
from redis.exceptions import ConnectionError, TimeoutError

try:
    redis_client.set('key', 'value')
except TimeoutError:
    # redis-py's TimeoutError subclasses its ConnectionError, so catch it first
    print("Redis operation timed out")
except ConnectionError:
    print("Redis connection failed")
except Exception as e:
    print(f"Redis error: {e}")
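Beyond catching errors, transient connection failures are often worth retrying with a short delay. A hedged sketch of such a wrapper (written against Python's built-in exception names so it stands alone; with redis-py you would pass its exception classes via `retry_on`):

```python
import time

def with_retries(fn, retries=3, delay=0.1,
                 retry_on=(ConnectionError, TimeoutError)):
    """Call fn(), retrying listed transient errors with a delay between attempts."""
    for attempt in range(retries):
        try:
            return fn()
        except retry_on:
            if attempt == retries - 1:
                raise  # out of attempts: propagate the last error
            time.sleep(delay)

# e.g. with_retries(lambda: redis_client.get('key'),
#                   retry_on=(redis.exceptions.ConnectionError,))
```

Keep `retries * delay` well under your request budget, and consider failing open (serving without the cache) rather than failing the whole request when Redis is down.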
Monitoring
Redis CLI Commands
# Check Redis info
redis-cli info
# Check memory usage
redis-cli info memory
# Check connected clients
redis-cli client list
# Monitor commands in real-time
redis-cli monitor
# Check slow queries
redis-cli slowlog get 10
Python Monitoring
def get_redis_stats():
    """Get Redis statistics."""
    info = redis_client.info()
    return {
        'used_memory': info['used_memory_human'],
        'connected_clients': info['connected_clients'],
        'total_commands_processed': info['total_commands_processed'],
        'keyspace_hits': info['keyspace_hits'],
        'keyspace_misses': info['keyspace_misses']
    }
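The `keyspace_hits`/`keyspace_misses` counters are most useful as a hit ratio: a low ratio suggests TTLs are too short or the working set does not fit in memory. A small helper (name is mine) that works on the dict returned by `info()`:

```python
def cache_hit_ratio(info):
    """Hit ratio from an INFO stats dict: keyspace_hits / total lookups."""
    hits = info.get('keyspace_hits', 0)
    misses = info.get('keyspace_misses', 0)
    total = hits + misses
    return hits / total if total else 0.0

# e.g. cache_hit_ratio(redis_client.info()) -> 0.97 means 97% of GETs hit
```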
What Interviewers Look For
Redis Knowledge & Application
- Data Structure Selection
  - Appropriate data structure for the use case
  - Strings, Sets, Sorted Sets, Hashes
  - Red Flags: Wrong data structure, inefficient operations, poor choice
- Caching Strategy
  - Cache-aside pattern
  - TTL management
  - Eviction policies
  - Red Flags: No caching strategy, wrong pattern, cache stampedes
- Performance Optimization
  - Connection pooling
  - Pipelining
  - Memory management
  - Red Flags: No pooling, no pipelining, memory issues
System Design Skills
- When to Use Redis
  - Caching use cases
  - Session storage
  - Real-time features
  - Red Flags: Wrong use case, over-engineering, can’t justify
- Scalability Design
  - Redis Cluster
  - Replication
  - Sharding strategy
  - Red Flags: Single instance, no scaling, bottlenecks
- Reliability Design
  - Persistence (RDB/AOF)
  - Failover strategies
  - Red Flags: No persistence, no failover, data loss
Problem-Solving Approach
- Trade-off Analysis
  - Memory vs performance
  - Consistency vs availability
  - Red Flags: No trade-offs, dogmatic choices
- Edge Cases
  - Cache misses
  - Memory limits
  - Failover scenarios
  - Red Flags: Ignoring edge cases, no handling
- Cost Consideration
  - Memory costs
  - Instance sizing
  - Red Flags: Ignoring costs, over-provisioning
Communication Skills
- Redis Explanation
  - Can explain Redis features
  - Understands use cases
  - Red Flags: No understanding, vague explanations
- Decision Justification
  - Explains why Redis
  - Discusses alternatives
  - Red Flags: No justification, no alternatives
Meta-Specific Focus
- Caching Expertise
  - Deep Redis knowledge
  - Appropriate usage
  - Key: Show caching expertise
- Performance Focus
  - Low-latency design
  - High-throughput
  - Key: Demonstrate performance focus
Conclusion
Redis is a powerful in-memory data structure store that enables high-performance applications. Key takeaways:
- Use connection pooling for production applications
- Set appropriate TTLs for cached data
- Configure memory limits and eviction policies
- Use pipelines for batch operations
- Monitor performance and memory usage
- Choose appropriate data structures for your use case
Whether you’re building caching layers, session stores, real-time features, or distributed systems, Redis provides the performance and flexibility you need.