Building a Data Analysis Pipeline with DataFrame

A comprehensive tutorial on creating efficient data processing workflows

Introduction

The DataFrame library is a modern C++ library for processing and analyzing data. It is built around the Serie class (a typed column of data) and functional operations that can be chained together to create clear, expressive data processing pipelines.

This tutorial will guide you through building a complete data analysis workflow using the DataFrame library. We'll start with the basics and gradually move to more advanced techniques, showing how the functional approach enables clean, maintainable, and efficient data processing code.

By the end of this tutorial, you'll be able to:

  • Create and manipulate Series of various data types
  • Apply transformations, filters, and reductions on your data
  • Combine multiple operations into efficient pipelines
  • Handle real-world data analysis scenarios
  • Implement a complete weather data analysis system

Installation and Setup

Before diving into the DataFrame library, you'll need to set up your development environment. You can either pull the library into your build with CMake or build and install it manually:

Using CMake (Recommended)

CMake Configuration

# In your CMakeLists.txt
cmake_minimum_required(VERSION 3.14)
project(MyDataAnalysis)

# Option 1: Using FetchContent
include(FetchContent)
FetchContent_Declare(
    dataframe
    GIT_REPOSITORY https://github.com/xaliphostes/dataframe.git
    GIT_TAG main  # consider pinning a tag or commit for reproducible builds
)
FetchContent_MakeAvailable(dataframe)

# Option 2: If already installed in your system
# find_package(dataframe REQUIRED)

# Create an executable
add_executable(data_analysis main.cpp)
target_link_libraries(data_analysis PRIVATE dataframe)

Manual Installation

Alternatively, you can clone the repository, then build and install it on your system:

Terminal Commands

git clone https://github.com/xaliphostes/dataframe.git
cd dataframe
mkdir build && cd build
cmake ..
make
make install  # May require sudo

Basic Project Structure

For this tutorial, we'll use the following project structure:


└── weather_analysis/
    ├── CMakeLists.txt
    ├── src/
    │   ├── main.cpp
    │   ├── data_types.h
    │   ├── data_loader.h
    │   ├── data_processor.h
    │   └── visualization.h
    └── data/
        └── weather_data.csv

Creating and Working with Series

The Serie class is the fundamental building block of the DataFrame library. It represents a column of data whose elements all share one type. It behaves much like an array, and adds data processing operations such as forEach, map, filter, and reduce.

Creating Series

Let's start by creating various types of Series:

Creating Series

#include <dataframe/Serie.h>
#include <iostream>
#include <string>
#include <vector>

int main() {
    // Creating Series from initializer lists
    df::Serie<int> numbers{1, 2, 3, 4, 5};
    df::Serie<double> temperatures{22.5, 23.7, 25.2, 24.8, 21.9};
    df::Serie<std::string> cities{"New York", "Paris", "Tokyo", "London", "Sydney"};
    
    // Creating a Serie from a vector
    std::vector<double> humidity_vec{45.2, 52.8, 60.3, 55.7, 48.9};
    df::Serie<double> humidity(humidity_vec);
    
    // Creating a Serie of a specific size
    df::Serie<int> zeroes(5, 0);  // 5 elements, all initialized to 0
    
    // Print the Series
    std::cout << "Numbers: " << numbers << std::endl;
    std::cout << "Temperatures: " << temperatures << std::endl;
    std::cout << "Number of cities: " << cities.size() << std::endl;
    
    return 0;
}

Basic Serie Properties and Methods

Series provide several properties and methods for basic operations:

  • size(): Get the number of elements in the Serie
  • empty(): Check if the Serie has no elements
  • operator[]: Access an element by index
  • data() or asArray(): Get the underlying data as a vector
  • type(): Get the type of the Serie elements as a string
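
To make the roles of these accessors concrete, here is a minimal stand-in written on top of std::vector. The Column class is hypothetical and is not the library's Serie; it only mirrors size(), empty(), operator[] and data() as they are described in the list above.

```cpp
#include <cassert>
#include <cstddef>
#include <initializer_list>
#include <vector>

// Hypothetical stand-in for a typed column; NOT the library's df::Serie.
template <typename T>
class Column {
public:
    Column() = default;
    Column(std::initializer_list<T> values) : values_(values) {}

    // Number of stored elements.
    std::size_t size() const { return values_.size(); }

    // True when the column holds no elements.
    bool empty() const { return values_.empty(); }

    // Read-only element access; the index must be in range.
    const T& operator[](std::size_t index) const {
        assert(index < values_.size());
        return values_[index];
    }

    // Read-only view of the underlying storage.
    const std::vector<T>& data() const { return values_; }

private:
    std::vector<T> values_;
};
```

With this stand-in, Column<double> temps{22.5, 23.7, 25.2} reports temps.size() == 3, and temps[1] yields 23.7.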

Basic Operations on Series

The DataFrame library provides various operations to work with Series. Let's explore some of the most commonly used ones:

Iterating Through Elements

forEach Example

#include <dataframe/Serie.h>
#include <dataframe/forEach.h>
#include <iostream>

int main() {
    df::Serie<int> numbers{1, 2, 3, 4, 5};
    
    // Using the Serie method
    numbers.forEach([](int value, size_t index) {
        std::cout << "Element at index " << index << ": " << value << std::endl;
    });
    
    // Alternatively, using the free function
    df::forEach([](int value, size_t index) {
        std::cout << "Element at index " << index << ": " << value << std::endl;
    }, numbers);
    
    return 0;
}

Transforming Elements

map Example

#include <dataframe/Serie.h>
#include <dataframe/map.h>
#include <iostream>

int main() {
    df::Serie<double> celsius{0.0, 10.0, 20.0, 30.0, 40.0};
    
    // Convert Celsius to Fahrenheit using the Serie method
    auto fahrenheit = celsius.map([](double c, size_t) {
        return c * 9.0/5.0 + 32.0;
    });
    
    // Print results
    std::cout << "Celsius: " << celsius << std::endl;
    std::cout << "Fahrenheit: " << fahrenheit << std::endl;
    
    return 0;
}
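
The conversion above ignores the index argument that the callback receives. As a sketch of when that index is useful, the following standalone code computes the change between consecutive readings. It uses std::vector and a hypothetical helper named mapIndexed in place of the library's map, so it compiles without DataFrame installed.

```cpp
#include <cassert>
#include <cstddef>
#include <type_traits>
#include <vector>

// Stand-in for an index-aware map over std::vector; not the library's map.
// The callback receives (value, index), like the lambdas in the example above.
template <typename T, typename F>
auto mapIndexed(const std::vector<T>& input, F callback) {
    using R = std::decay_t<decltype(callback(input[0], std::size_t{0}))>;
    std::vector<R> output;
    output.reserve(input.size());
    for (std::size_t i = 0; i < input.size(); ++i) {
        output.push_back(callback(input[i], i));
    }
    return output;
}

// Change relative to the previous reading. The first element has no
// predecessor, so its change is defined as 0.
inline std::vector<double> dayOverDayChange(const std::vector<double>& readings) {
    return mapIndexed(readings, [&readings](double value, std::size_t index) {
        return index == 0 ? 0.0 : value - readings[index - 1];
    });
}
```

For the readings {10.0, 12.5, 11.0}, dayOverDayChange returns {0.0, 2.5, -1.5}.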

Filtering Elements

filter Example

#include <dataframe/Serie.h>
#include <dataframe/filter.h>
#include <iostream>

int main() {
    df::Serie<int> numbers{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    
    // Get even numbers using the Serie method
    auto even_numbers = numbers.filter([](int n) {
        return n % 2 == 0;
    });
    
    // Get odd numbers using the free function
    auto odd_numbers = df::filter([](int n) {
        return n % 2 != 0;
    }, numbers);
    
    // Print results
    std::cout << "All numbers: " << numbers << std::endl;
    std::cout << "Even numbers: " << even_numbers << std::endl;
    std::cout << "Odd numbers: " << odd_numbers << std::endl;
    
    return 0;
}
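
The example above walks the data twice, once per filter. When both halves are needed, a single pass can produce them together. The sketch below does this with std::partition_copy on a std::vector. The helper name partitionBy is made up for illustration; whether DataFrame ships an equivalent operation is not covered by this tutorial.

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <utility>
#include <vector>

// Splits values into (matching, non-matching) in a single pass.
// Standard-library sketch; the relative order of elements is preserved.
template <typename T, typename Pred>
std::pair<std::vector<T>, std::vector<T>>
partitionBy(const std::vector<T>& values, Pred predicate) {
    std::vector<T> matching;
    std::vector<T> rest;
    std::partition_copy(values.begin(), values.end(),
                        std::back_inserter(matching),
                        std::back_inserter(rest),
                        predicate);
    return {std::move(matching), std::move(rest)};
}
```

Calling partitionBy on the numbers 1 through 10 with the predicate n % 2 == 0 gives the even numbers in .first and the odd numbers in .second.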

Reducing to a Single Value

reduce Example

#include <dataframe/Serie.h>
#include <dataframe/reduce.h>
#include <iostream>

int main() {
    df::Serie<int> numbers{1, 2, 3, 4, 5};
    
    // Calculate sum using the Serie method
    int sum = numbers.reduce([](int acc, int value, size_t) {
        return acc + value;
    }, 0);  // 0 is the initial value
    
    // Calculate product using the free function
    int product = df::reduce([](int acc, int value, size_t) {
        return acc * value;
    }, numbers, 1);  // 1 is the initial value
    
    // Print results
    std::cout << "Numbers: " << numbers << std::endl;
    std::cout << "Sum: " << sum << std::endl;
    std::cout << "Product: " << product << std::endl;
    
    return 0;
}
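
A reduction does not have to produce a plain number: the accumulator can be any type. As a sketch of that idea, the code below folds a list of values into a (min, max) pair in one pass. It is written with std::accumulate as a stand-in for a reduce call, and the Range struct and minMax function are names introduced here for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <numeric>
#include <vector>

// Accumulator type: the smallest and largest value seen so far.
struct Range {
    double min;
    double max;
};

// Folds the values into their (min, max) range in one pass.
inline Range minMax(const std::vector<double>& values) {
    assert(!values.empty());  // the range of an empty input is undefined
    const Range start{values.front(), values.front()};
    return std::accumulate(values.begin(), values.end(), start,
                           [](Range acc, double value) {
                               return Range{std::min(acc.min, value),
                                            std::max(acc.max, value)};
                           });
}
```

For the temperatures used earlier, {22.5, 23.7, 25.2, 24.8, 21.9}, minMax returns a Range with min 21.9 and max 25.2.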

The Functional Approach to Data Processing

One of the strengths of the DataFrame library is its functional programming approach to data processing. This approach offers several benefits:

  • Immutability: Operations create new Series without modifying the original data
  • Composability: Operations can be easily combined into complex workflows
  • Readability: The intent of the code is clear and follows a declarative style
  • Maintainability: Logic is broken down into smaller, reusable functions

Combining Operations

Let's see how we can combine multiple operations to solve a more complex problem:

Combining Operations

#include <dataframe/Serie.h>
#include <dataframe/map.h>
#include <dataframe/filter.h>
#include <dataframe/reduce.h>
#include <iostream>
#include <cmath>

int main() {
    // Initial data: temperatures in Celsius
    df::Serie<double> temperatures{
        -5.2, 3.7, 10.5, 15.8, 22.3, 28.6, 32.1, 26.4, 18.9, 12.5, 4.2, -2.8
    };
    
    // Step 1: Convert to Fahrenheit
    auto fahrenheit = temperatures.map([](double celsius, size_t) {
        return celsius * 9.0/5.0 + 32.0;
    });
    
    // Step 2: Keep only temperatures above freezing (> 32°F)
    auto above_freezing = fahrenheit.filter([](double temp) {
        return temp > 32.0;
    });
    
    // Step 3: Apply a temperature comfort formula
    auto comfort_index = above_freezing.map([](double temp, size_t) {
        // A simplified comfort formula
        double optimal = 70.0;
        double distance = std::abs(temp - optimal);
        return 10.0 * std::exp(-0.05 * distance);
    });
    
    // Print results
    std::cout << "Original temperatures (°C): " << temperatures << std::endl;
    std::cout << "Temperatures in Fahrenheit: " << fahrenheit << std::endl;
    std::cout << "Above freezing temperatures: " << above_freezing << std::endl;
    std::cout << "Comfort indices: " << comfort_index << std::endl;
    
    return 0;
}
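
The comfort formula in step 3 maps a Fahrenheit temperature to a score between 0 and 10 that peaks at 70°F and decays symmetrically on either side. The standalone function below repeats that arithmetic so it can be checked in isolation; the name comfortIndex is chosen here for illustration.

```cpp
#include <cassert>
#include <cmath>

// Same formula as the comfort_index step: 10 * exp(-0.05 * |T - 70|),
// with T in degrees Fahrenheit.
inline double comfortIndex(double fahrenheit) {
    const double optimal = 70.0;
    const double distance = std::abs(fahrenheit - optimal);
    return 10.0 * std::exp(-0.05 * distance);
}
```

At 70°F the score is exactly 10. At 50°F and at 90°F the distance is 20, so both score 10 * exp(-1), which is roughly 3.68.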

In the example above, we sequentially applied multiple operations to transform our data. However, this approach requires creating intermediate variables for each step, which can make the code more verbose. The DataFrame library offers a solution to this problem with pipelines.

Working with Pipelines

Pipelines allow you to chain operations together in a more concise and readable way. Instead of creating intermediate variables, you can use the pipe operator (|) or the pipe() function to pass data through a sequence of operations.

Using the Pipe Operator

Pipeline Example

#include <dataframe/Serie.h>
#include <dataframe/pipe.h>
#include <dataframe/map.h>
#include <dataframe/filter.h>
#include <dataframe/reduce.h>
#include <iostream>
#include <cmath>

int main() {
    // Initial data: temperatures in Celsius
    df::Serie<double> temperatures{
        -5.2, 3.7, 10.5, 15.8, 22.3, 28.6, 32.1, 26.4, 18.9, 12.5, 4.2, -2.8
    };
    
    // Create bound operations for use in the pipeline
    auto to_fahrenheit = df::bind_map<double>([](double celsius, size_t) {
        return celsius * 9.0/5.0 + 32.0;
    });
    
    auto above_freezing = df::bind_filter<double>([](double temp) {
        return temp > 32.0;
    });
    
    auto comfort_calculation = df::bind_map<double>([](double temp, size_t) {
        double optimal = 70.0;
        double distance = std::abs(temp - optimal);
        return 10.0 * std::exp(-0.05 * distance);
    });
    
    // Apply the entire pipeline at once
    auto result = temperatures
        | to_fahrenheit
        | above_freezing
        | comfort_calculation;
    
    std::cout << "Comfort indices: " << result << std::endl;
    
    return 0;
}
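
To build intuition for what the pipe operator does, here is a minimal sketch of how such an operator could be written for std::vector. This is an assumption made for illustration, not the library's implementation: the operator simply passes the left-hand value to the callable on the right, so data | f | g evaluates as g(f(data)). The two stage functions are likewise hypothetical.

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Hypothetical pipe operator: forwards the left-hand vector into the stage.
// The trailing return type removes this overload when the call is invalid.
template <typename T, typename Stage>
auto operator|(const std::vector<T>& input, Stage&& stage)
    -> decltype(std::forward<Stage>(stage)(input)) {
    return std::forward<Stage>(stage)(input);
}

// Stage 1: convert every Celsius value to Fahrenheit.
inline std::vector<double> toFahrenheit(const std::vector<double>& celsius) {
    std::vector<double> out;
    out.reserve(celsius.size());
    for (double c : celsius) {
        out.push_back(c * 9.0 / 5.0 + 32.0);
    }
    return out;
}

// Stage 2: keep only values strictly above 32°F.
inline std::vector<double> aboveFreezing(const std::vector<double>& fahrenheit) {
    std::vector<double> out;
    for (double f : fahrenheit) {
        if (f > 32.0) {
            out.push_back(f);
        }
    }
    return out;
}
```

With these definitions, the expression celsius | toFahrenheit | aboveFreezing turns {-5.0, 0.0, 10.0, 100.0} into {50.0, 212.0}: the first two values convert to 23°F and 32°F and are dropped by the second stage.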

Creating Reusable Pipeline Components

One of the benefits of pipelines is the ability to create reusable components that can be applied to different datasets:

Reusable Pipeline Components

#include <dataframe/Serie.h>
#include <dataframe/pipe.h>
#include <dataframe/map.h>
#include <dataframe/filter.h>
#include <dataframe/reduce.h>
#include <iostream>
#include <cmath>

// Define reusable pipeline components
auto temperature_processor = df::make_pipe(
    // Convert to Fahrenheit
    df::bind_map<double>([](double celsius, size_t) {
        return celsius * 9.0/5.0 + 32.0;
    }),
    
    // Filter out freezing temperatures
    df::bind_filter<double>([](double temp) {
        return temp > 32.0;
    })
);

int main() {
    // Process multiple temperature datasets
    df::Serie<double> winter_temps{-5.2, -3.1, 0.5, 2.8, -1.3, -7.2};
    df::Serie<double> summer_temps{25.3, 28.6, 32.1, 30.4, 27.9, 26.5};
    
    // Apply the processing pipeline to each dataset
    auto winter_processed = temperature_processor(winter_temps);
    auto summer_processed = temperature_processor(summer_temps);
    
    std::cout << "Processed winter temperatures: " << winter_processed << std::endl;
    std::cout << "Processed summer temperatures: " << summer_processed << std::endl;
    
    return 0;
}
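
Conceptually, a reusable pipeline is function composition: several stages are bundled into one callable that applies them left to right. The sketch below shows one way to write such a helper in standard C++. The name compose is hypothetical, and the code is an illustration of the idea and not the library's make_pipe.

```cpp
#include <cassert>
#include <utility>

// Base case: a single stage is already a complete pipeline.
template <typename F>
auto compose(F f) {
    return f;
}

// Recursive case: compose(f, g, h)(x) evaluates h(g(f(x))).
// The stages are stored by value inside the returned lambda, so the
// pipeline can outlive the scope in which it was built.
template <typename F, typename... Rest>
auto compose(F first, Rest... rest) {
    return [first, tail = compose(rest...)](auto&& value) {
        return tail(first(std::forward<decltype(value)>(value)));
    };
}
```

For example, composing "add one", "double", and "square" and calling the result with 2 yields ((2 + 1) * 2)^2 = 36. The same pipeline object can then be applied to any number of inputs, which is the property the winter and summer example relies on.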

Case Study: Weather Data Analysis

Now let's build a more comprehensive example that brings together all the concepts we've learned. We'll create a weather data analysis system that can:

  • Load and parse weather data from a CSV file
  • Clean and preprocess the data
  • Perform statistical analysis
  • Detect anomalies and patterns

Step 1: Defining the Data Structures

data_types.h

#pragma once
#include <string>
#include <dataframe/Serie.h>

namespace weather {

// Structure to hold a single weather record
struct WeatherRecord {
    std::string date;
    double temperature;
    double humidity;
    double pressure;
    double wind_speed;
    double precipitation;
    bool is_valid;
    
    // Default constructor
    WeatherRecord() 
        : temperature(0.0), humidity(0.0), pressure(0.0),
          wind_speed(0.0), precipitation(0.0), is_valid(false) {}
};

using WeatherSerie = df::Serie<WeatherRecord>;
using DateSerie = df::Serie<std::string>;
using DoubleSerie = df::Serie<double>;
using BoolSerie = df::Serie<bool>;

} // namespace weather

Step 2: Loading and Parsing Data

data_loader.h

#pragma once
#include "data_types.h"
#include <dataframe/io/csv.h>
#include <string>
#include <stdexcept>
#include <vector>

namespace weather {

class DataLoader {
public:
    // Load weather data from a CSV file
    static WeatherSerie loadFromCSV(const std::string& filename) {
        try {
            // Use the built-in CSV reader to load the data
            df::Dataframe data = df::io::read_csv(filename);
            
            // Now we need to convert the Dataframe (collection of series) to our WeatherSerie
            std::vector<WeatherRecord> records;
            size_t n = data.size() > 0 ? data.get<std::string>("date").size() : 0;
            
            for (size_t i = 0; i < n; ++i) {
                WeatherRecord record;
                
                record.date = data.get<std::string>("date")[i];
                record.temperature = data.get<double>("temperature")[i];
                record.humidity = data.get<double>("humidity")[i];
                record.pressure = data.get<double>("pressure")[i];
                record.wind_speed = data.get<double>("wind_speed")[i];
                record.precipitation = data.get<double>("precipitation")[i];
                record.is_valid = true;
                
                records.push_back(record);
            }
            
            return WeatherSerie(records);
        } catch (const std::exception& e) {
            throw std::runtime_error("Failed to load CSV file: " + std::string(e.what()));
        }
    }
};

} // namespace weather
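
The loader above sets is_valid to true for every row, so the later cleaning step has nothing to remove. One way to give the flag meaning is to validate each row while parsing it. The standalone sketch below parses a single CSV line with the standard library only. The column order (date, temperature, humidity, pressure, wind_speed, precipitation) is inferred from the column names used in the loader, the plausibility bounds are illustrative assumptions, and parseLine is a hypothetical helper, not part of DataFrame.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <exception>
#include <sstream>
#include <string>
#include <vector>

// Simplified copy of the tutorial's WeatherRecord, repeated so that this
// sketch compiles on its own.
struct WeatherRecord {
    std::string date;
    double temperature = 0.0;
    double humidity = 0.0;
    double pressure = 0.0;
    double wind_speed = 0.0;
    double precipitation = 0.0;
    bool is_valid = false;
};

// Parses one line of the assumed layout:
//   date,temperature,humidity,pressure,wind_speed,precipitation
// Any missing, non-numeric, or implausible field leaves is_valid == false.
inline WeatherRecord parseLine(const std::string& line) {
    WeatherRecord record;
    std::vector<std::string> fields;
    std::istringstream stream(line);
    std::string field;
    while (std::getline(stream, field, ',')) {
        fields.push_back(field);
    }
    if (fields.size() != 6 || fields[0].empty()) {
        return record;  // wrong number of columns or missing date
    }
    record.date = fields[0];

    double* targets[] = {&record.temperature, &record.humidity,
                         &record.pressure, &record.wind_speed,
                         &record.precipitation};
    for (std::size_t i = 0; i < 5; ++i) {
        try {
            std::size_t used = 0;
            *targets[i] = std::stod(fields[i + 1], &used);
            if (used != fields[i + 1].size()) {
                return record;  // trailing characters after the number
            }
        } catch (const std::exception&) {
            return record;  // empty or non-numeric field
        }
    }

    // Illustrative plausibility bounds; adjust them to your data.
    record.is_valid = std::isfinite(record.temperature)
        && record.humidity >= 0.0 && record.humidity <= 100.0
        && record.pressure > 0.0
        && record.wind_speed >= 0.0
        && record.precipitation >= 0.0;
    return record;
}
```

A row with an empty temperature field, a humidity of 140, or too few columns comes back with is_valid set to false, so a filter on is_valid would actually drop it.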

Step 3: Processing and Analyzing Data

data_processor.h

#pragma once
#include "data_types.h"
#include <dataframe/pipe.h>
#include <dataframe/map.h>
#include <dataframe/filter.h>
#include <dataframe/reduce.h>
#include <dataframe/zip.h>
#include <dataframe/unzip.h>
#include <limits>
#include <cmath>

namespace weather {

class DataProcessor {
public:
    // Clean the data by removing invalid records
    static WeatherSerie cleanData(const WeatherSerie& data) {
        return data
            | df::bind_filter<WeatherRecord>([](const WeatherRecord& record) {
                return record.is_valid;
              });
    }
    
    // Extract temperature data for analysis
    static DoubleSerie extractTemperatures(const WeatherSerie& data) {
        return data.map([](const WeatherRecord& record, size_t) {
            return record.temperature;
        });
    }
    
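    // Calculate average temperature (returns 0.0 for an empty Serie)
    static double calculateAverageTemperature(const WeatherSerie& data) {
        auto temperatures = extractTemperatures(data);
        if (temperatures.empty()) {
            return 0.0;
        }
        
        // Sum first, then divide once; the callback uses the same
        // (accumulator, value, index) signature as the earlier reduce examples
        double sum = temperatures.reduce([](double acc, double temp, size_t) {
            return acc + temp;
        }, 0.0);
        return sum / static_cast<double>(temperatures.size());
    }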
};

} // namespace weather
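
The processor above covers cleaning and averaging. The anomaly detection listed among the case study goals is not implemented in this tutorial. One common approach is a z-score test, which flags values that lie more than a chosen number of standard deviations from the mean. The sketch below applies it to a std::vector of temperatures. Both the choice of method and the name findAnomalies are assumptions made for illustration, not something the library prescribes.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <numeric>
#include <vector>

// Returns the indices of values whose distance from the mean exceeds
// `threshold` standard deviations (population standard deviation).
// Returns no indices when there are fewer than two values or when all
// values are identical.
inline std::vector<std::size_t> findAnomalies(const std::vector<double>& values,
                                              double threshold) {
    std::vector<std::size_t> anomalies;
    if (values.size() < 2) {
        return anomalies;
    }

    const double n = static_cast<double>(values.size());
    const double mean = std::accumulate(values.begin(), values.end(), 0.0) / n;

    double squared = 0.0;
    for (double v : values) {
        squared += (v - mean) * (v - mean);
    }
    const double stddev = std::sqrt(squared / n);
    if (stddev == 0.0) {
        return anomalies;
    }

    for (std::size_t i = 0; i < values.size(); ++i) {
        if (std::abs(values[i] - mean) / stddev > threshold) {
            anomalies.push_back(i);
        }
    }
    return anomalies;
}
```

As a worked case, nine readings of 10 followed by one reading of 100 have a mean of 19 and a standard deviation of 27. The last reading sits 3 standard deviations from the mean and is flagged at a threshold of 2, while the others sit about 0.33 standard deviations away and are not.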

Step 4: Building the Main Application

main.cpp

#include "data_types.h"
#include "data_loader.h"
#include "data_processor.h"
#include <iostream>
#include <string>

int main(int argc, char* argv[]) {
    try {
        weather::WeatherSerie data;
        
        // Load data from file
        if (argc > 1) {
            std::string filename = argv[1];
            std::cout << "Loading weather data from: " << filename << std::endl;
            data = weather::DataLoader::loadFromCSV(filename);
        } else {
            std::cerr << "No input file specified. Exiting." << std::endl;
            return 1;
        }
        
        std::cout << "Loaded " << data.size() << " weather records." << std::endl;
        
        // Clean the data
        auto clean_data = weather::DataProcessor::cleanData(data);
        std::cout << "After cleaning: " << clean_data.size() << " valid records." << std::endl;
        
        // Calculate average temperature
        double avg_temp = weather::DataProcessor::calculateAverageTemperature(clean_data);
        std::cout << "Average temperature: " << avg_temp << "°C" << std::endl;
        
        return 0;
    } catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << std::endl;
        return 1;
    }
}

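Sample Output

The output below is illustrative; exact figures depend on your data. Note that loadFromCSV, as written above, marks every parsed record as valid, so the count after cleaning will only drop below the loaded count once the loader performs real validation.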

Loading weather data from: weather_data.csv
Loaded 365 weather records.
After cleaning: 361 valid records.
Average temperature: 15.37°C

Conclusion

In this tutorial, we've explored how to use the DataFrame library to build comprehensive data analysis pipelines. We've covered:

  • Creating and manipulating Series of various data types
  • Applying basic operations like map, filter, and reduce
  • Building expressive data pipelines using the functional programming paradigm
  • Creating reusable pipeline components
  • Implementing a complete weather data analysis system

The functional approach provided by the DataFrame library makes complex data analysis tasks more manageable by breaking them down into composable, reusable operations. This leads to code that is more readable, maintainable, and expressive.

For more advanced topics, refer to the API documentation, which covers additional features like KDTree for spatial queries, zip/unzip operations, and various mathematical functions.