Data Sources
The source code discussed in this chapter can be found in the datasource module of the KQuery project.
A query engine needs data to query. The data might come from CSV files, Parquet files, databases, or even data that already exists in memory. We want our query engine to work with any of these without caring about the details. This chapter introduces the data source abstraction that makes this possible.
Why Abstract Data Sources?
Consider a simple query:
SELECT name, salary FROM employees WHERE department = 'Engineering'
From the query engine’s perspective, it needs to:
- Know what columns exist (to validate that name, salary, and department are real)
- Know the types of those columns (to validate that comparing department to a string makes sense)
- Read the actual data, preferably just the columns it needs
Whether employees is a CSV file, a Parquet file, or an in-memory table, these requirements are the same. By defining a common interface, we can write query planning and execution logic once and have it work with any data source.
The DataSource Interface
KQuery defines a simple interface that all data sources must implement:
interface DataSource {
    /** Return the schema for the underlying data source */
    fun schema(): Schema

    /** Scan the data source, selecting the specified columns */
    fun scan(projection: List<String>): Sequence<RecordBatch>
}
schema() returns the schema of the data: the column names and their types. The query planner uses this during planning to validate queries. If a query references a column that does not exist, the planner can report an error before execution begins.
scan(projection) reads the data and returns it as a sequence of record batches. The projection parameter lists which columns to read. This is important for efficiency: if a query only uses three columns from a table with fifty columns, we should only read those three. Some file formats like Parquet support this natively. For others like CSV, we might read everything but only build vectors for the requested columns.
The return type Sequence<RecordBatch> enables streaming. Rather than loading an entire file into memory, we can process it batch by batch. This matters when data is larger than available memory.
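To make the streaming behaviour concrete, here is a minimal sketch using simplified stand-in types. SimpleSchema, SimpleBatch, SimpleDataSource, and CountingSource are hypothetical names invented for this illustration and are not part of KQuery; the point is only that a Sequence produces batches on demand rather than up front.

```kotlin
// Simplified stand-ins for KQuery's Schema and RecordBatch (hypothetical, for illustration only).
data class SimpleField(val name: String, val type: String)
data class SimpleSchema(val fields: List<SimpleField>)
data class SimpleBatch(val columns: Map<String, List<Any?>>) {
    val rowCount: Int get() = columns.values.firstOrNull()?.size ?: 0
}

interface SimpleDataSource {
    fun schema(): SimpleSchema
    fun scan(projection: List<String>): Sequence<SimpleBatch>
}

// A toy source that generates rows lazily; no batch is built until the consumer asks for it.
class CountingSource(private val totalRows: Int, private val batchSize: Int) : SimpleDataSource {
    var batchesProduced = 0
        private set

    override fun schema(): SimpleSchema =
        SimpleSchema(listOf(SimpleField("id", "Int"), SimpleField("squared", "Int")))

    override fun scan(projection: List<String>): Sequence<SimpleBatch> = sequence {
        // An empty projection is treated as "all columns" in this sketch.
        val wanted = if (projection.isEmpty()) schema().fields.map { it.name } else projection
        var start = 0
        while (start < totalRows) {
            val ids = (start until minOf(start + batchSize, totalRows)).toList()
            val all = mapOf<String, List<Any?>>("id" to ids, "squared" to ids.map { it * it })
            batchesProduced++
            yield(SimpleBatch(wanted.associateWith { all.getValue(it) }))
            start += batchSize
        }
    }
}
```

Calling scan does no work by itself. Taking only the first batch from the returned sequence builds exactly one batch, which is the property that lets an engine process inputs larger than memory.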
CSV Data Source
CSV is the simplest format to understand but has some awkward properties. The file is just text with values separated by commas (or tabs, or semicolons). There is no schema embedded in the file, only optional column names in the first row.
Here is what a CSV file might look like:
id,name,department,salary
1,Alice,Engineering,95000
2,Bob,Sales,87000
3,Carol,Engineering,102000
To read this, we need to handle several things:
- Parse column names from the header row (if present)
- Infer or accept a schema (what types are these columns?)
- Parse text values into typed values
- Handle missing or malformed values
KQuery’s CsvDataSource accepts an optional schema. If provided, it uses that schema. If not, it infers one by reading the header row and treating all columns as strings:
class CsvDataSource(
val filename: String,
val schema: Schema?,
private val hasHeaders: Boolean,
private val batchSize: Int
) : DataSource {
private val finalSchema: Schema by lazy { schema ?: inferSchema() }
override fun schema(): Schema {
return finalSchema
}
// ...
}
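The inferSchema() call is not shown above. As a rough sketch of what header-based inference could look like, the following uses hypothetical stand-in types (CsvField, CsvSchema) and a naive comma split that does not handle quoted fields. The generated names for header-less files are an assumption made for this example.

```kotlin
import java.io.BufferedReader
import java.io.StringReader

// Hypothetical stand-ins for KQuery's schema types.
data class CsvField(val name: String, val type: String)
data class CsvSchema(val fields: List<CsvField>)

// Infer a schema from the first line, treating every column as a string.
// Without headers, columns get generated names (field_1, field_2, ...); this naming is an assumption.
fun inferCsvSchema(reader: BufferedReader, hasHeaders: Boolean): CsvSchema {
    val firstLine = reader.readLine() ?: throw IllegalArgumentException("CSV input is empty")
    val cells = firstLine.split(",").map { it.trim() }
    val names = if (hasHeaders) cells else cells.indices.map { "field_${it + 1}" }
    return CsvSchema(names.map { CsvField(it, "String") })
}

// Example: infer the schema of the sample file shown earlier.
fun sampleEmployeeSchema(): CsvSchema =
    inferCsvSchema(BufferedReader(StringReader("id,name,department,salary\n1,Alice,Engineering,95000")), true)
```

Treating everything as a string is the safest default: it never fails, and users who need typed columns can pass an explicit schema instead.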
The scan method streams through the file, parsing rows into batches. For each batch, it creates Arrow vectors and populates them with parsed values:
override fun scan(projection: List<String>): Sequence<RecordBatch> {
val readSchema =
if (projection.isNotEmpty()) {
finalSchema.select(projection)
} else {
finalSchema
}
val parser = buildParser(settings)
parser.beginParsing(file.inputStream().reader())
return ReaderAsSequence(readSchema, parser, batchSize)
}
The ReaderAsSequence class implements Kotlin’s Sequence interface, reading rows on demand and grouping them into batches. Each batch converts string values to the appropriate Arrow vector type based on the schema.
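The batching idea can be sketched with Kotlin's standard sequence operators. This is not the ReaderAsSequence implementation; readBatches is a hypothetical helper that represents each batch as a list of rows and uses a naive split that ignores quoting.

```kotlin
// Group CSV lines into batches lazily. Each batch is a list of rows, and each row a list of cells.
// A simplified illustration: split(",") does not handle quoted fields or embedded commas.
fun readBatches(
    lines: Sequence<String>,
    hasHeaders: Boolean,
    batchSize: Int
): Sequence<List<List<String>>> =
    lines
        .drop(if (hasHeaders) 1 else 0)   // skip the header row if there is one
        .filter { it.isNotBlank() }       // ignore blank lines
        .map { line -> line.split(",") }  // parse each row into cells
        .chunked(batchSize)               // group rows into batches of at most batchSize
```

Every operator in the chain is lazy, so a consumer that stops after the first batch reads only the first batchSize rows of the input.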
Type Conversion
CSV files are text, but our schema might specify that a column is an integer or a float. The CSV reader must parse these strings into typed values:
when (vector) {
is IntVector ->
rows.withIndex().forEach { row ->
val valueStr = row.value.getValue(field.value.name, "").trim()
if (valueStr.isEmpty()) {
vector.setNull(row.index)
} else {
vector.set(row.index, valueStr.toInt())
}
}
is Float8Vector ->
rows.withIndex().forEach { row ->
val valueStr = row.value.getValue(field.value.name, "").trim()
if (valueStr.isEmpty()) {
vector.setNull(row.index)
} else {
vector.set(row.index, valueStr.toDouble())
}
}
// ... other types
}
Empty values become nulls. Invalid values (like “abc” in an integer column) cause toInt() or toDouble() to throw a NumberFormatException, which is the right behaviour since the data does not match the declared schema.
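The same rules can be shown without Arrow vectors. In this sketch, parseIntColumn and parseIntColumnLenient are hypothetical helpers that return a list of nullable integers in place of an IntVector. The lenient variant is an alternative policy, not something the text above describes KQuery doing.

```kotlin
// Convert the raw CSV cells of one column into nullable integers.
// Empty or whitespace-only cells become null; anything else must parse or the call throws.
fun parseIntColumn(cells: List<String>): List<Int?> =
    cells.map { raw ->
        val value = raw.trim()
        if (value.isEmpty()) null else value.toInt()
    }

// A lenient alternative (an assumption, not the behaviour described above):
// values that fail to parse become null instead of raising an error.
fun parseIntColumnLenient(cells: List<String>): List<Int?> =
    cells.map { it.trim().toIntOrNull() }
```

The strict version surfaces bad data immediately. The lenient version keeps the query running but silently turns malformed values into nulls, which can hide data quality problems.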
Parquet Data Source
Parquet is a binary columnar format designed for analytics. Unlike CSV, Parquet files contain:
- Schema information (column names, types, nullability)
- Data organized in column chunks within row groups
- Compression (snappy, gzip, etc.)
- Optional statistics (min/max values per column chunk)
This makes Parquet much more efficient for query engines. Reading a single column means reading just that column’s data, not the entire file. The schema is known without inference. And compression reduces both storage and I/O.
KQuery’s ParquetDataSource is simpler than the CSV version because the Parquet library provides much of what we need:
class ParquetDataSource(private val filename: String) : DataSource {
override fun schema(): Schema {
return ParquetScan(filename, listOf()).use {
val arrowSchema = SchemaConverter().fromParquet(it.schema).arrowSchema
SchemaConverter.fromArrow(arrowSchema)
}
}
override fun scan(projection: List<String>): Sequence<RecordBatch> {
return ParquetScan(filename, projection)
}
}
The schema comes directly from the file’s metadata. The scan reads row groups one at a time, converting Parquet’s columnar format to Arrow vectors.
Projection Pushdown
Parquet’s columnar organization means projection pushdown is efficient. When we request only certain columns, the Parquet reader reads and decompresses only those column chunks. For wide tables (hundreds of columns) where queries touch only a few, this can be orders of magnitude faster than reading everything.
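A toy model illustrates why this matters. ToyColumnarTable below is a hypothetical class, not a Parquet reader: it stores each column separately and counts how many columns a scan touches, with the counter standing in for I/O and decompression cost.

```kotlin
// A toy columnar table: each column is stored separately, loosely mimicking column chunks.
// The columnsRead counter stands in for the I/O and decompression work a real reader would do.
class ToyColumnarTable(private val columns: Map<String, List<Any?>>) {
    var columnsRead = 0
        private set

    // Read only the requested columns; an empty projection means all columns (an assumption here).
    fun scan(projection: List<String>): Map<String, List<Any?>> {
        val names = if (projection.isEmpty()) columns.keys.toList() else projection
        return names.associateWith { name ->
            val data = columns[name] ?: throw IllegalArgumentException("Unknown column: $name")
            columnsRead++
            data
        }
    }
}

// Build a 50-column table where column cN holds the values N and 2N.
fun wideTable(): ToyColumnarTable =
    ToyColumnarTable((1..50).associate { "c$it" to listOf<Any?>(it, it * 2) })
```

Scanning three columns of the fifty-column table touches three columns rather than fifty. A row-oriented format such as CSV cannot skip the other forty-seven, because their bytes are interleaved with the ones we want.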
In-Memory Data Source
Sometimes data is already in memory, perhaps loaded from another source or generated by a previous query. The InMemoryDataSource wraps existing record batches:
class InMemoryDataSource(
val schema: Schema,
val data: List<RecordBatch>
) : DataSource {
override fun schema(): Schema {
return schema
}
override fun scan(projection: List<String>): Sequence<RecordBatch> {
val projectionIndices =
projection.map { name -> schema.fields.indexOfFirst { it.name == name } }
return data.asSequence().map { batch ->
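// Use the projected schema so it matches the columns actually present in the batch
RecordBatch(schema.select(projection), projectionIndices.map { i -> batch.field(i) })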
}
}
}
This is useful for testing and for queries that build on other queries. Projection here just selects which columns to include in the output batches.
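One detail worth guarding against is that an index lookup returns -1 for a name that is not in the schema. The following sketch shows one way to fail fast with a clear message. resolveProjection is a hypothetical helper that works on plain column names rather than KQuery's Schema type.

```kotlin
// Resolve projected column names to indices, failing fast on unknown names.
// indexOf (like indexOfFirst) returns -1 when nothing matches, so the result must be checked.
fun resolveProjection(columnNames: List<String>, projection: List<String>): List<Int> =
    projection.map { name ->
        val index = columnNames.indexOf(name)
        require(index >= 0) { "Unknown column '$name'; available columns: $columnNames" }
        index
    }
```

In a full engine the planner would normally catch unknown columns first, so a check like this acts as a safety net for callers that use a data source directly.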
Other Data Sources
Real query engines support many more data sources. Some common ones:
JSON: Structured but schema-less. Each line might be a JSON object. Schema inference is possible but complex since nested structures must be flattened or represented as Arrow’s nested types.
ORC: Similar to Parquet, another columnar format popular in the Hadoop ecosystem. Data is stored in columnar “stripes” with schema and statistics.
Databases: Query engines can read from other databases via JDBC or native protocols. The schema comes from the database’s catalog. Pushing predicates down to the source database can dramatically reduce data transfer.
Object Storage: Cloud object stores such as Amazon S3 or Google Cloud Storage (GCS) can serve as data sources. The query engine lists files matching a pattern and reads them, often in parallel.
Streaming Sources: Kafka, Kinesis, or other message queues. These require different handling since data arrives continuously rather than being read from a file.
Schema-less Sources
Some data sources do not have fixed schemas. JSON documents can have different fields in each record. Schema-on-read systems defer schema decisions until query time.
Handling schema-less sources adds complexity. Options include:
- Require users to declare a schema explicitly
- Infer a schema from a sample of the data
- Use a flexible representation like a map of string to value
- Reject queries that reference non-existent fields at runtime rather than planning time
KQuery requires schemas at planning time, so schema-less sources must provide or infer a schema somehow.
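As an illustration of the "infer a schema from a sample" option, here is a minimal sketch. It models records as maps and type names as plain strings; inferSchemaFromSample and its widening rules are assumptions made for this example, not KQuery functionality.

```kotlin
// Infer a schema from the first sampleSize records of a schema-less source.
// Records are modelled as maps; type names are plain strings for illustration.
// Fields that are null in every sampled record do not appear in the result.
fun inferSchemaFromSample(
    records: Sequence<Map<String, Any?>>,
    sampleSize: Int
): Map<String, String> {
    val inferred = linkedMapOf<String, String>()
    for (record in records.take(sampleSize)) {
        for ((key, value) in record) {
            if (value == null) continue // nulls carry no type information
            val seen = when (value) {
                is Int, is Long -> "Long"
                is Float, is Double -> "Double"
                is Boolean -> "Boolean"
                else -> "String"
            }
            val previous = inferred[key]
            inferred[key] = when {
                previous == null || previous == seen -> seen
                setOf(previous, seen) == setOf("Long", "Double") -> "Double" // widen numerics
                else -> "String" // fall back to string on any other conflict
            }
        }
    }
    return inferred
}
```

Sampling is a trade-off: a small sample is cheap but can miss fields or types that only appear later in the data, which then surface as errors at execution time.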
Connecting Data Sources to the Query Engine
Data sources plug into the query engine through the execution context. In KQuery, you register data sources with names:
val ctx = ExecutionContext()
ctx.registerCsv("employees", "data/employees.csv")
ctx.registerParquet("sales", "data/sales.parquet")
val result = ctx.sql("SELECT * FROM employees e JOIN sales s ON e.id = s.employee_id")
The query planner resolves table names to data sources, retrieves schemas for validation, and creates scan operations in the query plan. The details of CSV versus Parquet versus anything else are hidden behind the DataSource interface.
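The name resolution step can be sketched as a simple registry. TableSource and TableRegistry are hypothetical names for this example and do not reflect how ExecutionContext is implemented; the sketch only shows how a planner might resolve names and validate columns before execution.

```kotlin
// A minimal table registry, loosely analogous to registering sources on an execution context.
// TableSource and TableRegistry are hypothetical names, not part of KQuery.
interface TableSource {
    fun columnNames(): List<String>
}

class TableRegistry {
    private val tables = mutableMapOf<String, TableSource>()

    fun register(name: String, source: TableSource) {
        tables[name] = source
    }

    // Resolve a table name the way a planner would, reporting unknown tables before execution.
    fun resolve(name: String): TableSource =
        tables[name] ?: throw IllegalArgumentException("No table registered with name '$name'")

    // Check that every referenced column exists in the named table.
    fun validateColumns(table: String, columns: List<String>) {
        val known = resolve(table).columnNames().toSet()
        val missing = columns.filterNot { it in known }
        require(missing.isEmpty()) { "Unknown columns in '$table': $missing" }
    }
}
```

Because the registry depends only on the interface, it does not need to know whether a table is backed by CSV, Parquet, or memory.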
This separation is powerful. Adding support for a new file format means implementing two methods. The rest of the query engine, from planning through optimization to execution, works without modification.
Copyright © 2020-2025 Andy Grove. All rights reserved.