Introduction

A task scheduler is a system component that manages the execution of tasks (jobs) based on various criteria such as priority, time, dependencies, and resource availability. Unlike distributed job schedulers that coordinate across multiple machines, a single-machine task scheduler operates entirely on one system, managing tasks within process or thread boundaries.

Designing a task scheduler requires understanding scheduling algorithms, priority management, task dependencies, resource constraints, and efficient data structures for task queuing and execution. This is a common system design interview question that tests your knowledge of operating systems, algorithms, and resource management.

This guide covers the design of a single-machine task scheduler, including architecture, scheduling algorithms, task management, dependency resolution, and implementation patterns.

Table of Contents

  1. Problem Statement
  2. Requirements
  3. Core Components
  4. API Design
  5. High-Level Architecture
  6. Scheduling Algorithms
  7. Detailed Design
  8. Deep Dive
  9. Trade-offs and Design Decisions
  10. Summary

Problem Statement

Design a single-machine task scheduler that:

  1. Accepts tasks with various attributes (priority, schedule, dependencies)
  2. Schedules tasks based on priority, time, and dependencies
  3. Executes tasks using available resources (threads, processes)
  4. Manages task dependencies (execute B after A completes)
  5. Handles retries for failed tasks
  6. Tracks task status and execution history
  7. Supports different scheduling policies (FIFO, priority, fair share)

Scale Requirements:

  • Support 1K-1M tasks
  • Handle 100-10K tasks per second
  • Task execution time: milliseconds to hours
  • Low scheduling overhead: < 1% CPU
  • Memory efficient: < 100 bytes per task overhead

Requirements

Functional Requirements

Core Features:

  1. Task Submission: Submit tasks with metadata
  2. Task Scheduling: Schedule tasks based on criteria
  3. Task Execution: Execute tasks using executors
  4. Priority Management: Support task priorities
  5. Time-based Scheduling: Support cron, interval, one-time schedules
  6. Dependency Management: Handle task dependencies
  7. Retry Logic: Retry failed tasks
  8. Status Tracking: Track task status and history

Advanced Features:

  1. Resource Constraints: Limit tasks by CPU, memory
  2. Task Groups: Group related tasks
  3. Task Cancellation: Cancel pending/running tasks
  4. Task Pausing: Pause task execution
  5. Task Persistence: Persist tasks to disk
  6. Fair Scheduling: Prevent starvation

Non-Functional Requirements

Performance:

  • Task submission: < 1ms overhead
  • Scheduling decision: < 100μs
  • Low memory overhead per task
  • Efficient dependency resolution

Reliability:

  • No task loss
  • Guaranteed execution (if resources available)
  • Handle executor failures
  • Recover from crashes

Scalability:

  • Support 1K-1M tasks
  • Handle high submission rate
  • Efficient with many dependencies

Concurrency:

  • Thread-safe operations
  • Concurrent task execution
  • Safe dependency updates

Core Components

1. Task

  • Task definition and metadata
  • Execution information
  • Status and history

2. Task Queue

  • Stores pending tasks
  • Priority ordering
  • Dependency tracking

3. Scheduler

  • Decides which task to execute next
  • Implements scheduling algorithms
  • Manages scheduling policies

4. Executor

  • Executes tasks
  • Manages execution resources
  • Reports execution results

5. Dependency Manager

  • Tracks task dependencies
  • Resolves dependencies
  • Triggers dependent tasks

API Design

Task Scheduler API

class TaskScheduler:
    def __init__(self, 
                 executor: TaskExecutor,
                 max_concurrent_tasks: int = 10):
        """
        Initialize task scheduler.
        
        Args:
            executor: Task executor for running tasks
            max_concurrent_tasks: Maximum concurrent tasks
        """
        pass
    
    def submit(self, 
               task: Callable,
               task_id: str = None,
               priority: int = 0,
               schedule: Schedule = None,
               dependencies: List[str] = None,
               retry_policy: RetryPolicy = None,
               **kwargs) -> str:
        """
        Submit a task for scheduling.
        
        Args:
            task: Callable to execute
            task_id: Unique task ID (auto-generated if None)
            priority: Task priority (higher = more important)
            schedule: Schedule (cron, interval, one-time)
            dependencies: List of task IDs that must complete first
            retry_policy: Retry policy for failures
            
        Returns:
            Task ID
        """
        pass
    
    def cancel(self, task_id: str) -> bool:
        """
        Cancel a pending or running task.
        
        Returns:
            True if cancelled, False if not found or already completed
        """
        pass
    
    def get_status(self, task_id: str) -> TaskStatus:
        """Get task status."""
        pass
    
    def get_history(self, task_id: str) -> List[TaskExecution]:
        """Get task execution history."""
        pass
    
    def shutdown(self, wait: bool = True):
        """Shutdown scheduler."""
        pass

