
Workers

Target Architecture — Final-State Design

This page describes the final-state background-processing fleet of the Knowledge Platform. Workers are MassTransit consumers (and a few scheduled jobs) that perform the asynchronous, idempotent, retry-safe work behind ingestion, indexing, and lifecycle management.

The Knowledge Platform runs 20 workers. Most are event-triggered MassTransit consumers reacting to envelope events; a few are scheduled (Quartz-style) jobs for lifecycle and retention. Every worker derives an idempotency key per the metadata schema:

idempotencyKey = sha256( eventId + ":" + workerName + ":" + targetAggregateId )
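A minimal Python sketch of the key derivation, for illustration only (the real workers are MassTransit consumers). The schema does not state a string encoding or digest format, so UTF-8 input and a lowercase hex digest are assumptions here.

```python
import hashlib


def idempotency_key(event_id: str, worker_name: str, target_aggregate_id: str) -> str:
    """sha256(eventId + ":" + workerName + ":" + targetAggregateId).

    Assumption: UTF-8 encoding and hex output; the schema does not specify either.
    """
    material = f"{event_id}:{worker_name}:{target_aggregate_id}"
    return hashlib.sha256(material.encode("utf-8")).hexdigest()


# A redelivery of the same event to the same worker yields the same key...
first = idempotency_key("evt-1", "ArtifactChunkingWorker", "mem-42")
redelivered = idempotency_key("evt-1", "ArtifactChunkingWorker", "mem-42")
# ...while a different worker consuming the same event gets its own key.
other_worker = idempotency_key("evt-1", "ClassificationWorker", "mem-42")
print(first == redelivered, first == other_worker)
```

Because the worker name is part of the hashed material, the fan-out consumers of one event (for example, chunking and classification of the same MemoryRecordCreated) never collide on a key.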

The key and its result are persisted, so duplicate deliveries and replays return the original outcome instead of re-executing side effects. Failed messages are retried with exponential backoff and, once retries are exhausted, dead-lettered with the full envelope preserved for replay. Per-worker retry policies and idempotency key components are listed in the catalog below.
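The persist-and-replay behaviour can be sketched as a lookup before the side effect and a save after it. This is a minimal illustration under stated assumptions: `IdempotencyStore` and `handle_once` are hypothetical names, and the in-memory dictionary stands in for whatever durable store the platform actually uses.

```python
import hashlib
from typing import Any, Callable, Dict, Tuple


class IdempotencyStore:
    """Hypothetical in-memory stand-in for the persisted key -> result table.

    A real worker needs a durable store shared by all consumer instances and an
    atomic insert (e.g. a unique constraint on the key), so two concurrent
    deliveries cannot both run the side effect.
    """

    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}

    def lookup(self, key: str) -> Tuple[bool, Any]:
        # (found, result): a stored result of None still counts as "found".
        if key in self._results:
            return True, self._results[key]
        return False, None

    def save(self, key: str, result: Any) -> None:
        self._results[key] = result


def handle_once(store: IdempotencyStore, event_id: str, worker_name: str,
                aggregate_id: str, side_effect: Callable[[], Any]) -> Any:
    """Run side_effect at most once per idempotency key; replay its outcome afterwards."""
    key = hashlib.sha256(
        f"{event_id}:{worker_name}:{aggregate_id}".encode("utf-8")
    ).hexdigest()
    found, result = store.lookup(key)
    if found:
        # Duplicate delivery or replay: return the original outcome, no side effects.
        return result
    result = side_effect()
    store.save(key, result)
    return result


calls = []


def chunk_record() -> dict:
    calls.append("chunked")  # the side effect we must not repeat
    return {"chunks": 5}


store = IdempotencyStore()
first = handle_once(store, "evt-1", "ArtifactChunkingWorker", "mem-42", chunk_record)
again = handle_once(store, "evt-1", "ArtifactChunkingWorker", "mem-42", chunk_record)
print(first, again, len(calls))
```

The second call returns the stored result without invoking `chunk_record`, which is exactly what makes broker redelivery and `KnowledgeReplayWorker` re-drives safe.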

Worker Catalog

| Worker | Trigger | Purpose | Input | Output | Retry | Idempotency key |
|---|---|---|---|---|---|---|
| KnowledgeIngestionWorker | ArtifactCreated, ArtifactIngested | Orchestrate the ingestion pipeline (dedup → classify → chunk → embed → index → project) | Artifact metadata + storageRef | MemoryRecord + pipeline events | 5× exp. backoff → DLQ | eventId+worker+memoryRecordId |
| ArtifactChunkingWorker | MemoryRecordCreated | Split artifact bodies into VectorChunks with overlap | MemoryRecord, body | VectorChunk[] | 5× exp. backoff | eventId+worker+memoryRecordId |
| EmbeddingWorker | ChunksReady | Embed chunks via ConnectSoft.Extensions.AI.* into Qdrant | VectorChunk[] | VectorDocument, EmbeddingCompleted | 3× backoff → EmbeddingRetryWorker | eventId+worker+embeddingJobId |
| EmbeddingRetryWorker | EmbeddingFailed | Retry transient embedding failures (rate limits, timeouts) | failed EmbeddingJob | EmbeddingCompleted / DLQ | 10× capped backoff | embeddingJobId+attempt |
| EmbeddingRefreshWorker | Scheduled + EmbeddingModelUpgraded | Re-embed stale/old-model vectors with the current embedding model | VectorDocument cohort | refreshed VectorDocument, EmbeddingRefreshed | resume from cursor | vectorDocumentId+modelVersion |
| CodebaseIndexingWorker | RepositoryIndexed request / CodeIndexJob | Drive repository indexing across symbols, deps, embeddings | CodeRepository, commit | CodeIndexJob progress | resumable by commit | codeIndexJobId+stage |
| CodeSymbolExtractionWorker | CodeIndexJob (symbols stage) | Parse code into CodeSymbols (types, methods, contracts) | repository tree | CodeSymbol[], CodeSymbolExtracted | 3× backoff | codeIndexJobId+filePath |
| CodeEmbeddingWorker | CodeSymbolExtracted | Embed code symbols for code-aware semantic search | CodeSymbol[] | VectorDocument (code) | 3× backoff → retry | codeSymbolId+modelVersion |
| CodeDependencyGraphWorker | CodeSymbolExtracted | Build the code dependency graph as graph edges | CodeSymbol[] refs | KnowledgeEdge[] (DEPENDS_ON) | idempotent upsert | codeIndexJobId+edgeKey |
| DocumentationIndexingWorker | DocumentationIndexed request | Drive documentation ingestion and embedding | doc source ref | MemoryRecord, VectorDocument | 5× backoff | eventId+worker+sourceRef |
| MarkdownChunkingWorker | doc ingest | Heading-aware chunking of Markdown/MkDocs content | Markdown body | VectorChunk[] | 3× backoff | sourceRef+chunkRange |
| GraphProjectionWorker | MemoryRecordCreated, `*Recorded`, `*Created` | Project entities/relationships into KnowledgeNode/KnowledgeEdge | domain events | GraphProjection, GraphProjectionUpdated | idempotent upsert | eventId+worker+nodeKey |
| RuntimeSignalWorker | RuntimeSignalReceived (Observability) | Persist runtime signals/incidents/feedback and link them to artifacts | runtime signal | RuntimeSignal, RuntimeSignalRecorded | 5× backoff | eventId+worker+runtimeSignalId |
| QualityAssessmentWorker | MemoryRecordCreated, MemoryClassified | Score records against QualityRules | MemoryRecord | KnowledgeQualityAssessment, KnowledgeQualityAssessed | 3× backoff | memoryRecordId+ruleSetVersion |
| ClassificationWorker | MemoryRecordCreated | Classify sensitivity and assign MemoryClassification | MemoryRecord, content | MemoryClassification, MemoryClassified | 3× backoff | memoryRecordId+classifierVersion |
| RedactionWorker | MemoryClassified (Confidential/Secret) | Produce redacted projections per audience | classified record | redacted blob ref, MemoryRedacted | 3× backoff | memoryRecordId+audience |
| RetentionWorker | Scheduled (daily) | Apply retention policy; expire/delete past-retention memory | retention policy | deletions, MemoryRetentionApplied | resumable cursor | policyId+runDate+batch |
| StaleMemoryWorker | Scheduled (daily) | Flag stale/superseded memory (drift, old versions) | MemoryRecord cohort | MemoryMarkedStale flags | resumable cursor | memoryRecordId+runDate |
| ConflictDetectionWorker | DecisionRecorded, KnowledgePatternDiscovered | Detect contradictory decisions/patterns and raise conflicts | new vs. existing records | KnowledgeConflictDetected | 3× backoff | conflictPairKey |
| KnowledgeReplayWorker | KnowledgeReplayRequested | Replay a knowledge/signal timeline to rebuild projections | replay window | rebuilt projections, KnowledgeReplayCompleted | resumable by offset | replayId+offset |
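The Retry column gives retry counts and whether the backoff is capped, but not the delays themselves. The sketch below models such an entry, assuming a 1-second base delay, a doubling factor, and a 30-second cap; all three values, and the `RetryPolicy` name, are hypothetical. MassTransit ships its own retry and redelivery middleware, so in the real consumers this would be configuration rather than hand-written code.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class RetryPolicy:
    """Model of a catalog Retry entry such as "5× exp. backoff → DLQ".

    base_delay_s, factor and max_delay_s are illustrative assumptions.
    """
    max_retries: int
    base_delay_s: float = 1.0
    factor: float = 2.0
    max_delay_s: Optional[float] = None  # set for "capped backoff"
    on_exhausted: str = "dead-letter"    # e.g. hand off to EmbeddingRetryWorker instead

    def delay_for(self, retry: int) -> float:
        """Delay in seconds before retry number `retry` (1-based): base * factor^(retry-1)."""
        delay = self.base_delay_s * self.factor ** (retry - 1)
        if self.max_delay_s is not None:
            delay = min(delay, self.max_delay_s)
        return delay

    def schedule(self) -> List[float]:
        """Every delay the policy waits before giving up."""
        return [self.delay_for(r) for r in range(1, self.max_retries + 1)]

    def next_action(self, failed_attempts: int) -> Tuple[str, Optional[float]]:
        """What happens after `failed_attempts` failures, counting the first delivery."""
        if failed_attempts < 1:
            raise ValueError("failed_attempts must be >= 1")
        if failed_attempts <= self.max_retries:
            return "retry", self.delay_for(failed_attempts)
        return self.on_exhausted, None


ingestion = RetryPolicy(max_retries=5)                           # "5× exp. backoff → DLQ"
embedding_retry = RetryPolicy(max_retries=10, max_delay_s=30.0)  # "10× capped backoff"
print(ingestion.schedule())        # [1.0, 2.0, 4.0, 8.0, 16.0]
print(embedding_retry.schedule())  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0, 30.0, 30.0, 30.0]
print(ingestion.next_action(6))    # ('dead-letter', None)
```

An entry like EmbeddingWorker's "3× backoff → EmbeddingRetryWorker" would differ only in `on_exhausted`: instead of dead-lettering, the exhausted message is handed to the retry worker.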
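ArtifactChunkingWorker splits bodies "with overlap". A common way to do that is a fixed-size sliding window whose step is smaller than the window. The sketch below assumes character-based windows and hypothetical `size`/`overlap` parameters; the real worker may count tokens instead. It does not cover MarkdownChunkingWorker's heading-aware splitting.

```python
from typing import List, NamedTuple


class Chunk(NamedTuple):
    start: int  # inclusive character offset into the body
    end: int    # exclusive character offset
    text: str


def chunk_with_overlap(body: str, size: int, overlap: int) -> List[Chunk]:
    """Fixed-size sliding window; consecutive chunks share `overlap` characters."""
    if size <= 0 or not 0 <= overlap < size:
        raise ValueError("need size > 0 and 0 <= overlap < size")
    step = size - overlap
    chunks: List[Chunk] = []
    start = 0
    while start < len(body):
        end = min(start + size, len(body))
        chunks.append(Chunk(start, end, body[start:end]))
        if end == len(body):
            break  # the last window already reaches the end of the body
        start += step
    return chunks


for c in chunk_with_overlap("abcdefghij", size=4, overlap=1):
    print(c.start, c.end, c.text)
# 0 4 abcd
# 3 7 defg
# 6 10 ghij
```

The overlap keeps a sentence that straddles a boundary retrievable from either neighbouring chunk. The `(start, end)` offsets are one plausible way to populate a range such as the `chunkRange` in MarkdownChunkingWorker's key.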

Event Flow

flowchart TB
    Created["ArtifactCreated / ArtifactIngested"] --> Ingest["KnowledgeIngestionWorker"]
    Ingest -->|"MemoryRecordCreated"| Classify["ClassificationWorker"]
    Ingest -->|"MemoryRecordCreated"| Chunk["ArtifactChunkingWorker"]
    Ingest -->|"MemoryRecordCreated"| Graph["GraphProjectionWorker"]
    Ingest -->|"MemoryRecordCreated"| Quality["QualityAssessmentWorker"]

    Chunk -->|"ChunksReady"| Embed["EmbeddingWorker"]
    Embed -->|"EmbeddingFailed"| Retry["EmbeddingRetryWorker"]
    Embed -->|"EmbeddingCompleted"| VStore[("Qdrant")]

    Classify -->|"Confidential/Secret"| Redact["RedactionWorker"]
    Classify --> Quality

    RepoReq["RepositoryIndexed request"] --> CodeIdx["CodebaseIndexingWorker"]
    CodeIdx --> Symbols["CodeSymbolExtractionWorker"]
    Symbols --> CodeEmbed["CodeEmbeddingWorker"]
    Symbols --> Deps["CodeDependencyGraphWorker"]
    Deps --> Graph

    DocReq["DocumentationIndexed request"] --> DocIdx["DocumentationIndexingWorker"]
    DocIdx --> MdChunk["MarkdownChunkingWorker"]
    MdChunk --> Embed

    Signal["RuntimeSignalReceived"] --> RtWorker["RuntimeSignalWorker"]
    RtWorker --> Graph
    RtWorker --> Conflict["ConflictDetectionWorker"]

    Sched["Scheduler"] --> Refresh["EmbeddingRefreshWorker"]
    Sched --> Retention["RetentionWorker"]
    Sched --> Stale["StaleMemoryWorker"]
    Replay["KnowledgeReplayRequested"] --> ReplayW["KnowledgeReplayWorker"]
    ReplayW --> Graph
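To see how far a single trigger fans out, the worker-to-worker edges of the diagram can be treated as a directed graph and walked breadth-first. The adjacency map below is a hand transcription of the diagram for illustration (the Qdrant store and the Scheduler are omitted), not a routing table any worker loads.

```python
from collections import deque
from typing import Dict, List, Set

# Worker-to-worker edges transcribed from the Event Flow diagram.
FLOW: Dict[str, List[str]] = {
    "KnowledgeIngestionWorker": ["ClassificationWorker", "ArtifactChunkingWorker",
                                 "GraphProjectionWorker", "QualityAssessmentWorker"],
    "ArtifactChunkingWorker": ["EmbeddingWorker"],
    "EmbeddingWorker": ["EmbeddingRetryWorker"],
    "ClassificationWorker": ["RedactionWorker", "QualityAssessmentWorker"],
    "CodebaseIndexingWorker": ["CodeSymbolExtractionWorker"],
    "CodeSymbolExtractionWorker": ["CodeEmbeddingWorker", "CodeDependencyGraphWorker"],
    "CodeDependencyGraphWorker": ["GraphProjectionWorker"],
    "DocumentationIndexingWorker": ["MarkdownChunkingWorker"],
    "MarkdownChunkingWorker": ["EmbeddingWorker"],
    "RuntimeSignalWorker": ["GraphProjectionWorker", "ConflictDetectionWorker"],
    "KnowledgeReplayWorker": ["GraphProjectionWorker"],
}


def downstream(worker: str) -> Set[str]:
    """All workers reachable from `worker` via breadth-first search, excluding itself."""
    seen: Set[str] = set()
    queue = deque([worker])
    while queue:
        for nxt in FLOW.get(queue.popleft(), []):
            if nxt not in seen and nxt != worker:
                seen.add(nxt)
                queue.append(nxt)
    return seen


print(sorted(downstream("DocumentationIndexingWorker")))
# ['EmbeddingRetryWorker', 'EmbeddingWorker', 'MarkdownChunkingWorker']
```

A walk like this makes shared sinks visible: GraphProjectionWorker is reachable from ingestion, code indexing, runtime signals, and replay, which is why its projections must be idempotent upserts.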

Worker Design Rules

  • Idempotent by construction — all writes are upserts keyed by the idempotency key; replays are safe.
  • Backpressure aware — embedding and indexing workers respect model/store rate limits and shed load to retry queues rather than failing hard.
  • Tenant-fair scheduling — long-running cohorts (refresh, retention, replay) are partitioned by tenantId so no tenant starves another.
  • Fully traced — each worker propagates traceId/correlationId from the triggering envelope into OTEL spans, and emits a completion event so progress is observable in the Observability & Feedback Platform.
  • Poison isolation — unprocessable messages dead-letter with the full envelope; KnowledgeReplayWorker can re-drive them after a fix.
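Tenant-fair scheduling can be illustrated with a round-robin over per-tenant queues: each rotation takes at most one batch per tenant, so a tenant with a very large cohort cannot delay everyone else. The page only states that cohorts are partitioned by tenantId; the round-robin policy, the batch size, and the `tenant_fair_batches` helper are assumptions made for this sketch.

```python
from collections import deque
from typing import Deque, Dict, Iterable, Iterator, List, Tuple


def tenant_fair_batches(
    items: Iterable[Tuple[str, str]], batch_size: int
) -> Iterator[Tuple[str, List[str]]]:
    """Yield (tenantId, batch) pairs, taking at most one batch per tenant per rotation.

    `items` are hypothetical (tenantId, itemId) pairs from a long-running cohort,
    e.g. records due for retention or re-embedding.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    queues: Dict[str, Deque[str]] = {}
    for tenant_id, item_id in items:
        queues.setdefault(tenant_id, deque()).append(item_id)
    rotation = deque(queues)  # tenants in order of first appearance
    while rotation:
        tenant_id = rotation.popleft()
        queue = queues[tenant_id]
        batch = [queue.popleft() for _ in range(min(batch_size, len(queue)))]
        yield tenant_id, batch
        if queue:
            rotation.append(tenant_id)  # still has work: back of the line


cohort = [("t1", "a"), ("t1", "b"), ("t1", "c"), ("t1", "d"), ("t2", "x")]
print(list(tenant_fair_batches(cohort, batch_size=2)))
# [('t1', ['a', 'b']), ('t2', ['x']), ('t1', ['c', 'd'])]
```

Without the rotation, tenant `t2` would wait until all of `t1`'s items were processed; with it, `t2`'s single item is handled in the second batch.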