DataFrame API

The source code discussed in this chapter can be found in the logical-plan module of the KQuery project.

The previous chapter showed how to represent queries as logical plans. But constructing those plans by hand is tedious. This chapter introduces the DataFrame API, a fluent interface that makes building logical plans natural and readable.

Building Plans The Hard Way

Consider this query:

SELECT id, first_name, last_name, state, salary
FROM employee
WHERE state = 'CO'

To build this as a logical plan, we construct each piece separately and then wire them together:

val csv = CsvDataSource("employee.csv")
val scan = Scan("employee", csv, listOf())
val filterExpr = Eq(Column("state"), LiteralString("CO"))
val selection = Selection(scan, filterExpr)
val projectionList = listOf(
    Column("id"),
    Column("first_name"),
    Column("last_name"),
    Column("state"),
    Column("salary")
)
val plan = Projection(selection, projectionList)

This works, but it is verbose and the code structure does not mirror the query structure. We can improve this slightly by nesting the constructors:

val plan = Projection(
    Selection(
        Scan("employee", CsvDataSource("employee.csv"), listOf()),
        Eq(Column("state"), LiteralString("CO"))
    ),
    listOf(
        Column("id"),
        Column("first_name"),
        Column("last_name"),
        Column("state"),
        Column("salary")
    )
)

This is more compact, but harder to read. The nesting goes from inside out, opposite to how we think about query execution (scan first, then filter, then project).

The DataFrame Approach

A DataFrame wraps a logical plan and provides methods that return new DataFrames. Each method call adds a node to the plan. This creates a fluent API where code reads top-to-bottom in execution order:

val df = ctx.csv("employee.csv")
    .filter(Eq(Column("state"), LiteralString("CO")))
    .project(listOf(
        Column("id"),
        Column("first_name"),
        Column("last_name"),
        Column("state"),
        Column("salary")
    ))

Read this as: start with the CSV file, filter to Colorado, project the columns we want. The code structure matches the mental model.

The DataFrame Interface

The interface is simple:

interface DataFrame {

  /** Apply a projection */
  fun project(expr: List<LogicalExpr>): DataFrame

  /** Apply a filter */
  fun filter(expr: LogicalExpr): DataFrame

  /** Aggregate */
  fun aggregate(
      groupBy: List<LogicalExpr>,
      aggregateExpr: List<AggregateExpr>
  ): DataFrame

  /** Join with another DataFrame */
  fun join(right: DataFrame, joinType: JoinType, condition: LogicalExpr): DataFrame

  /** Returns the schema of the data that will be produced by this DataFrame. */
  fun schema(): Schema

  /** Get the logical plan */
  fun logicalPlan(): LogicalPlan
}

Each transformation method (project, filter, aggregate, join) returns a new DataFrame. This enables method chaining. The schema method exposes the output schema for inspection. The logicalPlan method retrieves the underlying plan when we need it for optimization or execution.
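
To make the two inspection methods concrete, here is a minimal, self-contained sketch. The types below are simplified stand-ins and not the KQuery classes: expressions are reduced to plain column names, `Field` stores its type as a string, and the `DataFrame` offers only `project`. The `employeeFrame` and `projectedColumnNames` helpers are hypothetical names introduced for illustration.

```kotlin
// Simplified stand-ins for illustration only; the real KQuery types are richer.
data class Field(val name: String, val type: String)

data class Schema(val fields: List<Field>) {
    // Keep only the named fields, in the requested order.
    fun select(names: List<String>): Schema =
        Schema(names.map { n -> fields.first { it.name == n } })
}

interface LogicalPlan {
    fun schema(): Schema
}

class Scan(val path: String, private val tableSchema: Schema) : LogicalPlan {
    override fun schema(): Schema = tableSchema
    override fun toString(): String = "Scan: $path"
}

// Expressions are reduced to plain column names in this sketch.
class Projection(val input: LogicalPlan, val columns: List<String>) : LogicalPlan {
    override fun schema(): Schema = input.schema().select(columns)
    override fun toString(): String =
        "Projection: " + columns.joinToString(", ") { "#$it" }
}

class DataFrame(private val plan: LogicalPlan) {
    fun project(columns: List<String>): DataFrame = DataFrame(Projection(plan, columns))
    fun schema(): Schema = plan.schema()
    fun logicalPlan(): LogicalPlan = plan
}

// Hypothetical employee table with an assumed three-column schema.
fun employeeFrame(): DataFrame = DataFrame(
    Scan(
        "employee.csv",
        Schema(
            listOf(
                Field("id", "Int"),
                Field("state", "String"),
                Field("salary", "Double")
            )
        )
    )
)

// The output schema of a projection contains only the projected columns.
fun projectedColumnNames(): List<String> =
    employeeFrame().project(listOf("id", "salary")).schema().fields.map { it.name }
```

In this sketch, `projectedColumnNames()` returns `[id, salary]`, and calling `logicalPlan()` on the same DataFrame returns the `Projection` node that sits on top of the scan. The schema is derived from the plan on demand, so the DataFrame never stores it separately.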

Implementation

The implementation wraps a logical plan and creates new plan nodes on each method call:

class DataFrameImpl(private val plan: LogicalPlan) : DataFrame {

  override fun project(expr: List<LogicalExpr>): DataFrame {
    return DataFrameImpl(Projection(plan, expr))
  }

  override fun filter(expr: LogicalExpr): DataFrame {
    return DataFrameImpl(Selection(plan, expr))
  }

  override fun aggregate(
      groupBy: List<LogicalExpr>,
      aggregateExpr: List<AggregateExpr>
  ): DataFrame {
    return DataFrameImpl(Aggregate(plan, groupBy, aggregateExpr))
  }

  override fun join(
      right: DataFrame,
      joinType: JoinType,
      condition: LogicalExpr
  ): DataFrame {
    return DataFrameImpl(Join(plan, right.logicalPlan(), joinType, condition))
  }

  override fun schema(): Schema {
    return plan.schema()
  }

  override fun logicalPlan(): LogicalPlan {
    return plan
  }
}

Each method constructs a new logical plan node with the current plan as input, then wraps it in a new DataFrame. The original DataFrame is unchanged (DataFrames are immutable), so you can branch from any point:

val employees = ctx.csv("employee.csv")
val colorado = employees.filter(Eq(Column("state"), LiteralString("CO")))
val texas = employees.filter(Eq(Column("state"), LiteralString("TX")))
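
The branching behaviour can be checked with a small, self-contained sketch. The types here are simplified stand-ins rather than the KQuery classes, and predicates are plain strings instead of `LogicalExpr` values. The point is only that `filter` builds a new node around the existing plan and leaves the original DataFrame untouched.

```kotlin
// Simplified stand-ins for illustration only; predicates are plain strings here.
interface LogicalPlan
data class Scan(val path: String) : LogicalPlan
data class Selection(val input: LogicalPlan, val predicate: String) : LogicalPlan

class DataFrame(private val plan: LogicalPlan) {
    // Wrap the current plan in a new node; `plan` itself is never modified.
    fun filter(predicate: String): DataFrame = DataFrame(Selection(plan, predicate))
    fun logicalPlan(): LogicalPlan = plan
}

val employees = DataFrame(Scan("employee.csv"))
val colorado = employees.filter("state = 'CO'")
val texas = employees.filter("state = 'TX'")

// Both branches point at the very same Scan instance as their input.
val sharedInput: Boolean =
    (colorado.logicalPlan() as Selection).input === (texas.logicalPlan() as Selection).input
```

Here `sharedInput` is `true` and `employees.logicalPlan()` is still the bare `Scan`. Sharing the input node is safe because no plan node is ever mutated after construction.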

Joins combine two DataFrames based on a condition:

val employees = ctx.csv("employee.csv")
val departments = ctx.csv("department.csv")

val joined = employees.join(
    departments,
    JoinType.INNER,
    Eq(Column("dept_id"), Column("id"))
)

The Joins chapter covers join types and algorithms in detail.

Execution Context

We need a starting point for building DataFrames. The execution context creates initial DataFrames from data sources:

class ExecutionContext {

  fun csv(filename: String): DataFrame {
    return DataFrameImpl(Scan(filename, CsvDataSource(filename), listOf()))
  }

  fun parquet(filename: String): DataFrame {
    return DataFrameImpl(Scan(filename, ParquetDataSource(filename), listOf()))
  }
}

Later chapters expand this context to handle query execution. For now, it just creates DataFrames.

Convenience Methods

The basic API works but is still verbose. Two Kotlin features, top-level helper functions and infix functions, let us make it more expressive.

Helper functions create expressions concisely:

fun col(name: String) = Column(name)
fun lit(value: String) = LiteralString(value)
fun lit(value: Long) = LiteralLong(value)
fun lit(value: Double) = LiteralDouble(value)

Infix functions let us write binary expressions naturally:

infix fun LogicalExpr.eq(rhs: LogicalExpr): LogicalExpr = Eq(this, rhs)
infix fun LogicalExpr.neq(rhs: LogicalExpr): LogicalExpr = Neq(this, rhs)
infix fun LogicalExpr.gt(rhs: LogicalExpr): LogicalExpr = Gt(this, rhs)
infix fun LogicalExpr.gteq(rhs: LogicalExpr): LogicalExpr = GtEq(this, rhs)
infix fun LogicalExpr.lt(rhs: LogicalExpr): LogicalExpr = Lt(this, rhs)
infix fun LogicalExpr.lteq(rhs: LogicalExpr): LogicalExpr = LtEq(this, rhs)
infix fun LogicalExpr.mult(rhs: LogicalExpr): LogicalExpr = Multiply(this, rhs)
infix fun LogicalExpr.alias(name: String): LogicalExpr = Alias(this, name)

Now we can write queries that read almost like SQL:

val df = ctx.csv("employee.csv")
    .filter(col("state") eq lit("CO"))
    .project(listOf(
        col("id"),
        col("first_name"),
        col("last_name"),
        col("salary"),
        (col("salary") mult lit(0.1)) alias "bonus"
    ))
    .filter(col("bonus") gt lit(1000))

This reads: take the CSV, keep rows where state equals CO, compute id, names, salary, and a 10% bonus column, then keep only rows where bonus exceeds 1000.
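
One caveat with this style: Kotlin gives all named infix functions the same precedence and groups them left to right, so `mult` does not bind more tightly than `gt` the way `*` binds more tightly than `>` in SQL. In the `bonus` expression above, the parentheses happen to match the default grouping, but in general they are worth writing. The sketch below uses simplified stand-in expression classes (not the KQuery ones) whose `toString` output makes the grouping visible.

```kotlin
// Simplified stand-ins for illustration only; toString shows how terms are grouped.
interface LogicalExpr

data class Column(val name: String) : LogicalExpr {
    override fun toString() = "#$name"
}

data class LiteralDouble(val value: Double) : LogicalExpr {
    override fun toString() = value.toString()
}

data class Multiply(val l: LogicalExpr, val r: LogicalExpr) : LogicalExpr {
    override fun toString() = "($l * $r)"
}

data class Gt(val l: LogicalExpr, val r: LogicalExpr) : LogicalExpr {
    override fun toString() = "($l > $r)"
}

fun col(name: String) = Column(name)
fun lit(value: Double) = LiteralDouble(value)
infix fun LogicalExpr.mult(rhs: LogicalExpr): LogicalExpr = Multiply(this, rhs)
infix fun LogicalExpr.gt(rhs: LogicalExpr): LogicalExpr = Gt(this, rhs)

// Grouped left to right as (bonus > salary) * 0.1, which is probably not intended.
val unparenthesized = col("bonus") gt col("salary") mult lit(0.1)

// Parentheses make the intended grouping explicit: bonus > (salary * 0.1).
val parenthesized = col("bonus") gt (col("salary") mult lit(0.1))
```

Converting the two values to strings gives `((#bonus > #salary) * 0.1)` and `(#bonus > (#salary * 0.1))` respectively. Both compile, because every helper accepts and returns a `LogicalExpr`, so the compiler cannot flag the first form.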

DataFrames vs SQL

Both DataFrames and SQL describe what data you want. Why offer both?

SQL is familiar to analysts and can be embedded in applications as strings. But SQL strings are opaque to the compiler. Typos and type errors only appear at runtime.

DataFrames integrate with the programming language. The compiler catches method name typos, although column names passed as strings are still only checked at runtime. IDEs provide autocomplete. You can build queries dynamically using normal programming constructs:

var df = ctx.csv("employee.csv")

if (stateFilter != null) {
    df = df.filter(col("state") eq lit(stateFilter))
}

if (minSalary != null) {
    df = df.filter(col("salary") gteq lit(minSalary))
}
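
Because every transformation returns a new DataFrame, the same pattern can be written without a mutable variable by folding a list of optional predicates over the starting DataFrame. This is a minimal sketch under stated assumptions: the types are simplified stand-ins (predicates are plain strings, not `LogicalExpr` values), and `filterAll` and `buildQuery` are hypothetical helpers that are not part of KQuery.

```kotlin
// Simplified stand-ins for illustration only; predicates are plain strings here.
interface LogicalPlan
data class Scan(val path: String) : LogicalPlan
data class Selection(val input: LogicalPlan, val predicate: String) : LogicalPlan

class DataFrame(private val plan: LogicalPlan) {
    fun filter(predicate: String): DataFrame = DataFrame(Selection(plan, predicate))
    fun logicalPlan(): LogicalPlan = plan
}

// Hypothetical helper: apply each non-null predicate in order, skipping nulls.
fun DataFrame.filterAll(predicates: List<String?>): DataFrame =
    predicates.filterNotNull().fold(this) { df, p -> df.filter(p) }

// Hypothetical query builder; either parameter may be null.
fun buildQuery(stateFilter: String?, minSalary: Long?): DataFrame =
    DataFrame(Scan("employee.csv")).filterAll(
        listOf(
            stateFilter?.let { "state = '$it'" },
            minSalary?.let { "salary >= $it" }
        )
    )
```

With both arguments null, `buildQuery` returns a DataFrame whose plan is just the scan. With both supplied, the plan is a `Selection` on salary wrapping a `Selection` on state, in the order the predicates were listed.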

Many modern query engines, such as Apache Spark and Apache DataFusion, support both interfaces. Users choose based on their needs.

The Underlying Plan

However we build it, the result is a logical plan. Our example query:

val df = ctx.csv("employee.csv")
    .filter(col("state") eq lit("CO"))
    .project(listOf(col("id"), col("first_name"), col("last_name"), col("salary")))

It produces this plan:

Projection: #id, #first_name, #last_name, #salary
  Filter: #state = 'CO'
    Scan: employee.csv; projection=None

The DataFrame is just a convenient way to construct this tree. Once built, the plan goes through optimization and physical planning regardless of how it was created.
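
An indented rendering like the one above can be produced by a short recursive function that prints a node and then its children one level deeper. The sketch below is one way to do it, assuming each plan node exposes a `children()` method and a one-line `toString`. The classes are simplified stand-ins with expressions as plain strings, and `formatPlan` and `examplePlan` are illustrative names, so KQuery's own implementation may differ.

```kotlin
// Simplified stand-ins for illustration only; expressions are plain strings here.
interface LogicalPlan {
    fun children(): List<LogicalPlan>
}

class Scan(val path: String) : LogicalPlan {
    override fun children(): List<LogicalPlan> = emptyList()
    override fun toString() = "Scan: $path; projection=None"
}

class Selection(val input: LogicalPlan, val expr: String) : LogicalPlan {
    override fun children(): List<LogicalPlan> = listOf(input)
    override fun toString() = "Filter: $expr"
}

class Projection(val input: LogicalPlan, val exprs: List<String>) : LogicalPlan {
    override fun children(): List<LogicalPlan> = listOf(input)
    override fun toString() = "Projection: " + exprs.joinToString(", ")
}

// Render one node per line, indenting each level of the tree by two spaces.
fun formatPlan(plan: LogicalPlan, indent: Int = 0): String {
    val sb = StringBuilder()
    sb.append("  ".repeat(indent)).append(plan.toString()).append("\n")
    plan.children().forEach { sb.append(formatPlan(it, indent + 1)) }
    return sb.toString()
}

// The same tree as the example query, built directly from plan nodes.
fun examplePlan(): LogicalPlan =
    Projection(
        Selection(Scan("employee.csv"), "#state = 'CO'"),
        listOf("#id", "#first_name", "#last_name", "#salary")
    )
```

Calling `formatPlan(examplePlan())` yields the three lines shown above, with the scan indented deepest because it is the innermost input.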

This book is also available for purchase in ePub, MOBI, and PDF format from https://leanpub.com/how-query-engines-work

Copyright © 2020-2025 Andy Grove. All rights reserved.