Extends Task
The Runnable class is a key component of the task library that provides a standardized interface for executable tasks that can safely run in concurrent environments. It extends the base Task class to add execution state tracking, progress reporting, and cancellation support.
Runnable serves as the foundation for tasks that can be executed by the ThreadPool or independently.
This thread-safe implementation ensures that Runnable tasks can be safely executed, monitored, and controlled in multi-threaded applications, providing a robust foundation for concurrent programming.
The Runnable class is designed for thread-safe operation with the following guarantees:
class Runnable : public Task {
public:
    // Constructor & destructor
    Runnable();
    virtual ~Runnable() = default;

    // Task execution
    void run();
    std::future<void> runAsync();

    // State methods
    bool isRunning() const;
    void requestStop();
    bool stopRequested() const;

protected:
    // Implementation method (must be overridden by subclasses)
    virtual void runImpl() = 0;

    // Progress reporting helper
    void reportProgress(float progress);

private:
    std::atomic<bool> m_isRunning;
    std::atomic<bool> m_stopRequested;
};
Runnable emits the following signals:
Signal | Type | Description | Arguments |
---|---|---|---|
started | Simple | Emitted when the task begins execution | None |
finished | Simple | Emitted when the task completes | None |
progress | Data | Reports task completion percentage | float (0.0 to 1.0) |
log, warn, error | Data | Standard logging signals | std::string (message) |
// Standard headers used by this example
#include <algorithm>
#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>

// Create a custom task by extending Runnable
class DataProcessor : public Runnable {
public:
    DataProcessor(const std::string& name, const std::vector<int>& data)
        : m_name(name), m_data(data) {}

protected:
    // Implement the required runImpl method
    void runImpl() override {
        emitString("log", m_name + " started processing " +
                              std::to_string(m_data.size()) + " items");
        int sum = 0;
        // Log about every 10% of the input. The interval is clamped to at
        // least 1 so the modulo below never divides by zero for small inputs.
        const size_t milestone = std::max<size_t>(1, m_data.size() / 10);
        // Process each item with progress updates
        for (size_t i = 0; i < m_data.size(); ++i) {
            // Check if we should stop - IMPORTANT for responsive cancellation
            if (stopRequested()) {
                emitString("warn", m_name + " stopped before completion");
                return;
            }
            // Process this item
            sum += m_data[i];
            // Simulate work
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            // Report progress (0.0 to 1.0)
            float progress = static_cast<float>(i + 1) / m_data.size();
            reportProgress(progress);
            // Log milestone progress
            if ((i + 1) % milestone == 0 || i + 1 == m_data.size()) {
                emitString("log", m_name + " processed " +
                                      std::to_string(i + 1) + "/" +
                                      std::to_string(m_data.size()) + " items");
            }
        }
        // Report result
        emitString("log", m_name + " completed with sum: " + std::to_string(sum));
    }

private:
    std::string m_name;
    std::vector<int> m_data;
};
// Using the task with thread-safe execution
void processDataConcurrently() {
    // Create separate input data and one task instance per data set
    std::vector<int> data1 = {1, 2, 3, 4, 5};
    std::vector<int> data2 = {6, 7, 8, 9, 10};
    auto processor1 = std::make_shared<DataProcessor>("Processor-A", data1);
    auto processor2 = std::make_shared<DataProcessor>("Processor-B", data2);

    // Create a logger to capture output from both processors
    // (the Logger type name is assumed here)
    auto logger = std::make_shared<Logger>();
    logger->connectAllSignalsTo(processor1.get());
    logger->connectAllSignalsTo(processor2.get());

    // Create progress tracking
    processor1->connect("progress", [](const ArgumentPack& args) {
        float progress = args.get<float>(0);
        std::cout << "Processor-A: " << (progress * 100) << "%" << std::endl;
    });
    processor2->connect("progress", [](const ArgumentPack& args) {
        float progress = args.get<float>(0);
        std::cout << "Processor-B: " << (progress * 100) << "%" << std::endl;
    });

    // Launch tasks asynchronously (in separate threads)
    auto future1 = processor1->runAsync();
    auto future2 = processor2->runAsync();

    // Demonstrate cancellation from main thread
    std::this_thread::sleep_for(std::chrono::milliseconds(20));

    // Thread-safely request stop for one processor
    processor1->requestStop();
    std::cout << "Requested stop for Processor-A" << std::endl;

    // Wait for both to complete
    future1.wait();
    future2.wait();
    std::cout << "All processing completed" << std::endl;
}
A Runnable task goes through the following thread-safe lifecycle:
1. run() or runAsync() is called; the task atomically sets running=true and emits the "started" signal.
2. The runImpl() implementation runs on a single thread and may report progress.
3. When runImpl() returns or throws, the task atomically sets running=false and emits the "finished" signal.

This lifecycle can be repeated multiple times for the same task instance, as long as the task is not already running when run() is called.
The runAsync() method provides thread-safe asynchronous execution using C++ futures:
// Start task in a background thread
std::future<void> future = task->runAsync();

// Do other work in the current thread while task executes

// Wait for completion when needed
future.wait();

// Check if task is still running
if (task->isRunning()) {
    std::cout << "Task is still in progress..." << std::endl;
}

// Wait with timeout (C++11)
if (future.wait_for(std::chrono::seconds(5)) == std::future_status::timeout) {
    std::cout << "Task is taking longer than expected..." << std::endl;
    task->requestStop();  // Request early termination
}
The runAsync() method uses std::async with the std::launch::async policy, which guarantees execution on a separate thread. This is essential for proper concurrency and for preventing deadlocks when multiple tasks are executed.
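The effect of the launch policy can be checked with the standard library alone. The following is a minimal sketch, not the library's implementation: `runsOnSeparateThread` and `finishesWithin` are hypothetical helper names introduced here for illustration, and only `std::async`, `std::future`, and `std::this_thread` are real APIs.

```cpp
#include <chrono>
#include <future>
#include <thread>

// Returns true when work launched with std::launch::async executed on a
// thread other than the caller's. (Hypothetical helper, for illustration.)
bool runsOnSeparateThread() {
    const std::thread::id caller = std::this_thread::get_id();
    std::future<std::thread::id> worker = std::async(std::launch::async, [] {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        return std::this_thread::get_id();
    });
    return worker.get() != caller;
}

// Waits on a future for at most `limit`, mirroring the timeout check shown
// earlier. Returns true only if the task finished in time.
bool finishesWithin(std::future<void>& f, std::chrono::milliseconds limit) {
    return f.wait_for(limit) == std::future_status::ready;
}
```

One detail worth keeping in mind: a future returned by `std::async` blocks in its destructor until the task has finished, so the returned future should be stored rather than discarded if the caller is meant to continue working in parallel.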
Runnable tasks support thread-safe cancellation during execution:
- The controlling thread calls requestStop() to atomically set the stop flag.
- The task implementation polls stopRequested(), which atomically reads the flag.

void runImpl() override {
    // Initialize resources
    // (Result is a placeholder for whatever type processItem() returns)
    std::vector<Result> results;
    results.reserve(m_items.size());
    for (size_t i = 0; i < m_items.size(); i++) {
        // Thread-safe cancellation check
        if (stopRequested()) {
            emitString("log", "Operation cancelled after processing "
                                  + std::to_string(i) + " items");
            // Clean up any resources if needed
            // ...
            return;  // Exit early
        }
        // Perform work step
        results.push_back(processItem(m_items[i]));
        // Report progress
        reportProgress(static_cast<float>(i + 1) / m_items.size());
    }
    // Process final results if not cancelled
    finalizeResults(results);
}
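The cooperative pattern above can be tried out without the library. This is a minimal sketch under stated assumptions: `StoppableCounter` and `cancelAfter` are hypothetical names that stand in for the stop-flag part of Runnable, and the worker thread is started with `std::async` rather than `runAsync()`.

```cpp
#include <atomic>
#include <chrono>
#include <cstddef>
#include <future>
#include <thread>

// Hypothetical stand-in for the stop-flag part of Runnable (not the library class).
class StoppableCounter {
public:
    void requestStop() { m_stopRequested.store(true); }
    bool stopRequested() const { return m_stopRequested.load(); }

    // Counts up to `limit`, checking the flag before every step.
    // Returns the number of steps actually completed.
    std::size_t countTo(std::size_t limit) {
        std::size_t done = 0;
        while (done < limit && !stopRequested()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            ++done;
        }
        return done;
    }

private:
    std::atomic<bool> m_stopRequested{false};
};

// Starts a long count on a worker thread, then cancels it from the caller.
std::size_t cancelAfter(std::chrono::milliseconds delay, std::size_t limit) {
    StoppableCounter counter;
    auto result = std::async(std::launch::async,
                             [&counter, limit] { return counter.countTo(limit); });
    std::this_thread::sleep_for(delay);
    counter.requestStop();  // set from the caller's thread
    return result.get();    // worker sees the flag and returns early
}
```

The worker only stops at the points where it checks the flag, so the granularity of the check determines how quickly a stop request takes effect.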
Runnable tasks are designed to work seamlessly with the ThreadPool for parallel execution:
// Create a thread pool
auto pool = std::make_shared<ThreadPool>();

// Add tasks to the pool (MyTask is a placeholder for any Runnable subclass)
pool->add(std::make_unique<MyTask>());
pool->add(std::make_unique<MyTask>());

// Create and add a task in one step
auto task = pool->createAndAdd<DataProcessor>("Processor", data);

// Connect to signals for this specific task
task->connectSimple("finished", []() {
    std::cout << "DataProcessor task completed" << std::endl;
});

// Execute all tasks in parallel
pool->exec();

// Or execute asynchronously
auto future = pool->run();

// Request stop for all tasks in the pool
pool->stopAll();
The ThreadPool manages the execution of all tasks, collecting results and providing centralized monitoring, all in a thread-safe manner.
Runnable implements automatic thread-safe error handling:
void run() {
    if (m_isRunning.exchange(true)) {
        emitString("warn", "Task is already running");
        return;
    }
    // Emit started signal
    emit("started");
    try {
        // Run the implementation
        runImpl();
    } catch (const std::exception& e) {
        // Report error if an exception occurs
        emitString("error", e.what());
    } catch (...) {
        // Handle unknown exceptions
        emitString("error", "Unknown exception during task execution");
    }
    // Mark task as completed
    m_isRunning.store(false);
    // Emit finished signal
    emit("finished");
}
This ensures that exceptions in tasks are properly captured and reported, preventing crashes and allowing for proper error handling even in concurrent scenarios.
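The behavior described above can be reproduced in isolation. The sketch below is an illustration only, not the library's code: `GuardedTask` is a hypothetical class that wraps a callable the same way the run() listing wraps runImpl(), and a plain vector of strings stands in for the "error" signal.

```cpp
#include <atomic>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in: wraps a callable the way run() wraps runImpl().
class GuardedTask {
public:
    explicit GuardedTask(std::function<void()> body) : m_body(std::move(body)) {}

    void run() {
        if (m_isRunning.exchange(true)) return;  // already running
        try {
            m_body();
        } catch (const std::exception& e) {
            m_errors.push_back(e.what());
        } catch (...) {
            m_errors.push_back("Unknown exception during task execution");
        }
        m_isRunning.store(false);  // reached on success and on failure
    }

    bool isRunning() const { return m_isRunning.load(); }

    // Recorded error messages. In this sketch they are only meant to be
    // read after run() has returned.
    const std::vector<std::string>& errors() const { return m_errors; }

private:
    std::function<void()> m_body;
    std::atomic<bool> m_isRunning{false};
    std::vector<std::string> m_errors;  // stands in for the "error" signal
};
```

Because the running flag is cleared after the try/catch block, a task that failed can be run again, which matches the repeatable lifecycle described earlier.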
- Use std::atomic for thread-safe access to shared state
- Check stopRequested() regularly during long-running operations for responsive cancellation
- … runImpl() method
- Use reportProgress() to provide feedback on execution status

The Runnable class ensures thread-safe state transitions between running, stopped, and idle states:
bool run() {
    // Try to transition from idle to running (atomic)
    bool expected = false;
    if (!m_isRunning.compare_exchange_strong(expected, true)) {
        // Task was already running
        return false;
    }
    // We successfully transitioned to running state
    emit("started");
    try {
        runImpl();
    } catch (...) {
        // Handle exceptions...
    }
    // Transition back to idle state (atomic)
    m_isRunning.store(false);
    emit("finished");
    return true;
}
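To see why the compare-and-exchange makes the idle-to-running transition safe, the guard can be exercised by several threads at once. This is a minimal sketch using only the standard library; `StartGuard` and `countWinners` are hypothetical names introduced for illustration and are not part of the task library.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical stand-in for the idle -> running guard (not the library class).
class StartGuard {
public:
    // Succeeds for exactly one caller until release() is called.
    bool tryStart() {
        bool expected = false;
        return m_isRunning.compare_exchange_strong(expected, true);
    }
    void release() { m_isRunning.store(false); }

private:
    std::atomic<bool> m_isRunning{false};
};

// Lets `threadCount` threads race to start and counts how many succeed.
int countWinners(int threadCount) {
    StartGuard guard;
    std::atomic<int> winners{0};
    std::vector<std::thread> threads;
    for (int i = 0; i < threadCount; ++i) {
        threads.emplace_back([&guard, &winners] {
            if (guard.tryStart()) ++winners;
        });
    }
    for (auto& t : threads) t.join();
    return winners.load();
}
```

Since the guard is never released during the race, `countWinners` returns 1 for any positive thread count: every other thread observes the flag already set and backs off, which corresponds to run() returning false in the listing above.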
Runnable uses thread confinement to simplify concurrency management: the runImpl() method executes on a single thread at a time, eliminating the need for complex synchronization within the implementation. State flags are safely accessed across threads using atomic operations.
The Runnable class provides a robust, thread-safe foundation for implementing executable tasks in concurrent applications. Its careful design ensures safe execution, cancellation, and monitoring across thread boundaries, without imposing excessive synchronization overhead on task implementations.
By adhering to the Runnable interface and following the documented thread-safety practices, developers can create reliable concurrent applications with predictable behavior and proper resource management. The integration with ThreadPool provides a scalable approach to parallel task execution in compute-intensive applications.