Task Definition

class Task:
    def __init__(self,
                 task_id: str,
                 callable: Callable,
                 args: tuple = (),
                 kwargs: dict = None,
                 priority: int = 0,
                 schedule: Schedule = None,
                 dependencies: List[str] = None,
                 retry_policy: RetryPolicy = None,
                 resource_requirements: ResourceRequirements = None):
        self.task_id = task_id
        self.callable = callable
        self.args = args
        self.kwargs = kwargs or {}
        self.priority = priority
        self.schedule = schedule
        self.dependencies = dependencies or []
        self.retry_policy = retry_policy
        self.resource_requirements = resource_requirements
        
        # Status
        self.status = TaskStatus.PENDING
        self.created_at = time.time()
        self.scheduled_at = None
        self.started_at = None
        self.completed_at = None
        
        # Execution history
        self.executions = []
        self.retry_count = 0

Schedule Types

class Schedule:
    pass

class OneTimeSchedule(Schedule):
    def __init__(self, execute_at: float):
        self.execute_at = execute_at

class IntervalSchedule(Schedule):
    def __init__(self, interval_seconds: float, start_time: float = None):
        self.interval_seconds = interval_seconds
        self.start_time = start_time or time.time()

class CronSchedule(Schedule):
    def __init__(self, cron_expression: str):
        self.cron_expression = cron_expression
        self.parser = CronParser(cron_expression)
    
    def next_run_time(self, current_time: float) -> float:
        return self.parser.get_next(current_time)

High-Level Architecture

┌─────────────────────────────────────────────────────────┐
│                  Application Layer                       │
│  (Submit tasks: submit(task), cancel(task_id))          │
└────────────────────┬────────────────────────────────────┘
                     │
┌────────────────────▼────────────────────────────────────┐
│              Task Scheduler                               │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  │
│  │   Task       │  │   Scheduler  │  │   Dependency │  │
│  │   Queue      │  │   Engine     │  │   Manager    │  │
│  └──────────────┘  └──────────────┘  └──────────────┘  │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  │
│  │   Task       │  │   Resource   │  │   Retry      │  │
│  │   Store      │  │   Manager    │  │   Manager    │  │
│  └──────────────┘  └──────────────┘  └──────────────┘  │
└────────────────────┬────────────────────────────────────┘
                     │
┌────────────────────▼────────────────────────────────────┐
│              Task Executor                                │
│  (Thread pool, process pool, or custom executor)         │
└─────────────────────────────────────────────────────────┘

Scheduling Algorithms

1. FIFO (First In First Out)

Characteristics:

  • Simple implementation
  • Fair ordering
  • No priority support
  • May starve low-priority tasks

Use Cases:

  • Simple workloads
  • No priority requirements
  • Fair execution order

Implementation:

class FIFOScheduler:
    def __init__(self):
        self.queue = queue.Queue()
    
    def schedule(self, task: Task):
        self.queue.put(task)
    
    def get_next(self) -> Optional[Task]:
        try:
            return self.queue.get_nowait()
        except queue.Empty:
            return None

2. Priority Scheduling

Characteristics:

  • Higher priority tasks first
  • May starve low-priority tasks
  • Requires priority queue

Use Cases:

  • Priority-based workloads
  • Critical vs non-critical tasks

Implementation:

import heapq

