Building a Data Analysis Pipeline with DataFrame
A comprehensive tutorial on creating efficient data processing workflows
Introduction
The DataFrame library offers a powerful way to process and analyze data using modern C++ techniques. It's designed around the concept of Series (columns of data) and functional operations that can be chained together to create clear, expressive data processing pipelines.
This tutorial will guide you through building a complete data analysis workflow using the DataFrame library. We'll start with the basics and gradually move to more advanced techniques, showing how the functional approach enables clean, maintainable, and efficient data processing code.
By the end of this tutorial, you'll be able to:
- Create and manipulate Series of various data types
- Apply transformations, filters, and reductions on your data
- Combine multiple operations into efficient pipelines
- Handle real-world data analysis scenarios
- Implement a complete weather data analysis system
Installation and Setup
Before diving into the DataFrame library, you'll need to set up your development environment. The library can be installed through various methods:
Using CMake (Recommended)
# In your CMakeLists.txt
cmake_minimum_required(VERSION 3.14)
project(MyDataAnalysis)
# Option 1: Using FetchContent
include(FetchContent)
FetchContent_Declare(
  dataframe
  GIT_REPOSITORY https://github.com/xaliphostes/dataframe.git
  GIT_TAG main
)
FetchContent_MakeAvailable(dataframe)
# Option 2: If already installed in your system
# find_package(dataframe REQUIRED)
# Create an executable
add_executable(data_analysis main.cpp)
target_link_libraries(data_analysis PRIVATE dataframe)
Manual Installation
Alternatively, you can clone the repository and include it in your project:
git clone https://github.com/xaliphostes/dataframe.git
cd dataframe
mkdir build && cd build
cmake ..
make
make install # May require sudo
Basic Project Structure
For this tutorial, we'll use the following project structure:
weather_analysis/
├── CMakeLists.txt
├── src/
│   ├── main.cpp
│   ├── data_types.h
│   ├── data_loader.h
│   ├── data_processor.h
│   └── visualization.h
└── data/
    └── weather_data.csv
Creating and Working with Series
The Serie class is the fundamental building block of the DataFrame library. It represents a column of data with elements of the same type, providing functionality similar to arrays but with powerful data processing capabilities.
Creating Series
Let's start by creating various types of Series:
#include <dataframe/Serie.h>
#include <iostream>
#include <string>
#include <vector>
int main() {
    // Creating Series from initializer lists
    df::Serie<int> numbers{1, 2, 3, 4, 5};
    df::Serie<double> temperatures{22.5, 23.7, 25.2, 24.8, 21.9};
    df::Serie<std::string> cities{"New York", "Paris", "Tokyo", "London", "Sydney"};
    // Creating a Serie from a vector
    std::vector<double> humidity_vec{45.2, 52.8, 60.3, 55.7, 48.9};
    df::Serie<double> humidity(humidity_vec);
    // Creating a Serie of a specific size
    df::Serie<int> zeroes(5, 0); // 5 elements, all initialized to 0
    // Print the Series
    std::cout << "Numbers: " << numbers << std::endl;
    std::cout << "Temperatures: " << temperatures << std::endl;
    std::cout << "Number of cities: " << cities.size() << std::endl;
    return 0;
}
Basic Serie Properties and Methods
Series provide several properties and methods for basic operations:
- size(): Get the number of elements in the Serie
- empty(): Check if the Serie has no elements
- operator[]: Access an element by index
- data() or asArray(): Get the underlying data as a vector
- type(): Get the type of the Serie elements as a string
Basic Operations on Series
The DataFrame library provides various operations to work with Series. Let's explore some of the most commonly used ones:
Iterating Through Elements
#include <dataframe/Serie.h>
#include <dataframe/forEach.h>
#include <iostream>
int main() {
    df::Serie<int> numbers{1, 2, 3, 4, 5};
    // Using the Serie method
    numbers.forEach([](int value, size_t index) {
        std::cout << "Element at index " << index << ": " << value << std::endl;
    });
    // Alternatively, using the free function
    df::forEach([](int value, size_t index) {
        std::cout << "Element at index " << index << ": " << value << std::endl;
    }, numbers);
    return 0;
}
Transforming Elements
#include <dataframe/Serie.h>
#include <dataframe/map.h>
#include <iostream>
int main() {
    df::Serie<double> celsius{0.0, 10.0, 20.0, 30.0, 40.0};
    // Convert Celsius to Fahrenheit using the Serie method
    auto fahrenheit = celsius.map([](double c, size_t) {
        return c * 9.0/5.0 + 32.0;
    });
    // Print results
    std::cout << "Celsius: " << celsius << std::endl;
    std::cout << "Fahrenheit: " << fahrenheit << std::endl;
    return 0;
}
Filtering Elements
#include <dataframe/Serie.h>
#include <dataframe/filter.h>
#include <iostream>
int main() {
    df::Serie<int> numbers{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    // Get even numbers using the Serie method
    auto even_numbers = numbers.filter([](int n) {
        return n % 2 == 0;
    });
    // Get odd numbers using the free function
    auto odd_numbers = df::filter([](int n) {
        return n % 2 != 0;
    }, numbers);
    // Print results
    std::cout << "All numbers: " << numbers << std::endl;
    std::cout << "Even numbers: " << even_numbers << std::endl;
    std::cout << "Odd numbers: " << odd_numbers << std::endl;
    return 0;
}
Reducing to a Single Value
#include <dataframe/Serie.h>
#include <dataframe/reduce.h>
#include <iostream>
int main() {
    df::Serie<int> numbers{1, 2, 3, 4, 5};
    // Calculate sum using the Serie method
    int sum = numbers.reduce([](int acc, int value, size_t) {
        return acc + value;
    }, 0); // 0 is the initial value
    // Calculate product using the free function
    int product = df::reduce([](int acc, int value, size_t) {
        return acc * value;
    }, numbers, 1); // 1 is the initial value
    // Print results
    std::cout << "Numbers: " << numbers << std::endl;
    std::cout << "Sum: " << sum << std::endl;
    std::cout << "Product: " << product << std::endl;
    return 0;
}
The Functional Approach to Data Processing
One of the strengths of the DataFrame library is its functional programming approach to data processing. This approach offers several benefits:
- Immutability: Operations create new Series without modifying the original data
- Composability: Operations can be easily combined into complex workflows
- Readability: The intent of the code is clear and follows a declarative style
- Maintainability: Logic is broken down into smaller, reusable functions
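To make the immutability and composability points concrete, here is a minimal sketch that uses only the C++ standard library. The helper names map_copy, filter_copy, and fold are hypothetical and are not part of the DataFrame library; they only mimic the callback shapes used in this tutorial (value and index for map, value for filter, accumulator, value, and index for reduce). Each helper takes its input by const reference and returns a new container, so the original data is never modified.

```cpp
#include <cstddef>
#include <type_traits>
#include <vector>

// Illustrative helpers on std::vector; NOT the DataFrame library's own functions.

// Returns a new vector holding fn(value, index) for every element of input.
template <typename T, typename F>
auto map_copy(const std::vector<T>& input, F fn) {
    using R = std::decay_t<decltype(fn(input[0], std::size_t{0}))>;
    std::vector<R> out;
    out.reserve(input.size());
    for (std::size_t i = 0; i < input.size(); ++i) {
        out.push_back(fn(input[i], i));
    }
    return out;
}

// Returns a new vector holding only the elements for which pred(value) is true.
template <typename T, typename P>
std::vector<T> filter_copy(const std::vector<T>& input, P pred) {
    std::vector<T> out;
    for (const T& value : input) {
        if (pred(value)) {
            out.push_back(value);
        }
    }
    return out;
}

// Folds all elements into a single value, starting from init.
template <typename T, typename F, typename Acc>
Acc fold(const std::vector<T>& input, F fn, Acc init) {
    Acc acc = init;
    for (std::size_t i = 0; i < input.size(); ++i) {
        acc = fn(acc, input[i], i);
    }
    return acc;
}
```

Because every helper returns a fresh value, the output of one call can be passed straight into the next, for example fold(filter_copy(xs, is_even), add, 0), while xs stays unchanged.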
Combining Operations
Let's see how we can combine multiple operations to solve a more complex problem:
#include <dataframe/Serie.h>
#include <dataframe/map.h>
#include <dataframe/filter.h>
#include <dataframe/reduce.h>
#include <iostream>
#include <cmath>
int main() {
    // Initial data: temperatures in Celsius
    df::Serie<double> temperatures{
        -5.2, 3.7, 10.5, 15.8, 22.3, 28.6, 32.1, 26.4, 18.9, 12.5, 4.2, -2.8
    };
    // Step 1: Convert to Fahrenheit
    auto fahrenheit = temperatures.map([](double celsius, size_t) {
        return celsius * 9.0/5.0 + 32.0;
    });
    // Step 2: Filter out freezing temperatures (below 32°F)
    auto above_freezing = fahrenheit.filter([](double temp) {
        return temp > 32.0;
    });
    // Step 3: Apply a temperature comfort formula
    auto comfort_index = above_freezing.map([](double temp, size_t) {
        // A simplified comfort formula
        double optimal = 70.0;
        double distance = std::abs(temp - optimal);
        return 10.0 * std::exp(-0.05 * distance);
    });
    // Print results
    std::cout << "Original temperatures (°C): " << temperatures << std::endl;
    std::cout << "Temperatures in Fahrenheit: " << fahrenheit << std::endl;
    std::cout << "Above freezing temperatures: " << above_freezing << std::endl;
    std::cout << "Comfort indices: " << comfort_index << std::endl;
    return 0;
}
In the example above, we sequentially applied multiple operations to transform our data. However, this approach requires creating intermediate variables for each step, which can make the code more verbose. The DataFrame library offers a solution to this problem with pipelines.
Working with Pipelines
Pipelines allow you to chain operations together in a more concise and readable way. Instead of creating intermediate variables, you can use the pipe operator (|) or the pipe() function to pass data through a sequence of operations.
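For readers curious how a pipe operator can work in C++, the following is a minimal sketch built on the standard library alone. It is not the DataFrame library's implementation: Stage, make_stage, to_fahrenheit, and above are hypothetical names chosen for illustration. The idea is that each operation is wrapped in a small type, and an overloaded operator| applies the wrapped callable to whatever value sits on its left.

```cpp
#include <utility>
#include <vector>

// A Stage wraps any callable so that operator| can recognise it.
template <typename F>
struct Stage {
    F fn;
};

// Deduces F from the argument (hypothetical helper, for illustration only).
template <typename F>
Stage<F> make_stage(F fn) {
    return Stage<F>{std::move(fn)};
}

// value | stage  is defined as  stage.fn(value).
// Since | is left-associative, a | s1 | s2 evaluates as s2.fn(s1.fn(a)).
template <typename T, typename F>
auto operator|(const T& value, const Stage<F>& stage) {
    return stage.fn(value);
}

// Example stage: converts every Celsius value to Fahrenheit.
inline auto to_fahrenheit() {
    return make_stage([](const std::vector<double>& celsius) {
        std::vector<double> out;
        out.reserve(celsius.size());
        for (double c : celsius) {
            out.push_back(c * 9.0 / 5.0 + 32.0);
        }
        return out;
    });
}

// Example stage: keeps only the values strictly greater than threshold.
inline auto above(double threshold) {
    return make_stage([threshold](const std::vector<double>& values) {
        std::vector<double> out;
        for (double v : values) {
            if (v > threshold) {
                out.push_back(v);
            }
        }
        return out;
    });
}
```

With these definitions, an expression such as celsius | to_fahrenheit() | above(32.0) reads left to right in the order the steps run, which is the main readability gain of the pipe style.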
Using the Pipe Operator
#include <dataframe/Serie.h>
#include <dataframe/pipe.h>
#include <dataframe/map.h>
#include <dataframe/filter.h>
#include <dataframe/reduce.h>
#include <iostream>
#include <cmath>
int main() {
    // Initial data: temperatures in Celsius
    df::Serie<double> temperatures{
        -5.2, 3.7, 10.5, 15.8, 22.3, 28.6, 32.1, 26.4, 18.9, 12.5, 4.2, -2.8
    };
    // Create bound operations for use in the pipeline
    auto to_fahrenheit = df::bind_map<double>([](double celsius, size_t) {
        return celsius * 9.0/5.0 + 32.0;
    });
    auto above_freezing = df::bind_filter<double>([](double temp) {
        return temp > 32.0;
    });
    auto comfort_calculation = df::bind_map<double>([](double temp, size_t) {
        double optimal = 70.0;
        double distance = std::abs(temp - optimal);
        return 10.0 * std::exp(-0.05 * distance);
    });
    // Apply the entire pipeline at once
    auto result = temperatures
        | to_fahrenheit
        | above_freezing
        | comfort_calculation;
    std::cout << "Comfort indices: " << result << std::endl;
    return 0;
}
Creating Reusable Pipeline Components
One of the benefits of pipelines is the ability to create reusable components that can be applied to different datasets:
#include <dataframe/Serie.h>
#include <dataframe/pipe.h>
#include <dataframe/map.h>
#include <dataframe/filter.h>
#include <dataframe/reduce.h>
#include <iostream>
#include <cmath>
// Define reusable pipeline components
auto temperature_processor = df::make_pipe(
    // Convert to Fahrenheit
    df::bind_map<double>([](double celsius, size_t) {
        return celsius * 9.0/5.0 + 32.0;
    }),
    // Filter out freezing temperatures
    df::bind_filter<double>([](double temp) {
        return temp > 32.0;
    })
);
int main() {
    // Process multiple temperature datasets
    df::Serie<double> winter_temps{-5.2, -3.1, 0.5, 2.8, -1.3, -7.2};
    df::Serie<double> summer_temps{25.3, 28.6, 32.1, 30.4, 27.9, 26.5};
    // Apply the processing pipeline to each dataset
    auto winter_processed = temperature_processor(winter_temps);
    auto summer_processed = temperature_processor(summer_temps);
    std::cout << "Processed winter temperatures: " << winter_processed << std::endl;
    std::cout << "Processed summer temperatures: " << summer_processed << std::endl;
    return 0;
}
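The idea behind a reusable pipeline object can be sketched with plain function composition. The code below uses only the standard library; compose, celsius_to_fahrenheit, and keep_above_freezing are hypothetical names, and this is not how df::make_pipe is implemented. It simply shows one way several stages can be bundled into a single callable that is built once and applied to many inputs.

```cpp
#include <vector>

// Base case: a single stage is already a complete pipeline.
template <typename F>
auto compose(F f) {
    return f;
}

// Recursive case: run `first`, then feed its result to the remaining stages.
template <typename F, typename... Rest>
auto compose(F first, Rest... rest) {
    return [first, tail = compose(rest...)](const auto& input) {
        return tail(first(input));
    };
}

// Stage 1: convert every Celsius value to Fahrenheit.
std::vector<double> celsius_to_fahrenheit(const std::vector<double>& celsius) {
    std::vector<double> out;
    out.reserve(celsius.size());
    for (double c : celsius) {
        out.push_back(c * 9.0 / 5.0 + 32.0);
    }
    return out;
}

// Stage 2: keep only the values strictly above 32 degrees Fahrenheit.
std::vector<double> keep_above_freezing(const std::vector<double>& fahrenheit) {
    std::vector<double> out;
    for (double f : fahrenheit) {
        if (f > 32.0) {
            out.push_back(f);
        }
    }
    return out;
}
```

A pipeline built as auto process = compose(celsius_to_fahrenheit, keep_above_freezing); can then be called on any number of datasets, such as process(winter) and process(summer), without repeating the individual steps.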
Case Study: Weather Data Analysis
Now let's build a more comprehensive example that brings together all the concepts we've learned. We'll create a weather data analysis system that can:
- Load and parse weather data from a CSV file
- Clean and preprocess the data
- Perform statistical analysis
- Detect anomalies and patterns
Step 1: Defining the Data Structures
#pragma once
#include <string>
#include <dataframe/Serie.h>
namespace weather {
// Structure to hold a single weather record
struct WeatherRecord {
    std::string date;
    double temperature;
    double humidity;
    double pressure;
    double wind_speed;
    double precipitation;
    bool is_valid;
    // Default constructor
    WeatherRecord()
        : temperature(0.0), humidity(0.0), pressure(0.0),
          wind_speed(0.0), precipitation(0.0), is_valid(false) {}
};
using WeatherSerie = df::Serie<WeatherRecord>;
using DateSerie = df::Serie<std::string>;
using DoubleSerie = df::Serie<double>;
using BoolSerie = df::Serie<bool>;
} // namespace weather
Step 2: Loading and Parsing Data
#pragma once
#include "data_types.h"
#include <dataframe/io/csv.h>
#include <string>
#include <stdexcept>
#include <vector>
namespace weather {
class DataLoader {
public:
    // Load weather data from a CSV file
    static WeatherSerie loadFromCSV(const std::string& filename) {
        try {
            // Use the built-in CSV reader to load the data
            df::Dataframe data = df::io::read_csv(filename);
            // Now we need to convert the Dataframe (collection of series) to our WeatherSerie
            std::vector<WeatherRecord> records;
            size_t n = data.size() > 0 ? data.get<std::string>("date").size() : 0;
            for (size_t i = 0; i < n; ++i) {
                WeatherRecord record;
                record.date = data.get<std::string>("date")[i];
                record.temperature = data.get<double>("temperature")[i];
                record.humidity = data.get<double>("humidity")[i];
                record.pressure = data.get<double>("pressure")[i];
                record.wind_speed = data.get<double>("wind_speed")[i];
                record.precipitation = data.get<double>("precipitation")[i];
                record.is_valid = true;
                records.push_back(record);
            }
            return WeatherSerie(records);
        } catch (const std::exception& e) {
            throw std::runtime_error("Failed to load CSV file: " + std::string(e.what()));
        }
    }
};
} // namespace weather
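The loader above marks every record as valid, so the cleaning step only has something to remove if validity is decided somewhere. One possible approach, sketched below with the standard library only, is a plausibility check on each record. Record is a simplified stand-in for weather::WeatherRecord, looks_plausible is a hypothetical helper, and the units and bounds are assumptions made for illustration; they should be adjusted to the actual data source.

```cpp
#include <cmath>
#include <string>

// Simplified stand-in for weather::WeatherRecord so the sketch compiles on its own.
struct Record {
    std::string date;
    double temperature = 0.0;    // degrees Celsius
    double humidity = 0.0;       // percent (assumed unit)
    double pressure = 0.0;       // hPa (assumed unit)
    double wind_speed = 0.0;     // non-negative, unit not specified
    double precipitation = 0.0;  // non-negative, unit not specified
    bool is_valid = false;
};

// Returns true when every numeric field is finite and lies inside an
// assumed plausible range. The bounds are illustrative, not authoritative.
inline bool looks_plausible(const Record& r) {
    const bool finite = std::isfinite(r.temperature) && std::isfinite(r.humidity) &&
                        std::isfinite(r.pressure) && std::isfinite(r.wind_speed) &&
                        std::isfinite(r.precipitation);
    if (!finite) {
        return false;
    }
    return !r.date.empty() &&
           r.temperature >= -90.0 && r.temperature <= 60.0 &&
           r.humidity >= 0.0 && r.humidity <= 100.0 &&
           r.pressure > 0.0 &&
           r.wind_speed >= 0.0 &&
           r.precipitation >= 0.0;
}
```

In a loader like the one above, the line record.is_valid = true could be replaced by the result of such a check, so that records with missing or out-of-range values are dropped during cleaning.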
Step 3: Processing and Analyzing Data
#pragma once
#include "data_types.h"
#include <dataframe/pipe.h>
#include <dataframe/map.h>
#include <dataframe/filter.h>
#include <dataframe/reduce.h>
#include <dataframe/zip.h>
#include <dataframe/unzip.h>
#include <limits>
#include <cmath>
namespace weather {
class DataProcessor {
public:
    // Clean the data by removing invalid records
    static WeatherSerie cleanData(const WeatherSerie& data) {
        return data
            | df::bind_filter<WeatherRecord>([](const WeatherRecord& record) {
                return record.is_valid;
            });
    }
    // Extract temperature data for analysis
    static DoubleSerie extractTemperatures(const WeatherSerie& data) {
        return data.map([](const WeatherRecord& record, size_t) {
            return record.temperature;
        });
    }
    // Calculate average temperature (returns 0.0 when there are no records)
    static double calculateAverageTemperature(const WeatherSerie& data) {
        auto temperatures = extractTemperatures(data);
        if (temperatures.empty()) {
            return 0.0; // Avoid dividing by zero
        }
        // Sum with the same three-argument callback used earlier, then divide once
        double sum = temperatures.reduce([](double acc, double temp, size_t) {
            return acc + temp;
        }, 0.0);
        return sum / static_cast<double>(temperatures.size());
    }
};
} // namespace weather
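The processor above computes only an average. As a rough illustration of what further statistical analysis and simple anomaly detection could look like, the sketch below computes the mean and population standard deviation of a list of values and flags entries that lie far from the mean. It uses only the standard library on std::vector<double>; Summary, summarize, and find_anomalies are hypothetical names, and the z-score rule is one common heuristic rather than a method taken from the DataFrame library.

```cpp
#include <cmath>
#include <cstddef>
#include <numeric>
#include <vector>

struct Summary {
    double mean = 0.0;
    double stddev = 0.0;  // population standard deviation
};

// Computes the mean and population standard deviation.
// Returns zeros for an empty input.
inline Summary summarize(const std::vector<double>& values) {
    Summary s;
    if (values.empty()) {
        return s;
    }
    const double n = static_cast<double>(values.size());
    s.mean = std::accumulate(values.begin(), values.end(), 0.0) / n;
    double squared_diffs = 0.0;
    for (double v : values) {
        squared_diffs += (v - s.mean) * (v - s.mean);
    }
    s.stddev = std::sqrt(squared_diffs / n);
    return s;
}

// Returns the indices of values lying more than `threshold` standard
// deviations away from the mean (a simple z-score rule).
inline std::vector<std::size_t> find_anomalies(const std::vector<double>& values,
                                               double threshold) {
    std::vector<std::size_t> indices;
    const Summary s = summarize(values);
    if (s.stddev == 0.0) {
        return indices;  // empty input or all values identical: nothing stands out
    }
    for (std::size_t i = 0; i < values.size(); ++i) {
        if (std::abs(values[i] - s.mean) > threshold * s.stddev) {
            indices.push_back(i);
        }
    }
    return indices;
}
```

For the values 10, 10, 10, 10, 50 the mean is 18 and the standard deviation is 16, so with a threshold of 1.5 only the last entry (index 4) is flagged. The threshold is a tuning parameter; a single large outlier also inflates the standard deviation, so more robust rules may be preferable for real data.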
Step 4: Building the Main Application
#include "data_types.h"
#include "data_loader.h"
#include "data_processor.h"
#include <iostream>
#include <string>
int main(int argc, char* argv[]) {
    try {
        weather::WeatherSerie data;
        // Load data from file
        if (argc > 1) {
            std::string filename = argv[1];
            std::cout << "Loading weather data from: " << filename << std::endl;
            data = weather::DataLoader::loadFromCSV(filename);
        } else {
            std::cout << "No input file specified. Exiting." << std::endl;
            return 1;
        }
        std::cout << "Loaded " << data.size() << " weather records." << std::endl;
        // Clean the data
        auto clean_data = weather::DataProcessor::cleanData(data);
        std::cout << "After cleaning: " << clean_data.size() << " valid records." << std::endl;
        // Calculate average temperature
        double avg_temp = weather::DataProcessor::calculateAverageTemperature(clean_data);
        std::cout << "Average temperature: " << avg_temp << "°C" << std::endl;
        return 0;
    } catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << std::endl;
        return 1;
    }
}
Sample Output
Loading weather data from: weather_data.csv
Loaded 365 weather records.
After cleaning: 361 valid records.
Average temperature: 15.37°C
Conclusion
In this tutorial, we've explored how to use the DataFrame library to build comprehensive data analysis pipelines. We've covered:
- Creating and manipulating Series of various data types
- Applying basic operations like map, filter, and reduce
- Building expressive data pipelines using the functional programming paradigm
- Creating reusable pipeline components
- Implementing a complete weather data analysis system
The functional approach provided by the DataFrame library makes complex data analysis tasks more manageable by breaking them down into composable, reusable operations. This leads to code that is more readable, maintainable, and expressive.
For more advanced topics, refer to the API documentation, which covers additional features like KDTree for spatial queries, zip/unzip operations, and various mathematical functions.