Inherits from Task
The TaskQueue class provides a priority-based task scheduling system for the task library.
It enables executing tasks based on their importance level while managing concurrency to optimize system
resource utilization. The queue ensures that high-priority tasks execute before lower-priority ones,
regardless of their enqueue order.
TaskQueue serves as a central execution manager that can handle multiple tasks with different priorities, ensuring critical operations are performed first while limiting the number of simultaneously running tasks to prevent system overload.
enum class TaskPriority {
    CRITICAL = 0,   // Highest priority, execute immediately
    HIGH = 1,       // High priority tasks
    NORMAL = 2,     // Normal priority tasks
    LOW = 3,        // Low priority, can wait
    BACKGROUND = 4  // Lowest priority, run when system is idle
};
class TaskQueue : public Task {
public:
    // Constructor & Destructor
    TaskQueue(unsigned int maxConcurrentTasks = 1, bool autoStart = true);
    ~TaskQueue();

    // Queue control
    void start();
    void stop(bool wait = true);
    void stopAll();

    // Task management
    bool enqueue(std::shared_ptr<Runnable> task,
                 TaskPriority priority = TaskPriority::NORMAL,
                 const std::string& description = "");

    template <typename T, typename... Args>
    std::shared_ptr<T> createAndEnqueue(TaskPriority priority,
                                        const std::string& description,
                                        Args&&... args);

    size_t clearQueue();

    // Status methods
    size_t pendingCount() const;
    size_t activeCount() const;
    bool isRunning() const;

    // Configuration
    void setMaxConcurrentTasks(unsigned int maxTasks);
    unsigned int getMaxConcurrentTasks() const;
};
TaskQueue supports five priority levels for tasks:
Priority | Value | Description | Use Cases |
---|---|---|---|
CRITICAL | 0 | Highest priority, execute immediately | Error recovery, safety-critical operations, urgent UI updates |
HIGH | 1 | Important tasks with high priority | User-initiated actions, time-sensitive processing |
NORMAL | 2 | Standard tasks with default priority | Regular background operations, typical workloads |
LOW | 3 | Tasks that can wait if system is busy | Prefetching, precalculations, non-critical updates |
BACKGROUND | 4 | Lowest priority, only run when system is idle | Maintenance, cleanup, indexing, statistics collection |
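The ordering rule in the table (lower value runs first) can be illustrated with a standalone sketch built on `std::priority_queue`. This is not the library's implementation: `Entry`, `RunsLater`, and `drainInPriorityOrder` are hypothetical names, and breaking ties between equal priorities by enqueue order is an assumption the source does not confirm.

```cpp
#include <cstdint>
#include <queue>
#include <string>
#include <vector>

enum class TaskPriority { CRITICAL = 0, HIGH = 1, NORMAL = 2, LOW = 3, BACKGROUND = 4 };

// Hypothetical queue entry: a priority plus a sequence number recording enqueue order.
struct Entry {
    TaskPriority priority;
    std::uint64_t sequence;
    std::string description;
};

// std::priority_queue pops the "largest" element, so this comparator
// returns true when `a` should run after `b`.
struct RunsLater {
    bool operator()(const Entry& a, const Entry& b) const {
        if (a.priority != b.priority) {
            return static_cast<int>(a.priority) > static_cast<int>(b.priority);
        }
        return a.sequence > b.sequence;  // assumed FIFO among equal priorities
    }
};

// Returns the descriptions in the order a priority-ordered queue would release them.
std::vector<std::string> drainInPriorityOrder(const std::vector<Entry>& entries) {
    std::priority_queue<Entry, std::vector<Entry>, RunsLater> pending;
    for (const Entry& entry : entries) {
        pending.push(entry);
    }
    std::vector<std::string> order;
    while (!pending.empty()) {
        order.push_back(pending.top().description);
        pending.pop();
    }
    return order;
}
```

With this comparator, a `CRITICAL` entry enqueued last is still released before `NORMAL` and `LOW` entries enqueued earlier.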
TaskQueue emits the following signals:
Signal | Description | Arguments |
---|---|---|
taskEnqueued | When a task is added to the queue | taskDescription, priorityValue |
taskStarted | When a task begins execution | taskDescription, priorityValue |
taskCompleted | When a task completes successfully | taskDescription, priorityValue |
taskFailed | When a task encounters an error | taskDescription, priorityValue, errorMessage |
queueStats | When queue statistics are updated | pendingCount, activeCount, maxConcurrentTasks |
// Create a task queue that runs up to 3 tasks concurrently
auto taskQueue = std::make_shared<TaskQueue>(3);

// Connect to queue signals
taskQueue->connectData("taskCompleted", [](const ArgumentPack& args) {
    std::string taskName = args.get<std::string>(0);
    std::cout << "Task completed: " << taskName << std::endl;
});

taskQueue->connectData("taskFailed", [](const ArgumentPack& args) {
    std::string taskName = args.get<std::string>(0);
    std::string errorMsg = args.get<std::string>(2);
    std::cerr << "Task failed: " << taskName << " - " << errorMsg << std::endl;
});

taskQueue->connectData("queueStats", [](const ArgumentPack& args) {
    size_t pending = args.get<size_t>(0);
    size_t active = args.get<size_t>(1);
    std::cout << "Queue stats: " << active << " active, "
              << pending << " pending" << std::endl;
});
// Create and enqueue tasks with different priorities.
// MyTask is a placeholder for a user-defined Runnable subclass; substitute your own task type.
auto criticalTask = std::make_shared<MyTask>("Critical operation", 1000);
taskQueue->enqueue(criticalTask, TaskPriority::CRITICAL, "Critical Task");

auto normalTask = std::make_shared<MyTask>("Normal operation", 2000);
taskQueue->enqueue(normalTask, TaskPriority::NORMAL, "Normal Task");

// Create and enqueue in one step
auto ioTask = taskQueue->createAndEnqueue<MyTask>(
    TaskPriority::LOW,
    "Background IO Task",
    "IO operation",
    3000
);
// Wait for completion
while (taskQueue->activeCount() > 0 || taskQueue->pendingCount() > 0) {
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    std::cout << "Active: " << taskQueue->activeCount()
              << ", Pending: " << taskQueue->pendingCount() << std::endl;
}
// Change concurrency limit dynamically
taskQueue->setMaxConcurrentTasks(5);
// Stop the queue when done
taskQueue->stop();
// A custom task that can be added to the queue
class FileProcessorTask : public Runnable {
public:
    FileProcessorTask(const std::string& filePath, bool deleteAfter = false)
        : m_filePath(filePath), m_deleteAfter(deleteAfter) {}

protected:
    // Implement the required runImpl method
    void runImpl() override {
        // Signal task start
        emitString("log", "Starting to process file: " + m_filePath);

        try {
            // Simulate file processing with progress updates
            for (int i = 0; i < 10; ++i) {
                // Check if we should stop
                if (stopRequested()) {
                    emitString("warn", "File processing cancelled");
                    return;
                }

                // Simulate work
                std::this_thread::sleep_for(std::chrono::milliseconds(100));

                // Report progress as a fraction in (0, 1]
                float progress = static_cast<float>(i + 1) / 10.0f;
                reportProgress(progress);

                // After the fifth of ten steps, progress is 5/10
                if (i == 4) {
                    emitString("log", "File processing at 50%");
                }
            }

            // Simulate cleanup if needed
            if (m_deleteAfter) {
                emitString("log", "Deleting file: " + m_filePath);
                // Simulated deletion
                std::this_thread::sleep_for(std::chrono::milliseconds(50));
            }

            emitString("log", "Completed processing file: " + m_filePath);
        }
        catch (const std::exception& e) {
            emitString("error", std::string("Error processing file: ") + e.what());
            throw; // Re-throw to signal task failure
        }
    }

private:
    std::string m_filePath;
    bool m_deleteAfter;
};
// Usage with TaskQueue
void processFiles(const std::vector<std::string>& files) {
    auto queue = std::make_shared<TaskQueue>(2); // Process 2 files at a time

    // Add files to the queue with different priorities
    for (size_t i = 0; i < files.size(); ++i) {
        // Determine priority based on file type or other criteria
        TaskPriority priority = TaskPriority::NORMAL;
        if (files[i].ends_with(".urgent")) { // std::string::ends_with requires C++20
            priority = TaskPriority::HIGH;
        }

        // Create and enqueue the task
        queue->createAndEnqueue<FileProcessorTask>(
            priority,
            "Process " + files[i],
            files[i],
            false // Don't delete after processing
        );
    }

    // Wait for all tasks to complete
    while (queue->pendingCount() > 0 || queue->activeCount() > 0) {
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }

    std::cout << "All files processed" << std::endl;
}
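Both examples above wait for completion with an unbounded polling loop. A bounded variant can be sketched as a small helper; `waitUntilIdle` is a hypothetical function, not part of the library, and it assumes only that the queue type exposes `pendingCount()` and `activeCount()` as declared in the class interface.

```cpp
#include <chrono>
#include <cstddef>
#include <thread>

// Polls until the queue reports no pending and no active tasks, or until the
// timeout elapses. Returns true if the queue became idle, false on timeout.
// Works with any type that provides pendingCount() and activeCount().
template <typename Queue>
bool waitUntilIdle(const Queue& queue,
                   std::chrono::milliseconds timeout,
                   std::chrono::milliseconds pollInterval = std::chrono::milliseconds(50)) {
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (queue.pendingCount() > 0 || queue.activeCount() > 0) {
        if (std::chrono::steady_clock::now() >= deadline) {
            return false;  // still busy when the deadline passed
        }
        std::this_thread::sleep_for(pollInterval);
    }
    return true;
}
```

A caller holding a `std::shared_ptr<TaskQueue>` would pass `*queue` and decide what to do on a `false` result, for example calling `stop()`.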
The TaskQueue operates using the following mechanisms:
When a task is enqueued:
A dedicated processing thread handles task execution:
The queue carefully controls execution concurrency:
When a task completes:
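The four mechanisms listed above (enqueueing, a dedicated processing thread, concurrency control, and completion handling) can be sketched in a few dozen lines. This is a minimal illustration under stated assumptions, not the TaskQueue implementation: `MiniQueue` and its members are hypothetical, tasks are plain `std::function<void()>` callables that must not throw, and one thread is spawned per task for simplicity.

```cpp
#include <condition_variable>
#include <functional>
#include <map>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a priority queue with a concurrency cap.
class MiniQueue {
public:
    explicit MiniQueue(unsigned maxConcurrent)
        : m_max(maxConcurrent == 0 ? 1 : maxConcurrent) {
        m_dispatcher = std::thread([this] { dispatchLoop(); });
    }

    ~MiniQueue() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stopping = true;
        }
        m_cv.notify_all();
        m_dispatcher.join();  // after this, m_workers is no longer modified
        for (auto& worker : m_workers) worker.join();
    }

    // Enqueue: a lower priority value runs first. std::multimap keeps
    // insertion order among equal keys, giving FIFO within a priority.
    void enqueue(int priority, std::function<void()> work) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_pending.emplace(priority, std::move(work));
        }
        m_cv.notify_all();
    }

    // Blocks until nothing is pending and nothing is running.
    void waitIdle() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return m_pending.empty() && m_active == 0; });
    }

private:
    // Processing thread: sleeps until there is work and a free slot.
    void dispatchLoop() {
        std::unique_lock<std::mutex> lock(m_mutex);
        for (;;) {
            m_cv.wait(lock, [this] {
                return m_stopping || (!m_pending.empty() && m_active < m_max);
            });
            if (m_stopping) return;
            auto job = std::move(m_pending.begin()->second);
            m_pending.erase(m_pending.begin());
            ++m_active;  // concurrency control: claim a slot before starting
            m_workers.emplace_back([this, job = std::move(job)] {
                job();
                {
                    std::lock_guard<std::mutex> done(m_mutex);
                    --m_active;  // completion: free the slot
                }
                m_cv.notify_all();  // wake the dispatcher and any waitIdle() caller
            });
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::multimap<int, std::function<void()>> m_pending;
    std::vector<std::thread> m_workers;
    unsigned m_max;
    unsigned m_active = 0;
    bool m_stopping = false;
    std::thread m_dispatcher;
};
```

A production queue would more likely reuse a fixed pool of worker threads and report failures through signals such as `taskFailed`; the sketch omits both to keep the scheduling logic visible.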
TaskQueue is designed to be thread-safe, allowing multiple threads to interact with it simultaneously:
Multiple threads can safely call enqueue(), setMaxConcurrentTasks(), and other methods concurrently without explicit synchronization from the caller.
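One common way to provide this guarantee is to guard the shared state with a mutex inside every public method, so callers never lock anything themselves. The sketch below shows that pattern in isolation; `GuardedPending` and `enqueueFromManyThreads` are hypothetical names, and the source does not state which synchronization primitives TaskQueue actually uses.

```cpp
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical pending list: every access to m_items goes through m_mutex.
class GuardedPending {
public:
    void enqueue(std::string description) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_items.push_back(std::move(description));
    }

    std::size_t pendingCount() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_items.size();
    }

private:
    mutable std::mutex m_mutex;  // mutable so const accessors can lock it
    std::vector<std::string> m_items;
};

// Starts `producers` threads that each enqueue `perProducer` items without any
// caller-side locking, then returns the total number of items recorded.
std::size_t enqueueFromManyThreads(unsigned producers, unsigned perProducer) {
    GuardedPending pending;
    std::vector<std::thread> threads;
    for (unsigned p = 0; p < producers; ++p) {
        threads.emplace_back([&pending, p, perProducer] {
            for (unsigned i = 0; i < perProducer; ++i) {
                pending.enqueue("task " + std::to_string(p) + "/" + std::to_string(i));
            }
        });
    }
    for (auto& thread : threads) thread.join();
    return pending.pendingCount();
}
```

Because each `enqueue()` call holds the lock for its whole update, no item is lost regardless of how the producer threads interleave.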
TaskQueue is particularly useful for:
Key implementation aspects of the TaskQueue:
TaskQueue works well with several other components in the Task Framework: