Real-Time Feature Engineering: The Missing Link Between Data and AI Models
In 2026, 78% of production ML failures are feature-related, not model-related (Gartner Report). While data scientists fine-tune complex transformer architectures, feature pipelines are thrown together with batch SQL, creating a fatal gap between training and production: training-serving skew.
Real-time feature engineering solves this by computing features on the fly over data streams (Kafka, Pulsar), letting ML models decide instantly on fresh features. Use cases are exploding: fraud detection (react in < 100ms), personalized recommendations (real-time user features), algorithmic trading (market features at microsecond scale).
This technical article walks through the full streaming feature engineering architecture: Kafka + Flink for real-time transformation, feature stores (Feast, Tecton) for serving features, and lambda/kappa patterns to reconcile batch and streaming.
The Problem with Classic Feature Engineering
Batch Features: Too Slow for 2026
Traditional batch architecture:
-- Features computed once a day (Spark batch)
SELECT
user_id,
COUNT(*) as transactions_last_30days,
AVG(amount) as avg_transaction_amount,
MAX(amount) as max_transaction_amount
FROM transactions
WHERE timestamp >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY user_id;
Problems:
- ❌ Latency: features up to 24h stale (too old for fraud detection)
- ❌ Training-serving skew: batch training, streaming inference
- ❌ Missing features: impossible to compute "transactions in the last 5 minutes"
- ❌ Cost: recomputing everything every day (inefficient)
Concrete example:
A fraudulent transaction at 2:30pm is only caught at midnight the next day, when the batch features are recomputed → an average loss of $50k.
The Training-Serving Gap (Fatal)
# TRAINING: batch features (Spark)
df_train = spark.sql("""
    SELECT t.user_id,
           f.tx_count_30d,
           t.transaction_id, t.label
    FROM transactions t
    JOIN (
        SELECT user_id, COUNT(*) AS tx_count_30d
        FROM transactions
        GROUP BY user_id
    ) f ON t.user_id = f.user_id
""")
model.fit(df_train)
# ✅ Features computed offline over the full history

# PRODUCTION: real-time serving
def predict_fraud(transaction):
    # ❌ How do we compute tx_count_30d in real time?
    # Option 1: query the DB → 500ms latency (unacceptable)
    # Option 2: Redis cache → stale, out-of-date data
    # Option 3: recompute on the fly → prohibitive cost
    ...
Real-Time Feature Engineering: The Solution
Streaming architecture:
Events Kafka → Flink Feature Transform → Feature Store → Model Serving
(< 1ms) (< 10ms) (< 1ms) (< 5ms)
Advantages:
- ✅ Latency < 20ms: fresh features for every prediction
- ✅ No skew: the same pipeline for training and serving
- ✅ Rich features: sliding aggregations (1min, 5min, 1h)
- ✅ Efficient: incremental computation over new events only
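The incremental computation behind that last point can be sketched in plain Python. This is an illustrative stand-in, not Flink code: the hypothetical `SlidingWindowAggregator` keeps a deque of recent events so each new transaction updates the aggregates in O(1) amortized time instead of rescanning history.

```python
from collections import deque

class SlidingWindowAggregator:
    """Incrementally maintains count/sum/avg over a sliding time window.
    Illustrative sketch of what a streaming engine does per key."""

    def __init__(self, window_seconds):
        self.window = window_seconds
        self.events = deque()  # (timestamp, amount) pairs, oldest first
        self.total = 0.0

    def add(self, timestamp, amount):
        # O(1) amortized: append the event, then evict expired ones
        self.events.append((timestamp, amount))
        self.total += amount
        self._evict(timestamp)

    def _evict(self, now):
        # Drop events that fell out of the window
        while self.events and self.events[0][0] < now - self.window:
            _, old_amount = self.events.popleft()
            self.total -= old_amount

    def features(self, now):
        self._evict(now)
        count = len(self.events)
        return {
            "tx_count": count,
            "total_amount": self.total,
            "avg_amount": self.total / count if count else 0.0,
        }
```

Per-user instances of this structure are exactly what Flink's keyed windowed state maintains for us, with checkpointing and watermarks on top.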
Streaming Architecture with Kafka + Flink
1. Kafka: Event Backbone
Kafka topics:
# Raw events topic
transactions:
partitions: 24
retention: 7 days
schema:
user_id: string
transaction_id: string
amount: float
merchant_id: string
timestamp: long
Python producer:
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def publish_transaction(transaction):
    producer.send(
        'transactions',
        key=transaction['user_id'].encode('utf-8'),
        value=transaction
    )
2. Flink: Feature Transformation
Flink job (PyFlink):
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.table.window import Tumble, Slide
# Setup
env = StreamExecutionEnvironment.get_execution_environment()
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
table_env = StreamTableEnvironment.create(env, settings)
# Kafka source
table_env.execute_sql("""
CREATE TABLE transactions (
user_id STRING,
transaction_id STRING,
amount DOUBLE,
merchant_id STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'transactions',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'feature-engineering',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)
""")
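The `WATERMARK` clause in the DDL above declares that events may arrive up to 5 seconds out of order. The semantics can be sketched in a few lines of Python (a hypothetical `WatermarkTracker`, purely illustrative): the watermark trails the maximum event time seen so far by the declared delay, and an event whose timestamp is below the current watermark is considered late.

```python
from dataclasses import dataclass

@dataclass
class WatermarkTracker:
    """Mimics `WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND`:
    watermark = max(event_time seen) - delay; events below it are late."""
    delay: float = 5.0
    max_event_time: float = float("-inf")

    def watermark(self) -> float:
        return self.max_event_time - self.delay

    def observe(self, event_time: float) -> bool:
        """Returns True if the event is on time, False if it is late."""
        late = event_time < self.watermark()
        self.max_event_time = max(self.max_event_time, event_time)
        return not late
```

When the watermark passes the end of a window, Flink can emit that window's aggregates, which is what bounds feature latency to roughly the window size plus the watermark delay.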
# Real-time aggregated features
table_env.execute_sql("""
CREATE TABLE user_features (
user_id STRING,
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
-- 1-minute aggregations
tx_count_1min BIGINT,
total_amount_1min DOUBLE,
avg_amount_1min DOUBLE,
max_amount_1min DOUBLE,
distinct_merchants_1min BIGINT,
-- 5-minute aggregations (sliding)
tx_count_5min BIGINT,
avg_amount_5min DOUBLE,
-- 1-hour aggregations
tx_count_1h BIGINT,
distinct_merchants_1h BIGINT,
PRIMARY KEY (user_id, window_end) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'user_features',
'properties.bootstrap.servers' = 'kafka:9092',
'key.format' = 'json',
'value.format' = 'json'
)
""")
# Flink SQL query: multi-window aggregated features
table_env.execute_sql("""
    INSERT INTO user_features
    SELECT
        user_id,
        TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
        TUMBLE_END(event_time, INTERVAL '1' MINUTE) as window_end,
        -- 1-minute window
        COUNT(*) as tx_count_1min,
        SUM(amount) as total_amount_1min,
        AVG(amount) as avg_amount_1min,
        MAX(amount) as max_amount_1min,
        COUNT(DISTINCT merchant_id) as distinct_merchants_1min,
        -- 5-minute sliding window (approximation over 1-min buckets)
        SUM(COUNT(*)) OVER (
            PARTITION BY user_id
            ORDER BY TUMBLE_END(event_time, INTERVAL '1' MINUTE)
            ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
        ) as tx_count_5min,
        AVG(AVG(amount)) OVER (
            PARTITION BY user_id
            ORDER BY TUMBLE_END(event_time, INTERVAL '1' MINUTE)
            ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
        ) as avg_amount_5min,
        -- 1-hour window
        SUM(COUNT(*)) OVER (
            PARTITION BY user_id
            ORDER BY TUMBLE_END(event_time, INTERVAL '1' MINUTE)
            ROWS BETWEEN 59 PRECEDING AND CURRENT ROW
        ) as tx_count_1h,
        COUNT(DISTINCT merchant_id) OVER (
            PARTITION BY user_id
            ORDER BY TUMBLE_END(event_time, INTERVAL '1' MINUTE)
            ROWS BETWEEN 59 PRECEDING AND CURRENT ROW
        ) as distinct_merchants_1h
    FROM transactions
    GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' MINUTE)
    -- NB: depending on the Flink version, mixing group windows and OVER
    -- aggregations like this may need to be split into a cascade of two queries
""")
3. Feature Store: Feast
Feature definitions:
# features.py
from datetime import timedelta
from feast import Entity, FeatureView, Field
from feast.types import Float64, Int64
from feast.data_source import KafkaSource
# Entity
user = Entity(name="user_id", join_keys=["user_id"])
# Kafka source (constructor arguments vary across Feast versions;
# recent versions also expect a message_format and a batch_source)
user_features_source = KafkaSource(
    name="user_features_kafka",
    kafka_bootstrap_servers="kafka:9092",
    topic="user_features",
    timestamp_field="window_end",
    batch_source=None,  # streaming only
)
# Feature view exposing the Flink aggregates
user_transaction_features = FeatureView(
    name="user_transaction_features",
    entities=[user],
    ttl=timedelta(hours=2),
    schema=[
        Field(name="tx_count_1min", dtype=Int64),
        Field(name="avg_amount_1min", dtype=Float64),
        Field(name="tx_count_5min", dtype=Int64),
        Field(name="tx_count_1h", dtype=Int64),
        Field(name="distinct_merchants_1h", dtype=Int64),
    ],
    source=user_features_source,
)
Deploy the feature store:
# Initialize Feast
feast init feature_repo
cd feature_repo
# Apply features
feast apply
Online serving (API):
from feast import FeatureStore

store = FeatureStore(repo_path=".")

def get_online_features(user_id):
    """
    Fetches real-time features for a user.
    Latency: < 5ms
    """
    entity_rows = [{"user_id": user_id}]
    features = store.get_online_features(
        features=[
            "user_transaction_features:tx_count_1min",
            "user_transaction_features:avg_amount_1min",
            "user_transaction_features:tx_count_5min",
            "user_transaction_features:tx_count_1h",
            "user_transaction_features:distinct_merchants_1h",
        ],
        entity_rows=entity_rows
    ).to_dict()
    return features
Use Case: Real-Time Fraud Detection
Full Architecture
from fastapi import FastAPI
from feast import FeatureStore
import joblib
app = FastAPI()
feast_store = FeatureStore(repo_path=".")
fraud_model = joblib.load("fraud_model.pkl")
@app.post("/predict_fraud")
async def predict_fraud(transaction: dict):
    """
    Real-time fraud detection API
    SLA: < 50ms
    """
    user_id = transaction['user_id']

    # 1. Fetch real-time features (< 5ms)
    features = feast_store.get_online_features(
        features=[
            "user_transaction_features:tx_count_1min",
            "user_transaction_features:avg_amount_1min",
            "user_transaction_features:tx_count_5min",
            "user_transaction_features:tx_count_1h",
            "user_transaction_features:distinct_merchants_1h",
        ],
        entity_rows=[{"user_id": user_id}]
    ).to_dict()

    # 2. Combine with transaction-level features
    feature_vector = [
        transaction['amount'],
        features['tx_count_1min'][0] or 0,
        features['avg_amount_1min'][0] or 0,
        features['tx_count_5min'][0] or 0,
        features['tx_count_1h'][0] or 0,
        features['distinct_merchants_1h'][0] or 1,
        # Derived features
        transaction['amount'] / (features['avg_amount_1min'][0] or 1),  # ratio vs. average
        1 if (features['tx_count_1min'][0] or 0) > 3 else 0,  # burst flag
    ]

    # 3. Prediction (< 2ms)
    fraud_score = fraud_model.predict_proba([feature_vector])[0][1]

    # 4. Decision
    is_fraud = fraud_score > 0.85
    return {"fraud_score": float(fraud_score), "is_fraud": bool(is_fraud)}
Real-Time Business Rules
def advanced_fraud_rules(transaction, features):
    """
    Business rules combined with the ML score
    """
    flags = []
    # Rule 1: velocity check
    if features['tx_count_1min'] > 5:
        flags.append("HIGH_VELOCITY")
    # Rule 2: amount anomaly
    avg_amount = features['avg_amount_1min']
    if transaction['amount'] > avg_amount * 3:
        flags.append("AMOUNT_SPIKE")
    # Rule 3: merchant diversity
    if features['distinct_merchants_1h'] > 10:
        flags.append("MERCHANT_HOPPING")
    # Rule 4: large transaction after a quiet period
    if features['tx_count_1h'] == 1 and transaction['amount'] > 1000:
        flags.append("LARGE_FIRST_TX")
    return flags
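The ML score and the rule flags still need to be merged into a single action. A minimal sketch of such a decision policy follows; the thresholds and the three-way `ALLOW`/`CHALLENGE`/`BLOCK` outcome are illustrative assumptions, not tuned production values.

```python
def decide(fraud_score: float, flags: list) -> str:
    """Hypothetical policy combining the model score with rule flags.
    Thresholds (0.85, 0.5) are illustrative, not tuned values."""
    # Hard block: very confident model, or two independent rules firing
    if fraud_score > 0.85 or ("HIGH_VELOCITY" in flags and "AMOUNT_SPIKE" in flags):
        return "BLOCK"
    # Grey zone: moderate score or any single flag -> step-up authentication
    if fraud_score > 0.5 or flags:
        return "CHALLENGE"
    return "ALLOW"
```

Keeping the rules as explainable flags alongside the score also helps fraud analysts review `CHALLENGE` decisions without reverse-engineering the model.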
Lambda Architecture: Batch + Streaming
Recommended production pattern:
┌─────────────────────────────────────────────────────┐
│ LAMBDA ARCHITECTURE │
│ │
│ ┌────────────┐ ┌────────────┐ │
│ │ Batch Layer│ │Speed Layer │ │
│ │ (Spark) │ │ (Flink) │ │
│ │ │ │ │ │
│ │ Historical │ │ Real-time │ │
│ │ Features │ │ Features │ │
│ │ (30+ days) │ │ (< 1 hour) │ │
│ └─────┬──────┘ └──────┬─────┘ │
│ │ │ │
│ └───────┬───────────────┘ │
│ ▼ │
│ ┌──────────────┐ │
│ │Feature Store │ │
│ │ (Feast) │ │
│ └──────┬───────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ Model Serving│ │
│ └──────────────┘ │
└─────────────────────────────────────────────────────┘
Implementation:
# Batch features (Spark, once a day)
def compute_batch_features():
    """
    Heavy features over the full history
    """
    spark.sql("""
        INSERT OVERWRITE TABLE user_features_batch
        SELECT
            user_id,
            COUNT(*) as lifetime_tx_count,
            AVG(amount) as lifetime_avg_amount,
            STDDEV(amount) as lifetime_stddev_amount,
            MIN(timestamp) as first_transaction_date,
            DATEDIFF(CURRENT_DATE, MIN(timestamp)) as account_age_days
        FROM transactions
        WHERE timestamp >= '2020-01-01'
        GROUP BY user_id
    """)
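The serving path below reads these batch features from Redis, so the daily job also needs a loader that copies the Spark output into the cache. A sketch follows; the row dicts are assumed to match the columns of the query above, and the `batch_features:{user_id}` key format matches the lookup used at serving time.

```python
def rows_to_redis_mappings(rows):
    """Turns batch feature rows into (key, field->value) pairs ready for
    Redis HSET. Row format is assumed to match the Spark query above."""
    for row in rows:
        key = f"batch_features:{row['user_id']}"
        # Redis hash values are strings; stringify everything but the key
        fields = {k: str(v) for k, v in row.items() if k != "user_id"}
        yield key, fields

def load_batch_features(redis_client, rows):
    """Daily refresh: pipeline the HSETs so the load stays cheap."""
    pipe = redis_client.pipeline()
    for key, fields in rows_to_redis_mappings(rows):
        pipe.hset(key, mapping=fields)
    pipe.execute()
```

A TTL slightly above 24h on these keys (via `pipe.expire`) is a reasonable safety net so users absent from the latest batch do not serve arbitrarily stale values forever.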
# Streaming features (Flink, real time)
# → already implemented above

# Combination at serving time
def get_combined_features(user_id):
    # Batch features (Redis cache, refreshed daily)
    batch_features = redis_client.hgetall(f"batch_features:{user_id}")
    # Streaming features (Feast, real time)
    streaming_features = feast_store.get_online_features(
        features=["user_transaction_features:tx_count_1min", ...],
        entity_rows=[{"user_id": user_id}]
    ).to_dict()
    return {**batch_features, **streaming_features}
Monitoring and Observability
Key Metrics
import time

from prometheus_client import Counter, Histogram, Gauge

# Feature serving latency
feature_latency = Histogram(
    'feature_serving_latency_seconds',
    'Feature serving latency',
    buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1]
)

# Feature freshness
feature_freshness = Gauge(
    'feature_freshness_seconds',
    'Time since feature was computed',
    ['feature_name']
)

# Null features (data quality)
# NB: a user_id label creates one time series per user (high cardinality);
# in practice, prefer labelling by feature_name only
feature_nulls = Counter(
    'feature_null_count',
    'Count of null features',
    ['feature_name', 'user_id']
)

# Monitoring in the serving code
@feature_latency.time()
def get_features(user_id):
    features = feast_store.get_online_features(...)
    # Check freshness
    age = time.time() - features['timestamp'][0]
    feature_freshness.labels(feature_name='tx_count_1min').set(age)
    # Check nulls
    for key, value in features.items():
        if value[0] is None:
            feature_nulls.labels(feature_name=key, user_id=user_id).inc()
    return features
Critical Alerts
# Prometheus alerts
groups:
  - name: feature_engineering
    rules:
      - alert: HighFeatureLatency
        expr: histogram_quantile(0.95, rate(feature_serving_latency_seconds_bucket[5m])) > 0.05
        for: 5m
        annotations:
          summary: "Feature serving latency > 50ms (P95)"
      - alert: StaleFeatures
        expr: feature_freshness_seconds > 300
        for: 2m
        annotations:
          summary: "Features not updated for 5+ minutes"
Production Deployment
# Kubernetes deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: feature-serving-api
spec:
  replicas: 5
  selector:
    matchLabels:
      app: feature-serving-api
  template:
    metadata:
      labels:
        app: feature-serving-api
    spec:
      containers:
        - name: api
          image: feature-serving:1.0.0
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          env:
            - name: FEAST_REPO_PATH
              value: "/app/feature_repo"
            - name: REDIS_URL
              value: "redis://redis-cluster:6379"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
---
# Flink job TaskManager
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 4
  selector:
    matchLabels:
      app: flink-taskmanager
  template:
    metadata:
      labels:
        app: flink-taskmanager
    spec:
      containers:
        - name: taskmanager
          image: flink:1.18
          resources:
            requests:
              memory: "4Gi"
              cpu: "2000m"
          env:
            - name: FLINK_PROPERTIES
              value: |
                jobmanager.rpc.address: flink-jobmanager
                taskmanager.numberOfTaskSlots: 4
Conclusion: Data Speed Comes Before AI
In 2026, real-time feature engineering is no longer optional for critical ML applications. With Kafka + Flink + feature stores, organizations can finally close the loop between streaming data and ML predictions, reaching end-to-end latencies under 20ms.
Key takeaways:
- ✅ Lambda architecture: batch (historical) + streaming (real time)
- ✅ Flink SQL: multi-window aggregated features
- ✅ Feature store (Feast): unified serving in < 5ms
- ✅ Monitoring: latency, freshness, data quality
Measured ROI:
- Fraud detection: +40% detection rate, -60% false positives
- Recommendations: +25% CTR thanks to fresh features
- Trading: -80% decision latency (20ms vs 100ms)
To dig deeper into the data streaming ecosystem, see our guides on Feature Store production for best practices, Kubernetes GPU ML for scaling workloads, and RAG in production for combining real-time features with LLMs.
Real-time feature engineering + Flink + Feast = the modern ML infrastructure for 2026.