Test MCP_Perso

This commit is contained in:
MaksTinyWorkshop
2026-03-31 09:24:06 +02:00
parent 6d56061554
commit 80d9d0a48d
32 changed files with 1593 additions and 1 deletions

View File

@@ -147,6 +147,19 @@ Fichiers principaux :
- `80_bmad/articulation_avec_lead_tech.md`
- `80_bmad/process_llm_et_parallelisation.md`
- `80_bmad/integration_mcp_sidecar.md`
---
## MCP (experimental)
Skeleton of an MCP server supporting BMAD (guidance, gates, capitalization).
Directory:
```
mcp/leadtech_bmad_mcp/
```
---

View File

@@ -0,0 +1,66 @@
# BMAD Story — Template with Lead_tech MCP gates
## Metadata
- Story: <id>
- Status: ready-for-dev
- Parallel-safe: false
- Depends-on: ~
- Can-run-with: ~
- Domain: backend|frontend|ux|n8n|product|workflow
## Context
- Objective:
- ACs:
- Constraints:
## Implementation plan
1. ...
2. ...
3. ...
## Patterns a appliquer
- (populated by `get_guidance`)
## Risques a eviter
- (populated by `get_guidance`)
## Gates de validation
### Leadtech MCP Gates
- Tool: `get_guidance`
- Timestamp:
- must_do:
- red_flags:
- blocking_issues:
- Decision:
- Tool: `validate_plan`
- Timestamp:
- must_do:
- red_flags:
- blocking_issues:
- Decision:
- Tool: `validate_patch`
- Timestamp:
- must_do:
- red_flags:
- blocking_issues:
- Decision:
## Dev Agent Record
- Changes made:
- Tests run:
- Files modified:
## Capitalization (optional)
- `propose_capitalization(..., dry_run=true)`
- Human decision:

View File

@@ -173,3 +173,15 @@ $LEADTECH/95_a_capitaliser.md
↓ (human validation)
Lead_tech (appropriate file)
```
---
# Optional extension: MCP sidecar
To make the knowledge base interactive inside BMAD without changing BMAD orchestration,
an MCP sidecar server can be added.
Implementation reference in this repo:
- `80_bmad/integration_mcp_sidecar.md`
- `mcp/leadtech_bmad_mcp/`

View File

@@ -50,6 +50,7 @@ You must fully embody this agent's persona and follow all activation instruction
<r> Stay in character until exit selected</r>
<r> Display Menu items as the item dictates and in the order given.</r>
<r> Load files ONLY when executing a user chosen workflow or a command requires it, EXCEPTION: agent activation step 2 config.yaml</r>
<r> If leadtech-bmad MCP server is available: call validate_plan before implementation and validate_patch before marking story for review. Treat blocking_issues as hard blockers — do NOT proceed past them without explicit user override.</r>
</rules>
</activation> <persona>
<role>Senior Software Engineer</role>

View File

@@ -49,6 +49,7 @@ You must fully embody this agent's persona and follow all activation instruction
<r> Stay in character until exit selected</r>
<r> Display Menu items as the item dictates and in the order given.</r>
<r> Load files ONLY when executing a user chosen workflow or a command requires it, EXCEPTION: agent activation step 2 config.yaml</r>
<r> If leadtech-bmad MCP server is available: during create-story workflow, call get_guidance to enrich story sections "Patterns a appliquer", "Risques a eviter", and "Leadtech MCP Gates". If blocking_issues is non-empty, do NOT set story to ready-for-dev without explicit user acknowledgement.</r>
</rules>
</activation> <persona>
<role>Technical Scrum Master + Story Preparation Specialist</role>

View File

@@ -274,6 +274,25 @@
<template-output file="{default_output_file}">story_requirements</template-output>
<!-- Lead_tech MCP guidance: enrich story with validated patterns and risks -->
<check if="leadtech-bmad MCP server is available">
<action>Extract domain from story (backend|frontend|ux|n8n|product|workflow)</action>
<action>Call MCP tool: get_guidance(domain={{domain}}, task_type="analysis", story_text={{story_text}})</action>
<!-- Read the full docs for the top matches -->
<action>For each URI listed in the "Lire le contenu complet" should_do of get_guidance response:
- If URI starts with "leadtech://knowledge/": call MCP resource leadtech://knowledge/{domain}/{bucket}/{slug}
- If URI starts with "leadtech://global/": call MCP resource leadtech://global/{label}
Read the full content and use it to enrich the story dev notes and guardrails below.
</action>
<action>Write result into story section "Patterns a appliquer": must_do items from get_guidance response, including excerpts</action>
<action>Write result into story section "Risques a eviter": red_flags items from get_guidance response, including excerpts</action>
<action>If global docs matched (should_do contains "Consulter doc global"): add a "Contexte Lead_tech global" section in Dev Notes with the relevant excerpts</action>
<action>Write result into story section "Leadtech MCP Gates": { tool: "get_guidance", timestamp: now, must_do, red_flags, blocking_issues, decision: "voir implementation" }</action>
<critical>If blocking_issues is non-empty, flag it to the SM before moving to ready-for-dev</critical>
</check>
<!-- Developer context section - MOST IMPORTANT PART -->
<template-output file="{default_output_file}">developer_context_section</template-output> **DEV AGENT GUARDRAILS:** <template-output file="{default_output_file}">

View File

@@ -227,6 +227,24 @@
<!-- Story parallelization check -->
<action>Before implementation, read `Parallel-safe`, `Depends-on`, and `Can-run-with` from the story file.</action>
<action>If metadata conflicts with the actual code state or branch reality, stop and reconcile the story before coding.</action>
<!-- Lead_tech MCP gate: validate plan before implementation -->
<check if="leadtech-bmad MCP server is available">
<action>Extract domain from story file (backend|frontend|ux|n8n|product|workflow)</action>
<action>Build plan_text: concatenate story tasks/subtasks and Dev Notes sections</action>
<action>Call MCP tool: validate_plan(domain={{domain}}, plan_text={{plan_text}}, agent_role="builder", strict=true)</action>
<action>Write gate result into story section "Leadtech MCP Gates": { tool: "validate_plan", timestamp: now, must_do, red_flags, blocking_issues }</action>
<critical>If blocking_issues is non-empty: HALT and fix the plan before starting implementation</critical>
<!-- Read the docs referenced in must_do/red_flags when MCP URIs are present -->
<action>Scan must_do and red_flags for URIs of the form "leadtech://knowledge/..." or "leadtech://global/...".
For each URI found: read the full document via the corresponding MCP resource.
Use the content to reinforce implementation constraints — treat it as authoritative Lead_tech guidance.
</action>
<action>Apply each must_do from the result as an implementation constraint</action>
</check>
<step n="5" goal="Implement task following red-green-refactor cycle">
<critical>FOLLOW THE STORY FILE TASKS/SUBTASKS SEQUENCE EXACTLY AS WRITTEN - NO DEVIATION</critical>
@@ -330,6 +348,17 @@
<action>Verify ALL tasks and subtasks are marked [x] (re-scan the story document now)</action>
<action>Run the full regression suite (do not skip)</action>
<action>Confirm File List includes every changed file</action>
<!-- Lead_tech MCP gate: validate patch before marking for review -->
<check if="leadtech-bmad MCP server is available">
<action>Produce diff_text: concatenate git diff or list of changes made during implementation</action>
<action>Extract changed_files list from story File List section</action>
<action>Call MCP tool: validate_patch(domain={{domain}}, diff_text={{diff_text}}, changed_files={{changed_files}}, strict=true)</action>
<action>Write gate result into story section "Leadtech MCP Gates": { tool: "validate_patch", timestamp: now, must_do, red_flags, blocking_issues }</action>
<critical>If blocking_issues is non-empty: HALT and fix before moving the story to review</critical>
<action>If must_do items are present and not already covered: treat them as additional tasks before review</action>
</check>
<action>Execute enhanced definition-of-done validation</action>
<action>Update the story Status to: "review"</action>

View File

@@ -0,0 +1,57 @@
# BMAD — MCP Sidecar Integration (Lead_tech)
This document describes how the `leadtech-bmad-mcp` server is wired into BMAD.
## Positioning
- BMAD remains the orchestration system.
- The Lead_tech MCP is a **support layer**: guidance, quality checks, capitalization.
- The MCP does not drive branches, statuses, or story execution.
## Injection points
1. Story intake (Analyst)
- Call: `get_guidance(domain, task_type="analysis", story_text=...)`
- Output injected into the story:
- `Patterns a appliquer`
- `Risques a eviter`
- `Gates de validation`
2. Pre-implementation (Builder)
- Call: `validate_plan(domain, plan_text, agent_role="builder")`
- Rule: no implementation as long as `blocking_issues` is not empty.
3. Post-patch (Builder)
- Call: `validate_patch(domain, diff_text, changed_files)`
- Result appended to the Dev Agent Record.
4. Review (Reviewer)
- Call: `emit_checklist(agent_role="reviewer", domain, story_text)`
- Option: re-run `validate_patch` on the final PR diff.
5. Closure and learning (Curator)
- Call: `propose_capitalization(...)` with `dry_run=true` by default.
- Periodic review: `triage_capitalization()`.
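The builder-side gates (points 2 and 3 above) can be sketched as a small helper. This is an illustration only: `call_tool` is a hypothetical stand-in for a real MCP client invocation, and the simulated result mirrors the documented gate output shape.

```python
# Hypothetical sketch of the builder-side plan gate (point 2 above).
# call_tool is a placeholder for a real MCP client call.

def call_tool(name: str, **arguments) -> dict:
    # Simulated validate_plan result containing one hard blocker.
    return {
        "must_do": ["Ajouter explicitement la strategie contracts-first / Zod."],
        "red_flags": [],
        "blocking_issues": ["Plan backend sans reference aux contrats partages."],
    }

def plan_gate_passes(domain: str, plan_text: str) -> bool:
    """Rule from point 2: no implementation while blocking_issues is non-empty."""
    result = call_tool("validate_plan", domain=domain,
                       plan_text=plan_text, agent_role="builder", strict=True)
    return not result["blocking_issues"]

# The simulated plan is blocked, so implementation must not start.
assert plan_gate_passes("backend", "Refactor session handling") is False
```

The same pattern applies to point 3, with `validate_patch` gating the move to review instead of the start of implementation.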
## Recommended rollout
- Phase 1: advisory only (no automatic blocking)
- Phase 2: block on `blocking_issues` only
- Phase 3: strict rules on critical domains (backend auth/contracts/sessions)
## Traceability
Every BMAD story must contain a `Leadtech MCP Gates` section with:
- timestamp
- tool called
- summary of `must_do`, `red_flags`, `blocking_issues`
- the associated human decision
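For illustration, one such gate entry could be represented as structured data. The field names follow the list above; the values are invented.

```python
from datetime import datetime, timezone

# Invented example of a single "Leadtech MCP Gates" entry.
gate_entry = {
    "tool": "validate_plan",
    "timestamp": datetime(2026, 3, 31, 9, 24, tzinfo=timezone.utc).isoformat(),
    "must_do": ["Ajouter une strategie de test (unit/integration/e2e)."],
    "red_flags": [],
    "blocking_issues": [],
    "decision": "plan approved by the SM",
}

# An empty blocking_issues list is what allows the story to proceed.
assert not gate_entry["blocking_issues"]
```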
## Security constraints
- `LEADTECH_MCP_ALLOW_WRITE=0` in normal environments.
- Writing is enabled only on demand, and only for:
- `propose_capitalization(dry_run=false)`
- `route_to_project_memory(dry_run=false)`
- MCP tools never write directly into `knowledge/*`.
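The write flag above implies a guard of roughly this shape (a sketch, not the server's actual code):

```python
import os

def write_allowed() -> bool:
    """Writes stay off unless LEADTECH_MCP_ALLOW_WRITE=1 is exported explicitly."""
    return os.getenv("LEADTECH_MCP_ALLOW_WRITE", "0") == "1"

os.environ["LEADTECH_MCP_ALLOW_WRITE"] = "0"
assert not write_allowed()  # normal environment: read-only

os.environ["LEADTECH_MCP_ALLOW_WRITE"] = "1"
assert write_allowed()      # enabled only for the two write tools
```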

View File

@@ -104,3 +104,67 @@ Ce mécanisme permet :
- keeping `Lead_tech` coherent and reliable.
---
## 2026-03-30 — app-alexandrie
FILE_UPDATE_PROPOSAL
Fichier cible : knowledge/backend/risques/nestjs.md
Pourquoi :
NestJS guard with multiple statuses (PENDING/BLOCKED): the READ_METHODS check must come before the status checks. Otherwise, BLOCKED sessions cannot reach GET endpoints (e.g. /auth/devices), locking the user out with no way to revoke.
Proposition :
### Multi-status guard: READ_METHODS before status, whitelist for unblock endpoints
**Risk**: a guard that checks the BLOCKED status before checking for read methods also blocks GETs, including the endpoints that let the user exit the blocked state (listing devices, revoking).
**Rule**:
1. Always check `READ_METHODS.has(request.method)` FIRST, before any status check.
2. Define an explicit whitelist of writes that stay allowed while blocked (e.g. revocation). Without it, the exit path is closed.
```typescript
// ❌ DANGEROUS — BLOCKED also blocks GETs
if (user.sessionStatus === 'BLOCKED') throw new HttpException(...);
if (READ_METHODS.has(request.method)) return true; // never reached for BLOCKED

// ✅ CORRECT
if (READ_METHODS.has(request.method)) return true;
if (isDeviceManagementPath(request.path)) return true; // whitelist critical writes
if (user.sessionStatus === 'BLOCKED') throw new HttpException(...);
```
FILE_UPDATE_PROPOSAL
Fichier cible : knowledge/backend/patterns/nestjs.md
Pourquoi :
DB merge pattern: when a state-reconciliation service runs on every authenticated request, issuing 2 separate prisma.update calls (status + lastSeenAt) doubles the DB cost per request.
Proposition :
### Fold lastSeenAt into the reconciliation update — one DB query per request instead of two
Pass `now` as a parameter and include `lastSeenAt` in the same conditional update.
```typescript
// ❌ 2 queries per authenticated request
private async reconcileSessionStatus(session) {
  if (statusChanged) await prisma.session.update({ data: { status, graceEndsAt } });
}
await prisma.session.update({ data: { lastSeenAt: now } }); // second update, issued every time

// ✅ 1 query — lastSeenAt always included in the same call
private async reconcileSessionStatus(session, now = new Date()) {
  await prisma.session.update({
    data: { lastSeenAt: now, ...(statusChanged && { status, graceEndsAt }) }
  });
}
```
2026-03-30 — base / FILE_UPDATE_PROPOSAL / Fichier cible: knowledge/backend/risques/contracts.md / Pourquoi: A story marked compliant with "no direct process.env in business services" still drifted into a raw environment-variable read inside a helper used by the service; Zod validation existed but was bypassed at runtime. / Proposition: Add an explicit vigilance item: "Backend feature flags: do not read process.env in business services or helpers; inject ConfigService (or validated config) and centralize the read in a pure function receiving the injected config." Plus a review symptom: "tests green but implicit dependency on global process state".
2026-03-30 — base / FILE_UPDATE_PROPOSAL / Fichier cible: knowledge/frontend/risques/tests.md / Pourquoi: A test declared as a "component test" actually validated helpers copied into the test file, creating a false positive (the logic actually executed in production could diverge without breaking the test). / Proposition: Add an anti-pattern "helpers copied locally into tests" with the rule: tests must import the module actually used by the screen/component (or extract a shared utility), never duplicate business logic inside the test. Include a review checklist item: "no production function copied into *.spec.ts".
2026-03-30 — base / FILE_UPDATE_PROPOSAL / Fichier cible: knowledge/workflow/risques/story-tracking.md / Pourquoi: In code review, a story can show [x] tasks and a clean File List while the working tree contains many untracked source changes unrelated to the story; without an explicit scope note, the reviewer may approve a false scope or reject it wrongly. / Proposition: Add a "File List vs dirty branch" rule: if changes outside the story are present, document an explicit "out of story scope" block in the Dev Agent Record (short list + justification), and only mark done once the scope is clarified.
2026-03-30 — base / FILE_UPDATE_PROPOSAL / Fichier cible: knowledge/frontend/risques/tests.md / Pourquoi: A "screen" test that only imports a utility helper does not validate the screen's real behavior; it can stay green even if the UI decision logic in the screen diverges. / Proposition: Add an "indirect screen test" anti-pattern: UI decision logic must either be tested through component rendering, or extracted into a dedicated module imported by both the screen and the test (single source of truth), never tested only through an adjacent helper disconnected from the screen flow.
2026-03-31 — app-template-resto / FILE_UPDATE_PROPOSAL / Fichier cible: knowledge/backend/risques/nextjs.md / Pourquoi: In the story 4.1 code review, a Server Action called fetch() against itself (HTTP loopback) to work around the "server-only" constraint. This adds network latency, fragility around APP_URL in Docker, double authentication, and an SSRF surface. The real fix is to call the server-only function directly from the Server Action; both run server-side, and "server-only" does not block imports between server modules. / Proposition: Add a "self-request / HTTP loopback from a Server Action" anti-pattern: a Server Action can import and call server-only modules directly, without an internal fetch. Rule: if you find yourself reconstructing session cookies to fetch your own API from a Server Action, that is a signal the call should be direct.
2026-03-31 — app-template-resto / FILE_UPDATE_PROPOSAL / Fichier cible: knowledge/backend/risques/nextjs.md / Pourquoi: Atomic writes (tmp + rename) for media files fail with EXDEV when the system tmpdir (/tmp) is on a different device than the storage volume (a common Docker bind-mount case). / Proposition: Add an "intra-device atomic write" pattern: create the temporary file in the same directory as the final destination (not in os.tmpdir()), which guarantees rename() stays intra-device and never raises EXDEV. Include a note: make sure the target directory is created before writing the tmp file.

View File

@@ -50,4 +50,5 @@
# - automate some cross-project audits
#
# Lines starting with # are ignored.
app-alexandrie|NestJS + Expo (React Native) + Prisma + pnpm monorepo|mindleaf|Epic 6 en cours
app-alexandrie|NestJS + Expo (React Native) + Prisma + pnpm monorepo|mindleaf|Epic 7 en cours
app-template-resto|Next.js Prisma+PostgreSQL|perso|Epic 4 en cours

View File

@@ -0,0 +1 @@
python3

View File

@@ -0,0 +1 @@
/usr/bin/python3

View File

@@ -0,0 +1 @@
python3

View File

@@ -0,0 +1 @@
lib

View File

@@ -0,0 +1,5 @@
home = /usr/bin
include-system-site-packages = false
version = 3.11.2
executable = /usr/bin/python3.11
command = /usr/bin/python3 -m venv /srv/helpers/_Assistant_Lead_Tech/mcp/leadtech_bmad_mcp/.venv

View File

@@ -0,0 +1,53 @@
# leadtech-bmad-mcp
**Sidecar** MCP server that plugs the Lead_tech base into a BMAD workflow without replacing BMAD.
## Goal
- BMAD keeps orchestration (story, roles, status, handoff).
- This server provides guidance and quality-gate tools.
- Controlled writes: only `95_a_capitaliser.md` and project memory (optional, behind a flag).
## Exposed tools
- `get_guidance(domain, task_type, story_text?, keywords?, max_items?)`
- `validate_plan(domain, plan_text, agent_role?, strict?)`
- `validate_patch(domain, diff_text, changed_files?, strict?)`
- `emit_checklist(agent_role, domain, story_text?)`
- `propose_capitalization(project_name, target_file, why, proposal, dry_run?)`
- `triage_capitalization(project_filter?, max_entries?)`
- `route_to_project_memory(project_name, section, content, dry_run?)`
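All of these tools return one shared payload shape, defined by `empty_gate_output()` in `schemas.py` plus a `gates` list added by the server; as a sketch:

```python
# Common payload shape returned by the gate tools
# (get_guidance, validate_plan, validate_patch, emit_checklist).
payload = {
    "must_do": [],           # actions to apply
    "should_do": [],         # softer recommendations
    "red_flags": [],         # risks to watch in review
    "blocking_issues": [],   # hard blockers; empty means the gate passes
    "confidence": "MEDIUM",  # HIGH / MEDIUM / LOW
    "references": [],        # {"path": ..., "reason": ...} entries
    "gates": [],             # the static Lead_tech gate list
}

assert set(payload) == {"must_do", "should_do", "red_flags",
                        "blocking_issues", "confidence", "references", "gates"}
```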
## Exposed resources
- `leadtech://index`
- `leadtech://capitalisation/pending`
- `leadtech://projects/conf`
- `leadtech://knowledge/{domain}/{bucket}/{slug}`
## Local installation
```bash
cd /srv/helpers/_Assistant_Lead_Tech/mcp/leadtech_bmad_mcp
python -m venv .venv
source .venv/bin/activate
pip install -e .
```
## Run (stdio)
```bash
source .venv/bin/activate
export LEADTECH_ROOT=/srv/helpers/_Assistant_Lead_Tech
leadtech-bmad-mcp
```
## Environment variables
- `LEADTECH_ROOT` (default: `/srv/helpers/_Assistant_Lead_Tech`)
- `LEADTECH_MCP_ALLOW_WRITE` (default: `0`)
- set to `1` to allow writes to `95_a_capitaliser.md` and the project `CLAUDE.md`
## BMAD wiring
See `80_bmad/integration_mcp_sidecar.md` for the exact injection points in the workflow.

View File

@@ -0,0 +1,11 @@
{
  "mcpServers": {
    "leadtech-bmad": {
      "command": "/srv/helpers/_Assistant_Lead_Tech/mcp/leadtech_bmad_mcp/.venv/bin/leadtech-bmad-mcp",
      "env": {
        "LEADTECH_ROOT": "/srv/helpers/_Assistant_Lead_Tech",
        "LEADTECH_MCP_ALLOW_WRITE": "0"
      }
    }
  }
}

View File

@@ -0,0 +1,25 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "leadtech-bmad-mcp"
version = "0.1.0"
description = "Sidecar MCP server for the Lead_tech base and BMAD workflows"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
    "mcp>=1.2.0"
]

[project.optional-dependencies]
dev = ["pytest>=7.0"]

[project.scripts]
leadtech-bmad-mcp = "leadtech_bmad_mcp.server:main"

[tool.setuptools]
package-dir = {"" = "src"}

[tool.setuptools.packages.find]
where = ["src"]

View File

@@ -0,0 +1,4 @@
"""Lead_tech BMAD MCP server package."""
__all__ = ["__version__"]
__version__ = "0.1.0"

View File

@@ -0,0 +1,140 @@
from __future__ import annotations

import os
from dataclasses import dataclass
from pathlib import Path

VALID_DOMAINS = {"backend", "frontend", "ux", "n8n", "product", "workflow"}
VALID_BUCKETS = {"patterns", "risques"}


@dataclass(frozen=True)
class LeadtechPaths:
    root: Path
    knowledge: Path
    capitalisation: Path
    projects_conf: Path


def get_paths() -> LeadtechPaths:
    root = Path(os.getenv("LEADTECH_ROOT", "/srv/helpers/_Assistant_Lead_Tech")).resolve()
    return LeadtechPaths(
        root=root,
        knowledge=root / "knowledge",
        capitalisation=root / "95_a_capitaliser.md",
        projects_conf=root / "_projects.conf",
    )


def _safe_path(base: Path, *parts: str) -> Path:
    path = (base / Path(*parts)).resolve()
    if not str(path).startswith(str(base.resolve())):
        raise ValueError("Chemin hors base autorisee")
    return path


def list_domain_files(domain: str, bucket: str) -> list[Path]:
    if domain not in VALID_DOMAINS:
        raise ValueError(f"Domaine invalide: {domain}")
    if bucket not in VALID_BUCKETS:
        raise ValueError(f"Type invalide: {bucket}")
    base = _safe_path(get_paths().knowledge, domain, bucket)
    if not base.exists():
        return []
    return sorted(p for p in base.glob("*.md") if p.is_file())


def read_text(path: Path) -> str:
    return path.read_text(encoding="utf-8")


EXCERPT_LENGTH = 400


def _extract_excerpt(content: str, tokens: list[str]) -> str:
    """Return an excerpt centered on the first occurrence of a token, or the start of the file."""
    low = content.lower()
    best_pos = len(content)
    for tok in tokens:
        pos = low.find(tok)
        if 0 <= pos < best_pos:
            best_pos = pos
    if best_pos == len(content):
        best_pos = 0
    start = max(0, best_pos - 80)
    excerpt = content[start : start + EXCERPT_LENGTH].strip()
    if start > 0:
        excerpt = "…" + excerpt
    if start + EXCERPT_LENGTH < len(content):
        excerpt = excerpt + "…"
    return excerpt


def search_knowledge(domain: str, query: str, bucket: str | None = None, max_items: int = 12) -> list[dict[str, str]]:
    buckets = [bucket] if bucket else ["patterns", "risques"]
    tokens = [t.strip().lower() for t in query.split() if t.strip()]
    out: list[dict[str, str]] = []
    for b in buckets:
        for file_path in list_domain_files(domain, b):
            content = read_text(file_path)
            score = sum(content.lower().count(tok) for tok in tokens)
            if score <= 0:
                continue
            out.append(
                {
                    "path": str(file_path),
                    "bucket": b,
                    "title": file_path.stem,
                    "score": str(score),
                    "excerpt": _extract_excerpt(content, tokens),
                }
            )
    out.sort(key=lambda x: int(x["score"]), reverse=True)
    return out[:max_items]


def read_knowledge_doc(domain: str, bucket: str, slug: str) -> str:
    file_path = _safe_path(get_paths().knowledge, domain, bucket, f"{slug}.md")
    if not file_path.exists():
        raise FileNotFoundError(f"Fichier introuvable: {file_path}")
    return read_text(file_path)


# Indexed global files (outside knowledge/) — priority order
_GLOBAL_DOCS: list[tuple[str, str]] = [
    ("40_decisions_et_archi.md", "architecture"),
    ("90_debug_et_postmortem.md", "debug"),
    ("10_conventions_redaction.md", "conventions"),
]


def search_global_docs(query: str, max_items: int = 4) -> list[dict[str, str]]:
    """Search the global Lead_tech files (decisions, postmortems, conventions)."""
    tokens = [t.strip().lower() for t in query.split() if t.strip()]
    root = get_paths().root
    out: list[dict[str, str]] = []
    for filename, label in _GLOBAL_DOCS:
        file_path = root / filename
        if not file_path.exists():
            continue
        content = read_text(file_path)
        score = sum(content.lower().count(tok) for tok in tokens)
        if score <= 0:
            continue
        out.append(
            {
                "path": str(file_path),
                "bucket": "global",
                "title": label,
                "filename": filename,
                "score": str(score),
                "excerpt": _extract_excerpt(content, tokens),
            }
        )
    out.sort(key=lambda x: int(x["score"]), reverse=True)
    return out[:max_items]

View File

@@ -0,0 +1,23 @@
from __future__ import annotations

from typing import Any

CONFIDENCE_HIGH = "HIGH"
CONFIDENCE_MEDIUM = "MEDIUM"
CONFIDENCE_LOW = "LOW"


def empty_gate_output() -> dict[str, Any]:
    return {
        "must_do": [],
        "should_do": [],
        "red_flags": [],
        "blocking_issues": [],
        "confidence": CONFIDENCE_MEDIUM,
        "references": [],
    }


def add_reference(payload: dict[str, Any], path: str, reason: str) -> None:
    payload["references"].append({"path": path, "reason": reason})

View File

@@ -0,0 +1,309 @@
from __future__ import annotations

import re
import os
import subprocess
from datetime import date
from pathlib import Path
from typing import Literal

from mcp.server.fastmcp import FastMCP

from .knowledge import get_paths, read_knowledge_doc, read_text, search_knowledge, search_global_docs
from .schemas import CONFIDENCE_HIGH, CONFIDENCE_LOW, CONFIDENCE_MEDIUM, add_reference, empty_gate_output
from .triage import parse_capitalisation_entries, novelty_level, scope_level

Domain = Literal["backend", "frontend", "ux", "n8n", "product", "workflow"]
TaskType = Literal["analysis", "implementation", "review", "debug", "architecture"]
AgentRole = Literal["analyst", "builder", "reviewer", "curator"]

mcp = FastMCP(name="leadtech-bmad-mcp")

GATES = [
    "Contracts-First / Zod-Infer / No-DTO",
    "Format erreur API: { error: { code, message, requestId } }",
    "Sessions: expiresAt obligatoire et filtre en query",
    "Navigation reactive useEffect",
    "NestJS guards: AuthGuard en premier",
]

# Patterns compiled once — they cover camelCase, snake_case, kebab-case and common variants
_RE_CONTRACT = re.compile(r"zod|z\.object|contracts?[-_]first|shared[-_]contract", re.IGNORECASE)
_RE_REQUEST_ID = re.compile(r"request[_-]?id", re.IGNORECASE)
_RE_EXPIRES_AT = re.compile(r"expires[_-]?at", re.IGNORECASE)
_RE_AUTH_GUARD = re.compile(r"auth[_\s-]?guard|jwtauthguard", re.IGNORECASE)
_RE_PARALLEL = re.compile(r"parallel", re.IGNORECASE)
_RE_DEPENDS_ON = re.compile(r"depends[-_]on|can[-_]run[-_]with", re.IGNORECASE)
_RE_SESSION = re.compile(r"sessions?", re.IGNORECASE)
_RE_ERROR = re.compile(r"\berrors?\b|\berreurs?\b|\bexceptions?\b", re.IGNORECASE)
_RE_GUARD = re.compile(r"guard", re.IGNORECASE)
def _base_output() -> dict:
    payload = empty_gate_output()
    payload["gates"] = GATES
    return payload


@mcp.resource("leadtech://index")
def resource_index() -> str:
    return read_text(get_paths().root / "00_INDEX.md")


@mcp.resource("leadtech://capitalisation/pending")
def resource_capitalisation() -> str:
    return read_text(get_paths().capitalisation)


@mcp.resource("leadtech://projects/conf")
def resource_projects_conf() -> str:
    return read_text(get_paths().projects_conf)


@mcp.resource("leadtech://knowledge/{domain}/{bucket}/{slug}")
def resource_knowledge_doc(domain: str, bucket: str, slug: str) -> str:
    return read_knowledge_doc(domain=domain, bucket=bucket, slug=slug)


@mcp.resource("leadtech://global/architecture")
def resource_architecture() -> str:
    return read_text(get_paths().root / "40_decisions_et_archi.md")


@mcp.resource("leadtech://global/debug")
def resource_debug() -> str:
    return read_text(get_paths().root / "90_debug_et_postmortem.md")


@mcp.resource("leadtech://global/conventions")
def resource_conventions() -> str:
    return read_text(get_paths().root / "10_conventions_redaction.md")
@mcp.tool(description="Returns useful patterns + risks for a BMAD story")
def get_guidance(domain: Domain, task_type: TaskType, story_text: str = "", keywords: list[str] | None = None, max_items: int = 12) -> dict:
    out = _base_output()
    query = " ".join(keywords or []) or story_text or task_type
    matches = search_knowledge(domain=domain, query=query, max_items=max_items)
    for item in matches:
        slug = Path(item["path"]).stem
        read_uri = f"leadtech://knowledge/{domain}/{item['bucket']}/{slug}"
        label = f"{item['title']} — {item['excerpt']} [lire complet: {read_uri}]"
        if item["bucket"] == "patterns":
            out["must_do"].append(f"Appliquer: {label}")
        else:
            out["red_flags"].append(f"Surveiller: {label}")
        add_reference(out, item["path"], f"Match {item['bucket']} score={item['score']}")
    # Global files (decisions, postmortems, conventions)
    global_matches = search_global_docs(query=query, max_items=3)
    for item in global_matches:
        read_uri = f"leadtech://global/{item['title']}"
        label = f"{item['title']} — {item['excerpt']} [lire complet: {read_uri}]"
        out["should_do"].append(f"Consulter doc global: {label}")
        add_reference(out, item["path"], f"Match global score={item['score']}")
    out["should_do"].append(f"Adapter la guidance au role et aux ACs de la story ({task_type}).")
    all_top = (matches + global_matches)[:3]
    if all_top:
        uris = []
        for m in all_top:
            if m["bucket"] == "global":
                uris.append(f"leadtech://global/{m['title']}")
            else:
                uris.append(f"leadtech://knowledge/{domain}/{m['bucket']}/{Path(m['path']).stem}")
        out["should_do"].append(
            "Lire le contenu complet des docs prioritaires via les resources MCP: " + ", ".join(uris)
        )
    out["confidence"] = CONFIDENCE_HIGH if (matches or global_matches) else CONFIDENCE_LOW
    return out
@mcp.tool(description="Validates a BMAD plan against known patterns and pitfalls")
def validate_plan(domain: Domain, plan_text: str, agent_role: AgentRole = "builder", strict: bool = True) -> dict:
    out = _base_output()
    if domain == "backend" and not _RE_CONTRACT.search(plan_text):
        out["must_do"].append("Ajouter explicitement la strategie contracts-first / Zod.")
        if strict:
            out["blocking_issues"].append("Plan backend sans reference aux contrats partages.")
    if domain == "backend" and not _RE_REQUEST_ID.search(plan_text) and _RE_ERROR.search(plan_text):
        out["should_do"].append("Ajouter la normalisation d'erreur API avec requestId.")
    if not re.search(r"test|spec", plan_text, re.IGNORECASE):
        out["must_do"].append("Ajouter une strategie de test (unit/integration/e2e).")
    if _RE_PARALLEL.search(plan_text) and not _RE_DEPENDS_ON.search(plan_text):
        out["red_flags"].append("Parallel mentionne sans clarifier Depends-on/Can-run-with.")
    add_reference(out, str(get_paths().root / "80_bmad/process_llm_et_parallelisation.md"), "Regles de synchronisation BMAD")
    out["confidence"] = CONFIDENCE_HIGH if not out["blocking_issues"] else CONFIDENCE_MEDIUM
    return out
@mcp.tool(description="Validates a patch/diff against the Lead_tech gates")
def validate_patch(domain: Domain, diff_text: str, changed_files: list[str] | None = None, strict: bool = True) -> dict:
    out = _base_output()
    if domain == "backend":
        if not _RE_EXPIRES_AT.search(diff_text) and _RE_SESSION.search(diff_text):
            out["blocking_issues"].append("Session modifiee sans expiresAt visible dans le diff.")
        if not _RE_REQUEST_ID.search(diff_text) and _RE_ERROR.search(diff_text):
            out["must_do"].append("Verifier le format erreur API standard avec requestId.")
        if _RE_GUARD.search(diff_text) and not _RE_AUTH_GUARD.search(diff_text):
            out["red_flags"].append("Usage de guard sans trace explicite d'ordre AuthGuard en premier.")
    if changed_files and all("_bmad-output/" in f for f in changed_files):
        out["blocking_issues"].append("Patch sans fichier source: seulement des artefacts BMAD.")
    if strict and not re.search(r"test|spec|describe\s*\(|it\s*\(|expect\s*\(", diff_text, re.IGNORECASE):
        out["should_do"].append("Aucun test visible dans le diff: verifier couverture manuelle.")
    add_reference(out, str(get_paths().root / "knowledge/workflow/risques/story-tracking.md"), "Anti-pattern story done sans code")
    out["confidence"] = CONFIDENCE_HIGH if not out["blocking_issues"] else CONFIDENCE_MEDIUM
    return out
@mcp.tool(description="Returns an actionable checklist per BMAD role")
def emit_checklist(agent_role: AgentRole, domain: Domain, story_text: str = "") -> dict:
    out = _base_output()
    out["must_do"].append("Lire les README patterns+risques du domaine avant action.")
    out["must_do"].append("Verifier coherence story.md <-> sprint-status avant cloture.")
    if agent_role == "analyst":
        out["must_do"].append("Rendre explicites Depends-on et Can-run-with si parallel-safe.")
    elif agent_role == "builder":
        out["must_do"].append("Executer un gate validate_plan avant impl et validate_patch apres diff.")
    elif agent_role == "reviewer":
        out["must_do"].append("Refuser done si aucun fichier source dans File List.")
    else:
        out["must_do"].append("Capitaliser uniquement via 95_a_capitaliser.md, jamais direct knowledge/.")
    if story_text and "parallel-safe" not in story_text.lower():
        out["should_do"].append("Ajouter le champ Parallel-safe dans la story.")
    out["confidence"] = CONFIDENCE_HIGH
    return out
@mcp.tool(description="Ajoute une proposition FILE_UPDATE_PROPOSAL dans 95_a_capitaliser.md")
def propose_capitalization(project_name: str, target_file: str, why: str, proposal: str, dry_run: bool = True) -> dict:
today = date.today().isoformat()
block = (
f"\n\n{today}{project_name}\n\n"
f"FILE_UPDATE_PROPOSAL\n"
f"Fichier cible : {target_file}\n\n"
f"Pourquoi :\n{why.strip()}\n\n"
f"Proposition :\n{proposal.strip()}\n"
)
out = {"dry_run": dry_run, "target": str(get_paths().capitalisation), "proposal_block": block.strip("\n")}
if not dry_run:
if not bool(int(os.getenv("LEADTECH_MCP_ALLOW_WRITE", "0"))):
return {"error": "Ecriture desactivee. Exporter LEADTECH_MCP_ALLOW_WRITE=1"}
with get_paths().capitalisation.open("a", encoding="utf-8") as f:
f.write(block)
out["written"] = True
return out
@mcp.tool(description="Trie les propositions du buffer de capitalisation")
def triage_capitalization(project_filter: str = "", max_entries: int = 20) -> dict:
raw = read_text(get_paths().capitalisation)
entries = parse_capitalisation_entries(raw)
if project_filter:
entries = [e for e in entries if project_filter.lower() in e.header.lower()]
entries = entries[:max_entries]
reports = []
for entry in entries:
novelty = novelty_level(entry)
scope = scope_level(entry)
if novelty in {"DOUBLON_EXACT", "DOUBLON_SEMANTIQUE"}:
decision = "REJETER"
confidence = CONFIDENCE_HIGH
elif scope == "PROJET":
decision = "A_DEPLACER_CLAUDE_PROJET"
confidence = CONFIDENCE_MEDIUM
else:
decision = "INTEGRER_KNOWLEDGE"
confidence = CONFIDENCE_MEDIUM
reports.append(
{
"entry": entry.header,
"decision": decision,
"confidence": confidence,
"nouveaute": novelty,
"portee": scope,
"target_file": entry.target,
"why": entry.why,
"proposal": entry.proposal,
}
)
return {"count": len(reports), "reports": reports}
@mcp.tool(description="Route un apprentissage vers le CLAUDE.md du projet cible")
def route_to_project_memory(project_name: str, section: Literal["Lecons apprises", "Points sensibles"], content: str, dry_run: bool = True) -> dict:
paths = get_paths()
resolver = paths.root / "scripts" / "resolve-project-path.sh"
cmd = [str(resolver), project_name]
result = subprocess.run(cmd, capture_output=True, text=True, check=False)
project_path = result.stdout.strip()
if result.returncode != 0 or not project_path:
return {
"error": "Projet introuvable via resolve-project-path.sh",
"project": project_name,
"stdout": result.stdout,
"stderr": result.stderr,
}
claude_file = Path(project_path) / "CLAUDE.md"
section_header = f"# {section}"
block = f"\n- {content.strip()}\n"
if dry_run:
return {
"dry_run": True,
"project": project_name,
"claude_file": str(claude_file),
"section": section,
"preview": block.strip(),
}
if not bool(int(os.getenv("LEADTECH_MCP_ALLOW_WRITE", "0"))):
return {"error": "Ecriture desactivee. Exporter LEADTECH_MCP_ALLOW_WRITE=1"}
if not claude_file.exists():
return {"error": "CLAUDE.md introuvable", "claude_file": str(claude_file)}
original = claude_file.read_text(encoding="utf-8")
if section_header in original:
updated = original.replace(section_header, f"{section_header}\n{block}", 1)
else:
updated = f"{original.rstrip()}\n\n{section_header}\n{block}"
claude_file.write_text(updated, encoding="utf-8")
return {
"written": True,
"project": project_name,
"claude_file": str(claude_file),
"section": section,
}
def main() -> None:
mcp.run()
if __name__ == "__main__":
main()
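Aside (not part of the commit): the section-append behaviour of `route_to_project_memory` is easiest to reason about in isolation. Below is a standalone sketch of the same replace-or-append logic, with invented sample content:

```python
# Standalone sketch of the CLAUDE.md section-append logic used by
# route_to_project_memory: insert under an existing "# <section>" header,
# or append the header (and entry) at the end of the file if it is missing.
def append_to_section(original: str, section: str, content: str) -> str:
    section_header = f"# {section}"
    block = f"\n- {content.strip()}\n"
    if section_header in original:
        # Insert right below the first occurrence of the header.
        return original.replace(section_header, f"{section_header}\n{block}", 1)
    # Header absent: append a new section at the end of the file.
    return f"{original.rstrip()}\n\n{section_header}\n{block}"


doc = "# Lecons apprises\n- ancienne lecon\n"
updated = append_to_section(doc, "Lecons apprises", "nouvelle lecon")
print("nouvelle lecon" in updated)  # True
```

Note that a plain substring match on `# {section}` would also hit a longer header such as `## Lecons apprises`; the real tool only accepts two fixed section names, which limits that risk.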

View File

@@ -0,0 +1,60 @@
from __future__ import annotations
import re
from dataclasses import dataclass
from .knowledge import get_paths, read_text
BLOCK_RE = re.compile(
r"(?P<header>\d{4}-\d{2}-\d{2}\s+[-—]\s+[^\n]+)\n\nFILE_UPDATE_PROPOSAL\n"
r"Fichier cible\s*:\s*(?P<target>[^\n]+)\n\n"
r"Pourquoi\s*:\s*\n(?P<why>.*?)\n\n"
r"Proposition\s*:\s*\n(?P<proposal>.*?)(?=\n\n\d{4}-\d{2}-\d{2}\s+[-—]|\Z)",
re.DOTALL,
)
@dataclass
class CapitalisationEntry:
header: str
target: str
why: str
proposal: str
def parse_capitalisation_entries(raw: str) -> list[CapitalisationEntry]:
entries: list[CapitalisationEntry] = []
for match in BLOCK_RE.finditer(raw):
entries.append(
CapitalisationEntry(
header=match.group("header").strip(),
target=match.group("target").strip(),
why=match.group("why").strip(),
proposal=match.group("proposal").strip(),
)
)
return entries
def novelty_level(entry: CapitalisationEntry) -> str:
root = get_paths().root
target_file = (root / entry.target).resolve()
if target_file.exists():
target_text = read_text(target_file).lower()
proposal_text = entry.proposal.lower()
if proposal_text and proposal_text in target_text:
return "DOUBLON_EXACT"
shared = [word for word in re.findall(r"[a-zA-Z]{5,}", proposal_text) if word in target_text]
if len(shared) >= 5:
return "DOUBLON_SEMANTIQUE"
return "NOUVEAU"
def scope_level(entry: CapitalisationEntry) -> str:
low = f"{entry.why} {entry.proposal}".lower()
project_markers = ["route", "screen", "component", "module", "label", "copy"]
score = sum(1 for marker in project_markers if marker in low)
return "PROJET" if score >= 2 else "GLOBAL"

View File

View File

@@ -0,0 +1,246 @@
from __future__ import annotations
import pytest
from pathlib import Path
from unittest.mock import patch
from leadtech_bmad_mcp.knowledge import (
_safe_path,
list_domain_files,
search_knowledge,
search_global_docs,
read_knowledge_doc,
_extract_excerpt,
LeadtechPaths,
get_paths,
)
# ---------------------------------------------------------------------------
# _safe_path
# ---------------------------------------------------------------------------
def test_safe_path_valid(tmp_path):
result = _safe_path(tmp_path, "sub", "file.md")
assert result == (tmp_path / "sub" / "file.md").resolve()
def test_safe_path_traversal_blocked(tmp_path):
with pytest.raises(ValueError, match="hors base"):
_safe_path(tmp_path, "..", "secret.md")
def test_safe_path_deep_traversal_blocked(tmp_path):
with pytest.raises(ValueError, match="hors base"):
_safe_path(tmp_path, "sub", "../../etc/passwd")
# ---------------------------------------------------------------------------
# list_domain_files
# ---------------------------------------------------------------------------
def _make_knowledge(tmp_path: Path) -> LeadtechPaths:
knowledge = tmp_path / "knowledge"
(knowledge / "backend" / "patterns").mkdir(parents=True)
(knowledge / "backend" / "risques").mkdir(parents=True)
(knowledge / "backend" / "patterns" / "contracts.md").write_text("zod contract schema")
(knowledge / "backend" / "patterns" / "auth.md").write_text("jwt token session")
(knowledge / "backend" / "risques" / "general.md").write_text("never store passwords")
return LeadtechPaths(
root=tmp_path,
knowledge=knowledge,
capitalisation=tmp_path / "95_a_capitaliser.md",
projects_conf=tmp_path / "_projects.conf",
)
def test_list_domain_files_returns_md_files(tmp_path):
paths = _make_knowledge(tmp_path)
with patch("leadtech_bmad_mcp.knowledge.get_paths", return_value=paths):
files = list_domain_files("backend", "patterns")
assert len(files) == 2
assert all(f.suffix == ".md" for f in files)
def test_list_domain_files_invalid_domain(tmp_path):
with pytest.raises(ValueError, match="Domaine invalide"):
list_domain_files("unknown_domain", "patterns")
def test_list_domain_files_invalid_bucket(tmp_path):
with pytest.raises(ValueError, match="Type invalide"):
list_domain_files("backend", "unknown_bucket")
def test_list_domain_files_missing_dir_returns_empty(tmp_path):
paths = _make_knowledge(tmp_path)
with patch("leadtech_bmad_mcp.knowledge.get_paths", return_value=paths):
files = list_domain_files("frontend", "patterns") # dir not created
assert files == []
# ---------------------------------------------------------------------------
# search_knowledge
# ---------------------------------------------------------------------------
def test_search_knowledge_finds_match(tmp_path):
paths = _make_knowledge(tmp_path)
with patch("leadtech_bmad_mcp.knowledge.get_paths", return_value=paths):
results = search_knowledge("backend", "zod contract")
assert len(results) >= 1
titles = [r["title"] for r in results]
assert "contracts" in titles
def test_search_knowledge_no_match_returns_empty(tmp_path):
paths = _make_knowledge(tmp_path)
with patch("leadtech_bmad_mcp.knowledge.get_paths", return_value=paths):
results = search_knowledge("backend", "xyzzy_not_existing_keyword_abc")
assert results == []
def test_search_knowledge_sorted_by_score(tmp_path):
paths = _make_knowledge(tmp_path)
# contracts.md contains "zod" once, auth.md contains "session" once
# query "zod" should rank contracts first
with patch("leadtech_bmad_mcp.knowledge.get_paths", return_value=paths):
results = search_knowledge("backend", "zod")
assert results[0]["title"] == "contracts"
def test_search_knowledge_respects_max_items(tmp_path):
paths = _make_knowledge(tmp_path)
with patch("leadtech_bmad_mcp.knowledge.get_paths", return_value=paths):
results = search_knowledge("backend", "a", max_items=1)
assert len(results) <= 1
def test_search_knowledge_single_bucket(tmp_path):
paths = _make_knowledge(tmp_path)
with patch("leadtech_bmad_mcp.knowledge.get_paths", return_value=paths):
results = search_knowledge("backend", "never", bucket="risques")
assert all(r["bucket"] == "risques" for r in results)
# ---------------------------------------------------------------------------
# read_knowledge_doc
# ---------------------------------------------------------------------------
def test_read_knowledge_doc_returns_content(tmp_path):
paths = _make_knowledge(tmp_path)
with patch("leadtech_bmad_mcp.knowledge.get_paths", return_value=paths):
content = read_knowledge_doc("backend", "patterns", "contracts")
assert "zod" in content
def test_read_knowledge_doc_not_found_raises(tmp_path):
paths = _make_knowledge(tmp_path)
with patch("leadtech_bmad_mcp.knowledge.get_paths", return_value=paths):
with pytest.raises(FileNotFoundError):
read_knowledge_doc("backend", "patterns", "nonexistent")
def test_read_knowledge_doc_traversal_blocked(tmp_path):
paths = _make_knowledge(tmp_path)
with patch("leadtech_bmad_mcp.knowledge.get_paths", return_value=paths):
with pytest.raises(ValueError, match="hors base"):
read_knowledge_doc("backend", "patterns", "../../etc/passwd")
# ---------------------------------------------------------------------------
# _extract_excerpt
# ---------------------------------------------------------------------------
def test_extract_excerpt_centered_on_token():
content = "début " * 30 + "ZOD CONTRACT ici" + " fin" * 30
excerpt = _extract_excerpt(content, ["zod"])
assert "ZOD" in excerpt.upper()
def test_extract_excerpt_fallback_to_start():
content = "a b c d e f g h i j k"
excerpt = _extract_excerpt(content, ["xyznotfound"])
assert excerpt.startswith("a b")
def test_extract_excerpt_adds_ellipsis_when_truncated():
content = "x " * 300
excerpt = _extract_excerpt(content, ["x"])
assert excerpt.endswith("")
def test_extract_excerpt_length_bounded():
content = "token " * 200
excerpt = _extract_excerpt(content, ["token"])
    # EXCERPT_LENGTH=400 + possible ellipses (2 chars) + truncated lead-in (80 chars before the token)
assert len(excerpt) <= 500
# ---------------------------------------------------------------------------
# search_global_docs
# ---------------------------------------------------------------------------
def _make_global_docs(tmp_path: Path) -> LeadtechPaths:
(tmp_path / "40_decisions_et_archi.md").write_text("Ne jamais utiliser SQL Server en LXC Proxmox. Toujours PostgreSQL.")
(tmp_path / "90_debug_et_postmortem.md").write_text("Bug session: expiresAt manquant causait des sessions fantômes.")
(tmp_path / "10_conventions_redaction.md").write_text("Ne jamais écrire directement dans knowledge/.")
return LeadtechPaths(
root=tmp_path,
knowledge=tmp_path / "knowledge",
capitalisation=tmp_path / "95_a_capitaliser.md",
projects_conf=tmp_path / "_projects.conf",
)
def test_search_global_docs_finds_postmortem(tmp_path):
paths = _make_global_docs(tmp_path)
with patch("leadtech_bmad_mcp.knowledge.get_paths", return_value=paths):
results = search_global_docs("session expiresAt")
assert len(results) >= 1
assert results[0]["title"] == "debug"
def test_search_global_docs_finds_architecture(tmp_path):
paths = _make_global_docs(tmp_path)
with patch("leadtech_bmad_mcp.knowledge.get_paths", return_value=paths):
results = search_global_docs("PostgreSQL Proxmox")
assert len(results) >= 1
assert results[0]["title"] == "architecture"
def test_search_global_docs_no_match(tmp_path):
paths = _make_global_docs(tmp_path)
with patch("leadtech_bmad_mcp.knowledge.get_paths", return_value=paths):
results = search_global_docs("xyzzy_not_existing_keyword")
assert results == []
def test_search_global_docs_includes_excerpt(tmp_path):
paths = _make_global_docs(tmp_path)
with patch("leadtech_bmad_mcp.knowledge.get_paths", return_value=paths):
results = search_global_docs("PostgreSQL")
assert "excerpt" in results[0]
assert len(results[0]["excerpt"]) > 0
def test_search_global_docs_missing_file_skipped(tmp_path):
    # Only 40_ is created; the other two files are absent
(tmp_path / "40_decisions_et_archi.md").write_text("PostgreSQL recommandé.")
paths = LeadtechPaths(
root=tmp_path,
knowledge=tmp_path / "knowledge",
capitalisation=tmp_path / "95_a_capitaliser.md",
projects_conf=tmp_path / "_projects.conf",
)
with patch("leadtech_bmad_mcp.knowledge.get_paths", return_value=paths):
results = search_global_docs("PostgreSQL")
assert len(results) == 1
assert results[0]["title"] == "architecture"
def test_search_global_docs_respects_max_items(tmp_path):
paths = _make_global_docs(tmp_path)
with patch("leadtech_bmad_mcp.knowledge.get_paths", return_value=paths):
# "dans" apparaît dans les 3 fichiers
results = search_global_docs("dans jamais", max_items=1)
assert len(results) <= 1

View File

@@ -0,0 +1,240 @@
"""Tests for the validate_plan and validate_patch gates.

The gate functions are imported directly, without starting the MCP server.
"""
from __future__ import annotations
import re
import sys
import types
import pytest
from pathlib import Path
from unittest.mock import patch
def _mock_mcp_module():
"""Injecte un faux module mcp pour éviter l'import de la dépendance."""
class FastMCP:
def __init__(self, **kwargs): pass
def tool(self, description="", **k): return lambda f: f
def resource(self, path, **k): return lambda f: f
def run(self): pass
mcp_mod = types.ModuleType("mcp")
mcp_server = types.ModuleType("mcp.server")
mcp_fastmcp = types.ModuleType("mcp.server.fastmcp")
mcp_fastmcp.FastMCP = FastMCP
sys.modules.setdefault("mcp", mcp_mod)
sys.modules.setdefault("mcp.server", mcp_server)
sys.modules.setdefault("mcp.server.fastmcp", mcp_fastmcp)
_mock_mcp_module()
from leadtech_bmad_mcp.server import validate_plan, validate_patch # noqa: E402
from leadtech_bmad_mcp.knowledge import LeadtechPaths # noqa: E402
def _fake_paths(tmp_path: Path) -> LeadtechPaths:
(tmp_path / "80_bmad").mkdir(parents=True, exist_ok=True)
(tmp_path / "knowledge" / "workflow" / "risques").mkdir(parents=True, exist_ok=True)
(tmp_path / "80_bmad" / "process_llm_et_parallelisation.md").write_text("")
(tmp_path / "knowledge" / "workflow" / "risques" / "story-tracking.md").write_text("")
return LeadtechPaths(
root=tmp_path,
knowledge=tmp_path / "knowledge",
capitalisation=tmp_path / "95_a_capitaliser.md",
projects_conf=tmp_path / "_projects.conf",
)
# ---------------------------------------------------------------------------
# validate_plan — contracts gate
# ---------------------------------------------------------------------------
class TestValidatePlanContracts:
def test_blocks_when_no_contract_reference(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_plan("backend", "On va implémenter un service utilisateur.", strict=True)
assert result["blocking_issues"]
assert any("contrats" in i.lower() for i in result["blocking_issues"])
def test_passes_with_zod(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_plan("backend", "On utilise Zod pour valider les inputs. Tests unitaires inclus.", strict=True)
assert not result["blocking_issues"]
def test_passes_with_z_object(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_plan("backend", "Schema: z.object({ id: z.string() }). Unit tests inclus.", strict=True)
assert not result["blocking_issues"]
def test_passes_with_contracts_first(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_plan("backend", "Approche contracts-first avec shared_contract. Unit tests inclus.", strict=True)
assert not result["blocking_issues"]
def test_no_block_on_frontend(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_plan("frontend", "On construit un formulaire React. Unit tests inclus.", strict=True)
            # No blocking issue on frontend for the absence of Zod
contract_blocks = [i for i in result["blocking_issues"] if "contrats" in i.lower()]
assert not contract_blocks
class TestValidatePlanRequestId:
def test_suggests_requestid_when_error_without_requestid(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_plan("backend", "On gère les erreurs HTTP. Zod schema. Tests unitaires.", strict=False)
assert any("requestid" in s.lower() or "requestId" in s for s in result["should_do"])
def test_no_suggestion_when_requestid_present(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_plan("backend", "On retourne { error: { code, message, requestId } }. Zod. Tests.", strict=False)
assert not any("requestid" in s.lower() for s in result["should_do"])
def test_requestid_snake_case_accepted(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_plan("backend", "Le champ request_id est dans chaque error. Zod. Tests.", strict=False)
assert not any("requestid" in s.lower() for s in result["should_do"])
class TestValidatePlanTests:
def test_flags_missing_test_strategy(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_plan("backend", "On implémente la feature X avec Zod contracts-first.", strict=False)
assert any("test" in s.lower() for s in result["must_do"])
def test_no_flag_when_tests_mentioned(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_plan("backend", "Feature X avec Zod. Unit tests et integration tests inclus.", strict=False)
assert not any("strategie de test" in s.lower() for s in result["must_do"])
def test_spec_keyword_accepted(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_plan("backend", "Feature X avec Zod. Spec files inclus pour chaque module.", strict=False)
assert not any("strategie de test" in s.lower() for s in result["must_do"])
class TestValidatePlanParallel:
def test_flags_parallel_without_depends_on(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_plan("backend", "Cette story est parallel-safe. Zod. Tests.", strict=False)
assert any("parallel" in s.lower() for s in result["red_flags"])
def test_no_flag_when_depends_on_present(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_plan("backend", "parallel-safe: true. depends-on: story-1. Zod. Tests.", strict=False)
assert not any("parallel" in s.lower() for s in result["red_flags"])
# ---------------------------------------------------------------------------
# validate_patch — backend gates
# ---------------------------------------------------------------------------
class TestValidatePatchSession:
def test_blocks_session_without_expires_at(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_patch("backend", "UPDATE sessions SET user_id = ? WHERE id = ?")
assert any("expiresat" in i.lower() or "expiresat" in i.lower() for i in result["blocking_issues"])
def test_passes_session_with_expires_at_camel(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_patch("backend", "UPDATE sessions SET user_id = ?, expiresAt = ? WHERE id = ?")
session_blocks = [i for i in result["blocking_issues"] if "expiresat" in i.lower() or "session" in i.lower()]
assert not session_blocks
def test_passes_session_with_expires_at_snake(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_patch("backend", "ALTER TABLE sessions ADD COLUMN expires_at TIMESTAMP;")
session_blocks = [i for i in result["blocking_issues"] if "expiresat" in i.lower() or "session" in i.lower()]
assert not session_blocks
class TestValidatePatchRequestId:
def test_flags_error_without_requestid(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_patch("backend", "throw new Error('not found')")
assert any("requestid" in s.lower() for s in result["must_do"])
def test_no_flag_with_requestid_camel(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_patch("backend", "return { error: { code: 'NOT_FOUND', message, requestId: uuid() } }")
assert not any("requestid" in s.lower() for s in result["must_do"])
def test_no_flag_with_request_id_snake(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_patch("backend", "error_response = { code: 'ERR', request_id: generate() }")
assert not any("requestid" in s.lower() for s in result["must_do"])
class TestValidatePatchAuthGuard:
def test_flags_guard_without_authguard(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_patch("backend", "@UseGuards(RolesGuard)\nasync someMethod() {}")
assert any("authguard" in s.lower() or "auth" in s.lower() for s in result["red_flags"])
def test_no_flag_with_authguard(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_patch("backend", "@UseGuards(AuthGuard('jwt'), RolesGuard)\nasync someMethod() {}")
assert not any("authguard" in s.lower() for s in result["red_flags"])
def test_no_flag_with_jwtauthguard(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_patch("backend", "@UseGuards(JwtAuthGuard, RolesGuard)\nasync someMethod() {}")
assert not any("authguard" in s.lower() for s in result["red_flags"])
class TestValidatePatchTests:
def test_suggests_tests_when_no_spec_file(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_patch("backend", "const x = 1;", strict=True)
assert any("test" in s.lower() for s in result["should_do"])
def test_no_suggestion_when_describe_block(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_patch("backend", "describe('x', () => { it('works', () => {}) })", strict=True)
assert not any("test" in s.lower() for s in result["should_do"])
def test_no_suggestion_when_it_block(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_patch("backend", "it('does something', () => { expect(x).toBe(1) })", strict=True)
assert not any("test" in s.lower() for s in result["should_do"])
class TestValidatePatchBmadOnly:
def test_blocks_when_only_bmad_artefacts(self, tmp_path):
paths = _fake_paths(tmp_path)
with patch("leadtech_bmad_mcp.server.get_paths", return_value=paths):
result = validate_patch(
"backend",
"story update",
changed_files=["_bmad-output/story-1.md", "_bmad-output/sprint-status.yaml"],
)
assert any("artefact" in i.lower() or "bmad" in i.lower() for i in result["blocking_issues"])

View File

@@ -0,0 +1,209 @@
from __future__ import annotations
import pytest
from pathlib import Path
from unittest.mock import patch
from leadtech_bmad_mcp.triage import (
parse_capitalisation_entries,
novelty_level,
scope_level,
CapitalisationEntry,
)
from leadtech_bmad_mcp.knowledge import LeadtechPaths
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
VALID_BLOCK = """\
2026-03-15 — app-alexandrie
FILE_UPDATE_PROPOSAL
Fichier cible : knowledge/backend/patterns/nestjs.md
Pourquoi :
L'ordre des guards NestJS a causé request.user undefined dans EmailVerifiedGuard.
Proposition :
Toujours enregistrer AuthGuard en premier dans providers[] avant tout guard qui lit request.user.
"""
VALID_BLOCK_2 = """\
2026-03-20 — app-foo
FILE_UPDATE_PROPOSAL
Fichier cible : knowledge/frontend/risques/general.md
Pourquoi :
Les useEffect sans deps array causent des boucles infinies.
Proposition :
Toujours spécifier le tableau de dépendances dans useEffect.
"""
SEPARATOR = "\n\n"
# ---------------------------------------------------------------------------
# parse_capitalisation_entries
# ---------------------------------------------------------------------------
def test_parse_single_entry():
entries = parse_capitalisation_entries(VALID_BLOCK)
assert len(entries) == 1
e = entries[0]
assert "app-alexandrie" in e.header
assert e.target == "knowledge/backend/patterns/nestjs.md"
assert "guards" in e.why.lower()
assert "AuthGuard" in e.proposal
def test_parse_multiple_entries():
raw = VALID_BLOCK + SEPARATOR + VALID_BLOCK_2
entries = parse_capitalisation_entries(raw)
assert len(entries) == 2
assert "app-alexandrie" in entries[0].header
assert "app-foo" in entries[1].header
def test_parse_empty_returns_empty():
entries = parse_capitalisation_entries("")
assert entries == []
def test_parse_malformed_no_match():
raw = "Ce texte ne contient aucune entrée valide."
entries = parse_capitalisation_entries(raw)
assert entries == []
def test_parse_preserves_multiline_proposal():
raw = """\
2026-03-15 — proj
FILE_UPDATE_PROPOSAL
Fichier cible : knowledge/backend/patterns/auth.md
Pourquoi :
Raison courte.
Proposition :
Ligne 1 du pattern.
Ligne 2 du pattern.
Ligne 3 du pattern.
"""
entries = parse_capitalisation_entries(raw)
assert len(entries) == 1
assert "Ligne 2" in entries[0].proposal
assert "Ligne 3" in entries[0].proposal
# ---------------------------------------------------------------------------
# novelty_level
# ---------------------------------------------------------------------------
def _make_paths_with_target(tmp_path: Path, target_rel: str, content: str) -> LeadtechPaths:
target = tmp_path / target_rel
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(content, encoding="utf-8")
return LeadtechPaths(
root=tmp_path,
knowledge=tmp_path / "knowledge",
capitalisation=tmp_path / "95_a_capitaliser.md",
projects_conf=tmp_path / "_projects.conf",
)
def test_novelty_exact_doublon(tmp_path):
proposal = "Toujours enregistrer AuthGuard en premier dans providers[]."
paths = _make_paths_with_target(tmp_path, "knowledge/backend/patterns/nestjs.md", proposal)
entry = CapitalisationEntry(
header="2026-03-15 — proj",
target="knowledge/backend/patterns/nestjs.md",
why="Raison.",
proposal=proposal,
)
with patch("leadtech_bmad_mcp.triage.get_paths", return_value=paths):
assert novelty_level(entry) == "DOUBLON_EXACT"
def test_novelty_semantique_doublon(tmp_path):
    # The target file already contains most of the proposal's keywords
existing = "toujours enregistrer authguard premier providers avant guard request"
paths = _make_paths_with_target(tmp_path, "knowledge/backend/patterns/nestjs.md", existing)
entry = CapitalisationEntry(
header="2026-03-15 — proj",
target="knowledge/backend/patterns/nestjs.md",
why="Raison.",
proposal="Toujours enregistrer authguard premier providers avant guard request utilisateur",
)
with patch("leadtech_bmad_mcp.triage.get_paths", return_value=paths):
result = novelty_level(entry)
assert result == "DOUBLON_SEMANTIQUE"
def test_novelty_nouveau_when_file_missing(tmp_path):
paths = LeadtechPaths(
root=tmp_path,
knowledge=tmp_path / "knowledge",
capitalisation=tmp_path / "95_a_capitaliser.md",
projects_conf=tmp_path / "_projects.conf",
)
entry = CapitalisationEntry(
header="2026-03-15 — proj",
target="knowledge/backend/patterns/nestjs.md",
why="Raison.",
proposal="Un pattern totalement nouveau sur les migrations Prisma.",
)
with patch("leadtech_bmad_mcp.triage.get_paths", return_value=paths):
assert novelty_level(entry) == "NOUVEAU"
def test_novelty_nouveau_when_different_content(tmp_path):
existing = "Ce fichier parle de Stripe webhooks et idempotency."
paths = _make_paths_with_target(tmp_path, "knowledge/backend/patterns/stripe.md", existing)
entry = CapitalisationEntry(
header="2026-03-15 — proj",
target="knowledge/backend/patterns/stripe.md",
why="Raison.",
proposal="Un pattern sur les guards NestJS complètement différent.",
)
with patch("leadtech_bmad_mcp.triage.get_paths", return_value=paths):
assert novelty_level(entry) == "NOUVEAU"
# ---------------------------------------------------------------------------
# scope_level
# ---------------------------------------------------------------------------
def test_scope_projet_when_multiple_markers():
entry = CapitalisationEntry(
header="2026-03-15 — proj",
target="knowledge/frontend/patterns/forms.md",
why="Ce composant spécifique sur cet écran particulier.",
proposal="Le label de ce screen doit être en majuscules.",
)
assert scope_level(entry) == "PROJET"
def test_scope_global_when_no_markers():
entry = CapitalisationEntry(
header="2026-03-15 — proj",
target="knowledge/backend/patterns/auth.md",
why="Pattern de validation JWT applicable à tous les projets.",
proposal="Toujours vérifier l'expiration du token côté serveur.",
)
assert scope_level(entry) == "GLOBAL"
def test_scope_global_when_one_marker_only():
    # a single marker is not enough for PROJET
entry = CapitalisationEntry(
header="2026-03-15 — proj",
target="knowledge/backend/patterns/auth.md",
why="Ce route spécifique pose problème.",
proposal="Ajouter un middleware de validation générique.",
)
assert scope_level(entry) == "GLOBAL"