Introduction

Redis (Remote Dictionary Server) is an open-source, in-memory data structure store that can be used as a database, cache, message broker, and streaming engine. Redis is known for its exceptional performance, sub-millisecond latency, and support for various data structures.

This guide covers:

  • Redis Fundamentals: Core concepts and data structures
  • Use Cases: Real-world applications and patterns
  • Deployment: Step-by-step setup and configuration
  • Best Practices: Performance, reliability, and optimization
  • Practical Examples: Code samples and deployment scripts

What is Redis?

Redis is an in-memory data structure store that offers:

  • High Performance: Sub-millisecond latency
  • Data Structures: Strings, Lists, Sets, Sorted Sets, Hashes, Bitmaps, Streams
  • Persistence: Optional disk persistence (RDB, AOF)
  • Scalability: Horizontal scaling with Redis Cluster
  • Rich Features: Pub/Sub, Lua scripting, transactions, streams
  • Versatility: Cache, database, message broker, session store

Key Concepts

Key-Value Store: Redis stores data as key-value pairs where keys are strings and values can be various data structures.

In-Memory: Data is stored in RAM for fast access, with optional persistence to disk.

Data Structures: Redis supports strings, lists, sets, sorted sets, hashes, bitmaps, hyperloglogs, streams, and geospatial indexes.

Expiration: Keys can have time-to-live (TTL) for automatic expiration.

Atomic Operations: All Redis operations are atomic.

Common Use Cases

1. Caching

Cache frequently accessed data to reduce database load and improve response times.

Use Cases:

  • Database query caching
  • API response caching
  • Session caching
  • Content caching

Example:

import redis
import json
import hashlib

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def get_user_cached(user_id):
    """Get user from cache or database"""
    cache_key = f"user:{user_id}"
    
    # Try cache first
    cached_user = redis_client.get(cache_key)
    if cached_user:
        return json.loads(cached_user)
    
    # Cache miss - fetch from database
    user = fetch_user_from_db(user_id)
    
    # Store in cache with TTL
    redis_client.setex(
        cache_key,
        3600,  # 1 hour TTL
        json.dumps(user)
    )
    
    return user

def cache_api_response(endpoint, params, response, ttl=300):
    """Cache API response"""
    # Create cache key from endpoint and params
    cache_key = f"api:{endpoint}:{hashlib.md5(str(params).encode()).hexdigest()}"
    
    redis_client.setex(
        cache_key,
        ttl,
        json.dumps(response)
    )

def get_cached_api_response(endpoint, params):
    """Get cached API response"""
    cache_key = f"api:{endpoint}:{hashlib.md5(str(params).encode()).hexdigest()}"
    cached = redis_client.get(cache_key)
    
    if cached:
        return json.loads(cached)
    return None

2. Session Storage

Store user sessions in Redis for distributed applications.

Use Cases:

  • Web application sessions
  • User authentication tokens
  • Shopping cart data
  • Temporary user data

Example:

import redis
import uuid
import json
from datetime import timedelta

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def create_session(user_id, session_data):
    """Create user session"""
    session_id = str(uuid.uuid4())
    session_key = f"session:{session_id}"
    
    session_data['user_id'] = user_id
    session_data['created_at'] = str(datetime.now())
    
    # Store session with 24 hour TTL
    redis_client.setex(
        session_key,
        timedelta(hours=24).total_seconds(),
        json.dumps(session_data)
    )
    
    return session_id

def get_session(session_id):
    """Get session data"""
    session_key = f"session:{session_id}"
    session_data = redis_client.get(session_key)
    
    if session_data:
        return json.loads(session_data)
    return None

def update_session(session_id, updates):
    """Update session data"""
    session_key = f"session:{session_id}"
    session_data = get_session(session_id)
    
    if session_data:
        session_data.update(updates)
        ttl = redis_client.ttl(session_key)
        redis_client.setex(session_key, ttl, json.dumps(session_data))

def delete_session(session_id):
    """Delete session"""
    session_key = f"session:{session_id}"
    redis_client.delete(session_key)

3. Rate Limiting

Implement rate limiting to prevent abuse and ensure fair usage.

Use Cases:

  • API rate limiting
  • Login attempt limiting
  • Request throttling
  • Feature access control

Example:

import redis
import time

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def rate_limit(user_id, limit=100, window=3600):
    """Rate limiting using sliding window"""
    key = f"rate_limit:{user_id}"
    current = redis_client.incr(key)
    
    if current == 1:
        # Set expiration on first request
        redis_client.expire(key, window)
    
    if current > limit:
        return False, limit - current  # Exceeded limit
    
    return True, limit - current  # Within limit

