Runnable

Extends Task

The Runnable class is a key component of the task library that provides a standardized interface for executable tasks that can safely run in concurrent environments. It extends the base Task class to add execution state tracking, progress reporting, and cancellation support. Runnable serves as the foundation for tasks that can be executed by the ThreadPool or independently.

Because its execution state is held in atomic flags, a Runnable task can be executed, monitored, and cancelled from different threads of a multi-threaded application.

Features

Thread Safety

The Runnable class is designed for thread-safe operation with the following guarantees:

Class Interface

class Runnable : public Task {
public:
    // Constructor & destructor
    Runnable();
    virtual ~Runnable() = default;
    
    // Task execution
    void run();
    std::future<void> runAsync();
    
    // State methods
    bool isRunning() const;
    void requestStop();
    bool stopRequested() const;
    
protected:
    // Implementation method (must be overridden by subclasses)
    virtual void runImpl() = 0;
    
    // Progress reporting helper
    void reportProgress(float progress);
    
private:
    std::atomic<bool> m_isRunning;
    std::atomic<bool> m_stopRequested;
};

Signals

Runnable emits the following signals:

Signal            Type    Description                             Arguments
started           Simple  Emitted when the task begins execution  None
finished          Simple  Emitted when the task completes         None
progress          Data    Reports task completion as a fraction   float (0.0 to 1.0)
log, warn, error  Data    Standard logging signals                std::string (message)

Usage Example

Creating and Using a Runnable Task
// Create a custom task by extending Runnable
class DataProcessor : public Runnable {
public:
    DataProcessor(const std::string& name, const std::vector<int>& data)
        : m_name(name), m_data(data) {}
    
protected:
    // Implement the required runImpl method
    void runImpl() override {
        emitString("log", m_name + " started processing " + 
                  std::to_string(m_data.size()) + " items");
        
        int sum = 0;
        
        // Process each item with progress updates
        for (size_t i = 0; i < m_data.size(); ++i) {
            // Check for a stop request on every iteration so cancellation stays responsive
            if (stopRequested()) {
                emitString("warn", m_name + " stopped before completion");
                return;
            }
            
            // Process this item
            sum += m_data[i];
            
            // Simulate work
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            
            // Report progress (0.0 to 1.0)
            float progress = static_cast<float>(i + 1) / m_data.size();
            reportProgress(progress);
            
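            // Log milestone progress roughly every 10% of the items.
            // The step is at least 1, which avoids a modulo by zero for fewer than 10 items.
            const size_t step = m_data.size() >= 10 ? m_data.size() / 10 : 1;
            if ((i + 1) % step == 0 || i + 1 == m_data.size()) {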
                emitString("log", m_name + " processed " + 
                          std::to_string(i + 1) + "/" + 
                          std::to_string(m_data.size()) + " items");
            }
        }
        
        // Report result
        emitString("log", m_name + " completed with sum: " + std::to_string(sum));
    }
    
private:
    std::string m_name;
    std::vector<int> m_data;
};

Thread-Safe Execution from Multiple Threads
// Using the task with thread-safe execution
void processDataConcurrently() {
    // Create shared data and multiple task instances
    std::vector<int> data1 = {1, 2, 3, 4, 5};
    std::vector<int> data2 = {6, 7, 8, 9, 10};
    
    auto processor1 = std::make_shared<DataProcessor>("Processor-A", data1);
    auto processor2 = std::make_shared<DataProcessor>("Processor-B", data2);
    
    // Create a logger to capture output from both processors
    auto logger = std::make_shared<Logger>();
    logger->connectAllSignalsTo(processor1.get());
    logger->connectAllSignalsTo(processor2.get());
    
    // Create progress tracking
    processor1->connect("progress", [](const ArgumentPack& args) {
        float progress = args.get<float>(0);
        std::cout << "Processor-A: " << (progress * 100) << "%" << std::endl;
    });
    
    processor2->connect("progress", [](const ArgumentPack& args) {
        float progress = args.get<float>(0);
        std::cout << "Processor-B: " << (progress * 100) << "%" << std::endl;
    });
    
    // Launch tasks asynchronously (in separate threads)
    auto future1 = processor1->runAsync();
    auto future2 = processor2->runAsync();
    
    // Demonstrate cancellation from main thread
    std::this_thread::sleep_for(std::chrono::milliseconds(20));
    
    // Thread-safely request stop for one processor
    processor1->requestStop();
    std::cout << "Requested stop for Processor-A" << std::endl;
    
    // Wait for both to complete
    future1.wait();
    future2.wait();
    
    std::cout << "All processing completed" << std::endl;
}

Task Lifecycle

A Runnable task goes through the following thread-safe lifecycle:

  1. Construction: Internal state initialized with atomic flags (running=false, stopRequested=false)
  2. Configuration: Task-specific parameters set by calling its methods
  3. Execution Start: run() or runAsync() called, atomically sets running=true, emits "started" signal
  4. Task Execution: runImpl() implementation runs on a single thread, may report progress
  5. Execution End: Task completes, atomically sets running=false, emits "finished" signal
  6. Error Handling: If runImpl() throws, the exception is caught and an "error" signal is emitted with the details; the task still returns to the idle state and emits "finished"

This lifecycle can be repeated multiple times for the same task instance, as long as a task is not already running when run() is called.
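
The ordering of these steps can be illustrated without the library. The following is a minimal stand-alone sketch, not the library's implementation: `LifecycleSketch`, its `events` log, and `runTwice` are hypothetical names, and the log stands in for the "started" and "finished" signals. Exception handling is left out to keep the sequence visible.

```cpp
#include <atomic>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for Runnable: records lifecycle events in a vector
// instead of emitting signals.
class LifecycleSketch {
public:
    // Runs `body` once. Returns false if a run is already in progress.
    bool run(const std::function<void()>& body) {
        if (m_running.exchange(true)) {
            return false;                  // refused: the task is already running
        }
        events.push_back("started");       // execution start
        body();                            // task execution
        m_running.store(false);            // execution end
        events.push_back("finished");
        return true;
    }

    bool isRunning() const { return m_running.load(); }

    // Only touched by the thread that calls run().
    std::vector<std::string> events;

private:
    std::atomic<bool> m_running{false};    // construction: idle
};

// Runs the same instance twice and returns the recorded event sequence.
std::vector<std::string> runTwice() {
    LifecycleSketch task;
    task.run([] {});
    task.run([] {});
    return task.events;
}
```

Calling `run()` again while the body is still executing returns `false` and records nothing, which mirrors the rule that a task cannot be restarted until its current run has ended.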

Asynchronous Execution

The runAsync() method provides thread-safe asynchronous execution using C++ futures:

Asynchronous Execution
// Start task in a background thread
std::future<void> future = task->runAsync();

// Do other work in the current thread while task executes

// Check if task is still running
if (task->isRunning()) {
    std::cout << "Task is still in progress..." << std::endl;
}

// Wait with timeout (C++11)
if (future.wait_for(std::chrono::seconds(5)) == std::future_status::timeout) {
    std::cout << "Task is taking longer than expected..." << std::endl;
    task->requestStop(); // Request early termination
}

// Block until the task has finished
future.wait();

Note: The runAsync() method uses std::async with std::launch::async, which guarantees execution on a separate thread instead of deferring the work until the future is queried. This is essential for proper concurrency and for preventing deadlocks when multiple tasks are executed.
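
The standard-library behaviour that runAsync() relies on can be tried in isolation. The sketch below uses only `std::async` and `std::future`; `finishesWithin` is a hypothetical helper, and a sleep stands in for real work. It is an illustration of the launch policy and the timed wait, not of the library's internals.

```cpp
#include <chrono>
#include <future>
#include <thread>

// Hypothetical helper: starts work that lasts `workDuration` on its own thread
// and reports whether it finished within `timeout`.
bool finishesWithin(std::chrono::milliseconds workDuration,
                    std::chrono::milliseconds timeout) {
    // std::launch::async forces a separate thread; the work starts immediately.
    std::future<void> f = std::async(std::launch::async, [workDuration] {
        std::this_thread::sleep_for(workDuration);
    });

    // wait_for returns ready, timeout, or deferred. With std::launch::async
    // the status is never deferred.
    const bool done = f.wait_for(timeout) == std::future_status::ready;

    // When `f` goes out of scope, the destructor of a future obtained from
    // std::async blocks until the work has finished.
    return done;
}
```

A caller that wants a fire-and-forget task therefore has to keep the future alive somewhere, since dropping it immediately makes the call wait for completion.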

Thread-Safe Cancellation Mechanism

Runnable tasks support thread-safe cancellation during execution:

  1. Requesting Cancellation: Any thread can call requestStop() to atomically set the stop flag
  2. Checking for Cancellation: The executing thread periodically calls stopRequested() which atomically reads the flag
  3. Handling Cancellation: If a stop has been requested, the task cleans up and returns early

Implementing Cancellation Support
void runImpl() override {
    // Initialize resources
    std::vector results;
    results.reserve(m_items.size());
    
    for (size_t i = 0; i < m_items.size(); i++) {
        // Thread-safe cancellation check
        if (stopRequested()) {
            emitString("log", "Operation cancelled after processing " 
                      + std::to_string(i) + " items");
            
            // Clean up any resources if needed
            // ...
            
            return; // Exit early
        }
        
        // Perform work step
        results.push_back(processItem(m_items[i]));
        
        // Report progress
        reportProgress(static_cast<float>(i + 1) / m_items.size());
    }
    
    // Process final results if not cancelled
    finalizeResults(results);
}

Thread Safety: The cancellation mechanism is implemented using atomic operations, so stop requests and checks are thread-safe without explicit locks. Cancellation is cooperative: a task stops only at its next stopRequested() check, so responsiveness depends on how often runImpl() polls the flag.
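
The same request-and-poll protocol can be reproduced with nothing but `std::atomic<bool>` and `std::thread`. In the sketch below, `CancellableCounter` and `cancelAfter` are hypothetical names chosen for illustration, and a one-millisecond sleep stands in for a unit of work.

```cpp
#include <atomic>
#include <chrono>
#include <cstddef>
#include <thread>

// Hypothetical worker: performs steps until `totalSteps` is reached or a stop
// is requested.
class CancellableCounter {
public:
    void requestStop() { m_stopRequested.store(true); }
    bool stopRequested() const { return m_stopRequested.load(); }

    // Returns the number of steps that were completed.
    std::size_t run(std::size_t totalSteps) {
        std::size_t done = 0;
        while (done < totalSteps) {
            if (stopRequested()) {
                break;                      // cooperative exit point
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            ++done;
        }
        return done;
    }

private:
    std::atomic<bool> m_stopRequested{false};
};

// Starts the counter on a second thread, requests a stop from the calling
// thread after `delay`, and returns how many steps were completed.
std::size_t cancelAfter(std::chrono::milliseconds delay, std::size_t totalSteps) {
    CancellableCounter counter;
    std::size_t completed = 0;
    std::thread worker([&] { completed = counter.run(totalSteps); });

    std::this_thread::sleep_for(delay);
    counter.requestStop();                  // atomic write from another thread
    worker.join();                          // join makes `completed` safe to read
    return completed;
}
```

The worker notices the request at most one step after it is made, which is why long-running steps should be split so that the flag is polled often.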

ThreadPool Integration

Runnable tasks are designed to work seamlessly with the ThreadPool for parallel execution:

Using Runnable Tasks with ThreadPool
// Create a thread pool
auto pool = std::make_shared<ThreadPool>();

// Add tasks to the pool
pool->add(std::make_unique());
pool->add(std::make_unique());

// Create and add a task in one step
auto task = pool->createAndAdd<DataProcessor>("Processor", data);

// Connect to signals for this specific task
task->connectSimple("finished", []() {
    std::cout << "DataProcessor task completed" << std::endl;
});

// Execute all tasks in parallel
pool->exec();

// Or execute asynchronously
auto future = pool->run();

// Request stop for all tasks in the pool
pool->stopAll();

The ThreadPool manages the execution of all tasks, collecting results and providing centralized monitoring, all in a thread-safe manner.

Thread-Safe Error Handling

Runnable implements automatic thread-safe error handling:

Exception Handling Implementation
void run() {
    if (m_isRunning.exchange(true)) {
        emitString("warn", "Task is already running");
        return;
    }

    // Emit started signal
    emit("started");

    try {
        // Run the implementation
        runImpl();
    } catch (const std::exception& e) {
        // Report error if an exception occurs
        emitString("error", e.what());
    } catch (...) {
        // Handle unknown exceptions
        emitString("error", "Unknown exception during task execution");
    }

    // Mark task as completed
    m_isRunning.store(false);

    // Emit finished signal
    emit("finished");
}

This ensures that exceptions in tasks are properly captured and reported, preventing crashes and allowing for proper error handling even in concurrent scenarios.
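
The two-level catch used above can be exercised on its own. The following sketch assumes nothing about the library: `runAndCaptureError` is a hypothetical function that returns the message that would otherwise be passed to the "error" signal.

```cpp
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>

// Hypothetical wrapper: runs `body` and returns the error message, or an empty
// string if `body` completed without throwing.
std::string runAndCaptureError(const std::function<void()>& body) {
    try {
        body();
    } catch (const std::exception& e) {
        return e.what();                    // standard exceptions carry a message
    } catch (...) {
        // Anything not derived from std::exception (for example `throw 42;`)
        return "Unknown exception during task execution";
    }
    return {};
}
```

Because the exception is consumed inside the wrapper, it never reaches the thread's entry point. With `std::async`, an exception that escapes the callable is stored in the future and rethrown by `get()`; catching it first means the caller learns about failures through the reported message instead.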

Thread Safety Implementation

Key Thread Safety Features

Thread-Safe Best Practices

Advanced Thread Safety Patterns

Thread-Safe State Transitions

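The Runnable class uses an atomic compare-and-swap for the transition from idle to running, so only one caller can start a given task at a time. The listing below returns a bool to make the outcome of that transition visible, whereas the class interface declares run() as void: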

Thread-Safe State Machine
bool run() {
    // Try to transition from idle to running (atomic)
    bool expected = false;
    if (!m_isRunning.compare_exchange_strong(expected, true)) {
        // Task was already running
        return false;
    }
    
    // We successfully transitioned to running state
    emit("started");
    
    try {
        runImpl();
    } catch (...) {
        // Handle exceptions...
    }
    
    // Transition back to idle state (atomic)
    m_isRunning.store(false);
    emit("finished");
    
    return true;
}
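
To see why the compare-and-swap matters, several threads can be made to race for the same transition. The sketch below is an assumption-labelled illustration: `StartGuard` and `countWinners` are hypothetical names, and the guard exposes only the idle-to-running step of the state machine.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical guard exposing only the idle -> running transition.
class StartGuard {
public:
    // Succeeds for exactly one caller until finish() is called.
    bool tryStart() {
        bool expected = false;
        return m_running.compare_exchange_strong(expected, true);
    }

    void finish() { m_running.store(false); }

private:
    std::atomic<bool> m_running{false};
};

// Lets `threadCount` threads race to start the same guard and returns how many
// of them succeeded. The winner never calls finish(), so the guard stays taken.
int countWinners(int threadCount) {
    StartGuard guard;
    std::atomic<int> winners{0};
    std::vector<std::thread> threads;

    for (int i = 0; i < threadCount; ++i) {
        threads.emplace_back([&] {
            if (guard.tryStart()) {
                ++winners;
            }
        });
    }
    for (auto& t : threads) {
        t.join();
    }
    return winners.load();
}
```

A plain `if (!running) running = true;` would let two threads pass the check before either writes the flag; the single atomic operation closes that window.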

Thread Confinement Pattern

Runnable uses thread confinement to simplify concurrency management:

Thread Confinement: Each Runnable instance's runImpl() method executes on a single thread at a time, eliminating the need for complex synchronization within the implementation. State flags are safely accessed across threads using atomic operations.
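
Confinement can be sketched with plain standard-library tools. In the example below, `confinedSum` and `sumInParallel` are hypothetical functions: each task receives its own copy of the input and accumulates into a local variable, so the loop needs no lock, and the result crosses the thread boundary only through the future.

```cpp
#include <future>
#include <vector>

// The vector is taken by value, so each task owns its data. `sum` is a local
// variable and is visible to no other thread.
long long confinedSum(std::vector<int> data) {
    long long sum = 0;
    for (int v : data) {
        sum += v;
    }
    return sum;
}

// Runs two confined tasks on separate threads and combines their results on
// the calling thread.
long long sumInParallel(const std::vector<int>& a, const std::vector<int>& b) {
    auto fa = std::async(std::launch::async, confinedSum, a);
    auto fb = std::async(std::launch::async, confinedSum, b);
    return fa.get() + fb.get();             // get() synchronizes with each task
}
```

The two input vectors mirror the data used for Processor-A and Processor-B earlier; only data that genuinely has to be shared, such as a stop flag, would need an atomic.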

Summary

The Runnable class provides a robust, thread-safe foundation for implementing executable tasks in concurrent applications. Its careful design ensures safe execution, cancellation, and monitoring across thread boundaries, without imposing excessive synchronization overhead on task implementations.

By adhering to the Runnable interface and following the documented thread-safety practices, developers can create reliable concurrent applications with predictable behavior and proper resource management. The integration with ThreadPool provides a scalable approach to parallel task execution in compute-intensive applications.