Compare commits

No commits in common. "de642efd87908f4e23bcb7d8982c3a3e09ddb990" and "f497fde153faa2af9e5caf82c808e00c3989cc54" have entirely different histories.

32 changed files with 11 additions and 3453 deletions

View File

@@ -1,9 +1,6 @@
 # LLM Mode: "local", "remote", "openai", or "asksage"
 LLM_MODE=local
-# CORS Configuration (comma-separated origins for frontend access)
-CORS_ORIGINS=http://localhost:3000
 # Remote LLM Configuration (required if LLM_MODE=remote)
 LLM_REMOTE_URL=https://your-llm-service.com/generate
 LLM_REMOTE_TOKEN=

View File

@@ -9,9 +9,6 @@ RUN pip install --no-cache-dir -r requirements.txt
 # Copy application code
 COPY app/ ./app/
-# Copy YAML artifacts for RAG indexing
-COPY artifacts/ ./artifacts/
 # Default model (API key must be passed at runtime for security)
 ENV OPENAI_MODEL=gpt-4o-mini

View File

@@ -1,114 +0,0 @@
"""GCP Cloud Run service-to-service authentication module."""
import logging
from typing import Annotated
from fastapi import Depends, HTTPException, Request
from google.auth.transport import requests as google_requests
from google.oauth2 import id_token
from app.config import settings
logger = logging.getLogger(__name__)
def verify_gcp_identity_token(token: str, audience: str) -> dict:
"""Verify a GCP identity token and return the decoded claims.
Args:
token: The identity token to verify.
audience: The expected audience (backend Cloud Run URL).
Returns:
The decoded token claims.
Raises:
ValueError: If the token is invalid or verification fails.
"""
try:
claims = id_token.verify_oauth2_token(
token,
google_requests.Request(),
audience=audience,
)
return claims
except Exception as e:
logger.warning(f"Token verification failed: {e}")
raise ValueError(f"Token verification failed: {e}")
async def verify_service_auth(request: Request) -> dict | None:
"""FastAPI dependency to verify GCP service-to-service authentication.
Returns None if auth is disabled (local dev), otherwise verifies the
identity token and checks the service account allowlist.
Returns:
The decoded token claims, or None if auth is disabled.
Raises:
HTTPException: 401 if authentication fails.
"""
# Skip auth if disabled (local development)
if not settings.auth_enabled:
logger.debug("Authentication disabled, skipping verification")
return None
# Extract token from Authorization header
auth_header = request.headers.get("Authorization")
if not auth_header:
logger.warning("Missing Authorization header")
raise HTTPException(
status_code=401,
detail="Missing Authorization header",
)
if not auth_header.startswith("Bearer "):
logger.warning("Invalid Authorization header format")
raise HTTPException(
status_code=401,
detail="Invalid Authorization header format. Expected 'Bearer <token>'",
)
token = auth_header[7:] # Remove "Bearer " prefix
# Verify the token
if not settings.auth_audience:
logger.error("AUTH_AUDIENCE not configured")
raise HTTPException(
status_code=500,
detail="Server authentication not properly configured",
)
try:
claims = verify_gcp_identity_token(token, settings.auth_audience)
except ValueError as e:
logger.warning(f"Token verification failed: {e}")
raise HTTPException(
status_code=401,
detail="Invalid or expired token",
)
# Check service account allowlist if configured
allowed_accounts = settings.allowed_service_accounts_list
if allowed_accounts:
email = claims.get("email", "")
if email not in allowed_accounts:
logger.warning(
f"Service account '{email}' not in allowlist",
extra={"allowed": allowed_accounts},
)
raise HTTPException(
status_code=403,
detail="Service account not authorized",
)
logger.info(
"Service authentication successful",
extra={"service_account": claims.get("email", "unknown")},
)
return claims
# Type alias for clean dependency injection
ServiceAuthDependency = Annotated[dict | None, Depends(verify_service_auth)]
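
For reference, the deleted dependency above expects callers to present a GCP identity token minted for the backend's URL (the configured auth_audience) from an allowlisted service account. A minimal caller-side sketch, assuming ambient service-account credentials; the backend URL below is illustrative:

# Hedged sketch of a calling service; not taken from this repository.
import httpx
from google.auth.transport import requests as google_requests
from google.oauth2 import id_token

AUDIENCE = "https://backend-xxx.run.app"  # hypothetical backend Cloud Run URL

def call_chat(message: str) -> dict:
    # Mint an identity token for the backend's audience, then send it as a
    # Bearer token so verify_service_auth can validate it.
    token = id_token.fetch_id_token(google_requests.Request(), AUDIENCE)
    response = httpx.post(
        f"{AUDIENCE}/chat",
        json={"message": message},
        headers={"Authorization": f"Bearer {token}"},
        timeout=30.0,
    )
    response.raise_for_status()
    return response.json()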

View File

@@ -19,36 +19,6 @@ class Settings(BaseSettings):
     asksage_api_key: str = ""
     asksage_model: str = "gpt-4o"
-    # CORS configuration
-    cors_origins: str = "http://localhost:3000"
-    # Conversation memory configuration
-    conversation_ttl: int = 86400  # 24 hours
-    # Embedding configuration
-    embedding_model: str = "text-embedding-3-small"
-    # RAG configuration
-    rag_top_k: int = 5
-    rag_similarity_threshold: float = 0.70
-    artifacts_path: str = "artifacts"
-    embeddings_path: str = "embeddings"
-    # Authentication settings
-    auth_enabled: bool = False  # Set to True in production
-    auth_audience: str = ""  # Backend Cloud Run URL (e.g., https://backend-xxx.run.app)
-    allowed_service_accounts: str = ""  # Comma-separated list of allowed service account emails
-    @property
-    def cors_origins_list(self) -> list[str]:
-        """Parse comma-separated CORS origins into a list."""
-        return [origin.strip() for origin in self.cors_origins.split(",") if origin.strip()]
-    @property
-    def allowed_service_accounts_list(self) -> list[str]:
-        """Parse comma-separated allowed service accounts into a list."""
-        return [sa.strip() for sa in self.allowed_service_accounts.split(",") if sa.strip()]
     class Config:
         env_file = ".env"
         env_file_encoding = "utf-8"

View File

@@ -1,18 +0,0 @@
"""Embeddings module for YAML artifact indexing and retrieval."""
from app.embeddings.client import EmbeddingClient, get_embedding_client, EmbeddingClientDependency
from app.embeddings.indexer import ArtifactIndexer, get_indexer, IndexerDependency
from app.embeddings.retriever import Retriever, get_retriever, RetrieverDependency, reset_retriever
__all__ = [
"EmbeddingClient",
"get_embedding_client",
"EmbeddingClientDependency",
"ArtifactIndexer",
"get_indexer",
"IndexerDependency",
"Retriever",
"get_retriever",
"RetrieverDependency",
"reset_retriever",
]

View File

@@ -1,109 +0,0 @@
"""OpenAI embedding client wrapper."""
import logging
from functools import lru_cache
from typing import Annotated
from fastapi import Depends
from openai import AsyncOpenAI, AuthenticationError, RateLimitError, APIConnectionError, APIError
from app.config import settings
logger = logging.getLogger(__name__)
class EmbeddingError(Exception):
"""Base exception for embedding operations."""
def __init__(self, message: str, status_code: int = 500):
self.message = message
self.status_code = status_code
super().__init__(message)
class EmbeddingClient:
"""Async wrapper for OpenAI embeddings API."""
def __init__(self, api_key: str, model: str = "text-embedding-3-small"):
"""Initialize the embedding client.
Args:
api_key: OpenAI API key
model: Embedding model identifier
"""
self.client = AsyncOpenAI(api_key=api_key)
self.model = model
self.dimensions = 1536 # text-embedding-3-small dimension
async def embed(self, text: str) -> list[float]:
"""Generate embedding for a single text.
Args:
text: Text to embed
Returns:
Embedding vector (1536 dimensions)
Raises:
EmbeddingError: If embedding generation fails
"""
try:
response = await self.client.embeddings.create(
model=self.model,
input=text,
)
return response.data[0].embedding
except AuthenticationError as e:
raise EmbeddingError(f"OpenAI authentication failed: {e.message}", 401)
except RateLimitError as e:
raise EmbeddingError(f"OpenAI rate limit exceeded: {e.message}", 429)
except APIConnectionError as e:
raise EmbeddingError(f"Could not connect to OpenAI: {str(e)}", 503)
except APIError as e:
raise EmbeddingError(f"OpenAI API error: {e.message}", e.status_code or 500)
async def embed_batch(self, texts: list[str]) -> list[list[float]]:
"""Generate embeddings for multiple texts.
Args:
texts: List of texts to embed
Returns:
List of embedding vectors
Raises:
EmbeddingError: If embedding generation fails
"""
if not texts:
return []
try:
response = await self.client.embeddings.create(
model=self.model,
input=texts,
)
# Sort by index to ensure correct ordering
sorted_embeddings = sorted(response.data, key=lambda x: x.index)
return [item.embedding for item in sorted_embeddings]
except AuthenticationError as e:
raise EmbeddingError(f"OpenAI authentication failed: {e.message}", 401)
except RateLimitError as e:
raise EmbeddingError(f"OpenAI rate limit exceeded: {e.message}", 429)
except APIConnectionError as e:
raise EmbeddingError(f"Could not connect to OpenAI: {str(e)}", 503)
except APIError as e:
raise EmbeddingError(f"OpenAI API error: {e.message}", e.status_code or 500)
@lru_cache()
def get_embedding_client() -> EmbeddingClient:
"""Get cached embedding client instance."""
return EmbeddingClient(
api_key=settings.openai_api_key,
model=settings.embedding_model,
)
EmbeddingClientDependency = Annotated[EmbeddingClient, Depends(get_embedding_client)]
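
A short usage sketch for the deleted client above; the query strings are illustrative and the model comes from settings.embedding_model:

import asyncio
from app.embeddings.client import get_embedding_client

async def demo() -> None:
    client = get_embedding_client()
    vector = await client.embed("How does hedging work in trader.py?")
    print(len(vector))  # 1536 for text-embedding-3-small
    vectors = await client.embed_batch(["bid logic", "ask logic"])
    print(len(vectors))  # one vector per input, returned in input order

asyncio.run(demo())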

View File

@@ -1,227 +0,0 @@
"""YAML artifact indexer for building FAISS index."""
import json
import logging
from dataclasses import dataclass, asdict
from functools import lru_cache
from pathlib import Path
from typing import Annotated
import faiss
import numpy as np
import yaml
from fastapi import Depends
from app.config import settings
from app.embeddings.client import EmbeddingClient, get_embedding_client, EmbeddingError
logger = logging.getLogger(__name__)
@dataclass
class Chunk:
"""Represents a text chunk from a YAML artifact."""
chunk_id: str
content: str
chunk_type: str # factual_summary, interpretive_summary, method, invariants
artifact_file: str
source_file: str
tags: dict
@dataclass
class IndexResult:
"""Result of indexing operation."""
chunks_indexed: int
artifacts_processed: int
status: str
class ArtifactIndexer:
"""Parses YAML artifacts and builds FAISS index."""
def __init__(self, embedding_client: EmbeddingClient):
"""Initialize the indexer.
Args:
embedding_client: Client for generating embeddings
"""
self.embedding_client = embedding_client
self.artifacts_path = Path(settings.artifacts_path)
self.embeddings_path = Path(settings.embeddings_path)
self.dimensions = 1536
def _parse_yaml_to_chunks(self, yaml_path: Path) -> list[Chunk]:
"""Parse a YAML artifact file into chunks.
Args:
yaml_path: Path to the YAML file
Returns:
List of chunks extracted from the file
"""
chunks = []
artifact_file = str(yaml_path.relative_to(self.artifacts_path))
try:
with open(yaml_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f)
except Exception as e:
logger.warning(f"Failed to parse {yaml_path}: {e}")
return chunks
if not data:
return chunks
source_file = data.get("source_file", "unknown")
tags = data.get("tags", {})
# Chunk 1: Factual summary
if factual := data.get("factual_summary"):
chunks.append(Chunk(
chunk_id=f"{artifact_file}::factual_summary",
content=f"[{source_file}] Factual summary: {factual.strip()}",
chunk_type="factual_summary",
artifact_file=artifact_file,
source_file=source_file,
tags=tags,
))
# Chunk 2: Interpretive summary
if interpretive := data.get("interpretive_summary"):
chunks.append(Chunk(
chunk_id=f"{artifact_file}::interpretive_summary",
content=f"[{source_file}] Interpretive summary: {interpretive.strip()}",
chunk_type="interpretive_summary",
artifact_file=artifact_file,
source_file=source_file,
tags=tags,
))
# Chunk per method
if methods := data.get("methods"):
for method_sig, method_data in methods.items():
description = method_data.get("description", "") if isinstance(method_data, dict) else method_data
if description:
chunks.append(Chunk(
chunk_id=f"{artifact_file}::method::{method_sig}",
content=f"[{source_file}] Method {method_sig}: {description.strip()}",
chunk_type="method",
artifact_file=artifact_file,
source_file=source_file,
tags=tags,
))
# Chunk for invariants (combined)
if invariants := data.get("invariants"):
invariants_text = " ".join(f"- {inv}" for inv in invariants)
chunks.append(Chunk(
chunk_id=f"{artifact_file}::invariants",
content=f"[{source_file}] Invariants: {invariants_text}",
chunk_type="invariants",
artifact_file=artifact_file,
source_file=source_file,
tags=tags,
))
return chunks
def _collect_all_chunks(self) -> list[Chunk]:
"""Collect chunks from all YAML artifacts.
Returns:
List of all chunks from all artifacts
"""
all_chunks = []
for yaml_path in self.artifacts_path.rglob("*.yaml"):
chunks = self._parse_yaml_to_chunks(yaml_path)
all_chunks.extend(chunks)
logger.debug(f"Parsed {len(chunks)} chunks from {yaml_path}")
return all_chunks
async def build_index(self) -> IndexResult:
"""Build FAISS index from all YAML artifacts.
Returns:
IndexResult with statistics
Raises:
EmbeddingError: If embedding generation fails
"""
# Collect all chunks
chunks = self._collect_all_chunks()
if not chunks:
logger.warning("No chunks found in artifacts")
return IndexResult(
chunks_indexed=0,
artifacts_processed=0,
status="no_artifacts",
)
logger.info(f"Generating embeddings for {len(chunks)} chunks...")
# Generate embeddings in batches
batch_size = 100
all_embeddings = []
for i in range(0, len(chunks), batch_size):
batch = chunks[i:i + batch_size]
texts = [chunk.content for chunk in batch]
embeddings = await self.embedding_client.embed_batch(texts)
all_embeddings.extend(embeddings)
logger.debug(f"Embedded batch {i // batch_size + 1}")
# Build FAISS index (IndexFlatIP for inner product / cosine similarity on normalized vectors)
embeddings_array = np.array(all_embeddings, dtype=np.float32)
# Normalize for cosine similarity
faiss.normalize_L2(embeddings_array)
index = faiss.IndexFlatIP(self.dimensions)
index.add(embeddings_array)
# Create embeddings directory if needed
self.embeddings_path.mkdir(parents=True, exist_ok=True)
# Save FAISS index
faiss.write_index(index, str(self.embeddings_path / "faiss_index.bin"))
# Save metadata
metadata = {
"chunks": [asdict(chunk) for chunk in chunks],
}
with open(self.embeddings_path / "metadata.json", "w", encoding="utf-8") as f:
json.dump(metadata, f, indent=2)
# Save index info
artifact_files = set(chunk.artifact_file for chunk in chunks)
index_info = {
"total_chunks": len(chunks),
"total_artifacts": len(artifact_files),
"dimensions": self.dimensions,
"index_type": "IndexFlatIP",
}
with open(self.embeddings_path / "index_info.json", "w", encoding="utf-8") as f:
json.dump(index_info, f, indent=2)
logger.info(f"Indexed {len(chunks)} chunks from {len(artifact_files)} artifacts")
return IndexResult(
chunks_indexed=len(chunks),
artifacts_processed=len(artifact_files),
status="completed",
)
@lru_cache()
def get_indexer() -> ArtifactIndexer:
"""Get cached indexer instance."""
return ArtifactIndexer(embedding_client=get_embedding_client())
IndexerDependency = Annotated[ArtifactIndexer, Depends(get_indexer)]
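
For reference, a sketch of how the deleted indexer above was driven (the startup hook and the /index endpoint did essentially the same thing); the example counts mirror the index_info.json shown further down:

import asyncio
from app.embeddings.indexer import get_indexer

async def rebuild() -> None:
    # Parses artifacts/*.yaml into chunks, embeds them in batches of 100,
    # and writes faiss_index.bin, metadata.json, and index_info.json.
    result = await get_indexer().build_index()
    print(result.status, result.chunks_indexed, result.artifacts_processed)
    # e.g. "completed 33 3" for the three artifacts in this repository

asyncio.run(rebuild())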

View File

@@ -1,221 +0,0 @@
"""FAISS-based retrieval with adaptive selection."""
import json
import logging
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path
from typing import Annotated
import faiss
import numpy as np
from fastapi import Depends
from app.config import settings
from app.embeddings.client import EmbeddingClient, get_embedding_client
logger = logging.getLogger(__name__)
@dataclass
class RetrievedChunk:
"""A chunk retrieved from FAISS search."""
chunk_id: str
content: str
chunk_type: str
artifact_file: str
source_file: str
tags: dict
score: float
class Retriever:
"""FAISS-based retriever with adaptive selection logic."""
def __init__(self, embedding_client: EmbeddingClient):
"""Initialize the retriever.
Args:
embedding_client: Client for generating query embeddings
"""
self.embedding_client = embedding_client
self.embeddings_path = Path(settings.embeddings_path)
self.top_k = settings.rag_top_k
self.threshold = settings.rag_similarity_threshold
self._index: faiss.IndexFlatIP | None = None
self._metadata: list[dict] | None = None
self._loaded = False
def load_index(self) -> bool:
"""Load FAISS index and metadata from disk.
Returns:
True if successfully loaded, False otherwise
"""
index_path = self.embeddings_path / "faiss_index.bin"
metadata_path = self.embeddings_path / "metadata.json"
if not index_path.exists() or not metadata_path.exists():
logger.warning("FAISS index or metadata not found. Run /index first.")
self._loaded = False
return False
try:
self._index = faiss.read_index(str(index_path))
with open(metadata_path, "r", encoding="utf-8") as f:
data = json.load(f)
self._metadata = data.get("chunks", [])
self._loaded = True
logger.info(f"Loaded FAISS index with {self._index.ntotal} vectors")
return True
except Exception as e:
logger.error(f"Failed to load FAISS index: {e}")
self._loaded = False
return False
@property
def is_loaded(self) -> bool:
"""Check if index is loaded."""
return self._loaded and self._index is not None
@property
def index_size(self) -> int:
"""Get number of vectors in index."""
if self._index is None:
return 0
return self._index.ntotal
def _adaptive_select(
self,
indices: np.ndarray,
scores: np.ndarray,
) -> list[tuple[int, float]]:
"""Apply adaptive selection logic.
- Always include top 2 chunks (regardless of score)
- For chunks 3-5: apply threshold
- Limit to self.top_k chunks total
Args:
indices: FAISS result indices
scores: FAISS result scores
Returns:
List of (index, score) tuples for selected chunks
"""
selected = []
for i, (idx, score) in enumerate(zip(indices, scores)):
if idx == -1: # FAISS returns -1 for no match
continue
# Always take top 2
if i < 2:
selected.append((int(idx), float(score)))
# Apply threshold for remaining
elif score >= self.threshold and len(selected) < self.top_k:
selected.append((int(idx), float(score)))
return selected
def _apply_diversity_filter(
self,
candidates: list[tuple[int, float]],
max_per_artifact: int = 2,
) -> list[tuple[int, float]]:
"""Limit chunks per artifact for diversity.
Args:
candidates: List of (index, score) tuples
max_per_artifact: Maximum chunks from same artifact
Returns:
Filtered list of (index, score) tuples
"""
artifact_counts: dict[str, int] = {}
filtered = []
for idx, score in candidates:
chunk = self._metadata[idx]
artifact = chunk["artifact_file"]
if artifact_counts.get(artifact, 0) < max_per_artifact:
filtered.append((idx, score))
artifact_counts[artifact] = artifact_counts.get(artifact, 0) + 1
return filtered
async def search(self, query: str) -> list[RetrievedChunk]:
"""Search for relevant chunks.
Args:
query: User's question
Returns:
List of retrieved chunks with relevance scores
"""
if not self.is_loaded:
if not self.load_index():
return []
# Generate query embedding
query_embedding = await self.embedding_client.embed(query)
query_vector = np.array([query_embedding], dtype=np.float32)
# Normalize for cosine similarity
faiss.normalize_L2(query_vector)
# Search FAISS (get more candidates than needed for filtering)
k_search = min(8, self._index.ntotal)
scores, indices = self._index.search(query_vector, k_search)
# Apply adaptive selection
selected = self._adaptive_select(indices[0], scores[0])
# Apply diversity filter
filtered = self._apply_diversity_filter(selected)
# Build result chunks
results = []
for idx, score in filtered:
chunk_data = self._metadata[idx]
results.append(RetrievedChunk(
chunk_id=chunk_data["chunk_id"],
content=chunk_data["content"],
chunk_type=chunk_data["chunk_type"],
artifact_file=chunk_data["artifact_file"],
source_file=chunk_data["source_file"],
tags=chunk_data.get("tags", {}),
score=score,
))
logger.debug(f"Retrieved {len(results)} chunks for query")
return results
# Singleton retriever instance
_retriever: Retriever | None = None
def get_retriever() -> Retriever:
"""Get singleton retriever instance (lazily initialized)."""
global _retriever
if _retriever is None:
_retriever = Retriever(embedding_client=get_embedding_client())
# Attempt to load index at startup
_retriever.load_index()
return _retriever
def reset_retriever() -> None:
"""Reset the singleton retriever (for reloading after re-indexing)."""
global _retriever
_retriever = None
RetrieverDependency = Annotated[Retriever, Depends(get_retriever)]
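
The selection rule in _adaptive_select is easy to misread, so here is a standalone illustration with made-up scores; it is a simplified mirror of the deleted logic (FAISS -1 sentinels and the per-artifact diversity filter are omitted), using the default threshold 0.70 and top_k 5 from config:

def adaptive_select(scores: list[float], threshold: float = 0.70, top_k: int = 5) -> list[int]:
    # Always keep the two best hits, then admit later hits only if they clear
    # the similarity threshold, capped at top_k results overall.
    selected: list[int] = []
    for rank, score in enumerate(scores):
        if rank < 2:
            selected.append(rank)
        elif score >= threshold and len(selected) < top_k:
            selected.append(rank)
    return selected

print(adaptive_select([0.91, 0.65, 0.74, 0.69, 0.72, 0.40]))  # -> [0, 1, 2, 4]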

View File

@@ -1,9 +0,0 @@
"""Intent classification module."""
from app.intent.classifier import IntentClassifier, get_intent_classifier, IntentClassifierDependency
__all__ = [
"IntentClassifier",
"get_intent_classifier",
"IntentClassifierDependency",
]

View File

@@ -1,98 +0,0 @@
"""Lightweight intent classification using gpt-4o-mini."""
import logging
from functools import lru_cache
from typing import Annotated, Literal
from fastapi import Depends
from openai import AsyncOpenAI
from app.config import settings
from app.memory.conversation import Message
logger = logging.getLogger(__name__)
Intent = Literal["codebase", "general", "clarification"]
INTENT_PROMPT = """You are classifying questions for a Tyndale trading system documentation assistant.
Classify this user message into one category:
- "codebase": ANY question about trading, strategies, exchanges, orders, positions, risk, execution, hedging, market making, P&L, or how the system works. Also includes questions with "our", "the system", "this", or references to specific functionality.
- "general": ONLY greetings or completely off-topic questions ("How are you?", "What's the weather?", "Hello")
- "clarification": Follow-ups that reference previous answers ("Tell me more", "What did you mean?", "Can you explain that?")
DEFAULT TO "codebase" if uncertain. This is a trading system assistant - assume trading questions are about the codebase.
Respond with ONLY the category name, nothing else."""
class IntentClassifier:
"""Lightweight intent classifier using gpt-4o-mini."""
def __init__(self, api_key: str):
"""Initialize the classifier.
Args:
api_key: OpenAI API key
"""
self.client = AsyncOpenAI(api_key=api_key)
self.model = "gpt-4o-mini"
async def classify(
self,
message: str,
history: list[Message] | None = None,
) -> Intent:
"""Classify user message intent.
Args:
message: User's message
history: Optional conversation history for context
Returns:
Classified intent: "codebase", "general", or "clarification"
"""
# Build context from history (last 2 turns)
context = ""
if history and len(history) >= 2:
recent = history[-4:] # Last 2 exchanges
context = "Recent conversation:\n"
for msg in recent:
role = "User" if msg.role == "user" else "Assistant"
context += f"{role}: {msg.content[:100]}...\n" if len(msg.content) > 100 else f"{role}: {msg.content}\n"
context += "\n"
try:
response = await self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": INTENT_PROMPT},
{"role": "user", "content": f"{context}Current message: {message}"},
],
max_tokens=10,
temperature=0,
)
raw_intent = response.choices[0].message.content.strip().lower()
# Validate intent
if raw_intent in ("codebase", "general", "clarification"):
logger.info(f"Intent classified: '{message[:50]}...' -> {raw_intent}")
return raw_intent
# Default to codebase for ambiguous cases (safer for RAG)
logger.warning(f"Unexpected intent response: {raw_intent}, defaulting to codebase")
return "codebase"
except Exception as e:
logger.warning(f"Intent classification failed: {e}, defaulting to codebase")
return "codebase"
@lru_cache()
def get_intent_classifier() -> IntentClassifier:
"""Get cached intent classifier instance."""
return IntentClassifier(api_key=settings.openai_api_key)
IntentClassifierDependency = Annotated[IntentClassifier, Depends(get_intent_classifier)]
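
A usage sketch for the deleted classifier above; the expected labels follow the rules in INTENT_PROMPT, though actual model output can of course vary:

import asyncio
from app.intent import get_intent_classifier

async def demo() -> None:
    classifier = get_intent_classifier()
    print(await classifier.classify("How does hedgeLong pick an exchange?"))   # expected: codebase
    print(await classifier.classify("Hello, how are you?"))                    # expected: general
    print(await classifier.classify("Can you explain that in more detail?"))   # expected: clarification

asyncio.run(demo())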

View File

@@ -3,7 +3,6 @@
 import asyncio
 from abc import ABC, abstractmethod
 from functools import lru_cache
-from collections.abc import AsyncIterator
 from typing import Annotated
 import httpx
@@ -47,27 +46,6 @@ class LLMAdapter(ABC):
         """
         pass
-    async def generate_stream(
-        self, conversation_id: str, message: str
-    ) -> AsyncIterator[str]:
-        """Stream a response for the given message.
-        Default implementation yields the full response as a single chunk.
-        Subclasses can override this to provide true streaming.
-        Args:
-            conversation_id: The conversation identifier
-            message: The user's message
-        Yields:
-            Response content chunks
-        Raises:
-            LLMError: If generation fails for any reason
-        """
-        response = await self.generate(conversation_id, message)
-        yield response
 class LocalAdapter(LLMAdapter):
     """Local stub adapter for development and testing."""
@@ -205,46 +183,6 @@ class OpenAIAdapter(LLMAdapter):
                 f"OpenAI API error: {e.message}", status_code=e.status_code or 500
             )
-    async def generate_stream(
-        self, conversation_id: str, message: str
-    ) -> AsyncIterator[str]:
-        """Stream a response using the OpenAI API.
-        Args:
-            conversation_id: The conversation identifier (for future use with context)
-            message: The user's message
-        Yields:
-            Response content chunks
-        Raises:
-            LLMAuthenticationError: If API key is invalid
-            LLMRateLimitError: If rate limit is exceeded
-            LLMConnectionError: If connection fails
-            LLMError: For other API errors
-        """
-        try:
-            stream = await self.client.chat.completions.create(
-                model=self.model,
-                messages=[{"role": "user", "content": message}],
-                stream=True,
-            )
-            async for chunk in stream:
-                if chunk.choices and chunk.choices[0].delta.content:
-                    yield chunk.choices[0].delta.content
-        except AuthenticationError as e:
-            raise LLMAuthenticationError(f"OpenAI authentication failed: {e.message}")
-        except RateLimitError as e:
-            raise LLMRateLimitError(f"OpenAI rate limit exceeded: {e.message}")
-        except APIConnectionError as e:
-            raise LLMConnectionError(f"Could not connect to OpenAI: {str(e)}")
-        except APIError as e:
-            raise LLMError(
-                f"OpenAI API error: {e.message}", status_code=e.status_code or 500
-            )
 class AskSageAdapter(LLMAdapter):
     """AskSage API adapter using the official asksageclient SDK."""

View File

@@ -1,29 +1,11 @@
 import logging
 import uuid
-from contextlib import asynccontextmanager
-from fastapi import FastAPI, HTTPException, BackgroundTasks
-from fastapi.middleware.cors import CORSMiddleware
-from fastapi.responses import StreamingResponse
-from app.auth import ServiceAuthDependency
+from fastapi import FastAPI, HTTPException
 from app.config import settings, MAX_MESSAGE_LENGTH
-from app.embeddings import RetrieverDependency, IndexerDependency, get_retriever, reset_retriever, get_indexer
-from app.embeddings.retriever import RetrievedChunk
-from app.intent import IntentClassifierDependency
 from app.llm import AdapterDependency, LLMError, llm_exception_to_http
-from app.memory import ConversationMemoryDependency
-from app.prompts import build_rag_prompt
-from app.schemas import (
-    ChatRequest,
-    ChatResponse,
-    HealthResponse,
-    IndexResponse,
-    SourceReference,
-    StreamChunkEvent,
-    StreamDoneEvent,
-    StreamErrorEvent,
-)
+from app.schemas import ChatRequest, ChatResponse, HealthResponse
 # Configure logging
 logging.basicConfig(
@@ -32,144 +14,27 @@ logging.basicConfig(
 )
 logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Startup and shutdown lifecycle management."""
# Startup: auto-index if FAISS index is missing
retriever = get_retriever()
if not retriever.is_loaded:
logger.info("FAISS index not found, building index on startup...")
try:
indexer = get_indexer()
result = await indexer.build_index()
reset_retriever() # Reset to load newly built index
logger.info(f"Startup indexing completed: {result.chunks_indexed} chunks from {result.artifacts_processed} artifacts")
except Exception as e:
logger.error(f"Startup indexing failed: {e}")
else:
logger.info(f"FAISS index loaded: {retriever.index_size} vectors")
yield # App runs here
# Shutdown: nothing to clean up
 # Create FastAPI app
 app = FastAPI(
     title="Tyndale AI Service",
     description="LLM Chat Service for algorithmic trading support",
     version="0.1.0",
-    lifespan=lifespan,
-)
-# Add CORS middleware
-app.add_middleware(
-    CORSMiddleware,
-    allow_origins=settings.cors_origins_list,
-    allow_credentials=True,
-    allow_methods=["*"],
-    allow_headers=["*"],
 )
 @app.get("/health", response_model=HealthResponse)
 async def health_check() -> HealthResponse:
-    """Health check endpoint with FAISS status."""
-    retriever = get_retriever()
-    return HealthResponse(
-        status="ok",
-        faiss_loaded=retriever.is_loaded,
-        index_size=retriever.index_size,
-    )
+    """Health check endpoint."""
+    return HealthResponse(status="ok")
@app.get("/debug/search")
async def debug_search(
q: str,
retriever: RetrieverDependency,
) -> dict:
"""Debug endpoint to test retrieval directly."""
chunks = await retriever.search(q)
return {
"query": q,
"chunks_found": len(chunks),
"chunks": [
{
"source_file": c.source_file,
"chunk_type": c.chunk_type,
"score": round(c.score, 4),
"content_preview": c.content[:200] + "..." if len(c.content) > 200 else c.content,
}
for c in chunks
],
}
@app.post("/index", response_model=IndexResponse)
async def reindex_artifacts(
background_tasks: BackgroundTasks,
indexer: IndexerDependency,
_auth: ServiceAuthDependency,
) -> IndexResponse:
"""Trigger re-indexing of YAML artifacts.
Builds FAISS index from all YAML files in the artifacts directory.
"""
logger.info("Starting artifact indexing...")
try:
result = await indexer.build_index()
# Reset retriever to reload new index
reset_retriever()
logger.info(
f"Indexing completed: {result.chunks_indexed} chunks from {result.artifacts_processed} artifacts"
)
return IndexResponse(
status=result.status,
chunks_indexed=result.chunks_indexed,
artifacts_processed=result.artifacts_processed,
)
except Exception as e:
logger.error(f"Indexing failed: {e}")
raise HTTPException(status_code=500, detail=f"Indexing failed: {str(e)}")
def _chunks_to_sources(chunks: list[RetrievedChunk]) -> list[SourceReference]:
"""Convert retrieved chunks to source references."""
return [
SourceReference(
artifact_file=chunk.artifact_file,
source_file=chunk.source_file,
chunk_type=chunk.chunk_type,
relevance_score=chunk.score,
)
for chunk in chunks
]
@app.post("/chat", response_model=ChatResponse) @app.post("/chat", response_model=ChatResponse)
async def chat( async def chat(request: ChatRequest, adapter: AdapterDependency) -> ChatResponse:
request: ChatRequest, """Process a chat message through the LLM adapter.
adapter: AdapterDependency,
retriever: RetrieverDependency,
memory: ConversationMemoryDependency,
classifier: IntentClassifierDependency,
_auth: ServiceAuthDependency,
) -> ChatResponse:
"""Process a chat message through the RAG pipeline.
- Validates message length - Validates message length
- Generates conversation_id if not provided - Generates conversation_id if not provided
- Classifies intent (codebase/general/clarification) - Routes to appropriate LLM adapter based on LLM_MODE
- Retrieves relevant context from FAISS (for codebase intent)
- Builds RAG prompt and generates response
- Stores conversation turn in Redis
""" """
# Validate message length # Validate message length
if len(request.message) > MAX_MESSAGE_LENGTH: if len(request.message) > MAX_MESSAGE_LENGTH:
@@ -182,42 +47,19 @@ async def chat(
     # Generate or use provided conversation_id
     conversation_id = request.conversation_id or str(uuid.uuid4())
-    # Get conversation history
-    history = await memory.get_history(conversation_id)
-    # Classify intent
-    intent = await classifier.classify(request.message, history)
-    # Log request metadata
+    # Log request metadata (not content)
     logger.info(
         "Chat request received",
         extra={
             "conversation_id": conversation_id,
             "message_length": len(request.message),
             "mode": settings.llm_mode,
-            "intent": intent,
         },
     )
-    # Retrieve context for codebase questions
-    chunks: list[RetrievedChunk] = []
-    if intent == "codebase":
-        chunks = await retriever.search(request.message)
-        logger.debug(f"Retrieved {len(chunks)} chunks for codebase question")
-    # Build RAG prompt
-    system_prompt, user_content = build_rag_prompt(
-        user_message=request.message,
-        intent=intent,
-        chunks=chunks,
-        history=history,
-    )
     # Generate response with exception handling
     try:
-        # For RAG, we pass the full constructed prompt
-        full_prompt = f"{system_prompt}\n\n{user_content}"
-        response_text = await adapter.generate(conversation_id, full_prompt)
+        response_text = await adapter.generate(conversation_id, request.message)
     except LLMError as e:
         logger.error(
             "LLM generation failed",
@@ -229,15 +71,6 @@ async def chat(
         )
         raise llm_exception_to_http(e)
-    # Store conversation turn
-    sources = _chunks_to_sources(chunks)
-    await memory.store_turn(
-        conversation_id=conversation_id,
-        user_message=request.message,
-        assistant_message=response_text,
-        sources=[s.model_dump() for s in sources] if sources else None,
-    )
     # Log response metadata
     logger.info(
         "Chat response generated",
@@ -245,8 +78,6 @@ async def chat(
             "conversation_id": conversation_id,
             "response_length": len(response_text),
             "mode": settings.llm_mode,
-            "intent": intent,
-            "sources_count": len(sources),
         },
     )
@@ -254,129 +85,7 @@ async def chat(
         conversation_id=conversation_id,
         response=response_text,
         mode=settings.llm_mode,
-        intent=intent,
-        sources=sources,
-    )
+        sources=[],
@app.post("/chat/stream")
async def chat_stream(
request: ChatRequest,
adapter: AdapterDependency,
retriever: RetrieverDependency,
memory: ConversationMemoryDependency,
classifier: IntentClassifierDependency,
_auth: ServiceAuthDependency,
) -> StreamingResponse:
"""Stream a chat response through the RAG pipeline using Server-Sent Events.
- Validates message length
- Generates conversation_id if not provided
- Classifies intent and retrieves context
- Streams response with SSE format
- Stores conversation turn after completion
"""
# Validate message length
if len(request.message) > MAX_MESSAGE_LENGTH:
raise HTTPException(
status_code=400,
detail=f"Message exceeds maximum length of {MAX_MESSAGE_LENGTH:,} characters. "
f"Your message has {len(request.message):,} characters.",
)
# Generate or use provided conversation_id
conversation_id = request.conversation_id or str(uuid.uuid4())
# Get conversation history
history = await memory.get_history(conversation_id)
# Classify intent
intent = await classifier.classify(request.message, history)
# Retrieve context for codebase questions
chunks: list[RetrievedChunk] = []
if intent == "codebase":
chunks = await retriever.search(request.message)
# Build RAG prompt
system_prompt, user_content = build_rag_prompt(
user_message=request.message,
intent=intent,
chunks=chunks,
history=history,
)
sources = _chunks_to_sources(chunks)
# Log request metadata
logger.info(
"Chat stream request received",
extra={
"conversation_id": conversation_id,
"message_length": len(request.message),
"mode": settings.llm_mode,
"intent": intent,
},
)
async def stream_response():
"""Async generator that yields SSE-formatted events."""
full_response = []
try:
full_prompt = f"{system_prompt}\n\n{user_content}"
async for chunk in adapter.generate_stream(conversation_id, full_prompt):
full_response.append(chunk)
event = StreamChunkEvent(content=chunk, conversation_id=conversation_id)
yield f"data: {event.model_dump_json()}\n\n"
# Store conversation turn
response_text = "".join(full_response)
await memory.store_turn(
conversation_id=conversation_id,
user_message=request.message,
assistant_message=response_text,
sources=[s.model_dump() for s in sources] if sources else None,
)
# Send completion event with sources
done_event = StreamDoneEvent(
conversation_id=conversation_id,
mode=settings.llm_mode,
intent=intent,
sources=sources,
)
yield f"data: {done_event.model_dump_json()}\n\n"
logger.info(
"Chat stream completed",
extra={
"conversation_id": conversation_id,
"mode": settings.llm_mode,
"intent": intent,
"sources_count": len(sources),
},
)
except LLMError as e:
logger.error(
"LLM streaming failed",
extra={
"conversation_id": conversation_id,
"error_type": type(e).__name__,
"error_message": e.message,
},
)
error_event = StreamErrorEvent(message=e.message, code=e.status_code)
yield f"data: {error_event.model_dump_json()}\n\n"
return StreamingResponse(
stream_response(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
)
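
For reference, a hedged sketch of how a client could have consumed the deleted /chat/stream endpoint above; the base URL is illustrative, and each SSE line carries one of the StreamChunkEvent, StreamDoneEvent, or StreamErrorEvent payloads removed from the schemas module further down:

import json
import httpx

def stream_chat(message: str, base_url: str = "http://localhost:8000") -> str:
    # Collect "chunk" events into the final answer; "done" carries the sources,
    # "error" carries a message and status code.
    chunks: list[str] = []
    with httpx.stream("POST", f"{base_url}/chat/stream", json={"message": message}, timeout=None) as response:
        for line in response.iter_lines():
            if not line.startswith("data: "):
                continue
            event = json.loads(line[len("data: "):])
            if event["type"] == "chunk":
                chunks.append(event["content"])
            elif event["type"] == "done":
                print("sources:", event["sources"])
            elif event["type"] == "error":
                raise RuntimeError(event["message"])
    return "".join(chunks)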

View File

@@ -1,9 +0,0 @@
"""Memory module for conversation history management."""
from app.memory.conversation import ConversationMemory, get_conversation_memory, ConversationMemoryDependency
__all__ = [
"ConversationMemory",
"get_conversation_memory",
"ConversationMemoryDependency",
]

View File

@@ -1,122 +0,0 @@
"""Conversation history management with in-memory storage."""
import logging
import time
from dataclasses import dataclass, asdict, field
from typing import Annotated
from fastapi import Depends
from app.config import settings
logger = logging.getLogger(__name__)
MAX_HISTORY_MESSAGES = 20
@dataclass
class Message:
"""A single message in conversation history."""
role: str # "user" or "assistant"
content: str
sources: list[dict] | None = None
@dataclass
class ConversationData:
"""Container for conversation messages with timestamp for TTL."""
messages: list[Message] = field(default_factory=list)
last_updated: float = field(default_factory=time.time)
# Module-level storage for conversations
_conversations: dict[str, ConversationData] = {}
class ConversationMemory:
"""Manages conversation history in memory."""
def __init__(self, ttl: int):
"""Initialize conversation memory.
Args:
ttl: Time-to-live in seconds for conversations
"""
self.ttl = ttl
async def get_history(self, conversation_id: str) -> list[Message]:
"""Get conversation history.
Args:
conversation_id: Conversation identifier
Returns:
List of messages in chronological order, or empty list if expired/not found
"""
data = _conversations.get(conversation_id)
if data is None:
return []
# Check if expired
if time.time() - data.last_updated > self.ttl:
del _conversations[conversation_id]
return []
return data.messages
async def store_turn(
self,
conversation_id: str,
user_message: str,
assistant_message: str,
sources: list[dict] | None = None,
) -> None:
"""Store a conversation turn (user message + assistant response).
Args:
conversation_id: Conversation identifier
user_message: User's message
assistant_message: Assistant's response
sources: Optional source references used in response
"""
# Get existing history (checks TTL)
history = await self.get_history(conversation_id)
# Add new messages
history.append(Message(role="user", content=user_message))
history.append(Message(role="assistant", content=assistant_message, sources=sources))
# Trim to max size (keep most recent)
if len(history) > MAX_HISTORY_MESSAGES:
history = history[-MAX_HISTORY_MESSAGES:]
# Store with updated timestamp
_conversations[conversation_id] = ConversationData(
messages=history,
last_updated=time.time(),
)
logger.debug(f"Stored conversation turn for {conversation_id}")
async def clear(self, conversation_id: str) -> bool:
"""Clear conversation history.
Args:
conversation_id: Conversation identifier
Returns:
True if cleared successfully
"""
if conversation_id in _conversations:
del _conversations[conversation_id]
return True
def get_conversation_memory() -> ConversationMemory:
"""Get conversation memory instance."""
return ConversationMemory(ttl=settings.conversation_ttl)
ConversationMemoryDependency = Annotated[ConversationMemory, Depends(get_conversation_memory)]
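
A quick sketch of the deleted in-memory store above; turns expire after conversation_ttl seconds and history is trimmed to the most recent 20 messages:

import asyncio
from app.memory import get_conversation_memory

async def demo() -> None:
    memory = get_conversation_memory()
    await memory.store_turn(
        conversation_id="demo-id",
        user_message="What does trader.py do?",
        assistant_message="It places, hedges, and tracks trades across exchanges.",
    )
    for msg in await memory.get_history("demo-id"):
        print(msg.role, "->", msg.content)
    await memory.clear("demo-id")

asyncio.run(demo())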

View File

@@ -1,136 +0,0 @@
"""System prompts for RAG-based codebase Q&A."""
from app.embeddings.retriever import RetrievedChunk
from app.memory.conversation import Message
# For codebase questions WITH retrieved context
CODEBASE_SYSTEM_PROMPT = """You answer questions about the Tyndale trading system using ONLY the provided YAML artifacts.
HARD CONSTRAINTS:
- Do NOT assume access to source code
- Do NOT invent implementation details not in the artifacts
- Do NOT speculate about code mechanics beyond what artifacts describe
- If artifacts do not contain enough information, say so explicitly
RESPONSE STYLE:
- Prefer architectural and behavioral explanations over mechanics
- Reference source files by path (e.g., ./trader.py)
- Explain trading concepts for developers without finance background
- Keep responses focused and concise"""
# For general questions (no RAG context)
GENERAL_SYSTEM_PROMPT = """You are an assistant for the Tyndale trading system documentation.
You can answer general questions, but for specific codebase questions, you need artifact context.
If the user asks about specific code without context, ask them to rephrase or be more specific."""
# For clarification/follow-ups (uses conversation history)
CLARIFICATION_SYSTEM_PROMPT = """You are continuing a conversation about the Tyndale trading system.
Use the conversation history to answer follow-up questions.
If you need to look up new information, ask the user to rephrase as a standalone question."""
def select_system_prompt(intent: str, has_context: bool) -> str:
"""Select appropriate system prompt based on intent and context.
Args:
intent: Classified intent (codebase, general, clarification)
has_context: Whether RAG context was retrieved
Returns:
System prompt string
"""
if intent == "codebase" and has_context:
return CODEBASE_SYSTEM_PROMPT
elif intent == "codebase" and not has_context:
return CODEBASE_SYSTEM_PROMPT + "\n\nNOTE: No relevant artifacts were found for this question. Acknowledge this limitation in your response."
elif intent == "clarification":
return CLARIFICATION_SYSTEM_PROMPT
else:
return GENERAL_SYSTEM_PROMPT
def format_context(chunks: list[RetrievedChunk]) -> str:
"""Format retrieved chunks as context for the LLM.
Args:
chunks: List of retrieved chunks
Returns:
Formatted context string
"""
if not chunks:
return ""
context_parts = ["## Retrieved Artifact Context\n"]
for i, chunk in enumerate(chunks, 1):
context_parts.append(f"### Source {i}: {chunk.source_file} ({chunk.chunk_type})")
context_parts.append(chunk.content)
context_parts.append("") # Empty line separator
return "\n".join(context_parts)
def format_history(history: list[Message], max_messages: int = 10) -> str:
"""Format conversation history for the LLM.
Args:
history: List of conversation messages
max_messages: Maximum messages to include
Returns:
Formatted history string
"""
if not history:
return ""
# Take most recent messages
recent = history[-max_messages:] if len(history) > max_messages else history
history_parts = ["## Conversation History\n"]
for msg in recent:
role = "User" if msg.role == "user" else "Assistant"
history_parts.append(f"**{role}**: {msg.content}")
history_parts.append("")
return "\n".join(history_parts)
def build_rag_prompt(
user_message: str,
intent: str,
chunks: list[RetrievedChunk],
history: list[Message],
) -> tuple[str, str]:
"""Build complete RAG prompt with system message and user content.
Args:
user_message: Current user message
intent: Classified intent
chunks: Retrieved context chunks
history: Conversation history
Returns:
Tuple of (system_prompt, user_content)
"""
# Select system prompt
system_prompt = select_system_prompt(intent, bool(chunks))
# Build user content
parts = []
# Add context if available
if chunks:
parts.append(format_context(chunks))
# Add history for clarification intent
if intent == "clarification" and history:
parts.append(format_history(history))
# Add current question
parts.append(f"## Current Question\n{user_message}")
user_content = "\n\n".join(parts)
return system_prompt, user_content
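
To make the assembly above concrete, a sketch of building a prompt for a "codebase" question with one retrieved chunk (the chunk fields and file names are illustrative):

from app.embeddings.retriever import RetrievedChunk
from app.prompts import build_rag_prompt

chunk = RetrievedChunk(
    chunk_id="trader.yaml::method::hedgeLong",   # illustrative IDs and paths
    content="[./trader.py] Method hedgeLong: finds an exchange to hedge a long position...",
    chunk_type="method",
    artifact_file="trader.yaml",
    source_file="./trader.py",
    tags={},
    score=0.83,
)
system_prompt, user_content = build_rag_prompt(
    user_message="How does hedging work?",
    intent="codebase",
    chunks=[chunk],
    history=[],
)
# user_content now contains "## Retrieved Artifact Context" with the chunk,
# followed by "## Current Question\nHow does hedging work?"
print(system_prompt)
print(user_content)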

View File

@@ -3,15 +3,6 @@ from typing import Literal
 from pydantic import BaseModel, Field
-class SourceReference(BaseModel):
-    """Reference to a source artifact chunk used in RAG response."""
-    artifact_file: str = Field(..., description="Path to the YAML artifact file")
-    source_file: str = Field(..., description="Path to the source code file")
-    chunk_type: str = Field(..., description="Type of chunk (factual_summary, interpretive_summary, method, invariants)")
-    relevance_score: float = Field(..., description="Similarity score from FAISS search")
 class ChatRequest(BaseModel):
     """Request model for the /chat endpoint."""
@@ -27,60 +18,16 @@ class ChatResponse(BaseModel):
     conversation_id: str = Field(..., description="Conversation ID (generated if not provided)")
     response: str = Field(..., description="The LLM's response")
     mode: Literal["local", "remote", "openai", "asksage"] = Field(..., description="Which adapter was used")
-    intent: Literal["codebase", "general", "clarification"] = Field(
-        default="general", description="Classified intent of the user message"
-    )
-    sources: list[SourceReference] = Field(default_factory=list, description="Source artifact references used in response")
+    sources: list = Field(default_factory=list, description="Source references (empty for now)")
 class HealthResponse(BaseModel):
     """Response model for the /health endpoint."""
     status: str = Field(default="ok")
-    faiss_loaded: bool = Field(default=False, description="Whether FAISS index is loaded")
-    index_size: int = Field(default=0, description="Number of chunks in FAISS index")
 class ErrorResponse(BaseModel):
     """Standard error response model."""
     detail: str = Field(..., description="Error description")
# --- SSE Streaming Event Models ---
class StreamChunkEvent(BaseModel):
"""SSE event for content chunks during streaming."""
type: Literal["chunk"] = "chunk"
content: str = Field(..., description="Content chunk from the LLM")
conversation_id: str = Field(..., description="Conversation ID")
class StreamDoneEvent(BaseModel):
"""SSE event signaling completion of streaming."""
type: Literal["done"] = "done"
conversation_id: str = Field(..., description="Conversation ID")
mode: Literal["local", "remote", "openai", "asksage"] = Field(..., description="Which adapter was used")
intent: Literal["codebase", "general", "clarification"] = Field(
default="general", description="Classified intent of the user message"
)
sources: list[SourceReference] = Field(default_factory=list, description="Source artifact references used in response")
class IndexResponse(BaseModel):
"""Response model for the /index endpoint."""
status: str = Field(..., description="Indexing status")
chunks_indexed: int = Field(default=0, description="Number of chunks indexed")
artifacts_processed: int = Field(default=0, description="Number of YAML files processed")
class StreamErrorEvent(BaseModel):
"""SSE event for errors during streaming."""
type: Literal["error"] = "error"
message: str = Field(..., description="Error message")
code: int = Field(default=500, description="HTTP status code")

View File

@@ -1,69 +0,0 @@
source_file: ./live.py
schema_version: v1.1
factual_summary: >
Provides the runtime framework for executing a market making strategy
in either paper or live trading modes. Responsible for initializing
exchanges, loading symbol universes, configuring execution components,
processing real-time market data, and coordinating the continuous
operation of the trading system.
methods:
main() -> None:
description: >
Asynchronous entry point that orchestrates system initialization,
strategy and trader setup, exchange connectivity, optional logging
and metrics configuration, and continuous processing of incoming
market data events.
interpretive_summary: >
Acts as the top-level execution driver for the trading system, wiring
together strategy logic, execution components, and exchange data feeds
into a single long-running process. Designed to support both simulated
and live trading environments while enabling observability and
operational control during runtime.
invariants:
- Trading mode must be explicitly selected before execution begins.
- Exchanges must be initialized prior to processing market data.
- Strategy and trader instances must be fully constructed before
entering the main processing loop.
- Market data processing must run continuously until the process
terminates.
- Logging and metrics configuration must not interfere with execution.
tags:
domain:
- market_data
- strategy_execution
- order_execution
- exchange_integration
trading_function:
- data_ingest
- entry_logic
- exit_logic
strategy_layer:
- execution
system_layer:
- worker
intent:
- orchestration
- isolation
- safety
data_type:
- signals
- orders
- exchanges
risk:
- latency
- data_corruption
- capital_loss
maturity:
- production

View File

@@ -1,149 +0,0 @@
source_file: ./trader.py
schema_version: v1.1
factual_summary: >
Implements the execution component responsible for placing and managing
long and short trades, including associated hedge operations, across
supported exchanges. Maintains order book state, open positions, fees,
and profit and loss tracking while supporting both live trading and
backtesting modes.
methods:
"_getBracket(exch: str, sym: str) -> List[]":
description: >
Retrieves the current bid and ask information for a given symbol
on a specified exchange using maintained order book state.
"startBidding(exch: str, sym: str, ts: float, entryPrice: float or None) -> None":
description: >
Used in the Strat class to update the entry price of a symbol on a particular exchange. Logic for updating the
entry price is contained there.
"stopBidding(exch: str, sym: str, ts: float) -> None":
description: >
Halts the bidding by removing the order from the limitBuy dictionary.
"updateBracket(exch: str, sym: str, bidQuote: float, askQuote: float) -> None":
description: >
Updates the bracketed strings in our brackets dictionary.
"startOffering(exch: str, sym: str, ts: float, entryPrice: float or None) -> None":
description: >
Executes the trades when we are going short.
"stopOffering(exch: str, sym: str, ts: float) -> None":
description: >
Stops the short position.
"logIntradayPnl(val: float) -> None":
description: >
Updates the value of our account in our pnl database.
"fpnl() -> None":
description: >
Returns a list of floating profit and loss summaries.
"posCounts() -> dict":
description: >
Returns a python dictionary of the total positions we have on each exchange.
"logFee(fee: float) -> None":
description: >
Updates our fees in our database
"posValueAtExchOld(exch: str, sym: str) -> int":
description: >
Calculates the values of the long and short positions.
"fpnlnotional() -> int":
description: >
This method calculates the profit/loss (P&L) based on the notional value of long and short positions for each
unique symbol across different exchanges. It computes the average price of long positions, sums the notional
values of both long and short positions, and then uses these to compute the P&L.
"fillLong(exch: str, sym: str, price: float, quan: float, ts: int, tag: str, feeType: str) -> None":
description: >
The fillLong method handles the process of filling a buy order for a specified asset on an exchange. It updates
the trader's position with the new quantity and calculates the average price if necessary. If there was a short
position, it also calculates the profit/loss from buying back some or all of the shorted amount. Additionally,
it logs the trade, applies fees, and ensures data integrity by asserting non-negative values.
"fillShort(exch: str, sym: str, price: float, quan: float, ts: int, tag: str, feeType: str) -> None":
description: >
The fillShort method handles the process of filling a sell order for a specified asset on an exchange. It updates
the trader's position with the new quantity and calculates the average price if necessary. If there was a long
position, it also calculates the profit/loss from selling some or all of the held amount. Additionally, it logs
the trade, applies fees, and ensures data integrity by asserting non-negative values.
"hedgeLong(exch: str, sym: str, price: float, quan: float, ts: int, force: bool) -> Exchange":
description: >
The hedgeLong method finds an appropriate exchange to hedge a long position by executing a corresponding sell
order. It iterates over possible exchanges, calculates adjusted bid prices based on offsets, and selects the
highest bid price for execution. If no suitable exchange is found without forcing, it forces hedging. The method
ensures that the hedge does not exceed maximum allowed positions and logs relevant information throughout the
process.
"hedgeShort(exch: str, sym: str, price: float, quan: float, ts: int, force: bool) -> Exchange":
description: >
The hedgeShort method finds an appropriate exchange to hedge a short position by executing a corresponding buy
order. It iterates over possible exchanges, calculates adjusted ask prices based on offsets, and selects the
lowest ask price for execution. If no suitable exchange is found without forcing, it forces hedging. The method
ensures that the hedge does not exceed maximum allowed positions and logs relevant information throughout the
process.
"processTrade(exch: str, sym: str, price: float, tradeDir: list) -> None":
description: >
The first half of the processTrade method handles the scenario where a market sell order results in hitting a
limit buy order. It processes the fill, hedges the position, and logs various portfolio and holdings-related
details. The second half of the processTrade method handles the scenario where a market buy order results in
hitting a limit sell order. It processes the fill, hedges the position, and logs various portfolio and
holdings-related details.
interpretive_summary: >
Acts as the execution boundary between strategy decision logic and
exchange interaction, encapsulating trade placement, position tracking,
and operational mode handling. Designed to centralize execution-side
concerns such as fee application, position state, and environment-aware
behavior for live and simulated trading.
invariants:
- Trades must only be executed when trading is explicitly enabled.
- Exchange-specific state must be initialized before order execution.
- Position and profit tracking must remain consistent with executed trades.
- Execution behavior must respect live versus backtest operating modes.
tags:
domain:
- order_execution
- exchange_integration
- pnl_accounting
trading_function:
- entry_logic
- exit_logic
strategy_layer:
- execution
system_layer:
- service
intent:
- abstraction
- safety
- isolation
data_type:
- orders
- positions
- pnl
- exchanges
risk:
- capital_loss
- latency
- data_corruption
maturity:
- production

View File

@@ -1,95 +0,0 @@
source_file: ./strat.py
schema_version: v1.1
factual_summary: >
Implements a market making strategy module responsible for maintaining
top-of-book state, calculating bid and ask prices using threshold-based
logic, validating tradable assets, and coordinating trade logging.
Manages exchange instances, symbol discovery across exchanges, and
runtime strategy state used for order placement and monitoring.
methods:
"registerExchange(exch: str, exchInst: Exchange) -> None":
description: >
Registers an exchange instance under a named key for use by the
strategy during execution.
"setMinQty(exch: str, qtyMap: dict) -> None":
description: >
Updates minimum and maximum trade quantity constraints for a given
exchange using a provided quantity mapping.
"printFloating(ts: float) -> None":
description: >
Emits a formatted runtime summary of strategy activity for logging
and monitoring purposes.
"bestEdgeBid(exch: str, sym: str) -> List[bool, float, bool, str, float]":
description: >
This method is designed to determine whether and where a bid should be placed based on several factors
related to market conditions and strategy rules. Its primary goal is to calculate the optimal bid price for a
given symbol on a specific exchange, taking into account market positions, hedging strategies, trading limits,
price precision, and fee structures. Returns a collection of decision outputs including bid eligibility status,
relative pricing divergence, long-position participation, selected hedge exchange, and the computed bid price.
"bestEdgeOffer(exch: str, sym: str) -> list":
description: >
The primary goal of bestEdgeOffer is to calculate the optimal offer (ask) price for a given symbol on a specific
exchange, considering market positions, hedging strategies, trading limits, price precision, and fee structures.
Returns nearly the same items as bestEdgeBid, but for the other side of the order book.
"updateTob(exch: str, sym: str, tob: dict, ts: float, force: bool) -> None":
description: >
The primary goal of the updateTob function is to update the top of the order book for quick access to perform
market making operations (bidding and selling) using the highest bid and lowest ask prices.
interpretive_summary: >
Serves as the central coordination unit for the market making strategy,
maintaining shared runtime state across exchanges and assets while
enforcing eligibility rules and operational constraints. Designed to
balance responsiveness with safety through internal validation,
rate limiting counters, and symbol qualification logic.
invariants:
- Only assets validated by runtime eligibility checks may be traded.
- Exchange instances must be registered before strategy execution.
- Top-of-book data must exist before bid or ask calculations occur.
- Rate limiting counters must accurately reflect order activity.
tags:
domain:
- market_data
- strategy_execution
- order_execution
- exchange_integration
trading_function:
- position_sizing
- entry_logic
- exit_logic
strategy_layer:
- execution
- decision
system_layer:
- engine
intent:
- orchestration
- validation
- safety
data_type:
- signals
- orders
- positions
- exchanges
risk:
- capital_loss
- latency
- data_corruption
maturity:
- production

Binary file not shown.

View File

@@ -1,6 +0,0 @@
{
"total_chunks": 33,
"total_artifacts": 3,
"dimensions": 1536,
"index_type": "IndexFlatIP"
}

File diff suppressed because it is too large.

View File

@@ -8,7 +8,6 @@ click==8.3.1
 colorama==0.4.6
 distro==1.9.0
 fastapi==0.128.0
-google-auth>=2.20.0
 h11==0.16.0
 httpcore==1.0.9
 httptools==0.7.1
@@ -32,4 +31,3 @@ urllib3==2.6.3
 uvicorn==0.40.0
 watchfiles==1.1.1
 websockets==16.0
-faiss-cpu>=1.12.0

View File

@@ -1,102 +0,0 @@
# artifact_schema.yaml
schema_name: trading_code_artifact
schema_version: v1.1
description: >
Defines the structure for derived knowledge artifacts generated from
trading system source code. Artifacts capture factual structure,
interpretive intent, invariants, and validated tags for reasoning
and retrieval by the AI backend.
required_fields:
- source_file
- schema_version
- factual_summary
- tags
optional_fields:
- methods
- interpretive_summary
- invariants
- provenance
field_definitions:
source_file:
type: string
description: Relative path to the source file in the trading repository.
constraints:
- non_empty
- relative_path
schema_version:
type: string
description: Version of the artifact schema used to generate this artifact.
factual_summary:
type: text
description: >
Objective description of what the file or module does.
Focuses on responsibilities and structure, not rationale.
constraints:
- min_length: 40
- max_length: 500
- no_interpretive_language
methods:
type: object
optional: true
description: >
Optional method-level descriptions for classes or modules where
fine-grained behavioral understanding is useful.
method_definition:
signature:
type: string
description:
type: text
constraints:
- no_implementation_details
- max_length: 200
interpretive_summary:
type: text
optional: true
description: >
High-level explanation of why the file exists, what architectural
role it plays, and what tradeoffs or risks it is designed to manage.
constraints:
- min_length: 40
- max_length: 500
- no_code_identifiers
invariants:
type: list
optional: true
description: >
Conditions that must always hold true for correct behavior.
item_constraints:
- type: string
- min_length: 10
tags:
type: object
description: >
Classification metadata for the artifact. All tag dimensions
and values must conform to the backend tag schema.
constraints:
- validated_against_tag_schema
provenance:
type: object
optional: true
fields:
generated_at:
type: datetime
generated_by:
type: string
source_commit:
type: string
confidence_score:
type: float
constraints:
- min: 0.0
- max: 1.0
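
A small sketch of checking an artifact file against the required_fields listed above (the file path is illustrative):

import yaml

REQUIRED_FIELDS = {"source_file", "schema_version", "factual_summary", "tags"}

with open("artifacts/live.yaml", "r", encoding="utf-8") as f:  # hypothetical path
    artifact = yaml.safe_load(f)

missing = REQUIRED_FIELDS - set(artifact)
print("missing required fields:", sorted(missing) or "none")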

View File

@@ -1,60 +0,0 @@
# tag_schema.yaml
schema_version: v1
dimensions:
domain:
- market_data
- signal_generation
- strategy_execution
- risk_management
- order_execution
- exchange_integration
- pnl_accounting
trading_function:
- data_ingest
- alpha_generation
- position_sizing
- entry_logic
- exit_logic
strategy_layer:
- signal
- decision
- execution
- evaluation
system_layer:
- api
- service
- engine
- worker
- persistence
intent:
- orchestration
- abstraction
- isolation
- validation
- optimization
- safety
data_type:
- signals
- orders
- positions
- pnl
- configs
- exchanges
risk:
- capital_loss
- liquidation
- latency
- data_corruption
- security
- hedging
maturity:
- experimental
- production