def rate_limit_sliding_window(user_id, limit=100, window=60):
    """Rate limiting using sliding window log"""
    key = f"rate_limit:{user_id}"
    now = time.time()
    
    # Remove old entries
    redis_client.zremrangebyscore(key, 0, now - window)
    
    # Count current requests
    current = redis_client.zcard(key)
    
    if current >= limit:
        return False, 0  # Exceeded limit
    
    # Add current request
    redis_client.zadd(key, {str(now): now})
    redis_client.expire(key, window)
    
    return True, limit - current - 1

# Usage
allowed, remaining = rate_limit('user123', limit=100, window=3600)
if not allowed:
    return "Rate limit exceeded", 429

4. Real-Time Leaderboards

Build real-time leaderboards using sorted sets.

Use Cases:

  • Game leaderboards
  • Ranking systems
  • Top N queries
  • Score tracking

Example:

import redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def update_score(user_id, score):
    """Update user score"""
    redis_client.zadd('leaderboard', {user_id: score})

def get_rank(user_id):
    """Get user rank (0-indexed, lower is better)"""
    return redis_client.zrevrank('leaderboard', user_id)

def get_top_users(limit=10):
    """Get top N users"""
    return redis_client.zrevrange('leaderboard', 0, limit - 1, withscores=True)

def get_user_score(user_id):
    """Get user score"""
    return redis_client.zscore('leaderboard', user_id)

def get_users_around(user_id, count=5):
    """Get users around a specific user"""
    rank = get_rank(user_id)
    if rank is None:
        return []
    
    start = max(0, rank - count)
    end = rank + count
    
    return redis_client.zrevrange('leaderboard', start, end, withscores=True)

# Usage
update_score('user1', 1000)
update_score('user2', 1500)
update_score('user3', 800)

top_users = get_top_users(10)
for user_id, score in top_users:
    print(f"{user_id}: {score}")

5. Pub/Sub Messaging

Implement publish-subscribe messaging for real-time communication.

Use Cases:

  • Real-time notifications
  • Event broadcasting
  • Chat applications
  • Live updates

Example:

import redis
import threading
import json

redis_client = redis.Redis(host='localhost', port=6379, db=0)
pubsub = redis_client.pubsub()

def publish_message(channel, message):
    """Publish message to channel"""
    redis_client.publish(channel, json.dumps(message))

def subscribe_to_channel(channel, callback):
    """Subscribe to channel and process messages"""
    pubsub.subscribe(channel)
    
    for message in pubsub.listen():
        if message['type'] == 'message':
            data = json.loads(message['data'])
            callback(channel, data)

def subscribe_to_pattern(pattern, callback):
    """Subscribe to channels matching pattern"""
    pubsub.psubscribe(pattern)
    
    for message in pubsub.listen():
        if message['type'] == 'pmessage':
            channel = message['channel'].decode()
            data = json.loads(message['data'])
            callback(channel, data)

# Usage
def handle_notification(channel, data):
    print(f"Received on {channel}: {data}")

# Subscribe in background thread
thread = threading.Thread(
    target=subscribe_to_channel,
    args=('notifications', handle_notification)
)
thread.start()

# Publish message
publish_message('notifications', {
    'type': 'user_mention',
    'user_id': 'user123',
    'message': 'You were mentioned!'
})

6. Job Queue

Implement job queues using Redis lists.

Use Cases:

  • Task queues
  • Background job processing
  • Email sending
  • Image processing

Example:

import redis
import json
import time

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def enqueue_job(queue_name, job_data):
    """Add job to queue"""
    job = {
        'id': str(uuid.uuid4()),
        'data': job_data,
        'created_at': str(datetime.now())
    }
    
    redis_client.lpush(queue_name, json.dumps(job))
    return job['id']

def dequeue_job(queue_name, timeout=0):
    """Get job from queue (blocking)"""
    if timeout > 0:
        result = redis_client.brpop(queue_name, timeout=timeout)
        if result:
            return json.loads(result[1])
    else:
        result = redis_client.rpop(queue_name)
        if result:
            return json.loads(result)
    return None

def process_jobs(queue_name):
    """Process jobs from queue"""
    while True:
        job = dequeue_job(queue_name, timeout=5)
        
        if job:
            try:
                process_job(job)
            except Exception as e:
                # Handle failed job
                handle_failed_job(job, e)
        else:
            # No jobs available
            time.sleep(1)

# Usage
job_id = enqueue_job('email_queue', {
    'to': 'user@example.com',
    'subject': 'Welcome',
    'body': 'Welcome to our service!'
})

7. Distributed Locks

