Inherits from: Algorithm
The `FlowAlgorithm` class extends the `Algorithm` class to provide a framework for implementing algorithms that process a sequence of jobs. It serves as an intermediate layer in the algorithm hierarchy that adds job management capabilities while maintaining the execution control, signaling, and progress reporting features of the base `Algorithm` class.
std::any
/// Algorithm layer that owns a list of type-erased jobs.
///
/// Jobs are stored as std::any values in m_jobs. Subclasses must implement
/// doJob() to process a single job; this class declares no exec() of its own.
class FlowAlgorithm : public Algorithm {
public:
    // Constructor & destructor
    FlowAlgorithm() = default;
    virtual ~FlowAlgorithm() = default;

    // Job management

    /// Appends @p job to the job list.
    void addJob(const std::any &job);

    /// Removes every job from the job list.
    void clearJobs();

    /// Core job processing method (must be implemented by subclasses).
    /// @param job The type-erased job to process.
    virtual void doJob(const std::any &job) = 0;

protected:
    // Job storage. The element type must be spelled out: std::vector is a
    // template, and the jobs handed to addJob()/doJob() are std::any values.
    std::vector<std::any> m_jobs;
};
FlowAlgorithm inherits and emits the following signals:

| Signal | Type | Description | Arguments |
|---|---|---|---|
| `progress` | Data | Reports execution progress | `float` (0.0 to 1.0) |
| `started` | Simple | Indicates algorithm has started execution | None |
| `finished` | Simple | Indicates algorithm has completed execution | None |
| `error` | Data | Reports execution errors | `std::string` (error message) |
| `log` | Data | Reports job management operations | `std::string` (log message) |
// Create a custom flow algorithm by extending the FlowAlgorithm class
/// Example FlowAlgorithm subclass that treats every job as a text-file name.
///
/// exec() walks m_jobs in order and forwards each entry to doJob(); doJob()
/// emits a "file_processed" signal carrying the file name and a word count.
class TextProcessor : public FlowAlgorithm {
public:
    // Constructor adds custom signals
    TextProcessor() {
        createDataSignal("file_processed");
    }

    /// Processes all queued jobs sequentially.
    ///
    /// Emits "started" before the first job and "finished" after the last.
    /// With no jobs queued it only logs a message. When a stop is requested
    /// it returns early, without emitting "finished".
    void exec(const ArgumentPack& args = {}) override {
        if (m_jobs.empty()) {
            emitString("log", "No text files to process");
            return;
        }

        emit("started");

        size_t totalJobs = m_jobs.size();
        for (size_t i = 0; i < totalJobs; ++i) {
            // Check for cancellation
            if (stopRequested()) {
                // NOTE(review): "warn" is not among the signals listed for
                // FlowAlgorithm in this document - confirm the base class creates it.
                emitString("warn", "Processing stopped by user");
                return;
            }

            // Process this job; a failing job is reported and the loop moves on.
            try {
                doJob(m_jobs[i]);
            } catch (const std::exception& e) {
                emitString("error", std::string("Error processing job: ") + e.what());
            }

            // Report progress as the completed fraction. The cast needs an
            // explicit target type; float keeps the division out of integer math.
            float progress = static_cast<float>(i + 1) / static_cast<float>(totalJobs);
            reportProgress(progress);
        }

        emitString("log", "Text processing completed");
        emit("finished");
    }

    /// Processes a single job, which must hold a std::string file name.
    ///
    /// @param job Type-erased job payload.
    /// @throws std::runtime_error if @p job does not hold a std::string.
    void doJob(const std::any& job) override {
        try {
            // any_cast must name the expected payload type; jobs are file names.
            auto filename = std::any_cast<std::string>(job);

            // Process the text file
            emitString("log", "Processing file: " + filename);

            // File processing logic here...
            int wordCount = 0; // Calculated from file processing

            // Create result for this job
            ArgumentPack resultArgs;
            resultArgs.add(filename);
            resultArgs.add(wordCount);
            emit("file_processed", resultArgs);
        } catch (const std::bad_any_cast&) {
            // Translate the type mismatch into an error that exec() reports.
            throw std::runtime_error("Invalid job type: expected string filename");
        }
    }
};
// Using the flow algorithm
void processTextFiles() {
// Create the algorithm
auto processor = std::make_shared();
// Add jobs to process
processor->addJob(std::string("file1.txt"));
processor->addJob(std::string("file2.txt"));
processor->addJob(std::string("file3.txt"));
// Connect to signals
processor->connectData("file_processed", [](const ArgumentPack& args) {
std::string filename = args.get(0);
int wordCount = args.get(1);
std::cout << "Processed " << filename << ": "
<< wordCount << " words" << std::endl;
});
processor->connectData("progress", [](const ArgumentPack& args) {
float progress = args.get(0);
std::cout << "Progress: " << (progress * 100) << "%" << std::endl;
});
// Execute asynchronously
auto future = processor->run();
// Wait for completion if needed
future.wait();
}
The FlowAlgorithm class maintains a vector of `std::any` objects that represent the jobs to be processed:

- `addJob(const std::any& job)`: Adds a new job to the collection
- `clearJobs()`: Removes all jobs from the collection
- `doJob(const std::any& job)`: Abstract method that must be implemented by subclasses to process a single job

`std::any` allows for flexibility in job types, but requires careful type checking in the `doJob()` implementation.
/// Appends a job to the job list and logs the new total.
/// @param job Type-erased job payload; stored by copy.
void FlowAlgorithm::addJob(const std::any &job) {
    m_jobs.push_back(job);

    // Report the addition on the "log" signal. The ArgumentPack that used to
    // be built here was never passed to any emit call, so it was dropped.
    emitString("log",
               "Job added. Total jobs: " + std::to_string(m_jobs.size()));
}
/// Removes all jobs and logs how many were discarded.
void FlowAlgorithm::clearJobs() {
    // Capture the count before clearing so the log message can report it.
    size_t oldSize = m_jobs.size();
    m_jobs.clear();

    // Report the removal on the "log" signal. The ArgumentPack that used to
    // be built here was never passed to any emit call, so it was dropped.
    emitString("log", "Cleared " + std::to_string(oldSize) + " jobs");
}
FlowAlgorithm itself doesn't implement the exec()
method, leaving it to subclasses to define how jobs are processed:
/// Runs every queued job in order on the calling thread.
///
/// Emits "started" before the first job and "finished" after the loop ends,
/// including when the loop is cut short by a stop request. With no jobs
/// queued it only logs a message and returns.
///
/// @param args Not read by this implementation.
void SequentialAlgorithm::exec(const ArgumentPack& args) {
    if (m_jobs.empty()) {
        emitString("log", "No jobs to execute");
        return;
    }

    emit("started");

    // Process jobs sequentially
    size_t jobCount = m_jobs.size();
    for (size_t i = 0; i < jobCount; ++i) {
        // Check for stop request
        if (stopRequested()) {
            emitString("warn", "Processing stopped by user request");
            break;
        }

        // Process current job; a failing job is reported and the loop moves on.
        try {
            doJob(m_jobs[i]);
        } catch (const std::exception& e) {
            emitString("error", "Job failed: " + std::string(e.what()));
        }

        // Report progress as the completed fraction. The cast needs an explicit
        // target type; float keeps the division out of integer math.
        reportProgress(static_cast<float>(i + 1) / static_cast<float>(jobCount));
    }

    emit("finished");
}
- Implement type checking in `doJob()` to handle the `std::any` job objects safely
- Check `stopRequested()` between jobs to allow for clean cancellation

FlowAlgorithm serves as the base for several specialized algorithm classes: