Introduction
A task scheduler is a system component that manages the execution of tasks (jobs) based on various criteria such as priority, time, dependencies, and resource availability. Unlike distributed job schedulers that coordinate across multiple machines, a single-machine task scheduler operates entirely on one system, managing tasks within process or thread boundaries.
Designing a task scheduler requires understanding scheduling algorithms, priority management, task dependencies, resource constraints, and efficient data structures for task queuing and execution. This is a common system design interview question that tests your knowledge of operating systems, algorithms, and resource management.
This guide covers the design of a single-machine task scheduler, including architecture, scheduling algorithms, task management, dependency resolution, and implementation patterns.
Table of Contents
- Problem Statement
- Requirements
- Core Components
- API Design
- High-Level Architecture
- Scheduling Algorithms
- Detailed Design
- Deep Dive
- Trade-offs and Design Decisions
- Summary
Problem Statement
Design a single-machine task scheduler that:
- Accepts tasks with various attributes (priority, schedule, dependencies)
- Schedules tasks based on priority, time, and dependencies
- Executes tasks using available resources (threads, processes)
- Manages task dependencies (execute B after A completes)
- Handles retries for failed tasks
- Tracks task status and execution history
- Supports different scheduling policies (FIFO, priority, fair share)
Scale Requirements:
- Support 1K-1M tasks
- Handle 100-10K tasks per second
- Task execution time: milliseconds to hours
- Low scheduling overhead: < 1% CPU
- Memory efficient: < 100 bytes per task overhead
Requirements
Functional Requirements
Core Features:
- Task Submission: Submit tasks with metadata
- Task Scheduling: Schedule tasks based on criteria
- Task Execution: Execute tasks using executors
- Priority Management: Support task priorities
- Time-based Scheduling: Support cron, interval, one-time schedules
- Dependency Management: Handle task dependencies
- Retry Logic: Retry failed tasks
- Status Tracking: Track task status and history
Advanced Features:
- Resource Constraints: Limit tasks by CPU, memory
- Task Groups: Group related tasks
- Task Cancellation: Cancel pending/running tasks
- Task Pausing: Pause task execution
- Task Persistence: Persist tasks to disk
- Fair Scheduling: Prevent starvation
Non-Functional Requirements
Performance:
- Task submission: < 1ms overhead
- Scheduling decision: < 100μs
- Low memory overhead per task
- Efficient dependency resolution
Reliability:
- No task loss
- Guaranteed execution (if resources available)
- Handle executor failures
- Recover from crashes
Scalability:
- Support 1K-1M tasks
- Handle high submission rate
- Efficient with many dependencies
Concurrency:
- Thread-safe operations
- Concurrent task execution
- Safe dependency updates
Core Components
1. Task
- Task definition and metadata
- Execution information
- Status and history
2. Task Queue
- Stores pending tasks
- Priority ordering
- Dependency tracking
3. Scheduler
- Decides which task to execute next
- Implements scheduling algorithms
- Manages scheduling policies
4. Executor
- Executes tasks
- Manages execution resources
- Reports execution results
5. Dependency Manager
- Tracks task dependencies
- Resolves dependencies
- Triggers dependent tasks
API Design
Task Scheduler API
class TaskScheduler:
def __init__(self,
executor: TaskExecutor,
max_concurrent_tasks: int = 10):
"""
Initialize task scheduler.
Args:
executor: Task executor for running tasks
max_concurrent_tasks: Maximum concurrent tasks
"""
pass
def submit(self,
task: Callable,
task_id: str = None,
priority: int = 0,
schedule: Schedule = None,
dependencies: List[str] = None,
retry_policy: RetryPolicy = None,
**kwargs) -> str:
"""
Submit a task for scheduling.
Args:
task: Callable to execute
task_id: Unique task ID (auto-generated if None)
priority: Task priority (higher = more important)
schedule: Schedule (cron, interval, one-time)
dependencies: List of task IDs that must complete first
retry_policy: Retry policy for failures
Returns:
Task ID
"""
pass
def cancel(self, task_id: str) -> bool:
"""
Cancel a pending or running task.
Returns:
True if cancelled, False if not found or already completed
"""
pass
def get_status(self, task_id: str) -> TaskStatus:
"""Get task status."""
pass
def get_history(self, task_id: str) -> List[TaskExecution]:
"""Get task execution history."""
pass
def shutdown(self, wait: bool = True):
"""Shutdown scheduler."""
pass
Task Definition
class Task:
def __init__(self,
task_id: str,
callable: Callable,
args: tuple = (),
kwargs: dict = None,
priority: int = 0,
schedule: Schedule = None,
dependencies: List[str] = None,
retry_policy: RetryPolicy = None,
resource_requirements: ResourceRequirements = None):
self.task_id = task_id
self.callable = callable
self.args = args
self.kwargs = kwargs or {}
self.priority = priority
self.schedule = schedule
self.dependencies = dependencies or []
self.retry_policy = retry_policy
self.resource_requirements = resource_requirements
# Status
self.status = TaskStatus.PENDING
self.created_at = time.time()
self.scheduled_at = None
self.started_at = None
self.completed_at = None
# Execution history
self.executions = []
self.retry_count = 0
Schedule Types
class Schedule:
pass
class OneTimeSchedule(Schedule):
def __init__(self, execute_at: float):
self.execute_at = execute_at
class IntervalSchedule(Schedule):
def __init__(self, interval_seconds: float, start_time: float = None):
self.interval_seconds = interval_seconds
self.start_time = start_time or time.time()
class CronSchedule(Schedule):
def __init__(self, cron_expression: str):
self.cron_expression = cron_expression
self.parser = CronParser(cron_expression)
def next_run_time(self, current_time: float) -> float:
return self.parser.get_next(current_time)
High-Level Architecture
┌─────────────────────────────────────────────────────────┐
│ Application Layer │
│ (Submit tasks: submit(task), cancel(task_id)) │
└────────────────────┬────────────────────────────────────┘
│
┌────────────────────▼────────────────────────────────────┐
│ Task Scheduler │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Task │ │ Scheduler │ │ Dependency │ │
│ │ Queue │ │ Engine │ │ Manager │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Task │ │ Resource │ │ Retry │ │
│ │ Store │ │ Manager │ │ Manager │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└────────────────────┬────────────────────────────────────┘
│
┌────────────────────▼────────────────────────────────────┐
│ Task Executor │
│ (Thread pool, process pool, or custom executor) │
└─────────────────────────────────────────────────────────┘
Scheduling Algorithms
1. FIFO (First In First Out)
Characteristics:
- Simple implementation
- Fair ordering
- No priority support
- May starve low-priority tasks
Use Cases:
- Simple workloads
- No priority requirements
- Fair execution order
Implementation:
class FIFOScheduler:
def __init__(self):
self.queue = queue.Queue()
def schedule(self, task: Task):
self.queue.put(task)
def get_next(self) -> Optional[Task]:
try:
return self.queue.get_nowait()
except queue.Empty:
return None
2. Priority Scheduling
Characteristics:
- Higher priority tasks first
- May starve low-priority tasks
- Requires priority queue
Use Cases:
- Priority-based workloads
- Critical vs non-critical tasks
Implementation:
import heapq
class PriorityScheduler:
def __init__(self):
self.queue = []
self.counter = 0 # For FIFO tie-breaking
self.lock = threading.Lock()
def schedule(self, task: Task):
with self.lock:
# Negative priority for max-heap (higher priority first)
heapq.heappush(self.queue, (-task.priority, self.counter, task))
self.counter += 1
def get_next(self) -> Optional[Task]:
with self.lock:
if not self.queue:
return None
_, _, task = heapq.heappop(self.queue)
return task
3. Fair Share Scheduling
Characteristics:
- Prevents starvation
- Fair resource distribution
- More complex implementation
Use Cases:
- Multiple users/groups
- Need fairness guarantees
Implementation:
class FairShareScheduler:
def __init__(self):
self.queues = {} # user_id -> queue
self.weights = {} # user_id -> weight
self.last_scheduled = {} # user_id -> last scheduled time
self.lock = threading.Lock()
def schedule(self, task: Task, user_id: str):
with self.lock:
if user_id not in self.queues:
self.queues[user_id] = queue.Queue()
self.weights[user_id] = 1.0
self.last_scheduled[user_id] = 0
self.queues[user_id].put(task)
def get_next(self) -> Optional[Task]:
with self.lock:
if not self.queues:
return None
# Calculate virtual time for each user
current_time = time.time()
best_user = None
best_virtual_time = float('inf')
for user_id, user_queue in self.queues.items():
if user_queue.empty():
continue
# Virtual time = last_scheduled + (1 / weight)
virtual_time = self.last_scheduled[user_id] + (1.0 / self.weights[user_id])
if virtual_time < best_virtual_time:
best_virtual_time = virtual_time
best_user = user_id
if best_user:
task = self.queues[best_user].get_nowait()
self.last_scheduled[best_user] = current_time
return task
return None
4. Round-Robin Scheduling
Characteristics:
- Distribute tasks evenly
- Simple implementation
- Good for similar task sizes
Use Cases:
- Similar task execution times
- Need even distribution
Detailed Design
Task Queue Manager
class TaskQueueManager:
def __init__(self, scheduler: Scheduler):
self.scheduler = scheduler
self.pending_tasks = {} # task_id -> Task
self.running_tasks = {} # task_id -> Task
self.completed_tasks = {} # task_id -> Task
self.lock = threading.Lock()
def add_task(self, task: Task):
"""Add task to queue."""
with self.lock:
self.pending_tasks[task.task_id] = task
self.scheduler.schedule(task)
def get_next_ready_task(self) -> Optional[Task]:
"""Get next task ready for execution."""
with self.lock:
task = self.scheduler.get_next()
if task and self._is_ready(task):
del self.pending_tasks[task.task_id]
self.running_tasks[task.task_id] = task
task.status = TaskStatus.RUNNING
task.started_at = time.time()
return task
return None
def _is_ready(self, task: Task) -> bool:
"""Check if task is ready (dependencies met, schedule time reached)."""
# Check dependencies
for dep_id in task.dependencies:
if dep_id not in self.completed_tasks:
return False
dep_task = self.completed_tasks[dep_id]
if dep_task.status != TaskStatus.COMPLETED:
return False
# Check schedule
if task.schedule:
current_time = time.time()
if isinstance(task.schedule, OneTimeSchedule):
if current_time < task.schedule.execute_at:
return False
elif isinstance(task.schedule, IntervalSchedule):
if task.scheduled_at is None:
task.scheduled_at = task.schedule.start_time
if current_time < task.scheduled_at:
return False
return True
def complete_task(self, task_id: str, success: bool, result: Any = None, error: Exception = None):
"""Mark task as completed."""
with self.lock:
if task_id not in self.running_tasks:
return
task = self.running_tasks[task_id]
del self.running_tasks[task_id]
task.completed_at = time.time()
task.status = TaskStatus.COMPLETED if success else TaskStatus.FAILED
task.result = result
task.error = error
self.completed_tasks[task_id] = task
# Trigger dependent tasks
self._trigger_dependent_tasks(task_id)
def _trigger_dependent_tasks(self, completed_task_id: str):
"""Check and schedule tasks that depend on completed task."""
for task in self.pending_tasks.values():
if completed_task_id in task.dependencies:
# Check if all dependencies are met
if self._is_ready(task):
self.scheduler.schedule(task)
Scheduler
class TaskScheduler:
def __init__(self,
executor: TaskExecutor,
scheduler_type: str = "priority",
max_concurrent_tasks: int = 10):
self.executor = executor
self.max_concurrent_tasks = max_concurrent_tasks
self.current_tasks = 0
self.lock = threading.Lock()
# Create scheduler based on type
if scheduler_type == "fifo":
self.scheduler = FIFOScheduler()
elif scheduler_type == "priority":
self.scheduler = PriorityScheduler()
elif scheduler_type == "fair_share":
self.scheduler = FairShareScheduler()
else:
raise ValueError(f"Unknown scheduler type: {scheduler_type}")
# Task queue manager
self.queue_manager = TaskQueueManager(self.scheduler)
# Scheduler thread
self.scheduler_thread = threading.Thread(target=self._scheduler_loop, daemon=True)
self.running = True
self.scheduler_thread.start()
def submit(self, task: Callable, **kwargs) -> str:
"""Submit task."""
task_obj = Task(
task_id=kwargs.get('task_id') or self._generate_task_id(),
callable=task,
**kwargs
)
self.queue_manager.add_task(task_obj)
return task_obj.task_id
def _scheduler_loop(self):
"""Main scheduler loop."""
while self.running:
with self.lock:
if self.current_tasks >= self.max_concurrent_tasks:
time.sleep(0.01) # Wait for slot
continue
# Get next ready task
task = self.queue_manager.get_next_ready_task()
if task:
with self.lock:
self.current_tasks += 1
# Execute task
self.executor.execute(
task,
callback=lambda t, success, result, error: self._task_completed(t, success, result, error)
)
else:
time.sleep(0.1) # No tasks, sleep briefly
def _task_completed(self, task: Task, success: bool, result: Any, error: Exception):
"""Handle task completion."""
with self.lock:
self.current_tasks -= 1
self.queue_manager.complete_task(task.task_id, success, result, error)
# Handle retry if failed
if not success and task.retry_policy:
if task.retry_count < task.retry_policy.max_retries:
task.retry_count += 1
task.status = TaskStatus.PENDING
# Reschedule with delay
time.sleep(task.retry_policy.retry_delay)
self.queue_manager.add_task(task)
Task Executor
class TaskExecutor:
def __init__(self, executor_type: str = "thread_pool", num_workers: int = 10):
if executor_type == "thread_pool":
from concurrent.futures import ThreadPoolExecutor
self.executor = ThreadPoolExecutor(max_workers=num_workers)
elif executor_type == "process_pool":
from concurrent.futures import ProcessPoolExecutor
self.executor = ProcessPoolExecutor(max_workers=num_workers)
else:
raise ValueError(f"Unknown executor type: {executor_type}")
def execute(self, task: Task, callback: Callable):
"""Execute task asynchronously."""
future = self.executor.submit(self._execute_task, task)
future.add_done_callback(lambda f: self._handle_result(f, task, callback))
def _execute_task(self, task: Task):
"""Execute task and return result."""
try:
result = task.callable(*task.args, **task.kwargs)
return (True, result, None)
except Exception as e:
return (False, None, e)
def _handle_result(self, future, task: Task, callback: Callable):
"""Handle execution result."""
try:
success, result, error = future.result()
callback(task, success, result, error)
except Exception as e:
callback(task, False, None, e)
Dependency Resolver
class DependencyResolver:
def __init__(self, queue_manager: TaskQueueManager):
self.queue_manager = queue_manager
self.dependency_graph = {} # task_id -> set of dependent task IDs
self.lock = threading.Lock()
def add_dependency(self, task_id: str, depends_on: str):
"""Add dependency relationship."""
with self.lock:
if depends_on not in self.dependency_graph:
self.dependency_graph[depends_on] = set()
self.dependency_graph[depends_on].add(task_id)
def on_task_completed(self, completed_task_id: str):
"""Handle task completion and trigger dependents."""
with self.lock:
dependent_tasks = self.dependency_graph.get(completed_task_id, set())
for dependent_task_id in dependent_tasks:
# Check if all dependencies are met
dependent_task = self.queue_manager.pending_tasks.get(dependent_task_id)
if dependent_task and self._all_dependencies_met(dependent_task):
# Task is ready, schedule it
self.queue_manager.scheduler.schedule(dependent_task)
def _all_dependencies_met(self, task: Task) -> bool:
"""Check if all dependencies are completed."""
for dep_id in task.dependencies:
if dep_id not in self.queue_manager.completed_tasks:
return False
dep_task = self.queue_manager.completed_tasks[dep_id]
if dep_task.status != TaskStatus.COMPLETED:
return False
return True
Deep Dive
Priority Scheduling
Priority Levels:
- Critical: 100
- High: 50
- Normal: 0
- Low: -50
Implementation Considerations:
- Use max-heap for priority queue
- Tie-breaking with submission time
- Prevent starvation of low-priority tasks
Task Dependencies
Dependency Types:
- Sequential: B runs after A completes
- Parallel: B and C run after A (no dependency between B and C)
- Conditional: B runs after A only if A succeeds
Dependency Resolution:
- Build dependency graph
- Topological sort for execution order
- Track completion status
- Trigger dependent tasks on completion
Retry Mechanisms
Retry Policies:
class RetryPolicy:
def __init__(self,
max_retries: int = 3,
retry_delay: float = 1.0,
backoff_multiplier: float = 2.0,
max_delay: float = 60.0):
self.max_retries = max_retries
self.retry_delay = retry_delay
self.backoff_multiplier = backoff_multiplier
self.max_delay = max_delay
def get_retry_delay(self, retry_count: int) -> float:
"""Calculate retry delay with exponential backoff."""
delay = self.retry_delay * (self.backoff_multiplier ** retry_count)
return min(delay, self.max_delay)
Resource Management
Resource Constraints:
class ResourceRequirements:
def __init__(self, cpu: float = 1.0, memory: int = 0):
self.cpu = cpu # CPU cores
self.memory = memory # Memory in bytes
class ResourceManager:
def __init__(self, total_cpu: float, total_memory: int):
self.total_cpu = total_cpu
self.total_memory = total_memory
self.used_cpu = 0.0
self.used_memory = 0
self.lock = threading.Lock()
def can_allocate(self, requirements: ResourceRequirements) -> bool:
"""Check if resources can be allocated."""
with self.lock:
return (self.used_cpu + requirements.cpu <= self.total_cpu and
self.used_memory + requirements.memory <= self.total_memory)
def allocate(self, requirements: ResourceRequirements):
"""Allocate resources."""
with self.lock:
self.used_cpu += requirements.cpu
self.used_memory += requirements.memory
def deallocate(self, requirements: ResourceRequirements):
"""Deallocate resources."""
with self.lock:
self.used_cpu -= requirements.cpu
self.used_memory -= requirements.memory
Task Persistence
Persistence Strategies:
- In-memory only (fast, lost on crash)
- File-based (simple, slower)
- Database (reliable, more complex)
Implementation:
class TaskPersistence:
def __init__(self, storage_path: str):
self.storage_path = storage_path
self.lock = threading.Lock()
def save_task(self, task: Task):
"""Save task to disk."""
with self.lock:
filepath = os.path.join(self.storage_path, f"{task.task_id}.json")
with open(filepath, 'w') as f:
json.dump(task.to_dict(), f)
def load_task(self, task_id: str) -> Task:
"""Load task from disk."""
filepath = os.path.join(self.storage_path, f"{task_id}.json")
with open(filepath, 'r') as f:
data = json.load(f)
return Task.from_dict(data)
def load_all_tasks(self) -> List[Task]:
"""Load all tasks from disk."""
tasks = []
for filename in os.listdir(self.storage_path):
if filename.endswith('.json'):
task_id = filename[:-5]
tasks.append(self.load_task(task_id))
return tasks
Trade-offs and Design Decisions
FIFO vs Priority Scheduling
FIFO:
- Pros: Simple, fair, predictable
- Cons: No priority support, may starve important tasks
- Use When: No priority requirements, need fairness
Priority:
- Pros: Important tasks first, flexible
- Cons: May starve low-priority tasks, more complex
- Use When: Need priority handling, critical tasks exist
In-Memory vs Persistent Storage
In-Memory:
- Pros: Fast, simple
- Cons: Lost on crash
- Use When: Can tolerate data loss, need speed
Persistent:
- Pros: Survives crashes, reliable
- Cons: Slower, more complex
- Use When: Need reliability, critical tasks
Thread Pool vs Process Pool Executor
Thread Pool:
- Pros: Lower overhead, shared memory
- Cons: GIL limitations (Python), less isolation
- Use When: I/O-bound tasks, need shared state
Process Pool:
- Pros: True parallelism, isolation
- Cons: Higher overhead, no shared memory
- Use When: CPU-bound tasks, need isolation
What Interviewers Look For
Algorithm Knowledge
- Scheduling Algorithms
- Understanding of FIFO, priority, fair share
- When to use which algorithm
- Starvation prevention
- Red Flags: No algorithm knowledge, wrong choice
- Data Structures
- Priority queue implementation
- Task queue design
- Efficient data structures
- Red Flags: Inefficient structures, wrong choices
Problem-Solving Approach
- Dependency Management
- Dependency graph modeling
- Topological sort understanding
- Dependency resolution
- Red Flags: No dependency handling, incorrect resolution
- Resource Management
- Resource constraints handling
- Fair resource allocation
- Red Flags: No resource management, unfair allocation
- Edge Cases
- Task failures
- Circular dependencies
- Resource exhaustion
- Red Flags: Ignoring edge cases, no handling
Code Quality
- State Management
- Proper task state tracking
- State transitions
- Red Flags: Incorrect states, invalid transitions
- Error Handling
- Retry mechanisms
- Failure handling
- Red Flags: No retry, no failure handling
Meta-Specific Focus
- Algorithmic Thinking
- Strong algorithm knowledge
- Efficient implementations
- Key: Show CS fundamentals
- System Design
- Proper architecture
- Scalability thinking
- Key: Balance correctness with efficiency
Summary
Key Takeaways
- Task Scheduler Architecture:
- Task queue, scheduler, executor
- Dependency management
- Resource management
- Scheduling Algorithms:
- FIFO (simple, fair)
- Priority (important first)
- Fair share (prevent starvation)
- Task Management:
- Dependencies and resolution
- Retry mechanisms
- Status tracking
- Implementation Considerations:
- Thread safety
- Resource constraints
- Task persistence
- Error handling
Design Principles
- Efficiency: Fast scheduling decisions
- Fairness: Prevent starvation
- Reliability: Handle failures, no task loss
- Flexibility: Support various scheduling policies
- Observability: Track status and metrics
Common Interview Questions
- Design a task scheduler
- Architecture and components
- Scheduling algorithms
- Dependency management
- Implement priority scheduling
- Priority queue implementation
- Starvation prevention
- Handle task dependencies
- Dependency graph
- Topological sort
- Trigger mechanism
Understanding task scheduler design is crucial for interviews focusing on:
- Operating systems
- Resource management
- Scheduling algorithms
- System programming
- Concurrent systems