Chapter 11 Streaming Cleansing and the Backpressure Control Loop
One-Sentence Goal
Implement controllable-throughput online cleansing on an execution graph G=(V,E). Use backpressure and watermarks to form a stable closed loop, and permit continuous publication and replay only while the quality assertions hold.
I. Scope & Objects
- Objects & I/O
- Input: event stream S_in whose payload fields follow the Chapter 3 standard schema SRef; internal time on tau_mono, published time on ts.
- Execution graph: G=(V,E), where V are I10-* cleansing operator nodes and E are bounded channels chan.
- Output: continuous product stream S_out plus a rolling manifest.stream (including wm, rho, q_len, P99, etc.).
- Constraints & boundaries
- Resources & capacity: each channel declares cap and instantaneous queue length q_len.
- Semantics: at-least-once delivery; downstream sinks implement idempotent(pk) so that redelivered events do not produce duplicate writes.
- Quality: all assertions from Chapters 4–10 must hold continuously within sliding windows.
II. Terms & Variables
- Throughput & service
- Arrival rate: lambda_in; service rate: mu_proc; utilization: rho = lambda_in / mu_proc.
- Queue length: q_len; capacity: cap; waiting: W_q; service time: S_service.
- End-to-end latency: W_e2e = W_q + S_service + W_retry.
- Backpressure & shaping
- Credit window: credit = max( 0 , cap - q_len ); token bucket: r_token, B_burst.
- Watermark: wm (event-time watermark, units/fields per Chapter 5); lateness bound: lateness_max.
- Reliability & idempotence
- Retry rate: r_retry; drop rate: p_drop; duplicate marker: dup_tag; traceability: TraceID.
- Quality & SLO
- Indicators: P99, drift, q_score, TS.sli.ingest_lag, TS.sli.queue_depth, TS.sli.throughput.
III. Axioms (P111-*)
- P111-01 Stability
In steady state each chan must satisfy rho < 1, with target rho ≤ rho_target < 1.
- P111-02 Idempotence first
The sink must provide idempotent(pk); commit upstream offsets only after sink_ack ∧ dedup_ok.
- P111-03 Time base & watermarks
Windowed computations run on tau_mono; publishing uses ts. wm is non-decreasing; late events may revise windows only within lateness_max.
- P111-04 Graceful load shedding under backpressure
As q_len → cap, prefer throttling and deferral over unbounded queuing; if necessary, apply bounded drops via shed(policy).
- P111-05 Two-form consistency (streaming)
Any online computation involving T_arr must output both forms and delta_form, and assert delta_form ≤ tol_Tarr within the window.
- P111-06 Replayability
Persisted events must be replayable without breaking idempotent(pk) or the history of assertions.
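A minimal sketch of P111-02 and P111-06, assuming an in-memory sink: the names IdempotentSink and process_batch are hypothetical, and the reading of dedup_ok as "a redelivered payload matches what was already stored" is an assumption, since this chapter does not define it further.

```python
class IdempotentSink:
    """In-memory stand-in for a sink that offers idempotent(pk)."""

    def __init__(self):
        self.rows = {}        # pk -> payload
        self.duplicates = 0   # redeliveries seen (feeds dup_rate)

    def put(self, pk, payload):
        """Write at most once per pk and return (sink_ack, dedup_ok)."""
        if pk in self.rows:
            self.duplicates += 1
            # Assumption: a redelivery is acceptable only if it carries
            # the same payload that was stored the first time.
            return True, self.rows[pk] == payload
        self.rows[pk] = payload
        return True, True


def process_batch(events, sink, committed_offset):
    """Write events in offset order; advance the offset only after
    sink_ack and dedup_ok are both true.

    events: iterable of (offset, pk, payload) tuples.
    Returns the new committed offset.
    """
    for offset, pk, payload in events:
        sink_ack, dedup_ok = sink.put(pk, payload)
        if not (sink_ack and dedup_ok):
            # Leave the offset uncommitted so the event is redelivered.
            break
        committed_offset = offset
    return committed_offset
```

Replaying the same batch against the same sink leaves rows unchanged and returns the same offset, which is the behaviour the replayability axiom asks for.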
IV. Minimal Equations (S111-*)
- S111-01 Stability condition
rho = lambda_in / mu_proc, require rho < 1.
- S111-02 Little’s law (steady state)
L = lambda_in * W, where L is average in-system concurrency and W the average sojourn time; use for estimating W_q and capacity planning.
- S111-03 Queue evolution (discrete step)
q_len(t+Delta_t) = max( 0 , min( cap , q_len(t) + lambda_in * Delta_t - mu_proc * Delta_t ) ).
- S111-04 Credit-based backpressure
credit = max( 0 , cap - q_len ); upstream send window win_send = min( credit , B_burst ).
- S111-05 Token-bucket shaping
tokens(t+Delta_t) = min( B_burst , tokens(t) + r_token * Delta_t ); allow when tokens ≥ cost(msg) and then tokens -= cost(msg).
- S111-06 Watermark advancement
wm_out = min( wm_in , ( min( ts(batch) ) - lateness_max ) ); require non_decreasing(wm_out).
- S111-07 End-to-end SLO
P99(W_e2e) ≤ tol_p99; TS.sli.ingest_lag ≤ tol_lag; p_drop ≤ tol_drop; dup_rate ≤ tol_dup.
- S111-08 Arrival-time two forms (streaming window)
delta_form = | ( 1 / c_ref ) * ( ∫ n_eff d ell ) - ( ∫ ( n_eff / c_ref ) d ell ) |, and within sliding window Delta_t require delta_form ≤ tol_Tarr.
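Equations S111-01 through S111-05 translate almost directly into code. The following is a sketch under stated assumptions rather than a reference implementation: the function and class names are hypothetical, the bucket starts empty, and cost(msg) defaults to 1.

```python
from dataclasses import dataclass


def utilization(lambda_in, mu_proc):
    """S111-01: rho = lambda_in / mu_proc (stable only if rho < 1)."""
    return lambda_in / mu_proc


def sojourn_time(l_avg, lambda_in):
    """S111-02 rearranged: W = L / lambda_in."""
    return l_avg / lambda_in


def queue_step(q_len, lambda_in, mu_proc, dt, cap):
    """S111-03: one discrete step of queue length, clamped to [0, cap]."""
    return max(0.0, min(cap, q_len + lambda_in * dt - mu_proc * dt))


def send_window(q_len, cap, b_burst):
    """S111-04: credit = max(0, cap - q_len); win_send = min(credit, B_burst)."""
    credit = max(0, cap - q_len)
    return min(credit, b_burst)


@dataclass
class TokenBucket:
    """S111-05: token-bucket shaper."""
    r_token: float       # refill rate, tokens per unit time
    b_burst: float       # bucket capacity
    tokens: float = 0.0  # assumption: the bucket starts empty

    def refill(self, dt):
        """Add r_token * dt tokens, capped at b_burst."""
        self.tokens = min(self.b_burst, self.tokens + self.r_token * dt)

    def allow(self, cost=1.0):
        """Spend cost tokens if available; return whether msg may pass."""
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```

With lambda_in = 70 and mu_proc = 100 (rho = 0.7) a queue of 10 drains to 0 within one unit step, while lambda_in = 120 (rho = 1.2) grows the same queue to 30 and then saturates at cap, which is the case the backpressure loop is meant to prevent.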
V. Cleaning Process (M10-11 Streaming Loop)
- Topology & policy assembly
Load G=(V,E) and policy = { cap, rho_target, lateness_max, r_token, B_burst }; register TS.sli.* metrics for every chan.
- Ingress & decode
Validate schema (Chapter 3) and unit/dim (Chapter 4); on failure, set m=0 and quarantine to S_quarantine.
- Time & watermarks
Map ts -> tau_mono, record offset/skew/J (Chapter 5); advance/broadcast wm; allow late-event window recomputation within lateness_max.
- Path & arrival time
Where needed, compute both T_arr forms in parallel and produce delta_form (Chapter 6).
- Online governance of missingness & anomalies
Maintain m ∈ {0,1}; record imputation via corr_env(x; RefCond) (Chapter 7); detect outliers and apply down-weighting tags (Chapter 8).
- Deduplication & referential checks
Deduplicate within Delta_t using keys {pk, k_biz}; verify foreign_key (Chapter 9).
- Backpressure & shaping
Compute q_len, credit, tokens; if rho > rho_target or q_len ≥ cap, trigger throttle or shed(policy).
- Writes & commits
Write to downstream via idempotent(pk); commit upstream offsets only after sink_ack ∧ dedup_ok; persist manifest.stream slices.
- Gating & alerts
Enforce online assertions on P99(W_e2e), dup_rate, p_drop, delta_form; on violations, activate degradation and rollback policies.
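The time and watermark step of the loop above can be illustrated with a small tracker built on S111-06. This is a sketch under assumptions: WatermarkTracker is a hypothetical name, the clamp with max() is one way to guarantee non_decreasing(wm_out) (the chapter only requires the property, not this mechanism), and accepting an event when ts ≥ wm is one reading of "within lateness_max".

```python
class WatermarkTracker:
    """Tracks a non-decreasing event-time watermark for one stream."""

    def __init__(self, lateness_max):
        self.lateness_max = lateness_max
        self.wm = float("-inf")

    def advance(self, wm_in, batch_ts):
        """Apply S111-06 to one batch and return the current watermark.

        wm_in: upstream watermark.
        batch_ts: event timestamps (ts) of the batch; may be empty.
        """
        if not batch_ts:
            return self.wm
        candidate = min(wm_in, min(batch_ts) - self.lateness_max)
        # Assumption: clamp instead of failing, so wm never moves backwards.
        self.wm = max(self.wm, candidate)
        return self.wm

    def accepts(self, ts):
        """True if an event at ts may still revise its window."""
        return ts >= self.wm
```

For lateness_max = 5, a batch with timestamps [20, 30] and an upstream watermark of 100 yields wm = 15; a later batch containing ts = 12 does not pull the watermark back, and that event is rejected as too late.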
VI. Contracts & Assertions (Streaming Increments)
- Throughput & stability
rho ≤ rho_target, q_len ≤ cap, TS.sli.queue_depth_p99 ≤ tol_qp99.
- Time & watermark
non_decreasing(wm), TS.sli.ingest_lag ≤ tol_lag, late_ratio ≤ tol_late.
- Quality & anomalies
q_score ≥ q_min, drift ≤ tol_drift, outlier_share ≤ tol_outlier.
- Arrival-time two forms
arrival_forms(delta_form, tol_Tarr) holds continuously within window Delta_t.
- Idempotence & duplicates
dup_rate ≤ tol_dup, idempotency_fail_rate = 0, offset_commit_after_sink = true.
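Most of the contracts above are upper or lower bounds that must hold for every sample inside a sliding window Delta_t. A minimal sketch of such a checker follows; WindowContract, its method names, and the threshold values in the usage note are hypothetical, not the chapter's window_contract prototype.

```python
from collections import deque


class WindowContract:
    """Checks upper- and lower-bound assertions over a sliding window."""

    def __init__(self, delta_t, upper=None, lower=None):
        self.delta_t = delta_t
        self.upper = upper or {}  # metric -> maximum allowed value
        self.lower = lower or {}  # metric -> minimum allowed value
        self.samples = deque()    # (tau_mono, metrics) in arrival order

    def observe(self, tau_mono, metrics):
        """Record a sample and evict samples older than delta_t."""
        self.samples.append((tau_mono, metrics))
        while self.samples and self.samples[0][0] <= tau_mono - self.delta_t:
            self.samples.popleft()

    def report(self):
        """Return {metric: worst value in window} for every violated bound."""
        violations = {}
        for name, bound in self.upper.items():
            values = [m[name] for _, m in self.samples if name in m]
            if values and max(values) > bound:
                violations[name] = max(values)
        for name, bound in self.lower.items():
            values = [m[name] for _, m in self.samples if name in m]
            if values and min(values) < bound:
                violations[name] = min(values)
        return violations
```

For example, WindowContract(10, upper={"rho": 0.7}, lower={"q_score": 0.9}) keeps reporting a rho violation until the offending sample ages out of the 10-unit window, which matches the requirement that assertions hold continuously rather than only at the latest sample.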
VII. Implementation Binding (I10-11, Core Prototypes)
- run_stream_graph(G, policy) -> sli_stream: orchestrate the execution graph and policies; emit TS.sli.* in real time.
- credit_backpressure(chan, cap, rho_target) -> control: compute credit and throttling signals.
- token_bucket(name, r_token, B_burst) -> gate(msg) -> pass|delay|drop: unified traffic shaping.
- emit_watermark(stream, lateness_max, ts_field) -> wm: advance the event-time watermark.
- idempotent_sink(sink, pk) -> writer: provides put(msg), ack(trace) and ensures idempotent writes.
- window_contract(stream, Delta_t, tests) -> report: execute Chapters 4–10 assertions within a sliding window.
- shed(policy, msg) -> {drop|defer|route}: bounded drops and bypass under backpressure.
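The chapter does not specify what a shedding policy contains, so the following sketch of shed(policy, msg) rests entirely on assumptions: the ShedPolicy fields, the "kind" key on messages, and the rule that a drop is allowed only while the running drop ratio stays within a bound such as tol_drop are all hypothetical.

```python
from dataclasses import dataclass


@dataclass
class ShedPolicy:
    """Hypothetical shedding policy with a bounded drop ratio."""
    max_drop_ratio: float                  # bound on dropped / seen
    droppable_kinds: frozenset             # kinds that may be dropped
    bypass_kinds: frozenset = frozenset()  # kinds routed to a side channel
    seen: int = 0
    dropped: int = 0


def shed(policy, msg):
    """Return 'drop', 'defer', or 'route' for msg under backpressure."""
    policy.seen += 1
    kind = msg.get("kind")
    if kind in policy.bypass_kinds:
        return "route"
    within_budget = (policy.dropped + 1) / policy.seen <= policy.max_drop_ratio
    if kind in policy.droppable_kinds and within_budget:
        policy.dropped += 1
        return "drop"
    # Default: keep the message and retry later instead of losing it.
    return "defer"
```

Deferral is the fallback so that the drop ratio never exceeds max_drop_ratio, in line with the preference in P111-04 for throttling and deferral over drops.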
Invariants
- No I10-11 operator modifies unit(x), dim(x), or gamma(ell); operators only compute and tag within windows.
- Replaying the same TraceID yields identical writer outcomes (idempotence).
VIII. Cross-References
- Channels & concurrency model: EFT.WP.Core.Threads v1.0.
- Acquisition & arrival time: EFT.WP.Core.Sea v1.0.
- Schema & contracts: EFT.WP.Core.DataSpec v1.0.
- Normalization & density conventions: EFT.WP.Core.Density v1.0.
- This volume’s Chapters 4–10 for units, time base, path, missingness, anomalies, referential integrity, and release gate.
IX. Quality Metrics & Risk Control
- SLIs / indicators
- TS.sli.throughput, TS.sli.queue_depth, TS.sli.ingest_lag, P99(W_e2e), dup_rate, p_drop, delta_form_p95.
- Example targets: rho_target = 0.7, plus tol_p99, tol_lag, tol_dup, tol_drop.
- Risk actions
- If rho > rho_target → activate credit_backpressure and token_bucket.
- If P99(W_e2e) > tol_p99 → tighten B_burst, expand lateness_max, or downgrade computation fidelity.
- If dup_rate > tol_dup → reinforce idempotent(pk) indexing and windowed dedup granularity.
- If delta_form exceeds threshold → throttle related operators, switch to read-only bypass, and trigger Chapter 10 rollback channel.
Summary
This chapter establishes the streaming-cleansing stability condition rho < 1, the watermark advancement rule, and a dual credit/token mechanism for a backpressure loop. It reuses the two-form arrival-time checks and contract assertions online, forming a continuous “throttle → validate → publish → replay” operating mode. Artifacts include S_out and the rolling manifest.stream, keyed by TraceID, wm, rho, q_len, P99, dup_rate, delta_form.
Copyright & License (CC BY 4.0)
Copyright: Unless otherwise noted, the copyright of “Energy Filament Theory” (text, charts, illustrations, symbols, and formulas) belongs to the author “Guanglin Tu”.
License: This work is licensed under the Creative Commons Attribution 4.0 International (CC BY 4.0). You may copy, redistribute, excerpt, adapt, and share for commercial or non‑commercial purposes with proper attribution.
Suggested attribution: Author: “Guanglin Tu”; Work: “Energy Filament Theory”; Source: energyfilament.org; License: CC BY 4.0.
First published: 2025-11-11 | Current version: v5.1
License link: https://creativecommons.org/licenses/by/4.0/