Kwiz Computing Technologies Kwiz Computing Technologies
  • Home
  • Solutions
  • Environment
  • Technology
  • Kwiz Quants
  • Blog
  • About
  • Contact

Data Engineering in R: ETL Pipelines That Hold

enterprise-data-science
data-engineering
R’s {targets}, {arrow}, and DBI ecosystem builds reliable ETL pipelines under a few hundred GB. Full pipeline pattern with M-Pesa aggregation example.
Author

Kwiz Computing Technologies

Published

April 23, 2026

Keywords

data engineering R, ETL pipeline R, enterprise data science Africa, targets package R, DBI PostgreSQL R

Every data engineering conversation eventually assumes Python. The Airflow docs assume Python. The dbt tutorials assume Python. Most “modern data stack” blog posts treat Python as the only serious choice for pipeline orchestration. Your R team is left wondering whether to rewrite months of analytical work in a language chosen for its ecosystem brand rather than its fit for the problem.

R’s data engineering ecosystem is more capable than this framing suggests. The {targets} package provides DAG-based pipeline orchestration with dependency tracking and incremental execution. {arrow} gives you columnar storage and out-of-core processing. {dm} models relational schemas directly in R, enforcing foreign key constraints before anything hits the warehouse. {DBI} with {RPostgres} writes cleanly to any PostgreSQL-compatible warehouse. For analytical ETL workloads under a few hundred gigabytes, this stack matches what most Python teams build, without a rewrite.

The Realistic Data Mess: M-Pesa Aggregation

Consider a Nairobi-based B2B distributor that processes payments through Safaricom’s M-Pesa Bulk Disbursement API. Every morning, the finance team exports three separate data artefacts:

  • M-Pesa transaction CSVs from the Business Pay portal, one file per settlement batch, inconsistent column names across Safaricom API versions
  • USSD session logs from their internal USSD shortcode, a pipe-delimited flat file recording which merchants initiated which sessions
  • CRM export from their Salesforce instance, JSON-formatted, containing merchant master data including county, tier, and assigned sales rep

The current process: a finance analyst manually downloads all three, pastes them into a master Excel sheet, and sends it to the sales director by 10 a.m. When the analyst is out sick, the director works blind. When two M-Pesa API versions are running simultaneously (which Safaricom does during gradual rollouts), the manual merge produces silent duplicates.

This is the pipeline we will build.

Why {targets} Fits Data Engineering

{targets} is a Make-like pipeline orchestrator for R. Each unit of computation is a “target” defined by a name, a function, and its dependencies. When you run the pipeline, {targets} builds a directed acyclic graph (DAG) of all targets, checks which outputs are already up-to-date, and skips recomputation for any target whose inputs have not changed.

That behaviour matters for ETL. If the CRM export has not changed since yesterday’s run, {targets} will not re-parse and re-validate it. If only the M-Pesa CSV is new, only the targets downstream of that file get recomputed. This is incremental execution without writing any custom caching logic.

The entire pipeline lives in a _targets.R file at the project root:

# _targets.R
library(targets)
library(tarchetypes)

tar_option_set(
  packages = c(
    "dplyr", "readr", "jsonlite", "arrow",
    "DBI", "RPostgres", "pointblank", "lubridate"
  ),
  # Store intermediate targets as Arrow/Parquet files
  format = "parquet"
)

# Source pipeline functions
tar_source("R/")

list(
  # ── Raw ingest targets ───────────────────────────────────────────────────
  tar_target(
    mpesa_raw,
    ingest_mpesa_csvs("data/raw/mpesa/"),
    format = "parquet"
  ),
  tar_target(
    ussd_raw,
    ingest_ussd_logs("data/raw/ussd/session_log.psv"),
    format = "parquet"
  ),
  tar_target(
    crm_raw,
    ingest_crm_json("data/raw/crm/merchants.json"),
    format = "parquet"
  ),

  # ── Validation targets ───────────────────────────────────────────────────
  tar_target(mpesa_validated,   validate_mpesa(mpesa_raw)),
  tar_target(ussd_validated,    validate_ussd(ussd_raw)),
  tar_target(crm_validated,     validate_crm(crm_raw)),

  # ── Transform to canonical schema ───────────────────────────────────────
  tar_target(
    transactions_canonical,
    build_transactions(mpesa_validated, ussd_validated, crm_validated),
    format = "parquet"
  ),

  # ── Warehouse write ──────────────────────────────────────────────────────
  tar_target(
    warehouse_write,
    write_to_warehouse(transactions_canonical),
    format = "rds"   # Return a status object, not the data itself
  )
)

Run the full pipeline with tar_make(). Inspect the dependency graph with tar_visnetwork(). Check which targets are outdated with tar_outdated(). These three functions cover most of the operator workflow.

Raw Ingest Functions

The M-Pesa portal produces CSVs with shifting column names. Safaricom’s API v2 uses TransactionID; v3 uses transaction_id. The ingest function normalises both:

# R/ingest.R

ingest_mpesa_csvs <- function(dir_path) {
  csv_files <- list.files(dir_path, pattern = "\\.csv$", full.names = TRUE)

  if (length(csv_files) == 0) {
    stop("No M-Pesa CSV files found in: ", dir_path)
  }

  purrr::map(csv_files, function(f) {
    df <- readr::read_csv(f, show_col_types = FALSE)

    # Normalise column names: handle v2 and v3 API schemas
    df |>
      dplyr::rename_with(tolower) |>
      dplyr::rename_with(~ gsub(" ", "_", .x)) |>
      # Coerce either naming convention to canonical names
      dplyr::rename(
        any_of(c(
          transaction_id    = "transactionid",
          merchant_phone    = "phone_number",
          amount_kes        = "amount",
          transaction_date  = "date",
          transaction_date  = "transactiondate"
        ))
      ) |>
      dplyr::mutate(
        source_file = basename(f),
        ingested_at = lubridate::now("Africa/Nairobi")
      )
  }) |>
    dplyr::bind_rows()
}

ingest_ussd_logs <- function(file_path) {
  readr::read_delim(
    file_path,
    delim = "|",
    col_types = readr::cols(
      session_id   = readr::col_character(),
      phone        = readr::col_character(),
      initiated_at = readr::col_datetime(format = "%Y-%m-%d %H:%M:%S"),
      menu_path    = readr::col_character(),
      completed    = readr::col_logical()
    )
  ) |>
    dplyr::mutate(ingested_at = lubridate::now("Africa/Nairobi"))
}

ingest_crm_json <- function(file_path) {
  raw <- jsonlite::read_json(file_path, simplifyVector = TRUE)
  tibble::as_tibble(raw) |>
    dplyr::mutate(ingested_at = lubridate::now("Africa/Nairobi"))
}

Keeping ingest functions pure (no side effects, no database calls) means {targets} can cache their outputs independently. If only the USSD log changes, the M-Pesa ingest result stays cached.

Validation with {pointblank}

Silent bad data is worse than a failed pipeline. A single duplicated transaction_id or a NULL merchant_phone can corrupt downstream aggregations without raising any errors. The validation step makes those problems loud:

# R/validate.R

validate_mpesa <- function(df) {
  agent <- pointblank::create_agent(tbl = df, label = "M-Pesa validation") |>
    pointblank::col_is_character(vars(transaction_id)) |>
    pointblank::col_vals_not_null(vars(transaction_id, amount_kes, merchant_phone)) |>
    pointblank::col_vals_gt(vars(amount_kes), value = 0) |>
    pointblank::col_vals_regex(
      vars(merchant_phone),
      regex = "^2547[0-9]{8}$"   # Safaricom 07xx -> 2547xx normalised
    ) |>
    pointblank::rows_distinct(vars(transaction_id)) |>
    pointblank::interrogate()

  # Fail the pipeline target if critical checks do not pass
  if (pointblank::all_passed(agent) == FALSE) {
    stop(
      "M-Pesa validation failed. Run pointblank::get_agent_report(agent) to inspect.\n",
      "Failed checks: ",
      paste(pointblank::get_sundered_data(agent, type = "fail") |> nrow(), "rows rejected")
    )
  }

  # Return only rows that passed all checks
  pointblank::get_sundered_data(agent, type = "pass")
}

The same pattern applies to USSD and CRM targets. Failed validation stops the pipeline at the right point, before bad data reaches the warehouse.

Arrow/Parquet for Intermediate Storage

The format = "parquet" argument in tar_option_set() tells {targets} to serialise each target’s output as a Parquet file rather than an RDS object. For a few million M-Pesa rows, this matters: Parquet files are 5-10x smaller than equivalent RDS files, read faster with columnar access patterns, and can be queried directly by DuckDB or shared with Python colleagues without conversion.

For targets that need to cross the R-to-R boundary within the pipeline, this is transparent. {targets} reads and writes Parquet automatically using {arrow}. For targets that need further processing before the warehouse write, you can work with Arrow tables directly:

# R/transform.R

build_transactions <- function(mpesa, ussd, crm) {
  # Join M-Pesa transactions with USSD session data on merchant phone
  transactions <- mpesa |>
    dplyr::left_join(
      ussd |> dplyr::select(phone, session_id, initiated_at, completed),
      by = c("merchant_phone" = "phone"),
      relationship = "many-to-one"
    ) |>
    # Enrich with CRM merchant data
    dplyr::left_join(
      crm |> dplyr::select(phone, merchant_name, county, tier, sales_rep),
      by = c("merchant_phone" = "phone")
    ) |>
    dplyr::mutate(
      transaction_date = lubridate::as_date(transaction_date),
      week_start       = lubridate::floor_date(transaction_date, "week"),
      county           = dplyr::coalesce(county, "Unknown"),
      amount_kes       = as.numeric(amount_kes)
    ) |>
    dplyr::select(
      transaction_id, transaction_date, week_start,
      merchant_phone, merchant_name, county, tier, sales_rep,
      amount_kes, session_id, completed, source_file, ingested_at
    )

  transactions
}

Writing to PostgreSQL with DBI

The warehouse write target connects to PostgreSQL and upserts the canonical transactions table. The pattern uses DBI::dbWriteTable() with a staging table for idempotent loads: write the new batch to a staging table, then merge into the production table on transaction_id. That way, re-running the pipeline after a partial failure does not create duplicates.

# R/warehouse.R

write_to_warehouse <- function(transactions) {
  con <- DBI::dbConnect(
    RPostgres::Postgres(),
    host     = Sys.getenv("PG_HOST"),
    port     = as.integer(Sys.getenv("PG_PORT", "5432")),
    dbname   = Sys.getenv("PG_DB"),
    user     = Sys.getenv("PG_USER"),
    password = Sys.getenv("PG_PASSWORD")
  )
  on.exit(DBI::dbDisconnect(con), add = TRUE)

  # Write to staging table (overwrite on each run)
  DBI::dbWriteTable(
    con,
    name      = "mpesa_transactions_staging",
    value     = transactions,
    overwrite = TRUE,
    row.names = FALSE
  )

  # Upsert from staging to production table
  DBI::dbExecute(con, "
    INSERT INTO mpesa_transactions (
      transaction_id, transaction_date, week_start,
      merchant_phone, merchant_name, county, tier, sales_rep,
      amount_kes, session_id, completed, source_file, ingested_at
    )
    SELECT
      transaction_id, transaction_date, week_start,
      merchant_phone, merchant_name, county, tier, sales_rep,
      amount_kes, session_id, completed, source_file, ingested_at
    FROM mpesa_transactions_staging
    ON CONFLICT (transaction_id)
    DO UPDATE SET
      amount_kes   = EXCLUDED.amount_kes,
      ingested_at  = EXCLUDED.ingested_at
  ")

  rows_written <- DBI::dbGetQuery(
    con,
    "SELECT COUNT(*) AS n FROM mpesa_transactions_staging"
  )$n

  list(status = "ok", rows = rows_written, timestamp = Sys.time())
}

The function returns a small status list rather than the full dataset. {targets} stores this as an RDS file, keeping the cache small and the pipeline log readable.

Scheduling: Cron and GitHub Actions

The pipeline runs daily. Two scheduling options work cleanly with this {targets} setup.

Cron on a Linux server: A crontab entry calls an R script that sets environment variables from a .Renviron file and runs tar_make():

# Run pipeline Monday-Friday at 07:00 Nairobi time (EAT = UTC+3)
0 4 * * 1-5 /usr/bin/Rscript /opt/pipelines/mpesa-etl/run_pipeline.R >> /var/log/mpesa-etl.log 2>&1

The run_pipeline.R script:

#!/usr/bin/env Rscript
readRenviron("/opt/pipelines/.Renviron")
targets::tar_make(reporter = "timestamp")

GitHub Actions: For teams that store the pipeline in Git (recommended), a workflow file handles scheduling and sends failures to a Slack webhook:

name: M-Pesa ETL Pipeline

on:
  schedule:
    - cron: "0 4 * * 1-5"   # 07:00 EAT, weekdays
  workflow_dispatch:          # Allow manual trigger

jobs:
  run-pipeline:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - uses: r-lib/actions/setup-r@v2
        with:
          r-version: "4.4"

      - name: Restore targets cache
        uses: actions/cache@v4
        with:
          path: _targets/
          key: targets-${{ hashFiles('_targets.R', 'R/**') }}
          restore-keys: targets-

      - name: Run pipeline
        env:
          PG_HOST:     ${{ secrets.PG_HOST }}
          PG_DB:       ${{ secrets.PG_DB }}
          PG_USER:     ${{ secrets.PG_USER }}
          PG_PASSWORD: ${{ secrets.PG_PASSWORD }}
        run: Rscript -e "targets::tar_make(reporter = 'timestamp')"

      - name: Notify on failure
        if: failure()
        uses: slackapi/slack-github-action@v1
        with:
          payload: '{"text": "M-Pesa ETL failed. Check the Actions log."}'
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}