Implement distributed locks for coordination across multiple processes.

Use Cases:

  • Preventing race conditions
  • Resource locking
  • Critical section protection
  • Distributed coordination

Example:

import redis
import time
import uuid

redis_client = redis.Redis(host='localhost', port=6379, db=0)

class DistributedLock:
    def __init__(self, redis_client, lock_key, timeout=10):
        self.redis_client = redis_client
        self.lock_key = lock_key
        self.timeout = timeout
        self.identifier = str(uuid.uuid4())
    
    def acquire(self):
        """Acquire lock"""
        end = time.time() + self.timeout
        
        while time.time() < end:
            if self.redis_client.set(
                self.lock_key,
                self.identifier,
                nx=True,  # Only set if not exists
                ex=self.timeout  # Expiration
            ):
                return True
            time.sleep(0.001)  # Small delay
        
        return False
    
    def release(self):
        """Release lock"""
        # Lua script for atomic release
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        
        self.redis_client.eval(
            lua_script,
            1,
            self.lock_key,
            self.identifier
        )
    
    def __enter__(self):
        if not self.acquire():
            raise Exception("Failed to acquire lock")
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

# Usage
lock = DistributedLock(redis_client, 'resource_lock', timeout=10)

with lock:
    # Critical section
    process_resource()

8. Counting and Analytics

Track counts and metrics in real-time.

Use Cases:

  • Page view counting
  • Click tracking
  • User activity metrics
  • Real-time analytics

Example:

import redis
from datetime import datetime, timedelta

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def increment_counter(key):
    """Increment counter"""
    redis_client.incr(key)

def get_counter(key):
    """Get counter value"""
    return int(redis_client.get(key) or 0)

def track_page_view(page_id):
    """Track page view"""
    today = datetime.now().strftime('%Y-%m-%d')
    key = f"page_views:{page_id}:{today}"
    redis_client.incr(key)
    redis_client.expire(key, timedelta(days=30).total_seconds())

def get_page_views(page_id, date=None):
    """Get page views for date"""
    if date is None:
        date = datetime.now().strftime('%Y-%m-%d')
    key = f"page_views:{page_id}:{date}"
    return get_counter(key)

def track_user_activity(user_id, activity_type):
    """Track user activity"""
    today = datetime.now().strftime('%Y-%m-%d')
    key = f"activity:{user_id}:{activity_type}:{today}"
    redis_client.incr(key)
    redis_client.expire(key, timedelta(days=7).total_seconds())

Deployment Guide

Prerequisites

  1. Linux/macOS/Windows: Redis runs on all major platforms
  2. Memory: Minimum 512MB RAM (more recommended)
  3. Disk Space: For persistence (RDB/AOF files)

Step 1: Install Redis

Linux (Ubuntu/Debian):

sudo apt update
sudo apt install redis-server

# Start Redis
sudo systemctl start redis-server
sudo systemctl enable redis-server

# Verify installation
redis-cli ping

macOS:

brew install redis

# Start Redis
brew services start redis

# Verify installation
redis-cli ping

Docker:

# Run Redis container
docker run -d \
  --name redis \
  -p 6379:6379 \
  redis:latest

# Verify
docker exec -it redis redis-cli ping

From Source:

wget https://download.redis.io/redis-stable.tar.gz
tar xzf redis-stable.tar.gz
cd redis-stable
make
sudo make install

Step 2: Configure Redis

Edit Redis configuration:

sudo nano /etc/redis/redis.conf

Key Configuration Options:

# Network
bind 127.0.0.1  # Listen on localhost (change for remote access)
port 6379

# Memory
maxmemory 2gb
maxmemory-policy allkeys-lru  # Eviction policy

# Persistence
save 900 1      # Save after 900 sec if at least 1 key changed
save 300 10     # Save after 300 sec if at least 10 keys changed
save 60 10000   # Save after 60 sec if at least 10000 keys changed

# AOF (Append Only File)
appendonly yes
appendfsync everysec

# Security
requirepass your_password_here

# Logging
loglevel notice
logfile /var/log/redis/redis-server.log

Restart Redis:

sudo systemctl restart redis-server

Step 3: Install Python Client

Install redis-py:

pip install redis

Install hiredis (faster parser):

pip install hiredis

Step 4: Connect to Redis

Basic Connection:

import redis

# Connect to local Redis
redis_client = redis.Redis(
    host='localhost',
    port=6379,
    db=0,
    decode_responses=True  # Automatically decode responses
)

# Test connection
redis_client.ping()

Connection with Password:

redis_client = redis.Redis(
    host='localhost',
    port=6379,
    password='your_password',
    decode_responses=True
)

