Back to Index

TaskQueue

Inherits from Task

Contents

The TaskQueue class provides a priority-based task scheduling system for the task library. It enables executing tasks based on their importance level while managing concurrency to optimize system resource utilization. The queue ensures that high-priority tasks execute before lower-priority ones, regardless of their enqueue order.

TaskQueue serves as a central execution manager that can handle multiple tasks with different priorities, ensuring critical operations are performed first while limiting the number of simultaneously running tasks to prevent system overload.

Features

Class Interface

enum class TaskPriority {
    CRITICAL = 0,  // Highest priority, execute immediately
    HIGH = 1,      // High priority tasks
    NORMAL = 2,    // Normal priority tasks
    LOW = 3,       // Low priority, can wait
    BACKGROUND = 4 // Lowest priority, run when system is idle
};

class TaskQueue : public Task {
public:
    // Constructor & Destructor
    TaskQueue(unsigned int maxConcurrentTasks = 1, bool autoStart = true);
    ~TaskQueue();
    
    // Queue control
    void start();
    void stop(bool wait = true);
    void stopAll();
    
    // Task management
    bool enqueue(std::shared_ptr task, 
                 TaskPriority priority = TaskPriority::NORMAL,
                 const std::string& description = "");
    
    template 
    std::shared_ptr createAndEnqueue(TaskPriority priority, 
                                         const std::string& description, 
                                         Args&&... args);
    
    size_t clearQueue();
    
    // Status methods
    size_t pendingCount() const;
    size_t activeCount() const;
    bool isRunning() const;
    
    // Configuration
    void setMaxConcurrentTasks(unsigned int maxTasks);
    unsigned int getMaxConcurrentTasks() const;
};

Priority Levels

TaskQueue supports five priority levels for tasks:

Priority Value Description Use Cases
CRITICAL 0 Highest priority, execute immediately Error recovery, safety-critical operations, urgent UI updates
HIGH 1 Important tasks with high priority User-initiated actions, time-sensitive processing
NORMAL 2 Standard tasks with default priority Regular background operations, typical workloads
LOW 3 Tasks that can wait if system is busy Prefetching, precalculations, non-critical updates
BACKGROUND 4 Lowest priority, only run when system is idle Maintenance, cleanup, indexing, statistics collection
Note: Tasks are always executed in order of priority, regardless of when they were added to the queue. Within the same priority level, tasks are executed in FIFO (First-In, First-Out) order.

Signals

TaskQueue emits the following signals:

Signal Description Arguments
taskEnqueued When a task is added to the queue taskDescription, priorityValue
taskStarted When a task begins execution taskDescription, priorityValue
taskCompleted When a task completes successfully taskDescription, priorityValue
taskFailed When a task encounters an error taskDescription, priorityValue, errorMessage
queueStats When queue statistics are updated pendingCount, activeCount, maxConcurrentTasks

Usage Example

Creating and Using a TaskQueue
// Create a task queue with 3 concurrent tasks
auto taskQueue = std::make_shared(3);

// Connect to queue signals
taskQueue->connectData("taskCompleted", [](const ArgumentPack& args) {
    std::string taskName = args.get(0);
    std::cout << "Task completed: " << taskName << std::endl;
});

taskQueue->connectData("taskFailed", [](const ArgumentPack& args) {
    std::string taskName = args.get(0);
    std::string errorMsg = args.get(2);
    std::cerr << "Task failed: " << taskName << " - " << errorMsg << std::endl;
});

taskQueue->connectData("queueStats", [](const ArgumentPack& args) {
    size_t pending = args.get(0);
    size_t active = args.get(1);
    std::cout << "Queue stats: " << active << " active, " 
              << pending << " pending" << std::endl;
});

// Create and enqueue tasks with different priorities
auto criticalTask = std::make_shared("Critical operation", 1000);
taskQueue->enqueue(criticalTask, TaskPriority::CRITICAL, "Critical Task");

auto normalTask = std::make_shared("Normal operation", 2000);
taskQueue->enqueue(normalTask, TaskPriority::NORMAL, "Normal Task");

// Create and enqueue in one step
auto ioTask = taskQueue->createAndEnqueue(
    TaskPriority::LOW, 
    "Background IO Task",
    "IO operation", 
    3000
);

// Wait for completion
while (taskQueue->activeCount() > 0 || taskQueue->pendingCount() > 0) {
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    std::cout << "Active: " << taskQueue->activeCount() 
              << ", Pending: " << taskQueue->pendingCount() << std::endl;
}

// Change concurrency limit dynamically
taskQueue->setMaxConcurrentTasks(5);

// Stop the queue when done
taskQueue->stop();
Creating a Custom Task Type
// A custom task that can be added to the queue
class FileProcessorTask : public Runnable {
public:
    FileProcessorTask(const std::string& filePath, bool deleteAfter = false) 
        : m_filePath(filePath), m_deleteAfter(deleteAfter) {}
    
protected:
    // Implement the required runImpl method
    void runImpl() override {
        // Signal task start
        emitString("log", "Starting to process file: " + m_filePath);
        
        try {
            // Simulate file processing with progress updates
            for (int i = 0; i < 10; ++i) {
                // Check if we should stop
                if (stopRequested()) {
                    emitString("warn", "File processing cancelled");
                    return;
                }
                
                // Simulate work
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
                
                // Report progress
                float progress = static_cast(i + 1) / 10.0f;
                reportProgress(progress);
                
                if (i == 5) {
                    emitString("log", "File processing at 50%");
                }
            }
            
            // Simulate cleanup if needed
            if (m_deleteAfter) {
                emitString("log", "Deleting file: " + m_filePath);
                // Simulated deletion
                std::this_thread::sleep_for(std::chrono::milliseconds(50));
            }
            
            emitString("log", "Completed processing file: " + m_filePath);
        }
        catch (const std::exception& e) {
            emitString("error", std::string("Error processing file: ") + e.what());
            throw; // Re-throw to signal task failure
        }
    }
    
private:
    std::string m_filePath;
    bool m_deleteAfter;
};

// Usage with TaskQueue
void processFiles(const std::vector& files) {
    auto queue = std::make_shared(2); // Process 2 files at a time
    
    // Add files to the queue with different priorities
    for (size_t i = 0; i < files.size(); ++i) {
        // Determine priority based on file type or other criteria
        TaskPriority priority = TaskPriority::NORMAL;
        if (files[i].ends_with(".urgent")) {
            priority = TaskPriority::HIGH;
        }
        
        // Create and enqueue the task
        queue->createAndEnqueue(
            priority,
            "Process " + files[i],
            files[i],
            false // Don't delete after processing
        );
    }
    
    // Wait for all tasks to complete
    while (queue->pendingCount() > 0 || queue->activeCount() > 0) {
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
    
    std::cout << "All files processed" << std::endl;
}

How It Works

The TaskQueue operates using the following mechanisms:

Task Submission

When a task is enqueued:

Processing Thread

A dedicated processing thread handles task execution:

Concurrency Management

The queue carefully controls execution concurrency:

Task Completion

When a task completes:

Thread Safety

TaskQueue is designed to be thread-safe, allowing multiple threads to interact with it simultaneously:

Multiple threads can safely call enqueue(), setMaxConcurrentTasks(), and other methods concurrently without explicit synchronization from the caller.

Caution: When implementing custom Runnable tasks for use with TaskQueue, ensure that your task implementations are thread-safe if they access shared resources.

Best Practices

Use Cases

TaskQueue is particularly useful for:

Implementation Details

Key implementation aspects of the TaskQueue:

TaskQueue works well with several other components in the Task Framework: