Back to Index

TaskObserver

Inherits from Task

Contents

The TaskObserver class is a powerful monitoring tool for the task library that collects and analyzes statistics about task execution across the system. It provides insights into performance metrics, success rates, and execution patterns of tasks without requiring code changes to the monitored tasks.

Features

Class Interface

class TaskObserver : public Task {
public:
    // Statistics structure for a single task
    struct TaskStats {
        std::string taskName;
        std::string taskType;
        int executionCount;
        int successCount;
        int failureCount;
        int64_t totalExecutionTimeMs;
        int64_t minExecutionTimeMs;
        int64_t maxExecutionTimeMs;
        std::chrono::system_clock::time_point lastExecutionTime;
        double lastProgress;
        std::map<std::string, double> customMetrics;
    };
    
    // Constructor
    explicit TaskObserver(const std::string& name = "TaskObserver");
    
    // Monitoring methods
    bool attach(Task* task, const std::string& taskName = "");
    bool detach(Task* task);
    
    // Statistics access
    const TaskStats* getTaskStats(Task* task) const;
    std::vector<TaskStats> getAllTaskStats() const;
    void resetAllStats();
    
    // Metrics management
    bool addCustomMetric(Task* task, const std::string& metricName, double value);
    
    // Analysis methods
    double getAverageExecutionTime(Task* task) const;
    double getSuccessRate(Task* task) const;
    std::string generateSummaryReport() const;
};

Task Statistics Structure

The TaskStats structure contains comprehensive execution statistics for a single task:

Field Type Description
taskName std::string User-provided name or identifier for the task
taskType std::string The type name of the task class
executionCount int Total number of times the task has been executed
successCount int Number of successful task executions
failureCount int Number of failed task executions
totalExecutionTimeMs int64_t Total execution time in milliseconds across all runs
minExecutionTimeMs int64_t Shortest execution time recorded
maxExecutionTimeMs int64_t Longest execution time recorded
lastExecutionTime time_point Timestamp of most recent execution
lastProgress double Most recent progress value reported (0.0-1.0)
customMetrics map User-defined metrics for application-specific tracking

Signals

TaskObserver emits the following signals:

Signal Description Arguments
statsUpdated Emitted when task statistics are updated taskName, taskType, execCount, successCount, failCount, avgTime
taskStarted Emitted when a monitored task begins execution taskName, taskType
taskFinished Emitted when a monitored task completes successfully taskName, taskType, execTimeMs, success
taskFailed Emitted when a monitored task encounters an error taskName, taskType, errorMsg

Usage Examples

Basic Monitoring Setup

Attaching Observer to Tasks
// Create observer
auto observer = std::make_shared<TaskObserver>("MainObserver");

// Attach to tasks
observer->attach(myTask, "ImportantTask");
observer->attach(otherTask);  // Uses type name if no name provided

// Subscribe to observer events
observer->connectData("statsUpdated", [](const ArgumentPack& args) {
    std::string taskName = args.get<std::string>(0);
    int execCount = args.get<int>(2);
    double avgTime = args.get<double>(5);
    
    std::cout << "Task " << taskName << " stats updated: "
              << execCount << " executions, "
              << avgTime << "ms average time" << std::endl;
});

// Add custom metrics
observer->addCustomMetric(myTask, "MemoryUsageMB", 42.5);

// Get statistics
auto stats = observer->getTaskStats(myTask);
if (stats) {
    std::cout << "Success rate: " 
              << (static_cast<double>(stats->successCount) / stats->executionCount) * 100.0 
              << "%" << std::endl;
}

// Generate a report
std::string report = observer->generateSummaryReport();
std::cout << report << std::endl;

Performance Monitoring

Tracking Execution Performance
// Create a performance monitor
auto perfMonitor = std::make_shared<TaskObserver>("PerformanceMonitor");

// Create a logger to record performance data
auto logger = std::make_shared<Logger>("Performance");

// Connect observer signals to the logger
perfMonitor->connectData("taskFinished", [logger](const ArgumentPack& args) {
    std::string taskName = args.get<std::string>(0);
    int64_t execTime = args.get<int64_t>(2);
    
    // Log performance data
    ArgumentPack logArgs;
    logArgs.add<std::string>("Task " + taskName + " completed in " + 
                           std::to_string(execTime) + " ms");
    logger->log(logArgs);
});

// Create and monitor tasks
auto tasks = createTasks();  // Creates several task instances

// Attach all tasks to the monitor
for (auto& task : tasks) {
    perfMonitor->attach(task.get());
}

// Execute tasks and collect performance data
executeTasksInSequence(tasks);

// Get performance statistics
for (auto& task : tasks) {
    auto stats = perfMonitor->getTaskStats(task.get());
    if (stats) {
        std::cout << "Task: " << stats->taskName << "\n"
                  << "  Avg time: " << (stats->totalExecutionTimeMs / stats->executionCount) << " ms\n"
                  << "  Min time: " << stats->minExecutionTimeMs << " ms\n"
                  << "  Max time: " << stats->maxExecutionTimeMs << " ms\n"
                  << "  Total executions: " << stats->executionCount << std::endl;
    }
}

Identifying Bottlenecks

Performance Analysis for System Tuning
// Create a bottleneck analyzer
auto analyzer = std::make_shared<TaskObserver>("BottleneckAnalyzer");

// Create and attach to all system components
std::vector<std::shared_ptr<Task>> systemComponents = initializeSystem();
for (auto& component : systemComponents) {
    analyzer->attach(component.get());
}

// Run standard system workload
runSystemBenchmark();

// Identify slow components
std::vector<TaskObserver::TaskStats> allStats = analyzer->getAllTaskStats();

// Sort by average execution time (descending)
std::sort(allStats.begin(), allStats.end(), 
    [](const TaskObserver::TaskStats& a, const TaskObserver::TaskStats& b) {
        double avgA = a.executionCount > 0 ? 
            static_cast<double>(a.totalExecutionTimeMs) / a.executionCount : 0;
        double avgB = b.executionCount > 0 ? 
            static_cast<double>(b.totalExecutionTimeMs) / b.executionCount : 0;
        return avgA > avgB;
    });

// Output the slowest components
std::cout << "Top 5 Slowest Components:" << std::endl;
for (size_t i = 0; i < std::min(size_t(5), allStats.size()); ++i) {
    const auto& stats = allStats[i];
    double avgTime = stats.executionCount > 0 ? 
        static_cast<double>(stats.totalExecutionTimeMs) / stats.executionCount : 0;
    
    std::cout << i+1 << ". " << stats.taskName << " (" << stats.taskType << ")\n"
              << "   Average time: " << avgTime << " ms\n"
              << "   Executions: " << stats.executionCount << "\n"
              << "   Success rate: " 
              << (stats.executionCount > 0 ? 
                 (static_cast<double>(stats.successCount) / stats.executionCount * 100) : 0) 
              << "%" << std::endl;
}

How It Works

TaskObserver connects to task signals to monitor their lifecycle:

1. Task Observation

When attach() is called, the observer connects to the task's signals:

2. Statistics Tracking

For each task event, the observer:

3. Metrics Retrieval

Applications can retrieve statistics at any time using the getTaskStats() or getAllTaskStats() methods.

Attach Implementation
bool TaskObserver::attach(Task* task, const std::string& taskName) {
    if (!task) {
        emitString("error", "Cannot attach to null task");
        return false;
    }
    
    // If task is already being monitored, detach first
    if (m_taskStats.find(task) != m_taskStats.end()) {
        detach(task);
    }
    
    // Create a new statistics entry
    TaskStats stats;
    
    // Use provided name or get type name
    stats.taskName = taskName.empty() ? 
        typeid(*task).name() : taskName;
    stats.taskType = typeid(*task).name();
    
    // Initialize counters
    stats.executionCount = 0;
    stats.successCount = 0;
    stats.failureCount = 0;
    stats.totalExecutionTimeMs = 0;
    stats.minExecutionTimeMs = std::numeric_limits<int64_t>::max();
    stats.maxExecutionTimeMs = 0;
    stats.lastProgress = 0.0;
    
    // Store the statistics object
    m_taskStats[task] = stats;
    
    // Connect to task signals
    m_connections[task].push_back(
        task->connectSimple("started", 
            [this, task]() { this->onTaskStarted(task); }));
            
    m_connections[task].push_back(
        task->connectSimple("finished", 
            [this, task]() { this->onTaskFinished(task); }));
            
    m_connections[task].push_back(
        task->connectData("error", 
            [this, task](const ArgumentPack& args) { 
                this->onTaskError(task, args); 
            }));
            
    m_connections[task].push_back(
        task->connectData("progress", 
            [this, task](const ArgumentPack& args) { 
                this->onTaskProgress(task, args); 
            }));
    
    emitString("log", "Started monitoring task: " + stats.taskName);
    return true;
}

Monitoring Capabilities

TaskObserver provides various monitoring capabilities:

1. Execution Counting

Tracks how many times each task has been executed, succeeded, or failed:

