Joseph M OBrien joe
  • Joined on 2026-03-15

eddos (0.1.0)

Published 2026-05-31 14:24:56 -05:00 by joe

Installation

[registry]
default = "gitea"

[registries.gitea]
index = "sparse+" # Sparse index
# index = "" # Git

[net]
git-fetch-with-cli = true

cargo add eddos@0.1.0

About this package

Event-Driven Development Observability System - a generic streaming pipeline for aggregating events from multiple sources

eddos

Event-Driven Development Observability System — a generic Tokio streaming pipeline for aggregating events from multiple sources into batched insights.

What it does

eddos provides a four-stage async pipeline: ingestion via bounded MPSC channel, parallel enrichment with semaphore-controlled concurrency, timeout-based batch aggregation, and broadcast fan-out to multiple subscribers. The pipeline is generic over an Event trait, so it works with any domain type. Built-in sources (behind feature flags) watch git refs, Claude session files, and poll a GitLab Knowledge Graph HTTP endpoint.
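The timeout-based batch aggregation stage is the least obvious of the four, so here is a minimal standard-library sketch of the rule "emit when the batch is full or the timeout expires, whichever comes first". It is not eddos's implementation, which is built on Tokio: `next_batch` is a hypothetical helper, and the blocking `std::sync::mpsc` receiver stands in for the async channel.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Hypothetical helper (not part of eddos): collects up to `batch_size`
/// items, or whatever arrived before `timeout` elapsed. The result may be
/// empty if nothing arrived in time or the channel is closed and drained.
pub fn next_batch<T>(rx: &Receiver<T>, batch_size: usize, timeout: Duration) -> Vec<T> {
    // One deadline for the whole batch, so a slow trickle of events
    // cannot keep the batch open indefinitely.
    let deadline = Instant::now() + timeout;
    let mut batch = Vec::with_capacity(batch_size);

    while batch.len() < batch_size {
        // Time left until the deadline; zero once it has passed.
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(item) => batch.push(item),
            // Timeout: flush what we have. Disconnected: all senders are gone.
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    batch
}
```

Calling `next_batch` in a loop on the receiving side of a bounded `std::sync::mpsc::sync_channel` gives both behaviours described above: producers block when the buffer is full, and consumers receive batches of at most `batch_size` events.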

Throughput target: 50,000+ events/sec with p99 latency under 100ms.

Public API

| Type | Description |
| --- | --- |
| Event | Trait all pipeline events must implement (correlation_id, timestamp, event_type) |
| DataPipeline<E> | Pipeline entry point: construct with PipelineConfig, then call ingestion_handle() / subscribe() |
| IngestionHandle<E> | Cloneable sender: call .ingest(event).await from any task |
| AggregatedInsight<E> | Broadcast output: contains the event batch and BatchMetrics |
| PipelineConfig | Buffer sizes, concurrency, batch size/timeout, retry config |
| Source<E> | Trait for pluggable event producers; implement run(handle) |
| PipelineMetrics / MetricsSnapshot | Counters for ingestion, enrichment success/failure, batches |
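The counters-plus-snapshot pairing in the last row is a common pattern, and it can be sketched with atomics from the standard library. The names `Counters` and `Snapshot`, their fields, and their methods are hypothetical stand-ins chosen for illustration; the real PipelineMetrics and MetricsSnapshot types may be shaped differently.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical stand-in for a metrics type: lock-free counters that many
/// tasks can bump through a shared reference.
#[derive(Default)]
pub struct Counters {
    ingested: AtomicU64,
    enriched_ok: AtomicU64,
    enriched_err: AtomicU64,
    batches: AtomicU64,
}

/// Plain-value copy of the counters, cheap to log or compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Snapshot {
    pub ingested: u64,
    pub enriched_ok: u64,
    pub enriched_err: u64,
    pub batches: u64,
}

impl Counters {
    pub fn record_ingested(&self) {
        self.ingested.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one enrichment attempt as a success or a failure.
    pub fn record_enrichment(&self, ok: bool) {
        let counter = if ok { &self.enriched_ok } else { &self.enriched_err };
        counter.fetch_add(1, Ordering::Relaxed);
    }

    pub fn record_batch(&self) {
        self.batches.fetch_add(1, Ordering::Relaxed);
    }

    /// Reads each counter once. The fields are read independently, so under
    /// concurrent updates the snapshot is approximate, not transactional.
    pub fn snapshot(&self) -> Snapshot {
        Snapshot {
            ingested: self.ingested.load(Ordering::Relaxed),
            enriched_ok: self.enriched_ok.load(Ordering::Relaxed),
            enriched_err: self.enriched_err.load(Ordering::Relaxed),
            batches: self.batches.load(Ordering::Relaxed),
        }
    }
}
```

Relaxed ordering is enough here because each counter is only ever incremented and read, and no other memory is synchronized through it.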

Feature-gated sources

| Feature | Types |
| --- | --- |
| git | GitWatcher, GitChange, GitChangeKind |
| claude | ClaudeWatcher, SessionChange, SessionChangeKind |
| gkg | GkgPoller, GkgUpdate |
| all-sources | All of the above |

Example

use eddos::{DataPipeline, PipelineConfig, Event};
use chrono::{DateTime, Utc};

// Any domain type can flow through the pipeline once it implements Event.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct MyEvent { id: String, kind: String, ts: DateTime<Utc> }

impl Event for MyEvent {
    fn correlation_id(&self) -> Option<&str> { Some(&self.id) }
    fn timestamp(&self) -> DateTime<Utc> { self.ts }
    fn event_type(&self) -> &str { &self.kind }
}

#[tokio::main]
async fn main() {
    let pipeline: DataPipeline<MyEvent> = DataPipeline::new(PipelineConfig::default());
    let handle = pipeline.ingestion_handle();
    // Subscribe before ingesting so the first batch is not missed.
    let mut rx = pipeline.subscribe();

    handle.ingest(MyEvent {
        id: "evt-1".into(),
        kind: "user_action".into(),
        ts: Utc::now(),
    }).await.unwrap();

    // A lone event is below the default batch_size, so the batch should
    // arrive once batch_timeout elapses.
    if let Ok(insight) = rx.recv().await {
        println!("batch: {} events", insight.metrics.event_count);
    }
}
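The example calls `subscribe()` before ingesting because, in a fan-out design, a subscriber only sees batches published after it registered. The sketch below illustrates that behaviour with the standard library alone. `FanOut` is a hypothetical type, not part of eddos, and it uses unbounded channels, whereas eddos bounds its broadcast buffer via broadcast_buffer_size.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;

/// Hypothetical fan-out hub: every current subscriber receives every
/// published batch.
pub struct FanOut<T> {
    subscribers: Vec<Sender<Arc<T>>>,
}

impl<T> FanOut<T> {
    pub fn new() -> Self {
        FanOut { subscribers: Vec::new() }
    }

    /// Registers a new subscriber. It sees only batches published from now on.
    pub fn subscribe(&mut self) -> Receiver<Arc<T>> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Shares one allocation of the batch with each live subscriber, drops
    /// subscribers whose receiver is gone, and returns how many were reached.
    pub fn publish(&mut self, batch: T) -> usize {
        let shared = Arc::new(batch);
        self.subscribers
            .retain(|tx| tx.send(Arc::clone(&shared)).is_ok());
        self.subscribers.len()
    }
}
```

Wrapping the batch in an `Arc` means it is allocated once however many subscribers there are. A bounded broadcast channel such as `tokio::sync::broadcast` makes a different trade: memory stays capped, but a receiver that falls too far behind skips the oldest messages and is told so through a `Lagged` error.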

Configuration defaults

| Field | Default |
| --- | --- |
| ingestion_buffer_size | 10,000 |
| enrichment_concurrency | 100 |
| batch_size | 100 |
| batch_timeout | 500 ms |
| broadcast_buffer_size | 1,000 |
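To see how batch_size and batch_timeout interact, the defaults can be put into a small model. `ConfigSketch` is a hypothetical mirror of the defaults listed here, not the real PipelineConfig: the field types are assumptions, and the two helper methods exist only for this arithmetic.

```rust
use std::time::Duration;

/// Hypothetical mirror of the documented defaults; field types are assumed.
#[derive(Debug, Clone, PartialEq)]
pub struct ConfigSketch {
    pub ingestion_buffer_size: usize,
    pub enrichment_concurrency: usize,
    pub batch_size: usize,
    pub batch_timeout: Duration,
    pub broadcast_buffer_size: usize,
}

impl Default for ConfigSketch {
    fn default() -> Self {
        ConfigSketch {
            ingestion_buffer_size: 10_000,
            enrichment_concurrency: 100,
            batch_size: 100,
            batch_timeout: Duration::from_millis(500),
            broadcast_buffer_size: 1_000,
        }
    }
}

impl ConfigSketch {
    /// Time needed to fill one batch at a steady arrival rate.
    pub fn time_to_fill(&self, events_per_sec: u64) -> Duration {
        if events_per_sec == 0 {
            return Duration::MAX; // no traffic: the batch never fills
        }
        // Integer nanoseconds avoid floating-point rounding.
        Duration::from_nanos(self.batch_size as u64 * 1_000_000_000 / events_per_sec)
    }

    /// True when batches are closed by the timeout rather than by filling up.
    pub fn timeout_bound(&self, events_per_sec: u64) -> bool {
        self.time_to_fill(events_per_sec) > self.batch_timeout
    }
}
```

At the stated target of 50,000 events/sec, a 100-event batch fills in 2 ms, so the timeout never fires. The crossover is batch_size / batch_timeout = 100 / 0.5 s = 200 events/sec. Below that rate, the 500 ms timeout decides how long an event waits before it is broadcast.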

Dependencies

| ID | Version |
| --- | --- |
| chrono | ^0.4 |
| gix | ^0.78 |
| notify | ^6.1 |
| reqwest | ^0.12 |
| serde | ^1.0 |
| serde_json | ^1.0 |
| thiserror | ^1.0 |
| tokio | ^1.38 |
| tokio-stream | ^0.1 |
| tracing | ^0.1 |
| tempfile | ^3 |
| tokio | ^1.38 |
| wiremock | ^0.6 |

Details
Cargo
2026-05-31 14:24:56 -05:00
2
MIT
36 KiB