Compare commits

...

11 Commits

Author SHA1 Message Date
Danny ecb498c8ba feat: yaml files for llm reasoning 2026-02-03 09:17:48 -06:00
Danny de642efd87 fix: include artifacts directory in Docker image
Add COPY artifacts/ to Dockerfile so YAML files are available
for RAG indexing when the container starts.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-30 11:11:00 -06:00
Danny 2f172ddaf9 feat: auto-build FAISS index on startup if missing
Add lifespan handler that checks for FAISS index at startup
and automatically builds it if not found. This ensures the
service works on fresh deployments without manual indexing.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-30 10:47:03 -06:00
Danny 1d11deb813 feat: yaml files for llm reasoning 2026-01-30 10:41:12 -06:00
Danny 77260c3e08 feat: yaml files for llm reasoning 2026-01-30 10:37:00 -06:00
Danny b0211b944d feat: replace Redis with in-memory conversation storage
- Remove Redis dependency and redis_client.py
- Implement ConversationMemory with module-level dictionary
- Add TTL support via timestamp checking
- Remove redis_connected from health endpoint
- Add embeddings, intent classification, and RAG prompt modules

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-30 10:34:47 -06:00
Danny 72778b65b5 feat: yaml files for llm reasoning 2026-01-29 12:39:41 -06:00
Danny Garcia 50599d7cee Merge pull request #5 from dannyjosephgarcia/WOOL-25
feat: add GCP service-to-service authentication
2026-01-20 11:42:10 -06:00
Danny c9336d1d84 feat: add GCP service-to-service authentication
Implement identity token verification for Cloud Run deployments:
- Add auth module with GCP identity token verification
- Add configurable auth settings (AUTH_ENABLED, AUTH_AUDIENCE)
- Add service account allowlist for access control
- Protect /chat and /chat/stream endpoints with auth dependency
- Add google-auth dependency for token verification