Tracking Execution Counts
void TaskObserver::onTaskFinished(Task* task) {
    auto it = m_taskStats.find(task);
    if (it == m_taskStats.end()) return;
    
    auto& stats = it->second;
    
    // Record execution end time
    auto now = std::chrono::system_clock::now();
    
    // Calculate execution duration
    auto execTime = std::chrono::duration_cast<std::chrono::milliseconds>(
        now - m_executionStartTimes[task]).count();
    
    // Update statistics
    stats.executionCount++;
    stats.successCount++;
    stats.totalExecutionTimeMs += execTime;
    stats.minExecutionTimeMs = std::min(stats.minExecutionTimeMs, execTime);
    stats.maxExecutionTimeMs = std::max(stats.maxExecutionTimeMs, execTime);
    stats.lastExecutionTime = now;
    
    // Emit statistics update
    ArgumentPack args;
    args.add<std::string>(stats.taskName);
    args.add<std::string>(stats.taskType);
    args.add<int>(stats.executionCount);
    args.add<int>(stats.successCount);
    args.add<int>(stats.failureCount);
    args.add<double>(getAverageExecutionTime(task));
    emit("statsUpdated", args);
    
    // Emit task finished signal
    ArgumentPack finishArgs;
    finishArgs.add<std::string>(stats.taskName);
    finishArgs.add<std::string>(stats.taskType);
    finishArgs.add<int64_t>(execTime);
    finishArgs.add<bool>(true); // success
    emit("taskFinished", finishArgs);
    
    // Clean up
    m_executionStartTimes.erase(task);
}

2. Timing Statistics

Collects detailed timing information across executions:

3. Success Rate Analysis

Calculates task reliability based on success and failure rates:

Success Rate Calculation
double TaskObserver::getSuccessRate(Task* task) const {
    auto it = m_taskStats.find(task);
    if (it == m_taskStats.end() || it->second.executionCount == 0) {
        return 0.0; // No executions yet
    }
    
    const auto& stats = it->second;
    return static_cast<double>(stats.successCount) / stats.executionCount * 100.0;
}

4. Progress Tracking

Monitors task progress reporting to identify stalled or slow-progressing tasks:

Progress Monitoring
void TaskObserver::onTaskProgress(Task* task, const ArgumentPack& args) {
    auto it = m_taskStats.find(task);
    if (it == m_taskStats.end() || args.empty()) return;
    
    // Extract progress value (typically 0.0 to 1.0)
    double progress = args.get<float>(0);
    
    // Update statistics
    it->second.lastProgress = progress;
    
    // Could implement progress rate calculations or stall detection here
}

Custom Metrics

TaskObserver supports application-specific metrics beyond the standard statistics:

Custom Metric Usage
// Adding custom metrics
void trackResourceUsage(std::shared_ptr<TaskObserver> observer, 
                        std::shared_ptr<DataProcessor> processor) {
    // Monitor memory usage
    size_t memoryUsed = processor->getMemoryUsage();
    observer->addCustomMetric(processor.get(), "MemoryUsageMB", 
                             static_cast<double>(memoryUsed) / (1024 * 1024));
    
    // Monitor CPU utilization
    double cpuUsage = processor->getCpuUtilization();
    observer->addCustomMetric(processor.get(), "CpuUsagePercent", cpuUsage * 100.0);
    
    // Monitor I/O operations
    int ioOps = processor->getIoOperationCount();
    observer->addCustomMetric(processor.get(), "IoOperations", ioOps);
    
    // Retrieve and use custom metrics
    auto stats = observer->getTaskStats(processor.get());
    if (stats) {
        for (const auto& [metricName, value] : stats->customMetrics) {
            std::cout << "Metric: " << metricName << " = " << value << std::endl;
        }
    }
}

Custom metrics can be used for:

Statistical Analysis

TaskObserver provides built-in and extensible analysis capabilities:

1. Summary Reports

Generate comprehensive reports about task execution:

Summary Report Generation
std::string TaskObserver::generateSummaryReport() const {
    std::stringstream report;
    
    report << "======================================\n";
    report << "         TASK EXECUTION REPORT        \n";
    report << "======================================\n\n";
    
    report << "Total tasks monitored: " << m_taskStats.size() << "\n\n";
    
    // Loop through all tasks
    for (const auto& [task, stats] : m_taskStats) {
        report << "Task: " << stats.taskName << " (" << stats.taskType << ")\n";
        report << "--------------------------------------\n";
        report << "  Executions: " << stats.executionCount << "\n";
        report << "  Successes: " << stats.successCount << "\n";
        report << "  Failures: " << stats.failureCount << "\n";
        
        if (stats.executionCount > 0) {
            report << "  Success Rate: " 
                   << (static_cast<double>(stats.successCount) / stats.executionCount * 100.0)
                   << "%\n";
                   
            double avgTime = static_cast<double>(stats.totalExecutionTimeMs) / 
                             stats.executionCount;
            report << "  Average Time: " << avgTime << " ms\n";
            report << "  Minimum Time: " << stats.minExecutionTimeMs << " ms\n";
            report << "  Maximum Time: " << stats.maxExecutionTimeMs << " ms\n";
        }
        
        if (!stats.customMetrics.empty()) {
            report << "  Custom Metrics:\n";
            for (const auto& [metricName, value] : stats.customMetrics) {
                report << "    - " << metricName << ": " << value << "\n";
            }
        }
        
        report << "\n";
    }
    
    report << "======================================\n";
    return report.str();
}

2. Performance Analysis

Identify performance issues and bottlenecks:

Finding Performance Bottlenecks
// Find the slowest tasks in the system
std::vector<Task*> findBottlenecks(std::shared_ptr<TaskObserver> observer, 
                                  double timeThresholdMs = 100.0) {
    std::vector<Task*> slowTasks;
    std::vector<TaskObserver::TaskStats> allStats = observer->getAllTaskStats();
    
    // Find tasks with average execution time above threshold
    for (const auto& stats : allStats) {
        Task* task = findTaskByName(stats.taskName); // Hypothetical lookup function
        if (!task) continue;
        
        double avgTime = observer->getAverageExecutionTime(task);
        if (avgTime > timeThresholdMs) {
            slowTasks.push_back(task);
        }
    }
    
    // Sort by average execution time (descending)
    std::sort(slowTasks.begin(), slowTasks.end(), 
        [observer](Task* a, Task* b) {
            return observer->getAverageExecutionTime(a) > 
                   observer->getAverageExecutionTime(b);
        });
    
    return slowTasks;
}

3. Reliability Analysis

Identify tasks with high failure rates:

Finding Unreliable Tasks
// Find tasks with low success rates
std::vector<Task*> findUnreliableTasks(std::shared_ptr<TaskObserver> observer, 
                                       double minSuccessRate = 95.0) {
    std::vector<Task*> unreliableTasks;
    
    // Check all tasks
    auto allStats = observer->getAllTaskStats();
    for (const auto& stats : allStats) {
        Task* task = findTaskByName(stats.taskName); // Hypothetical lookup function
        if (!task) continue;
        
        // Skip tasks with too few executions for meaningful analysis
        if (stats.executionCount < 5) continue;
        
        double successRate = observer->getSuccessRate(task);
        if (successRate < minSuccessRate) {
            unreliableTasks.push_back(task);
        }
    }
    
    return unreliableTasks;
}

Benefits

TaskObserver provides several benefits for applications:

Performance Optimization

Reliability Monitoring

Resource Management

System Tuning

Diagnostics

Thread Safety

TaskObserver is designed to be thread-safe for monitoring tasks across different threads:

Note: Task observation is thread-safe, but observer methods should not be called from within task signal handlers to avoid potential deadlocks. Use signals emitted by the observer instead.

Best Practices

Implementation Details

Advanced Usage: Performance Dashboard
// Create a performance dashboard for system monitoring
class PerformanceDashboard {
public:
    PerformanceDashboard() {
        m_observer = std::make_shared<TaskObserver>("SystemMonitor");
        
        // Connect to observer signals
        m_observer->connectData("statsUpdated", 
            [this](const ArgumentPack& args) { this->onStatsUpdated(args); });
            
        m_observer->connectData("taskFailed", 
            [this](const ArgumentPack& args) { this->onTaskFailed(args); });
    }
    
    // Monitor a component
    void addComponent(std::shared_ptr<Task> component, const std::string& name) {
        m_observer->attach(component.get(), name);
        m_components[name] = component;
    }
    
