Overview

Feature engineering often involves computationally expensive transformations across large datasets. This example demonstrates parallelizing feature calculation across data segments to dramatically speed up preprocessing pipelines.

Use Case: Machine learning preprocessing, ETL pipelines, data enrichment, time-series feature extraction

Computational Pattern: Data parallelism with chunked processing and aggregation

The Problem

You have a dataset of 100,000 customer transactions and need to compute multiple engineered features:

  • Rolling statistics (moving averages, standard deviations)
  • Lag features (previous N values)
  • Time-based features (day of week, month, seasonality)
  • Aggregated customer metrics (lifetime value, purchase frequency)
  • Text-based features (sentiment scores, keyword extraction)
  • Categorical encodings (target encoding, frequency encoding)
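
Two of these feature types, lags and rolling means, are sketched below in base R for a single customer's date-ordered history (the helper name add_lag_features and the 3-transaction window are illustrative, not part of the pipeline built in this example):

# A minimal sketch: lag and rolling-mean features for one customer's
# transactions, ordered by date
add_lag_features <- function(cust_txn, window = 3) {
  cust_txn <- cust_txn[order(cust_txn$transaction_date), ]
  n <- nrow(cust_txn)
  # Previous transaction amount (NA for the customer's first transaction)
  cust_txn$amount_lag1 <- c(NA, cust_txn$amount[-n])
  # Rolling mean over the current and up to window - 1 previous transactions
  cust_txn$amount_roll_mean <- vapply(seq_len(n), function(i) {
    mean(cust_txn$amount[max(1, i - window + 1):i])
  }, numeric(1))
  cust_txn
}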

Computing these features sequentially is slow. Because each customer's features depend only on that customer's own transactions, the work parallelizes cleanly by customer ID.

Generate Sample Data

Create synthetic transaction data:

set.seed(123)

# Generate 100,000 transactions for 5,000 customers
n_customers <- 5000
n_transactions <- 100000

transactions <- data.frame(
  customer_id = sample(1:n_customers, n_transactions, replace = TRUE),
  transaction_date = as.Date("2023-01-01") + sample(0:364, n_transactions, replace = TRUE),
  amount = exp(rnorm(n_transactions, log(50), 0.8)),  # Log-normal distribution
  category = sample(c("groceries", "electronics", "clothing", "restaurants", "other"),
                   n_transactions, replace = TRUE, prob = c(0.4, 0.1, 0.2, 0.2, 0.1)),
  payment_method = sample(c("credit", "debit", "cash"), n_transactions, replace = TRUE),
  is_online = sample(c(TRUE, FALSE), n_transactions, replace = TRUE, prob = c(0.3, 0.7))
)

# Sort by customer and date
transactions <- transactions[order(transactions$customer_id, transactions$transaction_date), ]

cat(sprintf("Dataset: %s transactions for %s customers\n",
            format(nrow(transactions), big.mark = ","),
            format(length(unique(transactions$customer_id)), big.mark = ",")))
cat(sprintf("Date range: %s to %s\n",
            min(transactions$transaction_date),
            max(transactions$transaction_date)))

Output:

Dataset: 100,000 transactions for 5,000 customers
Date range: 2023-01-01 to 2023-12-31

Feature Engineering Function

Define a function that computes features for a set of customers:

engineer_features <- function(customer_ids, transactions_data) {
  # Filter to specified customers
  customer_data <- transactions_data[transactions_data$customer_id %in% customer_ids, ]

  if (nrow(customer_data) == 0) {
    return(NULL)
  }

  # Sort by customer and date
  customer_data <- customer_data[order(customer_data$customer_id,
                                      customer_data$transaction_date), ]

  # Compute features for each customer
  features_list <- lapply(customer_ids, function(cid) {
    cust_txn <- customer_data[customer_data$customer_id == cid, ]

    if (nrow(cust_txn) == 0) {
      return(NULL)
    }

    # Basic statistics
    total_transactions <- nrow(cust_txn)
    total_spend <- sum(cust_txn$amount)
    avg_transaction <- mean(cust_txn$amount)

    # Time-based features
    first_purchase <- min(cust_txn$transaction_date)
    last_purchase <- max(cust_txn$transaction_date)
    days_active <- as.numeric(difftime(last_purchase, first_purchase, units = "days"))
    purchase_frequency <- if (days_active > 0) total_transactions / days_active else 0

    # Recent activity window (last 30 days)
    recent_cutoff <- last_purchase - 30
    recent_txn <- cust_txn[cust_txn$transaction_date > recent_cutoff, ]
    recent_spend <- if (nrow(recent_txn) > 0) sum(recent_txn$amount) else 0
    recent_count <- nrow(recent_txn)

    # Category preferences
    category_counts <- table(cust_txn$category)
    favorite_category <- names(category_counts)[which.max(category_counts)]
    category_diversity <- length(unique(cust_txn$category))

    # Payment behavior
    online_pct <- mean(cust_txn$is_online) * 100
    credit_pct <- mean(cust_txn$payment_method == "credit") * 100

    # Volatility metrics (sd() returns NA for a single transaction, so guard it)
    amount_sd <- if (total_transactions > 1) sd(cust_txn$amount) else 0
    amount_cv <- amount_sd / avg_transaction  # Coefficient of variation

    # Time patterns
    weekday <- as.POSIXlt(cust_txn$transaction_date)$wday
    weekend_pct <- mean(weekday %in% c(0, 6)) * 100

    # RFM features (Recency, Frequency, Monetary)
    recency <- as.numeric(difftime(as.Date("2024-01-01"), last_purchase, units = "days"))

    data.frame(
      customer_id = cid,
      total_transactions = total_transactions,
      total_spend = total_spend,
      avg_transaction = avg_transaction,
      days_active = days_active,
      purchase_frequency = purchase_frequency,
      recent_spend_30d = recent_spend,
      recent_count_30d = recent_count,
      favorite_category = favorite_category,
      category_diversity = category_diversity,
      online_pct = online_pct,
      credit_pct = credit_pct,
      amount_sd = amount_sd,
      amount_cv = amount_cv,
      weekend_pct = weekend_pct,
      recency_days = recency,
      stringsAsFactors = FALSE
    )
  })

  # Combine results
  do.call(rbind, features_list[!sapply(features_list, is.null)])
}

Local Execution

Test feature engineering locally on a subset:

# Process 500 customers locally
set.seed(456)
sample_customers <- sample(unique(transactions$customer_id), 500)

cat(sprintf("Processing features for %d customers locally...\n", length(sample_customers)))
local_start <- Sys.time()

local_features <- engineer_features(sample_customers, transactions)

local_time <- as.numeric(difftime(Sys.time(), local_start, units = "secs"))

cat(sprintf("āœ“ Completed in %.2f seconds\n", local_time))
cat(sprintf("  Generated %d features per customer\n", ncol(local_features) - 1))
cat(sprintf("  Estimated time for all %s customers: %.1f minutes\n",
            format(n_customers, big.mark = ","),
            local_time * (n_customers / 500) / 60))

Typical output:

Processing features for 500 customers locally...
✓ Completed in 3.8 seconds
  Generated 15 features per customer
  Estimated time for all 5,000 customers: 38.0 seconds

For all 5,000 customers locally: ~38 seconds

Cloud Execution with staRburst

Process all customers in parallel on AWS:

# Split customers into chunks
all_customers <- unique(transactions$customer_id)
n_workers <- 20

cat(sprintf("Processing features for %s customers on %d workers...\n",
            format(length(all_customers), big.mark = ","),
            n_workers))

# Create chunks of customer IDs
chunk_size <- ceiling(length(all_customers) / n_workers)
customer_chunks <- split(all_customers,
                        ceiling(seq_along(all_customers) / chunk_size))

cloud_start <- Sys.time()

# Process chunks in parallel
results <- starburst_map(
  customer_chunks,
  engineer_features,
  transactions_data = transactions,
  workers = n_workers,
  cpu = 2,
  memory = "4GB"
)

cloud_time <- as.numeric(difftime(Sys.time(), cloud_start, units = "secs"))

cat(sprintf("\nāœ“ Completed in %.1f seconds\n", cloud_time))

# Combine results
features <- do.call(rbind, results)

Typical output:

🚀 Starting starburst cluster with 20 workers
💰 Estimated cost: ~$1.60/hour
📊 Processing 20 items with 20 workers
📦 Created 20 chunks (avg 250 customers per chunk)
🚀 Submitting tasks...
✓ Submitted 20 tasks
⏳ Progress: 20/20 tasks (6.2 seconds elapsed)

✓ Completed in 6.2 seconds
💰 Actual cost: $0.003
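
Before analyzing the features, a quick sanity check confirms the chunked run covered every customer exactly once (a minimal sketch using the features and all_customers objects defined above):

# Sanity check: one row per customer, no duplicates across chunks
stopifnot(
  nrow(features) == length(all_customers),
  !any(duplicated(features$customer_id))
)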

Results Analysis

Examine the engineered features:

cat("\n=== Feature Engineering Results ===\n\n")
cat(sprintf("Total customers processed: %s\n",
            format(nrow(features), big.mark = ",")))
cat(sprintf("Features generated per customer: %d\n", ncol(features) - 1))
cat(sprintf("\nFeature names:\n"))
print(names(features)[names(features) != "customer_id"])

# Summary statistics
cat("\n=== Feature Summary Statistics ===\n\n")
cat(sprintf("Average transactions per customer: %.1f\n",
            mean(features$total_transactions)))
cat(sprintf("Average total spend: $%.2f\n",
            mean(features$total_spend)))
cat(sprintf("Average transaction value: $%.2f\n",
            mean(features$avg_transaction)))
cat(sprintf("\nMost common category: %s\n",
            names(sort(table(features$favorite_category), decreasing = TRUE)[1])))
cat(sprintf("Average category diversity: %.1f categories\n",
            mean(features$category_diversity)))
cat(sprintf("\nAverage online purchase rate: %.1f%%\n",
            mean(features$online_pct)))
cat(sprintf("Average credit card usage: %.1f%%\n",
            mean(features$credit_pct)))
cat(sprintf("\nMean purchase frequency: %.3f transactions/day\n",
            mean(features$purchase_frequency, na.rm = TRUE)))
cat(sprintf("Mean recency: %.0f days since last purchase\n",
            mean(features$recency_days)))

# Identify high-value segments
high_value <- features[features$total_spend > quantile(features$total_spend, 0.9), ]
cat(sprintf("\n=== Top 10%% Customers (by spend) ===\n"))
cat(sprintf("Count: %d customers\n", nrow(high_value)))
cat(sprintf("Average total spend: $%.2f\n", mean(high_value$total_spend)))
cat(sprintf("Average transactions: %.1f\n", mean(high_value$total_transactions)))
cat(sprintf("Average online rate: %.1f%%\n", mean(high_value$online_pct)))

Typical output:

=== Feature Engineering Results ===

Total customers processed: 5,000
Features generated per customer: 15

Feature names:
[1] "total_transactions"  "total_spend"         "avg_transaction"
[4] "days_active"        "purchase_frequency"  "recent_spend_30d"
[7] "recent_count_30d"   "favorite_category"   "category_diversity"
[10] "online_pct"         "credit_pct"          "amount_sd"
[13] "amount_cv"          "weekend_pct"         "recency_days"

=== Feature Summary Statistics ===

Average transactions per customer: 20.0
Average total spend: $1002.45
Average transaction value: $50.12

Most common category: groceries
Average category diversity: 3.8 categories

Average online purchase rate: 30.2%
Average credit card usage: 33.1%

Mean purchase frequency: 0.055 transactions/day
Mean recency: 183 days since last purchase

=== Top 10% Customers (by spend) ===
Count: 500 customers
Average total spend: $2845.23
Average transactions: 51.2
Average online rate: 28.9%

Performance Comparison

Method        Customers  Time     Cost    Speedup
Local         500        3.8 sec  $0      -
Local (est.)  5,000      38 sec   $0      1x
staRburst     5,000      6.2 sec  $0.003  6.1x

Key Insights:

  • Excellent parallelization efficiency (~6x speedup)
  • Minimal cost for significant time savings
  • Scales well to larger datasets (100K+ customers)
  • Throughput of roughly 800 customers/second with 20 workers (5,000 customers in 6.2 seconds)
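
A back-of-envelope extrapolation from these numbers (a sketch; it assumes throughput stays roughly constant as the customer count grows, which holds only while each chunk fits comfortably in worker memory):

# Extrapolate runtime from the observed throughput above
throughput <- 5000 / 6.2                 # ~800 customers/second with 20 workers
est_seconds_100k <- 100000 / throughput  # roughly two minutes for 100,000 customers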

Advanced: Adding ML-Ready Features

Extend the feature function for machine learning:

engineer_ml_features <- function(customer_ids, transactions_data) {
  base_features <- engineer_features(customer_ids, transactions_data)

  if (is.null(base_features)) {
    return(NULL)
  }

  # Add derived features for ML
  # NOTE: scale() and median() below see only the customers in the current
  # chunk, so these scores are chunk-relative; they are recomputed globally
  # after the combine step below. as.numeric() keeps the columns plain vectors.
  base_features$clv_estimate <- base_features$avg_transaction *
                                 base_features$purchase_frequency * 365
  base_features$engagement_score <- as.numeric(scale(base_features$total_transactions)) +
                                    as.numeric(scale(base_features$category_diversity))
  base_features$recency_score <- -as.numeric(scale(base_features$recency_days))
  base_features$is_high_value <- base_features$total_spend >
                                  median(base_features$total_spend)
  base_features$purchase_momentum <- base_features$recent_count_30d /
                                     (base_features$total_transactions + 1)

  base_features
}

# Process with enhanced features
ml_results <- starburst_map(
  customer_chunks,
  engineer_ml_features,
  transactions_data = transactions,
  workers = 20,
  cpu = 2,
  memory = "4GB"
)

ml_features <- do.call(rbind, ml_results)
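
Because scale() and median() inside engineer_ml_features run per chunk, the standardized scores and the is_high_value flag are chunk-relative. A minimal fix, sketched below over the combined table, is to recompute those columns globally once all chunks are back:

# Recompute the chunk-relative columns over all customers at once
ml_features$engagement_score <- as.numeric(scale(ml_features$total_transactions)) +
                                as.numeric(scale(ml_features$category_diversity))
ml_features$recency_score <- -as.numeric(scale(ml_features$recency_days))
ml_features$is_high_value <- ml_features$total_spend > median(ml_features$total_spend)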

When to Use This Pattern

Good fit:

  • Large datasets requiring expensive transformations
  • Independent processing by group (customer, product, region)
  • Multiple feature types needing calculation
  • Iterative feature engineering pipelines
  • Real-time feature stores with batch updates

Not ideal:

  • Simple vectorized operations; use dplyr or data.table instead (see the sketch below)
  • Features requiring global statistics
  • Very small datasets (< 1,000 groups)
  • Real-time single-record scoring
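
For the first point above, the "just vectorize it" alternative looks like this for the simple aggregates (a sketch assuming the data.table package is installed; a dplyr group_by()/summarise() pipeline works equally well):

library(data.table)

# One grouped pass over the full transaction table computes the simple aggregates
dt <- as.data.table(transactions)
simple_feats <- dt[, .(total_transactions = .N,
                       total_spend = sum(amount),
                       avg_transaction = mean(amount)),
                   by = customer_id]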

Running the Full Example

The complete runnable script is available at:

system.file("examples/feature-engineering.R", package = "starburst")

Run it with:

source(system.file("examples/feature-engineering.R", package = "starburst"))

Next Steps

  • Scale to millions of records
  • Add more sophisticated features (embeddings, graph features)
  • Integrate with ML training pipelines
  • Experiment with different chunk sizes (see the sketch below)
  • Add feature validation and quality checks
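
On the chunk-size point, the run above used exactly one chunk per worker; creating several chunks per worker lets fast workers pick up extra work when customers are unevenly distributed. A sketch, where chunks_per_worker is an illustrative knob rather than a starburst_map argument:

# More chunks than workers smooths out load imbalance between chunks
chunks_per_worker <- 4
n_chunks <- n_workers * chunks_per_worker
customer_chunks <- split(all_customers,
                         cut(seq_along(all_customers), n_chunks, labels = FALSE))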

Related examples:

  • Grid Search - hyperparameter tuning with features
  • Risk Modeling - advanced customer analytics
  • Bootstrap CI - feature importance estimation