Skip to main content
Pipelines provide a powerful plugin framework that allows you to seamlessly integrate custom logic and Python libraries into Open WebUI. The framework acts as middleware between the UI and your language models, enabling you to modify requests and responses.

Overview

Pipelines can be used to:
  • Add custom logic before and after model inference
  • Implement rate limiting and usage monitoring
  • Filter and transform messages
  • Integrate third-party services
  • Enable live translation
  • Monitor and log conversations
Pipelines run as a separate service and communicate with Open WebUI through OpenAI-compatible API endpoints.

Installation

1

Install Pipelines

Clone and set up the Pipelines repository:
git clone https://github.com/open-webui/pipelines.git
cd pipelines
pip install -r requirements.txt
2

Start Pipelines Server

Launch the Pipelines service:
python main.py
By default, the server runs on http://localhost:9099
3

Configure Open WebUI

In Open WebUI admin settings, add the Pipelines URL as an OpenAI API endpoint:
  • Navigate to Admin Panel → Settings → Connections
  • Add new OpenAI API URL: http://localhost:9099
  • Set an API key (any string will work for local development)

Pipeline Architecture

Pipelines implement two main filter types:

Inlet Filters

Process requests before they reach the model:
class Pipeline:
    async def inlet(self, body: dict, user: dict) -> dict:
        # Modify the request body
        messages = body.get("messages", [])
        
        # Add system context
        messages.insert(0, {
            "role": "system",
            "content": "You are a helpful assistant."
        })
        
        body["messages"] = messages
        return body

Outlet Filters

Process responses after the model generates them:
class Pipeline:
    async def outlet(self, body: dict, user: dict) -> dict:
        # Modify the response
        messages = body.get("messages", [])
        
        # Log or transform response
        if messages:
            last_message = messages[-1]
            print(f"Model response: {last_message}")
        
        return body

Managing Pipelines

Upload Pipeline

Admin users can upload custom pipeline files through the API:
import requests

url = "http://localhost:8080/api/pipelines/upload"
files = {"file": open("my_pipeline.py", "rb")}
data = {"urlIdx": 0}

response = requests.post(url, files=files, data=data)

Add Pipeline from URL

Install pipelines directly from a URL:
import requests

url = "http://localhost:8080/api/pipelines/add"
payload = {
    "url": "https://github.com/user/repo/pipeline.py",
    "urlIdx": 0
}

response = requests.post(url, json=payload)

List Available Pipelines

Retrieve all configured pipeline instances:
import requests

response = requests.get("http://localhost:8080/api/pipelines/list")
pipelines = response.json()

Valves Configuration

Pipelines support configurable parameters called “valves”:
from pydantic import BaseModel, Field

class Pipeline:
    class Valves(BaseModel):
        priority: int = Field(
            default=0,
            description="Pipeline priority (higher runs first)"
        )
        enabled: bool = Field(
            default=True,
            description="Enable or disable this pipeline"
        )
        api_key: str = Field(
            default="",
            description="External API key"
        )
    
    def __init__(self):
        self.valves = self.Valves()

Get Pipeline Valves

import requests

response = requests.get(
    "http://localhost:8080/api/pipelines/my-pipeline/valves",
    params={"urlIdx": 0}
)
valves = response.json()

Update Pipeline Valves

import requests

url = "http://localhost:8080/api/pipelines/my-pipeline/valves/update"
payload = {
    "priority": 10,
    "enabled": True,
    "api_key": "sk-..."
}

response = requests.post(url, json=payload, params={"urlIdx": 0})

Common Use Cases

import time
from collections import defaultdict

class Pipeline:
    def __init__(self):
        self.user_requests = defaultdict(list)
        self.max_requests = 10
        self.time_window = 60  # seconds
    
    async def inlet(self, body: dict, user: dict) -> dict:
        user_id = user["id"]
        current_time = time.time()
        
        # Clean old requests
        self.user_requests[user_id] = [
            req_time for req_time in self.user_requests[user_id]
            if current_time - req_time < self.time_window
        ]
        
        # Check rate limit
        if len(self.user_requests[user_id]) >= self.max_requests:
            raise Exception("Rate limit exceeded. Please try again later.")
        
        self.user_requests[user_id].append(current_time)
        return body
import re

class Pipeline:
    def __init__(self):
        self.blocked_patterns = [
            r"\b(password|secret|api[_-]?key)\b",
            r"\b\d{3}-\d{2}-\d{4}\b",  # SSN pattern
        ]
    
    async def inlet(self, body: dict, user: dict) -> dict:
        messages = body.get("messages", [])
        
        for message in messages:
            content = message.get("content", "")
            for pattern in self.blocked_patterns:
                if re.search(pattern, content, re.IGNORECASE):
                    raise Exception("Message contains sensitive information")
        
        return body
import logging
from datetime import datetime

class Pipeline:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
    
    async def inlet(self, body: dict, user: dict) -> dict:
        self.logger.info(f"Request from {user['email']} at {datetime.now()}")
        return body
    
    async def outlet(self, body: dict, user: dict) -> dict:
        messages = body.get("messages", [])
        if messages:
            response_length = len(str(messages[-1]))
            self.logger.info(f"Response to {user['email']}: {response_length} chars")
        return body

Pipeline Priority

Multiple pipelines can be chained together. The execution order is determined by priority:
class Pipeline:
    class Valves(BaseModel):
        priority: int = Field(default=0)
    
    def __init__(self):
        self.valves = self.Valves(priority=10)  # Higher runs first
Inlet filters execute in descending priority order (highest first). Outlet filters execute in ascending priority order (lowest first).

Troubleshooting

Ensure your pipeline file:
  • Is a valid Python file (.py extension)
  • Contains a Pipeline class
  • Implements inlet() and/or outlet() methods
  • Handles exceptions appropriately

Common Issues

Pipeline not appearing in Open WebUI:
  • Check that the Pipelines server is running
  • Verify the OpenAI API URL is correctly configured
  • Ensure the pipeline file was uploaded successfully
Pipeline errors:
  • Check server logs for Python exceptions
  • Validate that all dependencies are installed
  • Ensure valve configuration is valid

Next Steps