ParallelAlgorithm

Inherits from: FlowAlgorithm → Algorithm → Task

The ParallelAlgorithm class extends FlowAlgorithm with a strategy for executing multiple jobs concurrently. Its exec() implementation launches each job on its own thread with std::async and tracks completion through std::future, so independent jobs can run simultaneously on multi-core systems. The signal-based communication and progress reporting of the base classes remain available.

Features

Class Interface

class ParallelAlgorithm : public FlowAlgorithm {
public:
    // Constructor & destructor
    ParallelAlgorithm();
    virtual ~ParallelAlgorithm() = default;
    
    // Execute all jobs in parallel
    void exec(const ArgumentPack &args = {}) override;
    
    // Inherited from FlowAlgorithm
    virtual void doJob(const std::any &job) = 0; // Must be implemented by subclasses
};

Signals

ParallelAlgorithm emits the following signals:

| Signal | Type | Description | Arguments |
|---|---|---|---|
| job_started | Data | Emitted when a job starts processing | job index (size_t) |
| job_finished | Data | Emitted when a job completes | job index (size_t), success (bool) |
| progress | Data | Reports overall execution progress | float (0.0 to 1.0) |
| started | Simple | Indicates algorithm has started execution | None |
| finished | Simple | Indicates algorithm has completed all jobs | None |
| error | Data | Reports errors during job execution | std::string (error message) |
| log | Data | Reports execution status | std::string (log message) |
| warn | Data | Reports warnings during execution | std::string (warning message) |

Usage Example

ParallelAlgorithm Implementation Example
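// Standard headers used by this example (the library's own headers are omitted)
#include <any>
#include <chrono>
#include <stdexcept>
#include <string>
#include <thread>

// Create a custom parallel algorithm by extending the ParallelAlgorithm class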
class ImageProcessor : public ParallelAlgorithm {
public:
    // Constructor registers additional signals if needed
    ImageProcessor() {
        createDataSignal("image_processed");
    }
    
    // Implementation of the required doJob method for processing a single job
    void doJob(const std::any& job) override {
        try {
            // Try to cast the job to the expected file path type
            std::string imagePath = std::any_cast<std::string>(job);
            
            // Process the image
            emitString("log", "Processing image: " + imagePath);
            
            // Simulate image processing
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
            
            // Image processing logic would go here...
            ImageData result;
            result.width = 1024;
            result.height = 768;
            result.path = imagePath;
            
            // Report successful processing
            ArgumentPack resultArgs;
            resultArgs.add(imagePath);
            resultArgs.add(result);
            emit("image_processed", resultArgs);
        } catch (const std::bad_any_cast& e) {
            throw std::runtime_error("Invalid job type: expected string path");
        } catch (const std::exception& e) {
            throw std::runtime_error("Error processing image: " + std::string(e.what()));
        }
    }
    
private:
    struct ImageData {
        std::string path;
        int width;
        int height;
    };
};
Using ParallelAlgorithm
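// Standard headers used by this example (the library's own headers are omitted)
#include <iostream>
#include <memory>
#include <string>

// Using the parallel algorithm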
void processImages() {
    // Create the algorithm
    auto processor = std::make_shared<ImageProcessor>();
    
    // Add jobs to process
    processor->addJob(std::string("image1.jpg"));
    processor->addJob(std::string("image2.jpg"));
    processor->addJob(std::string("image3.jpg"));
    processor->addJob(std::string("image4.jpg"));
    processor->addJob(std::string("image5.jpg"));
    
    // Connect to signals
    processor->connectData("image_processed", [](const ArgumentPack& args) {
        std::string path = args.get<std::string>(0);
        std::cout << "Processed image: " << path << std::endl;
    });
    
    processor->connectData("progress", [](const ArgumentPack& args) {
        float progress = args.get<float>(0);
        std::cout << "Overall progress: " << (progress * 100) << "%" << std::endl;
    });
    
    processor->connectData("job_started", [](const ArgumentPack& args) {
        size_t jobIndex = args.get<size_t>(0);
        std::cout << "Starting job " << jobIndex << std::endl;
    });
    
    processor->connectData("job_finished", [](const ArgumentPack& args) {
        size_t jobIndex = args.get<size_t>(0);
        bool success = args.get<bool>(1);
        std::cout << "Job " << jobIndex << (success ? " completed successfully" : " failed") << std::endl;
    });
    
    processor->connectData("error", [](const ArgumentPack& args) {
        std::string error = args.get<std::string>(0);
        std::cerr << "ERROR: " << error << std::endl;
    });
    
    // Execute asynchronously
    auto future = processor->run();
    
    // Wait for completion if needed
    future.wait();
    
    std::cout << "All images processed" << std::endl;
}

Parallel Execution Model

ParallelAlgorithm::exec() runs the queued jobs in the following stages:

  1. Job Preparation: exec() checks the job queue first; if it is empty, it logs a message, emits finished, and returns
  2. Thread Creation: Each job is launched on its own thread using std::async with std::launch::async
  3. Concurrent Execution: Jobs run concurrently; how many run at the same instant depends on the available cores
  4. Progress Tracking: Progress is reported as a fraction from 0.0 to 1.0 each time a job completes successfully
  5. Result Collection: exec() waits on every future, so it returns only after all launched jobs have finished or been skipped
  6. Cleanup: Even when a stop is requested, exec() waits for running jobs before emitting finished

The implementation holds one std::future<void> per job to track completion. Exceptions are caught inside each job's thread and reported through the error signal rather than retrieved from the future.

Parallel Execution Implementation
void ParallelAlgorithm::exec(const ArgumentPack &args) {
    if (m_jobs.empty()) {
        emitString("log", "No jobs to execute");
        emit("finished");
        return;
    }

    emit("started");
    emitString("log", "Starting parallel execution of " +
                          std::to_string(m_jobs.size()) + " jobs");

    // Create futures for each job
    std::vector<std::future<void>> futures;
    futures.reserve(m_jobs.size());

    // Launch each job in a separate thread
    for (size_t i = 0; i < m_jobs.size(); ++i) {
        const auto &job = m_jobs[i];
        futures.push_back(std::async(std::launch::async, [this, job, i]() {
            if (stopRequested()) {
                ArgumentPack jobArgs;
                jobArgs.add(i);
                emit("job_started", jobArgs);
                emitString("warn", "Job " + std::to_string(i) +
                                       " skipped due to stop request");
                return;
            }

            try {
                // Signal that job is starting
                ArgumentPack startArgs;
                startArgs.add(i);
                emit("job_started", startArgs);

                // Execute the job
                this->doJob(job);

                // Signal that job is finished
                ArgumentPack finishArgs;
                finishArgs.add(i);
                finishArgs.add(true); // Success
                emit("job_finished", finishArgs);

                // Report progress
                float progress = static_cast<float>(i + 1) /
                                 static_cast<float>(m_jobs.size());
                reportProgress(progress);

            } catch (const std::exception &e) {
                // Create an ArgumentPack for the error message
                ArgumentPack errorArgs;
                errorArgs.add("Job " + std::to_string(i) +
                                           " failed: " + e.what());
                emit("error", errorArgs);

                // Signal that job is finished with error
                ArgumentPack finishArgs;
                finishArgs.add(i);
                finishArgs.add(false); // Failure
                emit("job_finished", finishArgs);
            }
        }));
    }

    // Wait for all jobs to complete
    for (auto &future : futures) {
        if (stopRequested()) {
            emitString("warn", "Execution stopped by user request");
            break;
        }
        future.wait();
    }

    // If we were requested to stop, some futures might still be running
    if (stopRequested()) {
        emitString("log", "Waiting for running jobs to complete...");

        // We still need to wait for futures that might be running
        for (auto &future : futures) {
            future.wait();
        }
    }

    emitString("log", "Parallel execution completed");
    emit("finished");
}

Job Processing

The ParallelAlgorithm executes each job in the following sequence:

  1. Pre-Execution Check: Verify if execution should continue or has been stopped
  2. Job Start Signal: Emit job_started signal with the job index
  3. Job Execution: Call the doJob() method with the job data
  4. Job Completion Signal: Emit job_finished signal with success status
  5. Progress Update: Update and report overall progress based on completed job count

Each job is processed on its own thread, so jobs can run in parallel when enough cores are available.
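
The exec() listing above derives progress from the job index, so with concurrent jobs the reported values can arrive out of order. A minimal sketch of one alternative, counting completed jobs under a mutex, is shown here. It is a standalone illustration using only the standard library; `runWithProgress` is a hypothetical name and not part of ParallelAlgorithm.

```cpp
#include <cstddef>
#include <functional>
#include <future>
#include <mutex>
#include <vector>

// Hypothetical helper (not part of the library): runs every job on its own
// thread and records progress as completed / total each time a job finishes.
std::vector<float> runWithProgress(const std::vector<std::function<void()>>& jobs) {
    std::mutex reportMutex;       // guards the counter and the report vector
    std::size_t completed = 0;    // number of jobs finished so far
    std::vector<float> reported;  // progress values in the order they were reported

    std::vector<std::future<void>> futures;
    futures.reserve(jobs.size());
    for (const auto& job : jobs) {
        futures.push_back(std::async(std::launch::async, [&, job]() {
            job();
            // Count and report under one lock so values are strictly increasing.
            std::lock_guard<std::mutex> lock(reportMutex);
            ++completed;
            reported.push_back(static_cast<float>(completed) /
                               static_cast<float>(jobs.size()));
        }));
    }
    // Wait for every job before the captured locals go out of scope.
    for (auto& f : futures) f.wait();
    return reported;
}
```

Because the increment and the report happen under the same lock, the last value reported is always 1.0 when every job succeeds, regardless of the order in which jobs finish.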

Cancellation Handling

The ParallelAlgorithm supports cancellation through the stopRequested() mechanism:

  1. Cancellation Request: Client code calls stop() to request cancellation
  2. Cancellation Check: Jobs check stopRequested() before starting
  3. Graceful Shutdown: Already running jobs are allowed to complete
  4. Final Wait: Algorithm waits for all running jobs to finish before returning

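Cancellation is cooperative: a running job is never interrupted, so a stop request takes effect only for jobs that have not yet started. Because exec() returns only after the running jobs finish, no job thread outlives the call.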
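
The check-before-start behaviour can be sketched in isolation. The example below is an assumption-laden illustration, not the library's code: `StopFlag` is a hypothetical stand-in for the stop()/stopRequested() pair, and `runUnlessStopped` is a hypothetical helper that returns how many jobs actually ran.

```cpp
#include <atomic>
#include <cstddef>
#include <functional>
#include <future>
#include <vector>

// Hypothetical stand-in for the library's stop()/stopRequested() pair.
struct StopFlag {
    std::atomic<bool> requested{false};
    void stop() { requested.store(true); }
    bool stopRequested() const { return requested.load(); }
};

// Launches one task per job. Each task checks the flag once, before starting;
// a job that has already started is never interrupted.
std::size_t runUnlessStopped(const std::vector<std::function<void()>>& jobs,
                             const StopFlag& flag) {
    std::atomic<std::size_t> executed{0};
    std::vector<std::future<void>> futures;
    futures.reserve(jobs.size());
    for (const auto& job : jobs) {
        futures.push_back(std::async(std::launch::async, [&flag, &executed, job]() {
            if (flag.stopRequested()) return;  // skipped: stop arrived before start
            job();
            ++executed;
        }));
    }
    // Final wait: return only after every launched task has finished.
    for (auto& f : futures) f.wait();
    return executed.load();
}
```

If stop() is called before any task starts, no job runs; if it is never called, all of them do. A stop request that arrives in between skips only the tasks that have not yet reached the check.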

Error Handling

Errors are contained within the job that raised them, so one failing job does not affect the others:

  1. Exception Catching: Each job thread catches and reports exceptions
  2. Error Signal: Exceptions trigger an error signal with details
  3. Job Failure Signal: The job_finished signal includes success status (false for errors)
  4. Continued Execution: Other jobs continue executing despite individual failures
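  5. Exception Propagation: Exceptions derived from std::exception are caught inside the job thread and do not propagate to the calling thread

Callers therefore learn about failures through the error and job_finished signals, not through exceptions thrown from exec().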
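
The per-job isolation can be sketched with the standard library alone. In this hypothetical example, `JobOutcome` and `runIsolated` are illustrative names; each task converts an exception into a result value instead of letting it escape. The exec() listing catches only std::exception, so the extra catch (...) branch here is an assumption, added because an exception of any other type would otherwise be stored in the future and silently dropped by wait().

```cpp
#include <exception>
#include <functional>
#include <future>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical result type: mirrors the success flag of job_finished
// plus the message carried by the error signal.
struct JobOutcome {
    bool success;
    std::string error;
};

// Runs each job on its own thread; a throwing job yields a failed outcome
// and never disturbs the other jobs or the calling thread.
std::vector<JobOutcome> runIsolated(const std::vector<std::function<void()>>& jobs) {
    std::vector<std::future<JobOutcome>> futures;
    futures.reserve(jobs.size());
    for (const auto& job : jobs) {
        futures.push_back(std::async(std::launch::async, [job]() -> JobOutcome {
            try {
                job();
                return {true, ""};
            } catch (const std::exception& e) {
                return {false, e.what()};
            } catch (...) {
                return {false, "unknown exception"};
            }
        }));
    }
    std::vector<JobOutcome> outcomes;
    outcomes.reserve(futures.size());
    // get() cannot throw here because every task catches its own exceptions.
    for (auto& f : futures) outcomes.push_back(f.get());
    return outcomes;
}
```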

Best Practices

Note: ParallelAlgorithm launches jobs with std::async and std::launch::async, so every job gets its own thread. Queuing a large number of jobs therefore creates an equally large number of threads, which can exhaust memory or oversubscribe the CPU. Size the job queue to the system's capabilities, for example relative to std::thread::hardware_concurrency().
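
One way to keep the thread count bounded is to submit work in batches. The sketch below is a standalone illustration under stated assumptions: `runInBatches` is a hypothetical helper, not a ParallelAlgorithm feature, and it uses only the standard library. It runs at most `maxThreads` jobs at a time and returns the number of batches it needed.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <future>
#include <thread>
#include <vector>

// Hypothetical helper: runs jobs in batches of at most maxThreads threads.
// Passing 0 selects the hardware thread count reported by the standard library.
std::size_t runInBatches(const std::vector<std::function<void()>>& jobs,
                         std::size_t maxThreads) {
    if (maxThreads == 0) {
        // hardware_concurrency() may return 0 when the value is unknown.
        maxThreads = std::max<std::size_t>(1, std::thread::hardware_concurrency());
    }
    std::size_t batches = 0;
    for (std::size_t begin = 0; begin < jobs.size(); begin += maxThreads) {
        const std::size_t end = std::min(begin + maxThreads, jobs.size());
        std::vector<std::future<void>> futures;
        futures.reserve(end - begin);
        for (std::size_t i = begin; i < end; ++i) {
            futures.push_back(std::async(std::launch::async, jobs[i]));
        }
        // Finish the whole batch before launching the next one.
        for (auto& f : futures) f.wait();
        ++batches;
    }
    return batches;
}
```

Batching is simple but leaves cores idle while the slowest job of each batch finishes; a worker pool that pulls from a shared queue avoids that at the cost of more code.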