eddos (0.1.0)
Published 2026-05-31 14:24:56 -05:00 by joe
Installation
[registry]
default = "gitea"
[registries.gitea]
index = "sparse+ " # Sparse index
# index = " " # Git
[net]
git-fetch-with-cli = true
cargo add eddos@0.1.0
About this package
Event-Driven Development Observability System - a generic streaming pipeline for aggregating events from multiple sources
eddos
Event-Driven Development Observability System — a generic Tokio streaming pipeline for aggregating events from multiple sources into batched insights.
What it does
eddos provides a four-stage async pipeline: ingestion via bounded MPSC channel, parallel
enrichment with semaphore-controlled concurrency, timeout-based batch aggregation, and broadcast
fan-out to multiple subscribers. The pipeline is generic over an Event trait, so it works with
any domain type. Built-in sources (behind feature flags) watch git refs, Claude session files,
and poll a GitLab Knowledge Graph HTTP endpoint.
Throughput target: 50,000+ events/sec with p99 latency under 100ms.
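
The timeout-based batch aggregation stage can be illustrated with a small standard-library sketch. This is not eddos code: the crate is built on Tokio, while the version below uses `std::sync::mpsc` and blocking threads, and the `next_batch` function name is hypothetical. It only shows the size-or-timeout flush rule, where a batch is emitted when it is full or when the deadline passes, whichever comes first.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Collects items from `rx` into one batch. Returns when the batch holds
/// `max_size` items, when `timeout` has elapsed since the call started,
/// or when every sender has been dropped.
///
/// Returns `None` only when the channel is closed and nothing is left to
/// flush. A timeout with no items yields `Some` of an empty batch.
fn next_batch<T>(rx: &Receiver<T>, max_size: usize, timeout: Duration) -> Option<Vec<T>> {
    let deadline = Instant::now() + timeout;
    let mut batch = Vec::with_capacity(max_size);

    while batch.len() < max_size {
        // Time left before this batch must be flushed.
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(item) => batch.push(item),
            // Deadline reached: flush whatever has been collected.
            Err(RecvTimeoutError::Timeout) => break,
            // All senders are gone: flush the remainder, or signal the end.
            Err(RecvTimeoutError::Disconnected) => {
                if batch.is_empty() {
                    return None;
                }
                break;
            }
        }
    }
    Some(batch)
}
```

Calling `next_batch` in a loop until it returns `None` drains the channel in batches. Feeding it from a `std::sync::mpsc::sync_channel` gives a bounded queue, roughly analogous to the bounded ingestion channel described above.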
Public API
| Type | Description |
|---|---|
| Event | Trait all pipeline events must implement (correlation_id, timestamp, event_type) |
| DataPipeline<E> | Pipeline entry point: construct with PipelineConfig, then call ingestion_handle() / subscribe() |
| IngestionHandle<E> | Cloneable sender: call .ingest(event).await from any task |
| AggregatedInsight<E> | Broadcast output: contains the event batch and BatchMetrics |
| PipelineConfig | Buffer sizes, concurrency, batch size/timeout, retry config |
| Source<E> | Trait for pluggable event producers; implement run(handle) |
| PipelineMetrics / MetricsSnapshot | Counters for ingestion, enrichment success/failure, batches |
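
To show what "generic over an Event trait" means in practice, here is a minimal stand-in sketch. The trait below is an assumption modelled on the three accessors named in the table, not the crate's actual definition. It uses `std::time::SystemTime` in place of chrono's `DateTime<Utc>` so that it compiles without dependencies. `DemoEvent`, `demo`, and `count_by_type` are hypothetical names.

```rust
use std::collections::BTreeMap;
use std::time::SystemTime;

/// Stand-in for the crate's `Event` trait (assumed shape, std types only).
trait Event {
    fn correlation_id(&self) -> Option<&str>;
    fn timestamp(&self) -> SystemTime;
    fn event_type(&self) -> &str;
}

/// Hypothetical domain type used only for this illustration.
#[derive(Debug, Clone)]
struct DemoEvent {
    id: String,
    kind: String,
    ts: SystemTime,
}

impl Event for DemoEvent {
    fn correlation_id(&self) -> Option<&str> {
        Some(&self.id)
    }
    fn timestamp(&self) -> SystemTime {
        self.ts
    }
    fn event_type(&self) -> &str {
        &self.kind
    }
}

/// Convenience constructor that stamps the event with the current time.
fn demo(id: &str, kind: &str) -> DemoEvent {
    DemoEvent {
        id: id.to_string(),
        kind: kind.to_string(),
        ts: SystemTime::now(),
    }
}

/// Counts events per `event_type`. It depends only on the trait,
/// so it works for any domain type that implements `Event`.
fn count_by_type<E: Event>(batch: &[E]) -> BTreeMap<String, usize> {
    let mut counts = BTreeMap::new();
    for event in batch {
        *counts.entry(event.event_type().to_string()).or_insert(0) += 1;
    }
    counts
}
```

Code written against the trait bound, like `count_by_type`, never needs to know the concrete event type. That is the property that lets a pipeline stage handle git changes, session changes, or a custom type in the same way.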
Feature-gated sources
| Feature | Types |
|---|---|
| git | GitWatcher, GitChange, GitChangeKind |
| claude | ClaudeWatcher, SessionChange, SessionChangeKind |
| gkg | GkgPoller, GkgUpdate |
| all-sources | All of the above |
Example
use eddos::{DataPipeline, PipelineConfig, Event};
use chrono::{DateTime, Utc};

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct MyEvent { id: String, kind: String, ts: DateTime<Utc> }

impl Event for MyEvent {
    fn correlation_id(&self) -> Option<&str> { Some(&self.id) }
    fn timestamp(&self) -> DateTime<Utc> { self.ts }
    fn event_type(&self) -> &str { &self.kind }
}

#[tokio::main]
async fn main() {
    let pipeline: DataPipeline<MyEvent> = DataPipeline::new(PipelineConfig::default());
    let handle = pipeline.ingestion_handle();
    let mut rx = pipeline.subscribe();

    handle.ingest(MyEvent {
        id: "evt-1".into(),
        kind: "user_action".into(),
        ts: Utc::now(),
    }).await.unwrap();

    if let Ok(insight) = rx.recv().await {
        println!("batch: {} events", insight.metrics.event_count);
    }
}
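
The example above subscribes a single receiver. As a rough illustration of fan-out to several subscribers, the sketch below uses only the standard library. It is not how eddos implements `subscribe()`, and the `FanOut` type is hypothetical. It also uses one unbounded channel per subscriber, so it does not model a bounded broadcast buffer or lagging receivers.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Minimal fan-out: every published value is cloned to each subscriber.
struct FanOut<T: Clone> {
    subscribers: Vec<Sender<T>>,
}

impl<T: Clone> FanOut<T> {
    fn new() -> Self {
        FanOut { subscribers: Vec::new() }
    }

    /// Registers a new subscriber and returns its receiving end.
    /// A subscriber only sees values published after this call.
    fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Sends a clone of `value` to every live subscriber, drops the
    /// subscribers whose receiver is gone, and returns how many
    /// subscribers received the value.
    fn publish(&mut self, value: T) -> usize {
        self.subscribers.retain(|tx| tx.send(value.clone()).is_ok());
        self.subscribers.len()
    }
}
```

Each subscriber gets its own copy of every batch, which is why the payload type has to be `Clone`.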
Configuration defaults
| Field | Default |
|---|---|
| ingestion_buffer_size | 10,000 |
| enrichment_concurrency | 100 |
| batch_size | 100 |
| batch_timeout | 500ms |
| broadcast_buffer_size | 1,000 |
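
The defaults in the table can be mirrored in a small stand-in struct to show how individual values might be overridden. The field names and default values come from the table. The field types, the `PipelineConfigSketch` name, and the `low_latency` helper are assumptions, as is the idea that the real `PipelineConfig` exposes public fields for struct-update syntax.

```rust
use std::time::Duration;

/// Stand-in for `PipelineConfig`. Field types are assumed.
#[derive(Debug, Clone, PartialEq)]
struct PipelineConfigSketch {
    ingestion_buffer_size: usize,
    enrichment_concurrency: usize,
    batch_size: usize,
    batch_timeout: Duration,
    broadcast_buffer_size: usize,
}

impl Default for PipelineConfigSketch {
    /// Values copied from the "Configuration defaults" table.
    fn default() -> Self {
        PipelineConfigSketch {
            ingestion_buffer_size: 10_000,
            enrichment_concurrency: 100,
            batch_size: 100,
            batch_timeout: Duration::from_millis(500),
            broadcast_buffer_size: 1_000,
        }
    }
}

/// Hypothetical preset: smaller batches flushed sooner, with every
/// other field left at its default via struct-update syntax.
fn low_latency() -> PipelineConfigSketch {
    PipelineConfigSketch {
        batch_size: 10,
        batch_timeout: Duration::from_millis(50),
        ..Default::default()
    }
}
```

Smaller batches and a shorter timeout generally trade throughput for latency, so a preset like this is only worth considering when subscribers need results sooner than the 500ms default flush.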
Dependencies
| ID | Version |
|---|---|
| chrono | ^0.4 |
| gix | ^0.78 |
| notify | ^6.1 |
| reqwest | ^0.12 |
| serde | ^1.0 |
| serde_json | ^1.0 |
| thiserror | ^1.0 |
| tokio | ^1.38 |
| tokio-stream | ^0.1 |
| tracing | ^0.1 |
| tempfile | ^3 |
| tokio | ^1.38 |
| wiremock | ^0.6 |