class PriorityScheduler:
    def __init__(self):
        self.queue = []
        self.counter = 0  # For FIFO tie-breaking
        self.lock = threading.Lock()
    
    def schedule(self, task: Task):
        with self.lock:
            # Negative priority for max-heap (higher priority first)
            heapq.heappush(self.queue, (-task.priority, self.counter, task))
            self.counter += 1
    
    def get_next(self) -> Optional[Task]:
        with self.lock:
            if not self.queue:
                return None
            _, _, task = heapq.heappop(self.queue)
            return task

3. Fair Share Scheduling

Characteristics:

  • Prevents starvation
  • Fair resource distribution
  • More complex implementation

Use Cases:

  • Multiple users/groups
  • Need fairness guarantees

Implementation:

class FairShareScheduler:
    def __init__(self):
        self.queues = {}  # user_id -> queue
        self.weights = {}  # user_id -> weight
        self.last_scheduled = {}  # user_id -> last scheduled time
        self.lock = threading.Lock()
    
    def schedule(self, task: Task, user_id: str):
        with self.lock:
            if user_id not in self.queues:
                self.queues[user_id] = queue.Queue()
                self.weights[user_id] = 1.0
                self.last_scheduled[user_id] = 0
            
            self.queues[user_id].put(task)
    
    def get_next(self) -> Optional[Task]:
        with self.lock:
            if not self.queues:
                return None
            
            # Calculate virtual time for each user
            current_time = time.time()
            best_user = None
            best_virtual_time = float('inf')
            
            for user_id, user_queue in self.queues.items():
                if user_queue.empty():
                    continue
                
                # Virtual time = last_scheduled + (1 / weight)
                virtual_time = self.last_scheduled[user_id] + (1.0 / self.weights[user_id])
                
                if virtual_time < best_virtual_time:
                    best_virtual_time = virtual_time
                    best_user = user_id
            
            if best_user:
                task = self.queues[best_user].get_nowait()
                self.last_scheduled[best_user] = current_time
                return task
            
            return None

4. Round-Robin Scheduling

Characteristics:

  • Distribute tasks evenly
  • Simple implementation
  • Good for similar task sizes

Use Cases:

  • Similar task execution times
  • Need even distribution

Detailed Design

Task Queue Manager

class TaskQueueManager:
    def __init__(self, scheduler: Scheduler):
        self.scheduler = scheduler
        self.pending_tasks = {}  # task_id -> Task
        self.running_tasks = {}  # task_id -> Task
        self.completed_tasks = {}  # task_id -> Task
        self.lock = threading.Lock()
    
    def add_task(self, task: Task):
        """Add task to queue."""
        with self.lock:
            self.pending_tasks[task.task_id] = task
            self.scheduler.schedule(task)
    
    def get_next_ready_task(self) -> Optional[Task]:
        """Get next task ready for execution."""
        with self.lock:
            task = self.scheduler.get_next()
            
            if task and self._is_ready(task):
                del self.pending_tasks[task.task_id]
                self.running_tasks[task.task_id] = task
                task.status = TaskStatus.RUNNING
                task.started_at = time.time()
                return task
            
            return None
    
    def _is_ready(self, task: Task) -> bool:
        """Check if task is ready (dependencies met, schedule time reached)."""
        # Check dependencies
        for dep_id in task.dependencies:
            if dep_id not in self.completed_tasks:
                return False
            dep_task = self.completed_tasks[dep_id]
            if dep_task.status != TaskStatus.COMPLETED:
                return False
        
        # Check schedule
        if task.schedule:
            current_time = time.time()
            if isinstance(task.schedule, OneTimeSchedule):
                if current_time < task.schedule.execute_at:
                    return False
            elif isinstance(task.schedule, IntervalSchedule):
                if task.scheduled_at is None:
                    task.scheduled_at = task.schedule.start_time
                if current_time < task.scheduled_at:
                    return False
        
        return True
    
    def complete_task(self, task_id: str, success: bool, result: Any = None, error: Exception = None):
        """Mark task as completed."""
        with self.lock:
            if task_id not in self.running_tasks:
                return
            
            task = self.running_tasks[task_id]
            del self.running_tasks[task_id]
            
            task.completed_at = time.time()
            task.status = TaskStatus.COMPLETED if success else TaskStatus.FAILED
            task.result = result
            task.error = error
            
            self.completed_tasks[task_id] = task
            
            # Trigger dependent tasks
            self._trigger_dependent_tasks(task_id)
    
    def _trigger_dependent_tasks(self, completed_task_id: str):
        """Check and schedule tasks that depend on completed task."""
        for task in self.pending_tasks.values():
            if completed_task_id in task.dependencies:
                # Check if all dependencies are met
                if self._is_ready(task):
                    self.scheduler.schedule(task)

