Feature Engineering Temps Réel : Le Chaînon Manquant entre Données et Modèles IA

Feature Engineering Temps Réel : Le Chaînon Manquant entre Données et Modèles IA

En 2026, 78% des échecs ML en production sont liés aux features, pas aux modèles (Gartner Report). Pendant que les data scientists optimisent des architectures transformers complexes, les pipelines de features sont construits à la va-vite avec SQL batch, créant un écart fatal entre entraînement et production : training-serving skew.

Le feature engineering temps réel résout ce problème en calculant les features à la volée sur des streams de données (Kafka, Pulsar), permettant aux modèles ML de décider instantanément avec des features fraîches. Les cas d'usage explosent : détection de fraude (réagir en < 100ms), recommandations personnalisées (features utilisateur en temps réel), trading algorithmique (features marché au microseconde).

Cet article technique explore l'architecture complète du feature engineering streaming : Kafka + Flink pour transformation temps réel, Feature Stores (Feast, Tecton) pour servir les features, et patternslambda/kappa pour réconcilier batch et streaming.

Le Problème du Feature Engineering Classique

Batch Features : Trop Lent pour 2026

Architecture batch traditionnelle :

-- Features calculées une fois par jour (batch Spark)
SELECT
  user_id,
  COUNT(*) as transactions_last_30days,
  AVG(amount) as avg_transaction_amount,
  MAX(amount) as max_transaction_amount
FROM transactions
WHERE timestamp >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY user_id;

Problèmes :

  • Latence : Features vieilles de 24h (trop pour fraud detection)
  • Training-serving skew : Batch training, streaming inference
  • Features manquantes : Impossible de calculer "transactions dans les 5 dernières minutes"
  • Coût : Recalculer tout chaque jour (inefficace)

Exemple concret :

Une transaction frauduleuse à 14h30 ne sera détectée qu'à 00h00 le lendemain quand les features batch sont recalculées → Perte de $50k en moyenne.

L'Écart Training-Serving (Fatal)

# TRAINING : Features batch (Spark)
df_train = spark.sql("""
    SELECT user_id,
           COUNT(*) as tx_count_30d,
           transaction_id, label
    FROM transactions
    GROUP BY user_id
""")

model.fit(df_train)
# ✅ Features calculées offline sur historique complet

# PRODUCTION : Serving temps réel
def predict_fraud(transaction):
    # ❌ Comment calculer tx_count_30d en temps réel ?
    # Option 1 : Query DB → 500ms de latence (inacceptable)
    # Option 2 : Cache Redis → stale data, pas à jour
    # Option 3 : Recalculer à la volée → coût prohibitif

Feature Engineering Temps Réel : La Solution

Architecture streaming :

Events Kafka → Flink Feature Transform → Feature Store → Model Serving
(< 1ms)        (< 10ms)                 (< 1ms)         (< 5ms)

Avantages :

  • Latence < 20ms : Features fraîches pour chaque prédiction
  • Pas de skew : Même pipeline training & serving
  • Features riches : Agrégations glissantes (1min, 5min, 1h)
  • Efficace : Calcul incrémental sur nouveaux events

1. Kafka : Event Backbone

Topics Kafka :

# Topic événements bruts
transactions:
  partitions: 24
  retention: 7 days
  schema:
    user_id: string
    transaction_id: string
    amount: float
    merchant_id: string
    timestamp: long

Producer Python :

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def publish_transaction(transaction):
    producer.send(
        'transactions',
        key=transaction['user_id'].encode('utf-8'),
        value=transaction
    )

Job Flink (PyFlink) :

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.table.window import Tumble, Slide

# Setup
env = StreamExecutionEnvironment.get_execution_environment()
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
table_env = StreamTableEnvironment.create(env, settings)

# Source Kafka
table_env.execute_sql("""
    CREATE TABLE transactions (
        user_id STRING,
        transaction_id STRING,
        amount DOUBLE,
        merchant_id STRING,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'transactions',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'feature-engineering',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
    )
""")

