Technical deep dive
This document briefly describes the abstractions and mechanisms used in the d8a tracker. It's suitable as a development resource and can also be consumed by an LLM to get a better understanding of the landscape.
Package layout
Most important packages and abstractions of the tracker project:
- `pkg/receiver` - receives hits from the HTTP endpoint and places them into the message queue as tasks
- `pkg/protosessions` - reads tasks from the queue and groups hits into proto-sessions, then closes them into sessions
- `pkg/sessions` - handles session closing, column processing, spooling, and writing to the warehouse
- `pkg/columns` and `pkg/schema` - column definitions (interfaces) and column implementations (how values are computed and written)
- `pkg/warehouse` - provides abstractions for writing data to various data warehouses
- `pkg/splitter` - splits and filters sessions based on configurable conditions (UTM change, user ID change, max events, time limit)
- `pkg/spools` - crash-safe keyed framed-file append+flush primitive used for persistent session spooling
Other, utility packages:
- `pkg/cmd` - command line argument parsing, configuration loading, and full pipeline wiring
- `pkg/worker` - abstractions for the queue logic (publisher, consumer, task, worker, middleware)
- `pkg/bolt` - BoltDB-backed implementations for KV, set, and proto-session I/O primitives
- `pkg/storage` - generic KV and set storage interfaces with in-memory and monitoring implementations
- `pkg/protocol` - tracking protocol abstractions and implementations (GA4, Matomo, D8A)
- `pkg/properties` - property settings registry and configuration
- `pkg/hits` - core `Hit` data structure representing a single tracking request
- `pkg/encoding` - pluggable encoder/decoder function pairs (CBOR+zlib, JSON, gzip+JSON, gob)
- `pkg/storagepublisher` - thin adapter that serializes a batch of hits into a worker task and publishes it
The essence
The tracking pipeline, in its essence, looks as follows:

1. The HTTP request containing tracked data is received by the `receiver` package. It contains mappings from HTTP paths to specific `protocol.Protocol` implementations (like GA4, Matomo, etc.). The protocol helps the `receiver` create a `hits.Hit` object. It's a very narrow wrapper over an HTTP request, containing some additional attributes essential for later processing (`ClientID`, `PropertyID`) and session creation.
2. After a hit is created, it's pushed to an implementation of the `receiver.Storage` interface. It's a very simple interface that just accepts a hit and stores it. Under the hood, there's a batcher that buffers hits and pushes them to a generic queue (a `worker.Publisher` implementation).
3. On the other side, a `worker.Consumer` implementation reads the tasks, and a `worker.TaskHandler` deserializes the generic bytes back into `hits.Hit` objects. The proto-session logic kicks in.
4. A proto-session is a collection of hits that may form a session in the future. It's perfectly possible that a collection of hits will be split into multiple sessions. The logic in `protosessions` essentially groups potentially related hits into a single collection. When a given period of time has passed since the last hit was added to a proto-session, the proto-session is closed using a `protosessions.Closer` implementation.
5. The `sessions` package handles closing. `DirectCloser` converts proto-sessions (groups of `*hits.Hit`) into `schema.Session` objects and delegates to a `SessionWriter`. The writer runs the columns machinery, splits sessions via the `splitter` package, converts results to rows via a `Layout`, and writes them to the warehouse. The writer may be decorated with spooling layers (`inMemSpoolWriter` for in-memory buffering, `persistentSpoolWriter` for crash-safe disk spooling via `pkg/spools`) before the actual warehouse write.
6. After the columns machinery creates rows for specific tables, it writes them to a `warehouse.Driver` implementation. The column types are defined in the columns machinery using Apache Arrow types; the drivers are responsible for mapping them to their native types.
1. Hit creation
Everything begins in the receiver package. It's an HTTP server that receives requests and creates hits.Hit objects. It's currently implemented with fasthttp, but it's very loosely coupled to the underlying HTTP server.
The main goal of the receiver package is to create a hits.Hit object from every incoming request and pass it to some persistent storage as soon as possible, so it won't be lost.
The Hit structure looks something like this:
type Hit struct {
ID string `cbor:"i"`
ClientID ClientID `cbor:"ci"`
PropertyID string `cbor:"pi"`
IP string `cbor:"ip"`
Host string `cbor:"h"`
ServerReceivedTime string `cbor:"srt"`
QueryParams url.Values `cbor:"qp"`
BodyBase64 string `cbor:"bd"`
// Other HTTP-related fields
}
Basically it wraps all the HTTP request fields with some additional info, usable by the next pipeline steps, namely:

- `ClientID`, which is described in depth in identifiers. Basically it's a unique, anonymous (by itself) identifier of a client, stored on the client side (for example using cookies) and used to identify the client across multiple requests. The `ClientID` is later used to connect individual hits into proto-sessions and also for partitioning (in d8a cloud).
- `PropertyID`, which is a unique identifier of a property, as GA4 understands it. Other protocols are forced to use GA4 nomenclature, but are free to store analogous identifiers in this field (like Matomo uses `idSite`). Later pipeline steps use the `PropertyID` to look up the entities configured for a given property, like:
  - table layout (single merged table or separate tables for sessions and events)
  - table columns
  - destination warehouse

The two above are obviously protocol-specific, which is why the receiver delegates the parsing of the HTTP request to the respective `protocol.Protocol` implementation when creating them.
Key interfaces
protocol.Protocol - defines a tracking protocol implementation (GA4, Matomo, D8A). Parses HTTP requests into hits, provides protocol-specific columns and endpoints.
- `ga4Protocol` - GA4-compatible protocol that parses Google Analytics 4 Measurement Protocol requests
- `matomoProtocol` - Matomo/Piwik protocol that parses single and bulk tracking requests
- `d8aProtocol` - D8A native protocol wrapping GA4 with rewritten endpoints and interface IDs
protocol.Registry - resolves the appropriate Protocol for a given property ID.
- `staticProtocolRegistry` - map-based registry with a default protocol fallback
protocol.PropertyIDExtractor - extracts property ID from a parsed request.
- `fromTidByMeasurementIDExtractor` - extracts from the GA4 `tid` query parameter
- `fromIDSiteExtractor` - extracts from the Matomo `idsite` query parameter
receiver.HitValidatingRule - validation strategy for incoming hits; rules are composable.
- `multipleHitValidatingRule` - composite rule that runs all child rules and joins errors
- `simpleHitValidatingRule` - wraps a plain function as a validation rule
- Pre-built rules: `ClientIDNotEmpty`, `PropertyIDNotEmpty`, `HitHeadersNotEmpty`, `EventNameNotEmpty`, `TotalHitSizeDoesNotExceed(max)`, etc.
receiver.RawLogStorage - optional side-channel for storing raw requests before hit conversion (debugging/auditing).
- `NoopRawLogStorage` - discards all data
properties.SettingsRegistry - looks up property configuration by measurement ID or property ID.
- `StaticSettingsRegistry` - static in-memory registry backed by two maps with optional default fallback
2. Receiver storage & batching
All the hits received by the server are batched and pushed to a receiver.Storage implementation.
// Storage is a storage interface for storing hits
type Storage interface {
Push([]*hits.Hit) error
}
In theory it can be any storage, which gives a lot of flexibility for future configurations. Currently, all passed hits are batched and pushed to a worker.Publisher implementation. This means that you can have as many receivers as you want, but on the other side of the queue (worker.Consumer) you'll have only one instance.
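The batching behavior described above can be sketched as follows. This is a minimal illustration, not the project's actual code: the local `Hit`, `countingStorage`, and `batchingStorage` types are stand-ins, and the timeout-based flush is omitted.

```go
package main

import "fmt"

// Hit is a stand-in for hits.Hit; only the fields needed here.
type Hit struct {
	ID       string
	ClientID string
}

// Storage mirrors the receiver.Storage interface shown above.
type Storage interface {
	Push([]*Hit) error
}

// countingStorage is a hypothetical child Storage that just counts hits;
// the production child storage publishes to a worker.Publisher instead.
type countingStorage struct{ total int }

func (s *countingStorage) Push(hits []*Hit) error {
	s.total += len(hits)
	return nil
}

// batchingStorage buffers hits and flushes to a child Storage
// once the batch-size threshold is reached.
type batchingStorage struct {
	child   Storage
	buf     []*Hit
	maxSize int
}

func (b *batchingStorage) Push(hits []*Hit) error {
	b.buf = append(b.buf, hits...)
	if len(b.buf) >= b.maxSize {
		return b.Flush()
	}
	return nil
}

func (b *batchingStorage) Flush() error {
	if len(b.buf) == 0 {
		return nil
	}
	if err := b.child.Push(b.buf); err != nil {
		return err // keep the buffer for retry
	}
	b.buf = nil
	return nil
}

func main() {
	child := &countingStorage{}
	s := &batchingStorage{child: child, maxSize: 2}
	s.Push([]*Hit{{ID: "1"}})            // buffered, not flushed yet
	s.Push([]*Hit{{ID: "2"}, {ID: "3"}}) // threshold reached, flushes all 3
	fmt.Println(child.total) // 3
}
```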
Key interfaces
receiver.Storage - core abstraction for persisting/forwarding hits after they are received.
- `BatchingStorage` - accumulates hits in a `BatchingBackend` and flushes to a child `Storage` on batch-size or timeout threshold
- `storagepublisher.Adapter` - serializes hits into a `worker.Task` and publishes them via `worker.Publisher` (production child storage)
- `dropToStdoutStorage` - writes each hit as pretty-printed JSON to stdout (debug)
receiver.BatchingBackend - pluggable persistence layer for staging hits between arrival and flush.
- `memoryBatchingBackend` - in-memory slice, clears after flush (default)
- `fileBatchingBackend` - durable staging to disk using an append-only framed-JSON file
3. Queue processing
The queue implemented for tracker-api is generic and can be used in later steps (for example after a session is closed and before it's written to the warehouse - currently it isn't, for quicker MVP delivery). It's implemented in the worker package.
The semantics are dead simple - you publish to a named queue that accepts only one type of task, and something on the other side consumes it. There are no sophisticated features like AMQP's bindings, exponential backoff, and such. This dead-simple approach is limiting, but it offers a wide range of possible implementations (we currently have filesystem and object storage implementations). The interfaces are again very simple. There are two interfaces that operate on Task objects, which are really generic:
// Consumer defines an interface for task consumers
type Consumer interface {
Consume(handler TaskHandlerFunc) error
}
// TaskHandlerFunc is a function that handles a task
type TaskHandlerFunc func(task *Task) error
// Task represents a unit of work with type, headers and data
type Task struct {
Type string
Headers map[string]string
Body []byte
}
// Publisher defines an interface for task publishers that can publish tasks
type Publisher interface {
Publish(task *Task) error
}
And on top of that, there's a worker.Worker struct that helps map task types to their handlers, using generics - it automatically unmarshals the task body and passes it to the respective handler with the correct type.
w := worker.NewWorker(
[]worker.TaskHandler{
worker.NewGenericTaskHandler(
hits.HitProcessingTaskName,
encoding.ZlibCBORDecoder,
func(headers map[string]string, data *hits.Hit) *worker.Error {
// Process the hit, return specific (retryable or droppable) error
return nil
},
),
},
[]worker.Middleware{
// middleware using the headers, used for partitioning and such
},
)
Key interfaces
worker.Publisher - publishes a single task to a queue.
- `FilesystemDirectoryPublisher` - writes tasks atomically to timestamped `.task` files in a directory
- `objectstorage.Publisher` - uploads serialized tasks to an object storage bucket via Go CDK
- `monitoringPublisher` - decorator that records OpenTelemetry metrics, then delegates
worker.Consumer - consumes tasks from a queue via a handler callback.
- `FilesystemDirectoryConsumer` - polls a directory for `.task` files, processes in timestamp order
- `objectstorage.Consumer` - polls an object storage bucket for task objects via Go CDK
worker.Middleware - wraps task processing with a next-chain pattern (similar to HTTP middleware).
- No production implementations in core packages currently
worker.TaskHandler - handler for a specific task type.
- `genericTaskHandler[T]` - type-parameterized handler that decodes the body into `T` and calls a typed processor
worker.MessageFormat - serialization/deserialization of *Task to/from []byte.
- `binaryMessageFormat` - binary wire format with type-length prefix, JSON-encoded headers, and raw body
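To make the framing concrete, here is a hypothetical encoder/decoder in the spirit of binaryMessageFormat. The exact wire layout (field order, endianness) is an assumption; only the ingredients - type-length prefix, JSON-encoded headers, raw body - come from the description above.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"encoding/json"
	"fmt"
)

// Task mirrors the worker.Task struct shown earlier.
type Task struct {
	Type    string
	Headers map[string]string
	Body    []byte
}

// encodeTask writes: [u32 type len][type][u32 headers len][JSON headers][body].
func encodeTask(t *Task) ([]byte, error) {
	hdrs, err := json.Marshal(t.Headers)
	if err != nil {
		return nil, err
	}
	var buf bytes.Buffer
	binary.Write(&buf, binary.BigEndian, uint32(len(t.Type)))
	buf.WriteString(t.Type)
	binary.Write(&buf, binary.BigEndian, uint32(len(hdrs)))
	buf.Write(hdrs)
	buf.Write(t.Body) // the body is simply the remainder of the message
	return buf.Bytes(), nil
}

// decodeTask reverses encodeTask.
func decodeTask(data []byte) (*Task, error) {
	r := bytes.NewReader(data)
	var n uint32
	if err := binary.Read(r, binary.BigEndian, &n); err != nil {
		return nil, err
	}
	typ := make([]byte, n)
	r.Read(typ)
	if err := binary.Read(r, binary.BigEndian, &n); err != nil {
		return nil, err
	}
	hdrs := make([]byte, n)
	r.Read(hdrs)
	t := &Task{Type: string(typ)}
	if err := json.Unmarshal(hdrs, &t.Headers); err != nil {
		return nil, err
	}
	t.Body = make([]byte, r.Len())
	r.Read(t.Body)
	return t, nil
}

func main() {
	in := &Task{Type: "hit.process", Headers: map[string]string{"partition": "7"}, Body: []byte("payload")}
	raw, _ := encodeTask(in)
	out, _ := decodeTask(raw)
	fmt.Println(out.Type, out.Headers["partition"], string(out.Body)) // hit.process 7 payload
}
```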
4. Protosession logic
There's a specific handler for tasks containing hits.Hit objects. It's implemented in the protosessions package - the protosessions.Handler function creates it. Here we meet the main principle of this design:
Consecutive hits belonging to the same proto-session must be processed by the same worker.
It's connected to how the session closing logic works. For each ClientID, the protosessions.Handler holds a clock. If 30 minutes (configurable) have passed since the last hit was added to the proto-session, the session is closed.
This is implemented using a concept loosely based on timing wheels. Every second a tick is emitted that checks whether any proto-sessions are ready to be closed for that second. A more detailed description is in the code itself, in the protosessions/trigger.go file.
The current implementation doesn't allow a single proto-session to be processed by multiple workers, hence the requirement above. Due to this property, we introduced partitioning in d8a cloud.
The protosessions package also handles some dynamic logic via protosessions.Middleware interface:
- Evicting - a proto-session may be evicted from a given worker if the system detects that it should be connected to another proto-session. This may happen, for example, if the system detects that two proto-sessions are coming from the same device (i.e. they have the same session stamp). This may mean that the user removed cookies or used a different browser, and the two proto-sessions are preliminarily connected into one.
- Compaction - all information about proto-sessions is stored in simple and generic data structures - `storage.Set` and `storage.KV` implementations (currently `bolt`). If a - future - `storage.Set` or `storage.KV` is memory-constrained (for example Redis), it may happen that even within the 30-minute window the system will have too many proto-sessions to hold. To avoid that, `protosessions` calculates the size of each proto-session and compacts it if it's too big. Currently the compaction is done in place, by replacing raw hits with compressed ones in the same `storage.Set`. Nevertheless, the interfaces are already laid out in a way that allows adding layered storage, which would allow for more efficient compaction (for example, storing compressed proto-sessions in a separate `storage.Set` backed by Object Storage).
The closing of proto-sessions happens via the protosessions.Closer interface.
// Closer defines an interface for closing and processing hit sessions
type Closer interface {
Close(protosession [][]*hits.Hit) error
}
It's prepared for asynchronous closing, where the task system described in Queue Processing would be used. Currently, though, the closing is done in place: the Close method synchronously writes the session to the warehouse. This is not perfect, but it's a good compromise for now.
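A minimal Closer implementation, in the spirit of the debugging printingCloser listed below, might look like this. The local `Hit` type is a stand-in for `hits.Hit`, and the counters are purely illustrative.

```go
package main

import "fmt"

// Hit is a stand-in for hits.Hit.
type Hit struct {
	ID       string
	ClientID string
}

// Closer mirrors the protosessions.Closer interface: it receives a batch
// of proto-sessions, each of which is a slice of hits.
type Closer interface {
	Close(protosessions [][]*Hit) error
}

// countingCloser is a toy closer: it prints each hit and tallies what it
// would hand to the session-writing machinery.
type countingCloser struct {
	sessions, hits int
}

func (c *countingCloser) Close(protosessions [][]*Hit) error {
	for _, ps := range protosessions {
		c.sessions++
		c.hits += len(ps)
		for _, h := range ps {
			fmt.Printf("closing hit %s (client %s)\n", h.ID, h.ClientID)
		}
	}
	return nil
}

func main() {
	var c Closer = &countingCloser{}
	// Two proto-sessions: client "a" with two hits, client "b" with one.
	c.Close([][]*Hit{
		{{ID: "1", ClientID: "a"}, {ID: "2", ClientID: "a"}},
		{{ID: "3", ClientID: "b"}},
	})
}
```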
Key interfaces
protosessions.Closer - processes and closes proto-sessions (groups of hits that form a session).
- `shardingCloser` - distributes batches of proto-sessions across N child closers using FNV hash-based sharding on the isolated client ID
- `sessions.DirectCloser` - converts proto-sessions into `schema.Session` objects, groups by property, and writes them to a `SessionWriter` (production closer)
- `printingCloser` - logs each hit to stdout for debugging
protosessions.BatchedIOBackend - batched I/O for proto-session storage (identifier conflict detection, hit append/get, timing-wheel bucket marking, cleanup).
- `bolt.boltBatchedIOBackend` - BoltDB-backed backend with an in-memory session-to-bucket cache; all operations run in single transactions
- `deduplicatingBatchedIOBackend` - decorator that deduplicates identical requests before forwarding
protosessions.TimingWheelStateBackend - persistence for the timing wheel's cursor position.
- `genericKVTimingWheelBackend` - stores bucket state in a `storage.KV` under a prefixed key
protosessions.IdentifierIsolationGuardFactory / IdentifierIsolationGuard - ensures cross-property data isolation by hashing client IDs with property ID.
- `defaultIdentifierIsolationFactory` / `defaultIdentifierIsolationGuard` - SHA-256 hashes client IDs with the property ID for isolation
- `noIsolationFactory` / `noIsolationGuard` - returns IDs as-is, no isolation (single-tenant only)
storage.KV - simple key-value store abstraction.
- `bolt.boltKV` - BoltDB-backed persistent KV store
- `storage.InMemoryKV` - in-memory map-based KV with RWMutex
- `monitoringKV` - decorator that records OpenTelemetry latency histograms
storage.Set - set-of-values-per-key storage.
- `bolt.boltSet` - BoltDB-backed persistent set using nested buckets
- `storage.InMemorySet` - in-memory map-of-sets
- `monitoringSet` - decorator that records OpenTelemetry latency histograms
4.1 Isolation
The isolation mechanism ensures that proto-sessions from different properties are kept separate, even when they share the same client identifiers. Without isolation, users from different properties could, under some conditions, have their hits incorrectly grouped into a single proto-session.
The isolation is implemented through the IdentifierIsolationGuard interface, which provides three key capabilities:
type IdentifierIsolationGuardFactory interface {
New(settings *properties.Settings) IdentifierIsolationGuard
}
type IdentifierIsolationGuard interface {
IsolatedClientID(hit *hits.Hit) hits.ClientID
IsolatedSessionStamp(hit *hits.Hit) string
IsolatedUserID(hit *hits.Hit) string
}
The IsolatedClientID method transforms the AuthoritativeClientID used for storage keys, ensuring that the same client ID from different properties results in different isolated identifiers. The default implementation hashes the property ID together with the client ID.
The IsolatedSessionStamp method produces property-scoped session stamps; IsolatedUserID does the same, but for the user ID.
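A sketch of the default guard's approach - SHA-256 over the property ID and client ID - is below. The exact input format the real defaultIdentifierIsolationGuard feeds to the hash is an assumption here; the point is that the same client ID yields different isolated IDs under different properties.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// isolatedClientID hashes the property ID together with the client ID so
// that identical client IDs from different properties map to different
// storage keys. The "propertyID:clientID" input format is illustrative.
func isolatedClientID(propertyID, clientID string) string {
	sum := sha256.Sum256([]byte(propertyID + ":" + clientID))
	return hex.EncodeToString(sum[:])
}

func main() {
	a := isolatedClientID("prop-1", "cid-42")
	b := isolatedClientID("prop-2", "cid-42")
	// Same client ID, different properties -> different isolated keys.
	fmt.Println(a != b) // true
}
```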
5. Columns machinery
5.1 Columns
Columns machinery is quite complex; it offers the following capabilities:

- Ability to define a column "Interface" (`schema.Interface`), a struct that describes the column:
  - Column id (to be used in the dependency system)
  - Column version (as above)
  - Column name
  - Column type
- Ability to separately define the behavior that writes data to this column from a given hit - the `Write` method
- Separate interfaces for `Event`, `Session`, and `SessionScopedEvent` columns

This decouples the concept of:

- What the column name is and what it stores
- And how it's written from a given hit

This allows us to centrally define the core interfaces in columns/core.go and then implement some of them in the respective protocol implementations.
type Interface struct {
ID InterfaceID
Version Version
Field *arrow.Field
}
// Column represents a column with metadata and dependencies.
type Column interface {
Implements() Interface
DependsOn() []DependsOnEntry
}
// EventColumn represents a column that can be written to during event processing.
type EventColumn interface {
Column
Write(event *Event) error // Event is a simple struct with map[string]any to write values to
}
// SessionColumn represents a column that can be written to during session processing.
type SessionColumn interface {
Column
Write(session *Session) error // As Event, but also has the collection of all the events in the session as a separate field (only for reading)
}
// SessionScopedEventColumn represents a column that writes per-event values with session-wide context.
type SessionScopedEventColumn interface {
Column
Write(session *Session, i int) error // Takes the session and event index
}
It also allows parallel implementations of the same column interface, for example as paid extras (competing geoip implementations - we don't need to choose between MaxMind and DbIP - we can use both and let the user decide which one to use).
Most column implementations are in the columns/eventcolumns and columns/sessioncolumns packages; some are scattered across protocol implementations. The general interfaces are in the columns package.
Most of the machinery itself is implemented in the sessions package - both the Closer implementation and the utilities for combining everything together.
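To make the column contract concrete, here is a hypothetical event column in the spirit of FromQueryParamEventColumn. The `Interface` and `Event` types are local stand-ins (the real ones use `*arrow.Field` and carry `DependsOn` metadata, omitted here), and the `en` parameter name is only an example.

```go
package main

import "fmt"

// Interface is a stand-in for schema.Interface.
type Interface struct {
	ID      string
	Version int
	Name    string // stands in for the *arrow.Field name/type pair
}

// Event is a stand-in for schema.Event: a value map the column writes to,
// plus the hit's query parameters to read from.
type Event struct {
	Values map[string]any
	Params map[string]string
}

// EventColumn mirrors the interface shown above (DependsOn omitted).
type EventColumn interface {
	Implements() Interface
	Write(event *Event) error
}

// fromParamColumn copies one query parameter into the row, writing nil
// when the parameter is absent.
type fromParamColumn struct {
	iface Interface
	param string
}

func (c *fromParamColumn) Implements() Interface { return c.iface }

func (c *fromParamColumn) Write(event *Event) error {
	if v, ok := event.Params[c.param]; ok {
		event.Values[c.iface.Name] = v
	} else {
		event.Values[c.iface.Name] = nil
	}
	return nil
}

func main() {
	col := &fromParamColumn{
		iface: Interface{ID: "event_name", Version: 1, Name: "event_name"},
		param: "en", // GA4-style event name parameter
	}
	ev := &Event{Values: map[string]any{}, Params: map[string]string{"en": "page_view"}}
	col.Write(ev)
	fmt.Println(ev.Values["event_name"]) // page_view
}
```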
Key interfaces
schema.Column (base) / schema.EventColumn / schema.SessionColumn / schema.SessionScopedEventColumn - the base column interface and its three concrete kinds.
- `simpleEventColumn` - generic event column; constructed via `NewSimpleEventColumn`, `FromQueryParamEventColumn`, `URLElementColumn`, `AlwaysNilEventColumn`, etc.
- `simpleSessionColumn` - generic session column; constructed via `NewSimpleSessionColumn`, `FromQueryParamSessionColumn`, `NthEventMatchingPredicateValueColumn`, etc.
- `simpleSessionScopedEventColumn` - generic session-scoped event column; constructed via `NewSimpleSessionScopedEventColumn`, `NewValueTransitionColumn`, `NewFirstLastMatchingEventColumn`, etc.
schema.ColumnsRegistry - resolves the full set of columns for a given property ID.
- `staticColumnsRegistry` - maps property IDs to columns with a default fallback
- `merger` - merges multiple `ColumnsRegistry` instances and topologically sorts the result
schema.OrderKeeper - determines output column ordering for stable Arrow schemas.
- `InterfaceOrdering` - derives order from Go struct field positions
- `noParticicularOrderKeeper` - assigns order in first-seen order (for testing)
schema.D8AColumnWriteError - error interface for column write operations with retryability semantics.
- `BrokenSessionError` - non-retryable, marks the session as broken
- `BrokenEventError` - non-retryable, marks the event as broken
- `RetryableError` - retryable, the pipeline retries the whole batch
Notable pre-built columns:
- Event: `EventIDColumn`, `EventNameColumn`, `ClientIDColumn`, `UserIDColumn`, `IPAddressColumn`, UTM columns, click ID columns, device columns (dd2-based)
- Session: `SessionIDColumn`, `DurationColumn`, `TotalEventsColumn`, `ReferrerColumn`, `SplitCauseColumn`, source/medium/term columns
- Session-scoped event: `SSESessionHitNumber`, `SSESessionPageNumber`, `SSETrafficFilterName`
columns.SourceMediumTermDetector - detects session source, medium, and term from events.
- `compositeSourceMediumTermDetector` - runs a chain of child detectors in priority order
- `directSourceMediumTermDetector` - returns `(direct) / none` when there is no referrer
- `pageLocationParamsDetector` - detects from URL query params (gclid, fbclid, etc.)
- `searchEngineDetector` - matches referrer against a search engine database
- `socialsDetector` - matches referrer against a social networks database
- `aiDetector` - matches referrer against an AI tools database
- `videoDetector` - matches referrer against a video sites database
- `emailDetector` - matches referrer against an email provider database
- `genericReferralDetector` - fallback: any referrer hostname becomes `source=hostname / medium=referral`
5.2 Tables
Tables are also customizable. They're defined using the following concepts:
// Layout is the interface for a table layout, implementations take control over
// the final schema and dictate the format of writing the session data to the table.
type Layout interface {
Tables(columns Columns) []WithName
ToRows(columns Columns, sessions ...*Session) ([]TableRows, error)
}
// WithName adds a table name to the schema
type WithName struct {
Schema *arrow.Schema
Table string
}
// TableRows are a collection of rows with a table to write them to
type TableRows struct {
Table string
Rows []map[string]any
}
Basically, the Layout interface tells what tables should be created and with what schemas, and the ToRows method takes the columns and sessions and returns a collection of rows to write to the given tables.
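A toy version of a merged-table ToRows - in the spirit of eventsWithEmbeddedSessionColumnsLayout - can illustrate the shape of the output. The `Session` type and the `session_` prefix are simplified stand-ins, not the real API.

```go
package main

import "fmt"

// Session is a stand-in: per-session values plus per-event value maps.
type Session struct {
	Values map[string]any
	Events []map[string]any
}

// TableRows mirrors the struct from the document.
type TableRows struct {
	Table string
	Rows  []map[string]any
}

// toRowsMerged emits one row per event, embedding session columns into
// each row under a configurable prefix (single-table layout).
func toRowsMerged(prefix string, sessions ...*Session) []TableRows {
	var rows []map[string]any
	for _, s := range sessions {
		for _, ev := range s.Events {
			row := map[string]any{}
			for k, v := range ev {
				row[k] = v
			}
			for k, v := range s.Values {
				row[prefix+k] = v
			}
			rows = append(rows, row)
		}
	}
	return []TableRows{{Table: "events", Rows: rows}}
}

func main() {
	s := &Session{
		Values: map[string]any{"duration": 42},
		Events: []map[string]any{{"event_name": "page_view"}, {"event_name": "click"}},
	}
	out := toRowsMerged("session_", s)
	fmt.Println(len(out[0].Rows), out[0].Rows[0]["session_duration"]) // 2 42
}
```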
schema.Layout - controls final schema and dictates the format of writing session data to tables.
- `eventsWithEmbeddedSessionColumnsLayout` - single-table layout that embeds session columns into the events table with a configurable prefix
- `batchingLayout` - decorator that merges rows from the same table into a single batch entry
- `brokenFilteringLayout` - decorator that filters out broken sessions and events before writing
schema.LayoutRegistry - resolves a Layout for a given property ID.
- `staticLayoutRegistry` - maps property IDs to layouts with a default fallback
6. Session closing and writing
When a protosession is closed, the sessions.DirectCloser converts proto-sessions (groups of *hits.Hit) into *schema.Session objects, groups them by property, and delegates to a SessionWriter.
The SessionWriter is assembled as a decorator chain whose shape depends on the delivery mode:

- Best-effort (default): `inMemSpoolWriter` → `persistentSpoolWriter` → `sessionWriterImpl`
- At-least-once: `persistentSpoolWriter` → `sessionWriterImpl` (no in-memory buffer; sessions go straight to the disk spool)
- Spooling disabled: `sessionWriterImpl` directly
The spooling decorators provide resilience:

- `inMemSpoolWriter` - buffers sessions in memory per property and flushes to its child writer when a count threshold (`maxSessions`) or age threshold (`maxAge`) is reached. A background goroutine sweeps periodically. On flush failure, sessions are retained in the buffer for retry.
- `persistentSpoolWriter` - encodes sessions via an `encoding.EncoderFunc` (Gob by default), appends to a crash-safe `spools.Spool` keyed by property, and periodically flushes via a background actor loop that decodes and delegates to the child writer. Failure handling (max consecutive failures, delete vs. quarantine strategy) is configured on the underlying `spools.Spool`, not on the writer itself.
The core sessionWriterImpl then:

- Resolves the warehouse driver, layout, and columns per property (cached with a TTL)
- Runs event columns (`EventColumn.Write`) for each event
- Splits sessions via the `splitter` package
- Runs session-scoped event columns (`SessionScopedEventColumn.Write`) for each event in context
- Runs session columns (`SessionColumn.Write`) for each session
- Converts to rows via `Layout.ToRows`
- Writes rows to each table in parallel via `warehouse.Driver`
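The ordering matters: split conditions inspect column values (UTM, user ID), so event columns must run before the splitter, and session-level columns can only run once the final session boundaries are known. A schematic sketch of that ordering, with all stages as stand-in functions:

```go
package main

import "fmt"

// Session is a stand-in; the stages below don't inspect it.
type Session struct{ Events []map[string]any }

// step is a stand-in for one pipeline stage.
type step func(sessions []*Session) []*Session

// runPipeline encodes the ordering sessionWriterImpl enforces:
// event columns -> split -> session-scoped event columns -> session columns.
func runPipeline(sessions []*Session, eventCols, split, sseCols, sessionCols step) []*Session {
	sessions = eventCols(sessions)
	sessions = split(sessions)
	sessions = sseCols(sessions)
	return sessionCols(sessions)
}

func main() {
	var order []string
	record := func(name string) step {
		return func(s []*Session) []*Session { order = append(order, name); return s }
	}
	runPipeline(nil, record("event"), record("split"), record("sse"), record("session"))
	fmt.Println(order) // [event split sse session]
}
```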
Key interfaces
sessions.SessionWriter - core interface for writing closed sessions through the column pipeline to the warehouse.
- `sessionWriterImpl` - the main writer: resolves layout/columns/warehouse per property, runs columns, splits, converts to rows, writes to the warehouse
- `inMemSpoolWriter` - decorator that buffers sessions in memory and flushes on count/age thresholds
- `persistentSpoolWriter` - decorator that encodes sessions to a disk spool and flushes periodically
- `noopWriter` - does nothing (testing)
spools.FailureStrategy - defines behavior when a spool file exceeds maximum consecutive failures. Configured on the spools.Spool via spools.WithFailureStrategy().
- `spools.deleteStrategy` - deletes the spool file (best-effort, data loss acceptable)
- `spools.quarantineStrategy` - renames the spool file to a `.quarantine` suffix (preserves it for manual recovery)
spools.Spool - crash-safe keyed framed-file append+flush primitive.
- `fileSpool` - `afero.Fs`-backed implementation using length-prefixed binary frames, rename-before-read flush isolation, and mutex-protected concurrent access
splitter.SessionModifier - takes a session and returns zero or more split/filtered sessions.
- `splitterImpl` - core splitter that evaluates a list of `Condition`s sequentially against events
- `filterModifier` - filters events from a session using compiled expr-lang expressions
- `MultiModifier` - chains multiple modifiers, feeding the output of one into the next
splitter.Condition - decides whether a session should be split at a given event.
- `nullableStringColumnValueChangedCondition` - splits when a nullable-string column value changes (UTM, user ID)
- `maxXEventsCondition` - splits when the event count exceeds a threshold
- `timeSinceFirstEventCondition` - splits when elapsed time since the first event exceeds a duration
splitter.Registry - provides a SessionModifier for a given property ID.
- `fromPropertySettingsRegistry` - builds a modifier from property settings (conditions + optional filter)
- `cachingRegistry` - wraps another registry with a ristretto TTL cache
- `staticRegistry` - always returns the same modifier regardless of property
7. Warehouse
The warehouse package provides a unified interface for writing session data to various data warehouses. It abstracts away warehouse-specific details while maintaining compatibility with Apache Arrow schemas used throughout the columns machinery.
7.1 Driver interface
The core abstraction is the Driver interface, which defines operations for table management and data ingestion:
// Driver abstracts data warehouse operations for table management and data ingestion.
// Implementations handle warehouse-specific DDL/DML operations while maintaining
// compatibility with Apache Arrow schemas.
type Driver interface {
// CreateTable creates a new table with the specified Arrow schema.
CreateTable(table string, schema *arrow.Schema) error
// AddColumn adds a new column to an existing table.
AddColumn(table string, field *arrow.Field) error
// Write inserts batch data into the specified table.
Write(ctx context.Context, table string, schema *arrow.Schema, rows []map[string]any) error
// MissingColumns compares provided schema against existing table structure.
MissingColumns(table string, schema *arrow.Schema) ([]*arrow.Field, error)
// Close releases resources held by the driver.
Close() error
}
The Write method accepts rows as []map[string]any, where each map represents a row with column names as keys. This format is produced by the Layout.ToRows method from the columns machinery.
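A toy driver can illustrate that row shape. This sketch drops the Arrow schema parameters of the real interface for brevity; `consoleDriver` here is a simplified stand-in for the document's consoleDriver, not its actual code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// consoleDriver stores declared tables and prints written rows as JSON.
type consoleDriver struct {
	tables map[string][]string // table -> column names, stands in for DDL state
}

func (d *consoleDriver) CreateTable(table string, columns []string) error {
	d.tables[table] = columns
	return nil
}

// Write consumes rows in the []map[string]any shape produced by
// Layout.ToRows: each map is one row, keyed by column name.
func (d *consoleDriver) Write(table string, rows []map[string]any) error {
	for _, row := range rows {
		b, err := json.Marshal(row)
		if err != nil {
			return err
		}
		fmt.Printf("%s <- %s\n", table, b)
	}
	return nil
}

func main() {
	d := &consoleDriver{tables: map[string][]string{}}
	d.CreateTable("events", []string{"event_name", "client_id"})
	d.Write("events", []map[string]any{
		{"event_name": "page_view", "client_id": "cid-1"},
	})
}
```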
7.2 Type mapping
Warehouse drivers must convert between Apache Arrow types and warehouse-native types. This is handled through the FieldTypeMapper interface:
// FieldTypeMapper provides bidirectional conversion between Arrow types and warehouse-specific types.
type FieldTypeMapper[WHT SpecificWarehouseType] interface {
ArrowToWarehouse(arrowType ArrowType) (WHT, error)
WarehouseToArrow(warehouseType WHT) (ArrowType, error)
}
Each relevant warehouse driver implementation (BigQuery, ClickHouse) provides its own type mapper that handles:
- Primitive types (integers, floats, strings, booleans)
- Complex types (timestamps, dates, arrays, structs)
- Nullability handling
- Type-specific formatting for data insertion
The type mapping system also supports compatibility rules, allowing certain type conversions (e.g., INT32 ↔ INT64) to be considered valid during schema comparisons.
7.3 Schema management
Drivers handle schema evolution through three main operations:
- CreateTable - creates a new table with the specified Arrow schema. The driver converts Arrow field definitions to warehouse-specific DDL statements. For SQL-based warehouses, this uses the `QueryMapper` interface to generate CREATE TABLE statements.
- AddColumn - adds a new column to an existing table. Before adding, the driver checks whether the column already exists to avoid errors. This enables schema evolution as new columns are added to the column definitions.
- MissingColumns - detects schema drift by comparing the expected schema (from column definitions) with the actual table schema. This is used before writes to automatically add missing columns. The method also performs type compatibility checking to ensure existing columns match the expected types.
The FindMissingColumns function provides common logic for comparing schemas, handling both missing columns and type incompatibilities. It uses a FieldCompatibilityChecker to determine if existing columns are compatible with expected types.
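The comparison logic can be sketched with plain stand-in field types (the real function operates on `*arrow.Field` values and the checker is the `FieldCompatibilityChecker` interface; everything local here is illustrative):

```go
package main

import "fmt"

// Field is a stand-in for *arrow.Field (name + type only).
type Field struct {
	Name string
	Type string
}

// findMissingColumns reports every expected field absent from the actual
// table; fields present under the same name are checked for type
// compatibility via the supplied checker, and a mismatch is an error.
func findMissingColumns(expected, actual []Field, compatible func(a, b Field) bool) ([]Field, error) {
	byName := map[string]Field{}
	for _, f := range actual {
		byName[f.Name] = f
	}
	var missing []Field
	for _, f := range expected {
		existing, ok := byName[f.Name]
		if !ok {
			missing = append(missing, f)
			continue
		}
		if !compatible(f, existing) {
			return nil, fmt.Errorf("column %s: incompatible types %s vs %s", f.Name, f.Type, existing.Type)
		}
	}
	return missing, nil
}

func main() {
	// A checker with the INT32 <-> INT64 leniency mentioned above.
	lenient := func(a, b Field) bool {
		ints := map[string]bool{"int32": true, "int64": true}
		return a.Type == b.Type || (ints[a.Type] && ints[b.Type])
	}
	expected := []Field{{"event_name", "string"}, {"duration", "int64"}, {"referrer", "string"}}
	actual := []Field{{"event_name", "string"}, {"duration", "int32"}}
	missing, _ := findMissingColumns(expected, actual, lenient)
	fmt.Println(missing) // [{referrer string}]
}
```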
7.4 Key interfaces and implementations
warehouse.Driver - the central abstraction for table DDL and data ingestion.
- `bigQueryTableDriver` - full BigQuery driver with partitioning, streaming-insert or load-job write strategies, and type compatibility rules
- `clickhouseDriver` - full ClickHouse driver using native protocol batch inserts, column ordering, TTL cache
- `SpoolDriver` - file-based driver that writes rows to local disk via a `Format`, seals segments by size/age, and uploads via an `Uploader`
- `noopDriver` - silent no-op, all methods return nil
- `consoleDriver` - prints rows as JSON to stdout, delegates to noop
- `loggingDriver` - logs operation summaries via logrus, then delegates to a wrapped driver
warehouse.Registry - looks up a Driver by property ID.
- `staticDriverRegistry` - returns the same driver for all properties (single-tenant deployments)
warehouse.QueryMapper - generates warehouse-specific SQL DDL fragments from Arrow schemas.
- `clickhouseQueryMapper` - generates ClickHouse DDL with configurable ENGINE, PARTITION BY, ORDER BY
warehouse.FieldTypeMapper[T] - bidirectional Arrow ↔ warehouse type conversion (generic interface).
- BigQuery: 11 mappers covering string, int32/64, float32/64, bool, timestamp, date32, arrays, nested, nullable
- ClickHouse: 13 mappers covering the above plus low-cardinality, nullability-as-default, restricted nested
- `TypeMapperImpl[T]` - composite mapper that chains multiple mappers, returning the first successful mapping
- `deferredMapper[T]` - lazy proxy for breaking circular dependencies during construction
warehouse.FieldCompatibilityChecker - determines whether two Arrow fields are type-compatible.
- `bigQueryTableDriver` - checks compatibility with int32/int64 and float32/float64 leniency
- `clickhouseDriver` - relaxed scalar nullability, strict struct/list nullability
files.Format - defines how data is serialized to files.
- `csvFormat` - writes data as CSV with optional gzip compression
files.Uploader - handles uploading local files to a destination.
- `blobUploader` - uploads to cloud blob storage via `gocloud.dev/blob`, then deletes the local file
- `filesystemUploader` - moves files to a local destination directory
bigquery.Writer - BigQuery-specific write strategy.
- `streamingWriter` - uses the BigQuery streaming insert API
- `loadJobWriter` - uses BigQuery load jobs with NDJSON (free-tier compatible)
7.5 Integration with session closing
When a proto-session is closed, the sessions.SessionWriter uses the warehouse registry to get the appropriate driver for the property. It then:

- Retrieves the table layout and columns for the property
- Processes sessions through the columns machinery to generate rows
- Converts sessions to table rows using the layout's `ToRows` method
- Writes rows to each table in parallel using the warehouse driver
The writer handles schema management automatically - if columns are missing, they are added before writing. This ensures that schema changes in column definitions are automatically reflected in the warehouse tables.