Scheduler

class TaskScheduler:
    def __init__(self, 
                 executor: TaskExecutor,
                 scheduler_type: str = "priority",
                 max_concurrent_tasks: int = 10):
        self.executor = executor
        self.max_concurrent_tasks = max_concurrent_tasks
        self.current_tasks = 0
        self.lock = threading.Lock()
        
        # Create scheduler based on type
        if scheduler_type == "fifo":
            self.scheduler = FIFOScheduler()
        elif scheduler_type == "priority":
            self.scheduler = PriorityScheduler()
        elif scheduler_type == "fair_share":
            self.scheduler = FairShareScheduler()
        else:
            raise ValueError(f"Unknown scheduler type: {scheduler_type}")
        
        # Task queue manager
        self.queue_manager = TaskQueueManager(self.scheduler)
        
        # Scheduler thread
        self.scheduler_thread = threading.Thread(target=self._scheduler_loop, daemon=True)
        self.running = True
        self.scheduler_thread.start()
    
    def submit(self, task: Callable, **kwargs) -> str:
        """Submit task."""
        task_obj = Task(
            task_id=kwargs.get('task_id') or self._generate_task_id(),
            callable=task,
            **kwargs
        )
        self.queue_manager.add_task(task_obj)
        return task_obj.task_id
    
    def _scheduler_loop(self):
        """Main scheduler loop."""
        while self.running:
            with self.lock:
                if self.current_tasks >= self.max_concurrent_tasks:
                    time.sleep(0.01)  # Wait for slot
                    continue
            
            # Get next ready task
            task = self.queue_manager.get_next_ready_task()
            
            if task:
                with self.lock:
                    self.current_tasks += 1
                
                # Execute task
                self.executor.execute(
                    task,
                    callback=lambda t, success, result, error: self._task_completed(t, success, result, error)
                )
            else:
                time.sleep(0.1)  # No tasks, sleep briefly
    
    def _task_completed(self, task: Task, success: bool, result: Any, error: Exception):
        """Handle task completion."""
        with self.lock:
            self.current_tasks -= 1
        
        self.queue_manager.complete_task(task.task_id, success, result, error)
        
        # Handle retry if failed
        if not success and task.retry_policy:
            if task.retry_count < task.retry_policy.max_retries:
                task.retry_count += 1
                task.status = TaskStatus.PENDING
                # Reschedule with delay
                time.sleep(task.retry_policy.retry_delay)
                self.queue_manager.add_task(task)

Task Executor

class TaskExecutor:
    def __init__(self, executor_type: str = "thread_pool", num_workers: int = 10):
        if executor_type == "thread_pool":
            from concurrent.futures import ThreadPoolExecutor
            self.executor = ThreadPoolExecutor(max_workers=num_workers)
        elif executor_type == "process_pool":
            from concurrent.futures import ProcessPoolExecutor
            self.executor = ProcessPoolExecutor(max_workers=num_workers)
        else:
            raise ValueError(f"Unknown executor type: {executor_type}")
    
    def execute(self, task: Task, callback: Callable):
        """Execute task asynchronously."""
        future = self.executor.submit(self._execute_task, task)
        future.add_done_callback(lambda f: self._handle_result(f, task, callback))
    
    def _execute_task(self, task: Task):
        """Execute task and return result."""
        try:
            result = task.callable(*task.args, **task.kwargs)
            return (True, result, None)
        except Exception as e:
            return (False, None, e)
    
    def _handle_result(self, future, task: Task, callback: Callable):
        """Handle execution result."""
        try:
            success, result, error = future.result()
            callback(task, success, result, error)
        except Exception as e:
            callback(task, False, None, e)

Dependency Resolver

