
MemCP is an in-memory columnar database written in Go with a Scheme scripting layer for query compilation, planning and optimization. Its storage engine organizes data into shards, each containing a compressed main storage and a small append-only delta buffer for recent writes. Until now, the two primary scan paths — unordered aggregation (scan) and ordered retrieval (scan_order) — each contained their own inline loops for reading columns, dispatching between main and delta storage, calling the map function, and folding the reduce. This worked, but the duplication made it harder to optimize either path and — more critically — impossible to swap in a network-transparent implementation for distributed query execution.
This post describes the refactoring we just landed: ShardMapReducer, a universal batch map+reduce interface that unifies both scan paths, eliminates per-row heap allocations, and lays the groundwork for clustering.
The Problem: Two Scan Paths, Duplicated Logic
Before this change, scan.go and scan_order.go each independently:
- Opened column readers for the requested columns
- Iterated over matching record IDs
- For each ID, branched on whether it lived in main storage or the delta buffer
- Called
ColumnStorage.GetValue(id)orgetDelta(id - mainCount, col) - Invoked the map callback
- Folded the result into an accumulator via reduce
The two paths differed only in how record IDs were produced: scan iterates an index and filters inline; scan_order sorts filtered IDs per shard, then merges them globally via a priority queue. But the map+reduce hot loop was copy-pasted between them, and any optimization (JIT compilation, batch column reads, SIMD) would have to be applied twice.
Worse, both paths interleaved filter evaluation with map+reduce execution in a single loop. For each row, the code would read the filter columns, evaluate the condition, then — if the row passed — immediately read the map columns and call map+reduce. This meant the CPU cache had to juggle both filter-column data and map-column data simultaneously, evicting one to load the other on every row.
The Solution: ShardMapReducer
We introduced a single struct that encapsulates the map+reduce pipeline:
type ShardMapReducer struct {
shard *storageShard
mainCols []ColumnStorage // direct main storage references
colNames []string // for delta access
isUpdate []bool // $update columns get UpdateFunction
args []scm.Scmer // pre-allocated argument buffer
mapFn func(...scm.Scmer) scm.Scmer
reduceFn func(...scm.Scmer) scm.Scmer
mainCount uint
}
The key method is Stream(acc, recids) -> acc: given an accumulator and a batch of record IDs, it returns the new accumulator after applying map+reduce to each record. Column readers are opened once at construction (OpenMapReducer), and the args buffer is pre-allocated — no allocations in the hot path.
Run Detection: Main vs Delta
The Stream method doesn’t just iterate blindly. It partitions the incoming ID batch into contiguous runs of the same storage type:
recids: [3, 7, 12, 15, 1001, 1003, 18, 22, 1005]
\_____________/ \________/ \_____/ \__/
MainBlock(4) DeltaBlock(2) Main(2) Delta(1)
Each run is dispatched to either processMainBlock or processDeltaBlock. The main block is a tight loop with direct ColumnStorage.GetValue calls — no branching on storage type per row. In the common case (99%+ of data in compressed main storage, only a handful of recent delta rows), this means a single processMainBlock call covers almost the entire batch.
These two methods are the future JIT compilation targets — more on that below.
Stack Buffer in scan.go
The unordered scan path now collects filtered record IDs into a stack-allocated buffer before flushing them to the MapReducer:
var buf [1024]uint
bufN := 0
// ... inside index iteration:
buf[bufN] = idx
bufN++
if bufN == 1024 {
acc = mapper.Stream(acc, buf[:bufN])
bufN = 0
}
1024 items x 8 bytes = 8 KB — fits comfortably in L1 cache, requires zero heap allocation. The old code called map+reduce inline for every passing row; the new code batches them, letting the MapReducer process a dense block of IDs with optimal memory access patterns.
Batched Merge in scan_order
The ordered scan path merges sorted per-shard results via a global priority queue. The merge pops items one at a time from the heap, accumulating consecutive same-shard items into a buffer. When the next heap pop yields a different shard, the buffer is flushed to the previous shard’s MapReducer:
var buf [1024]uint
bufN := 0
var bufShard *shardqueue
for len(q.q) > 0 {
qx := q.q[0] // peek at heap root
item := qx.items[0] // take its front item
qx.items = qx.items[1:] // consume it
if bufShard != nil && bufShard != qx {
// shard changed — flush buffer to previous shard's mapper
acc = bufShard.mapper.Stream(acc, buf[:bufN])
bufN = 0
}
bufShard = qx
buf[bufN] = item
bufN++
if bufN == len(buf) { // buffer full — flush
acc = bufShard.mapper.Stream(acc, buf[:bufN])
bufN = 0
}
// re-heapify or remove exhausted shard
if len(qx.items) > 0 { heap.Fix(&q, 0) } else { heap.Pop(&q) }
}
// flush remaining
if bufN > 0 { acc = bufShard.mapper.Stream(acc, buf[:bufN]) }
This uses the same 8 KB stack buffer as the unordered path. When a single shard dominates a region of the sort order, the buffer fills with consecutive same-shard items and flushes in one Stream() call. For cluster mode, each Stream() call becomes a network RPC — batching N consecutive same-shard items into one call reduces round-trips from N to ~N/batchsize.
Benchmark Results
We benchmarked the new code against the previous version on a table with 2 million rows, 4 columns (INT, INT, INT, TEXT). Five runs per query, median reported:
| Query | Baseline | ShardMapReducer | Speedup |
|---|---|---|---|
SELECT COUNT(*) | 1273 ms | 946 ms | 1.35x (-26%) |
SELECT SUM(value) | 1394 ms | 1078 ms | 1.29x (-23%) |
ORDER BY value DESC LIMIT 100 | 1607 ms | 1096 ms | 1.47x (-32%) |
WHERE value > 500000 (filter+count) | 1614 ms | 1034 ms | 1.56x (-36%) |
GROUP BY category % 100 | — | — | — |
The variance also dropped significantly (e.g., SCAN+FILTER: +/-20 ms vs +/-278 ms on the baseline), confirming more predictable memory access patterns.
Where Does the Speedup Come From?
A 23-36% improvement from what is essentially a refactoring — no algorithmic changes, same data structures, same query plans — deserves a closer look. There is no single silver bullet; the speedup is the cumulative effect of removing many small inefficiencies that, at 2 million iterations, add up to hundreds of milliseconds.
Eliminated Main/Delta Branching Per Row
The old code checked for every column of every row whether the record lived in main storage or the delta buffer:
// Old: inside the per-row loop, for EACH column
if idx < t.main_count {
mdataset[i] = mcols[i].GetValue(idx)
} else {
mdataset[i] = t.getDelta(int(idx-t.main_count), col)
}
With 2M rows and 4 columns, that is 8 million conditional branches. The branch predictor learns “almost always main”, but each misprediction costs roughly 15 CPU cycles on modern hardware.
The new code partitions the batch once via run detection. processMainBlock contains no branch — just a tight loop calling col.GetValue(id). With 99.9% of data in main storage, the entire 2M-row scan typically executes as a single processMainBlock call.
Phase Separation: Filter vs Map+Reduce
This is likely the largest contributor, especially for filtered scans.
The old code ran everything in one interleaved loop:
for each row:
read condition columns -> load cache lines for filter columns
evaluate filter
if passes:
read callback columns -> load cache lines for map columns (evicts filter data!)
call map + reduce
The new code separates these into distinct phases:
Phase 1 (filter): for each row:
read condition columns only -> L1 stays hot for filter columns
if passes: buf[n++] = idx
Phase 2 (map+reduce): mapper.Stream(buf[:n])
read callback columns only -> L1 now fully dedicated to map columns
call map + reduce
In Phase 1, only the filter columns compete for cache space. In Phase 2, only the map columns are accessed. The old code forced both column sets to share the same cache lines, causing thrashing on every row that passed the filter.
Why SCAN+FILTER Benefits Most (1.56x)
WHERE value > 500000 filters roughly half the rows. The old code still interleaved filter and map column reads for every passing row — the cache pollution hit it on every match. The new code collects all passing IDs first (Phase 1 touches only the filter column), then processes them in a clean sequential sweep through the map columns (Phase 2). The more selective the filter, the denser and more cache-friendly the resulting ID batches become.
Why COUNT Benefits Least (1.35x)
COUNT has no real map phase — map is essentially identity, reduce is a counter increment. There are no “map columns” to read, so the phase-separation advantage barely applies. The 35% speedup for COUNT comes almost entirely from the eliminated per-row branching and the pre-allocated args buffer. It represents the “floor” of what the structural improvements buy you even without cache-locality gains.
Pre-Allocated Args Buffer
ShardMapReducer.args is allocated once at construction and reused for every row. The old code reused a mdataset slice too, but allocated it fresh per scan call. More importantly, the MapReducer is a longer-lived object that the Go runtime can place more efficiently, reducing GC tracing overhead across millions of iterations.
Stack Buffer Eliminates GC Pressure
var buf [1024]uint lives on the Go stack — no heap allocation, no GC tracking. The old code had no equivalent buffer and invoked map+reduce inline per row. With 2M rows, the cumulative GC scheduling overhead was measurable — the reduced variance (+/-20 ms vs +/-278 ms for SCAN+FILTER) is a direct indicator of less GC interference.
The Compound Effect
None of these improvements alone would explain 36%. But they multiply:
- 8M fewer branches saves ~10-20 ms (misprediction cost)
- Phase separation saves ~100-200 ms (cache locality)
- Stack buffer saves ~50-100 ms (GC pressure)
- Tighter inner loops save ~50-100 ms (instruction cache, branch predictor warmup)
At 2 million rows, “a few nanoseconds per row” becomes hundreds of milliseconds. The old code suffered death by a thousand cuts — each overhead was invisible in isolation, but collectively they consumed 23-36% of the scan time.
Cluster Readiness: Scmer Serialization
The OpenMapReducer constructor accepts the original scm.Scmer values for mapFn and reduceFn (instead of pre-optimized Go functions), storing them alongside the optimized callables. This separation enables future network serialization: the coordinator ships the Scmer ASTs to remote nodes, which call OptimizeProcToSerialFunction (and eventually JIT) on the receiver side. Combined with the batched merge in scan_order, where consecutive same-shard items are flushed as a single Stream() call, this means each RPC covers an entire batch rather than a single row — critical for keeping network overhead manageable during distributed ORDER BY queries.
What This Enables: Network-Transparent Distributed Scans
The real motivation behind ShardMapReducer is not just the local speedup — it is the abstraction boundary it creates for clustering.
MemCP’s cluster design uses a MOESI-inspired cache coherence protocol with CRUSH-based directory assignment. Each shard has a deterministic “home node” for coordination, but data can be cached on any node. When a query needs to scan a shard that lives on a remote node, we need to ship the computation there rather than pulling the data.
With ShardMapReducer, the interface is already in place:
Local: acc = mapper.Stream(acc, recids) // tight loop on column storage
Remote: acc = rpc.Stream(acc, recids) // send (acc, recids), receive new acc
The coordinator sends the serialized filter/map/reduce lambdas along with the batch of record IDs. The remote node executes processMainBlock/processDeltaBlock locally and returns only the accumulator — typically a single number for aggregates, or a small assoc-list for GROUP BY. The data never crosses the wire; only the computation result does.
For the unordered scan path, this maps directly onto the existing two-phase architecture:
- Phase 1 (per-shard, now per-node): filter + map + reduce -> one accumulator per node
- Phase 2 (coordinator): reduce2 over all node results -> final answer
For ordered scans, the remote node can perform the filter+sort phase locally and return sorted record metadata. The coordinator then drives the global merge via the priority queue, calling Stream on the appropriate node for each batch of consecutive same-shard items.
The batch size naturally adapts: local scans flush at 1024 items, ordered scans accumulate consecutive same-shard items during the priority-queue merge before streaming them through the MapReducer, and remote scans can bundle all of a node’s matching IDs into a single RPC. This “graceful degradation” from batch=all down to batch=1 falls out of the interface without special cases.
The JIT Opportunity: processMainBlock Is Almost Trivially Compilable
The 36% speedup we measured is with Go’s standard compiled code — interpreted Scheme lambdas for map and reduce, generic ColumnStorage.GetValue calls with interface dispatch. There is a lot of performance still left on the table.
Consider what processMainBlock actually does:
func (m *ShardMapReducer) processMainBlock(acc Scmer, recids []uint) Scmer {
for _, id := range recids {
for i, col := range m.mainCols {
m.args[i] = col.GetValue(id)
}
acc = m.reduceFn(acc, m.mapFn(m.args...))
}
return acc
}
This is a tight loop with exactly three responsibilities: read columns, call map, call reduce. Before ShardMapReducer, these responsibilities were buried inside a 200-line function that also handled filter evaluation, main/delta branching, lock management, delta column lookups, transaction visibility checks, and index iteration. Extracting processMainBlock into its own method was a prerequisite — you cannot JIT-compile a function that does everything.
Now that the method is isolated, JIT compilation becomes almost mechanical. At query planning time, we know:
- Which columns are read (their concrete storage types: StorageInt, StorageEnum, StorageFloat, …)
- What map does (the Scheme AST — e.g.,
(list col1 col2)for a simple projection, or(* col1 col2)for a computed expression) - What reduce does (e.g.,
(+ acc mapped)for SUM,(set_assoc acc key val)for GROUP BY collection)
A specialized processMainBlock for SELECT SUM(value) FROM t could compile down to:
// Generated at query time — no interface dispatch, no Scmer boxing
func processMainBlock_SUM_StorageInt(col *StorageInt, recids []uint) int64 {
acc := int64(0)
for _, id := range recids {
acc += col.values[id] // direct array access, no GetValue overhead
}
return acc
}
No interface{} boxing, no virtual method calls, no Scheme interpreter overhead — just a loop over a contiguous integer array. This is the kind of code that Go (or a codegen backend) can auto-vectorize with SIMD instructions. On AVX2 hardware, eight int64 additions per cycle are realistic; on AVX-512, sixteen.
The old scan architecture made this impossible because the JIT target would have been a monolithic function mixing filter logic, storage dispatch, lock handling, and reduce folding. ShardMapReducer cleanly separates concerns: the filter phase is the caller’s responsibility, storage dispatch happens via run detection in Stream, and processMainBlock is purely “read columns, compute, fold” — exactly the pattern that JIT backends (and LLVM, and even Go’s own compiler with sufficient inlining) can optimize aggressively.
The same applies to processDeltaBlock, though it matters less in practice — delta storage typically holds fewer than 0.1% of rows and is only accessed during the brief window between writes and the next shard rebuild.
Next Steps
The local foundation is in place. ShardMapReducer is the single point where all three future optimization axes converge:
Clustering (replace what is below the interface):
- Binary Scmer wire format — replace JSON serialization with a compact tag+payload encoding for efficient RPC
- Scan RPC protocol — ship filter/map/reduce lambdas to remote nodes, receive accumulator results
- Batched aggregate push-down — fuse multiple GROUP BY aggregates into a single remote scan instead of N separate passes
- Remote ShardMapReducer — implement
Stream()as an RPC call, transparent to the caller
JIT compilation (replace what is inside the interface):
- Type-specialized processMainBlock — generate Go code (or machine code) for concrete column types and known map/reduce functions, eliminating interface dispatch and Scmer boxing
- SIMD vectorization — for simple aggregates (SUM, COUNT, MIN, MAX) over integer/float columns, process 4-8 values per CPU cycle
Query optimization (improve what is above the interface):
- Predicate push-down into processMainBlock — fuse the filter phase into the map loop for cases where filter and map columns overlap, avoiding the two-pass overhead
- Columnar batch reads — instead of
GetValue(id)per row, read contiguous column segments into a temporary buffer for even better cache line utilization
The beauty of the current design is that none of these optimizations require changes to the scan logic in scan.go or scan_order.go. The callers see Stream(acc, recids) -> acc — whether that runs interpreted Scheme on local storage, JIT-compiled native code, or a network RPC to a remote node is entirely hidden behind the interface. That was the whole point.
Comments are closed