FlowAlgorithm

Inherits from: Algorithm

The FlowAlgorithm class extends the Algorithm class to provide a framework for implementing algorithms that process a sequence of jobs. It serves as an intermediate layer in the algorithm hierarchy that adds job management capabilities while maintaining the execution control, signaling, and progress reporting features of the base Algorithm class.

Features

Class Interface

class FlowAlgorithm : public Algorithm {
public:
    // Constructor & destructor
    FlowAlgorithm() = default;
    virtual ~FlowAlgorithm() = default;
    
    // Job management
    void addJob(const std::any &job);
    void clearJobs();
    
    // Core job processing method (must be implemented by subclasses)
    virtual void doJob(const std::any &job) = 0;
    
protected:
    // Job storage
    std::vector<std::any> m_jobs;
};

Signals

FlowAlgorithm inherits and emits the following signals:

Signal    Type    Description                                  Arguments
progress  Data    Reports execution progress                   float (0.0 to 1.0)
started   Simple  Indicates algorithm has started execution    None
finished  Simple  Indicates algorithm has completed execution  None
error     Data    Reports execution errors                     std::string (error message)
log       Data    Reports job management operations            std::string (log message)
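
Signals are addressed by name, and handlers are attached to a name rather than to a typed member. As a rough illustration of that name-based dispatch, here is a minimal stand-in that assumes string payloads only. MiniSignals, connect, and emit are hypothetical names for this sketch; they are not the library's signal classes, whose internals are not shown in this document.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for name-based signal dispatch.
// Not the library's implementation; string payloads only.
class MiniSignals {
public:
    using Handler = std::function<void(const std::string&)>;

    // Register a handler under a signal name such as "log" or "error".
    void connect(const std::string& name, Handler handler) {
        m_handlers[name].push_back(std::move(handler));
    }

    // Call every handler registered for the name, in connection order.
    // Returns how many handlers were called (0 if the name is unknown).
    std::size_t emit(const std::string& name, const std::string& payload) const {
        const auto it = m_handlers.find(name);
        if (it == m_handlers.end()) {
            return 0;
        }
        for (const Handler& handler : it->second) {
            handler(payload);
        }
        return it->second.size();
    }

private:
    std::map<std::string, std::vector<Handler>> m_handlers;
};
```

In this sketch, emitting a name with no connected handler is a silent no-op, which is one reasonable way to treat optional signals such as log.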

Usage Example

FlowAlgorithm Implementation Example
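// Create a custom flow algorithm by extending the FlowAlgorithm class.
// Standard headers used below; the library header that declares
// FlowAlgorithm and ArgumentPack is also required.
#include <any>
#include <cstddef>
#include <exception>
#include <stdexcept>
#include <string>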
class TextProcessor : public FlowAlgorithm {
public:
    // Constructor adds custom signals
    TextProcessor() {
        createDataSignal("file_processed");
    }
    
    // Implement the required exec method to process all jobs
    void exec(const ArgumentPack& args = {}) override {
        if (m_jobs.empty()) {
            emitString("log", "No text files to process");
            return;
        }
        
        emit("started");
        
        size_t totalJobs = m_jobs.size();
        for (size_t i = 0; i < totalJobs; ++i) {
            // Check for cancellation
            if (stopRequested()) {
                emitString("warn", "Processing stopped by user");
                return;
            }
            
            // Process this job
            try {
                doJob(m_jobs[i]);
            } catch (const std::exception& e) {
                emitString("error", std::string("Error processing job: ") + e.what());
            }
            
            // Report progress
            float progress = static_cast<float>(i + 1) / totalJobs;
            reportProgress(progress);
        }
        
        emitString("log", "Text processing completed");
        emit("finished");
    }
    
    // Implement the required doJob method to process individual jobs
    void doJob(const std::any& job) override {
        try {
            // Try to cast the job to the expected type
            auto filename = std::any_cast<std::string>(job);
            
            // Process the text file
            emitString("log", "Processing file: " + filename);
            
            // File processing logic here...
            int wordCount = 0; // Calculated from file processing
            
            // Create result for this job
            ArgumentPack resultArgs;
            resultArgs.add(filename);
            resultArgs.add(wordCount);
            emit("file_processed", resultArgs);
        } catch (const std::bad_any_cast& e) {
            throw std::runtime_error("Invalid job type: expected string filename");
        }
    }
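};

Using the FlowAlgorithm
// Using the flow algorithm.
// Standard headers used below; the library header that declares
// FlowAlgorithm and ArgumentPack is also required.
#include <iostream>
#include <memory>
#include <string>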
void processTextFiles() {
    // Create the algorithm
    auto processor = std::make_shared<TextProcessor>();
    
    // Add jobs to process
    processor->addJob(std::string("file1.txt"));
    processor->addJob(std::string("file2.txt"));
    processor->addJob(std::string("file3.txt"));
    
    // Connect to signals
    processor->connectData("file_processed", [](const ArgumentPack& args) {
        std::string filename = args.get<std::string>(0);
        int wordCount = args.get<int>(1);
        
        std::cout << "Processed " << filename << ": " 
                  << wordCount << " words" << std::endl;
    });
    
    processor->connectData("progress", [](const ArgumentPack& args) {
        float progress = args.get<float>(0);
        std::cout << "Progress: " << (progress * 100) << "%" << std::endl;
    });
    
    // Execute asynchronously
    auto future = processor->run();
    
    // Wait for completion if needed
    future.wait();
}

Job Management

The FlowAlgorithm class stores its jobs in m_jobs, a vector of std::any objects, in the order in which they were added.

Note: Using std::any allows jobs of any type, but doJob() must check the stored type before using it, for example by catching std::bad_any_cast.
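
One way to do that type checking without exceptions is the pointer overload of std::any_cast, which returns nullptr on a mismatch instead of throwing. The sketch below assumes jobs are either a std::string filename or an int identifier. describeJob is a hypothetical helper for illustration and is not part of FlowAlgorithm.

```cpp
#include <any>
#include <string>

// Hypothetical helper (not part of FlowAlgorithm): classifies a job
// without throwing when the stored type is unexpected.
std::string describeJob(const std::any& job) {
    // any_cast on a pointer yields nullptr if the stored type differs.
    if (const auto* name = std::any_cast<std::string>(&job)) {
        return "file: " + *name;
    }
    if (const auto* id = std::any_cast<int>(&job)) {
        return "id: " + std::to_string(*id);
    }
    if (!job.has_value()) {
        return "empty job";
    }
    // Any other stored type, including const char* from a string literal.
    return "unsupported job type";
}
```

Note that a bare string literal is stored as const char*, not std::string, so it would not match the first branch. This is why jobs are best added as std::string("file1.txt") rather than "file1.txt".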
Job Management Implementation
void FlowAlgorithm::addJob(const std::any &job) {
    m_jobs.push_back(job);

    // Report the new job count on the "log" signal
    emitString("log",
               "Job added. Total jobs: " + std::to_string(m_jobs.size()));
}

void FlowAlgorithm::clearJobs() {
    size_t oldSize = m_jobs.size();
    m_jobs.clear();

    // Report the number of removed jobs on the "log" signal
    emitString("log", "Cleared " + std::to_string(oldSize) + " jobs");
}

Execution Implementation

FlowAlgorithm itself doesn't implement the exec() method, leaving it to subclasses to define how jobs are processed:

Sequential Processing Example
void SequentialAlgorithm::exec(const ArgumentPack& args) {
    if (m_jobs.empty()) {
        emitString("log", "No jobs to execute");
        return;
    }
    
    emit("started");
    
    // Process jobs sequentially
    size_t jobCount = m_jobs.size();
    for (size_t i = 0; i < jobCount; ++i) {
        // Check for stop request
        if (stopRequested()) {
            emitString("warn", "Processing stopped by user request");
            break;
        }
        
        // Process current job
        try {
            doJob(m_jobs[i]);
        } catch (const std::exception& e) {
            emitString("error", "Job failed: " + std::string(e.what()));
        }
        
        // Report progress
        reportProgress(static_cast<float>(i + 1) / jobCount);
    }
    
    emit("finished");
}
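
The loop above depends on the library's signals, so it cannot be run on its own. The following sketch isolates the same control flow (stop check, per-job error isolation, progress after each job) using only the standard library. RunSummary and runSequentially are hypothetical names introduced here; in FlowAlgorithm the same information is reported through the error and progress signals rather than returned.

```cpp
#include <any>
#include <cstddef>
#include <exception>
#include <functional>
#include <string>
#include <vector>

// Hypothetical result type for this sketch only.
struct RunSummary {
    std::size_t succeeded = 0;        // jobs that returned normally
    std::vector<std::string> errors;  // what() of each failed job
    std::vector<float> progress;      // fraction done after each job
};

// Hypothetical free-function version of a sequential exec() loop.
RunSummary runSequentially(const std::vector<std::any>& jobs,
                           const std::function<void(const std::any&)>& doJob,
                           const std::function<bool()>& stopRequested) {
    RunSummary summary;
    const std::size_t total = jobs.size();
    for (std::size_t i = 0; i < total; ++i) {
        // Stop before starting the next job, never in the middle of one.
        if (stopRequested()) {
            break;
        }
        try {
            doJob(jobs[i]);
            ++summary.succeeded;
        } catch (const std::exception& e) {
            // A failing job is recorded and does not abort the others.
            summary.errors.push_back(e.what());
        }
        // Progress counts attempted jobs, whether they succeeded or not.
        summary.progress.push_back(static_cast<float>(i + 1) /
                                   static_cast<float>(total));
    }
    return summary;
}
```

With four jobs where the second throws and a stop is requested before the fourth, the summary holds two successes, one error, and the progress values 0.25, 0.5, and 0.75.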

Best Practices

Extensions

FlowAlgorithm serves as the base for several specialized algorithm classes:

Note: When implementing a job processing algorithm, consider whether ParallelAlgorithm might meet your needs before creating a custom implementation.