Connection Pool:

import redis

# Create connection pool
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    max_connections=50,
    decode_responses=True
)

redis_client = redis.Redis(connection_pool=pool)

Step 5: Basic Operations

String Operations:

# Set value
redis_client.set('key', 'value')

# Get value
value = redis_client.get('key')

# Set with expiration
redis_client.setex('key', 3600, 'value')  # Expires in 1 hour

# Increment
redis_client.incr('counter')
redis_client.incrby('counter', 5)

# Check existence
exists = redis_client.exists('key')

Hash Operations:

# Set hash field
redis_client.hset('user:123', 'name', 'John')
redis_client.hset('user:123', 'email', 'john@example.com')

# Get hash field
name = redis_client.hget('user:123', 'name')

# Get all hash fields
user_data = redis_client.hgetall('user:123')

# Set multiple fields
redis_client.hmset('user:123', {
    'name': 'John',
    'email': 'john@example.com',
    'age': 30
})

List Operations:

# Push to list
redis_client.lpush('tasks', 'task1')
redis_client.rpush('tasks', 'task2')

# Pop from list
task = redis_client.lpop('tasks')
task = redis_client.rpop('tasks')

# Get list length
length = redis_client.llen('tasks')

# Get list range
tasks = redis_client.lrange('tasks', 0, -1)

Set Operations:

# Add to set
redis_client.sadd('tags', 'python', 'redis', 'database')

# Check membership
is_member = redis_client.sismember('tags', 'python')

# Get all members
tags = redis_client.smembers('tags')

# Set operations
redis_client.sunion('set1', 'set2')
redis_client.sinter('set1', 'set2')
redis_client.sdiff('set1', 'set2')

Sorted Set Operations:

# Add to sorted set
redis_client.zadd('leaderboard', {'user1': 100, 'user2': 200})

# Get rank
rank = redis_client.zrevrank('leaderboard', 'user1')

# Get top N
top_users = redis_client.zrevrange('leaderboard', 0, 9, withscores=True)

# Get score
score = redis_client.zscore('leaderboard', 'user1')

Step 6: Redis Cluster Setup (Optional)

For production, set up Redis Cluster:

# Create cluster configuration
mkdir redis-cluster
cd redis-cluster

# Create 6 nodes (3 masters, 3 replicas)
for port in 7000 7001 7002 7003 7004 7005; do
    mkdir $port
    cat > $port/redis.conf <<EOF
port $port
cluster-enabled yes
cluster-config-file nodes-$port.conf
cluster-node-timeout 5000
appendonly yes
EOF
done

# Start nodes
for port in 7000 7001 7002 7003 7004 7005; do
    redis-server $port/redis.conf &
done

# Create cluster
redis-cli --cluster create \
  127.0.0.1:7000 \
  127.0.0.1:7001 \
  127.0.0.1:7002 \
  127.0.0.1:7003 \
  127.0.0.1:7004 \
  127.0.0.1:7005 \
  --cluster-replicas 1

Connect to Cluster:

from redis.cluster import RedisCluster

startup_nodes = [
    {"host": "127.0.0.1", "port": "7000"},
    {"host": "127.0.0.1", "port": "7001"},
    {"host": "127.0.0.1", "port": "7002"}
]

redis_client = RedisCluster(
    startup_nodes=startup_nodes,
    decode_responses=True
)

Best Practices

1. Connection Pooling

Always use connection pools for production applications.

import redis

pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    max_connections=50,
    decode_responses=True
)

redis_client = redis.Redis(connection_pool=pool)

2. Key Naming Conventions

Use consistent key naming patterns.

# Good patterns
"user:{user_id}:profile"
"session:{session_id}"
"cache:{resource_type}:{resource_id}"
"counter:{metric_name}:{date}"

# Avoid
"user123"  # No namespace
"temp_data"  # Unclear purpose

3. Expiration Strategy

Set appropriate TTLs for cached data.

# Short TTL for frequently changing data
redis_client.setex('cache_key', 300, data)  # 5 minutes

# Longer TTL for stable data
redis_client.setex('cache_key', 3600, data)  # 1 hour

# Very long TTL for rarely changing data
redis_client.setex('cache_key', 86400, data)  # 24 hours

4. Memory Management

Configure memory limits and eviction policies.

# redis.conf
maxmemory 2gb
maxmemory-policy allkeys-lru  # Evict least recently used keys

