Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.livepeer.org/llms.txt

Use this file to discover all available pages before exploring further.


PyTrickle (livepeer/pytrickle) is a Python package for building custom real-time video processing services over the trickle protocol. It sits one level below ComfyStream: where ComfyStream runs ComfyUI workflows, PyTrickle provides the FrameProcessor abstraction for arbitrary Python processing logic backed by asyncio. The package handles the trickle protocol connection, video decoding via FFmpeg, frame delivery as PyTorch tensors, and video encoding of processed output. A PyTrickle service exposes a StreamServer HTTP API that Livepeer orchestrators connect to when routing live-video-to-video jobs.

FrameProcessor

FrameProcessor is the base class for all PyTrickle processing services. Subclass it and implement process_video_async to define how each frame is transformed.
from pytrickle import FrameProcessor, StreamServer
from pytrickle.frames import VideoFrame, AudioFrame
from typing import Optional, List

class MyProcessor(FrameProcessor):

    async def initialize(self):
        # Load AI models or initialise any resources here.
        # Called once before the first frame arrives.
        pass

    async def process_video_async(self, frame: VideoFrame) -> Optional[VideoFrame]:
        # frame.tensor is a PyTorch tensor: [H, W, C], float32, 0-1 range
        tensor = frame.tensor.clone()
        # Apply your model, filter, or transform here.
        return frame.replace_tensor(tensor)

    async def process_audio_async(self, frame: AudioFrame) -> Optional[List[AudioFrame]]:
        # Return the frame unchanged or process it.
        return [frame]

    def update_params(self, params: dict):
        # Called when the orchestrator sends a parameter update.
        # Update instance variables to change behaviour at runtime.
        pass
VideoFrame.tensor is a CPU PyTorch tensor in HWC format with float32 values in [0, 1]. frame.replace_tensor(new_tensor) creates a new VideoFrame with the updated pixel data while preserving frame metadata.

StreamServer

StreamServer wraps a FrameProcessor and exposes the HTTP endpoint that orchestrators connect to:
async def main():
    processor = MyProcessor()
    await processor.start()  # calls initialize()

    app = StreamServer(
        frame_processor=processor,
        port=8000,
        capability_name="my-processor",
    )
    await app.run_forever()

import asyncio
asyncio.run(main())
The StreamServer accepts trickle stream connections and runs the processor loop. The HTTP API includes endpoints for starting and stopping streams, querying status, and receiving parameter updates.

SDK Responsibilities

Prerequisites

  • Python 3.8+
  • PyTorch (install separately; version must match your CUDA version)
  • FFmpeg available on PATH
  • http-trickle binary for local testing: git clone https://github.com/livepeer/http-trickle.git && cd http-trickle && make build
Use the PyTrickle quickstart for a working service in under 20 minutes.

PyTrickle Quickstart

Install PyTrickle, write a minimal FrameProcessor, and run it against a test stream.

Frame Processor Reference

Full FrameProcessor API: VideoFrame, AudioFrame, update_params, and StreamServer options.

Real-Time AI Overview

Cascade architecture and how PyTrickle services fit into the Livepeer pipeline.

ComfyStream Workflow Authoring

ComfyUI-based alternative for real-time AI pipelines without custom Python code.
Last modified on May 19, 2026