Kitkat gives you a single, consistent interface to Anthropic Claude, OpenAI GPT, and Google Gemini — with streaming, BYOK (Bring Your Own Key), extended thinking, and typed responses that work identically across every provider. You can switch provider by changing two lines. Your request, response, and error handling stay exactly the same.
Every major LLM SDK has a different API, different streaming protocol, different error shapes, and different retry semantics. Switching providers means rewriting request code, stream parsers, and error handlers.
Kitkat solves this with a thin, typed abstraction layer that:
- Lets you swap providers without touching business logic
- Ships a real async-first design — not a sync wrapper with
asyncio.run - Stays minimal — install only the providers you actually use
- Is built to be extended — a clear ABC makes writing custom providers trivial
- Fails loudly and precisely — every error maps to a specific, typed exception
Kitkat uses an opt-in extras model. The core package is small and dependency-free; provider SDKs are installed only when you ask for them.
# Anthropic Claude only
pip install kitkat[anthropic]
# OpenAI (and OpenAI-compatible endpoints)
pip install kitkat[openai]
# Google Gemini (including Vertex AI)
pip install kitkat[gemini]
# Redis cache backend (for multi-process / multi-instance deployments)
pip install kitkat[redis]
# All three providers at once
pip install kitkat[all-providers]
# Everything (all providers + Redis)
pip install kitkat[all]Requires Python 3.11+
Using
uv?uv add kitkat[all]
import asyncio
from kitkat import LLMRequest, Message, Role
from kitkat.providers.anthropic import AnthropicProvider, AnthropicConfig
async def main() -> None:
config = AnthropicConfig(api_key="sk-ant-...")
request = LLMRequest(
messages=[Message(role=Role.USER, content="Explain async/await in one paragraph.")],
max_tokens=512,
)
async with AnthropicProvider(config) as provider:
response = await provider.complete(request)
print(f"Model : {response.model}")
print(f"Tokens : {response.usage.total_tokens}")
print(f"Latency : {response.latency_ms:.0f}ms")
print(response.content)
asyncio.run(main())from kitkat import LLMRequest, Message, Role
from kitkat.providers.gemini import GeminiProvider, GeminiConfig
config = GeminiConfig(api_key="AIza...")
# Same request object, same response shape — nothing else changes.| Provider | Extra | Streaming | Thinking | Vertex AI |
|---|---|---|---|---|
| Anthropic Claude | kitkat[anthropic] |
✅ | ✅ | — |
| OpenAI | kitkat[openai] |
✅ | ✅ | — |
| Google Gemini | kitkat[gemini] |
✅ | ✅ | ✅ |
Each provider ships its own typed config dataclass:
from kitkat.providers.openai import OpenAIConfig
config = OpenAIConfig(
api_key="sk-...",
model="gpt-4o",
base_url="https://integrate.api.nvidia.com/v1", # NVIDIA NIM
timeout_s=120.0,
)All providers implement true async streaming. Every chunk is a typed StreamChunk; the final chunk carries aggregated usage and latency.
from kitkat import LLMRequest, Message, Role
from kitkat.providers.anthropic import AnthropicProvider, AnthropicConfig
async with AnthropicProvider(AnthropicConfig(api_key="...")) as provider:
async for chunk in provider.stream(request):
if chunk.is_final:
print(f"\n\nDone — {chunk.usage.total_tokens} tokens")
else:
print(chunk.delta, end="", flush=True)Enable chain-of-thought reasoning for providers that support it (Claude, Gemini, OpenAI o-series):
from kitkat import LLMRequest, Message, Role, ThinkingConfig
request = LLMRequest(
messages=[Message(role=Role.USER, content="Solve this step by step: ...")],
thinking=ThinkingConfig(enabled=True, effort="high"),
)
response = await provider.complete(request)
print(response.thinking_content) # the reasoning trace
print(response.content) # the final answerThe BYOKLLMService accepts a user-supplied API key per-request, creating a lightweight client without a pre-flight credential probe. This is designed for multi-tenant applications where each user provides their own key.
from kitkat.service import BYOKLLMService
from kitkat import LLMRequest, Message, Role, ProviderType
service = BYOKLLMService()
response = await service.complete(
request=LLMRequest(messages=[Message(role=Role.USER, content="Hello")]),
provider_type=ProviderType.OPENAI,
api_key="sk-user-supplied-key",
)Every provider exposes a synchronous count_tokens() method. Providers delegate to their native tokenizer where available, with a tiktoken-based fallback for models not yet supported by tiktoken:
n = provider.count_tokens("How many tokens is this sentence?")
print(n) # e.g. 8Retry behaviour is configurable per-provider or per-call. Transient errors (429, 500–504) are retried with exponential back-off and optional jitter. Auth failures and token limit errors are never retried.
from kitkat import RetryPolicy
response = await provider.complete_with_retry(
request,
policy=RetryPolicy(
max_attempts=4,
base_delay_s=1.0,
max_delay_s=30.0,
jitter=True,
),
)LLMRouter puts a pool of providers behind a single complete() / stream() call. When the primary provider is down, rate-limited, or slow, the router automatically falls back to the next one — without any changes to your calling code.
Strategies: FAILOVER (priority order) · ROUND_ROBIN (cycle) · LEAST_LATENCY (auto-pick fastest) · RANDOM
import asyncio
import os
from kitkat import LLMRequest, Message, Role, RoutingStrategy
from kitkat.service.router import LLMRouter, RouterConfig
from kitkat.providers.anthropic import AnthropicProvider, AnthropicConfig
from kitkat.providers.openai import OpenAIProvider, OpenAIConfig
async def main() -> None:
# LLMRouter.build() initialises providers concurrently and skips any
# that fail to start — so one bad API key doesn't block the whole pool.
router = await LLMRouter.build(
providers=[
AnthropicProvider(AnthropicConfig(api_key=os.environ["ANTHROPIC_API_KEY"])),
OpenAIProvider(OpenAIConfig(api_key=os.environ["OPENAI_API_KEY"])),
],
config=RouterConfig(strategy=RoutingStrategy.FAILOVER),
)
request = LLMRequest(
messages=[Message(role=Role.USER, content="Summarise async/await in one paragraph.")],
)
async with router:
response = await router.complete(request)
print(f"Answered by : {response.provider}")
print(f"Latency : {response.latency_ms:.0f}ms")
print(response.content)
asyncio.run(main())Per-provider circuit breakers open automatically after repeated failures, letting the router skip an unhealthy provider without probing it on every request. They close again after the provider recovers:
from kitkat.service.router import RouterConfig, CircuitBreakerConfig
config = RouterConfig(
strategy=RoutingStrategy.FAILOVER,
circuit_breaker=CircuitBreakerConfig(
failure_threshold=5, # open after 5 consecutive failures
recovery_timeout_s=60.0, # probe again after 60 s
success_threshold=2, # close after 2 consecutive successes
),
)You can inspect router health and manually reset a breaker at any time:
# Full status snapshot (circuit states, latency, error rates)
status = await router.status()
# Manually reset a tripped breaker (returns True if the provider was found)
reset = await router.reset_circuit_breaker(ProviderType.ANTHROPIC)Streaming through the router works identically to single-provider streaming. If a provider fails before yielding the first token, the router transparently retries on the next one:
async with router:
async for chunk in router.stream(request):
if not chunk.is_final:
print(chunk.delta, end="", flush=True)LLMCache caches non-streaming responses so identical requests never hit the API twice. The cache key is a deterministic SHA-256 hash of the prompt, model, and generation parameters — metadata and timeout are intentionally excluded because they don't affect what the model generates.
Standalone cache (in-memory, zero extra dependencies):
from kitkat.service.cache import LLMCache, CacheConfig
from kitkat import CacheBackendType
cache = LLMCache(CacheConfig(
backend=CacheBackendType.MEMORY,
ttl_seconds=3_600, # entries live for 1 hour
max_memory_size=500, # LRU-evict after 500 entries
))
cached = await cache.get(request)
if cached is None:
response = await provider.complete(request)
await cache.set(request, response)
else:
response = cachedRedis cache (multi-process / multi-instance deployments, requires pip install kitkat[redis]):
from kitkat.service.cache import LLMCache, CacheConfig
from kitkat import CacheBackendType
cache = LLMCache(CacheConfig(
backend=CacheBackendType.REDIS,
redis_url=os.environ["REDIS_URL"],
ttl_seconds=7_200,
key_prefix="myapp:llm:", # isolate from other apps on the same Redis
))Cache + router together — the most common production setup. Pass a CacheConfig inside RouterConfig and the router handles get/set automatically:
from kitkat.service.router import LLMRouter, RouterConfig
from kitkat.service.cache import CacheConfig
from kitkat import CacheBackendType, RoutingStrategy
router = await LLMRouter.build(
providers=[anthropic_provider, openai_provider],
config=RouterConfig(
strategy=RoutingStrategy.FAILOVER,
enable_cache=True,
cache_on_truncated=False, # don't cache MAX_TOKENS-truncated responses
cache=CacheConfig(
backend=CacheBackendType.MEMORY,
ttl_seconds=3_600,
),
),
)Or use the factory shorthand:
from kitkat import create_llm_router, RoutingStrategy
router = create_llm_router(
providers=[anthropic_provider, openai_provider],
strategy=RoutingStrategy.FAILOVER,
enable_cache=True,
)Note —
CONTENT_FILTERandERRORresponses are never cached regardless of configuration. A truncated response (finish_reason=LENGTH) is only cached whencache_on_truncated=True.
All exceptions are typed subclasses of LLMError. Catch the base class for a broad handler or specific subclasses for fine-grained recovery:
from kitkat import (
LLMAuthenticationError,
LLMRateLimitError,
LLMTimeoutError,
LLMTokenLimitError,
LLMError,
)
try:
response = await provider.complete(request)
except LLMAuthenticationError:
# Bad API key — do not retry
raise
except LLMRateLimitError as exc:
# Honour the Retry-After header if present
await asyncio.sleep(exc.retry_after_s or 5.0)
except LLMTokenLimitError as exc:
# Prompt is too long for the model's context window
print(f"Prompt has ~{exc.token_count} tokens")
except LLMTimeoutError:
# Request exceeded timeout_s
pass
except LLMError as exc:
# Everything else (provider errors, connection errors, etc.)
print(f"[{exc.provider}] {exc}")Exception hierarchy:
LLMError
└── LLMProviderError
├── LLMProviderInitError
├── LLMAuthenticationError
├── LLMRateLimitError
├── LLMTokenLimitError
├── LLMTimeoutError
└── LLMContentFilterError
Implement LLMProvider to add any custom or private endpoint. The library discovers providers via Python entry-points — third-party packages can ship providers without modifying Kitkat itself.
from collections.abc import AsyncIterator
from kitkat import (
LLMProvider, FinishReason, LLMRequest, LLMResponse, Message,
ProviderCapabilities, ProviderType, StreamChunk, TokenUsage,
)
class MyProvider(LLMProvider):
PROVIDER_TYPE = ProviderType.OPENAI # reuse an existing slot
DEFAULT_MODEL = "my-model-v1"
CAPABILITIES = ProviderCapabilities(
supports_streaming=True,
max_context_tokens=32_768,
provider_type=ProviderType.OPENAI,
)
async def initialize(self) -> None:
self._client = MySDKClient(api_key=self._config["api_key"])
self._initialized = True
async def shutdown(self) -> None:
await self._client.aclose()
self._initialized = False
async def _init_client_only(self) -> None:
if not self._initialized:
self._client = MySDKClient(api_key=self._config["api_key"])
self._initialized = True
async def complete(self, request: LLMRequest) -> LLMResponse:
...
async def stream(self, request: LLMRequest) -> AsyncIterator[StreamChunk]:
yield StreamChunk(delta="hello")
yield StreamChunk(delta="", is_final=True, finish_reason=FinishReason.STOP)
async def health_check(self) -> bool:
return self._initialized
def count_tokens(self, text: str) -> int:
from kitkat._internal.tokenizers import count_tokens_tiktoken
return count_tokens_tiktoken(text)Register it via pyproject.toml so it's auto-discovered:
[project.entry-points."kitkat.providers"]
my-llm = "mypkg.provider:MyProvider"Contributions are welcome — bug reports, documentation improvements, new features, and tests are all appreciated. Please read CONTRIBUTING.md before opening a pull request.
Development setup:
git clone https://github.com/RajeshTechForge/kitkat.git
cd kitkat
# Create the virtual environment and install all dev dependencies
uv sync --extra dev
# Run the unit test suite
uv run pytest tests/unit/ -v
# Lint and format
uv run ruff check .
uv run ruff format .MIT © 2026 Rajesh Mondal
See LICENSE for the full text.