
Rewrite the parquet input adapter manager#704

Open
arhamchopra wants to merge 11 commits into main from ac/parquet_input_adapter

Conversation

@arhamchopra
Collaborator

@arhamchopra arhamchopra commented Apr 23, 2026

Rewrite the parquet input adapter for RecordBatch-based streaming

Replaces the old ParquetReader / ParquetReaderColumnAdapter class hierarchy with a new three-layer architecture that operates on Arrow RecordBatch data directly. The new design is simpler (fewer virtual calls, no per-type reader subclasses) and supports reading from parquet files, Arrow IPC streams, and in-memory Arrow Tables through a unified RecordBatchStreamSource interface.

Motivation

The old implementation had:

  • A deep class hierarchy (FileReaderWrapper → ParquetFileReaderWrapper, per-type ReaderColumnAdapter subclasses) that was hard to extend
  • No support for the Arrow C Stream Interface (couldn't accept RecordBatchReader from external sources)
  • Row-by-row Parquet reads without RecordBatch-level column projection
  • ~2500 lines of boilerplate reader classes that duplicated Arrow's own type dispatch

The new implementation:

  • Reads entire RecordBatches and iterates rows within them (cache-friendly columnar access)
  • Uses Arrow's native column projection to read only requested columns from parquet
  • Exposes a RecordBatchStreamSource interface that cleanly separates file management from row processing
  • Reduces total C++ reader code by ~40%

Architecture

RecordBatchStreamSource          — file/stream boundary management
  └→ RecordBatchRowProcessor     — row-level cursor across N sources, alignment validation
       └→ ColumnDispatcher       — per-column type-erased read + dispatch to csp adapters
            └→ FieldReader       — Arrow array → typed value extraction (incl. nested structs)

RecordBatchStreamSource (new interface) abstracts file iteration with two implementations:

  • NativeParquetStreamSource — C++ opens parquet files directly with leaf-level column projection
  • PyRecordBatchStreamSource — Python yields RecordBatchReader objects via Arrow C Stream Interface (IPC, memory tables)

RecordBatchRowProcessor (new) binds to N RecordBatchReader* (one per split-column file), provides readRowAndAdvance() / skipRow() / dispatchRow(). Validates split-column alignment at runtime.

ColumnDispatcher (new) is a type-erased wrapper combining FieldReader + value storage + ValueDispatcher, one per subscribed column.

ParquetInputAdapterManager is rewritten to orchestrate via the above layers. It no longer touches Arrow arrays directly.

What's removed

  • ParquetReader / ParquetReaderColumnAdapter (~2500 lines) — the old per-type reader class hierarchy
  • FileReaderWrapper / ParquetFileReaderWrapper / ArrowIPCFileReaderWrapper — old file abstractions
  • DialectGenericListReaderInterface — unused reader interface
  • Dead m_rbSources member in DictBasketReaderRecord (declared/cleared but never populated)

Bug fixes

  • Nested struct column projection: Parquet stores struct sub-fields as separate leaf columns. The old projection logic used Arrow field indices directly, causing only the first sub-field to be read for struct columns. Added countLeafColumns() to correctly expand struct fields into all their leaf indices.
  • Null deref on schema change: if the time column disappears between files with allow_missing_columns=True, the adapter now throws a clear RuntimeError instead of segfaulting.
  • Stale basket processor sources: when basket columns are absent from a new stream, basket processor sources are now explicitly cleared, preventing use-after-free of dangling reader pointers.
  • Split-column row count alignment: runtime validation that all split-column files have matching row counts per batch.

Performance

Benchmarked on Linux (Python 3.13, Arrow 23.0.1, GCC 14.3). Two suites: bench_large measures full-file consumption (1M–5M ticks, all rows read), bench_comprehensive measures partial reads (86K ticks from larger files, realistic time-windowed workloads).

| Scenario | main | PR | Δ |
| --- | --- | --- | --- |
| Full-file dict basket 10sym | 10.4ms | 9.7ms | -7% |
| Full-file dict basket 50sym | 19.0ms | 17.5ms | -9% |
| Partial 2M×10 int | 126.2ms | 108.0ms | -14% |
| Partial 500K×10 string | 117.5ms | 106.2ms | -10% |
| Partial struct (3 fields) | 57.6ms | 41.7ms | -28% |
| Partial struct (10 fields) | 95.7ms | 56.0ms | -42% |
| Partial projection 50-of-50 | 105.8ms | 97.7ms | -8% |
| Partial IPC 2M×10 | 105.3ms | 89.3ms | -15% |

Full-file: 3 faster, 0 slower, 20 unchanged. Partial-file: 22 faster, 0 slower, 13 unchanged (±3% threshold).

Key wins come from RecordBatch-level column projection (skips unrequested columns entirely), the InlineReader zero-overhead hot loop (verified by assembly to match hand-written typed access), and struct bulk-read eliminating per-field virtual dispatch.

API compatibility

The public Python API (ParquetReader.subscribe, subscribe_all, subscribe_dict_basket) is unchanged. All 128 existing + new tests pass (covering all Arrow types, null handling, struct projection, split columns, dict baskets, multi-file, IPC, and in-memory tables).

@arhamchopra arhamchopra force-pushed the ac/parquet_input_adapter branch from a40d7a1 to bc4b134 Compare April 23, 2026 17:50
@timkpaine timkpaine added type: feature Issues and PRs related to new features adapter: parquet Issues and PRs related to our Apache Parquet/Arrow adapter labels Apr 23, 2026
@arhamchopra arhamchopra force-pushed the ac/parquet_input_adapter branch from bc4b134 to 5122477 Compare April 27, 2026 21:37
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
…iguity

The introduction of namespace csp::adapters::arrow (for the new
ColumnDispatcher/RecordBatchRowProcessor classes) creates ambiguity when
writer-side headers use unqualified arrow:: inside namespace
csp::adapters::parquet. The compiler finds the sibling csp::adapters::arrow
namespace before the global ::arrow namespace.

Also forward-declares ColumnDispatcher and RecordBatchRowProcessor in
ParquetInputAdapterManager.h (moving full includes to .cpp) and adds
direct includes for csp/core/Exception.h and arrow/table.h that were
previously provided transitively through the now-deleted reader headers.

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
@arhamchopra arhamchopra force-pushed the ac/parquet_input_adapter branch from 86d9f43 to 2ac4688 Compare April 28, 2026 15:21
@arhamchopra arhamchopra marked this pull request as ready for review April 30, 2026 01:16

virtual std::shared_ptr<arrow::DataType> getDataType() = 0;
virtual std::shared_ptr<arrow::ArrayBuilder> getBuilder() = 0;
virtual std::shared_ptr<::arrow::DataType> getDataType() = 0;

All the :: additions are kind of noisy here

ColumnAdapterReference m_valueCountColumn;
std::unique_ptr<ParquetReader> m_reader;
std::string m_basketName;
std::string m_basketSymbolColumn;

This member doesn't seem to be used at all, just written to in setupDictBaskets

return getOrCreateStructColumnAdapter( m_simInputAdapters, type, symbol, dictFieldMap, pushMode );
}
CSP_THROW( RuntimeException, "Reached unreachable code" );
properties.get<std::string>( "field_map" );

Why is this line here? Not added in your PR but seems like it was a mistake to begin with

for( auto && record : m_dictBasketReaders )
{
auto numValues = record.getValueCount();
const char * phase = dispatch ? "dispatch" : "skip";

This string doesn't need to be created per loop iteration, can move to function level scope

[]( const ::arrow::HalfFloatArray & arr, int64_t i ) -> double {
return ::arrow::util::Float16::FromBits( arr.Value( i ) ).ToDouble();
} );
case ::arrow::Type::STRING:

The lambdas for STRING, LARGE_STRING, BINARY, LARGE_BINARY, and FIXED_SIZE_BINARY are all exactly the same except for the argument type — can you define it once and templatize it? You might even be able to declare the arr arg as auto and collapse all of them into a single case.

}

// Pull first non-empty batch
for( ;; )

Isn't this just the same logic as fetchNextBatch directly below? Can probably just pass a default constructed entry to it and not need to duplicate the logic

void RecordBatchRowProcessor::rebindSource( SourceEntry & entry )
{
for( size_t i = 0; i < entry.dispatchers.size(); ++i )
entry.dispatchers[i] -> bindColumn( entry.currentBatch -> column( entry.colIndices[i] ).get() );

The compiler may do this anyway, but you can grab `const auto & cols = entry.currentBatch -> columns();` before the loop and then just index into it within the loop, avoiding some indirection

properties.tryGet( "time_shift", m_time_shift );

CSP_TRUE_OR_THROW_RUNTIME( m_timeColumn != "", "Time column can't be empty" );
CSP_TRUE_OR_THROW_RUNTIME( m_defaultTimezone == "UTC",

Why even accept tz as an argument then?

Comment thread csp/adapters/parquet.py
raise TypeError("CSP Cannot load binary arrows derived from pyarrow versions less than 4.0.1")
wrapped = self._filenames_gen
self._filenames_gen = lambda starttime, endtime: self._arrow_c_data_interface(wrapped, starttime, endtime)
self._table_gen = self._filenames_gen # Alias: memory-table path uses _table_gen

What's this about? I don't see why we need the two of these

void doReadNextValue( int64_t row, void * optionalOut ) override
{
auto & out = *static_cast<std::optional<ValueT> *>( optionalOut );
auto & typed = static_cast<const ArrowArrayT &>( *this -> m_column );

Can replace 196-200 with `if( !doExtract( row, out ) ) out.reset();`
