Extends Algorithm
The ThreadPool class is a specialized Algorithm that manages a collection of Runnable tasks and executes them in parallel across multiple threads. It distributes workloads across multiple processor cores while keeping a clean interface for monitoring execution progress and managing task lifecycles.
This thread-safe implementation ensures that parallel tasks can be safely executed, monitored, and controlled in multi-threaded applications, providing a robust foundation for high-performance concurrent programming.
The ThreadPool class provides comprehensive thread safety with the following guarantees:
class ThreadPool : public Algorithm {
public:
// Constructor & destructor
explicit ThreadPool(bool verbose = true);
~ThreadPool() = default;
// Task management
void add(std::unique_ptr<Runnable> runnable);
template <typename T, typename... Args>
T* createAndAdd(Args&&... args);
// Information methods
unsigned int size() const;
static unsigned int maxThreadCount();
// Execute all tasks in the pool
void exec(const ArgumentPack &args = {}) override;
// Utility methods
void connectLoggerToAll(Task* logger);
void stopAll();
protected:
// Access to managed runnables
const std::vector<std::unique_ptr<Runnable>>& runnables() const;
private:
std::vector<std::unique_ptr<Runnable>> m_runnables;
bool m_verbose;
};
ThreadPool emits the following signals:
Signal | Type | Description | Arguments |
---|---|---|---|
started | Simple | Indicates pool execution has started | None |
finished | Simple | Indicates pool execution has completed | None |
progress | Data | Reports overall execution progress | float (0.0 to 1.0) |
stats | Data | Reports execution statistics | int64_t (execution time in ms), unsigned int (number of tasks) |
log, warn, error | Data | Forwarded log messages from tasks | std::string (message) |
// Create a thread pool
auto pool = std::make_shared<ThreadPool>();
// Create a logger and connect it to the pool
// (Logger is the assumed name of the library's logger class)
auto logger = std::make_shared<Logger>("PoolLogger");
logger->connectAllSignalsTo(pool.get());
// Create and add tasks using the template method
// (MyTask is a placeholder for any Runnable subclass)
auto task1 = pool->createAndAdd<MyTask>("Task1", 1000);
auto task2 = pool->createAndAdd<MyTask>("Task2", 2000);
auto task3 = pool->createAndAdd<MyTask>("Task3", 1500);
// Alternatively, create and add tasks separately
auto task4 = std::make_unique<MyTask>("Task4", 3000);
pool->add(std::move(task4));
// Connect to statistics signal
pool->connectData("stats", [](const ArgumentPack& args) {
int64_t executionTime = args.get<int64_t>(0);
unsigned int taskCount = args.get<unsigned int>(1);
std::cout << "Executed " << taskCount << " tasks in "
<< executionTime << " ms ("
<< static_cast<double>(executionTime) / taskCount
<< " ms per task average)" << std::endl;
});
// Execute all tasks in parallel
pool->exec();
// Alternative: execute asynchronously
auto future = pool->run();
// Do other work while tasks are running
// Check if we need to stop tasks
if (userRequestedStop) {
pool->stopAll();
}
// Wait for completion
future.wait();
// Create multiple thread pools for different workload types
auto cpuPool = std::make_shared<ThreadPool>();
auto ioPool = std::make_shared<ThreadPool>();
// CPU-bound tasks (ComputeTask is a placeholder for a Runnable subclass)
for (int i = 0; i < 4; i++) {
cpuPool->createAndAdd<ComputeTask>("Compute-" + std::to_string(i), data[i]);
}
// I/O-bound tasks (IoTask is a placeholder for a Runnable subclass)
for (int i = 0; i < 8; i++) {
ioPool->createAndAdd<IoTask>("IO-" + std::to_string(i), files[i]);
}
// Start both pools asynchronously from the main thread
auto cpuFuture = cpuPool->run();
auto ioFuture = ioPool->run();
// Monitor progress from another thread
std::thread monitorThread([cpuPool, ioPool]() {
while (cpuPool->isRunning() || ioPool->isRunning()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// Thread-safely check status and update UI
std::cout << "CPU tasks running: " << (cpuPool->isRunning() ? "Yes" : "No") << std::endl;
std::cout << "I/O tasks running: " << (ioPool->isRunning() ? "Yes" : "No") << std::endl;
}
});
// Wait for both pools to complete their work
cpuFuture.wait();
ioFuture.wait();
// Join the monitor thread
monitorThread.join();
std::cout << "All work completed" << std::endl;
The ThreadPool operates through the following thread-safe mechanisms:
- Tasks are added to the pool with add() or createAndAdd<T>()
- Every task derives from Runnable to provide a consistent interface
- When exec() is called, the pool starts all tasks using std::async
void ThreadPool::exec(const ArgumentPack &args) {
if (m_runnables.empty()) {
emitString("warn", "ThreadPool is empty, nothing to execute");
return;
}
std::vector<std::future<void>> futures; // assumes Runnable::run() returns void
auto start = std::chrono::steady_clock::now();
unsigned int taskCount = m_runnables.size();
// Report starting information
std::stringstream ss;
ss << "Starting execution of " << taskCount << " tasks";
emitString("log", ss.str());
// Start progress reporting
float progressStep = 1.0f / static_cast<float>(taskCount);
reportProgress(0.0f);
// Launch tasks in parallel with async - each task gets its own thread
for (const auto &runnable : m_runnables) {
futures.push_back(
std::async(std::launch::async, &Runnable::run, runnable.get()));
}
// Wait for all tasks to complete
for (size_t i = 0; i < futures.size(); ++i) {
futures[i].get();
// Update progress after each task completes
reportProgress((i + 1) * progressStep);
}
auto end = std::chrono::steady_clock::now();
auto diffMs =
std::chrono::duration_cast<std::chrono::milliseconds>(end - start)
.count();
// Create stats information
ArgumentPack statsArgs;
statsArgs.add(diffMs); // Execution time in ms
statsArgs.add(taskCount); // Number of executed tasks
emit("stats", statsArgs);
if (m_verbose) {
std::stringstream ss;
ss << "Executed " << taskCount << " tasks in " << diffMs << " ms ("
<< static_cast<double>(diffMs) / taskCount
<< " ms per task average)";
emitString("log", ss.str());
}
}
The ThreadPool handles thread creation and management automatically with these key features:
- The maxThreadCount() static method returns the number of available hardware threads
- Tasks are launched through std::async with the std::launch::async policy for automatic thread management
- The pool uses std::thread::hardware_concurrency() to determine the optimal number of threads. This ensures efficient execution across different systems.
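The body of maxThreadCount() is not shown here. The following is a minimal sketch of how such a query could be written, assuming it wraps std::thread::hardware_concurrency(). The helper name detectThreadCount and its fallback parameter are hypothetical and not part of the library. The standard allows hardware_concurrency() to return 0 when the value cannot be determined, so a guard is worth having:

```cpp
#include <cassert>
#include <thread>

// Hypothetical helper (not part of the library): returns a usable thread
// count even when the hardware value is unavailable.
unsigned int detectThreadCount(unsigned int fallback = 1) {
    assert(fallback > 0 && "fallback must be a positive thread count");
    // hardware_concurrency() is only a hint and may legitimately return 0.
    const unsigned int hw = std::thread::hardware_concurrency();
    return hw != 0 ? hw : fallback;
}
```

Calling detectThreadCount() always yields at least 1, which makes the result safe to use as a loop bound or a divisor.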
The ThreadPool takes ownership of tasks with clear ownership semantics:
- Tasks are stored as std::unique_ptr<Runnable> in the pool
- The createAndAdd<T>() method returns a raw pointer for convenience
- The pointers returned by createAndAdd<T>() are non-owning pointers. They're valid as long as the ThreadPool exists, but should never be deleted manually.
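The ownership model can be illustrated with a stripped-down pool. This is a sketch under stated assumptions, not the library's implementation: MiniRunnable, NamedTask and MiniPool are hypothetical stand-ins, and only the add() and createAndAdd<T>() signatures mirror the class declaration.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Stand-in for the library's Runnable base class (assumption).
struct MiniRunnable {
    virtual ~MiniRunnable() = default;
    virtual void run() = 0;
};

// Hypothetical task type used only for this illustration.
struct NamedTask : MiniRunnable {
    explicit NamedTask(std::string n) : name(std::move(n)) {}
    void run() override { ran = true; }
    std::string name;
    bool ran = false;
};

// Simplified pool: owns every task through std::unique_ptr.
class MiniPool {
public:
    void add(std::unique_ptr<MiniRunnable> task) {
        assert(task && "cannot add a null task");
        m_tasks.push_back(std::move(task));
    }

    // Constructs a T, stores it, and returns a non-owning pointer to it.
    template <typename T, typename... Args>
    T* createAndAdd(Args&&... args) {
        auto task = std::make_unique<T>(std::forward<Args>(args)...);
        T* raw = task.get();  // take the address before ownership moves
        m_tasks.push_back(std::move(task));
        return raw;           // stays valid while the pool is alive
    }

    std::size_t size() const { return m_tasks.size(); }

private:
    std::vector<std::unique_ptr<MiniRunnable>> m_tasks;
};
```

Because each task lives in its own heap allocation, the returned pointer is unaffected when the vector grows and reallocates. It becomes dangling only once the pool, and with it the owning unique_ptr, is destroyed.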
The ThreadPool provides a coordinated way to safely stop all running tasks:
// Stop all tasks in the pool
pool->stopAll();
// Implementation of stopAll() - Thread-safe cancellation
void ThreadPool::stopAll() {
for (const auto &runnable : m_runnables) {
if (runnable->isRunning()) {
runnable->requestStop();
}
}
emitString("log", "Stop requested for all running tasks");
}
This calls requestStop() on every runnable in the pool that is currently running. Tasks should check stopRequested() regularly and exit gracefully when requested.
The stopAll() method can be safely called from any thread, even while tasks are running in parallel threads. The stop requests are made using atomic operations that allow for safe communication between threads.
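Cancellation is cooperative: a stop request only sets a flag, and the task decides when to honour it. The sketch below shows that pattern with a std::atomic<bool>. StoppableWorker is a hypothetical stand-in for a Runnable, and the library's actual flag handling may differ.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical stand-in for a Runnable with a stop flag (assumption).
class StoppableWorker {
public:
    // Safe to call from any thread: the flag is atomic.
    void requestStop() { m_stop.store(true); }
    bool stopRequested() const { return m_stop.load(); }

    // Performs up to maxSteps short steps, checking the flag before each one.
    // Returns the number of steps actually completed.
    int run(int maxSteps) {
        assert(maxSteps >= 0);
        int done = 0;
        while (done < maxSteps && !stopRequested()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            ++done;
        }
        return done;
    }

private:
    std::atomic<bool> m_stop{false};
};
```

A task that checks the flag only once per long-running step reacts slowly to a stop request, so the check belongs inside the innermost loop that can be interrupted safely.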
The connectLoggerToAll() method provides a convenient way to connect a logger to all tasks in the pool with thread safety:
// Connect a logger to all tasks
auto logger = std::make_shared<Logger>(); // Logger: assumed name of the library's logger class
pool->connectLoggerToAll(logger.get());
// Implementation
void ThreadPool::connectLoggerToAll(Task *logger) {
if (!logger)
return;
// Use lambda functions to properly forward signals to the logger
// Thread-safely connect the logger to the ThreadPool
connectData("log", [logger](const ArgumentPack &args) {
logger->emit("log", args);
});
connectData("warn", [logger](const ArgumentPack &args) {
logger->emit("warn", args);
});
connectData("error", [logger](const ArgumentPack &args) {
logger->emit("error", args);
});
// Thread-safely connect the logger to each Runnable
for (const auto &runnable : m_runnables) {
runnable->connectData("log", [logger](const ArgumentPack &args) {
logger->emit("log", args);
});
runnable->connectData("warn", [logger](const ArgumentPack &args) {
logger->emit("warn", args);
});
runnable->connectData("error", [logger](const ArgumentPack &args) {
logger->emit("error", args);
});
}
}
This ensures that log messages from all threads and tasks are properly captured and processed in a thread-safe manner.
If tasks need to communicate results, use thread-safe mechanisms such as:
// Thread-safe result collector
class ResultCollector : public Task {
public:
void addResult(const Result& result) {
std::lock_guard lock(m_mutex);
m_results.push_back(result);
// Emit notification about the new result
ArgumentPack args;
args.add(m_results.size());
emit("resultAdded", args);
}
std::vector<Result> getResults() const {
std::lock_guard lock(m_mutex);
return m_results;
}
private:
mutable std::mutex m_mutex;
std::vector<Result> m_results;
};
// Using with ThreadPool
auto collector = std::make_shared<ResultCollector>();
// Create tasks that use the collector
for (int i = 0; i < 10; i++) {
auto task = pool->createAndAdd<ProcessingTask>(data[i], collector); // ProcessingTask: placeholder Runnable subclass
}
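Because exec() launches one thread per task, the number of tasks added to the pool determines the thread count. For systems with varying workloads, choosing that number according to the workload type can improve efficiency: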
// Create task batches based on system capabilities
unsigned int optimalTaskCount = ThreadPool::maxThreadCount();
// For CPU-intensive tasks, match the number of cores
if (taskType == TaskType::CPU_INTENSIVE) {
optimalTaskCount = ThreadPool::maxThreadCount();
}
// For I/O-bound tasks, use more threads as they'll often be waiting
else if (taskType == TaskType::IO_BOUND) {
optimalTaskCount = ThreadPool::maxThreadCount() * 2;
}
// Create the appropriate number of tasks
for (unsigned int i = 0; i < optimalTaskCount; i++) {
pool->createAndAdd<BatchTask>(workBatches[i]); // BatchTask: placeholder Runnable subclass
}
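The loop above indexes workBatches up to optimalTaskCount, which assumes at least that many batches exist. A small helper can apply the same heuristic and clamp the result to the work that is actually available. This is a sketch: WorkloadKind and suggestedTaskCount are hypothetical names, and the factor of two for I/O-bound work is taken from the snippet above rather than from any measured tuning.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical workload classification (mirrors TaskType in the snippet above).
enum class WorkloadKind { CpuIntensive, IoBound };

// Hypothetical helper: decides how many tasks to create.
std::size_t suggestedTaskCount(WorkloadKind kind,
                               unsigned int hardwareThreads,
                               std::size_t availableBatches) {
    assert(hardwareThreads > 0 && "pass a non-zero thread count");
    // One task per hardware thread for CPU work, two per thread for I/O work.
    const std::size_t wanted = (kind == WorkloadKind::IoBound)
        ? static_cast<std::size_t>(hardwareThreads) * 2
        : static_cast<std::size_t>(hardwareThreads);
    // Never ask for more tasks than there are batches to hand out.
    return std::min(wanted, availableBatches);
}
```

With the library, the first two arguments would typically be the workload type and ThreadPool::maxThreadCount(), and the third the size of the batch container.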
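For more complex workloads, a shared work queue gives better load balancing: each worker pulls the next item as soon as it is free, so fast workers naturally take on more items. This is a simpler relative of true work stealing, which uses one queue per worker: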
// Create a thread-safe work queue
class WorkQueue : public Task {
public:
void addWork(const WorkItem& item) {
std::lock_guard lock(m_mutex);
m_queue.push(item);
// Signal new work available
emit("workAdded");
}
std::optional<WorkItem> getNextWork() {
std::lock_guard lock(m_mutex);
if (m_queue.empty()) {
return std::nullopt;
}
WorkItem item = m_queue.front();
m_queue.pop();
return item;
}
bool hasWork() const {
std::lock_guard lock(m_mutex);
return !m_queue.empty();
}
private:
mutable std::mutex m_mutex;
std::queue<WorkItem> m_queue;
};
// Worker task that processes items until queue is empty
class WorkerTask : public Runnable {
public:
explicit WorkerTask(std::shared_ptr<WorkQueue> queue) : m_queue(queue) {}
protected:
void runImpl() override {
while (!stopRequested()) {
// Try to get work
auto workItem = m_queue->getNextWork();
if (!workItem) {
// No more work
break;
}
// Process the work item
processItem(*workItem);
}
}
private:
std::shared_ptr<WorkQueue> m_queue;
};
// Using the work queue with ThreadPool
auto workQueue = std::make_shared<WorkQueue>();
// Fill the queue with work
for (const auto& item : workItems) {
workQueue->addWork(item);
}
// Create worker tasks in the pool
unsigned int workerCount = ThreadPool::maxThreadCount();
for (unsigned int i = 0; i < workerCount; i++) {
pool->createAndAdd<WorkerTask>(workQueue);
}
// Execute all workers in parallel
pool->exec();
ThreadPool works well with other task library components in a thread-safe manner.
The ThreadPool class provides a robust, thread-safe foundation for parallel task execution in high-performance applications. Its careful design ensures safe execution, cancellation, and monitoring across multiple threads, without imposing excessive synchronization overhead on task implementations.
By leveraging modern C++ concurrency primitives like futures, atomic operations, and proper thread management, the ThreadPool enables developers to harness the full power of multi-core processors while maintaining code clarity and safety. The integration with the SignalSlot system provides comprehensive monitoring capabilities to track execution progress and performance across all parallel tasks.