class DependencyResolver:
    def __init__(self, queue_manager: TaskQueueManager):
        self.queue_manager = queue_manager
        self.dependency_graph = {}  # task_id -> set of dependent task IDs
        self.lock = threading.Lock()
    
    def add_dependency(self, task_id: str, depends_on: str):
        """Add dependency relationship."""
        with self.lock:
            if depends_on not in self.dependency_graph:
                self.dependency_graph[depends_on] = set()
            self.dependency_graph[depends_on].add(task_id)
    
    def on_task_completed(self, completed_task_id: str):
        """Handle task completion and trigger dependents."""
        with self.lock:
            dependent_tasks = self.dependency_graph.get(completed_task_id, set())
            
            for dependent_task_id in dependent_tasks:
                # Check if all dependencies are met
                dependent_task = self.queue_manager.pending_tasks.get(dependent_task_id)
                if dependent_task and self._all_dependencies_met(dependent_task):
                    # Task is ready, schedule it
                    self.queue_manager.scheduler.schedule(dependent_task)
    
    def _all_dependencies_met(self, task: Task) -> bool:
        """Check if all dependencies are completed."""
        for dep_id in task.dependencies:
            if dep_id not in self.queue_manager.completed_tasks:
                return False
            dep_task = self.queue_manager.completed_tasks[dep_id]
            if dep_task.status != TaskStatus.COMPLETED:
                return False
        return True

Deep Dive

Priority Scheduling

Priority Levels:

  • Critical: 100
  • High: 50
  • Normal: 0
  • Low: -50

Implementation Considerations:

  • Use max-heap for priority queue
  • Tie-breaking with submission time
  • Prevent starvation of low-priority tasks

Task Dependencies

Dependency Types:

  1. Sequential: B runs after A completes
  2. Parallel: B and C run after A (no dependency between B and C)
  3. Conditional: B runs after A only if A succeeds

Dependency Resolution:

  • Build dependency graph
  • Topological sort for execution order
  • Track completion status
  • Trigger dependent tasks on completion

Retry Mechanisms

Retry Policies:

class RetryPolicy:
    def __init__(self,
                 max_retries: int = 3,
                 retry_delay: float = 1.0,
                 backoff_multiplier: float = 2.0,
                 max_delay: float = 60.0):
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.backoff_multiplier = backoff_multiplier
        self.max_delay = max_delay
    
    def get_retry_delay(self, retry_count: int) -> float:
        """Calculate retry delay with exponential backoff."""
        delay = self.retry_delay * (self.backoff_multiplier ** retry_count)
        return min(delay, self.max_delay)

Resource Management

Resource Constraints:

class ResourceRequirements:
    def __init__(self, cpu: float = 1.0, memory: int = 0):
        self.cpu = cpu  # CPU cores
        self.memory = memory  # Memory in bytes

class ResourceManager:
    def __init__(self, total_cpu: float, total_memory: int):
        self.total_cpu = total_cpu
        self.total_memory = total_memory
        self.used_cpu = 0.0
        self.used_memory = 0
        self.lock = threading.Lock()
    
    def can_allocate(self, requirements: ResourceRequirements) -> bool:
        """Check if resources can be allocated."""
        with self.lock:
            return (self.used_cpu + requirements.cpu <= self.total_cpu and
                    self.used_memory + requirements.memory <= self.total_memory)
    
    def allocate(self, requirements: ResourceRequirements):
        """Allocate resources."""
        with self.lock:
            self.used_cpu += requirements.cpu
            self.used_memory += requirements.memory
    
    def deallocate(self, requirements: ResourceRequirements):
        """Deallocate resources."""
        with self.lock:
            self.used_cpu -= requirements.cpu
            self.used_memory -= requirements.memory

Task Persistence

Persistence Strategies:

  • In-memory only (fast, lost on crash)
  • File-based (simple, slower)
  • Database (reliable, more complex)

Implementation:

