Optimiser les coûts LLM en production : techniques concrètes
Votre POC avec GPT-4 coûtait 50€/mois. En production avec 10,000 utilisateurs, la facture explose à 15,000€/mois. Ce scénario est courant. Voici les techniques pour réduire drastiquement vos coûts LLM sans sacrifier la qualité.
Comprendre la structure des coûts
Anatomie d'une facture LLM
| Composant | Impact | Levier d'optimisation |
|-----------|--------|----------------------|
| Tokens d'entrée | 30-40% | Compression, cache |
| Tokens de sortie | 50-60% | Contraintes, streaming |
| Latence | Indirect | Async, batch |
| Appels API | Variable | Cache, batch |
Tarifs comparés (janvier 2026)
| Modèle | Input (1M tokens) | Output (1M tokens) | Contexte |
|--------|-------------------|--------------------| ---------|
| GPT-4o | $2.50 | $10.00 | 128K |
| Claude Sonnet | $3.00 | $15.00 | 200K |
| Claude Haiku | $0.25 | $1.25 | 200K |
| GPT-4o-mini | $0.15 | $0.60 | 128K |
| Mistral Large | $2.00 | $6.00 | 128K |
| Llama 3.1 70B (self-hosted) | ~$0.50 | ~$0.50 | 128K |
Calculer son coût actuel
def calculate_monthly_cost(
requests_per_day: int,
avg_input_tokens: int,
avg_output_tokens: int,
input_price_per_1m: float,
output_price_per_1m: float
) -> float:
"""Calculer le coût mensuel estimé."""
daily_input_tokens = requests_per_day * avg_input_tokens
daily_output_tokens = requests_per_day * avg_output_tokens
monthly_input_tokens = daily_input_tokens * 30
monthly_output_tokens = daily_output_tokens * 30
input_cost = (monthly_input_tokens / 1_000_000) * input_price_per_1m
output_cost = (monthly_output_tokens / 1_000_000) * output_price_per_1m
return input_cost + output_cost
Technique 1 : Cache sémantique
Le problème
50% de vos requêtes sont des variations de la même question. Vous payez le LLM à chaque fois.
La solution
Implémenter un cache qui reconnaît les questions similaires.
import hashlib
import redis
import numpy as np
from sentence_transformers import SentenceTransformer
class SemanticCache:
def __init__(self, redis_url: str, similarity_threshold: float = 0.92):
self.redis = redis.from_url(redis_url)
self.encoder = SentenceTransformer('all-MiniLM-L6-v2')
self.threshold = similarity_threshold
def _get_embedding(self, text: str) -> np.ndarray:
return self.encoder.encode(text)
def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
def get(self, query: str) -> str | None:
"""Chercher une réponse en cache."""
query_embedding = self._get_embedding(query)
# Parcourir les clés existantes
for key in self.redis.scan_iter("cache:*"):
cached = self.redis.hgetall(key)
if cached:
cached_embedding = np.frombuffer(cached[b'embedding'], dtype=np.float32)
similarity = self._cosine_similarity(query_embedding, cached_embedding)
if similarity >= self.threshold:
return cached[b'response'].decode()
return None
def set(self, query: str, response: str, ttl: int = 3600):
"""Stocker une réponse en cache."""
embedding = self._get_embedding(query)
key = f"cache:{hashlib.md5(query.encode()).hexdigest()}"
self.redis.hset(key, mapping={
'query': query,
'response': response,
'embedding': embedding.tobytes()
})
self.redis.expire(key, ttl)
# Utilisation
cache = SemanticCache("redis://localhost:6379")
def query_llm_with_cache(query: str) -> str:
# Vérifier le cache
cached = cache.get(query)
if cached:
return cached # Gratuit !
ROI du cache
| Métrique | Sans cache | Avec cache (50% hit) |
|----------|------------|---------------------|
| Requêtes/jour | 10,000 | 10,000 |
| Appels LLM | 10,000 | 5,000 |
| Coût/jour | $100 | $50 |
| Économie | - | 50% |
Technique 2 : Routing intelligent
Le problème
Vous utilisez GPT-4 pour tout, y compris les questions simples qui pourraient être traitées par un modèle moins cher.
La solution
Router vers le modèle approprié selon la complexité.
from enum import Enum
from anthropic import Anthropic
class ModelTier(Enum):
SIMPLE = "claude-3-haiku-20240307" # $0.25/1M
MEDIUM = "claude-3-5-sonnet-20241022" # $3.00/1M
COMPLEX = "claude-3-opus-20240229" # $15.00/1M
class SmartRouter:
def __init__(self, client: Anthropic):
self.client = client
def classify_complexity(self, query: str) -> ModelTier:
"""Classifier la complexité de la requête."""
# Règles heuristiques
simple_patterns = [
"traduis", "résume en une phrase", "oui ou non",
"quel est", "définis", "date de"
]
complex_patterns = [
"analyse en détail", "compare et contraste",
"génère du code", "écris un article", "planifie"
]
query_lower = query.lower()
for pattern in complex_patterns:
if pattern in query_lower:
return ModelTier.COMPLEX
for pattern in simple_patterns:
if pattern in query_lower:
return ModelTier.SIMPLE
# Par défaut : medium
return ModelTier.MEDIUM
def route(self, query: str, context: str = "") -> str:
"""Router vers le modèle approprié."""
tier = self.classify_complexity(query)
response = self.client.messages.create(
model=tier.value,
max_tokens=1000,
messages=[{"role": "user", "content": query}]
)
return response.content[0].text
Classification par ML
Pour une classification plus précise :
from sklearn.ensemble import RandomForestClassifier
import joblib
class MLRouter:
def __init__(self, model_path: str):
self.classifier = joblib.load(model_path)
self.encoder = SentenceTransformer('all-MiniLM-L6-v2')
ROI du routing
| Trafic | Avant (tout GPT-4) | Après (routing) |
|--------|-------------------|-----------------|
| Simple (40%) | $40/jour | $4/jour (Haiku) |
| Medium (45%) | $45/jour | $27/jour (Sonnet) |
| Complex (15%) | $15/jour | $15/jour (GPT-4) |
| Total | $100/jour | $46/jour |
| Économie | - | 54% |
Technique 3 : Compression de contexte
Le problème
Votre prompt système fait 2000 tokens. Chaque requête paie ces 2000 tokens.
Solutions
1. Résumé automatique du contexte
def compress_context(context: str, target_tokens: int = 500) -> str:
"""Résumer le contexte pour réduire les tokens."""
response = client.messages.create(
model="claude-3-haiku-20240307", # Modèle pas cher pour résumer
max_tokens=target_tokens,
messages=[{
"role": "user",
"content": f"Résume ce contexte en gardant les informations essentielles:\n\n{context}"
}]
)
return response.content[0].text
2. Prompt caching (Anthropic)
from anthropic import Anthropic
client = Anthropic()
# Le contexte système avec cache
response = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=1024,
system=[
{
"type": "text",
"text": "Vous êtes un expert en finance...", # 2000 tokens
"cache_control": {"type": "ephemeral"}
}
],
messages=[{"role": "user", "content": "Question de l'utilisateur"}]
)
3. Extraction des informations clés
def extract_relevant_context(query: str, documents: list[str]) -> str:
"""Extraire uniquement le contexte pertinent."""
# Embeddings pour trouver les passages pertinents
query_embedding = encoder.encode(query)
doc_embeddings = encoder.encode(documents)
# Top 3 documents les plus pertinents
similarities = cosine_similarity([query_embedding], doc_embeddings)[0]
top_indices = similarities.argsort()[-3:][::-1]
Technique 4 : Batching
Le problème
Vous envoyez des requêtes une par une. L'overhead par requête s'accumule.
La solution
Regrouper les requêtes similaires.
import asyncio
from collections import defaultdict
from typing import Callable
class BatchProcessor:
def __init__(
self,
batch_size: int = 10,
wait_time: float = 0.5,
process_fn: Callable = None
):
self.batch_size = batch_size
self.wait_time = wait_time
self.process_fn = process_fn
self.pending = []
self.lock = asyncio.Lock()
async def add(self, item: dict) -> asyncio.Future:
"""Ajouter un item au batch."""
future = asyncio.Future()
async with self.lock:
self.pending.append((item, future))
if len(self.pending) >= self.batch_size:
await self._process_batch()
return future
async def _process_batch(self):
"""Traiter le batch actuel."""
if not self.pending:
return
batch = self.pending[:self.batch_size]
self.pending = self.pending[self.batch_size:]
items = [item for item, _ in batch]
futures = [future for _, future in batch]
try:
results = await self.process_fn(items)
for future, result in zip(futures, results):
future.set_result(result)
except Exception as e:
for future in futures:
future.set_exception(e)
# Utilisation avec OpenAI batch API
async def process_batch(items: list[dict]) -> list[str]:
# OpenAI Batch API pour 50% de réduction
batch_response = await openai.batches.create(
input_file_id=upload_batch_file(items),
endpoint="/v1/chat/completions",
completion_window="24h"
)
return extract_results(batch_response)
ROI du batching
| Méthode | Prix/1M tokens | Latence |
|---------|----------------|---------|
| Temps réel | $10.00 | <1s |
| Batch API | $5.00 | 24h max |
| Économie | 50% | - |
Technique 5 : Modèles self-hosted
Le problème
À grande échelle, les API deviennent plus chères que l'infrastructure.
Point de bascule
def calculate_breakeven(
api_cost_per_1m: float,
monthly_tokens: int,
gpu_cost_per_hour: float,
tokens_per_second: int
) -> dict:
"""Calculer le point de rentabilité self-hosted."""
# Coût API
api_monthly = (monthly_tokens / 1_000_000) * api_cost_per_1m
# Coût self-hosted
seconds_needed = monthly_tokens / tokens_per_second
hours_needed = seconds_needed / 3600
self_hosted_monthly = hours_needed * gpu_cost_per_hour
return {
"api_cost": api_monthly,
"self_hosted_cost": self_hosted_monthly,
"savings": api_monthly - self_hosted_monthly,
"savings_percent": (api_monthly - self_hosted_monthly) / api_monthly * 100
}
Quand passer au self-hosted
| Tokens/mois | Recommandation |
|-------------|----------------|
| < 10M | API |
| 10-100M | API ou hybride |
| 100M-1B | Self-hosted + API fallback |
| > 1B | Full self-hosted |
Stack self-hosted recommandée
# docker-compose.yml pour vLLM
services:
vllm:
image: vllm/vllm-openai:latest
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
environment:
- MODEL=meta-llama/Llama-3.1-70B-Instruct
- MAX_MODEL_LEN=8192
ports:
- "8000:8000"
volumes:
- ./models:/root/.cache/huggingface
Technique 6 : Output constraints
Le problème
Le LLM génère des réponses de 500 tokens quand 50 suffisent.
Solutions
1. Limiter max_tokens
response = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=100, # Forcer la concision
messages=[{"role": "user", "content": query}]
)
2. Structured output
from pydantic import BaseModel
class ShortAnswer(BaseModel):
answer: str # Max 50 chars dans le prompt
confidence: float
response = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=200,
messages=[{
"role": "user",
"content": f"""Réponds en JSON avec ce format:
{{"answer": "réponse courte (max 50 caractères)", "confidence": 0.0-1.0}}
3. Streaming avec early stop
async def stream_with_cutoff(query: str, max_chars: int = 500):
"""Arrêter le streaming dès qu'on a assez."""
collected = ""
async with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1000,
messages=[{"role": "user", "content": query}]
) as stream:
async for chunk in stream:
collected += chunk.text
if len(collected) >= max_chars:
break # Stop paying for more tokens
Tableau récapitulatif
| Technique | Économie | Complexité | Impact qualité |
|-----------|----------|------------|----------------|
| Cache sémantique | 30-60% | Moyenne | Aucun |
| Routing intelligent | 40-60% | Moyenne | Faible |
| Compression contexte | 20-40% | Faible | Faible |
| Batching | 50% | Moyenne | Aucun (latence+) |
| Self-hosted | 50-80% | Élevée | Variable |
| Output constraints | 20-40% | Faible | Moyen |
Plan d'action
Phase 1 : Quick wins (semaine 1-2)
1. Implémenter le cache sémantique Redis
2. Ajouter des contraintes de tokens max
3. Activer le prompt caching Anthropic
Économie attendue : 30-40%
Phase 2 : Optimisation (semaine 3-4)
1. Déployer le routing intelligent
2. Mettre en place le batching pour les tâches async
3. Compresser les contextes longs
Économie attendue : 50-60%
Phase 3 : Scale (mois 2+)
1. Évaluer le self-hosted pour les workloads prévisibles
2. Implémenter un système hybride API + self-hosted
3. Monitorer et ajuster en continu
Économie attendue : 60-80%
Monitoring des coûts
import prometheus_client as prom
# Métriques
llm_tokens_total = prom.Counter(
'llm_tokens_total',
'Total tokens consumed',
['model', 'direction'] # input/output
)
llm_cost_total = prom.Counter(
'llm_cost_dollars',
'Total cost in dollars',
['model']
)
llm_cache_hits = prom.Counter(
'llm_cache_hits_total',
'Cache hit count'
)
# Wrapper de tracking
def track_llm_call(model: str, input_tokens: int, output_tokens: int):
llm_tokens_total.labels(model=model, direction='input').inc(input_tokens)
llm_tokens_total.labels(model=model, direction='output').inc(output_tokens)
Conclusion
Optimiser les coûts LLM n'est pas une option, c'est une nécessité pour la viabilité de vos projets IA en production. Les techniques présentées peuvent réduire votre facture de 60-80% sans impact majeur sur la qualité.
Commencez par le cache sémantique et le routing : ce sont les quick wins avec le meilleur ROI. Puis affinez avec la compression et le batching. Le self-hosted n'est pertinent qu'à très grande échelle.
Règle d'or : mesurez avant d'optimiser. Sans métriques précises, vous optimisez à l'aveugle.
---
Pour comprendre les formats économiques : [TOON format : réduire les coûts LLM de 40%](/toon-format-reduire-couts-llm/)
Pour l'architecture RAG : [RAG en production : architecture simple qui fonctionne vraiment](/rag-production-architecture-simple/)