45-EFT.WP.Data.Pipeline v1.0
I. Chapter Purpose & Scope
Define the Sources & Ingest layer’s specifications and engineering practices: connector types, credentials & security, idempotency/retry/checkpointing, dedup & dedupe keys, throughput & latency metrology, contract alignment (Σ_in/Σ_out), exception handling, and audit exports; ensure consistency with Dataset/Model Cards, the Metrology chapter, and citation anchors.
II. Terminology & Dependencies
- Terms: source/connector, ingest, checkpoint, cursor/watermark, idempotency, dedupe_key, SLA/SLO, orchestrator.
- Dependencies: data contracts & exports in Core.DataSpec v1.0; units & dimensional checks in Core.Metrology v1.0; split/quality alignment in DatasetCards v1.0; feature/I-O alignment in ModelCards v1.0.
- Math & symbols: wrap inline symbols like QPS, T_inf, ρ in backticks; any division/integral/composite operator must use parentheses; if path quantities such as T_arr are involved, register gamma(ell) and d ell; no Chinese in formulas/symbols/definitions.
III. Fields & Structure (Normative)
stage:
  name: "<src.kind.name>"
  type: "source.<s3|gcs|fs|db|kafka|http|custom>"
  impl: "I16-1.<impl_id>"
  params:
    endpoint: "<url-or-bootstrap>"
    bucket_or_db: "<bucket|db>"
    prefix_or_table: "<prefix|schema.table>"
    query_or_pattern: "<sql|glob>"
    credentials_ref: "secrets://path/to/credential"
    format: "<json|parquet|csv|avro|binary>"
    watermark:
      field: "<updated_at|offset|lsn>"
      start: "<ISO8601|offset>"
      step: "<PT5M|1000>"
    checkpoint:
      path: "s3://.../chk/<stage>"
      mode: "exactly-once|at-least-once"
    dedupe_key: ["<pk>", "<ts>"]
  outputs: ["raw_blob|raw_rows|events"]
  idempotent: true
  retries: {max: 3, backoff: "expo", jitter_ms: 200}
  timeout_s: 1800
  on_fail: "quarantine|skip|block"
  schema_ref: "<contracts/raw@vX.Y>"
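The watermark.step field takes either an ISO 8601 duration (time-based watermarks such as updated_at) or an integer count (offset/LSN watermarks). A minimal Python sketch of advancing a watermark under that reading; `advance_watermark` is a hypothetical helper, and only the `PT#H#M#S` subset of ISO 8601 durations is handled.

```python
import re
from datetime import datetime, timedelta

# Assumption: only the time-of-day subset "PT#H#M#S" of ISO 8601 durations is supported.
_DURATION = re.compile(r"^PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?$")


def advance_watermark(current, step):
    """Return the next watermark value given the current one and watermark.step."""
    if isinstance(current, int):
        # Offset/LSN-style watermark: step is an integer count such as "1000".
        return current + int(step)
    m = _DURATION.match(step)
    if not m or not any(m.groups()):
        raise ValueError(f"unsupported step: {step!r}")
    hours, minutes, seconds = (int(g) if g else 0 for g in m.groups())
    # Accept a trailing "Z" for UTC and emit it again on the way out.
    start = datetime.fromisoformat(current.replace("Z", "+00:00"))
    nxt = start + timedelta(hours=hours, minutes=minutes, seconds=seconds)
    return nxt.isoformat().replace("+00:00", "Z")
```

For example, `advance_watermark("2025-09-01T00:00:00Z", "PT5M")` yields `"2025-09-01T00:05:00Z"`, while `advance_watermark(5000, "1000")` yields `6000`.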
IV. Connector Types & Specifications
- Object storage / filesystem (source.s3|gcs|fs): support sharding and resume; prefix + pattern explicitly bound the scan domain; record etag/mtime/size to enable dedup.
- Databases (source.db): enforce change watermark (watermark.field) or change log (lsn/scn); SQL must be parameterized and bounded; export a table-schema snapshot.
- Messaging/stream (source.kafka|pulsar): consumer group and offset management; parallelism by partition; provide replay policy and a max-lag threshold.
- HTTP/custom (source.http|custom): throttling & rate limits; idempotent signatures or dedupe keys; error classification and fallback.
- Format & contract: format is tightly bound to schema_ref; for semi-structured/binary inputs, decoding and validation must be completed downstream in validate.
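For source.db, "parameterized and bounded" can be read as: values bound as query parameters, identifiers validated rather than interpolated blindly, and each pull limited to a half-open watermark window (lo, hi] plus a row limit. The sketch below uses Python's built-in sqlite3 purely for illustration; `pull_window`, the `id` column, and the `events` table in the usage note are hypothetical.

```python
import sqlite3


def _check_ident(name):
    # Identifiers cannot be bound as parameters, so allow only plain or
    # dotted names such as "events" or "main.events".
    if not all(part.isidentifier() for part in name.split(".")):
        raise ValueError(f"unsafe SQL identifier: {name!r}")


def pull_window(conn, table, wm_field, lo, hi, limit=1000):
    """Fetch rows with lo < wm_field <= hi, ordered and capped at `limit`."""
    _check_ident(table)
    _check_ident(wm_field)
    sql = (
        f"SELECT id, {wm_field} FROM {table} "
        f"WHERE {wm_field} > ? AND {wm_field} <= ? "
        f"ORDER BY {wm_field}, id LIMIT ?"
    )
    # Watermark bounds and the row limit are bound as parameters, never interpolated.
    return conn.execute(sql, (lo, hi, limit)).fetchall()
```

Calling `pull_window(conn, "events", "updated_at", "2025-09-01T00:00:00Z", "2025-09-01T00:05:00Z")` returns only rows strictly after the previous watermark and up to the new one. If a window returns exactly `limit` rows, the remainder must be fetched before the watermark advances; a tie-breaking cursor such as (updated_at, id) avoids skipping rows that share the boundary value.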
V. Idempotency, Retry & Checkpointing
- Idempotency keys: prefer source primary key/offset/watermark (dedupe_key); use idempotent insert or UPSERT on the sink to avoid duplicates.
- Retry: exponential backoff (backoff:"expo") + jitter; error classes (retryable/non-retryable/escalate); both per-attempt and overall time limits (timeout_s) must be explicit.
- Checkpoint: set checkpoint.mode to "exactly-once" or "at-least-once"; commits must be atomic, recording the offset, cursor, and sink completion marker together.
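A sketch of the retry policy `retries: {max: 3, backoff: "expo", jitter_ms: 200}` combined with an overall `timeout_s` budget. The two exception classes, `base_s`, and the injectable `sleep`/`clock` parameters are assumptions for illustration; per-attempt timeouts and the "escalate" error class are left to the caller.

```python
import random
import time


class RetryableError(Exception):
    """Transient failure (e.g. throttling, network timeout): safe to retry."""


class NonRetryableError(Exception):
    """Permanent failure (e.g. bad credentials, schema mismatch): do not retry."""


def call_with_retries(fn, max_retries=3, base_s=0.5, jitter_ms=200,
                      timeout_s=1800, sleep=time.sleep, clock=time.monotonic):
    """Call fn(), retrying RetryableError with exponential backoff plus jitter.

    Gives up after max_retries retries, or earlier if the next sleep would
    exceed the overall timeout_s budget; the last error is re-raised so the
    caller can apply on_fail.
    """
    deadline = clock() + timeout_s
    attempt = 0
    while True:
        try:
            return fn()
        except NonRetryableError:
            raise  # permanent: surface immediately
        except RetryableError:
            if attempt >= max_retries:
                raise  # retry budget exhausted
            # Exponential backoff (base_s * 2^attempt) plus uniform jitter.
            delay = base_s * (2 ** attempt) + random.uniform(0, jitter_ms / 1000)
            if clock() + delay > deadline:
                raise  # sleeping would exceed the overall time limit
            attempt += 1
            sleep(delay)
```

Injecting `sleep` and `clock` keeps the policy testable without real waiting; because retries only make sense when the operation is idempotent, this wrapper belongs around calls whose sink writes use dedupe_key or UPSERT.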
VI. Dedup & Ordering Guarantees
- Dedup: for batch sources, dedup by dedupe_key + watermark; for streams, rely on monotonic per-partition offsets with idempotent aggregation.
- Ordering: when strict order is required, declare partition key, sort field, and lateness window; otherwise state “in-partition ordered, cross-partition unordered”.
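For streams, the dedup rule above (monotonic offsets per partition) reduces to keeping a per-partition high-water mark and dropping anything at or below it. A minimal sketch, assuming records arrive as hypothetical `(partition, offset, payload)` tuples and that previously committed offsets come from the checkpoint; batch dedup would instead track the set of seen `dedupe_key` tuples.

```python
from collections import defaultdict


def dedupe_stream(records, committed=None):
    """Drop replayed records, keeping only offsets above each partition's high-water mark.

    records: iterable of (partition, offset, payload) in arrival order.
    committed: optional {partition: last_committed_offset} loaded from the checkpoint.
    Returns (kept_records, updated_high_water_marks).
    """
    high = defaultdict(lambda: -1, committed or {})
    kept = []
    for partition, offset, payload in records:
        if offset <= high[partition]:
            continue  # duplicate or replay within this partition
        high[partition] = offset
        kept.append((partition, offset, payload))
    return kept, dict(high)
```

Order is preserved within each partition, and nothing is implied across partitions, which matches the default "in-partition ordered, cross-partition unordered" statement.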
VII. Metrology & Units (SI)
- Throughput: QPS = ( count / Δt ) (1/s).
- Latency: T_inf in ms, report {p50,p95,p99}.
- Utilization: ρ = ( λ / μ ), where λ is the arrival rate and μ the service rate, both in 1/s; ρ is dimensionless.
- Bandwidth/storage: net_mbps, size_bytes.
- metrology:{units:"SI", check_dim:true} is mandatory; normalize units first before any composition.
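The metrology formulas above can be computed directly once inputs are normalized (seconds for Δt, 1/s for λ and μ, ms for latency samples). Percentiles have several competing definitions; the nearest-rank method used here is one choice and not necessarily the one the specification intends. All function names are hypothetical.

```python
import math


def qps(count, dt_s):
    """QPS = ( count / Δt ) in 1/s; dt_s must already be normalized to seconds."""
    if dt_s <= 0:
        raise ValueError("dt_s must be positive")
    return count / dt_s


def percentile_nearest_rank(samples_ms, p):
    """Nearest-rank percentile of latency samples (ms) for an integer p in [0, 100]."""
    ordered = sorted(samples_ms)
    rank = math.ceil(p * len(ordered) / 100)
    return ordered[max(rank, 1) - 1]


def utilization(lam_per_s, mu_per_s):
    """ρ = ( λ / μ ); dimensionless because both rates are in 1/s."""
    return lam_per_s / mu_per_s


def latency_report(samples_ms):
    """Report T_inf as {p50, p95, p99} in ms."""
    return {f"p{p}": percentile_nearest_rank(samples_ms, p) for p in (50, 95, 99)}
```

For instance, 1200 records in 60 s gives `qps(1200, 60.0) == 20.0`, and an arrival rate of 80/s against a service rate of 100/s gives ρ = 0.8.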
VIII. Security, Credentials & Compliance
- Credentials: refer only via credentials_ref; plaintext secrets are forbidden; support rotation and least-privilege (read-only where possible).
- Privacy: when PII or other sensitive fields are present, enforce minimization and de-identification; regional constraints (e.g., EU GDPR) must be listed in export_manifest.references[].
- Audit: log all connect/pull/failure events to a security audit stream; severe failures route to on_fail:"quarantine" and trigger alerts.
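A runtime counterpart to the credentials rule: accept only `secrets://` references and reject keys that look like inlined secrets. The deny-list beyond `plain_secret` (the key named in the lint rules) is a hypothetical example, and `check_credentials` is illustrative rather than part of any library.

```python
from urllib.parse import urlsplit

# Hypothetical deny-list of plaintext-secret keys; only "plain_secret" comes from the spec.
FORBIDDEN_KEYS = {"plain_secret", "password", "secret_key", "token"}


def check_credentials(params):
    """Return a list of violations for a source stage's params (empty list means OK)."""
    problems = []
    ref = params.get("credentials_ref")
    if ref is None:
        problems.append("missing credentials_ref")
    else:
        parts = urlsplit(ref)
        # Require the secrets:// scheme and a non-empty location after it.
        if parts.scheme != "secrets" or not (parts.netloc or parts.path.strip("/")):
            problems.append(f"credentials_ref must be a secrets:// reference: {ref!r}")
    leaked = sorted(FORBIDDEN_KEYS.intersection(params))
    problems.extend(f"plaintext secret key present: {key}" for key in leaked)
    return problems
```

A non-empty result would be a natural trigger for the audit event and the on_fail:"quarantine" route described above.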
IX. Machine-Readable Fragment (Drop-in)
layers:
  - name: "ingest"
    stages:
      - name: "src.s3.pull"
        type: "source.s3"
        impl: "I16-1.s3_pull"
        params:
          endpoint: "https://s3.amazonaws.com"
          bucket_or_db: "eift-data"
          prefix_or_table: "raw/2025/09/"
          query_or_pattern: "*.jsonl"
          credentials_ref: "secrets://aws/ingest_ro"
          format: "json"
          watermark: {field: "updated_at", start: "2025-09-01T00:00:00Z", step: "PT5M"}
          checkpoint: {path: "s3://eift-meta/chk/src.s3.pull", mode: "at-least-once"}
          dedupe_key: ["id", "updated_at"]
        outputs: ["raw_blob"]
        idempotent: true
        retries: {max: 3, backoff: "expo", jitter_ms: 200}
        timeout_s: 1800
        on_fail: "quarantine"
        schema_ref: "contracts/raw_json@v1.2"
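Section V requires each checkpoint commit to record offset, cursor, and sink completion marker atomically. For a local filesystem this can be sketched with the write-to-temp-then-rename pattern. This is an assumption-laden illustration: the JSON field names are hypothetical, and an object-store location such as the fragment's `s3://eift-meta/chk/src.s3.pull` would need that store's own commit mechanism.

```python
import json
import os
import tempfile


def commit_checkpoint(path, offset, cursor, sink_done):
    """Atomically replace the checkpoint file with offset, cursor and sink marker."""
    state = {"offset": offset, "cursor": cursor, "sink_done": sink_done}
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file lives in the same directory so the rename stays on one filesystem.
    fd, tmp = tempfile.mkstemp(dir=directory, prefix=".chk-")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f)
            f.flush()
            os.fsync(f.fileno())  # make the contents durable before publishing them
        os.replace(tmp, path)  # atomic on POSIX when src and dst share a filesystem
    except BaseException:
        os.unlink(tmp)
        raise


def load_checkpoint(path):
    """Return the last committed state, or None when no checkpoint exists yet."""
    try:
        with open(path) as f:
            return json.load(f)
    except FileNotFoundError:
        return None
```

Readers never observe a half-written checkpoint: they see either the previous state or the new one. With mode "at-least-once", records processed after the last commit are replayed on restart, which is why dedupe_key is still declared.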
X. Lint Rules (Excerpt, Normative)
lint_rules:
  - id: SRC.TYPE_ALLOWED
    when: "$.layers[*].stages[?(@.type^='source.')].type"
    assert: "value in ['source.s3','source.gcs','source.fs','source.db','source.kafka','source.http','source.custom']"
    level: error
  - id: SRC.CREDENTIALS_REF
    when: "$.layers[*].stages[?(@.type^='source.')].params"
    assert: "has_key('credentials_ref') and not has_key('plain_secret')"
    level: error
  - id: SRC.CHECKPOINT_DEFINED
    when: "$.layers[*].stages[?(@.type^='source.')].params"
    assert: "has_key('checkpoint') and has_key('watermark')"
    level: error
  - id: SRC.DEDUPE_OR_EXACTLY_ONCE
    when: "$.layers[*].stages[?(@.type^='source.')]"
    assert: "has_key('params.dedupe_key') or params.checkpoint.mode == 'exactly-once'"
    level: error
  - id: METROLOGY.SI_AND_CHECKDIM
    when: "$.metrology"
    assert: "units=='SI' and check_dim==true"
    level: error
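The SRC.* rules can be approximated by a hand-coded check over the parsed configuration (a plain dict), without a JSONPath engine. This sketch assumes that `^=` means "starts with" and applies SRC.TYPE_ALLOWED only to `source.`-prefixed stages; METROLOGY.SI_AND_CHECKDIM is omitted, and `lint_sources` is a hypothetical name.

```python
SOURCE_TYPES = {
    "source.s3", "source.gcs", "source.fs", "source.db",
    "source.kafka", "source.http", "source.custom",
}


def lint_sources(doc):
    """Hand-coded equivalents of the SRC.* rules; returns [(rule_id, stage_name), ...]."""
    errors = []
    for layer in doc.get("layers", []):
        for stage in layer.get("stages", []):
            name, stype = stage.get("name"), stage.get("type", "")
            if not stype.startswith("source."):
                continue  # the SRC.* rules only target source stages
            params = stage.get("params", {})
            if stype not in SOURCE_TYPES:
                errors.append(("SRC.TYPE_ALLOWED", name))
            if "credentials_ref" not in params or "plain_secret" in params:
                errors.append(("SRC.CREDENTIALS_REF", name))
            if "checkpoint" not in params or "watermark" not in params:
                errors.append(("SRC.CHECKPOINT_DEFINED", name))
            mode = params.get("checkpoint", {}).get("mode")
            if "dedupe_key" not in params and mode != "exactly-once":
                errors.append(("SRC.DEDUPE_OR_EXACTLY_ONCE", name))
    return errors
```

Run against the drop-in fragment (parsed into a dict), this returns an empty list; a stage typed `source.ftp` with an inline `plain_secret` trips all four rules.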
XI. Export Manifest & Audit Trail
export_manifest:
  version: "v1.0"
  artifacts:
    - {path: "ingest/pulled.manifest.json", sha256: "..."}
    - {path: "ingest/checkpoint.meta.json", sha256: "..."}
    - {path: "security/audit.log", sha256: "..."}
  references:
    - "EFT.WP.Core.DataSpec v1.0:EXPORT"
    - "EFT.WP.Core.Metrology v1.0:check_dim"
    - "EFT.WP.Data.DatasetCards v1.0:Ch.6"
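The `sha256` values in the manifest are computed from the artifacts themselves. A streaming-hash sketch using Python's hashlib, reading in chunks so large audit logs need not fit in memory; `build_artifacts` and the root-relative paths are assumptions.

```python
import hashlib
import os


def sha256_file(path, chunk_size=1 << 20):
    """Hex SHA-256 digest of a file, read in 1 MiB chunks."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()


def build_artifacts(root, rel_paths):
    """Return export_manifest.artifacts entries as [{"path": ..., "sha256": ...}, ...]."""
    return [
        {"path": rel, "sha256": sha256_file(os.path.join(root, rel))}
        for rel in rel_paths
    ]
```

Keeping `path` relative to the export root lets the same manifest verify the artifacts wherever the bundle is unpacked.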
XII. Chapter Compliance Checklist
- Connector type/impl/params complete; credentials_ref valid; no plaintext secrets present.
- watermark and checkpoint configured; dedupe_key or exactly-once guarantees non-duplication.
- Retry/timeout/idempotency policies explicit; failure levels and on_fail behavior traceable.
- metrology.units="SI" and check_dim=true; units consistent for QPS/T_inf/ρ/net_mbps.
- schema_ref aligns with upstream contracts; audit artifacts and citation anchors are listed in export_manifest with sha256.
Copyright & License (CC BY 4.0)
Copyright: Unless otherwise noted, the copyright of “Energy Filament Theory” (text, charts, illustrations, symbols, and formulas) belongs to the author “Guanglin Tu”.
License: This work is licensed under the Creative Commons Attribution 4.0 International (CC BY 4.0). You may copy, redistribute, excerpt, adapt, and share for commercial or non‑commercial purposes with proper attribution.
Suggested attribution: Author: “Guanglin Tu”; Work: “Energy Filament Theory”; Source: energyfilament.org; License: CC BY 4.0.
First published: 2025-11-11 | Current version: v5.1
License link: https://creativecommons.org/licenses/by/4.0/