# Observer Pattern

> Understanding and implementing observers in Pipecat

The Observer pattern in Pipecat allows non-intrusive monitoring of frames as they flow through the pipeline. Observers can watch frame traffic without affecting the pipeline's core functionality.

## Base Observer

All observers must inherit from `BaseObserver` and can override any of these methods:

* `on_push_frame(data: FramePushed)`: Called when a frame is pushed from one processor to another
* `on_process_frame(data: FrameProcessed)`: Called when a frame is being processed by a processor
* `on_pipeline_started()`: Called after the `StartFrame` has been processed by all processors in the pipeline

```python theme={null}
from pipecat.observers.base_observer import BaseObserver, FramePushed, FrameProcessed

class CustomObserver(BaseObserver):
    async def on_push_frame(self, data: FramePushed):
        # Your frame observation logic here
        pass

    async def on_process_frame(self, data: FrameProcessed):
        # Your frame processing observation logic here
        pass

    async def on_pipeline_started(self):
        # Called when the pipeline has fully started
        pass
```

## Lifecycle and Cleanup

The pipeline calls `setup()` on each observer when it starts and `cleanup()` when it stops. If your observer spawns background work, use `self.create_task()` so the task is tracked by the pipeline's task manager, and override `cleanup()` to cancel it. Always call `super().cleanup()`, which waits for any in-flight event handlers to finish.

```python theme={null}
from pipecat.observers.base_observer import BaseObserver

class CustomObserver(BaseObserver):
    def __init__(self):
        super().__init__()
        self._task = None

    async def on_pipeline_started(self):
        # Use create_task (not asyncio.create_task) so the task is tracked.
        self._task = self.create_task(self._run())

    async def _run(self):
        # Your long-running background logic here.
        pass

    async def cleanup(self):
        if self._task:
            await self.cancel_task(self._task)
        await super().cleanup()
```

<Note>
  If you give your observer a custom `__init__`, you must call
  `super().__init__()`. Skipping it leaves the observer partially initialized
  and raises errors such as `'CustomObserver' object has no attribute '_name'`
  at runtime.
</Note>
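
The failure mode in the note above is ordinary Python behavior and can be reproduced without Pipecat. The sketch below uses `StubBaseObserver`, a hypothetical stand-in that only models a `_name` attribute set in `__init__`; the real `BaseObserver` initializes more state than this.

```python
class StubBaseObserver:
    """Hypothetical stand-in for BaseObserver; only models the _name attribute."""

    def __init__(self):
        self._name = type(self).__name__

    @property
    def name(self) -> str:
        return self._name


class BrokenObserver(StubBaseObserver):
    def __init__(self):
        # Bug: super().__init__() is never called, so _name is never set.
        self.frame_count = 0


class FixedObserver(StubBaseObserver):
    def __init__(self):
        super().__init__()  # Initialize the base class state first.
        self.frame_count = 0


def describe(observer) -> str:
    """Return the observer's name, or the error raised while reading it."""
    try:
        return observer.name
    except AttributeError as exc:
        return f"AttributeError: {exc}"


if __name__ == "__main__":
    print(describe(FixedObserver()))   # FixedObserver
    print(describe(BrokenObserver()))  # an AttributeError mentioning '_name'
```

The error only surfaces when the missing attribute is first read, which is why it tends to appear at runtime rather than at construction.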

## Available Observers

Pipecat provides several built-in observers:

* **LLMLogObserver**: Logs LLM activity and responses
* **TranscriptionLogObserver**: Logs speech-to-text transcription events
* **RTVIObserver**: Converts internal frames to RTVI protocol messages for server-to-client messaging
* **[StartupTimingObserver](/api-reference/server/utilities/observers/startup-timing-observer)**: Measures processor startup times and transport readiness
* **[UserBotLatencyObserver](/api-reference/server/utilities/observers/user-bot-latency-observer)**: Measures user-to-bot response latency
* **[TurnTrackingObserver](/api-reference/server/utilities/observers/turn-tracking-observer)**: Tracks conversation turns and events

## Using Multiple Observers

You can attach multiple observers to a pipeline worker. Every observer is notified of every frame, so each one should check for the frame types it cares about:

```python theme={null}
worker = PipelineWorker(
    pipeline,
    params=PipelineParams(
        observers=[LLMLogObserver(), TranscriptionLogObserver(), CustomObserver()],
    ),
)
```

## Example: Debug Observer

Here's an example observer that logs interruptions and bot speaking events:

```python theme={null}
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.frames.frames import (
    InterruptionFrame,
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from loguru import logger

class DebugObserver(BaseObserver):
    """Observer to log interruptions and bot speaking events to the console.

    Logs all frame instances of:
    - InterruptionFrame
    - BotStartedSpeakingFrame
    - BotStoppedSpeakingFrame

    This lets you follow these frames as they move from processor to processor
    through the pipeline.
    Log format: [EVENT TYPE]: [source processor] →/← [destination processor] at [timestamp]s
    (→ for downstream frames, ← for upstream frames)
    """

    async def on_push_frame(self, data: FramePushed):
        # Convert the nanosecond timestamp to seconds for readable output.
        time_sec = data.timestamp / 1_000_000_000
        arrow = "→" if data.direction == FrameDirection.DOWNSTREAM else "←"

        if isinstance(data.frame, InterruptionFrame):
            logger.info(f"⚡ INTERRUPTION START: {data.source} {arrow} {data.destination} at {time_sec:.2f}s")
        elif isinstance(data.frame, BotStartedSpeakingFrame):
            logger.info(f"🤖 BOT START SPEAKING: {data.source} {arrow} {data.destination} at {time_sec:.2f}s")
        elif isinstance(data.frame, BotStoppedSpeakingFrame):
            logger.info(f"🤖 BOT STOP SPEAKING: {data.source} {arrow} {data.destination} at {time_sec:.2f}s")
```
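
To make the log format concrete, here is a minimal sketch of the same formatting logic that runs without Pipecat. `format_push_event` is a hypothetical helper, not part of Pipecat, and it assumes (as the division by 1,000,000,000 in `DebugObserver` implies) that timestamps are expressed in nanoseconds.

```python
NANOSECONDS_PER_SECOND = 1_000_000_000


def format_push_event(
    label: str,
    source: str,
    destination: str,
    timestamp_ns: int,
    downstream: bool = True,
) -> str:
    """Build a log line in the same shape DebugObserver emits."""
    time_sec = timestamp_ns / NANOSECONDS_PER_SECOND
    # Downstream frames point right, upstream frames point left.
    arrow = "→" if downstream else "←"
    return f"{label}: {source} {arrow} {destination} at {time_sec:.2f}s"


if __name__ == "__main__":
    print(format_push_event("BOT START SPEAKING", "TTS", "Transport", 2_500_000_000))
    # BOT START SPEAKING: TTS → Transport at 2.50s
```

Keeping the formatting in a plain function like this makes it easy to unit test separately from the observer callbacks.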

## Common Use Cases

Observers are particularly useful for:

* Debugging frame flow
* Logging specific events
* Monitoring pipeline behavior
* Collecting metrics
* Converting internal frames to external messages
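
As an illustration of the metrics use case, the sketch below counts pushed frames by class name. It is deliberately Pipecat-free so it can run on its own: `StubFramePushed`, `StubTextFrame`, and `StubAudioFrame` are hypothetical stand-ins, and in a real bot the observer would subclass `BaseObserver` and receive `FramePushed` objects instead.

```python
import asyncio
from collections import Counter
from dataclasses import dataclass


# Hypothetical stand-ins so the sketch runs without Pipecat installed.
@dataclass
class StubFramePushed:
    frame: object


class StubTextFrame:
    pass


class StubAudioFrame:
    pass


class FrameCountObserver:
    """Counts push events by frame class name.

    In a real pipeline this would inherit from BaseObserver.
    """

    def __init__(self):
        self.counts = Counter()

    async def on_push_frame(self, data):
        # Only record the event; never modify the frame being observed.
        self.counts[type(data.frame).__name__] += 1


async def demo() -> Counter:
    observer = FrameCountObserver()
    for frame in (StubTextFrame(), StubAudioFrame(), StubAudioFrame()):
        await observer.on_push_frame(StubFramePushed(frame=frame))
    return observer.counts


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because `on_push_frame` fires each time a frame moves from one processor to the next, a counter like this measures push events, so a single frame that crosses several processors is counted once per hop.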
