Inherits from: FlowAlgorithm → Algorithm → Task
The ParallelAlgorithm
class extends the FlowAlgorithm
class to provide a framework for executing multiple jobs concurrently. It implements the job execution strategy using C++ futures and asynchronous execution to process each job in a separate thread. This approach maximizes performance on multi-core systems while maintaining the signal-based communication and progress reporting features of the base classes.
std::any
mechanismclass ParallelAlgorithm : public FlowAlgorithm {
public:
// Constructor & destructor
ParallelAlgorithm();
virtual ~ParallelAlgorithm() = default;
// Execute all jobs in parallel
void exec(const ArgumentPack &args = {}) override;
// Inherited from FlowAlgorithm
virtual void doJob(const std::any &job) = 0; // Must be implemented by subclasses
};
ParallelAlgorithm emits the following signals:
Signal | Type | Description | Arguments |
---|---|---|---|
job_started data | Data | Emitted when a job starts processing | job index (size_t) |
job_finished data | Data | Emitted when a job completes | job index (size_t), success (bool) |
progress data | Data | Reports overall execution progress | float (0.0 to 1.0) |
started simple | Simple | Indicates algorithm has started execution | None |
finished simple | Simple | Indicates algorithm has completed all jobs | None |
error data | Data | Reports errors during job execution | std::string (error message) |
log data | Data | Reports execution status | std::string (log message) |
warn data | Data | Reports warnings during execution | std::string (warning message) |
// Create a custom parallel algorithm by extending the ParallelAlgorithm class
class ImageProcessor : public ParallelAlgorithm {
public:
// Constructor registers additional signals if needed
ImageProcessor() {
createDataSignal("image_processed");
}
// Implementation of the required doJob method for processing a single job
void doJob(const std::any& job) override {
try {
// Try to cast the job to the expected file path type
std::string imagePath = std::any_cast(job);
// Process the image
emitString("log", "Processing image: " + imagePath);
// Simulate image processing
std::this_thread::sleep_for(std::chrono::milliseconds(200));
// Image processing logic would go here...
ImageData result;
result.width = 1024;
result.height = 768;
result.path = imagePath;
// Report successful processing
ArgumentPack resultArgs;
resultArgs.add(imagePath);
resultArgs.add(result);
emit("image_processed", resultArgs);
} catch (const std::bad_any_cast& e) {
throw std::runtime_error("Invalid job type: expected string path");
} catch (const std::exception& e) {
throw std::runtime_error("Error processing image: " + std::string(e.what()));
}
}
private:
struct ImageData {
std::string path;
int width;
int height;
};
};
// Using the parallel algorithm
void processImages() {
// Create the algorithm
auto processor = std::make_shared();
// Add jobs to process
processor->addJob(std::string("image1.jpg"));
processor->addJob(std::string("image2.jpg"));
processor->addJob(std::string("image3.jpg"));
processor->addJob(std::string("image4.jpg"));
processor->addJob(std::string("image5.jpg"));
// Connect to signals
processor->connectData("image_processed", [](const ArgumentPack& args) {
std::string path = args.get(0);
std::cout << "Processed image: " << path << std::endl;
});
processor->connectData("progress", [](const ArgumentPack& args) {
float progress = args.get(0);
std::cout << "Overall progress: " << (progress * 100) << "%" << std::endl;
});
processor->connectData("job_started", [](const ArgumentPack& args) {
size_t jobIndex = args.get(0);
std::cout << "Starting job " << jobIndex << std::endl;
});
processor->connectData("job_finished", [](const ArgumentPack& args) {
size_t jobIndex = args.get(0);
bool success = args.get(1);
std::cout << "Job " << jobIndex << (success ? " completed successfully" : " failed") << std::endl;
});
processor->connectData("error", [](const ArgumentPack& args) {
std::string error = args.get(0);
std::cerr << "ERROR: " << error << std::endl;
});
// Execute asynchronously
auto future = processor->run();
// Wait for completion if needed
future.wait();
std::cout << "All images processed" << std::endl;
}
The ParallelAlgorithm implements a parallel execution model:
std::async
The implementation uses std::future<void>
to track job completion and handle exceptions.
void ParallelAlgorithm::exec(const ArgumentPack &args) {
if (m_jobs.empty()) {
emitString("log", "No jobs to execute");
emit("finished");
return;
}
emit("started");
emitString("log", "Starting parallel execution of " +
std::to_string(m_jobs.size()) + " jobs");
// Create futures for each job
std::vector> futures;
futures.reserve(m_jobs.size());
// Launch each job in a separate thread
for (size_t i = 0; i < m_jobs.size(); ++i) {
const auto &job = m_jobs[i];
futures.push_back(std::async(std::launch::async, [this, job, i]() {
if (stopRequested()) {
ArgumentPack jobArgs;
jobArgs.add(i);
emit("job_started", jobArgs);
emitString("warn", "Job " + std::to_string(i) +
" skipped due to stop request");
return;
}
try {
// Signal that job is starting
ArgumentPack startArgs;
startArgs.add(i);
emit("job_started", startArgs);
// Execute the job
this->doJob(job);
// Signal that job is finished
ArgumentPack finishArgs;
finishArgs.add(i);
finishArgs.add(true); // Success
emit("job_finished", finishArgs);
// Report progress
float progress = static_cast(i + 1) /
static_cast(m_jobs.size());
reportProgress(progress);
} catch (const std::exception &e) {
// Create an ArgumentPack for the error message
ArgumentPack errorArgs;
errorArgs.add("Job " + std::to_string(i) +
" failed: " + e.what());
emit("error", errorArgs);
// Signal that job is finished with error
ArgumentPack finishArgs;
finishArgs.add(i);
finishArgs.add(false); // Failure
emit("job_finished", finishArgs);
}
}));
}
// Wait for all jobs to complete
for (auto &future : futures) {
if (stopRequested()) {
emitString("warn", "Execution stopped by user request");
break;
}
future.wait();
}
// If we were requested to stop, some futures might still be running
if (stopRequested()) {
emitString("log", "Waiting for running jobs to complete...");
// We still need to wait for futures that might be running
for (auto &future : futures) {
future.wait();
}
}
emitString("log", "Parallel execution completed");
emit("finished");
}
The ParallelAlgorithm executes each job in the following sequence:
job_started
signal with the job indexdoJob()
method with the job datajob_finished
signal with success statusEach job is processed in its own thread, allowing for truly parallel execution.
The ParallelAlgorithm supports cancellation through the stopRequested()
mechanism:
stop()
to request cancellationstopRequested()
before startingThis allows for responsive cancellation while ensuring clean resource handling.
Errors in individual jobs are handled carefully to prevent one job failure from affecting others:
error
signal with detailsjob_finished
signal includes success status (false for errors)This approach allows robust error handling while maintaining parallel execution.
doJob()
implementationstd::async
with std::launch::async
to create true parallel execution. Each job executes in its own thread, which can lead to high resource usage if a large number of jobs are queued. Consider the system capabilities when determining the number of parallel jobs.