Eviction Policies:

  • noeviction: Don’t evict (return errors)
  • allkeys-lru: Evict least recently used keys
  • volatile-lru: Evict least recently used keys with expiration
  • allkeys-lfu: Evict least frequently used keys
  • volatile-lfu: Evict least frequently used keys with expiration
  • allkeys-random: Evict random keys
  • volatile-random: Evict random keys with expiration
  • volatile-ttl: Evict keys with shortest TTL

5. Persistence Configuration

Choose appropriate persistence strategy.

RDB (Snapshot):

save 900 1      # Save if 1 key changed in 900 seconds
save 300 10     # Save if 10 keys changed in 300 seconds
save 60 10000   # Save if 10000 keys changed in 60 seconds

AOF (Append Only File):

appendonly yes
appendfsync everysec  # Sync every second (balanced)
# appendfsync always  # Sync on every write (safest, slowest)
# appendfsync no      # Let OS decide (fastest, less safe)

6. Pipeline Operations

Use pipelines for multiple operations.

# Without pipeline (multiple round trips)
redis_client.set('key1', 'value1')
redis_client.set('key2', 'value2')
redis_client.set('key3', 'value3')

# With pipeline (single round trip)
pipe = redis_client.pipeline()
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.set('key3', 'value3')
pipe.execute()

7. Error Handling

Handle Redis errors gracefully.

import redis
from redis.exceptions import ConnectionError, TimeoutError

try:
    redis_client.set('key', 'value')
except ConnectionError:
    # Handle connection error
    print("Redis connection failed")
except TimeoutError:
    # Handle timeout
    print("Redis operation timed out")
except Exception as e:
    # Handle other errors
    print(f"Redis error: {e}")

Monitoring

Redis CLI Commands

# Check Redis info
redis-cli info

# Check memory usage
redis-cli info memory

# Check connected clients
redis-cli client list

# Monitor commands in real-time
redis-cli monitor

# Check slow queries
redis-cli slowlog get 10

Python Monitoring

def get_redis_stats():
    """Get Redis statistics"""
    info = redis_client.info()
    
    return {
        'used_memory': info['used_memory_human'],
        'connected_clients': info['connected_clients'],
        'total_commands_processed': info['total_commands_processed'],
        'keyspace_hits': info['keyspace_hits'],
        'keyspace_misses': info['keyspace_misses']
    }

What Interviewers Look For

Redis Knowledge & Application

  1. Data Structure Selection
    • Appropriate data structure for use case
    • Strings, Sets, Sorted Sets, Hashes
    • Red Flags: Wrong data structure, inefficient operations, poor choice
  2. Caching Strategy
    • Cache-aside pattern
    • TTL management
    • Eviction policies
    • Red Flags: No caching strategy, wrong pattern, cache stampedes
  3. Performance Optimization
    • Connection pooling
    • Pipelining
    • Memory management
    • Red Flags: No pooling, no pipelining, memory issues

System Design Skills

  1. When to Use Redis
    • Caching use cases
    • Session storage
    • Real-time features
    • Red Flags: Wrong use case, over-engineering, can’t justify
  2. Scalability Design
    • Redis Cluster
    • Replication
    • Sharding strategy
    • Red Flags: Single instance, no scaling, bottlenecks
  3. Reliability Design
    • Persistence (RDB/AOF)
    • Failover strategies
    • Red Flags: No persistence, no failover, data loss

Problem-Solving Approach

  1. Trade-off Analysis
    • Memory vs performance
    • Consistency vs availability
    • Red Flags: No trade-offs, dogmatic choices
  2. Edge Cases
    • Cache misses
    • Memory limits
    • Failover scenarios
    • Red Flags: Ignoring edge cases, no handling
  3. Cost Consideration
    • Memory costs
    • Instance sizing
    • Red Flags: Ignoring costs, over-provisioning

Communication Skills

  1. Redis Explanation
    • Can explain Redis features
    • Understands use cases
    • Red Flags: No understanding, vague explanations
  2. Decision Justification
    • Explains why Redis
    • Discusses alternatives
    • Red Flags: No justification, no alternatives

Meta-Specific Focus

  1. Caching Expertise
    • Deep Redis knowledge
    • Appropriate usage
    • Key: Show caching expertise
  2. Performance Focus
    • Low-latency design
    • High-throughput
    • Key: Demonstrate performance focus

Conclusion

Redis is a powerful in-memory data structure store that enables high-performance applications. Key takeaways:

  1. Use connection pooling for production applications
  2. Set appropriate TTLs for cached data
  3. Configure memory limits and eviction policies
  4. Use pipelines for batch operations
  5. Monitor performance and memory usage
  6. Choose appropriate data structures for your use case

Whether you’re building caching layers, session stores, real-time features, or distributed systems, Redis provides the performance and flexibility you need.

References