The actions/cache step restores the _targets/ store between runs. GitHub Actions only re-runs targets whose inputs changed, because the cached store persists across workflow runs. For a pipeline that ingests daily increments, this means only the new M-Pesa batch gets processed on most days.

Error Handling and Observability

{targets} surfaces errors at the target level. When validate_mpesa throws, the pipeline stops there, the error message goes to stdout (and to the Actions log), and all downstream targets are marked as failed. The CRM and USSD targets, which have no dependency on mpesa_validated, still succeed and get cached.

For production alerting, wrap tar_make() in a tryCatch and notify on failure:

# run_pipeline.R
readRenviron(".Renviron")

tryCatch(
  {
    targets::tar_make(reporter = "timestamp")
    message("Pipeline completed: ", Sys.time())
  },
  error = function(e) {
    msg <- paste0("ETL pipeline failed at ", Sys.time(), "\n", conditionMessage(e))
    message(msg)
    httr2::request(Sys.getenv("SLACK_WEBHOOK")) |>
      httr2::req_body_json(list(text = msg)) |>
      httr2::req_perform()
    quit(status = 1)
  }
)

This pattern keeps the scheduling layer thin. Whether you use cron, GitHub Actions, or a task runner like {rrq}, the pipeline itself handles error propagation through the DAG.

What This Solves in Practice

The Nairobi distributor scenario from the opening is representative of dozens of companies operating at this scale across East Africa: telecom payment data, field agent apps generating USSD logs, and CRM systems that do not talk to each other. The data volumes sit comfortably below 100 GB. The analytical questions (weekly revenue by county, merchant tier performance, sales rep attribution) do not require Spark.

What they require is a pipeline that runs reliably, fails loudly, and does not need a Python engineer to maintain. The {targets} stack delivers that. The finance analyst stops building spreadsheets at 9:45 a.m. The dashboard updates itself. The sales director gets county-level M-Pesa attribution by the time the morning stand-up starts.

A few practical notes from running this pattern with clients in Nairobi and Mombasa. First, store the _targets/ directory on a mounted volume if you run in Docker; it is your cache and you do not want it wiped on container restart. Second, the {dm} package fits naturally between the transform and warehouse-write steps: define your relational model once, use dm_examine_constraints() to catch foreign key violations before the upsert, and you get an extra layer of guarantees without writing SQL assertions by hand. Third, run tar_prune() monthly to clean up cached targets that are no longer referenced by _targets.R; the _targets/objects/ directory grows faster than most teams expect.

If you are managing heterogeneous data sources at this scale, the Python assumption may be costing you more than you realise. The R tooling is ready. The question is whether your architecture is.

For teams that want help designing or reviewing a pipeline like this, Kwiz Computing builds data engineering systems for East African businesses across sectors. We also cover how these pipelines connect to production R deployments in our business leaders guide to R and the R Shiny hosting guide for teams that expose pipeline outputs as interactive dashboards.

What part of your current pipeline breaks most often: ingest, validation, or the warehouse write?

© 2026 Kwiz Computing Technologies. All rights reserved.
Data Science & Technology | Environmental Analytics | Quantitative Finance

 

Built with Quarto