Evolution of a Task Scheduler in Go

14 min read #go #golang

I’ve been thinking about how to implement a task scheduler in pure Go. Task schedulers are fundamental to many production systems: background job queues, cron-like services, webhook delivery systems, and API retry mechanisms. The core requirement is clear: execute tasks at specified future times. But the implementation choices reveal important tradeoffs between CPU efficiency, concurrency, and resource management.

This post explores four task scheduler implementations in Go, each navigating trade-offs between timing precision, concurrency models, and resource management. Starting with a synchronous dynamic sleep scheduler, we experiment with different approaches (fixed-interval polling, unbounded concurrency, worker pools) before arriving at a heap-based design that combines the strengths of earlier explorations.

The complete source code for all implementations is available at github.com/hackebrot/go-task-scheduler.

The Contract

Every scheduler implementation shares a common contract defined by two interfaces:

// Scheduler manages and executes tasks at their scheduled times.
type Scheduler interface {
	// Schedule adds a task to the scheduler.
	Schedule(task Task)

	// Start begins the scheduler's background processing.
	// It runs until the provided context is cancelled.
	Start(ctx context.Context)

	// PendingTasksCount returns the number of tasks waiting to be executed.
	PendingTasksCount() int
}

// Task represents a schedulable unit of work with an execution time.
type Task interface {
	// Execute runs the task and returns an error if the execution fails.
	Execute() error

	// ExecuteAt returns the time when this task should be executed.
	ExecuteAt() time.Time

	// ID returns a unique identifier for this task (used for logging).
	ID() string
}

The full interface definitions are available in pkg/scheduler.
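
The repository ships its own task types, but to make the contract concrete, here is a minimal hypothetical implementation. The printTask type and its fields are my illustration rather than code from the repository, and it assumes the time and log/slog imports:

// printTask is a hypothetical task that logs a message at its scheduled time.
type printTask struct {
	id        string
	message   string
	executeAt time.Time
}

// Execute runs the task and returns an error if the execution fails.
func (t *printTask) Execute() error {
	slog.Info("running task", "task_id", t.id, "message", t.message)
	return nil
}

// ExecuteAt returns the time when this task should be executed.
func (t *printTask) ExecuteAt() time.Time { return t.executeAt }

// ID returns a unique identifier for this task.
func (t *printTask) ID() string { return t.id }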

Creating Schedulers

While all schedulers implement the same interface, their instantiation varies based on configuration needs. Each package provides a NewScheduler() constructor with parameters appropriate for its design:
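
The exact signatures live in each package; the following sketch is inferred from the struct fields and the usage examples in this post, so the polling parameter, return types, and names are assumptions rather than the repository's actual API:

// dynamic sleep scheduler: no configuration needed
func NewScheduler() scheduler.Scheduler

// polling scheduler: how often to scan for ready tasks
func NewScheduler(checkInterval time.Duration) scheduler.Scheduler

// worker pool and heap schedulers: worker count and channel buffer size;
// these presumably return their concrete types so Results() stays accessible
func NewScheduler(workerCount, bufSize int) *workerpoolScheduler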

Basic Usage

Here’s how to use any scheduler:

const maxTimeOffset = 20

func main() {
	scheduler := NewScheduler()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for n := 1; n <= 10; n++ {
		offsetSeconds := rand.Intn(maxTimeOffset)
		offset := time.Now().Add(time.Duration(offsetSeconds) * time.Second)
		taskId := fmt.Sprintf("fib%d-+%ds", n, offsetSeconds)
		task := fib.NewTask(taskId, n, fibonacci.NewRecursive(), offset)
		scheduler.Schedule(task)
	}

	scheduler.Start(ctx)
}

This example schedules 10 Fibonacci calculation tasks with random execution times. Each task uses a recursive Fibonacci implementation (fibonacci.NewRecursive()), intentionally compute-intensive to demonstrate scheduler behavior under load, and runs at a randomly chosen offset between 0 and 19 seconds in the future. See github.com/hackebrot/go-fibonacci for the Fibonacci code.

Dynamic Sleep: Smart Waiting

The first scheduler calculates sleep duration dynamically to avoid wasting CPU cycles. On each pass it scans all pending tasks, executes those already due sequentially, and then sleeps exactly until the earliest remaining task is due before scanning again.

// dynamicSleepScheduler is a task scheduler that dynamically calculates sleep duration until the next task.
type dynamicSleepScheduler struct {
	tasks []scheduler.Task
	mu    sync.Mutex
}

// batch represents a group of tasks ready for execution and the duration to sleep until the next batch.
type batch struct {
	ready         []scheduler.Task
	sleepDuration time.Duration
}

The batch struct groups ready tasks with the calculated sleep duration for the next wake cycle.
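
The post doesn't show Schedule() and PendingTasksCount(); for the slice-based schedulers they presumably reduce to appending and counting under the mutex, roughly like this (a sketch, not the repository code):

// Schedule adds a task to the scheduler.
func (s *dynamicSleepScheduler) Schedule(task scheduler.Task) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.tasks = append(s.tasks, task)
}

// PendingTasksCount returns the number of tasks waiting to be executed.
func (s *dynamicSleepScheduler) PendingTasksCount() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.tasks)
}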

// Start runs the scheduler loop, executing tasks as they become ready until all tasks complete or context is cancelled.
func (s *dynamicSleepScheduler) Start(ctx context.Context) {
	slog.Info("starting dynamic sleep task scheduler", "count_tasks", s.PendingTasksCount())

	for {
		if s.PendingTasksCount() == 0 {
			slog.Info("no remaining tasks, stopping scheduler")
			return
		}

		b := s.nextBatch()

		for _, task := range b.ready {
			s.executeTask(task)
		}

		select {
		case <-ctx.Done():
			slog.Info("context cancelled, stopping scheduler")
			return
		case <-time.After(b.sleepDuration):
		}
	}
}

The main loop partitions tasks, executes ready ones sequentially, then sleeps until the next task is due.

// nextBatch partitions tasks into ready and remaining, returning ready tasks and sleep duration until next task.
func (s *dynamicSleepScheduler) nextBatch() batch {
	s.mu.Lock()
	defer s.mu.Unlock()

	ready := make([]scheduler.Task, 0, len(s.tasks))
	remaining := make([]scheduler.Task, 0, len(s.tasks))
	var nextTime time.Time
	hasNext := false

	now := time.Now()

	for _, task := range s.tasks {
		if !now.Before(task.ExecuteAt()) {
			ready = append(ready, task)
			continue
		}

		remaining = append(remaining, task)
		if !hasNext {
			nextTime = task.ExecuteAt()
			hasNext = true
		} else if task.ExecuteAt().Before(nextTime) {
			nextTime = task.ExecuteAt()
		}
	}

	s.tasks = remaining

	var sleepDuration time.Duration
	if hasNext {
		sleepDuration = max(time.Until(nextTime), 0)
	} else {
		sleepDuration = defaultCheckInterval
	}

	return batch{
		ready:         ready,
		sleepDuration: sleepDuration,
	}
}

The core logic: one scan partitions tasks by readiness and finds the earliest remaining task, providing both what to execute now and how long to sleep.

// executeTask runs a single task and logs any errors that occur.
func (s *dynamicSleepScheduler) executeTask(task scheduler.Task) {
	if err := task.Execute(); err != nil {
		slog.Error("error executing task", "task_id", task.ID(), "error", err)
	}
}

Approach: CPU-efficient waiting without fixed polling intervals.

Limitation: Sequential task execution. Each task blocks the next, making the scheduler unsuitable for workloads with varying task durations.

Polling: Concurrent Execution

The polling scheduler explores an alternative timing approach: a ticker that checks for ready tasks at fixed intervals. This implementation also introduces concurrent execution by spawning a goroutine for each ready task.

// pollingScheduler is a task scheduler that checks for ready tasks at fixed intervals.
type pollingScheduler struct {
	tasks         []scheduler.Task
	mu            sync.Mutex
	checkInterval time.Duration
}

// Start runs the scheduler loop, checking for ready tasks at regular intervals until all tasks complete or context is cancelled.
func (s *pollingScheduler) Start(ctx context.Context) {
	slog.Info("starting polling task scheduler", "count_tasks", s.PendingTasksCount())

	ticker := time.NewTicker(s.checkInterval)
	defer ticker.Stop()

	for {
		if s.PendingTasksCount() == 0 {
			slog.Info("no remaining tasks, stopping scheduler")
			return
		}

		select {
		case <-ctx.Done():
			slog.Info("context cancelled, stopping scheduler")
			return
		case <-ticker.C:
			s.executeReadyTasks()
		}
	}
}

A ticker (time.Ticker) triggers periodic scans at the configured interval (checkInterval), trading timing precision for a simpler scheduling mechanism.

// executeReadyTasks identifies and executes all tasks that are ready to run, updating the task list with remaining tasks.
func (s *pollingScheduler) executeReadyTasks() {
	s.mu.Lock()
	defer s.mu.Unlock()

	now := time.Now()
	remaining := make([]scheduler.Task, 0, len(s.tasks))

	for _, task := range s.tasks {
		if now.Before(task.ExecuteAt()) {
			remaining = append(remaining, task)
		} else {
			go s.executeTask(task)
		}
	}

	s.tasks = remaining
}

// executeTask runs a single task and logs any errors that occur.
func (s *pollingScheduler) executeTask(task scheduler.Task) {
	if err := task.Execute(); err != nil {
		slog.Error("error executing task", "task_id", task.ID(), "error", err)
	}
}

Each ready task spawns its own goroutine, enabling parallel execution at the cost of unbounded concurrency.

Design change: Uses a ticker for fixed-interval checks instead of dynamic sleep calculation.

Key improvement: Tasks execute in parallel, eliminating sequential bottlenecks.

Limitation: Unbounded goroutines. During bursts of ready tasks, the scheduler spawns potentially thousands of goroutines, risking resource exhaustion and unpredictable performance.

Worker Pool: Bounded Concurrency

The worker pool scheduler addresses runaway goroutine creation with a fixed-size worker pool. Ready tasks are dispatched to workers via a buffered channel (make(chan scheduler.Task, bufSize)), which provides backpressure when all workers are saturated.

// workerpoolScheduler is a task scheduler that uses a pool of workers to execute ready tasks concurrently.
type workerpoolScheduler struct {
	tasks         []scheduler.Task
	mu            sync.Mutex
	checkInterval time.Duration
	workerCount   int
	wg            sync.WaitGroup
	readyTasks    chan scheduler.Task
	results       chan scheduler.TaskResult
}

// TaskResult represents the outcome of a task execution.
type TaskResult struct {
	TaskID    string
	Error     error
	StartTime time.Time
	EndTime   time.Time
	WorkerID  string
}

The struct adds concurrency primitives: channels for task distribution and results, plus a sync.WaitGroup for coordinated shutdown. The TaskResult struct captures execution metadata for each completed task.
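
A constructor matching this struct and the NewScheduler(runtime.NumCPU(), 10) call shown later would look roughly like this; the default check interval and the choice to buffer both channels with bufSize are assumptions on my part:

// NewScheduler creates a worker pool scheduler with the given number of workers
// and buffered channels for task dispatch and results.
func NewScheduler(workerCount, bufSize int) *workerpoolScheduler {
	return &workerpoolScheduler{
		checkInterval: time.Second, // assumed default; the real constructor may take this as a parameter
		workerCount:   workerCount,
		readyTasks:    make(chan scheduler.Task, bufSize),
		results:       make(chan scheduler.TaskResult, bufSize),
	}
}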

// Start runs the scheduler loop, executing ready tasks via a worker pool until all tasks complete or context is cancelled.
func (s *workerpoolScheduler) Start(ctx context.Context) {
	slog.Info("starting workerpool task scheduler", "count_tasks", s.PendingTasksCount())

	// Start worker pool
	s.wg.Add(s.workerCount)
	for i := range s.workerCount {
		go s.runWorker(i)
	}

	// Start task scanner
	ticker := time.NewTicker(s.checkInterval)
	defer ticker.Stop()

	for {
		if s.PendingTasksCount() == 0 {
			slog.Info("no remaining tasks, stopping scheduler")
			s.shutdown()
			return
		}

		select {
		case <-ctx.Done():
			slog.Info("context cancelled, stopping scheduler")
			s.shutdown()
			return
		case <-ticker.C:
			s.dispatchReadyTasks(ctx)
		}
	}
}

The scheduler runs two parallel components: a pool of workers consuming from readyTasks, and a ticker-based scanner that dispatches ready tasks to the pool.

// runWorker processes tasks from the readyTasks channel and sends results.
func (s *workerpoolScheduler) runWorker(id int) {
	defer s.wg.Done()

	for task := range s.readyTasks {
		result := s.executeTask(task, id)
		s.results <- result
	}
}

// executeTask runs a single task and returns a result with timing and any errors that occur.
func (s *workerpoolScheduler) executeTask(task scheduler.Task, workerID int) scheduler.TaskResult {
	startTime := time.Now()
	err := task.Execute()
	endTime := time.Now()

	return scheduler.TaskResult{
		TaskID:    task.ID(),
		Error:     err,
		StartTime: startTime,
		EndTime:   endTime,
		WorkerID:  strconv.Itoa(workerID),
	}
}

Workers block on the readyTasks channel, executing tasks concurrently while capturing timing and error information.

// dispatchReadyTasks sends ready tasks to the worker pool and updates the task list with remaining tasks.
func (s *workerpoolScheduler) dispatchReadyTasks(ctx context.Context) {
	s.mu.Lock()
	defer s.mu.Unlock()

	now := time.Now()
	remaining := make([]scheduler.Task, 0, len(s.tasks))

	for _, task := range s.tasks {
		if now.Before(task.ExecuteAt()) {
			remaining = append(remaining, task)
		} else {
			select {
			case s.readyTasks <- task:
			case <-ctx.Done():
				remaining = append(remaining, task)
				s.tasks = remaining
				return
			}
		}
	}

	s.tasks = remaining
}
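
The shutdown() method is called from Start() but not shown in the post. Given the WaitGroup and channels in the struct, a plausible sketch closes readyTasks so workers drain and exit, waits for them, then closes results so consumers can finish; this assumes something is draining results concurrently, as in the result-analysis example later:

// shutdown stops the worker pool: closing readyTasks ends each worker's range loop,
// wg.Wait blocks until in-flight tasks finish, and closing results lets consumers exit.
func (s *workerpoolScheduler) shutdown() {
	close(s.readyTasks)
	s.wg.Wait()
	close(s.results)
}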

Key improvements:

  • Bounded resource usage with configurable workerCount (int)
  • Structured task results with timing information via results (chan scheduler.TaskResult)
  • Graceful shutdown coordination using wg (sync.WaitGroup)

Limitation: Still uses fixed-interval polling, which either wastes CPU cycles (short intervals) or delays task execution (long intervals).

Heap: Optimal Efficiency

The final scheduler combines the best of dynamic sleep and worker pools while replacing the task slice with a min-heap. The heap keeps tasks ordered by execution time, so the next task is always at the top: peeking at it is O(1), and inserting or removing a task costs O(log n), compared to the O(n) scans of the slice-based schedulers.

// heapScheduler is a task scheduler that uses a min-heap to efficiently manage task priorities
// and a worker pool to execute ready tasks concurrently.
type heapScheduler struct {
	tasks       *taskHeap
	mu          sync.Mutex
	workerCount int
	wg          sync.WaitGroup
	readyTasks  chan scheduler.Task
	results     chan scheduler.TaskResult
}

The struct replaces the task slice with a *taskHeap (a min-heap) while keeping the worker pool infrastructure from the previous scheduler, including the TaskResult struct for execution metadata.

Heap Implementation

Go’s container/heap package requires implementing the heap.Interface:

// taskHeap implements heap.Interface for scheduler.Task, ordered by ExecuteAt time.
type taskHeap []scheduler.Task

func (h taskHeap) Len() int { return len(h) }

func (h taskHeap) Less(i, j int) bool {
	return h[i].ExecuteAt().Before(h[j].ExecuteAt())
}

func (h taskHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}

func (h *taskHeap) Push(x any) {
	*h = append(*h, x.(scheduler.Task))
}

func (h *taskHeap) Pop() any {
	old := *h
	n := len(old)
	task := old[n-1]
	*h = old[0 : n-1]
	return task
}

The Less() method orders tasks by execution time, ensuring the earliest task is always at index 0.
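
With the heap in place, Schedule() presumably delegates to heap.Push so the ordering invariant holds after every insert; a sketch, assuming the same mutex-guarded pattern as the other schedulers:

// Schedule adds a task to the scheduler, keeping the heap ordered by ExecuteAt.
func (s *heapScheduler) Schedule(task scheduler.Task) {
	s.mu.Lock()
	defer s.mu.Unlock()
	heap.Push(s.tasks, task)
}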

// Start runs the scheduler loop, executing ready tasks via a worker pool until all tasks complete or context is cancelled.
func (s *heapScheduler) Start(ctx context.Context) {
	slog.Info("starting heap task scheduler", "count_tasks", s.PendingTasksCount())

	// Start worker pool
	s.wg.Add(s.workerCount)
	for i := range s.workerCount {
		go s.runWorker(i)
	}

	// Start task scanner with dynamic sleep based on next task time
	for {
		if s.PendingTasksCount() == 0 {
			slog.Info("no remaining tasks, stopping scheduler")
			s.shutdown()
			return
		}

		select {
		case <-ctx.Done():
			slog.Info("context cancelled, stopping scheduler")
			s.shutdown()
			return
		default:
			sleepDuration := s.dispatchReadyTasks(ctx)
			if sleepDuration > 0 {
				timer := time.NewTimer(sleepDuration)
				select {
				case <-ctx.Done():
					timer.Stop()
					slog.Info("context cancelled, stopping scheduler")
					s.shutdown()
					return
				case <-timer.C:
					// Continue to next iteration
				}
			}
		}
	}
}

Unlike the worker pool scheduler’s fixed-interval ticker, this uses a dynamically calculated sleep duration returned by dispatchReadyTasks(), sleeping exactly until the next task is ready.

The runWorker() and executeTask() methods are identical to the worker pool implementation.

Dispatch with Heap Operations

The dispatch logic leverages the heap to efficiently find the next task:

// dispatchReadyTasks sends ready tasks to the worker pool and returns the duration to sleep until the next task.
// Returns the sleep duration until the next task, or 0 if the heap is now empty (allowing immediate shutdown check).
func (s *heapScheduler) dispatchReadyTasks(ctx context.Context) time.Duration {
	s.mu.Lock()
	defer s.mu.Unlock()

	now := time.Now()
	dispatched := false

	// The heap keeps tasks sorted by ExecuteAt, so we only need to check the top
	for s.tasks.Len() > 0 {
		nextTask := (*s.tasks)[0]

		if now.Before(nextTask.ExecuteAt()) {
			return nextTask.ExecuteAt().Sub(now)
		}

		task := heap.Pop(s.tasks).(scheduler.Task)
		dispatched = true

		select {
		case s.readyTasks <- task:
		case <-ctx.Done():
			heap.Push(s.tasks, task)
			return 0
		}
	}

	// Heap is empty. Return 0 so Start() can immediately check PendingTasksCount and shutdown.
	if dispatched {
		return 0
	}

	// Defensive: heap was already empty (shouldn't happen since Start checks PendingTasksCount first)
	return 100 * time.Millisecond
}

By peeking at the heap’s top element (index 0), the scheduler calculates the exact sleep duration until the next task without scanning all pending tasks.

Key improvements:

  • Heap-based priority queue (*taskHeap) eliminates linear scans (O(log n) push/pop and O(1) peek vs O(n))
  • Dynamic sleep duration via time.NewTimer() (no polling overhead)
  • Bounded concurrency with workerCount and results channel (chan scheduler.TaskResult)
  • Optimal CPU usage: sleeps exactly until the next task is ready

Advanced Usage: Result Analysis

The worker pool and heap schedulers provide a results channel for tracking task execution:

func main() {
	scheduler := NewScheduler(runtime.NumCPU(), 10)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Consume results in background
	done := make(chan struct{})
	go func() {
		defer close(done)
		processResults(scheduler.Results())
	}()

	// Schedule tasks
	for n := 1; n <= 10; n++ {
		offsetSeconds := rand.Intn(20)
		offset := time.Now().Add(time.Duration(offsetSeconds) * time.Second)
		taskId := fmt.Sprintf("fib%d-+%ds", n, offsetSeconds)
		task := fib.NewTask(taskId, n, fibonacci.NewRecursive(), offset)
		scheduler.Schedule(task)
	}

	scheduler.Start(ctx)
	<-done  // Wait for result processing
}
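
The Results() accessor used above isn't part of the Scheduler interface; it presumably just exposes the internal channel as receive-only, along these lines:

// Results returns a receive-only view of the results channel.
func (s *workerpoolScheduler) Results() <-chan scheduler.TaskResult {
	return s.results
}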

The results processor computes execution statistics:

// processResults logs task results and computes summary statistics for successful executions.
func processResults(results <-chan scheduler.TaskResult) {
	var durations []time.Duration
	for result := range results {
		duration := result.EndTime.Sub(result.StartTime)
		args := []any{
			"task_id", result.TaskID,
			"worker_id", result.WorkerID,
			"duration_microseconds", duration.Microseconds(),
		}

		if result.Error != nil {
			args = append(args, "error", result.Error)
			slog.Error("error executing task", args...)
		} else {
			durations = append(durations, duration)
			slog.Info("task completed", args...)
		}
	}

	if len(durations) > 0 {
		slices.Sort(durations)

		var total time.Duration
		for _, d := range durations {
			total += d
		}
		// Convert len to time.Duration for division to get mean duration
		mean := total / time.Duration(len(durations))
		median := durations[len(durations)/2]

		args := []any{
			"count", len(durations),
			"mean_microseconds", mean.Microseconds(),
			"median_microseconds", median.Microseconds(),
			"min_microseconds", durations[0].Microseconds(),
			"max_microseconds", durations[len(durations)-1].Microseconds(),
		}

		// Percentiles require sufficient samples to be meaningful (1% of 100 = 1 sample)
		if len(durations) >= 100 {
			p95 := durations[int(float64(len(durations)-1)*0.95)]
			p99 := durations[int(float64(len(durations)-1)*0.99)]
			args = append(args,
				"p95_microseconds", p95.Microseconds(),
				"p99_microseconds", p99.Microseconds(),
			)
		}

		slog.Info("task execution summary", args...)
	}
}

Example output from running the heap scheduler:

2026/01/06 10:25:38 INFO starting heap task scheduler count_tasks=10
2026/01/06 10:25:38 INFO starting computation task_id=fib2-+0s n=2
2026/01/06 10:25:38 INFO computation complete task_id=fib2-+0s n=2 result=1
2026/01/06 10:25:38 INFO task completed task_id=fib2-+0s worker_id=5 duration_microseconds=34
2026/01/06 10:25:41 INFO starting computation task_id=fib10-+3s n=10
2026/01/06 10:25:41 INFO computation complete task_id=fib10-+3s n=10 result=55
2026/01/06 10:25:41 INFO starting computation task_id=fib5-+3s n=5
2026/01/06 10:25:41 INFO computation complete task_id=fib5-+3s n=5 result=5
2026/01/06 10:25:41 INFO starting computation task_id=fib6-+3s n=6
2026/01/06 10:25:41 INFO computation complete task_id=fib6-+3s n=6 result=8
2026/01/06 10:25:41 INFO task completed task_id=fib10-+3s worker_id=7 duration_microseconds=265
2026/01/06 10:25:41 INFO task completed task_id=fib5-+3s worker_id=11 duration_microseconds=357
2026/01/06 10:25:41 INFO task completed task_id=fib6-+3s worker_id=6 duration_microseconds=369
2026/01/06 10:25:43 INFO starting computation task_id=fib4-+5s n=4
2026/01/06 10:25:43 ERROR error executing task task_id=fib4-+5s worker_id=8 duration_microseconds=282 error="computation failed: n=4 is not supported"
2026/01/06 10:25:44 INFO starting computation task_id=fib8-+6s n=8
2026/01/06 10:25:44 INFO computation complete task_id=fib8-+6s n=8 result=21
2026/01/06 10:25:44 INFO task completed task_id=fib8-+6s worker_id=9 duration_microseconds=252
2026/01/06 10:25:53 INFO starting computation task_id=fib9-+15s n=9
2026/01/06 10:25:53 INFO computation complete task_id=fib9-+15s n=9 result=34
2026/01/06 10:25:53 INFO task completed task_id=fib9-+15s worker_id=10 duration_microseconds=323
2026/01/06 10:25:54 INFO starting computation task_id=fib1-+16s n=1
2026/01/06 10:25:54 INFO computation complete task_id=fib1-+16s n=1 result=1
2026/01/06 10:25:54 INFO task completed task_id=fib1-+16s worker_id=0 duration_microseconds=283
2026/01/06 10:25:55 INFO starting computation task_id=fib3-+17s n=3
2026/01/06 10:25:55 INFO computation complete task_id=fib3-+17s n=3 result=2
2026/01/06 10:25:55 INFO task completed task_id=fib3-+17s worker_id=3 duration_microseconds=358
2026/01/06 10:25:57 INFO no remaining tasks, stopping scheduler
2026/01/06 10:25:57 INFO starting computation task_id=fib7-+19s n=7
2026/01/06 10:25:57 INFO computation complete task_id=fib7-+19s n=7 result=13
2026/01/06 10:25:57 INFO task completed task_id=fib7-+19s worker_id=1 duration_microseconds=230
2026/01/06 10:25:57 INFO task execution summary count=9 mean_microseconds=275 median_microseconds=283 min_microseconds=34 max_microseconds=369

This pattern provides the foundation for performance monitoring, error tracking, and SLA verification when building production schedulers.

Comparison

Scheduler      Execution Model         Time Complexity        Space Complexity  Best For
Dynamic Sleep  Sequential              O(n) per batch         O(n)              Low concurrency requirements
Polling        Concurrent (unbounded)  O(n) per interval      O(n)              Low task counts, minimal code complexity
Worker Pool    Concurrent (bounded)    O(n) per interval      O(n)              Controlled concurrency, result tracking
Heap           Concurrent (bounded)    O(log n) per dispatch  O(n)              High task volumes, production systems*

*Production use requires additional work: persistence, retries, observability, graceful shutdown, and more.

Conclusion

The four scheduler implementations navigate different tradeoffs in task scheduling. Dynamic sleep eliminated busy-waiting through calculated sleep durations but executed tasks sequentially. Polling explored fixed-interval timing as an alternative dispatch mechanism while introducing concurrent execution with unbounded goroutines. Worker pools constrained concurrency and added result tracking but still relied on periodic polling. The heap-based scheduler uses a priority queue for efficient task selection, dynamic sleep timing, and bounded concurrency, combining the most effective techniques from earlier iterations while eliminating their key limitations.

All implementations are available at github.com/hackebrot/go-task-scheduler.