
ThreadPool

Extends Algorithm

The ThreadPool class is a specialized Algorithm that manages a collection of Runnable tasks and executes them in parallel across multiple threads. It provides an efficient way to distribute workloads across multiple processor cores, maximizing resource utilization while maintaining a clean interface for monitoring execution progress and managing task lifecycles.

This thread-safe implementation ensures that parallel tasks can be safely executed, monitored, and controlled in multi-threaded applications, providing a robust foundation for high-performance concurrent programming.

Features

Thread Safety

The ThreadPool class provides the following thread-safety guarantees:

    • Tasks are stored with single-owner (unique_ptr) semantics and remain valid until the pool is destroyed.
    • exec() and run() coordinate task completion through futures, so exceptions and completion states are handled safely.
    • Progress and statistics updates are synchronized, preventing data races between worker threads and listeners.
    • stopAll() can be called from any thread while tasks are running; stop requests are communicated through atomic operations.
    • Log, warning, and error signals from all tasks are forwarded through a thread-safe signal mechanism.

Class Interface

class ThreadPool : public Algorithm {
public:
    // Constructor & destructor
    explicit ThreadPool(bool verbose = true);
    ~ThreadPool() = default;
    
    // Task management
    void add(std::unique_ptr<Runnable> runnable);
    
    template <typename T, typename... Args>
    T* createAndAdd(Args&&... args);
    
    // Information methods
    unsigned int size() const;
    static unsigned int maxThreadCount();
    
    // Execute all tasks in the pool
    void exec(const ArgumentPack &args = {}) override;
    
    // Utility methods
    void connectLoggerToAll(Task* logger);
    void stopAll();
    
protected:
    // Access to managed runnables
    const std::vector<std::unique_ptr<Runnable>>& runnables() const;
    
private:
    std::vector<std::unique_ptr<Runnable>> m_runnables;
    bool m_verbose;
};

Signals

ThreadPool emits the following signals:

Signal           | Type   | Description                             | Arguments
started          | Simple | Indicates pool execution has started    | None
finished         | Simple | Indicates pool execution has completed  | None
progress         | Data   | Reports overall execution progress      | float (0.0 to 1.0)
stats            | Data   | Reports execution statistics            | int64_t (execution time in ms), unsigned int (number of tasks)
log, warn, error | Data   | Forwarded log messages from tasks       | std::string (message)
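
Data-carrying signals such as progress can be observed with connectData(), in the same way the stats signal is handled in the usage example below. A short sketch (the lambda body is illustrative, and ArgumentPack::get<T>() is used as in the other examples):

Connecting to the Progress Signal
pool->connectData("progress", [](const ArgumentPack& args) {
    // The progress signal carries a single float in the range 0.0 to 1.0
    float fraction = args.get<float>(0);
    std::cout << "Pool progress: "
              << static_cast<int>(fraction * 100.0f) << "%" << std::endl;
});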

Usage Example

Basic Thread Pool Usage
// Create a thread pool
auto pool = std::make_shared<ThreadPool>();

// Create a logger and connect it to the pool
auto logger = std::make_shared<Logger>("PoolLogger");
logger->connectAllSignalsTo(pool.get());

// Create and add tasks using the template method
// (MyTask: a user-defined Runnable subclass)
auto task1 = pool->createAndAdd<MyTask>("Task1", 1000);
auto task2 = pool->createAndAdd<MyTask>("Task2", 2000);
auto task3 = pool->createAndAdd<MyTask>("Task3", 1500);

// Alternatively, create and add tasks separately
auto task4 = std::make_unique<MyTask>("Task4", 3000);
pool->add(std::move(task4));

// Connect to statistics signal
pool->connectData("stats", [](const ArgumentPack& args) {
    int64_t executionTime = args.get<int64_t>(0);
    unsigned int taskCount = args.get<unsigned int>(1);
    
    std::cout << "Executed " << taskCount << " tasks in " 
              << executionTime << " ms ("
              << static_cast<double>(executionTime) / taskCount 
              << " ms per task average)" << std::endl;
});

// Execute all tasks in parallel
pool->exec();

// Alternative: execute asynchronously
auto future = pool->run();

// Do other work while tasks are running

// Check if we need to stop tasks
if (userRequestedStop) {
    pool->stopAll();
}

// Wait for completion
future.wait();

Thread-Safe Multi-Pool Coordination
// Create multiple thread pools for different workload types
auto cpuPool = std::make_shared<ThreadPool>();
auto ioPool = std::make_shared<ThreadPool>();

// CPU-bound tasks (ComputeTask: a user-defined Runnable subclass)
for (int i = 0; i < 4; i++) {
    cpuPool->createAndAdd<ComputeTask>("Compute-" + std::to_string(i), data[i]);
}

// I/O-bound tasks (IOTask: a user-defined Runnable subclass)
for (int i = 0; i < 8; i++) {
    ioPool->createAndAdd<IOTask>("IO-" + std::to_string(i), files[i]);
}

// Start both pools asynchronously from the main thread
auto cpuFuture = cpuPool->run();
auto ioFuture = ioPool->run();

// Monitor progress from another thread
std::thread monitorThread([cpuPool, ioPool]() {
    while (cpuPool->isRunning() || ioPool->isRunning()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        
        // Thread-safely check status and update UI
        std::cout << "CPU tasks running: " << (cpuPool->isRunning() ? "Yes" : "No") << std::endl;
        std::cout << "I/O tasks running: " << (ioPool->isRunning() ? "Yes" : "No") << std::endl;
    }
});

// Wait for both pools to complete their work
cpuFuture.wait();
ioFuture.wait();

// Join the monitor thread
monitorThread.join();

std::cout << "All work completed" << std::endl;

How It Works

The ThreadPool operates through the following thread-safe mechanisms:

  1. Task Collection:
    • Tasks are added to the pool via add() or createAndAdd<T>()
    • All tasks must inherit from Runnable to provide a consistent interface
    • Tasks are stored in a synchronized data structure for thread safety
  2. Parallel Execution:
    • When exec() is called, the pool starts all tasks using std::async
    • Each task runs in its own thread, with the system managing thread distribution
    • The pool uses futures to track task completion and handle exceptions
    • Thread communication happens through futures and atomic operations
  3. Progress Tracking:
    • Overall progress is calculated as tasks complete
    • Progress updates are synchronized to prevent data races
    • Each completed task contributes equally to overall progress
  4. Signal Forwarding:
    • Log messages from tasks are forwarded to pool listeners through thread-safe signal mechanism
    • This allows centralized logging and monitoring across all threads
  5. Task Management:
    • The pool maintains ownership of tasks via unique pointers with clear ownership semantics
    • Tasks remain valid until the pool is destroyed
    • Pool controls task execution and can halt all tasks if needed

Thread-Safe Execution Implementation
void ThreadPool::exec(const ArgumentPack &args) {
    if (m_runnables.empty()) {
        emitString("warn", "ThreadPool is empty, nothing to execute");
        return;
    }

    std::vector<std::future<void>> futures;
    auto start = std::chrono::steady_clock::now();

    unsigned int taskCount = m_runnables.size();

    // Report starting information
    std::stringstream ss;
    ss << "Starting execution of " << taskCount << " tasks";
    emitString("log", ss.str());

    // Start progress reporting
    float progressStep = 1.0f / static_cast<float>(taskCount);
    reportProgress(0.0f);

    // Launch tasks in parallel with async - each task gets its own thread
    for (const auto &runnable : m_runnables) {
        futures.push_back(
            std::async(std::launch::async, &Runnable::run, runnable.get()));
    }

    // Wait for all tasks to complete
    for (size_t i = 0; i < futures.size(); ++i) {
        futures[i].get();
        // Update progress after each task completes
        reportProgress((i + 1) * progressStep);
    }

    auto end = std::chrono::steady_clock::now();
    auto diffMs =
        std::chrono::duration_cast<std::chrono::milliseconds>(end - start)
            .count();

    // Create stats information
    ArgumentPack statsArgs;
    statsArgs.add<int64_t>(diffMs);         // Execution time in ms
    statsArgs.add<unsigned int>(taskCount); // Number of executed tasks
    emit("stats", statsArgs);

    if (m_verbose) {
        std::stringstream ss;
        ss << "Executed " << taskCount << " tasks in " << diffMs << " ms ("
           << static_cast<double>(diffMs) / taskCount
           << " ms per task average)";
        emitString("log", ss.str());
    }
}

Thread Management

The ThreadPool handles thread creation and management automatically with these key features:

Automatic Hardware Detection: The ThreadPool automatically adapts to the hardware it's running on by using std::thread::hardware_concurrency() to determine the optimal number of threads. This ensures efficient execution across different systems.
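
The body of maxThreadCount() is not shown in this documentation, but based on the description above it presumably wraps std::thread::hardware_concurrency(). A minimal sketch under that assumption:

Possible maxThreadCount() Sketch
#include <thread>

unsigned int ThreadPool::maxThreadCount() {
    // Query how many concurrent threads the hardware supports;
    // hardware_concurrency() may return 0 when the value is unknown,
    // so fall back to a single thread in that case
    unsigned int count = std::thread::hardware_concurrency();
    return count > 0 ? count : 1;
}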

Task Ownership

The ThreadPool takes ownership of every task added to it: tasks passed to add() or created with createAndAdd<T>() are held as unique pointers inside the pool and remain valid until the pool is destroyed.

Warning: Raw pointers returned by createAndAdd<T>() are non-owning pointers. They're valid as long as the ThreadPool exists, but should never be deleted manually.
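
The intended usage pattern looks like the following sketch (MyTask is an illustrative Runnable subclass, as in the earlier examples):

Using the Non-Owning Pointer from createAndAdd()
// The pool's unique_ptr owns the task; the returned pointer is only a handle
MyTask* task = pool->createAndAdd<MyTask>("Task1", 1000);

// Safe while the pool is alive: configure the task or connect to its signals
task->connectData("log", [](const ArgumentPack& args) {
    // handle this task's log messages
});

// Never do this - the ThreadPool already owns and will destroy the task
// delete task;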

Thread-Safe Task Stopping

The ThreadPool provides a coordinated way to safely stop all running tasks:

Stopping All Tasks
// Stop all tasks in the pool
pool->stopAll();

// Implementation of stopAll() - Thread-safe cancellation
void ThreadPool::stopAll() {
    for (const auto &runnable : m_runnables) {
        if (runnable->isRunning()) {
            runnable->requestStop();
        }
    }
    emitString("log", "Stop requested for all running tasks");
}

This calls requestStop() on all runnables in the pool. Tasks should check stopRequested() regularly and exit gracefully when requested.

Thread Safety Guarantee: The stopAll() method can be safely called from any thread, even while tasks are running in parallel threads. The stop requests are made using atomic operations that allow for safe communication between threads.
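
A cooperative task can be sketched as follows (CancellableTask and processChunk() are illustrative names; runImpl() and stopRequested() are the Runnable hooks used throughout this document):

Checking stopRequested() in a Runnable
class CancellableTask : public Runnable {
protected:
    void runImpl() override {
        for (int chunk = 0; chunk < 100; ++chunk) {
            // Exit gracefully as soon as a stop has been requested
            if (stopRequested()) {
                return;
            }
            processChunk(chunk); // hypothetical unit of work
        }
    }

private:
    void processChunk(int index) {
        // ... perform a bounded amount of work ...
    }
};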

Thread-Safe Logger Connection

The connectLoggerToAll() method provides a convenient way to connect a logger to all tasks in the pool with thread safety:

Thread-Safe Logger Connection
// Connect a logger to all tasks
auto logger = std::make_shared<Logger>();
pool->connectLoggerToAll(logger.get());

// Implementation
void ThreadPool::connectLoggerToAll(Task *logger) {
    if (!logger)
        return;

    // Use lambda functions to properly forward signals to the logger
    // Thread-safely connect the logger to the ThreadPool
    connectData("log", [logger](const ArgumentPack &args) {
        logger->emit("log", args);
    });
    connectData("warn", [logger](const ArgumentPack &args) {
        logger->emit("warn", args);
    });
    connectData("error", [logger](const ArgumentPack &args) {
        logger->emit("error", args);
    });

    // Thread-safely connect the logger to each Runnable
    for (const auto &runnable : m_runnables) {
        runnable->connectData("log", [logger](const ArgumentPack &args) {
            logger->emit("log", args);
        });
        runnable->connectData("warn", [logger](const ArgumentPack &args) {
            logger->emit("warn", args);
        });
        runnable->connectData("error", [logger](const ArgumentPack &args) {
            logger->emit("error", args);
        });
    }
}

This ensures that log messages from all threads and tasks are properly captured and processed in a thread-safe manner.

Performance Considerations

Thread Safety Implementation

Key Thread Safety Features

Thread-Safe Best Practices

Advanced Thread Safety Patterns

Thread-Safe Task Communication

If tasks need to communicate results, use thread-safe mechanisms such as:

Thread-Safe Result Aggregation
// Thread-safe result collector
class ResultCollector : public Task {
public:
    void addResult(const Result& result) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_results.push_back(result);
        
        // Emit notification about the new result
        ArgumentPack args;
        args.add(m_results.size());
        emit("resultAdded", args);
    }
    
    std::vector<Result> getResults() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_results;
    }
    
private:
    mutable std::mutex m_mutex;
    std::vector<Result> m_results;
};

// Using with ThreadPool
auto collector = std::make_shared<ResultCollector>();

// Create tasks that use the collector (ProcessingTask: a user-defined Runnable)
for (int i = 0; i < 10; i++) {
    auto task = pool->createAndAdd<ProcessingTask>(data[i], collector);
}
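
Inside each task, results are handed to the shared collector from the worker thread. A minimal sketch (ProcessingTask, Data, and compute() are illustrative names; ResultCollector and Result are used as in the collector above):

Reporting Results from a Task
class ProcessingTask : public Runnable {
public:
    ProcessingTask(const Data& data, std::shared_ptr<ResultCollector> collector)
        : m_data(data), m_collector(std::move(collector)) {}

protected:
    void runImpl() override {
        Result result = compute(m_data); // hypothetical computation
        // addResult() locks internally, so this call is safe from any worker thread
        m_collector->addResult(result);
    }

private:
    Result compute(const Data& input); // illustrative; defined elsewhere
    Data m_data;
    std::shared_ptr<ResultCollector> m_collector;
};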

Optimal Thread Count Management

For systems with varying workloads, dynamically adjusting the thread count can improve efficiency:

Adaptive Thread Count
// Create task batches based on system capabilities
unsigned int optimalTaskCount = ThreadPool::maxThreadCount();

// For CPU-intensive tasks, match the number of cores
if (taskType == TaskType::CPU_INTENSIVE) {
    optimalTaskCount = ThreadPool::maxThreadCount();
} 
// For I/O-bound tasks, use more threads as they'll often be waiting
else if (taskType == TaskType::IO_BOUND) {
    optimalTaskCount = ThreadPool::maxThreadCount() * 2;
}

// Create the appropriate number of tasks (BatchTask: a user-defined Runnable)
for (unsigned int i = 0; i < optimalTaskCount; i++) {
    pool->createAndAdd<BatchTask>(workBatches[i]);
}

Work Stealing Pattern

For more complex workloads, implement a work stealing pattern for better load balancing:

Thread-Safe Work Queue
// Create a thread-safe work queue
class WorkQueue : public Task {
public:
    void addWork(const WorkItem& item) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push(item);
        
        // Signal new work available
        emit("workAdded");
    }
    
    std::optional<WorkItem> getNextWork() {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_queue.empty()) {
            return std::nullopt;
        }
        
        WorkItem item = m_queue.front();
        m_queue.pop();
        return item;
    }
    
    bool hasWork() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return !m_queue.empty();
    }
    
private:
    mutable std::mutex m_mutex;
    std::queue<WorkItem> m_queue;
};

// Worker task that processes items until queue is empty
class WorkerTask : public Runnable {
public:
    WorkerTask(std::shared_ptr<WorkQueue> queue) : m_queue(queue) {}
    
protected:
    void runImpl() override {
        while (!stopRequested()) {
            // Try to get work
            auto workItem = m_queue->getNextWork();
            if (!workItem) {
                // No more work
                break;
            }
            
            // Process the work item
            processItem(*workItem);
        }
    }
    
private:
    std::shared_ptr<WorkQueue> m_queue;
};

// Using the work queue with ThreadPool
auto workQueue = std::make_shared<WorkQueue>();

// Fill the queue with work
for (const auto& item : workItems) {
    workQueue->addWork(item);
}

// Create worker tasks in the pool
unsigned int workerCount = ThreadPool::maxThreadCount();
for (unsigned int i = 0; i < workerCount; i++) {
    pool->createAndAdd<WorkerTask>(workQueue);
}

// Execute all workers in parallel
pool->exec();

Integration with Other Components

ThreadPool works well with the other task library components in a thread-safe manner: a Logger can be attached to the pool and to every managed task via connectLoggerToAll(), any Runnable-derived class can be scheduled in the pool, and all monitoring flows through the same SignalSlot mechanism used elsewhere in the library.

Summary

The ThreadPool class provides a robust, thread-safe foundation for parallel task execution in high-performance applications. Its careful design ensures safe execution, cancellation, and monitoring across multiple threads, without imposing excessive synchronization overhead on task implementations.

By leveraging modern C++ concurrency primitives like futures, atomic operations, and proper thread management, the ThreadPool enables developers to harness the full power of multi-core processors while maintaining code clarity and safety. The integration with the SignalSlot system provides comprehensive monitoring capabilities to track execution progress and performance across all parallel tasks.