Event Topics
Event System Overview
Annihilation uses a `Registry`-based PubSub system implemented in `Annihilation.Event`. The registry is started as the first child in the supervision tree under the name `Annihilation.Event.Registry` with `:duplicate` keys, so multiple processes can subscribe to the same topic.

    Annihilation.Event.subscribe(topic)         # Subscribe calling process
    Annihilation.Event.unsubscribe(topic)       # Unsubscribe calling process
    Annihilation.Event.broadcast(topic, event)  # Broadcast to all subscribers
    Annihilation.Event.subscribers(topic)       # List subscriber PIDs

Message Format
Events arrive as standard Erlang messages:

    {:event, topic, payload}

Subscribing in a GenServer
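
    defmodule MySubscriber do
      use GenServer
      require Logger

      def init(state) do
        # With the Registry-based implementation shown under Implementation Details,
        # topics are matched literally: "agent:*" is not expanded as a wildcard and
        # only receives events broadcast to the exact topic "agent:*".
        Annihilation.Event.subscribe("agent:*")
        Annihilation.Event.subscribe("burst:events")
        {:ok, state}
      end

      def handle_info({:event, "burst:events", {:wave_complete, stats}}, state) do
        Logger.info("Wave complete: #{inspect(stats)}")
        {:noreply, state}
      end

      # Ignore every other message so unmatched events do not crash the process.
      def handle_info(_message, state), do: {:noreply, state}
    end

Topic Reference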
Agent Topics
| Topic | Payload | Publisher | Description |
|---|---|---|---|
| `"agent:#{id}"` | `{:phase_change, old_phase, new_phase, state}` | Agent.Server | Phase machine transition |
| `"agent:#{id}"` | `{:delta, %Delta{}}` | Agent.Server | Streaming text/tool delta from LLM |
| `"agent:#{id}"` | `{:tool_results, [%ToolResult{}]}` | Agent.Server | Batch of tool execution results |
| `"agent:#{id}"` | `{:tool_update, call_id, partial}` | Agent.Server | Streaming tool output (progressive) |
| `"agent:#{id}"` | `:done` | Agent.Server | Agent completed its task |
| `"agent:#{id}"` | `{:error, reason}` | Agent.Server | Agent entered error state |
Agent Messaging Topics
| Topic | Payload | Publisher | Description |
|---|---|---|---|
| `"agent:mail:#{id}"` | `%{type: :new_message, from:, subject:, message_id:, priority:}` | Agent.Mailbox | New direct message for agent |
| `"agent:mail:broadcast"` | `%{type: :broadcast_message, from:, subject:, message_id:}` | Agent.Mailbox | Broadcast message to all agents |
Tether Topics
| Topic | Payload | Publisher | Description |
|---|---|---|---|
| `"tether:reaching"` | `{:question_asked, %Question{}}` | QuestionQueue | Psychonaut reaching for the tether |
| `"tether:reaching"` | `{:question_answered, %Question{}}` | QuestionQueue | Tether answered a question |
| `"tether:reaching"` | `{:question_timed_out, %Question{}}` | QuestionQueue | Question timed out (2 min default) |
| `"tether:reaching"` | `{:late_answer, %Question{}, answer_text}` | QuestionQueue | Tether answered after timeout |
Drift Topics
| Topic | Payload | Publisher | Description |
|---|---|---|---|
| `"tether:drifts"` | `{:drift_created, %AssumptionMade{}}` | AssumptionsLedger | New drift (assumption) recorded |
| `"tether:drifts"` | `{:drift_confirmed, %{drift_id:, agent_id:, text:, note:}}` | DriftReview | Tether confirmed assumption was correct |
| `"tether:drifts"` | `{:drift_rejected, %{drift_id:, agent_id:, text:, reason:, correction_bead_id:}}` | DriftReview | Tether rejected assumption, correction bead created |
| `"tether:drifts"` | `{:drift_noted, %{drift_id:, note:}}` | DriftReview | Tether added note to drift |
| `"tether:drifts"` | `{:drift_superseded, %{drift_id:, question_id:, answer:, correction_bead_id:}}` | LateBeaconHandler | Late answer superseded the drift |
| `"tether:drifts"` | `{:pipeline_drift, %{entry:, assumption:, mutation_type:}}` | Pipeline.GroundingQueue | Pipeline mutation timed out and drifted |
Pipeline Grounding Topics
| Topic | Payload | Publisher | Description |
|---|---|---|---|
| `"tether:questions"` | `{:pipeline_mutation_submitted, entry}` | Pipeline.GroundingQueue | New pipeline mutation awaiting approval |
| `"tether:questions"` | `{:pipeline_mutation_responded, entry}` | Pipeline.GroundingQueue | Tether responded to pipeline mutation |
Burst Topics
| Topic | Payload | Publisher | Description |
|---|---|---|---|
| `"burst:events"` | `{:burst_start, %{burst_id:, wave_count:, started_at:}}` | Burst.Manager | New burst began |
| `"burst:events"` | `{:burst_complete, %{burst_id:, succeeded:, failed:}}` | Burst.Manager | Burst finished (draining done) |
| `"burst:events"` | `{:wave_complete, %{wave_count:, burst_count:, total_beads:, succeeded:, failed:, duration_ms:}}` | Burst.Manager | Wave finished (no more ready beads) |
Reflection Topics
| Topic | Payload | Publisher | Description |
|---|---|---|---|
| `"reflection:started"` | `%{burst_id:}` | Reflection.Pipeline | Reflection pipeline began |
| `"reflection:extracting"` | `%{burst_id:}` | Reflection.Pipeline | Extracting diary entries |
| `"reflection:proposing"` | `%{burst_id:}` | Reflection.Pipeline | Proposing playbook deltas |
| `"reflection:evaluating"` | `%{burst_id:}` | Reflection.Pipeline | Evaluating deltas through evidence gate |
| `"reflection:curating"` | `%{burst_id:}` | Reflection.Pipeline | Curating accepted deltas |
| `"reflection:completed"` | `%{burst_id:, summary:}` | Reflection.Pipeline | Reflection pipeline completed |
| `"reflection:cached"` | `%{burst_id:}` | Reflection.Pipeline | Returning cached reflection result |
| `"reflection:error"` | `%{stage:, error:}` | Reflection.Pipeline | Error in a reflection stage |
Implementation Details
The event module is defined in lib/annihilation/event.ex:

    defmodule Annihilation.Event do
      @registry Annihilation.Event.Registry

      def subscribe(topic) do
        Registry.register(@registry, topic, [])
      end

      def unsubscribe(topic) do
        Registry.unregister(@registry, topic)
      end

      def broadcast(topic, event) do
        Registry.dispatch(@registry, topic, fn entries ->
          for {pid, _} <- entries do
            send(pid, {:event, topic, event})
          end
        end)
      end

      def subscribers(topic) do
        Registry.lookup(@registry, topic)
        |> Enum.map(fn {pid, _} -> pid end)
      end
    end

Key design decisions:
- Uses Elixir's `Registry` with `:duplicate` keys (not `:unique`), allowing fan-out to multiple subscribers per topic.
- No GenServer wrapper: the module is a stateless function interface to the Registry.
- The registry is started directly in the supervision tree as `{Registry, keys: :duplicate, name: Annihilation.Event.Registry}`.
- Events are fire-and-forget (`send/2`). There is no acknowledgment or back-pressure mechanism.
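
The fan-out and fire-and-forget behavior described above can be observed in isolation. The sketch below copies the shape of the event module into a hypothetical `FanoutSketch` module with its own registry, so it does not depend on the application being started; the topic names and the `:ping` payload are made up.

```elixir
defmodule FanoutSketch do
  # Hypothetical stand-in for Annihilation.Event, using a separate registry.
  @registry FanoutSketch.Registry

  def start, do: Registry.start_link(keys: :duplicate, name: @registry)

  def subscribe(topic), do: Registry.register(@registry, topic, [])

  def broadcast(topic, event) do
    Registry.dispatch(@registry, topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, {:event, topic, event})
    end)
  end
end

{:ok, _pid} = FanoutSketch.start()
parent = self()

# Two independent subscribers on the same topic (allowed by :duplicate keys).
for n <- 1..2 do
  spawn_link(fn ->
    {:ok, _owner} = FanoutSketch.subscribe("demo:topic")
    send(parent, {:ready, n})

    receive do
      {:event, topic, payload} -> send(parent, {:got, n, topic, payload})
    end
  end)
end

# Wait until both subscribers have registered before broadcasting.
for n <- 1..2 do
  receive do
    {:ready, ^n} -> :ok
  end
end

# One broadcast reaches both subscribers; the caller gets :ok and no acknowledgment.
:ok = FanoutSketch.broadcast("demo:topic", :ping)

# Broadcasting to a topic nobody subscribed to is a silent no-op.
:ok = FanoutSketch.broadcast("demo:nobody", :ping)

results =
  for n <- 1..2 do
    receive do
      {:got, ^n, topic, payload} -> {n, topic, payload}
    after
      1_000 -> {:timeout, n}
    end
  end
```

Since `broadcast/2` returns `:ok` whether or not anyone is listening, a publisher that needs delivery guarantees would have to add its own acknowledgment on top.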