class TaskPersistence:
    def __init__(self, storage_path: str):
        self.storage_path = storage_path
        self.lock = threading.Lock()
    
    def save_task(self, task: Task):
        """Save task to disk."""
        with self.lock:
            filepath = os.path.join(self.storage_path, f"{task.task_id}.json")
            with open(filepath, 'w') as f:
                json.dump(task.to_dict(), f)
    
    def load_task(self, task_id: str) -> Task:
        """Load task from disk."""
        filepath = os.path.join(self.storage_path, f"{task_id}.json")
        with open(filepath, 'r') as f:
            data = json.load(f)
            return Task.from_dict(data)
    
    def load_all_tasks(self) -> List[Task]:
        """Load all tasks from disk."""
        tasks = []
        for filename in os.listdir(self.storage_path):
            if filename.endswith('.json'):
                task_id = filename[:-5]
                tasks.append(self.load_task(task_id))
        return tasks

Trade-offs and Design Decisions

FIFO vs Priority Scheduling

FIFO:

  • Pros: Simple, fair, predictable
  • Cons: No priority support, may starve important tasks
  • Use When: No priority requirements, need fairness

Priority:

  • Pros: Important tasks first, flexible
  • Cons: May starve low-priority tasks, more complex
  • Use When: Need priority handling, critical tasks exist

In-Memory vs Persistent Storage

In-Memory:

  • Pros: Fast, simple
  • Cons: Lost on crash
  • Use When: Can tolerate data loss, need speed

Persistent:

  • Pros: Survives crashes, reliable
  • Cons: Slower, more complex
  • Use When: Need reliability, critical tasks

Thread Pool vs Process Pool Executor

Thread Pool:

  • Pros: Lower overhead, shared memory
  • Cons: GIL limitations (Python), less isolation
  • Use When: I/O-bound tasks, need shared state

Process Pool:

  • Pros: True parallelism, isolation
  • Cons: Higher overhead, no shared memory
  • Use When: CPU-bound tasks, need isolation

What Interviewers Look For

Algorithm Knowledge

  1. Scheduling Algorithms
    • Understanding of FIFO, priority, fair share
    • When to use which algorithm
    • Starvation prevention
    • Red Flags: No algorithm knowledge, wrong choice
  2. Data Structures
    • Priority queue implementation
    • Task queue design
    • Efficient data structures
    • Red Flags: Inefficient structures, wrong choices

Problem-Solving Approach

  1. Dependency Management
    • Dependency graph modeling
    • Topological sort understanding
    • Dependency resolution
    • Red Flags: No dependency handling, incorrect resolution
  2. Resource Management
    • Resource constraints handling
    • Fair resource allocation
    • Red Flags: No resource management, unfair allocation
  3. Edge Cases
    • Task failures
    • Circular dependencies
    • Resource exhaustion
    • Red Flags: Ignoring edge cases, no handling

Code Quality

  1. State Management
    • Proper task state tracking
    • State transitions
    • Red Flags: Incorrect states, invalid transitions
  2. Error Handling
    • Retry mechanisms
    • Failure handling
    • Red Flags: No retry, no failure handling

Meta-Specific Focus

  1. Algorithmic Thinking
    • Strong algorithm knowledge
    • Efficient implementations
    • Key: Show CS fundamentals
  2. System Design
    • Proper architecture
    • Scalability thinking
    • Key: Balance correctness with efficiency

Summary

Key Takeaways

  1. Task Scheduler Architecture:
    • Task queue, scheduler, executor
    • Dependency management
    • Resource management
  2. Scheduling Algorithms:
    • FIFO (simple, fair)
    • Priority (important first)
    • Fair share (prevent starvation)
  3. Task Management:
    • Dependencies and resolution
    • Retry mechanisms
    • Status tracking
  4. Implementation Considerations:
    • Thread safety
    • Resource constraints
    • Task persistence
    • Error handling

Design Principles

  1. Efficiency: Fast scheduling decisions
  2. Fairness: Prevent starvation
  3. Reliability: Handle failures, no task loss
  4. Flexibility: Support various scheduling policies
  5. Observability: Track status and metrics

Common Interview Questions

  1. Design a task scheduler
    • Architecture and components
    • Scheduling algorithms
    • Dependency management
  2. Implement priority scheduling
    • Priority queue implementation
    • Starvation prevention
  3. Handle task dependencies
    • Dependency graph
    • Topological sort
    • Trigger mechanism

Understanding task scheduler design is crucial for interviews focusing on:

  • Operating systems
  • Resource management
  • Scheduling algorithms
  • System programming
  • Concurrent systems