Workers
Target Architecture — Final-State Design
This page describes the final-state background-processing fleet of the Knowledge Platform. Workers are MassTransit consumers (and a few scheduled jobs) that perform the asynchronous, idempotent, retry-safe work behind ingestion, indexing, and lifecycle management.
The Knowledge Platform runs 20 workers. Most are event-triggered MassTransit consumers reacting to envelope events; a few are scheduled (Quartz-style) jobs for lifecycle and retention. Every worker derives an idempotency key per the metadata schema; the Idempotency column of the catalog below lists each worker's key composition.
The key and its result are persisted so that duplicate deliveries and replays return the original outcome rather than re-executing side effects. Failed messages retry with exponential backoff and ultimately dead-letter with the full envelope preserved for replay.
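The idempotency behaviour described above can be sketched as follows. This is a minimal illustration in Python rather than the platform's MassTransit code: `idempotency_key`, `InMemoryResultStore`, and `handle_once` are hypothetical names, the `+` separator simply mirrors the notation used in the catalog, and the in-memory dictionary stands in for whatever durable store the platform actually uses.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict


def idempotency_key(*parts: str) -> str:
    """Join key parts in a fixed order, e.g. eventId + worker + memoryRecordId."""
    return "+".join(parts)


@dataclass
class InMemoryResultStore:
    """Hypothetical stand-in for a durable key -> result table."""
    results: Dict[str, Any] = field(default_factory=dict)


def handle_once(store: InMemoryResultStore, key: str, work: Callable[[], Any]) -> Any:
    """Run `work` at most once per key; duplicates get the stored outcome."""
    if key in store.results:
        return store.results[key]
    result = work()
    store.results[key] = result
    return result


store = InMemoryResultStore()
side_effects = []


def chunk_record() -> str:
    # The side effect we do not want to repeat on redelivery.
    side_effects.append("chunked")
    return "3 chunks written"


key = idempotency_key("evt-42", "ArtifactChunkingWorker", "mem-7")
first = handle_once(store, key, chunk_record)
second = handle_once(store, key, chunk_record)  # simulated duplicate delivery
```

Here the second call returns the stored result and `chunk_record` runs only once. In a real deployment the check and the write would need to be atomic (for example via a unique constraint on the key), which this sketch does not attempt.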
Worker Catalog
| Worker | Trigger | Purpose | Input | Output | Retry | Idempotency |
|---|---|---|---|---|---|---|
| KnowledgeIngestionWorker | ArtifactCreated, ArtifactIngested | Orchestrate the ingestion pipeline (dedup → classify → chunk → embed → index → project) | Artifact metadata + storageRef | MemoryRecord + pipeline events | 5× exp. backoff → DLQ | eventId+worker+memoryRecordId |
| ArtifactChunkingWorker | MemoryRecordCreated | Split artifact bodies into VectorChunks with overlap | MemoryRecord, body | VectorChunk[] | 5× exp. backoff | eventId+worker+memoryRecordId |
| EmbeddingWorker | ChunksReady | Embed chunks via ConnectSoft.Extensions.AI.* into Qdrant | VectorChunk[] | VectorDocument, EmbeddingCompleted | 3× backoff → EmbeddingRetryWorker | eventId+worker+embeddingJobId |
| EmbeddingRetryWorker | EmbeddingFailed | Retry transient embedding failures (rate limits, timeouts) | failed EmbeddingJob | EmbeddingCompleted / DLQ | 10× capped backoff | embeddingJobId+attempt |
| EmbeddingRefreshWorker | Scheduled + EmbeddingModelUpgraded | Re-embed stale/old-model vectors to current embedding model | VectorDocument cohort | refreshed VectorDocument, EmbeddingRefreshed | resume from cursor | vectorDocumentId+modelVersion |
| CodebaseIndexingWorker | RepositoryIndexed request / CodeIndexJob | Drive repository indexing across symbols, deps, embeddings | CodeRepository, commit | CodeIndexJob progress | resumable by commit | codeIndexJobId+stage |
| CodeSymbolExtractionWorker | CodeIndexJob (symbols stage) | Parse code into CodeSymbols (types, methods, contracts) | repository tree | CodeSymbol[], CodeSymbolExtracted | 3× backoff | codeIndexJobId+filePath |
| CodeEmbeddingWorker | CodeSymbolExtracted | Embed code symbols for code-aware semantic search | CodeSymbol[] | VectorDocument (code) | 3× backoff → retry | codeSymbolId+modelVersion |
| CodeDependencyGraphWorker | CodeSymbolExtracted | Build the code dependency graph as graph edges | CodeSymbol[] refs | KnowledgeEdge[] (DEPENDS_ON) | idempotent upsert | codeIndexJobId+edgeKey |
| DocumentationIndexingWorker | DocumentationIndexed request | Drive documentation ingestion and embedding | doc source ref | MemoryRecord, VectorDocument | 5× backoff | eventId+worker+sourceRef |
| MarkdownChunkingWorker | doc ingest | Heading-aware chunking of Markdown/MkDocs content | Markdown body | VectorChunk[] | 3× backoff | sourceRef+chunkRange |
| GraphProjectionWorker | MemoryRecordCreated, *Recorded, *Created | Project entities/relationships into KnowledgeNode/KnowledgeEdge | domain events | GraphProjection, GraphProjectionUpdated | idempotent upsert | eventId+worker+nodeKey |
| RuntimeSignalWorker | RuntimeSignalReceived (Observability) | Persist runtime signals/incidents/feedback and link to artifacts | runtime signal | RuntimeSignal, RuntimeSignalRecorded | 5× backoff | eventId+worker+runtimeSignalId |
| QualityAssessmentWorker | MemoryRecordCreated, MemoryClassified | Score records against QualityRules | MemoryRecord | KnowledgeQualityAssessment, KnowledgeQualityAssessed | 3× backoff | memoryRecordId+ruleSetVersion |
| ClassificationWorker | MemoryRecordCreated | Classify sensitivity and assign MemoryClassification | MemoryRecord, content | MemoryClassification, MemoryClassified | 3× backoff | memoryRecordId+classifierVersion |
| RedactionWorker | MemoryClassified (Confidential/Secret) | Produce redacted projections per audience | classified record | redacted blob ref, MemoryRedacted | 3× backoff | memoryRecordId+audience |
| RetentionWorker | Scheduled (daily) | Apply retention policy; expire/delete past-retention memory | retention policy | deletions, MemoryRetentionApplied | resumable cursor | policyId+runDate+batch |
| StaleMemoryWorker | Scheduled (daily) | Flag stale/superseded memory (drift, old versions) | MemoryRecord cohort | MemoryMarkedStale flags | resumable cursor | memoryRecordId+runDate |
| ConflictDetectionWorker | DecisionRecorded, KnowledgePatternDiscovered | Detect contradictory decisions/patterns and raise conflicts | new vs. existing records | KnowledgeConflictDetected | 3× backoff | conflictPairKey |
| KnowledgeReplayWorker | KnowledgeReplayRequested | Replay a knowledge/signal timeline to rebuild projections | replay window | rebuilt projections, KnowledgeReplayCompleted | resumable by offset | replayId+offset |
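The Retry column can be read as a retry budget plus a backoff schedule. The sketch below illustrates one plausible interpretation in Python; it is not the platform's MassTransit retry configuration. It assumes that "5×" means five retries after the first attempt, a one-second base delay, and a doubling schedule, none of which the catalog specifies. `backoff_delays` and `process_with_retry` are hypothetical helpers.

```python
from typing import Callable, List, Optional


def backoff_delays(max_retries: int, base: float = 1.0,
                   cap: Optional[float] = None) -> List[float]:
    """Delay before each retry: base * 2**attempt, optionally capped."""
    delays = [base * (2 ** attempt) for attempt in range(max_retries)]
    return [min(d, cap) for d in delays] if cap is not None else delays


def process_with_retry(envelope: dict,
                       handler: Callable[[dict], None],
                       max_retries: int,
                       dead_letters: List[dict],
                       sleep: Callable[[float], None] = lambda seconds: None) -> bool:
    """Try once plus `max_retries` retries; on exhaustion, dead-letter the full envelope.

    `sleep` defaults to a no-op so the sketch runs instantly; pass time.sleep to wait.
    """
    delays = backoff_delays(max_retries)
    for attempt in range(max_retries + 1):
        try:
            handler(envelope)
            return True
        except Exception as exc:
            if attempt == max_retries:
                # Keep the whole envelope so the message can be replayed later.
                dead_letters.append({"envelope": envelope, "error": str(exc)})
                return False
            sleep(delays[attempt])
    return False


dlq: List[dict] = []
calls: List[str] = []


def always_fails(env: dict) -> None:
    calls.append(env["eventId"])
    raise TimeoutError("embedding service timed out")


ok = process_with_retry({"eventId": "evt-1"}, always_fails, max_retries=3, dead_letters=dlq)
```

With `max_retries=3` the handler is invoked four times before the envelope lands in `dlq`. Passing `cap=` to `backoff_delays` gives the flattened schedule that a "capped backoff" entry suggests.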
Event Flow
flowchart TB
Created["ArtifactCreated / ArtifactIngested"] --> Ingest["KnowledgeIngestionWorker"]
Ingest -->|"MemoryRecordCreated"| Classify["ClassificationWorker"]
Ingest -->|"MemoryRecordCreated"| Chunk["ArtifactChunkingWorker"]
Ingest -->|"MemoryRecordCreated"| Graph["GraphProjectionWorker"]
Ingest -->|"MemoryRecordCreated"| Quality["QualityAssessmentWorker"]
Chunk -->|"ChunksReady"| Embed["EmbeddingWorker"]
Embed -->|"EmbeddingFailed"| Retry["EmbeddingRetryWorker"]
Embed -->|"EmbeddingCompleted"| VStore[("Qdrant")]
Classify -->|"Confidential/Secret"| Redact["RedactionWorker"]
Classify --> Quality
RepoReq["RepositoryIndexed request"] --> CodeIdx["CodebaseIndexingWorker"]
CodeIdx --> Symbols["CodeSymbolExtractionWorker"]
Symbols --> CodeEmbed["CodeEmbeddingWorker"]
Symbols --> Deps["CodeDependencyGraphWorker"]
Deps --> Graph
DocReq["DocumentationIndexed request"] --> DocIdx["DocumentationIndexingWorker"]
DocIdx --> MdChunk["MarkdownChunkingWorker"]
MdChunk --> Embed
Signal["RuntimeSignalReceived"] --> RtWorker["RuntimeSignalWorker"]
RtWorker --> Graph
RtWorker --> Conflict["ConflictDetectionWorker"]
Sched["Scheduler"] --> Refresh["EmbeddingRefreshWorker"]
Sched --> Retention["RetentionWorker"]
Sched --> Stale["StaleMemoryWorker"]
Replay["KnowledgeReplayRequested"] --> ReplayW["KnowledgeReplayWorker"]
ReplayW --> Graph
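One way to use the flow above is as an adjacency map for impact analysis, for example to see which workers may be affected when an upstream worker changes. The Python sketch below transcribes only the artifact-ingestion edges from the diagram (the Qdrant store node and the code, documentation, runtime-signal, scheduler, and replay branches are left out), and `FLOW` and `downstream` are hypothetical names introduced for illustration.

```python
from collections import deque
from typing import Dict, List

# Edges transcribed from the artifact-ingestion part of the flow; other branches omitted.
FLOW: Dict[str, List[str]] = {
    "KnowledgeIngestionWorker": [
        "ClassificationWorker",
        "ArtifactChunkingWorker",
        "GraphProjectionWorker",
        "QualityAssessmentWorker",
    ],
    "ArtifactChunkingWorker": ["EmbeddingWorker"],
    "EmbeddingWorker": ["EmbeddingRetryWorker"],
    "ClassificationWorker": ["RedactionWorker", "QualityAssessmentWorker"],
}


def downstream(worker: str) -> List[str]:
    """Breadth-first list of every worker reachable from `worker`."""
    seen = {worker}
    order: List[str] = []
    queue = deque([worker])
    while queue:
        for nxt in FLOW.get(queue.popleft(), []):
            if nxt not in seen:
                seen.add(nxt)
                order.append(nxt)
                queue.append(nxt)
    return order


affected = downstream("KnowledgeIngestionWorker")
```

For the ingestion entry point this yields the seven workers that sit behind `KnowledgeIngestionWorker` in the diagram, with `QualityAssessmentWorker` listed once even though two paths reach it.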
Worker Design Rules
- Idempotent by construction: all writes are upserts keyed by the idempotency key; replays are safe.
- Backpressure aware: embedding and indexing workers respect model/store rate limits and shed load to retry queues rather than failing hard.
- Tenant-fair scheduling: long-running cohorts (refresh, retention, replay) are partitioned by tenantId so no tenant starves another.
- Fully traced: each worker propagates traceId/correlationId from the triggering envelope into OTEL spans, and emits a completion event so progress is observable in the Observability & Feedback Platform.
- Poison isolation: unprocessable messages dead-letter with the full envelope; KnowledgeReplayWorker can re-drive them after a fix.
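The tenant-fair scheduling rule can be illustrated with a simple round-robin over per-tenant partitions. This Python sketch is one possible approach, not the platform's scheduler: `fair_batches`, the `batch_size` parameter, and the sample tenant IDs are all assumptions made for illustration.

```python
from collections import deque
from typing import Deque, Dict, Iterator, List, Tuple


def fair_batches(work_by_tenant: Dict[str, List[str]],
                 batch_size: int = 1) -> Iterator[Tuple[str, List[str]]]:
    """Yield (tenantId, batch) round-robin so every tenant advances each cycle."""
    queues: Deque[Tuple[str, Deque[str]]] = deque(
        (tenant, deque(items)) for tenant, items in work_by_tenant.items() if items
    )
    while queues:
        tenant, items = queues.popleft()
        # Take at most `batch_size` items from this tenant's partition.
        batch = [items.popleft() for _ in range(min(batch_size, len(items)))]
        yield tenant, batch
        if items:
            # Tenant still has work: send it to the back of the line.
            queues.append((tenant, items))


work = {
    "tenant-a": ["a1", "a2", "a3", "a4"],  # large cohort
    "tenant-b": ["b1"],                    # small cohort
}
schedule = list(fair_batches(work, batch_size=2))
```

Even though `tenant-a` has four items queued, `tenant-b` is served after the first batch rather than waiting for the larger cohort to drain.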