Auth can be disabled for local development via AUTH_ENABLED=false.
2026-01-19 11:06:59 -06:00
Danny Garcia e3c4680108 Merge pull request #4 from dannyjosephgarcia/WOOL-18
feat: add CORS middleware and SSE streaming endpoint
2026-01-16 12:44:26 -06:00
Danny 6c1cf0655a feat: add CORS middleware and SSE streaming endpoint
Add CORS support for frontend development with configurable origins via
CORS_ORIGINS environment variable. Add /chat/stream endpoint for
Server-Sent Events streaming with true streaming support for OpenAI
adapter and fallback single-chunk behavior for other adapters.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-16 12:43:21 -06:00
32 changed files with 4025 additions and 11 deletions
+3
View File
@@ -1,6 +1,9 @@
# LLM Mode: "local", "remote", "openai", or "asksage" # LLM Mode: "local", "remote", "openai", or "asksage"
LLM_MODE=local LLM_MODE=local
# CORS Configuration (comma-separated origins for frontend access)
CORS_ORIGINS=http://localhost:3000
# Remote LLM Configuration (required if LLM_MODE=remote) # Remote LLM Configuration (required if LLM_MODE=remote)
LLM_REMOTE_URL=https://your-llm-service.com/generate LLM_REMOTE_URL=https://your-llm-service.com/generate
LLM_REMOTE_TOKEN= LLM_REMOTE_TOKEN=
+3
View File
@@ -9,6 +9,9 @@ RUN pip install --no-cache-dir -r requirements.txt
# Copy application code # Copy application code
COPY app/ ./app/ COPY app/ ./app/
# Copy YAML artifacts for RAG indexing
COPY artifacts/ ./artifacts/
# Default model (API key must be passed at runtime for security) # Default model (API key must be passed at runtime for security)
ENV OPENAI_MODEL=gpt-4o-mini ENV OPENAI_MODEL=gpt-4o-mini
+114
View File
@@ -0,0 +1,114 @@
"""GCP Cloud Run service-to-service authentication module."""
import logging
from typing import Annotated
from fastapi import Depends, HTTPException, Request
from google.auth.transport import requests as google_requests
from google.oauth2 import id_token
from app.config import settings
logger = logging.getLogger(__name__)
def verify_gcp_identity_token(token: str, audience: str) -> dict:
"""Verify a GCP identity token and return the decoded claims.
Args:
token: The identity token to verify.
audience: The expected audience (backend Cloud Run URL).
Returns:
The decoded token claims.
Raises:
ValueError: If the token is invalid or verification fails.
"""
try:
claims = id_token.verify_oauth2_token(
token,
google_requests.Request(),
audience=audience,
)
return claims
except Exception as e:
logger.warning(f"Token verification failed: {e}")
raise ValueError(f"Token verification failed: {e}")
async def verify_service_auth(request: Request) -> dict | None:
"""FastAPI dependency to verify GCP service-to-service authentication.
Returns None if auth is disabled (local dev), otherwise verifies the
identity token and checks the service account allowlist.
Returns:
The decoded token claims, or None if auth is disabled.
Raises:
HTTPException: 401 if authentication fails.
"""
# Skip auth if disabled (local development)
if not settings.auth_enabled:
logger.debug("Authentication disabled, skipping verification")
return None
# Extract token from Authorization header
auth_header = request.headers.get("Authorization")
if not auth_header:
logger.warning("Missing Authorization header")
raise HTTPException(
status_code=401,
detail="Missing Authorization header",
)
if not auth_header.startswith("Bearer "):
logger.warning("Invalid Authorization header format")
raise HTTPException(
status_code=401,
detail="Invalid Authorization header format. Expected 'Bearer <token>'",
)
token = auth_header[7:] # Remove "Bearer " prefix
# Verify the token
if not settings.auth_audience:
logger.error("AUTH_AUDIENCE not configured")
raise HTTPException(
status_code=500,
detail="Server authentication not properly configured",
)
try:
claims = verify_gcp_identity_token(token, settings.auth_audience)
except ValueError as e:
logger.warning(f"Token verification failed: {e}")
raise HTTPException(
status_code=401,
detail="Invalid or expired token",
)
# Check service account allowlist if configured
allowed_accounts = settings.allowed_service_accounts_list
if allowed_accounts:
email = claims.get("email", "")
if email not in allowed_accounts:
logger.warning(
f"Service account '{email}' not in allowlist",
extra={"allowed": allowed_accounts},
)
raise HTTPException(
status_code=403,
detail="Service account not authorized",
)
logger.info(
"Service authentication successful",
extra={"service_account": claims.get("email", "unknown")},
)
return claims
# Type alias for clean dependency injection
ServiceAuthDependency = Annotated[dict | None, Depends(verify_service_auth)]
+30
View File
@@ -19,6 +19,36 @@ class Settings(BaseSettings):
asksage_api_key: str = "" asksage_api_key: str = ""
asksage_model: str = "gpt-4o" asksage_model: str = "gpt-4o"
# CORS configuration
cors_origins: str = "http://localhost:3000"
# Conversation memory configuration
conversation_ttl: int = 86400 # 24 hours
# Embedding configuration
embedding_model: str = "text-embedding-3-small"
# RAG configuration
rag_top_k: int = 5
rag_similarity_threshold: float = 0.70
artifacts_path: str = "artifacts"
embeddings_path: str = "embeddings"
# Authentication settings
auth_enabled: bool = False # Set to True in production
auth_audience: str = "" # Backend Cloud Run URL (e.g., https://backend-xxx.run.app)
allowed_service_accounts: str = "" # Comma-separated list of allowed service account emails
@property
def cors_origins_list(self) -> list[str]:
"""Parse comma-separated CORS origins into a list."""
return [origin.strip() for origin in self.cors_origins.split(",") if origin.strip()]
@property
def allowed_service_accounts_list(self) -> list[str]:
"""Parse comma-separated allowed service accounts into a list."""
return [sa.strip() for sa in self.allowed_service_accounts.split(",") if sa.strip()]
class Config: class Config:
env_file = ".env" env_file = ".env"
env_file_encoding = "utf-8" env_file_encoding = "utf-8"
+18
View File
@@ -0,0 +1,18 @@
"""Embeddings module for YAML artifact indexing and retrieval."""
from app.embeddings.client import EmbeddingClient, get_embedding_client, EmbeddingClientDependency
from app.embeddings.indexer import ArtifactIndexer, get_indexer, IndexerDependency
from app.embeddings.retriever import Retriever, get_retriever, RetrieverDependency, reset_retriever
__all__ = [
"EmbeddingClient",
"get_embedding_client",
"EmbeddingClientDependency",
"ArtifactIndexer",
"get_indexer",
"IndexerDependency",
"Retriever",
"get_retriever",
"RetrieverDependency",
"reset_retriever",
]
+109
View File
@@ -0,0 +1,109 @@
"""OpenAI embedding client wrapper."""
import logging
from functools import lru_cache
from typing import Annotated
from fastapi import Depends
from openai import AsyncOpenAI, AuthenticationError, RateLimitError, APIConnectionError, APIError
from app.config import settings
logger = logging.getLogger(__name__)
class EmbeddingError(Exception):
"""Base exception for embedding operations."""
def __init__(self, message: str, status_code: int = 500):
self.message = message
self.status_code = status_code
super().__init__(message)
class EmbeddingClient:
"""Async wrapper for OpenAI embeddings API."""
def __init__(self, api_key: str, model: str = "text-embedding-3-small"):
"""Initialize the embedding client.
Args:
api_key: OpenAI API key
model: Embedding model identifier
"""
self.client = AsyncOpenAI(api_key=api_key)
self.model = model
self.dimensions = 1536 # text-embedding-3-small dimension
async def embed(self, text: str) -> list[float]:
"""Generate embedding for a single text.
Args:
text: Text to embed
Returns:
Embedding vector (1536 dimensions)
Raises:
EmbeddingError: If embedding generation fails
"""
try:
response = await self.client.embeddings.create(
model=self.model,
input=text,
)
return response.data[0].embedding
except AuthenticationError as e:
raise EmbeddingError(f"OpenAI authentication failed: {e.message}", 401)
except RateLimitError as e:
raise EmbeddingError(f"OpenAI rate limit exceeded: {e.message}", 429)
except APIConnectionError as e:
raise EmbeddingError(f"Could not connect to OpenAI: {str(e)}", 503)
except APIError as e:
raise EmbeddingError(f"OpenAI API error: {e.message}", e.status_code or 500)
async def embed_batch(self, texts: list[str]) -> list[list[float]]:
"""Generate embeddings for multiple texts.
Args:
texts: List of texts to embed
Returns:
List of embedding vectors
Raises:
EmbeddingError: If embedding generation fails
"""
if not texts:
return []
try:
response = await self.client.embeddings.create(
model=self.model,
input=texts,
)
# Sort by index to ensure correct ordering
sorted_embeddings = sorted(response.data, key=lambda x: x.index)
return [item.embedding for item in sorted_embeddings]
except AuthenticationError as e:
raise EmbeddingError(f"OpenAI authentication failed: {e.message}", 401)
except RateLimitError as e:
raise EmbeddingError(f"OpenAI rate limit exceeded: {e.message}", 429)
except APIConnectionError as e:
raise EmbeddingError(f"Could not connect to OpenAI: {str(e)}", 503)
except APIError as e:
raise EmbeddingError(f"OpenAI API error: {e.message}", e.status_code or 500)
@lru_cache()
def get_embedding_client() -> EmbeddingClient:
"""Get cached embedding client instance."""
return EmbeddingClient(
api_key=settings.openai_api_key,
model=settings.embedding_model,
)
EmbeddingClientDependency = Annotated[EmbeddingClient, Depends(get_embedding_client)]
+227
View File
@@ -0,0 +1,227 @@
"""YAML artifact indexer for building FAISS index."""
import json
import logging
from dataclasses import dataclass, asdict
from functools import lru_cache
from pathlib import Path
from typing import Annotated
import faiss
import numpy as np
import yaml
from fastapi import Depends
from app.config import settings
from app.embeddings.client import EmbeddingClient, get_embedding_client, EmbeddingError
logger = logging.getLogger(__name__)
@dataclass
class Chunk:
"""Represents a text chunk from a YAML artifact."""
chunk_id: str
content: str
chunk_type: str # factual_summary, interpretive_summary, method, invariants
artifact_file: str
source_file: str
tags: dict
@dataclass
class IndexResult:
"""Result of indexing operation."""
chunks_indexed: int
artifacts_processed: int
status: str
class ArtifactIndexer:
"""Parses YAML artifacts and builds FAISS index."""
def __init__(self, embedding_client: EmbeddingClient):
"""Initialize the indexer.
Args:
embedding_client: Client for generating embeddings
"""
self.embedding_client = embedding_client
self.artifacts_path = Path(settings.artifacts_path)
self.embeddings_path = Path(settings.embeddings_path)
self.dimensions = 1536
def _parse_yaml_to_chunks(self, yaml_path: Path) -> list[Chunk]:
"""Parse a YAML artifact file into chunks.
Args:
yaml_path: Path to the YAML file
Returns:
List of chunks extracted from the file
"""
chunks = []
artifact_file = str(yaml_path.relative_to(self.artifacts_path))
try:
with open(yaml_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f)
except Exception as e:
logger.warning(f"Failed to parse {yaml_path}: {e}")
return chunks
if not data:
return chunks
source_file = data.get("source_file", "unknown")
tags = data.get("tags", {})
# Chunk 1: Factual summary
if factual := data.get("factual_summary"):
chunks.append(Chunk(
chunk_id=f"{artifact_file}::factual_summary",
content=f"[{source_file}] Factual summary: {factual.strip()}",
chunk_type="factual_summary",
artifact_file=artifact_file,
source_file=source_file,
tags=tags,
))
# Chunk 2: Interpretive summary
if interpretive := data.get("interpretive_summary"):
chunks.append(Chunk(
chunk_id=f"{artifact_file}::interpretive_summary",
content=f"[{source_file}] Interpretive summary: {interpretive.strip()}",
chunk_type="interpretive_summary",
artifact_file=artifact_file,
source_file=source_file,
tags=tags,
))
# Chunk per method
if methods := data.get("methods"):
for method_sig, method_data in methods.items():
description = method_data.get("description", "") if isinstance(method_data, dict) else method_data
if description:
chunks.append(Chunk(
chunk_id=f"{artifact_file}::method::{method_sig}",
content=f"[{source_file}] Method {method_sig}: {description.strip()}",
chunk_type="method",
artifact_file=artifact_file,
source_file=source_file,
tags=tags,
))
# Chunk for invariants (combined)
if invariants := data.get("invariants"):
invariants_text = " ".join(f"- {inv}" for inv in invariants)
chunks.append(Chunk(
chunk_id=f"{artifact_file}::invariants",
content=f"[{source_file}] Invariants: {invariants_text}",
chunk_type="invariants",
artifact_file=artifact_file,
source_file=source_file,
tags=tags,
))
return chunks
def _collect_all_chunks(self) -> list[Chunk]:
"""Collect chunks from all YAML artifacts.
Returns:
List of all chunks from all artifacts
"""
all_chunks = []
for yaml_path in self.artifacts_path.rglob("*.yaml"):
chunks = self._parse_yaml_to_chunks(yaml_path)
all_chunks.extend(chunks)
logger.debug(f"Parsed {len(chunks)} chunks from {yaml_path}")
return all_chunks
async def build_index(self) -> IndexResult:
"""Build FAISS index from all YAML artifacts.
Returns:
IndexResult with statistics
Raises:
EmbeddingError: If embedding generation fails
"""
# Collect all chunks
chunks = self._collect_all_chunks()
if not chunks:
logger.warning("No chunks found in artifacts")
return IndexResult(
chunks_indexed=0,
artifacts_processed=0,
status="no_artifacts",
)
logger.info(f"Generating embeddings for {len(chunks)} chunks...")
# Generate embeddings in batches
batch_size = 100
all_embeddings = []
for i in range(0, len(chunks), batch_size):
batch = chunks[i:i + batch_size]
texts = [chunk.content for chunk in batch]
embeddings = await self.embedding_client.embed_batch(texts)
all_embeddings.extend(embeddings)
logger.debug(f"Embedded batch {i // batch_size + 1}")
# Build FAISS index (IndexFlatIP for inner product / cosine similarity on normalized vectors)
embeddings_array = np.array(all_embeddings, dtype=np.float32)
# Normalize for cosine similarity
faiss.normalize_L2(embeddings_array)
index = faiss.IndexFlatIP(self.dimensions)
index.add(embeddings_array)
# Create embeddings directory if needed
self.embeddings_path.mkdir(parents=True, exist_ok=True)
# Save FAISS index
faiss.write_index(index, str(self.embeddings_path / "faiss_index.bin"))
# Save metadata
metadata = {
"chunks": [asdict(chunk) for chunk in chunks],
}
with open(self.embeddings_path / "metadata.json", "w", encoding="utf-8") as f:
json.dump(metadata, f, indent=2)
# Save index info
artifact_files = set(chunk.artifact_file for chunk in chunks)
index_info = {
"total_chunks": len(chunks),
"total_artifacts": len(artifact_files),
"dimensions": self.dimensions,
"index_type": "IndexFlatIP",
}
with open(self.embeddings_path / "index_info.json", "w", encoding="utf-8") as f:
json.dump(index_info, f, indent=2)
logger.info(f"Indexed {len(chunks)} chunks from {len(artifact_files)} artifacts")
return IndexResult(
chunks_indexed=len(chunks),
artifacts_processed=len(artifact_files),
status="completed",
)
@lru_cache()
def get_indexer() -> ArtifactIndexer:
"""Get cached indexer instance."""
return ArtifactIndexer(embedding_client=get_embedding_client())
IndexerDependency = Annotated[ArtifactIndexer, Depends(get_indexer)]
+221
View File
@@ -0,0 +1,221 @@
"""FAISS-based retrieval with adaptive selection."""
import json
import logging
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path
from typing import Annotated
import faiss
import numpy as np
from fastapi import Depends
from app.config import settings
from app.embeddings.client import EmbeddingClient, get_embedding_client
logger = logging.getLogger(__name__)
@dataclass
class RetrievedChunk:
"""A chunk retrieved from FAISS search."""
chunk_id: str
content: str
chunk_type: str
artifact_file: str
source_file: str
tags: dict
score: float
class Retriever:
"""FAISS-based retriever with adaptive selection logic."""
def __init__(self, embedding_client: EmbeddingClient):
"""Initialize the retriever.
Args:
embedding_client: Client for generating query embeddings
"""
self.embedding_client = embedding_client
self.embeddings_path = Path(settings.embeddings_path)
self.top_k = settings.rag_top_k
self.threshold = settings.rag_similarity_threshold
self._index: faiss.IndexFlatIP | None = None
self._metadata: list[dict] | None = None
self._loaded = False
def load_index(self) -> bool:
"""Load FAISS index and metadata from disk.
Returns:
True if successfully loaded, False otherwise
"""
index_path = self.embeddings_path / "faiss_index.bin"
metadata_path = self.embeddings_path / "metadata.json"
if not index_path.exists() or not metadata_path.exists():
logger.warning("FAISS index or metadata not found. Run /index first.")
self._loaded = False
return False
try:
self._index = faiss.read_index(str(index_path))
with open(metadata_path, "r", encoding="utf-8") as f:
data = json.load(f)
self._metadata = data.get("chunks", [])
self._loaded = True
logger.info(f"Loaded FAISS index with {self._index.ntotal} vectors")
return True
except Exception as e:
logger.error(f"Failed to load FAISS index: {e}")
self._loaded = False
return False
@property
def is_loaded(self) -> bool:
"""Check if index is loaded."""
return self._loaded and self._index is not None
@property
def index_size(self) -> int:
"""Get number of vectors in index."""
if self._index is None:
return 0
return self._index.ntotal
def _adaptive_select(
self,
indices: np.ndarray,
scores: np.ndarray,
) -> list[tuple[int, float]]:
"""Apply adaptive selection logic.
- Always include top 2 chunks (regardless of score)
- For chunks 3-5: apply threshold
- Limit to self.top_k chunks total
Args:
indices: FAISS result indices
scores: FAISS result scores
Returns:
List of (index, score) tuples for selected chunks
"""
selected = []
for i, (idx, score) in enumerate(zip(indices, scores)):
if idx == -1: # FAISS returns -1 for no match
continue
# Always take top 2
if i < 2:
selected.append((int(idx), float(score)))
# Apply threshold for remaining
elif score >= self.threshold and len(selected) < self.top_k:
selected.append((int(idx), float(score)))
return selected
def _apply_diversity_filter(
self,
candidates: list[tuple[int, float]],
max_per_artifact: int = 2,
) -> list[tuple[int, float]]:
"""Limit chunks per artifact for diversity.
Args:
candidates: List of (index, score) tuples
max_per_artifact: Maximum chunks from same artifact
Returns:
Filtered list of (index, score) tuples
"""
artifact_counts: dict[str, int] = {}
filtered = []
for idx, score in candidates:
chunk = self._metadata[idx]
artifact = chunk["artifact_file"]
if artifact_counts.get(artifact, 0) < max_per_artifact:
filtered.append((idx, score))
artifact_counts[artifact] = artifact_counts.get(artifact, 0) + 1
return filtered
async def search(self, query: str) -> list[RetrievedChunk]:
"""Search for relevant chunks.
Args:
query: User's question
Returns:
List of retrieved chunks with relevance scores
"""
if not self.is_loaded:
if not self.load_index():
return []
# Generate query embedding
query_embedding = await self.embedding_client.embed(query)
query_vector = np.array([query_embedding], dtype=np.float32)
# Normalize for cosine similarity
faiss.normalize_L2(query_vector)
# Search FAISS (get more candidates than needed for filtering)
k_search = min(8, self._index.ntotal)
scores, indices = self._index.search(query_vector, k_search)
# Apply adaptive selection
selected = self._adaptive_select(indices[0], scores[0])
# Apply diversity filter
filtered = self._apply_diversity_filter(selected)
# Build result chunks
results = []
for idx, score in filtered:
chunk_data = self._metadata[idx]
results.append(RetrievedChunk(
chunk_id=chunk_data["chunk_id"],
content=chunk_data["content"],
chunk_type=chunk_data["chunk_type"],
artifact_file=chunk_data["artifact_file"],
source_file=chunk_data["source_file"],
tags=chunk_data.get("tags", {}),
score=score,
))
logger.debug(f"Retrieved {len(results)} chunks for query")
return results
# Singleton retriever instance
_retriever: Retriever | None = None
def get_retriever() -> Retriever:
"""Get singleton retriever instance (lazily initialized)."""
global _retriever
if _retriever is None:
_retriever = Retriever(embedding_client=get_embedding_client())
# Attempt to load index at startup
_retriever.load_index()
return _retriever
def reset_retriever() -> None:
"""Reset the singleton retriever (for reloading after re-indexing)."""
global _retriever
_retriever = None
RetrieverDependency = Annotated[Retriever, Depends(get_retriever)]
+9
View File
@@ -0,0 +1,9 @@
"""Intent classification module."""
from app.intent.classifier import IntentClassifier, get_intent_classifier, IntentClassifierDependency
__all__ = [
"IntentClassifier",
"get_intent_classifier",
"IntentClassifierDependency",
]
+98
View File
@@ -0,0 +1,98 @@
"""Lightweight intent classification using gpt-4o-mini."""
import logging
from functools import lru_cache
from typing import Annotated, Literal
from fastapi import Depends
from openai import AsyncOpenAI
from app.config import settings
from app.memory.conversation import Message
logger = logging.getLogger(__name__)
Intent = Literal["codebase", "general", "clarification"]
INTENT_PROMPT = """You are classifying questions for a Tyndale trading system documentation assistant.
Classify this user message into one category:
- "codebase": ANY question about trading, strategies, exchanges, orders, positions, risk, execution, hedging, market making, P&L, or how the system works. Also includes questions with "our", "the system", "this", or references to specific functionality.
- "general": ONLY greetings or completely off-topic questions ("How are you?", "What's the weather?", "Hello")
- "clarification": Follow-ups that reference previous answers ("Tell me more", "What did you mean?", "Can you explain that?")
DEFAULT TO "codebase" if uncertain. This is a trading system assistant - assume trading questions are about the codebase.
Respond with ONLY the category name, nothing else."""
class IntentClassifier:
"""Lightweight intent classifier using gpt-4o-mini."""
def __init__(self, api_key: str):
"""Initialize the classifier.
Args:
api_key: OpenAI API key
"""
self.client = AsyncOpenAI(api_key=api_key)
self.model = "gpt-4o-mini"
async def classify(
self,
message: str,
history: list[Message] | None = None,
) -> Intent:
"""Classify user message intent.
Args:
message: User's message
history: Optional conversation history for context
Returns:
Classified intent: "codebase", "general", or "clarification"
"""
# Build context from history (last 2 turns)
context = ""
if history and len(history) >= 2:
recent = history[-4:] # Last 2 exchanges
context = "Recent conversation:\n"
for msg in recent:
role = "User" if msg.role == "user" else "Assistant"
context += f"{role}: {msg.content[:100]}...\n" if len(msg.content) > 100 else f"{role}: {msg.content}\n"
context += "\n"
try:
response = await self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": INTENT_PROMPT},
{"role": "user", "content": f"{context}Current message: {message}"},
],
max_tokens=10,
temperature=0,
)
raw_intent = response.choices[0].message.content.strip().lower()
# Validate intent
if raw_intent in ("codebase", "general", "clarification"):
logger.info(f"Intent classified: '{message[:50]}...' -> {raw_intent}")
return raw_intent
# Default to codebase for ambiguous cases (safer for RAG)
logger.warning(f"Unexpected intent response: {raw_intent}, defaulting to codebase")
return "codebase"
except Exception as e:
logger.warning(f"Intent classification failed: {e}, defaulting to codebase")
return "codebase"
@lru_cache()
def get_intent_classifier() -> IntentClassifier:
"""Get cached intent classifier instance."""
return IntentClassifier(api_key=settings.openai_api_key)
IntentClassifierDependency = Annotated[IntentClassifier, Depends(get_intent_classifier)]
+62
View File
@@ -3,6 +3,7 @@
import asyncio import asyncio
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from functools import lru_cache from functools import lru_cache
from collections.abc import AsyncIterator
from typing import Annotated from typing import Annotated
import httpx import httpx
@@ -46,6 +47,27 @@ class LLMAdapter(ABC):
""" """
pass pass
async def generate_stream(
self, conversation_id: str, message: str
) -> AsyncIterator[str]:
"""Stream a response for the given message.
Default implementation yields the full response as a single chunk.
Subclasses can override this to provide true streaming.
Args:
conversation_id: The conversation identifier
message: The user's message
Yields:
Response content chunks
Raises:
LLMError: If generation fails for any reason
"""
response = await self.generate(conversation_id, message)
yield response
class LocalAdapter(LLMAdapter): class LocalAdapter(LLMAdapter):
"""Local stub adapter for development and testing.""" """Local stub adapter for development and testing."""
@@ -183,6 +205,46 @@ class OpenAIAdapter(LLMAdapter):
f"OpenAI API error: {e.message}", status_code=e.status_code or 500 f"OpenAI API error: {e.message}", status_code=e.status_code or 500
) )
async def generate_stream(
self, conversation_id: str, message: str
) -> AsyncIterator[str]:
"""Stream a response using the OpenAI API.
Args:
conversation_id: The conversation identifier (for future use with context)
message: The user's message
Yields:
Response content chunks
Raises:
LLMAuthenticationError: If API key is invalid
LLMRateLimitError: If rate limit is exceeded
LLMConnectionError: If connection fails
LLMError: For other API errors
"""
try:
stream = await self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": message}],
stream=True,
)
async for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
except AuthenticationError as e:
raise LLMAuthenticationError(f"OpenAI authentication failed: {e.message}")
except RateLimitError as e:
raise LLMRateLimitError(f"OpenAI rate limit exceeded: {e.message}")
except APIConnectionError as e:
raise LLMConnectionError(f"Could not connect to OpenAI: {str(e)}")
except APIError as e:
raise LLMError(
f"OpenAI API error: {e.message}", status_code=e.status_code or 500
)
class AskSageAdapter(LLMAdapter): class AskSageAdapter(LLMAdapter):
"""AskSage API adapter using the official asksageclient SDK.""" """AskSage API adapter using the official asksageclient SDK."""
+301 -10
View File
@@ -1,11 +1,29 @@
import logging import logging
import uuid import uuid
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from app.auth import ServiceAuthDependency
from app.config import settings, MAX_MESSAGE_LENGTH from app.config import settings, MAX_MESSAGE_LENGTH
from app.embeddings import RetrieverDependency, IndexerDependency, get_retriever, reset_retriever, get_indexer
from app.embeddings.retriever import RetrievedChunk
from app.intent import IntentClassifierDependency
from app.llm import AdapterDependency, LLMError, llm_exception_to_http from app.llm import AdapterDependency, LLMError, llm_exception_to_http
from app.schemas import ChatRequest, ChatResponse, HealthResponse from app.memory import ConversationMemoryDependency
from app.prompts import build_rag_prompt
from app.schemas import (
ChatRequest,
ChatResponse,
HealthResponse,
IndexResponse,
SourceReference,
StreamChunkEvent,
StreamDoneEvent,
StreamErrorEvent,
)
# Configure logging # Configure logging
logging.basicConfig( logging.basicConfig(
@@ -14,27 +32,144 @@ logging.basicConfig(
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Startup and shutdown lifecycle management."""
# Startup: auto-index if FAISS index is missing
retriever = get_retriever()
if not retriever.is_loaded:
logger.info("FAISS index not found, building index on startup...")
try:
indexer = get_indexer()
result = await indexer.build_index()
reset_retriever() # Reset to load newly built index
logger.info(f"Startup indexing completed: {result.chunks_indexed} chunks from {result.artifacts_processed} artifacts")
except Exception as e:
logger.error(f"Startup indexing failed: {e}")
else:
logger.info(f"FAISS index loaded: {retriever.index_size} vectors")
yield # App runs here
# Shutdown: nothing to clean up
# Create FastAPI app # Create FastAPI app
app = FastAPI( app = FastAPI(
title="Tyndale AI Service", title="Tyndale AI Service",
description="LLM Chat Service for algorithmic trading support", description="LLM Chat Service for algorithmic trading support",
version="0.1.0", version="0.1.0",
lifespan=lifespan,
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins_list,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
) )
@app.get("/health", response_model=HealthResponse) @app.get("/health", response_model=HealthResponse)
async def health_check() -> HealthResponse: async def health_check() -> HealthResponse:
"""Health check endpoint.""" """Health check endpoint with FAISS status."""
return HealthResponse(status="ok") retriever = get_retriever()
return HealthResponse(
status="ok",
faiss_loaded=retriever.is_loaded,
index_size=retriever.index_size,
)
@app.get("/debug/search")
async def debug_search(
q: str,
retriever: RetrieverDependency,
) -> dict:
"""Debug endpoint to test retrieval directly."""
chunks = await retriever.search(q)
return {
"query": q,
"chunks_found": len(chunks),
"chunks": [
{
"source_file": c.source_file,
"chunk_type": c.chunk_type,
"score": round(c.score, 4),
"content_preview": c.content[:200] + "..." if len(c.content) > 200 else c.content,
}
for c in chunks
],
}
@app.post("/index", response_model=IndexResponse)
async def reindex_artifacts(
background_tasks: BackgroundTasks,
indexer: IndexerDependency,
_auth: ServiceAuthDependency,
) -> IndexResponse:
"""Trigger re-indexing of YAML artifacts.
Builds FAISS index from all YAML files in the artifacts directory.
"""
logger.info("Starting artifact indexing...")
try:
result = await indexer.build_index()
# Reset retriever to reload new index
reset_retriever()
logger.info(
f"Indexing completed: {result.chunks_indexed} chunks from {result.artifacts_processed} artifacts"
)
return IndexResponse(
status=result.status,
chunks_indexed=result.chunks_indexed,
artifacts_processed=result.artifacts_processed,
)
except Exception as e:
logger.error(f"Indexing failed: {e}")
raise HTTPException(status_code=500, detail=f"Indexing failed: {str(e)}")
def _chunks_to_sources(chunks: list[RetrievedChunk]) -> list[SourceReference]:
"""Convert retrieved chunks to source references."""
return [
SourceReference(
artifact_file=chunk.artifact_file,
source_file=chunk.source_file,
chunk_type=chunk.chunk_type,
relevance_score=chunk.score,
)
for chunk in chunks
]
@app.post("/chat", response_model=ChatResponse) @app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest, adapter: AdapterDependency) -> ChatResponse: async def chat(
"""Process a chat message through the LLM adapter. request: ChatRequest,
adapter: AdapterDependency,
retriever: RetrieverDependency,
memory: ConversationMemoryDependency,
classifier: IntentClassifierDependency,
_auth: ServiceAuthDependency,
) -> ChatResponse:
"""Process a chat message through the RAG pipeline.
- Validates message length - Validates message length
- Generates conversation_id if not provided - Generates conversation_id if not provided
- Routes to appropriate LLM adapter based on LLM_MODE - Classifies intent (codebase/general/clarification)
- Retrieves relevant context from FAISS (for codebase intent)
- Builds RAG prompt and generates response
- Stores conversation turn in Redis
""" """
# Validate message length # Validate message length
if len(request.message) > MAX_MESSAGE_LENGTH: if len(request.message) > MAX_MESSAGE_LENGTH:
@@ -47,19 +182,42 @@ async def chat(request: ChatRequest, adapter: AdapterDependency) -> ChatResponse
# Generate or use provided conversation_id # Generate or use provided conversation_id
conversation_id = request.conversation_id or str(uuid.uuid4()) conversation_id = request.conversation_id or str(uuid.uuid4())
# Log request metadata (not content) # Get conversation history
history = await memory.get_history(conversation_id)
# Classify intent
intent = await classifier.classify(request.message, history)
# Log request metadata
logger.info( logger.info(
"Chat request received", "Chat request received",
extra={ extra={
"conversation_id": conversation_id, "conversation_id": conversation_id,
"message_length": len(request.message), "message_length": len(request.message),
"mode": settings.llm_mode, "mode": settings.llm_mode,
"intent": intent,
}, },
) )
# Retrieve context for codebase questions
chunks: list[RetrievedChunk] = []
if intent == "codebase":
chunks = await retriever.search(request.message)
logger.debug(f"Retrieved {len(chunks)} chunks for codebase question")
# Build RAG prompt
system_prompt, user_content = build_rag_prompt(
user_message=request.message,
intent=intent,
chunks=chunks,
history=history,
)
# Generate response with exception handling # Generate response with exception handling
try: try:
response_text = await adapter.generate(conversation_id, request.message) # For RAG, we pass the full constructed prompt
full_prompt = f"{system_prompt}\n\n{user_content}"
response_text = await adapter.generate(conversation_id, full_prompt)
except LLMError as e: except LLMError as e:
logger.error( logger.error(
"LLM generation failed", "LLM generation failed",
@@ -71,6 +229,15 @@ async def chat(request: ChatRequest, adapter: AdapterDependency) -> ChatResponse
) )
raise llm_exception_to_http(e) raise llm_exception_to_http(e)
# Store conversation turn
sources = _chunks_to_sources(chunks)
await memory.store_turn(
conversation_id=conversation_id,
user_message=request.message,
assistant_message=response_text,
sources=[s.model_dump() for s in sources] if sources else None,
)
# Log response metadata # Log response metadata
logger.info( logger.info(
"Chat response generated", "Chat response generated",
@@ -78,6 +245,8 @@ async def chat(request: ChatRequest, adapter: AdapterDependency) -> ChatResponse
"conversation_id": conversation_id, "conversation_id": conversation_id,
"response_length": len(response_text), "response_length": len(response_text),
"mode": settings.llm_mode, "mode": settings.llm_mode,
"intent": intent,
"sources_count": len(sources),
}, },
) )
@@ -85,7 +254,129 @@ async def chat(request: ChatRequest, adapter: AdapterDependency) -> ChatResponse
conversation_id=conversation_id, conversation_id=conversation_id,
response=response_text, response=response_text,
mode=settings.llm_mode, mode=settings.llm_mode,
sources=[], intent=intent,
sources=sources,
)
@app.post("/chat/stream")
async def chat_stream(
request: ChatRequest,
adapter: AdapterDependency,
retriever: RetrieverDependency,
memory: ConversationMemoryDependency,
classifier: IntentClassifierDependency,
_auth: ServiceAuthDependency,
) -> StreamingResponse:
"""Stream a chat response through the RAG pipeline using Server-Sent Events.
- Validates message length
- Generates conversation_id if not provided
- Classifies intent and retrieves context
- Streams response with SSE format
- Stores conversation turn after completion
"""
# Validate message length
if len(request.message) > MAX_MESSAGE_LENGTH:
raise HTTPException(
status_code=400,
detail=f"Message exceeds maximum length of {MAX_MESSAGE_LENGTH:,} characters. "
f"Your message has {len(request.message):,} characters.",
)
# Generate or use provided conversation_id
conversation_id = request.conversation_id or str(uuid.uuid4())
# Get conversation history
history = await memory.get_history(conversation_id)
# Classify intent
intent = await classifier.classify(request.message, history)
# Retrieve context for codebase questions
chunks: list[RetrievedChunk] = []
if intent == "codebase":
chunks = await retriever.search(request.message)
# Build RAG prompt
system_prompt, user_content = build_rag_prompt(
user_message=request.message,
intent=intent,
chunks=chunks,
history=history,
)
sources = _chunks_to_sources(chunks)
# Log request metadata
logger.info(
"Chat stream request received",
extra={
"conversation_id": conversation_id,
"message_length": len(request.message),
"mode": settings.llm_mode,
"intent": intent,
},
)
async def stream_response():
"""Async generator that yields SSE-formatted events."""
full_response = []
try:
full_prompt = f"{system_prompt}\n\n{user_content}"
async for chunk in adapter.generate_stream(conversation_id, full_prompt):
full_response.append(chunk)
event = StreamChunkEvent(content=chunk, conversation_id=conversation_id)
yield f"data: {event.model_dump_json()}\n\n"
# Store conversation turn
response_text = "".join(full_response)
await memory.store_turn(
conversation_id=conversation_id,
user_message=request.message,
assistant_message=response_text,
sources=[s.model_dump() for s in sources] if sources else None,
)
# Send completion event with sources
done_event = StreamDoneEvent(
conversation_id=conversation_id,
mode=settings.llm_mode,
intent=intent,
sources=sources,
)
yield f"data: {done_event.model_dump_json()}\n\n"
logger.info(
"Chat stream completed",
extra={
"conversation_id": conversation_id,
"mode": settings.llm_mode,
"intent": intent,
"sources_count": len(sources),
},
)
except LLMError as e:
logger.error(
"LLM streaming failed",
extra={
"conversation_id": conversation_id,
"error_type": type(e).__name__,
"error_message": e.message,
},
)
error_event = StreamErrorEvent(message=e.message, code=e.status_code)
yield f"data: {error_event.model_dump_json()}\n\n"
return StreamingResponse(
stream_response(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
) )
+9
View File
@@ -0,0 +1,9 @@
"""Memory module for conversation history management."""
from app.memory.conversation import ConversationMemory, get_conversation_memory, ConversationMemoryDependency
__all__ = [
"ConversationMemory",
"get_conversation_memory",
"ConversationMemoryDependency",
]
+122
View File
@@ -0,0 +1,122 @@
"""Conversation history management with in-memory storage."""
import logging
import time
from dataclasses import dataclass, asdict, field
from typing import Annotated
from fastapi import Depends
from app.config import settings
logger = logging.getLogger(__name__)
MAX_HISTORY_MESSAGES = 20
@dataclass
class Message:
"""A single message in conversation history."""
role: str # "user" or "assistant"
content: str
sources: list[dict] | None = None
@dataclass
class ConversationData:
"""Container for conversation messages with timestamp for TTL."""
messages: list[Message] = field(default_factory=list)
last_updated: float = field(default_factory=time.time)
# Module-level storage for conversations
_conversations: dict[str, ConversationData] = {}
class ConversationMemory:
"""Manages conversation history in memory."""
def __init__(self, ttl: int):
"""Initialize conversation memory.
Args:
ttl: Time-to-live in seconds for conversations
"""
self.ttl = ttl
async def get_history(self, conversation_id: str) -> list[Message]:
"""Get conversation history.
Args:
conversation_id: Conversation identifier
Returns:
List of messages in chronological order, or empty list if expired/not found
"""
data = _conversations.get(conversation_id)
if data is None:
return []
# Check if expired
if time.time() - data.last_updated > self.ttl:
del _conversations[conversation_id]
return []
return data.messages
async def store_turn(
self,
conversation_id: str,
user_message: str,
assistant_message: str,
sources: list[dict] | None = None,
) -> None:
"""Store a conversation turn (user message + assistant response).
Args:
conversation_id: Conversation identifier
user_message: User's message
assistant_message: Assistant's response
sources: Optional source references used in response
"""
# Get existing history (checks TTL)
history = await self.get_history(conversation_id)
# Add new messages
history.append(Message(role="user", content=user_message))
history.append(Message(role="assistant", content=assistant_message, sources=sources))
# Trim to max size (keep most recent)
if len(history) > MAX_HISTORY_MESSAGES:
history = history[-MAX_HISTORY_MESSAGES:]
# Store with updated timestamp
_conversations[conversation_id] = ConversationData(
messages=history,
last_updated=time.time(),
)
logger.debug(f"Stored conversation turn for {conversation_id}")
async def clear(self, conversation_id: str) -> bool:
"""Clear conversation history.
Args:
conversation_id: Conversation identifier
Returns:
True if cleared successfully
"""
if conversation_id in _conversations:
del _conversations[conversation_id]
return True
def get_conversation_memory() -> ConversationMemory:
"""Get conversation memory instance."""
return ConversationMemory(ttl=settings.conversation_ttl)
ConversationMemoryDependency = Annotated[ConversationMemory, Depends(get_conversation_memory)]
+136
View File
@@ -0,0 +1,136 @@
"""System prompts for RAG-based codebase Q&A."""
from app.embeddings.retriever import RetrievedChunk
from app.memory.conversation import Message
# For codebase questions WITH retrieved context
CODEBASE_SYSTEM_PROMPT = """You answer questions about the Tyndale trading system using ONLY the provided YAML artifacts.
HARD CONSTRAINTS:
- Do NOT assume access to source code
- Do NOT invent implementation details not in the artifacts
- Do NOT speculate about code mechanics beyond what artifacts describe
- If artifacts do not contain enough information, say so explicitly
RESPONSE STYLE:
- Prefer architectural and behavioral explanations over mechanics
- Reference source files by path (e.g., ./trader.py)
- Explain trading concepts for developers without finance background
- Keep responses focused and concise"""
# For general questions (no RAG context)
GENERAL_SYSTEM_PROMPT = """You are an assistant for the Tyndale trading system documentation.
You can answer general questions, but for specific codebase questions, you need artifact context.
If the user asks about specific code without context, ask them to rephrase or be more specific."""
# For clarification/follow-ups (uses conversation history)
CLARIFICATION_SYSTEM_PROMPT = """You are continuing a conversation about the Tyndale trading system.
Use the conversation history to answer follow-up questions.
If you need to look up new information, ask the user to rephrase as a standalone question."""
def select_system_prompt(intent: str, has_context: bool) -> str:
"""Select appropriate system prompt based on intent and context.
Args:
intent: Classified intent (codebase, general, clarification)
has_context: Whether RAG context was retrieved
Returns:
System prompt string
"""
if intent == "codebase" and has_context:
return CODEBASE_SYSTEM_PROMPT
elif intent == "codebase" and not has_context:
return CODEBASE_SYSTEM_PROMPT + "\n\nNOTE: No relevant artifacts were found for this question. Acknowledge this limitation in your response."
elif intent == "clarification":
return CLARIFICATION_SYSTEM_PROMPT
else:
return GENERAL_SYSTEM_PROMPT
def format_context(chunks: list[RetrievedChunk]) -> str:
"""Format retrieved chunks as context for the LLM.
Args:
chunks: List of retrieved chunks
Returns:
Formatted context string
"""
if not chunks:
return ""
context_parts = ["## Retrieved Artifact Context\n"]
for i, chunk in enumerate(chunks, 1):
context_parts.append(f"### Source {i}: {chunk.source_file} ({chunk.chunk_type})")
context_parts.append(chunk.content)
context_parts.append("") # Empty line separator
return "\n".join(context_parts)
def format_history(history: list[Message], max_messages: int = 10) -> str:
"""Format conversation history for the LLM.
Args:
history: List of conversation messages
max_messages: Maximum messages to include
Returns:
Formatted history string
"""
if not history:
return ""
# Take most recent messages
recent = history[-max_messages:] if len(history) > max_messages else history
history_parts = ["## Conversation History\n"]
for msg in recent:
role = "User" if msg.role == "user" else "Assistant"
history_parts.append(f"**{role}**: {msg.content}")
history_parts.append("")
return "\n".join(history_parts)
def build_rag_prompt(
user_message: str,
intent: str,
chunks: list[RetrievedChunk],
history: list[Message],
) -> tuple[str, str]:
"""Build complete RAG prompt with system message and user content.
Args:
user_message: Current user message
intent: Classified intent
chunks: Retrieved context chunks
history: Conversation history
Returns:
Tuple of (system_prompt, user_content)
"""
# Select system prompt
system_prompt = select_system_prompt(intent, bool(chunks))
# Build user content
parts = []
# Add context if available
if chunks:
parts.append(format_context(chunks))
# Add history for clarification intent
if intent == "clarification" and history:
parts.append(format_history(history))
# Add current question
parts.append(f"## Current Question\n{user_message}")
user_content = "\n\n".join(parts)
return system_prompt, user_content
+54 -1
View File
@@ -3,6 +3,15 @@ from typing import Literal
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
class SourceReference(BaseModel):
"""Reference to a source artifact chunk used in RAG response."""
artifact_file: str = Field(..., description="Path to the YAML artifact file")
source_file: str = Field(..., description="Path to the source code file")
chunk_type: str = Field(..., description="Type of chunk (factual_summary, interpretive_summary, method, invariants)")
relevance_score: float = Field(..., description="Similarity score from FAISS search")
class ChatRequest(BaseModel): class ChatRequest(BaseModel):
"""Request model for the /chat endpoint.""" """Request model for the /chat endpoint."""
@@ -18,16 +27,60 @@ class ChatResponse(BaseModel):
conversation_id: str = Field(..., description="Conversation ID (generated if not provided)") conversation_id: str = Field(..., description="Conversation ID (generated if not provided)")
response: str = Field(..., description="The LLM's response") response: str = Field(..., description="The LLM's response")
mode: Literal["local", "remote", "openai", "asksage"] = Field(..., description="Which adapter was used") mode: Literal["local", "remote", "openai", "asksage"] = Field(..., description="Which adapter was used")
sources: list = Field(default_factory=list, description="Source references (empty for now)") intent: Literal["codebase", "general", "clarification"] = Field(
default="general", description="Classified intent of the user message"
)
sources: list[SourceReference] = Field(default_factory=list, description="Source artifact references used in response")
class HealthResponse(BaseModel): class HealthResponse(BaseModel):
"""Response model for the /health endpoint.""" """Response model for the /health endpoint."""
status: str = Field(default="ok") status: str = Field(default="ok")
faiss_loaded: bool = Field(default=False, description="Whether FAISS index is loaded")
index_size: int = Field(default=0, description="Number of chunks in FAISS index")
class ErrorResponse(BaseModel): class ErrorResponse(BaseModel):
"""Standard error response model.""" """Standard error response model."""
detail: str = Field(..., description="Error description") detail: str = Field(..., description="Error description")
# --- SSE Streaming Event Models ---
class StreamChunkEvent(BaseModel):
"""SSE event for content chunks during streaming."""
type: Literal["chunk"] = "chunk"
content: str = Field(..., description="Content chunk from the LLM")
conversation_id: str = Field(..., description="Conversation ID")
class StreamDoneEvent(BaseModel):
"""SSE event signaling completion of streaming."""
type: Literal["done"] = "done"
conversation_id: str = Field(..., description="Conversation ID")
mode: Literal["local", "remote", "openai", "asksage"] = Field(..., description="Which adapter was used")
intent: Literal["codebase", "general", "clarification"] = Field(
default="general", description="Classified intent of the user message"
)
sources: list[SourceReference] = Field(default_factory=list, description="Source artifact references used in response")
class IndexResponse(BaseModel):
"""Response model for the /index endpoint."""
status: str = Field(..., description="Indexing status")
chunks_indexed: int = Field(default=0, description="Number of chunks indexed")
artifacts_processed: int = Field(default=0, description="Number of YAML files processed")
class StreamErrorEvent(BaseModel):
"""SSE event for errors during streaming."""
type: Literal["error"] = "error"
message: str = Field(..., description="Error message")
code: int = Field(default=500, description="HTTP status code")
+65
View File
@@ -0,0 +1,65 @@
source_file: exch_live/__init__.py
schema_version: v1.1
factual_summary: >
Defines shared utilities and base behavior for exchange integrations,
including standardized order string formatting and basic exchange
health/state tracking used across concrete exchange implementations.
methods:
- name: order_str
signature: "(inst, order_dict, eOid) -> str"
description: >
Formats a human-readable string representation of an order, including
side, quantity, symbol, price, exchange name, and external order ID.
- name: all_up
signature: "() -> bool"
description: >
Returns whether all tracked exchange subsystems are currently marked
as operational.
- name: set_up
signature: "(kind) -> None"
description: >
Marks a specific exchange subsystem as operational and logs the updated
exchange state, including any remaining subsystems that are not yet up.
interpretive_summary: >
Provides a minimal base abstraction for exchange integrations, centralizing
shared concerns such as operational state tracking and consistent logging.
This base class deliberately avoids order placement or strategy logic,
serving instead as a foundation that concrete exchange implementations
can extend while preserving uniform health semantics and observability.
invariants:
- Exchange operational state is represented consistently via string values
- An exchange is considered fully operational only when all subsystems are up
- Logging reflects partial readiness when subsystems are still initializing
tags:
domain:
- exchange_integration
- monitoring
trading_function:
- order_execution
strategy_layer:
- execution
system_layer:
- engine
intent:
- abstraction
- observability
data_type:
- orders
- configs
risk:
- latency
- data_corruption
maturity:
- production
provenance:
generated_at: 2026-02-02
generated_by: llm_assisted
confidence_score: 0.70
+108
View File
@@ -0,0 +1,108 @@
source_file: exch_live/binance.py
schema_version: v1.1
factual_summary: >
Provides asynchronous utilities for discovering available perpetual futures
contracts, normalizing symbol representations, and streaming real-time order
book depth and trade data from a futures exchange via REST and websocket
interfaces. The module coordinates contract discovery, websocket subscription,
message parsing, and queue-based data distribution.
methods:
- name: propose_contracts
signature: "() -> list"
description: >
Retrieves metadata for all available perpetual futures contracts from the
exchange, filters for actively trading instruments, normalizes symbol names,
and returns the resulting contract list for downstream consumption.
- name: get_depths
signature: "(qDepth: asyncio.Queue, qOut: asyncio.Queue, sendRaw: bool) -> None"
description: >
Continuously fetches order book depth snapshots for symbols received via an
asyncio queue and forwards either raw or processed depth data to an output
queue depending on configuration.
- name: run_ws
signature: "(contract_data: list, qDepth: asyncio.Queue, qOut: asyncio.Queue, sendRaw: bool) -> None"
description: >
Establishes and maintains websocket subscriptions for depth and trade streams
for a set of contracts, receives and parses streaming messages, and forwards
structured data to downstream consumers with basic timeout handling.
- name: get_normalized_symbols
signature: "(cd: list) -> list"
description: >
Extracts and returns the standardized symbol identifiers from a list of
contract metadata objects.
- name: get_native_symbols
signature: "(cd: list) -> list"
description: >
Extracts and returns the exchange-native symbol identifiers from a list of
contract metadata objects.
- name: price_precision_from_ticksize
signature: "(ts: str) -> int"
description: >
Derives the price precision implied by a tick size representation.
- name: get_price_precision_map
signature: "(cd: list) -> dict"
description: >
Builds a mapping from symbol to price precision based on exchange-provided
price filter metadata.
- name: get_min_qty_map
signature: "(cd: list) -> dict"
description: >
Builds a mapping from symbol to minimum tradable quantity using exchange
contract metadata.
- name: main
signature: "(runWriter: bool, contracts: list, args) -> None"
description: >
Orchestrates contract selection, data stream initialization, and optional
downstream writing or processing behavior based on runtime configuration.
interpretive_summary: >
Acts as a market data ingestion and normalization layer for perpetual futures
trading, abstracting away exchange-specific REST and websocket mechanics.
This module is intentionally focused on data acquisition and standardization,
avoiding any trading or decision logic so that downstream components can rely
on consistent, exchange-agnostic market data streams.
invariants:
- Only actively trading perpetual contracts are streamed
- Symbol normalization is applied consistently across all downstream consumers
- Market data is distributed via asynchronous queues to avoid tight coupling
- Websocket reconnection logic preserves the original contract set
tags:
domain:
- market_data
- exchange_integration
trading_function:
- data_ingest
- feature_engineering
strategy_layer:
- signal
system_layer:
- worker
intent:
- abstraction
- isolation
- observability
data_type:
- raw_market_ticks
- aggregated_candles
risk:
- latency
- data_corruption
maturity:
- production
provenance:
generated_at: 2026-02-02
generated_by: manual_from_docs
confidence_score: 0.75
+104
View File
@@ -0,0 +1,104 @@
source_file: exch_live/btcc.py
schema_version: v1.1
factual_summary: >
Implements the exchange integration for the BTCC perpetual futures exchange.
This module manages authentication, HTTP and websocket connectivity, order
lifecycle handling, and synchronization of positions and execution state
between the exchange and the trading system. It extends shared exchange
behavior from a base exchange abstraction.
methods:
- name: CustomAdapter.process
signature: "(msg, kwargs) -> str"
description: >
Modifies log messages by applying a standardized prefix to improve
identification of BTCC-related log output.
- name: sympathToSymbol
signature: "(sympath: str) -> str"
description: >
Converts BTCC-specific symbol path representations into standardized
symbol identifiers used internally by the trading system.
- name: volumesListToLeverage
signature: "(vl: str) -> int"
description: >
Extracts leverage information from a volume list string provided by
the exchange.
- name: __init__
signature: "(trader, exchName) -> None"
description: >
Initializes the BTCC exchange instance, sets up internal state tracking,
and schedules asynchronous initialization tasks.
- name: up
signature: "() -> None"
description: >
Establishes authenticated websocket and HTTP sessions, initializes
exchange state, retrieves position information, and synchronizes
positions with the trader component.
- name: run
signature: "() -> None"
description: >
Coordinates asynchronous processing of websocket messages and queued
tasks, ensuring proper handling once all exchange subsystems are ready.
- name: send_order
signature: "(instr, side, price, qty, isHedge=0) -> None"
description: >
Submits a new order to the exchange and tracks its lifecycle using
exchange-provided order identifiers.
- name: cancel_order
signature: "(sym, eOid) -> None"
description: >
Cancels an existing order on the exchange using the exchange order ID.
interpretive_summary: >
Serves as the concrete execution-layer integration for the BTCC exchange,
encapsulating all exchange-specific protocols, state management, and
authentication details. This module deliberately isolates exchange mechanics
from trading strategy logic, allowing the broader system to interact with
BTCC through a consistent interface while reducing coupling and operational
risk across exchanges.
invariants:
- Exchange connectivity must be fully established before order processing
- Exchange order IDs are the source of truth for order lifecycle tracking
- Internal state reflects readiness of user, trade, position, and instrument subsystems
- Symbol normalization is applied consistently across all exchange interactions
tags:
domain:
- exchange_integration
- order_execution
trading_function:
- order_execution
- position_management
strategy_layer:
- execution
system_layer:
- engine
intent:
- abstraction
- isolation
- fault_tolerance
- observability
data_type:
- orders
- positions
- configs
risk:
- capital_loss
- latency
- exchange_ban
maturity:
- production
provenance:
generated_at: 2026-02-02
generated_by: manual_from_docs
confidence_score: 0.78
+92
View File
@@ -0,0 +1,92 @@
source_file: exch_live/bullish.py
schema_version: v1.1
factual_summary: >
Implements the exchange integration for the Bullish exchange, providing
authenticated access via HTTP and private WebSocket connections. This module
manages authentication using HMAC and JWT tokens, establishes exchange
connectivity, retrieves position data, and maintains readiness state for
downstream trading operations.
methods:
- name: __init__
signature: "(trader, exchName) -> None"
description: >
Initializes the Bullish exchange instance, configures authentication
parameters, and prepares internal state for asynchronous startup.
- name: up
signature: "() -> None"
description: >
Retrieves an authentication token, establishes a private WebSocket
connection, and fetches initial position data from the exchange.
- name: run
signature: "() -> None"
description: >
Waits for all required exchange subsystems to be fully initialized before
allowing further processing or interaction.
- name: get_jwt_token
signature: "() -> str"
description: >
Generates and retrieves a JWT token using HMAC-based authentication for
authorized access to the Bullish API.
- name: http_get
signature: "(url: str) -> dict"
description: >
Performs an asynchronous HTTP GET request to the specified API endpoint
and returns the parsed JSON response.
- name: all_up
signature: "() -> bool"
description: >
Determines whether all exchange components, including connectivity and
position state, are fully initialized and operational.
interpretive_summary: >
Serves as the execution-layer adapter for the Bullish exchange, encapsulating
all exchange-specific authentication, connectivity, and state initialization
logic. This module intentionally isolates Bullish protocol details from the
rest of the trading system, enabling consistent exchange interaction while
reducing coupling and simplifying multi-exchange support.
invariants:
- Authentication must be completed successfully before private data access
- Position data must be synchronized before the exchange is considered ready
- Trading operations are gated on full exchange readiness
- Exchange-specific credentials are handled exclusively within this module
tags:
domain:
- exchange_integration
- order_execution
trading_function:
- order_execution
- position_management
strategy_layer:
- execution
system_layer:
- engine
intent:
- abstraction
- isolation
- safety
- observability
data_type:
- orders
- positions
- configs
risk:
- capital_loss
- latency
- exchange_ban
- security
maturity:
- production
provenance:
generated_at: 2026-02-02
generated_by: manual_from_docs
confidence_score: 0.76
+109
View File
@@ -0,0 +1,109 @@
# artifacts/exchange/bybit_exchange.yaml
source_file: exch_live/bybit.py
schema_version: v1.1
factual_summary: >
Implements the exchange integration for the Bybit exchange, supporting
authenticated HTTP and websocket connectivity for order, trade, position,
and instrument data. This module manages exchange state tracking, order
lifecycle mapping, rate limit observability, and synchronization of execution
and position information with the trading system.
methods:
- name: gen_signature
signature: "(param_str: str) -> str"
description: >
Generates an HMAC SHA256 signature for authenticated API requests using
exchange credentials.
- name: parse_and_print_rate_limiting_data
signature: "(d: dict) -> None"
description: >
Parses and logs rate limit usage information provided by the exchange.
- name: __init__
signature: "(trader, exchName, mode) -> None"
description: >
Initializes the Bybit exchange instance, configures operational mode,
prepares websocket endpoints, and initializes internal state tracking.
- name: up
signature: "() -> None"
description: >
Establishes authenticated connections to order and trade websockets and
handles exchange authentication responses.
- name: run
signature: "() -> None"
description: >
Executes the main asynchronous processing loop, monitoring websocket
messages and handling order updates, executions, and cancellations.
- name: send_auth
signature: "(ws) -> None"
description: >
Sends authentication messages to a websocket connection to authorize
private data access.
- name: set_up
signature: "(component: str) -> None"
description: >
Marks a specific exchange subsystem as operational.
- name: all_up
signature: "() -> bool"
description: >
Returns whether all exchange subsystems are fully initialized and ready.
- name: update_pos_for_pos_entry_line
signature: "(posEntry: dict) -> None"
description: >
Updates internal position state using position entry data received from
the exchange.
interpretive_summary: >
Serves as the execution-layer adapter for the Bybit exchange, encapsulating
all exchange-specific authentication, websocket coordination, and order and
position state management. This module deliberately isolates Bybit protocol
details from strategy and decision logic, enabling consistent multi-exchange
execution while maintaining clear readiness and lifecycle semantics.
invariants:
- Exchange subsystems must be fully initialized before trading operations proceed
- Exchange order identifiers are consistently mapped to internal order IDs
- Position state reflects the latest exchange-provided execution data
- Authentication is required before processing private websocket messages
tags:
domain:
- exchange_integration
- order_execution
trading_function:
- order_execution
- position_management
strategy_layer:
- execution
system_layer:
- engine
intent:
- abstraction
- isolation
- observability
- fault_tolerance
data_type:
- orders
- positions
- configs
risk:
- capital_loss
- latency
- exchange_ban
- security
maturity:
- production
provenance:
generated_at: 2026-02-02
generated_by: manual_from_docs
confidence_score: 0.77
+94
View File
@@ -0,0 +1,94 @@
# artifacts/exchange/mexc_exchange.yaml
source_file: exchange/mexc.py
schema_version: v1.1
factual_summary: >
Implements the exchange integration for the MEXC exchange, providing
authenticated REST and websocket connectivity for order execution and
exchange state monitoring. This module manages request signing, connection
setup, instrument discovery, and asynchronous processing of exchange events
using a shared base exchange abstraction.
methods:
- name: sign_rest
signature: "(key, obj) -> MexcSignature"
description: >
Generates a REST API signature for authenticated requests by serializing
request data and producing a signed representation.
- name: sign_ws
signature: "() -> MexcSignature"
description: >
Generates a signature required for authenticated websocket connections
to the exchange.
- name: headers_for_sign
signature: "(s: MexcSignature) -> dict"
description: >
Constructs HTTP headers required for authenticated REST requests using
a previously generated signature.
- name: __init__
signature: "(trader, exchName) -> None"
description: >
Initializes the MEXC exchange instance, configures internal state tracking,
and starts background tasks required for exchange interaction.
- name: up
signature: "() -> None"
description: >
Establishes authenticated websocket and HTTP connections, retrieves
instrument information, and initializes exchange readiness state.
- name: run
signature: "() -> None"
description: >
Monitors websocket streams and asynchronous task queues, processes
incoming exchange events, and logs relevant operational information.
interpretive_summary: >
Serves as the execution-layer adapter for the MEXC exchange, encapsulating
all exchange-specific authentication, request signing, and connectivity
logic. This module isolates MEXC protocol and security requirements from
trading strategy and decision-making components, enabling consistent and
safer multi-exchange execution within the trading system.
invariants:
- Exchange authentication must be completed before private data is processed
- Request signing is required for all authenticated REST and websocket interactions
- Instrument information is fetched before the exchange is considered ready
- Exchange readiness state reflects user, trade, position, and instrument subsystems
tags:
domain:
- exchange_integration
- order_execution
trading_function:
- order_execution
- position_management
strategy_layer:
- execution
system_layer:
- engine
intent:
- abstraction
- isolation
- security
- observability
data_type:
- orders
- positions
- configs
risk:
- capital_loss
- latency
- exchange_ban
- security
maturity:
- production
provenance:
generated_at: 2026-02-02
generated_by: manual_from_docs
confidence_score: 0.77
+69
View File
@@ -0,0 +1,69 @@
source_file: ./live.py
schema_version: v1.1
factual_summary: >
Provides the runtime framework for executing a market making strategy
in either paper or live trading modes. Responsible for initializing
exchanges, loading symbol universes, configuring execution components,
processing real-time market data, and coordinating the continuous
operation of the trading system.
methods:
main() -> None:
description: >
Asynchronous entry point that orchestrates system initialization,
strategy and trader setup, exchange connectivity, optional logging
and metrics configuration, and continuous processing of incoming
market data events.
interpretive_summary: >
Acts as the top-level execution driver for the trading system, wiring
together strategy logic, execution components, and exchange data feeds
into a single long-running process. Designed to support both simulated
and live trading environments while enabling observability and
operational control during runtime.
invariants:
- Trading mode must be explicitly selected before execution begins.
- Exchanges must be initialized prior to processing market data.
- Strategy and trader instances must be fully constructed before
entering the main processing loop.
- Market data processing must run continuously until the process
terminates.
- Logging and metrics configuration must not interfere with execution.
tags:
domain:
- market_data
- strategy_execution
- order_execution
- exchange_integration
trading_function:
- data_ingest
- entry_logic
- exit_logic
strategy_layer:
- execution
system_layer:
- worker
intent:
- orchestration
- isolation
- safety
data_type:
- signals
- orders
- exchanges
risk:
- latency
- data_corruption
- capital_loss
maturity:
- production
+149
View File
@@ -0,0 +1,149 @@
source_file: ./trader.py
schema_version: v1.1
factual_summary: >
Implements the execution component responsible for placing and managing
long and short trades, including associated hedge operations, across
supported exchanges. Maintains order book state, open positions, fees,
and profit and loss tracking while supporting both live trading and
backtesting modes.
methods:
"_getBracket(exch: str, sym: str) -> List[]":
description: >
Retrieves the current bid and ask information for a given symbol
on a specified exchange using maintained order book state.
"startBidding(exch: str, sym: str, ts: float, entryPrice: float or None) -> None":
description: >
Used in the Strat class to update the entry price of a symbol on a particular exchange. Logic for updating the
entry price is contained there.
"stopBidding(exch: str, sym: str, ts: float) -> None":
description: >
Halts the bidding by removing the order from the limitBuy dictionary.
"updateBracket(exch: str, sym: str, bidQuote: float, askQuote: float) -> None":
description: >
Updates the bracketed strings in our brackets dictionary.
"startOffering(exch: str, sym: str, ts: float, entryPrice: float or None) -> None":
description: >
Executes the trades when we are going short.
"stopOffering(exch: str, sym: str, ts: float) -> None":
description: >
Stops the short position.
"logIntradayPnl(val: float) -> None":
description: >
Updates the value of our account to our pnl database
"fpnl() -> None":
description: >
Returns a list of floating profit and loss summaries
"posCounts() -> dict":
description: >
Returns a python dictionary of the total positions we have on each exchange.
"logFee(fee: float) -> None":
description: >
Updates our fees in our database
"posValueAtExchOld(exch: str, sym: str) -> int":
description: >
Calculates the values of the long and short positions.
"fpnlnotional() -> int":
description: >
This method calculates the profit/loss (P&L) based on the notional value of long and short positions for each
unique symbol across different exchanges. It computes the average price of long positions, sums the notional
values of both long and short positions, and then uses these to compute the P&L.
"fillLong(exch: str, sym: str, price: float, quan: float, ts: int, tag: str, feeType: str) -> None":
description: >
The fillLong method handles the process of filling a buy order for a specified asset on an exchange. It updates
the trader's position with the new quantity and calculates the average price if necessary. If there was a short
position, it also calculates the profit/loss from buying back some or all of the shorted amount. Additionally,
it logs the trade, applies fees, and ensures data integrity by asserting non-negative values.
"fillShort(exch: str, sym: str, price: float, quan: float, ts: int, tag: str, feeType: str) -> None":
description: >
The fillShort method handles the process of filling a sell order for a specified asset on an exchange. It updates
the trader's position with the new quantity and calculates the average price if necessary. If there was a long
position, it also calculates the profit/loss from selling some or all of the held amount. Additionally, it logs
the trade, applies fees, and ensures data integrity by asserting non-negative values.
"hedgeLong(exch: str, sym: str, price: float, quan: float, ts: int, force: bool) -> Exchange":
description: >
The hedgeLong method finds an appropriate exchange to hedge a long position by executing a corresponding sell
order. It iterates over possible exchanges, calculates adjusted bid prices based on offsets, and selects the
highest bid price for execution. If no suitable exchange is found without forcing, it forces hedging. The method
ensures that the hedge does not exceed maximum allowed positions and logs relevant information throughout the
process.
"hedgeShort(exch: str, sym: str, price: float, quan: float, ts: int, force: bool) -> Exchange":
description: >
The hedgeShort method finds an appropriate exchange to hedge a short position by executing a corresponding buy
order. It iterates over possible exchanges, calculates adjusted ask prices based on offsets, and selects the
lowest ask price for execution. If no suitable exchange is found without forcing, it forces hedging. The method
ensures that the hedge does not exceed maximum allowed positions and logs relevant information throughout the
process.
"processTrade(exch: str, sym: str, price: float, tradeDir: list) -> None":
description: >
The first half of the processTrade method handles the scenario where a market sell order results in hitting a
limit buy order. It processes the fill, hedges the position, and logs various portfolio and holdings-related
details. The second half of the processTrade method handles the scenario where a market buy order results in
hitting a limit sell order. It processes the fill, hedges the position, and logs various portfolio and
holdings-related details.
interpretive_summary: >
Acts as the execution boundary between strategy decision logic and
exchange interaction, encapsulating trade placement, position tracking,
and operational mode handling. Designed to centralize execution-side
concerns such as fee application, position state, and environment-aware
behavior for live and simulated trading.
invariants:
- Trades must only be executed when trading is explicitly enabled.
- Exchange-specific state must be initialized before order execution.
- Position and profit tracking must remain consistent with executed trades.
- Execution behavior must respect live versus backtest operating modes.
tags:
domain:
- order_execution
- exchange_integration
- pnl_accounting
trading_function:
- entry_logic
- exit_logic
strategy_layer:
- execution
system_layer:
- service
intent:
- abstraction
- safety
- isolation
data_type:
- orders
- positions
- pnl
- exchanges
risk:
- capital_loss
- latency
- data_corruption
maturity:
- production
View File
+95
View File
@@ -0,0 +1,95 @@
source_file: ./strat.py
schema_version: v1.1
factual_summary: >
Implements a market making strategy module responsible for maintaining
top-of-book state, calculating bid and ask prices using threshold-based
logic, validating tradable assets, and coordinating trade logging.
Manages exchange instances, symbol discovery across exchanges, and
runtime strategy state used for order placement and monitoring.
methods:
"registerExchange(exch: str, exchInst: Exchange) -> None":
description: >
Registers an exchange instance under a named key for use by the
strategy during execution.
"setMinQty(exch: str, qtyMap: dict) -> None":
description: >
Updates minimum and maximum trade quantity constraints for a given
exchange using a provided quantity mapping.
"printFloating(ts: float) -> None":
description: >
Emits a formatted runtime summary of strategy activity for logging
and monitoring purposes.
"bestEdgeBid(exch: str, sym: str) -> List[bool, float, bool, str, float]":
description: >
This method is designed to determine whether and where a bid should be placed based on several factors
related to market conditions and strategy rules. Its primary goal of is to calculate the optimal bid price for a
given symbol on a specific exchange, taking into account market positions, hedging strategies, trading limits,
price precision, and fee structures. Returns a collection of decision outputs including bid eligibility status,
relative pricing divergence, long-position participation, selected hedge exchange, and the computed bid price.
"bestEdgeOffer(exch: str, sym: str) -> list":
description: >
The primary goal of bestEdgeOffer is to calculate the optimal offer (ask) price for a given symbol on a specific
exchange, considering market positions, hedging strategies, trading limits, price precision, fee structures.
Returns almost identical items as bestEdgeBid, but on the other side of the order book
"updateTob(exch: str, sym: str, tob: dict, ts: float, force: bool) -> None":
description: >
The primary goal of the updateTob function is to update the top of the order book for quick access to perform
market making operations (bidding and selling) using the highest bid and lowest ask prices.
interpretive_summary: >
Serves as the central coordination unit for the market making strategy,
maintaining shared runtime state across exchanges and assets while
enforcing eligibility rules and operational constraints. Designed to
balance responsiveness with safety through internal validation,
rate limiting counters, and symbol qualification logic.
invariants:
- Only assets validated by runtime eligibility checks may be traded.
- Exchange instances must be registered before strategy execution.
- Top-of-book data must exist before bid or ask calculations occur.
- Rate limiting counters must accurately reflect order activity.
tags:
domain:
- market_data
- strategy_execution
- order_execution
- exchange_integration
trading_function:
- position_sizing
- entry_logic
- exit_logic
strategy_layer:
- execution
- decision
system_layer:
- engine
intent:
- orchestration
- validation
- safety
data_type:
- signals
- orders
- positions
- exchanges
risk:
- capital_loss
- latency
- data_corruption
maturity:
- production
Binary file not shown.
+6
View File
@@ -0,0 +1,6 @@
{
"total_chunks": 33,
"total_artifacts": 3,
"dimensions": 1536,
"index_type": "IndexFlatIP"
}
File diff suppressed because it is too large Load Diff
+2
View File
@@ -8,6 +8,7 @@ click==8.3.1
colorama==0.4.6 colorama==0.4.6
distro==1.9.0 distro==1.9.0
fastapi==0.128.0 fastapi==0.128.0
google-auth>=2.20.0
h11==0.16.0 h11==0.16.0
httpcore==1.0.9 httpcore==1.0.9
httptools==0.7.1 httptools==0.7.1
@@ -31,3 +32,4 @@ urllib3==2.6.3
uvicorn==0.40.0 uvicorn==0.40.0
watchfiles==1.1.1 watchfiles==1.1.1
websockets==16.0 websockets==16.0
faiss-cpu>=1.12.0
+102
View File
@@ -0,0 +1,102 @@
# artifact_schema.yaml
schema_name: trading_code_artifact
schema_version: v1.1
description: >
Defines the structure for derived knowledge artifacts generated from
trading system source code. Artifacts capture factual structure,
interpretive intent, invariants, and validated tags for reasoning
and retrieval by the AI backend.
required_fields:
- source_file
- schema_version
- factual_summary
- tags
optional_fields:
- methods
- interpretive_summary
- invariants
- provenance
field_definitions:
source_file:
type: string
description: Relative path to the source file in the trading repository.
constraints:
- non_empty
- relative_path
schema_version:
type: string
description: Version of the artifact schema used to generate this artifact.
factual_summary:
type: text
description: >
Objective description of what the file or module does.
Focuses on responsibilities and structure, not rationale.
constraints:
- min_length: 40
- max_length: 500
- no_interpretive_language
methods:
type: object
optional: true
description: >
Optional method-level descriptions for classes or modules where
fine-grained behavioral understanding is useful.
method_definition:
signature:
type: string
description:
type: text
constraints:
- no_implementation_details
- max_length: 200
interpretive_summary:
type: text
optional: true
description: >
High-level explanation of why the file exists, what architectural
role it plays, and what tradeoffs or risks it is designed to manage.
constraints:
- min_length: 40
- max_length: 500
- no_code_identifiers
invariants:
type: list
optional: true
description: >
Conditions that must always hold true for correct behavior.
item_constraints:
- type: string
- min_length: 10
tags:
type: object
description: >
Classification metadata for the artifact. All tag dimensions
and values must conform to the backend tag schema.
constraints:
- validated_against_tag_schema
provenance:
type: object
optional: true
fields:
generated_at:
type: datetime
generated_by:
type: string
source_commit:
type: string
confidence_score:
type: float
constraints:
- min: 0.0
- max: 1.0
+60
View File
@@ -0,0 +1,60 @@
# tag_schema.yaml
schema_version: v1
dimensions:
domain:
- market_data
- signal_generation
- strategy_execution
- risk_management
- order_execution
- exchange_integration
- pnl_accounting
trading_function:
- data_ingest
- alpha_generation
- position_sizing
- entry_logic
- exit_logic
strategy_layer:
- signal
- decision
- execution
- evaluation
system_layer:
- api
- service
- engine
- worker
- persistence
intent:
- orchestration
- abstraction
- isolation
- validation
- optimization
- safety
data_type:
- signals
- orders
- positions
- pnl
- configs
- exchanges
risk:
- capital_loss
- liquidation
- latency
- data_corruption
- security
- hedging
maturity:
- experimental
- production