Adapter pattern
This page documents the shared framework every adapter in this package
implements. The goal is that adding a new source — SWPC, GIM, GOES,
DSCOVR — is a matter of subclassing BaseAdapter and filling in the
HTTP details, not re-deciding lifecycle, rate-limiting, caching, or
provenance semantics.
High-level shape
```text
┌────────────────────────────────────────────────────────────────────┐
│ BaseAdapter (ABC)                                                  │
│                                                                    │
│   source_id          : SourceID (class var, must be set)           │
│   fetch()            : AsyncIterator[NormalizedRecord] (abstract)  │
│   fetch_sync()       : list[NormalizedRecord]                      │
│   _emit_provenance() -> ProvenanceRecord                           │
│   _ratelimiter / _cache / _client / _owns_client                   │
└──────────────────────────────────┬─────────────────────────────────┘
                                   │
                ┌──────────────────┼──────────────────┐
                │                  │                  │
        ┌───────┴───────┐  ┌───────┴───────┐  ┌───────┴───────┐
        │  DonkiAdapter │  │  SwpcAdapter  │  │  GoesAdapter  │
        │    (v0.1)     │  │    (v0.2)     │  │    (v0.3)     │
        └───────────────┘  └───────────────┘  └───────────────┘
```
Every adapter exposes:
- An async streaming `fetch(start, end, **kwargs)` returning an `AsyncIterator[NormalizedRecord]`. This is the production interface; downstream pipelines can apply backpressure by consuming records one at a time.
- A sync `fetch_sync(start, end, **kwargs)` convenience wrapper that drains the async iterator into a list. Useful for notebooks and one-shot scripts. It refuses to run inside an active event loop.
- Source-specific endpoint conveniences (e.g. `fetch_cme`, `fetch_gst`) that delegate to `fetch()` with the right kwargs.
- A `source_id: ClassVar[SourceID]` so downstream code can dispatch on the source without string matching.
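A minimal, stdlib-only sketch of how the async `fetch()` and the sync `fetch_sync()` wrapper can relate, including the refusal to run inside an active event loop. It assumes records are plain dicts. `BaseAdapterSketch` and `CountingAdapter` are hypothetical names for illustration, not the package's classes; the real base class also owns the rate limiter, cache, and HTTP client.

```python
import abc
import asyncio
from datetime import datetime, timedelta, timezone
from typing import Any, AsyncIterator


class BaseAdapterSketch(abc.ABC):
    """Hypothetical stand-in for BaseAdapter: only the fetch/fetch_sync split."""

    @abc.abstractmethod
    def fetch(self, start: datetime, end: datetime, **kwargs: Any) -> AsyncIterator[dict]:
        """Production interface: an async generator yielding one record at a time."""

    def fetch_sync(self, start: datetime, end: datetime, **kwargs: Any) -> list[dict]:
        """Drain fetch() into a list; refuse to run inside an active event loop."""
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # no loop is running in this thread, so asyncio.run() is safe
        else:
            raise RuntimeError(
                "fetch_sync() called inside a running event loop; "
                "use `async for record in adapter.fetch(...)` instead"
            )

        async def _drain() -> list[dict]:
            return [record async for record in self.fetch(start, end, **kwargs)]

        return asyncio.run(_drain())


class CountingAdapter(BaseAdapterSketch):
    """Hypothetical adapter that yields one record per hour in [start, end)."""

    async def fetch(self, start, end, **kwargs):
        t = start
        while t < end:
            yield {"event_time": t, "value": {"hour": t.hour}}
            t += timedelta(hours=1)


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
records = CountingAdapter().fetch_sync(start, start + timedelta(hours=3))
print(len(records))  # 3
```

Raising early, before any coroutine is created, avoids both a nested-loop error from `asyncio.run()` and a "coroutine was never awaited" warning.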
The NormalizedRecord contract
```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any

from helios_connectors.schema import ProvenanceRecord, SourceID


@dataclass
class NormalizedRecord:
    source: SourceID               # which upstream system produced this
    record_type: str               # source-local discriminator (e.g. "CME")
    event_time: datetime           # UTC
    value: dict[str, Any]          # JSON-serializable science payload
    value_units: str               # e.g. "pfu", "nT", "none"
    provenance: ProvenanceRecord
    raw: dict[str, Any]            # the unmodified upstream response
```
Every adapter produces these. The shape is deliberately wider than any
single source needs, so we can support both scalar (e.g. Kp index) and
structured (e.g. CME with linked flares) payloads in `value`.
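To make the "wider than any single source" point concrete, here is a sketch of a scalar Kp reading and a structured CME payload filling the same fields. It uses a trimmed, hypothetical stand-in dataclass (`RecordSketch`, with `provenance` omitted and `source` as a plain string), and the payload keys and values are invented for illustration. Only the requirement that `value` be JSON-serializable comes from the contract above.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any


@dataclass
class RecordSketch:
    """Trimmed, hypothetical stand-in for NormalizedRecord (provenance omitted)."""
    source: str
    record_type: str
    event_time: datetime
    value: dict[str, Any]
    value_units: str
    raw: dict[str, Any]


# Scalar payload: one number wrapped in a dict (key name and value are invented).
kp = RecordSketch(
    source="swpc",
    record_type="kp-3-hour",
    event_time=datetime(2024, 1, 1, 3, tzinfo=timezone.utc),
    value={"kp": 4.0},
    value_units="none",  # Kp is a dimensionless index
    raw={},
)

# Structured payload: nested fields and linked events (keys and IDs are invented).
cme = RecordSketch(
    source="donki",
    record_type="CME",
    event_time=datetime(2024, 1, 1, 6, tzinfo=timezone.utc),
    value={"cme_type": "C", "linked_flares": ["FLR-example-1", "FLR-example-2"]},
    value_units="none",
    raw={},
)

for rec in (kp, cme):
    # The contract requires `value` to survive a JSON round trip unchanged.
    assert json.loads(json.dumps(rec.value)) == rec.value
```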
Provenance: feature-level lineage
```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any


@dataclass(frozen=True)
class ProvenanceRecord:
    id: str                        # globally unique
    schema_version: str
    model_id: str                  # e.g. "donki/CME", "swpc/kp-3-hour"
    dataset_refs: tuple[str, ...]
    timestamp: datetime
    value: Any
    value_units: str
    ingestion_timestamp: datetime  # when this adapter observed the upstream
    lineage: tuple[str, ...]       # upstream events / models contributing
```
For DONKI, `lineage` is populated from the `linkedEvents` array
(intelligent linkages). For other sources, `lineage` carries whatever
upstream identifier chain makes the record reproducible; e.g. for a
SEP Scoreboard A entry, the lineage tuple holds the IDs of the models
contributing to that fused row.
`ProvenanceRecord` is currently a placeholder type in `helios_connectors.schema`;
it will be replaced by an import from `helios-provenance` once that package's
v0.1 schema is pinned. The swap is a single-line change because the
field shape was designed to match.
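The DONKI lineage rule described above can be sketched as a small pure function. It assumes the raw event carries `linkedEvents` as a list of objects with an `activityID` key, or `null` when nothing is linked, which matches how DONKI JSON is shaped. `donki_lineage` is a hypothetical helper name, not part of the package, and the activity IDs in the example are invented.

```python
from typing import Any


def donki_lineage(raw: dict[str, Any]) -> tuple[str, ...]:
    """Collect the activity IDs of a DONKI event's linked events, in upstream order."""
    # DONKI sends `"linkedEvents": null` when nothing is linked, so treat None as empty.
    linked = raw.get("linkedEvents") or []
    return tuple(
        entry["activityID"]
        for entry in linked
        if isinstance(entry, dict) and entry.get("activityID")
    )


raw_cme = {
    "activityID": "2024-01-01T00:00:00-CME-001",  # invented IDs in DONKI's format
    "linkedEvents": [
        {"activityID": "2024-01-01T00:00:00-FLR-001"},
        {"activityID": "2024-01-02T00:00:00-IPS-001"},
    ],
}
lineage = donki_lineage(raw_cme)
# → ("2024-01-01T00:00:00-FLR-001", "2024-01-02T00:00:00-IPS-001")
```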
Concurrency model
- The base class wraps every adapter in an async context manager so the underlying `httpx.AsyncClient` is closed cleanly even if the caller bails out mid-iteration.
- Per-adapter rate limiters (`RateLimitConfig` token bucket) live inside the adapter instance. Two adapters on different sources never share a limiter, so concurrent calls to a slow source can't back-pressure a fast one.
- For the unified DONKI `fetch(types=[...])` call we use `asyncio.gather` to issue per-endpoint calls concurrently. Each call passes through the same rate limiter, so the aggregate rate stays inside CCMC's limit even when the underlying HTTP calls overlap.
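A stdlib-only sketch of why overlapping `asyncio.gather` calls still respect a single limit: every task awaits the same limiter before its simulated request. `TokenBucket`, `fetch_endpoint`, and `fetch_all` are hypothetical stand-ins for the `RateLimitConfig`-driven limiter and the DONKI fan-out; their constructor arguments and signatures are assumptions, not the package's API.

```python
import asyncio
import time


class TokenBucket:
    """Hypothetical token bucket: `rate` tokens per second, at most `capacity` stored."""

    def __init__(self, rate: float, capacity: int = 1) -> None:
        self.rate = rate
        self.capacity = capacity
        self._tokens = float(capacity)
        self._updated = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        # The lock serializes waiters, so concurrent tasks queue up for tokens.
        async with self._lock:
            while True:
                now = time.monotonic()
                # Refill in proportion to elapsed time, capped at capacity.
                self._tokens = min(self.capacity, self._tokens + (now - self._updated) * self.rate)
                self._updated = now
                if self._tokens >= 1:
                    self._tokens -= 1
                    return
                # Sleep roughly until the next token has accrued, then re-check.
                await asyncio.sleep((1 - self._tokens) / self.rate)


async def fetch_endpoint(limiter: TokenBucket, name: str) -> str:
    await limiter.acquire()    # every outbound call goes through the shared limiter
    await asyncio.sleep(0.01)  # stand-in for the HTTP round trip
    return name


async def fetch_all(endpoints: list[str], rps: float) -> tuple[list[str], float]:
    limiter = TokenBucket(rate=rps)
    t0 = time.monotonic()
    results = await asyncio.gather(*(fetch_endpoint(limiter, e) for e in endpoints))
    return list(results), time.monotonic() - t0


results, elapsed = asyncio.run(fetch_all(["CME", "FLR", "GST", "SEP", "IPS"], rps=50))
```

With `rps=50` and a one-token bucket, the five overlapping calls take at least 4/50 = 0.08 s in total, even though each simulated request takes only 0.01 s; `gather` still returns results in input order.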
Caching
- Default cache lives under `~/.cache/helios-connectors/<source>/...`.
- Override the cache root via the `HELIOS_CACHE_ROOT` env var.
- Keyed by `(source_id, sorted(query_params))` → SHA-256 fingerprint.
- Contents are pandas DataFrames serialized to parquet via pyarrow, so any DuckDB / Polars / pandas user can inspect them out of band.
- Pass `cache=False` on construction to disable; pass an explicit `FileCache` instance to use a custom one.
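A sketch of the cache addressing described above. The exact serialization of the key before hashing is not specified here, so this version assumes a compact JSON encoding of `[source_id, sorted(query_params.items())]`. It reads `sorted(query_params)` as sorting key/value pairs, since sorting a dict directly yields only its keys. It also assumes `HELIOS_CACHE_ROOT` replaces the whole `~/.cache/helios-connectors` root. `cache_dir` and `cache_fingerprint` are hypothetical names, and the file layout below the per-source directory is left out on purpose.

```python
import hashlib
import json
import os
from pathlib import Path
from typing import Any, Mapping


def cache_dir(source_id: str) -> Path:
    """Per-source cache directory, honoring HELIOS_CACHE_ROOT if it is set."""
    root = os.environ.get("HELIOS_CACHE_ROOT")
    base = Path(root) if root else Path.home() / ".cache" / "helios-connectors"
    return base / source_id


def cache_fingerprint(source_id: str, query_params: Mapping[str, Any]) -> str:
    """SHA-256 hex digest of (source_id, sorted params); insensitive to param order."""
    key = [source_id, sorted(query_params.items())]
    # default=str lets datetimes and other non-JSON scalars participate in the key.
    payload = json.dumps(key, separators=(",", ":"), default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

Sorting the pairs is what makes `{"startDate": ..., "endDate": ...}` and the same dict built in the opposite order land on the same cache entry.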
HTTP and retry
- Every adapter uses `helios_connectors.http.make_client()` for its `AsyncClient`. This sets the User-Agent NASA APIs prefer, a 30s total timeout, and a connection pool capped at 20.
- `request_with_retry()` retries `httpx.TransportError` and HTTP 429 / 5xx with exponential backoff (1s → 30s, 4 attempts).
- API keys are passed as query parameters per NASA convention and are never logged. `safe_log_params` allowlists which params are included in debug logs.
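A sketch of the retry policy and log redaction, with the transport abstracted away so it runs without httpx. Here `send` is any coroutine returning an HTTP status code, `ConnectionError` stands in for `httpx.TransportError`, and `sleep` is injectable for testing. The sketch assumes plain doubling (1 s, 2 s, 4 s, ...) capped at 30 s, and that the last response is returned once attempts run out. `retry_request`, `redact_params`, and the `LOGGABLE_PARAMS` allowlist are hypothetical names, not the package's `request_with_retry` / `safe_log_params`.

```python
import asyncio
from typing import Any, Awaitable, Callable, Mapping

MAX_ATTEMPTS = 4
BASE_DELAY_S = 1.0
MAX_DELAY_S = 30.0


def is_retryable(status: int) -> bool:
    """HTTP 429 (rate limited) and any 5xx are worth retrying."""
    return status == 429 or 500 <= status < 600


def backoff_delay(retry_index: int) -> float:
    """Exponential backoff: 1 s, 2 s, 4 s, ... capped at 30 s."""
    return min(MAX_DELAY_S, BASE_DELAY_S * 2 ** retry_index)


async def retry_request(
    send: Callable[[], Awaitable[int]],
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> int:
    for attempt in range(MAX_ATTEMPTS):
        last_attempt = attempt == MAX_ATTEMPTS - 1
        try:
            status = await send()
        except ConnectionError:  # stand-in for httpx.TransportError
            if last_attempt:
                raise
        else:
            if not is_retryable(status) or last_attempt:
                return status
        await sleep(backoff_delay(attempt))
    raise RuntimeError("MAX_ATTEMPTS must be at least 1")


# Hypothetical allowlist: only these query params may appear in debug logs.
LOGGABLE_PARAMS = frozenset({"startDate", "endDate", "type"})


def redact_params(params: Mapping[str, Any]) -> dict[str, Any]:
    """Keep only allowlisted params, so api_key never reaches a log line."""
    return {k: v for k, v in params.items() if k in LOGGABLE_PARAMS}
```

Under this doubling assumption, four attempts only ever wait 1 s, 2 s, and 4 s; the 30 s cap matters only if the attempt count is raised well above four. An allowlist, unlike a denylist, keeps any newly added secret parameter out of logs by default.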
Adding a new adapter: the recipe
- Pick a `SourceID`: add a member to `helios_connectors.schema.SourceID`.
- Subclass `BaseAdapter` in `helios_connectors.adapters.<name>`.
- Set `source_id` as a `ClassVar[SourceID]`.
- Override `_default_rate_limit` if the source needs a non-10-RPS default. NOAA SWPC: 5 RPS. `DEMO_KEY`-gated calls: 1 RPS.
- Implement `fetch()`; it must be an async generator. Use `await self._ratelimiter.acquire()` before every outbound call.
- Build each `NormalizedRecord` with `self._emit_provenance()` so timestamps are UTC-normalized and IDs are stable.
- Tests: capture a handful of real upstream responses into `tests/fixtures/<source>/`, write a `test_<source>.py` modeled on `test_donki.py`, and mark a live integration test with `@pytest.mark.live`.
- Docs: add `docs/adapters/<source>.md` and update the table in `docs/index.md`.
- Add the adapter to `helios_connectors.adapters.__init__` so it is importable from `helios_connectors`.
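Put together, the recipe yields an adapter shaped roughly like the sketch below. Everything here is a self-contained stand-in so the sketch runs on its own. The `SourceID` members, `_SketchBase`, `_Limiter`, `Provenance`, `ExampleAdapter`, its payload, and the stable-ID scheme in `_emit_provenance` (SHA-256 over model ID, UTC event time, and upstream ID) are all hypothetical. Treating `_default_rate_limit` as a plain RPS number is also an assumption. Real adapters subclass the package's `BaseAdapter` instead.

```python
import asyncio
import hashlib
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Any, AsyncIterator, ClassVar


class SourceID(str, Enum):  # stand-in for helios_connectors.schema.SourceID
    DONKI = "donki"
    EXAMPLE = "example"  # the new member a new adapter would add


class _Limiter:
    """Stand-in limiter that only records its rate and counts acquire() calls."""

    def __init__(self, rps: float) -> None:
        self.rps = rps
        self.calls = 0

    async def acquire(self) -> None:
        self.calls += 1


@dataclass(frozen=True)
class Provenance:  # trimmed stand-in for ProvenanceRecord
    id: str
    model_id: str
    timestamp: datetime


class _SketchBase:
    """Stand-in for BaseAdapter with just what the recipe touches."""

    source_id: ClassVar[SourceID]
    _default_rate_limit: ClassVar[float] = 10.0  # RPS (assumed plain number)

    def __init__(self) -> None:
        self._ratelimiter = _Limiter(self._default_rate_limit)

    def _emit_provenance(self, record_type: str, event_time: datetime, upstream_id: str) -> Provenance:
        # Normalize to UTC; naive datetimes are assumed to already be UTC.
        if event_time.tzinfo is None:
            event_time = event_time.replace(tzinfo=timezone.utc)
        event_time = event_time.astimezone(timezone.utc)
        model_id = f"{self.source_id.value}/{record_type}"
        # Stable ID: the same upstream record always hashes to the same ID.
        key = f"{model_id}|{event_time.isoformat()}|{upstream_id}"
        return Provenance(id=hashlib.sha256(key.encode()).hexdigest(), model_id=model_id, timestamp=event_time)


class ExampleAdapter(_SketchBase):
    source_id: ClassVar[SourceID] = SourceID.EXAMPLE  # set source_id as a ClassVar
    _default_rate_limit: ClassVar[float] = 5.0        # e.g. a 5 RPS source like NOAA SWPC

    async def _get(self, start: datetime, end: datetime) -> list[dict[str, Any]]:
        return [{"id": "evt-1", "time": start}]  # stand-in for the HTTP call

    async def fetch(self, start: datetime, end: datetime, **kwargs: Any) -> AsyncIterator[dict[str, Any]]:
        await self._ratelimiter.acquire()  # acquire before every outbound call
        for row in await self._get(start, end):
            prov = self._emit_provenance("event", row["time"], row["id"])
            yield {"source": self.source_id, "provenance": prov, "raw": row}
```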
The next four adapters (SEP Scoreboards, SWPC, CDDIS GIMs, GOES /
DSCOVR wrappers) are designed to be cloned from DonkiAdapter's
shape; if you find yourself wanting to break the base class to make
your adapter fit, that's a signal the base class needs to widen,
not that your adapter is a special case.