# Features agrégées temps réel
table_env.execute_sql("""
    CREATE TABLE user_features (
        user_id STRING,
        window_start TIMESTAMP(3),
        window_end TIMESTAMP(3),

-- Agrégations 1 minute
        tx_count_1min BIGINT,
        total_amount_1min DOUBLE,
        avg_amount_1min DOUBLE,
        max_amount_1min DOUBLE,
        distinct_merchants_1min BIGINT,

-- Agrégations 5 minutes (glissant)
        tx_count_5min BIGINT,
        avg_amount_5min DOUBLE,

-- Agrégations 1 heure
        tx_count_1h BIGINT,
        distinct_merchants_1h BIGINT,

PRIMARY KEY (user_id, window_end) NOT ENFORCED
    ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'user_features',
        'properties.bootstrap.servers' = 'kafka:9092',
        'key.format' = 'json',
        'value.format' = 'json'
    )
""")

# Requête Flink SQL : Features agrégées multi-fenêtres
table_env.execute_sql("""
    INSERT INTO user_features
    SELECT
        user_id,
        TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
        TUMBLE_END(event_time, INTERVAL '1' MINUTE) as window_end,

-- Fenêtre 1 min
        COUNT(*) as tx_count_1min,
        SUM(amount) as total_amount_1min,
        AVG(amount) as avg_amount_1min,
        MAX(amount) as max_amount_1min,
        COUNT(DISTINCT merchant_id) as distinct_merchants_1min,

-- Fenêtre glissante 5 min (approximation)
        SUM(COUNT(*)) OVER (
            PARTITION BY user_id
            ORDER BY TUMBLE_END(event_time, INTERVAL '1' MINUTE)
            ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
        ) as tx_count_5min,

AVG(AVG(amount)) OVER (
            PARTITION BY user_id
            ORDER BY TUMBLE_END(event_time, INTERVAL '1' MINUTE)
            ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
        ) as avg_amount_5min,

-- Fenêtre 1 heure
        SUM(COUNT(*)) OVER (
            PARTITION BY user_id
            ORDER BY TUMBLE_END(event_time, INTERVAL '1' MINUTE)
            ROWS BETWEEN 59 PRECEDING AND CURRENT ROW
        ) as tx_count_1h,

COUNT(DISTINCT merchant_id) OVER (
            PARTITION BY user_id
            ORDER BY TUMBLE_END(event_time, INTERVAL '1' MINUTE)
            ROWS BETWEEN 59 PRECEDING AND CURRENT ROW
        ) as distinct_merchants_1h

3. Feature Store : Feast

Feature definitions :

# features.py
from feast import Entity, Feature, FeatureView, Field
from feast.types import Float64, Int64
from feast.data_source import KafkaSource
from datetime import timedelta

# Entity
user = Entity(name="user_id", join_keys=["user_id"])

# Kafka source
user_features_source = KafkaSource(
    name="user_features_kafka",
    kafka_bootstrap_servers="kafka:9092",
    topic="user_features",
    timestamp_field="window_end",
    batch_source=None  # Streaming only
)

Deploy Feature Store :

# Initialize Feast
feast init feature_repo
cd feature_repo

# Apply features
feast apply

Online serving (API) :

from feast import FeatureStore

store = FeatureStore(repo_path=".")

def get_online_features(user_id):
    """
    Récupère features temps réel pour un user
    Latence: < 5ms
    """
    entity_rows = [{"user_id": user_id}]

features = store.get_online_features(
        features=[
            "user_transaction_features:tx_count_1min",
            "user_transaction_features:avg_amount_1min",
            "user_transaction_features:tx_count_5min",
            "user_transaction_features:tx_count_1h",
            "user_transaction_features:distinct_merchants_1h",
        ],
        entity_rows=entity_rows
    ).to_dict()

return features

Cas d'Usage : Fraud Detection Temps Réel

Architecture Complète

from fastapi import FastAPI
from feast import FeatureStore
import joblib

app = FastAPI()
feast_store = FeatureStore(repo_path=".")
fraud_model = joblib.load("fraud_model.pkl")

@app.post("/predict_fraud")
async def predict_fraud(transaction: dict):
    """
    API fraud detection temps réel
    SLA: < 50ms
    """
    user_id = transaction['user_id']

# 1. Récupérer features temps réel (< 5ms)
    features = feast_store.get_online_features(
        features=[
            "user_transaction_features:tx_count_1min",
            "user_transaction_features:avg_amount_1min",
            "user_transaction_features:tx_count_5min",
            "user_transaction_features:tx_count_1h",
            "user_transaction_features:distinct_merchants_1h",
        ],
        entity_rows=[{"user_id": user_id}]
    ).to_dict()

# 2. Combiner avec features transaction
    feature_vector = [
        transaction['amount'],
        features['tx_count_1min'][0] or 0,
        features['avg_amount_1min'][0] or 0,
        features['tx_count_5min'][0] or 0,
        features['tx_count_1h'][0] or 0,
        features['distinct_merchants_1h'][0] or 1,

# Derived features
        transaction['amount'] / (features['avg_amount_1min'][0] or 1),  # Ratio vs avg
        1 if features['tx_count_1min'][0] > 3 else 0,  # Burst flag
    ]

# 3. Prédiction (< 2ms)
    fraud_score = fraud_model.predict_proba([feature_vector])[0][1]

# 4. Décision
    is_fraud = fraud_score > 0.85

Règles Business Temps Réel

def advanced_fraud_rules(transaction, features):
    """
    Règles business combinées avec ML
    """
    flags = []

# Règle 1 : Velocity check
    if features['tx_count_1min'] > 5:
        flags.append("HIGH_VELOCITY")

# Règle 2 : Amount anomaly
    avg_amount = features['avg_amount_1min']
    if transaction['amount'] > avg_amount * 3:
        flags.append("AMOUNT_SPIKE")

# Règle 3 : Merchant diversity
    if features['distinct_merchants_1h'] > 10:
        flags.append("MERCHANT_HOPPING")

# Règle 4 : Large transaction after quiet period
    if features['tx_count_1h'] == 1 and transaction['amount'] > 1000:
        flags.append("LARGE_FIRST_TX")

Architecture Lambda : Batch + Streaming

Pattern recommandé en production :

┌─────────────────────────────────────────────────────┐
│                    LAMBDA ARCHITECTURE               │
│                                                      │
│  ┌────────────┐         ┌────────────┐             │
│  │ Batch Layer│         │Speed Layer │             │
│  │  (Spark)   │         │  (Flink)   │             │
│  │            │         │            │             │
│  │ Historical │         │ Real-time  │             │
│  │ Features   │         │ Features   │             │
│  │ (30+ days) │         │ (< 1 hour) │             │
│  └─────┬──────┘         └──────┬─────┘             │
│        │                       │                    │
│        └───────┬───────────────┘                    │
│                ▼                                    │
│        ┌──────────────┐                            │
│        │Feature Store │                            │
│        │  (Feast)     │                            │
│        └──────┬───────┘                            │
│               │                                    │
│               ▼                                    │
│        ┌──────────────┐                            │
│        │ Model Serving│                            │
│        └──────────────┘                            │
└─────────────────────────────────────────────────────┘

Implémentation :

# Batch features (Spark, une fois par jour)
def compute_batch_features():
    """
    Features lourdes sur historique complet
    """
    spark.sql("""
        INSERT OVERWRITE TABLE user_features_batch
        SELECT
            user_id,
            COUNT(*) as lifetime_tx_count,
            AVG(amount) as lifetime_avg_amount,
            STDDEV(amount) as lifetime_stddev_amount,
            MIN(timestamp) as first_transaction_date,
            DATEDIFF(CURRENT_DATE, MIN(timestamp)) as account_age_days
        FROM transactions
        WHERE timestamp >= '2020-01-01'
        GROUP BY user_id
    """)

# Streaming features (Flink, temps réel)
# → Déjà implémenté ci-dessus

# Combinaison au serving
def get_combined_features(user_id):
    # Batch features (cache Redis, refresh quotidien)
    batch_features = redis_client.hgetall(f"batch_features:{user_id}")

# Streaming features (Feast, temps réel)
    streaming_features = feast_store.get_online_features(
        features=["user_transaction_features:tx_count_1min", ...],
        entity_rows=[{"user_id": user_id}]
    ).to_dict()

Monitoring et Observabilité

Métriques Clés

from prometheus_client import Counter, Histogram, Gauge

# Latence feature serving
feature_latency = Histogram(
    'feature_serving_latency_seconds',
    'Feature serving latency',
    buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1]
)

# Freshness des features
feature_freshness = Gauge(
    'feature_freshness_seconds',
    'Time since feature was computed',
    ['feature_name']
)

# Feature nulls (data quality)
feature_nulls = Counter(
    'feature_null_count',
    'Count of null features',
    ['feature_name', 'user_id']
)

# Monitoring dans le code
@feature_latency.time()
def get_features(user_id):
    features = feast_store.get_online_features(...)

# Check freshness
    age = time.time() - features['timestamp'][0]
    feature_freshness.labels(feature_name='tx_count_1min').set(age)

# Check nulls
    for key, value in features.items():
        if value[0] is None:
            feature_nulls.labels(feature_name=key, user_id=user_id).inc()

Alertes Critiques

# Prometheus alerts
groups:
name: feature_engineering
  rules:
  - alert: HighFeatureLatency
    expr: histogram_quantile(0.95, feature_serving_latency_seconds) > 0.05
    for: 5m
    annotations:
      summary: "Feature serving latency > 50ms (P95)"

- alert: StaleFeatures
    expr: feature_freshness_seconds > 300
    for: 2m
    annotations:
      summary: "Features not updated for 5+ minutes"

Déploiement Production

# Kubernetes deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: feature-serving-api
spec:
  replicas: 5
  template:
    spec:
      containers:
      - name: api
        image: feature-serving:1.0.0
        resources:
          requests:
            memory: "1Gi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
        env:
        - name: FEAST_REPO_PATH
          value: "/app/feature_repo"
        - name: REDIS_URL
          value: "redis://redis-cluster:6379"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
---
# Flink job TaskManager
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 4
  template:
    spec:
      containers:
      - name: taskmanager
        image: flink:1.18
        resources:
          requests:
            memory: "4Gi"
            cpu: "2000m"
        env:
        - name: FLINK_PROPERTIES
          value: |
            jobmanager.rpc.address: flink-jobmanager
            taskmanager.numberOfTaskSlots: 4

Conclusion : La Vitesse des Données Première sur l'IA

En 2026, le feature engineering temps réel n'est plus optionnel pour les applications ML critiques. Avec Kafka + Flink + Feature Stores, les organisations peuvent enfin fermer la boucle entre données streaming et prédictions ML, atteignant des latences < 20ms bout-en-bout.

Points clés :

  • Architecture Lambda : Batch (historique) + Streaming (temps réel)
  • Flink SQL : Features agrégées multi-fenêtres
  • Feature Store (Feast) : Serving unifié < 5ms
  • Monitoring : Latence, freshness, data quality

ROI mesuré :

  • Fraud detection : +40% de détection, -60% faux positifs
  • Recommandations : +25% CTR grâce à features fraîches
  • Trading : -80% latence décision (20ms vs 100ms)

Pour approfondir l'écosystème data streaming, consultez nos guides sur Feature Store production pour les bonnes pratiques, Kubernetes GPU ML pour scaler les workloads, et RAG en production pour combiner features temps réel et LLMs.

Feature Engineering Temps Réel + Flink + Feast = L'infrastructure ML moderne pour 2026.