    // Generate a dashboard report
    std::string generateDashboard() {
        std::stringstream dash;
        
        dash << "===== SYSTEM PERFORMANCE DASHBOARD =====\n\n";
        
        // Get all statistics
        auto allStats = m_observer->getAllTaskStats();
        
        // Overall system metrics
        int totalExecs = 0;
        int totalSuccesses = 0;
        int totalFailures = 0;
        int64_t totalTime = 0;
        
        for (const auto& stats : allStats) {
            totalExecs += stats.executionCount;
            totalSuccesses += stats.successCount;
            totalFailures += stats.failureCount;
            totalTime += stats.totalExecutionTimeMs;
        }
        
        dash << "System Overview:\n";
        dash << "  Components: " << allStats.size() << "\n";
        dash << "  Total Executions: " << totalExecs << "\n";
        dash << "  Success Rate: " << (totalExecs > 0 ? 
            (static_cast<double>(totalSuccesses) / totalExecs * 100) : 0) << "%\n";
        dash << "  Total Execution Time: " << totalTime << "ms\n\n";
        
        // Performance issues section
        dash << "Performance Hotspots:\n";
        
        // Sort by average execution time
        std::vector<TaskObserver::TaskStats> sortedStats = allStats;
        std::sort(sortedStats.begin(), sortedStats.end(), 
            [](const auto& a, const auto& b) {
                double avgA = a.executionCount > 0 ? 
                    static_cast<double>(a.totalExecutionTimeMs) / a.executionCount : 0;
                double avgB = b.executionCount > 0 ? 
                    static_cast<double>(b.totalExecutionTimeMs) / b.executionCount : 0;
                return avgA > avgB;
            });
        
        // List top 3 slowest components
        for (size_t i = 0; i < std::min(size_t(3), sortedStats.size()); ++i) {
            const auto& stats = sortedStats[i];
            double avgTime = stats.executionCount > 0 ? 
                static_cast<double>(stats.totalExecutionTimeMs) / stats.executionCount : 0;
                
            dash << "  " << (i+1) << ". " << stats.taskName << "\n";
            dash << "     Avg Time: " << avgTime << "ms\n";
            dash << "     Max Time: " << stats.maxExecutionTimeMs << "ms\n";
        }
        
        dash << "\nReliability Issues:\n";
        
        // Sort by failure rate
        std::sort(sortedStats.begin(), sortedStats.end(), 
            [](const auto& a, const auto& b) {
                double failRateA = a.executionCount > 0 ? 
                    static_cast<double>(a.failureCount) / a.executionCount : 0;
                double failRateB = b.executionCount > 0 ? 
                    static_cast<double>(b.failureCount) / b.executionCount : 0;
                return failRateA > failRateB;
            });
        
        // List top 3 unreliable components
        for (size_t i = 0; i < std::min(size_t(3), sortedStats.size()); ++i) {
            const auto& stats = sortedStats[i];
            
            // Skip if no failures
            if (stats.failureCount == 0) continue;
            
            double failRate = stats.executionCount > 0 ? 
                static_cast<double>(stats.failureCount) / stats.executionCount * 100 : 0;
                
            dash << "  " << (i+1) << ". " << stats.taskName << "\n";
            dash << "     Failure Rate: " << failRate << "%\n";
            dash << "     Total Failures: " << stats.failureCount << "\n";
        }
        
        return dash.str();
    }
    
private:
    void onStatsUpdated(const ArgumentPack& args) {
        std::string taskName = args.get<std::string>(0);
        int execCount = args.get<int>(2);
        double avgTime = args.get<double>(5);
        
        // Update dashboard UI (in a real implementation)
        // updateComponentStats(taskName, execCount, avgTime);
    }
    
    void onTaskFailed(const ArgumentPack& args) {
        std::string taskName = args.get<std::string>(0);
        std::string errorMsg = args.get<std::string>(2);
        
        // Alert on component failure (in a real implementation)
        // showFailureAlert(taskName, errorMsg);
        
        // Log the failure
        std::cout << "ALERT: Component " << taskName << " failed: " 
                  << errorMsg << std::endl;
    }
    
    std::shared_ptr<TaskObserver> m_observer;
    std::map<std::string, std::shared_ptr<Task>> m_components;
};

// Usage example
void monitorSystemPerformance() {
    // Create dashboard
    auto dashboard = std::make_shared<PerformanceDashboard>();
    
    // Add system components
    dashboard->addComponent(std::make_shared<NetworkService>(), "Network");
    dashboard->addComponent(std::make_shared<DatabaseService>(), "Database");
    dashboard->addComponent(std::make_shared<FileSystem>(), "FileSystem");
    dashboard->addComponent(std::make_shared<UserInterface>(), "Interface");
    
    // Run normal system operations for a while
    runSystem(std::chrono::minutes(10));
    
    // Generate and display dashboard
    std::string report = dashboard->generateDashboard();
    std::cout << report << std::endl;
}

Visualization Integration

The statistics collected by TaskObserver can be visualized for better analysis:

Integration Note: TaskObserver is designed to work with visualization libraries and reporting tools. Its data structures can be easily mapped to JSON, CSV, or other formats for external processing and visualization.

Monitoring Overhead

TaskObserver is designed for minimal performance impact:

For most applications, the overhead is negligible compared to the benefits of performance insights and diagnostic capabilities.