# Configuration

> Customize and configure the Financial MCP Server for your needs

## Overview

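The Financial MCP Server is configured primarily through environment variables. Logging, CORS, rate limiting, caching, tool selection, and access control can also be customized in code. This page covers both, with example setups ranging from local development to high-traffic production.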

## Environment Variables

### Core Configuration

<ParamField env="FMP_API_KEY" type="string" required>
  Financial Modeling Prep (FMP) API key used for FMP data requests. If unset, the server falls back to the `"demo"` key.

  **Default**: `"demo"` (limited functionality)

  **Example**: `"your_fmp_api_key"`
</ParamField>

<ParamField env="PORT" type="integer">
  Port number for the server to listen on

  **Default**: `8001`

  **Range**: `1024-65535` (use non-privileged ports)
</ParamField>

<ParamField env="HOST" type="string">
  Host address to bind the server to

  **Default**: `"0.0.0.0"` (all interfaces)

  **Options**: `"127.0.0.1"` (localhost only), `"0.0.0.0"` (all interfaces)
</ParamField>

<ParamField env="LOG_LEVEL" type="string">
  Logging verbosity level

  **Default**: `"INFO"`

  **Options**: `"DEBUG"`, `"INFO"`, `"WARNING"`, `"ERROR"`, `"CRITICAL"`
</ParamField>

### Advanced Configuration

<ParamField env="CORS_ORIGINS" type="string">
  Comma-separated list of allowed CORS origins

  **Default**: `"*"` (allow all origins)

  **Example**: `"http://localhost:3000,https://myapp.com"`
</ParamField>

<ParamField env="REQUEST_TIMEOUT" type="integer">
  Timeout for external API requests in seconds

  **Default**: `30`

  **Range**: `5-300`
</ParamField>

<ParamField env="CACHE_TTL" type="integer">
  Cache time-to-live in seconds

  **Default**: `60` (1 minute)

  **Range**: `0-3600` (0 disables caching)
</ParamField>

<ParamField env="MAX_CONNECTIONS" type="integer">
  Maximum concurrent connections to external APIs

  **Default**: `100`

  **Range**: `10-1000`
</ParamField>
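
The variables above can be read and range-checked once at startup instead of being parsed wherever they are used. The following is a minimal sketch, not the server's actual loader: the `Settings` dataclass and the `load_settings` helper are hypothetical names, and rejecting out-of-range values (rather than clamping them) is a choice made for this example. Defaults and ranges mirror the fields documented above.

```python
import os
from dataclasses import dataclass

# Hypothetical table, not part of the server: name -> (default, min, max).
INT_SETTINGS = {
    "PORT": (8001, 1024, 65535),
    "REQUEST_TIMEOUT": (30, 5, 300),
    "CACHE_TTL": (60, 0, 3600),
    "MAX_CONNECTIONS": (100, 10, 1000),
}
LOG_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}


@dataclass(frozen=True)
class Settings:
    fmp_api_key: str
    host: str
    port: int
    log_level: str
    cors_origins: list
    request_timeout: int
    cache_ttl: int
    max_connections: int


def _read_int(env, name):
    """Read an integer setting, falling back to its documented default."""
    default, low, high = INT_SETTINGS[name]
    raw = env.get(name)
    if raw is None or raw.strip() == "":
        return default
    value = int(raw)  # raises ValueError on non-numeric input
    if not low <= value <= high:
        raise ValueError(f"{name}={value} is outside {low}-{high}")
    return value


def load_settings(env=None):
    """Build Settings from a mapping (defaults to os.environ)."""
    env = os.environ if env is None else env
    log_level = env.get("LOG_LEVEL", "INFO").upper()
    if log_level not in LOG_LEVELS:
        raise ValueError(f"LOG_LEVEL={log_level!r} is not one of {sorted(LOG_LEVELS)}")
    origins = [o.strip() for o in env.get("CORS_ORIGINS", "*").split(",") if o.strip()]
    return Settings(
        fmp_api_key=env.get("FMP_API_KEY", "demo"),
        host=env.get("HOST", "0.0.0.0"),
        port=_read_int(env, "PORT"),
        log_level=log_level,
        cors_origins=origins,
        request_timeout=_read_int(env, "REQUEST_TIMEOUT"),
        cache_ttl=_read_int(env, "CACHE_TTL"),
        max_connections=_read_int(env, "MAX_CONNECTIONS"),
    )
```

Calling `load_settings()` with no arguments reads the process environment; passing a plain dict makes the loader easy to exercise in tests.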

## Configuration Examples

### Development Environment

<CodeGroup>
  ```bash Local Development theme={null}
  # .env file for development
  FMP_API_KEY=your_development_api_key
  PORT=8001
  HOST=127.0.0.1
  LOG_LEVEL=DEBUG
  CORS_ORIGINS=http://localhost:3000,http://localhost:3001
  REQUEST_TIMEOUT=30
  CACHE_TTL=60
  ```

  ```bash Quick Start theme={null}
  # Minimal setup for testing
  export FMP_API_KEY="demo"
  export PORT=8001
  python3 main.py
  ```
</CodeGroup>

### Production Environment

<CodeGroup>
  ```bash Production Server theme={null}
  # Production environment variables
  FMP_API_KEY=your_production_api_key
  PORT=8001
  HOST=0.0.0.0
  LOG_LEVEL=INFO
  CORS_ORIGINS=https://yourdomain.com,https://app.yourdomain.com
  REQUEST_TIMEOUT=15
  CACHE_TTL=300
  MAX_CONNECTIONS=200
  ```

  ```bash High Performance theme={null}
  # High-traffic production setup
  FMP_API_KEY=your_enterprise_api_key
  PORT=8001
  HOST=0.0.0.0
  LOG_LEVEL=WARNING
  CORS_ORIGINS=https://yourdomain.com
  REQUEST_TIMEOUT=10
  CACHE_TTL=600
  MAX_CONNECTIONS=500
  ```
</CodeGroup>

### Docker Configuration

<CodeGroup>
  ```yaml Docker Compose theme={null}
  version: '3.8'
  services:
    financial-mcp:
      build: .
      environment:
        - FMP_API_KEY=${FMP_API_KEY}
        - PORT=8001
        - HOST=0.0.0.0
        - LOG_LEVEL=INFO
        - CORS_ORIGINS=*
        - REQUEST_TIMEOUT=30
        - CACHE_TTL=300
      ports:
        - "8001:8001"
      restart: unless-stopped
  ```

  ```dockerfile Dockerfile ENV theme={null}
  # Set environment variables in Dockerfile
  ENV FMP_API_KEY=""
  ENV PORT=8001
  ENV HOST=0.0.0.0
  ENV LOG_LEVEL=INFO
  ENV CORS_ORIGINS="*"
  ENV REQUEST_TIMEOUT=30
  ENV CACHE_TTL=300
  ```
</CodeGroup>

## Server Customization

### Logging Configuration

<CodeGroup>
  ```python Custom Logging theme={null}
  # main.py - Custom logging setup
  import logging
  import os
  import sys
  from datetime import datetime

  LOG_FORMAT = '%(timestamp)s - %(name)s - %(levelname)s - %(message)s'

  # Create custom formatter that adds an ISO 8601 timestamp to each record
  class CustomFormatter(logging.Formatter):
      def format(self, record):
          record.timestamp = datetime.now().isoformat()
          return super().format(record)

  # Build handlers and attach the formatter with the format string,
  # otherwise the formatter falls back to printing only the message
  handlers = [
      logging.StreamHandler(sys.stdout),
      logging.FileHandler('financial-mcp.log')
  ]
  for handler in handlers:
      handler.setFormatter(CustomFormatter(LOG_FORMAT))

  # Configure logging (unknown LOG_LEVEL values fall back to INFO)
  log_level = os.getenv("LOG_LEVEL", "INFO").upper()
  logging.basicConfig(
      level=getattr(logging, log_level, logging.INFO),
      handlers=handlers
  )
  ```

  ```python Structured Logging theme={null}
  # JSON structured logging for production
  import json
  import logging
  from datetime import datetime, timezone

  class JSONFormatter(logging.Formatter):
      def format(self, record):
          log_entry = {
              "timestamp": datetime.now(timezone.utc).isoformat(),
              "level": record.levelname,
              "message": record.getMessage(),
              "module": record.module,
              "function": record.funcName,
              "line": record.lineno
          }
          
          # Add extra fields if present
          if hasattr(record, 'user_id'):
              log_entry['user_id'] = record.user_id
          if hasattr(record, 'request_id'):
              log_entry['request_id'] = record.request_id
              
          return json.dumps(log_entry)
  ```
</CodeGroup>

### CORS Configuration

<CodeGroup>
  ```python Custom CORS theme={null}
  # main.py - Advanced CORS setup
  import os

  from fastapi.middleware.cors import CORSMiddleware

  # Parse CORS origins from environment, ignoring empty entries
  cors_origins = os.getenv("CORS_ORIGINS", "*").split(",")
  cors_origins = [origin.strip() for origin in cors_origins if origin.strip()]

  app.add_middleware(
      CORSMiddleware,
      allow_origins=cors_origins,
      # Credentialed requests need explicit origins; do not combine them with "*"
      allow_credentials="*" not in cors_origins,
      allow_methods=["GET", "POST", "PUT", "DELETE"],
      allow_headers=["*"],
      expose_headers=["X-Request-ID", "X-Response-Time"]
  )
  ```

  ```python Dynamic CORS theme={null}
  # Environment-based CORS configuration
  import os

  from fastapi.middleware.cors import CORSMiddleware

  def configure_cors():
      if os.getenv("ENVIRONMENT") == "development":
          return ["http://localhost:3000", "http://localhost:3001"]
      elif os.getenv("ENVIRONMENT") == "staging":
          return ["https://staging.yourdomain.com"]
      else:  # production
          return ["https://yourdomain.com", "https://app.yourdomain.com"]

  app.add_middleware(
      CORSMiddleware,
      allow_origins=configure_cors(),
      allow_credentials=True,
      allow_methods=["GET", "POST"],
      allow_headers=["Authorization", "Content-Type"]
  )
  ```
</CodeGroup>

### Rate Limiting

<CodeGroup>
  ```python Basic Rate Limiting theme={null}
  # Add rate limiting middleware
  from fastapi import Request
  from slowapi import Limiter, _rate_limit_exceeded_handler
  from slowapi.util import get_remote_address
  from slowapi.errors import RateLimitExceeded

  limiter = Limiter(key_func=get_remote_address)
  app.state.limiter = limiter
  app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

  @app.get("/api/v1/stock/{symbol}")
  @limiter.limit("60/minute")  # 60 requests per minute
  async def get_stock_quote(request: Request, symbol: str):
      return await stock_quote_handler(symbol)
  ```

  ```python Advanced Rate Limiting theme={null}
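  # Custom rate limiting with different tiers
  from fastapi import Request
  from slowapi.util import get_remote_address

  def tier_key(request: Request) -> str:
      # Bucket premium callers by API key, everyone else by client IP
      api_key = request.headers.get("X-API-Key")
      if api_key and validate_api_key(api_key):
          return f"key:{api_key}"
      return f"ip:{get_remote_address(request)}"

  def get_rate_limit(key: str) -> str:
      # slowapi calls a callable limit with the value returned by key_func
      if key.startswith("key:"):
          return "1000/hour"  # Premium: 1000 requests/hour
      return "100/hour"       # Free: 100 requests/hour

  @app.get("/api/v1/stock/{symbol}")
  @limiter.limit(get_rate_limit, key_func=tier_key)
  async def get_stock_quote(request: Request, symbol: str):
      return await stock_quote_handler(symbol)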
  ```
</CodeGroup>

### Caching Configuration

<CodeGroup>
  ```python Memory Caching theme={null}
  # In-memory caching with TTL
  import os
  import time

  class TTLCache:
      def __init__(self, ttl_seconds=300):
          self.cache = {}
          self.ttl = ttl_seconds

      def get(self, key):
          if key in self.cache:
              value, timestamp = self.cache[key]
              if time.time() - timestamp < self.ttl:
                  return value
              else:
                  # Entry expired: drop it and report a miss
                  del self.cache[key]
          return None

      def set(self, key, value):
          self.cache[key] = (value, time.time())

  # Global cache instance (CACHE_TTL defaults to 60 seconds; 0 makes every lookup a miss)
  cache = TTLCache(ttl_seconds=int(os.getenv("CACHE_TTL", "60")))

  async def cached_fmp_request(endpoint):
      cached_result = cache.get(endpoint)
      # Compare against None so falsy results such as [] still count as hits
      if cached_result is not None:
          return cached_result

      result = await fmp_request(endpoint)
      cache.set(endpoint, result)
      return result
  ```

  ```python Redis Caching theme={null}
  # Redis-based caching for production
  import json
  import os

  # Use the asyncio client so cache calls do not block the event loop
  import redis.asyncio as redis

  redis_client = redis.Redis(
      host=os.getenv("REDIS_HOST", "localhost"),
      port=int(os.getenv("REDIS_PORT", "6379")),
      db=int(os.getenv("REDIS_DB", "0")),
      decode_responses=True
  )

  async def cached_request(key, fetch_func, ttl=300):
      # A TTL of 0 disables caching (Redis also rejects a zero expiry)
      if ttl <= 0:
          return await fetch_func()

      # Try to get from cache
      cached_data = await redis_client.get(key)
      if cached_data is not None:
          return json.loads(cached_data)

      # Fetch fresh data
      data = await fetch_func()

      # Store in cache
      await redis_client.setex(key, ttl, json.dumps(data))
      return data

  # Usage
  async def get_stock_quote(symbol):
      return await cached_request(
          f"stock_quote:{symbol}",
          lambda: fetch_stock_quote(symbol),
          ttl=int(os.getenv("CACHE_TTL", "60"))
      )
  ```
</CodeGroup>

## Tool Configuration

### Enabling/Disabling Tools

<CodeGroup>
  ```python Selective Tool Loading theme={null}
  # Configure which tools to load (whitespace around names is ignored)
  import os

  ENABLED_YFINANCE_TOOLS = [t.strip() for t in os.getenv("YFINANCE_TOOLS", "all").split(",")]
  ENABLED_FMP_TOOLS = [t.strip() for t in os.getenv("FMP_TOOLS", "all").split(",")]

  def register_tools():
      # YFinance tools
      if "all" in ENABLED_YFINANCE_TOOLS or "quote" in ENABLED_YFINANCE_TOOLS:
          mcp.tool("get_stock_quote")(get_stock_quote)
      
      if "all" in ENABLED_YFINANCE_TOOLS or "overview" in ENABLED_YFINANCE_TOOLS:
          mcp.tool("get_company_overview")(get_company_overview)
      
      # FMP tools
      if "all" in ENABLED_FMP_TOOLS or "gainers" in ENABLED_FMP_TOOLS:
          mcp.tool("fmp_get_market_gainers")(fmp_get_market_gainers)
      
      # Continue for other tools...

  register_tools()
  ```

  ```bash Environment Tool Selection theme={null}
  # Enable only specific tools
  export YFINANCE_TOOLS="quote,overview,historical"
  export FMP_TOOLS="gainers,losers,crypto"

  # Or enable all tools
  export YFINANCE_TOOLS="all"
  export FMP_TOOLS="all"
  ```
</CodeGroup>
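
The comma-separated tool lists are easier to check once they are parsed into a set. A small sketch of that idea follows; `parse_tool_list` and `is_enabled` are hypothetical helper names, not functions the server is known to provide. The parser tolerates stray spaces, empty entries, and mixed case.

```python
import os


def parse_tool_list(raw):
    """Turn 'quote, Overview,,historical' into {'quote', 'overview', 'historical'}."""
    return {name.strip().lower() for name in raw.split(",") if name.strip()}


def is_enabled(tool, enabled):
    """A tool is on when it is listed explicitly or when 'all' is present."""
    return "all" in enabled or tool in enabled


# YFINANCE_TOOLS is the variable shown above; "all" is its default.
enabled_yfinance = parse_tool_list(os.getenv("YFINANCE_TOOLS", "all"))
```

With this parser an empty value such as `YFINANCE_TOOLS=""` yields an empty set, so no tools are enabled; leave the variable unset to get the `"all"` default.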

### Custom Tool Parameters

<CodeGroup>
  ```python Tool Defaults theme={null}
  # Customize default parameters for tools
  DEFAULT_STOCK_LIMIT = int(os.getenv("DEFAULT_STOCK_LIMIT", "20"))
  DEFAULT_CRYPTO_LIMIT = int(os.getenv("DEFAULT_CRYPTO_LIMIT", "50"))
  DEFAULT_HISTORICAL_PERIOD = os.getenv("DEFAULT_HISTORICAL_PERIOD", "1y")

  @mcp.tool("fmp_get_market_gainers")
  async def fmp_get_market_gainers(limit: int = DEFAULT_STOCK_LIMIT):
      return await fmp_market_gainers(limit)

  @mcp.tool("fmp_get_crypto_prices")
  async def fmp_get_crypto_prices(limit: int = DEFAULT_CRYPTO_LIMIT):
      return await fmp_crypto_prices(limit)
  ```

  ```python Tool Validation theme={null}
  # Add custom validation for tool parameters
  def validate_symbol(symbol: str) -> str:
      # Normalize first so the length check applies to the cleaned symbol:
      # uppercase, trim whitespace, and strip "$" and "." characters
      symbol = (symbol or "").upper().strip()
      symbol = symbol.replace("$", "").replace(".", "")

      if not symbol or len(symbol) > 10:
          raise ValueError("Invalid symbol: must be 1-10 characters")

      return symbol

  def validate_limit(limit: int, max_limit: int = 100) -> int:
      if limit < 1:
          return 1
      elif limit > max_limit:
          return max_limit
      return limit

  @mcp.tool("get_stock_quote")
  async def get_stock_quote(symbol: str):
      symbol = validate_symbol(symbol)
      return await fetch_stock_quote(symbol)
  ```
</CodeGroup>

## Security Configuration

### API Key Management

<CodeGroup>
  ```python Multi-Key Support theme={null}
  # Support multiple FMP API keys for load balancing
  import os
  import random

  FMP_API_KEYS = os.getenv("FMP_API_KEYS", "").split(",")
  FMP_API_KEYS = [key.strip() for key in FMP_API_KEYS if key.strip()]

  def get_api_key():
      if not FMP_API_KEYS:
          return os.getenv("FMP_API_KEY", "demo")
      return random.choice(FMP_API_KEYS)

  async def fmp_request(endpoint):
      api_key = get_api_key()
      url = f"https://financialmodelingprep.com/api/v3/{endpoint}?apikey={api_key}"
      # Make request...
  ```

  ```python Key Rotation theme={null}
  # Automatic API key rotation on rate limits
  import os
  import time

  class APIKeyManager:
      def __init__(self):
          raw_keys = os.getenv("FMP_API_KEYS", "").split(",")
          # Drop empty entries so an unset FMP_API_KEYS yields an empty list
          self.keys = [key.strip() for key in raw_keys if key.strip()]
          self.current_key_index = 0
          self.key_cooldowns = {}

      def get_active_key(self):
          now = time.time()

          # Find a key that's not in cooldown
          for i, key in enumerate(self.keys):
              if key not in self.key_cooldowns or now > self.key_cooldowns[key]:
                  self.current_key_index = i
                  return key

          # If all keys are in cooldown, use the first one;
          # with no keys configured, fall back to FMP_API_KEY
          return self.keys[0] if self.keys else os.getenv("FMP_API_KEY", "demo")
      
      def mark_rate_limited(self, key):
          # Put key in cooldown for 1 hour
          self.key_cooldowns[key] = time.time() + 3600

  key_manager = APIKeyManager()
  ```
</CodeGroup>
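
To see how cooldown-based rotation behaves over time, the sketch below mirrors the `APIKeyManager` idea with an injectable clock, so it can be exercised without waiting an hour. The `RotatingKeys` class, its `clock` parameter, and the key strings are hypothetical and exist only for this illustration. Unlike the manager above, it returns `None` when every key is cooling down, leaving the fallback decision to the caller.

```python
import time


class RotatingKeys:
    """Hypothetical stand-in for APIKeyManager with an injectable clock."""

    def __init__(self, keys, cooldown_seconds=3600, clock=time.time):
        self.keys = [k.strip() for k in keys if k.strip()]
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock
        self.cooldown_until = {}  # key -> time at which it becomes usable again

    def get_active_key(self):
        now = self.clock()
        for key in self.keys:
            if self.cooldown_until.get(key, 0) <= now:
                return key
        return None  # every key is cooling down, or none were configured

    def mark_rate_limited(self, key):
        self.cooldown_until[key] = self.clock() + self.cooldown_seconds


# Simulated clock so the example runs instantly.
fake_now = [0.0]
keys = RotatingKeys(["key-a", "key-b"], clock=lambda: fake_now[0])

first = keys.get_active_key()    # first configured key
keys.mark_rate_limited(first)    # e.g. after the upstream API signals a rate limit
second = keys.get_active_key()   # rotation moves on to the next key
fake_now[0] += 3601              # advance past the one-hour cooldown
third = keys.get_active_key()    # the first key is eligible again
```

In a request path, `mark_rate_limited` would typically be called when the upstream response indicates throttling, followed by a retry with the next active key.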

### Access Control

<CodeGroup>
  ```python IP Whitelisting theme={null}
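  # Restrict access to specific IP addresses
  import os

  from fastapi import Request
  from fastapi.responses import JSONResponse

  ALLOWED_IPS = os.getenv("ALLOWED_IPS", "").split(",")
  ALLOWED_IPS = [ip.strip() for ip in ALLOWED_IPS if ip.strip()]

  @app.middleware("http")
  async def ip_whitelist_middleware(request: Request, call_next):
      # request.client can be None, in which case the request is treated as unlisted
      client_ip = request.client.host if request.client else None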
      
      if ALLOWED_IPS and client_ip not in ALLOWED_IPS:
          return JSONResponse(
              status_code=403,
              content={"error": "Access denied", "ip": client_ip}
          )
      
      response = await call_next(request)
      return response
  ```

  ```python API Authentication theme={null}
  # Optional API key authentication for your endpoints
  import os
  from functools import wraps

  from fastapi import HTTPException, Request

  API_KEYS = os.getenv("API_KEYS", "").split(",")
  API_KEYS = [key.strip() for key in API_KEYS if key.strip()]

  def require_api_key(func):
      @wraps(func)
      async def wrapper(request: Request, *args, **kwargs):
          if not API_KEYS:  # No authentication required
              return await func(request, *args, **kwargs)
          
          api_key = request.headers.get("X-API-Key")
          if not api_key or api_key not in API_KEYS:
              raise HTTPException(status_code=401, detail="Invalid API key")
          
          return await func(request, *args, **kwargs)
      return wrapper

  @app.get("/api/v1/stock/{symbol}")
  @require_api_key
  async def protected_stock_quote(request: Request, symbol: str):
      return await get_stock_quote(symbol)
  ```
</CodeGroup>
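
A plain membership test such as `api_key not in API_KEYS` compares strings with early exit, so response timing can hint at how much of a key matched. Where that matters, a constant-time comparison is a common hardening step. The helper below is a sketch under that assumption; `is_valid_api_key` is a hypothetical name, and it relies only on the standard library's `hmac.compare_digest`.

```python
import hmac


def is_valid_api_key(candidate, allowed_keys):
    """Return True when candidate equals one of allowed_keys.

    Every key is compared with hmac.compare_digest and the loop does not
    exit early, so timing reveals less about which key (if any) matched.
    """
    if not candidate:
        return False
    candidate_bytes = candidate.encode("utf-8")
    matched = False
    for key in allowed_keys:
        if hmac.compare_digest(candidate_bytes, key.encode("utf-8")):
            matched = True
    return matched
```

Inside `require_api_key`, the check `api_key not in API_KEYS` could then be replaced by `not is_valid_api_key(api_key, API_KEYS)`.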

## Performance Tuning

### Connection Pooling

<CodeGroup>
  ```python HTTP Client Configuration theme={null}
  # Optimize HTTP client for performance
  import os

  import aiohttp

  class HTTPClientManager:
      def __init__(self):
          self.session = None
      
      async def get_session(self):
          if self.session is None:
              timeout = aiohttp.ClientTimeout(
                  total=int(os.getenv("REQUEST_TIMEOUT", "30"))
              )
              
              connector = aiohttp.TCPConnector(
                  limit=int(os.getenv("MAX_CONNECTIONS", "100")),
                  limit_per_host=int(os.getenv("MAX_CONNECTIONS_PER_HOST", "30")),
                  keepalive_timeout=30,
                  enable_cleanup_closed=True
              )
              
              self.session = aiohttp.ClientSession(
                  connector=connector,
                  timeout=timeout
              )
          
          return self.session
      
      async def close(self):
          if self.session:
              await self.session.close()

  http_manager = HTTPClientManager()

  # Use in startup/shutdown events
  @app.on_event("shutdown")
  async def shutdown_event():
      await http_manager.close()
  ```

  ```python Async Optimization theme={null}
  # Optimize for concurrent requests
  import asyncio
  from typing import List

  async def fetch_multiple_quotes(symbols: List[str]):
      """Fetch multiple stock quotes concurrently"""
      semaphore = asyncio.Semaphore(10)  # Limit concurrent requests
      
      async def fetch_with_semaphore(symbol):
          async with semaphore:
              return await get_stock_quote(symbol)
      
      tasks = [fetch_with_semaphore(symbol) for symbol in symbols]
      results = await asyncio.gather(*tasks, return_exceptions=True)
      
      # Filter out exceptions
      return [result for result in results if not isinstance(result, Exception)]
  ```
</CodeGroup>
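
The concurrent fetch above drops failed lookups silently. If callers need to know which symbols failed, one option is to return successes and failures separately. The sketch below assumes a caller-supplied `fetch` coroutine; `fetch_many` and `fake_fetch` are hypothetical names used only for this example.

```python
import asyncio


async def fetch_many(symbols, fetch, max_concurrency=10):
    """Run fetch(symbol) for every symbol, at most max_concurrency at a time.

    Returns (results, errors): two dicts keyed by symbol.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(symbol):
        async with semaphore:
            return await fetch(symbol)

    # gather preserves input order, so outcomes line up with symbols
    outcomes = await asyncio.gather(
        *(guarded(s) for s in symbols), return_exceptions=True
    )
    results, errors = {}, {}
    for symbol, outcome in zip(symbols, outcomes):
        if isinstance(outcome, Exception):
            errors[symbol] = str(outcome)
        else:
            results[symbol] = outcome
    return results, errors


# Hypothetical fetcher used only to demonstrate the call shape.
async def fake_fetch(symbol):
    await asyncio.sleep(0)
    if symbol == "BAD":
        raise ValueError("unknown symbol")
    return {"symbol": symbol, "price": 100.0}


results, errors = asyncio.run(fetch_many(["AAPL", "BAD", "MSFT"], fake_fetch))
```

Here `results` holds the entries for the symbols that succeeded and `errors` maps each failed symbol to its error message, which can be logged or returned to the client.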

### Memory Management

<CodeGroup>
  ```python Memory Optimization theme={null}
  # Monitor and optimize memory usage
  import gc
  import logging
  import os

  import psutil
  from fastapi import Request

  logger = logging.getLogger(__name__)

  def get_memory_usage():
      process = psutil.Process(os.getpid())
      memory_mb = process.memory_info().rss / 1024 / 1024
      return memory_mb

  @app.middleware("http")
  async def memory_monitor_middleware(request: Request, call_next):
      memory_before = get_memory_usage()
      
      response = await call_next(request)
      
      memory_after = get_memory_usage()
      memory_diff = memory_after - memory_before
      
      # Log high memory usage requests
      if memory_diff > 50:  # 50MB threshold
          logger.warning(f"High memory usage: {memory_diff:.2f}MB for {request.url}")
      
      # Trigger garbage collection if memory usage is high
      if memory_after > 500:  # 500MB threshold
          gc.collect()
      
      return response
  ```

  ```python Cache Management theme={null}
  # Size-bounded cache with TTL expiry and least-recently-used eviction
  import time

  class SmartCache:
      def __init__(self, max_size=1000, ttl=300):
          self.cache = {}
          self.access_times = {}
          self.max_size = max_size
          self.ttl = ttl

      def get(self, key):
          if key in self.cache:
              value, timestamp = self.cache[key]
              if time.time() - timestamp < self.ttl:
                  self.access_times[key] = time.time()
                  return value
              else:
                  self._remove(key)
          return None

      def set(self, key, value):
          # Only evict when adding a new key; overwriting does not grow the cache
          if key not in self.cache and len(self.cache) >= self.max_size:
              self._evict_lru()

          self.cache[key] = (value, time.time())
          self.access_times[key] = time.time()
      
      def _evict_lru(self):
          # Remove least recently used item
          if self.access_times:
              lru_key = min(self.access_times, key=self.access_times.get)
              self._remove(lru_key)
      
      def _remove(self, key):
          self.cache.pop(key, None)
          self.access_times.pop(key, None)
  ```
</CodeGroup>

## Monitoring Configuration

### Health Checks

<CodeGroup>
  ```python Advanced Health Check theme={null}
  import asyncio
  from datetime import datetime

  @app.get("/api/v1/health")
  async def health_check():
      health_data = {
          "status": "healthy",
          "timestamp": datetime.now().isoformat(),
          "version": "1.0.0",
          "tools_available": 32,
          "data_sources": ["YFinance", "FMP"]
      }

      # Check FMP API connectivity
      try:
          await fmp_request("quote/AAPL")
          health_data["fmp_status"] = "connected"
      except Exception as e:
          health_data["status"] = "degraded"
          health_data["fmp_status"] = "error"
          health_data["fmp_error"] = str(e)

      # Check YFinance connectivity (blocking call, so run it in a worker thread)
      try:
          import yfinance as yf
          await asyncio.to_thread(lambda: yf.Ticker("AAPL").info)
          health_data["yfinance_status"] = "connected"
      except Exception as e:
          health_data["status"] = "degraded"
          health_data["yfinance_status"] = "error"
          health_data["yfinance_error"] = str(e)
      
      # System metrics
      health_data["memory_usage_mb"] = get_memory_usage()
      health_data["cache_size"] = len(cache.cache) if hasattr(cache, 'cache') else 0
      
      return health_data
  ```

  ```python Metrics Endpoint theme={null}
  @app.get("/api/v1/metrics")
  async def metrics():
      return {
          "requests_total": request_counter,
          "requests_per_minute": calculate_rpm(),
          "average_response_time": calculate_avg_response_time(),
          "error_rate": calculate_error_rate(),
          "cache_hit_rate": calculate_cache_hit_rate(),
          "memory_usage": get_memory_usage(),
          "active_connections": get_active_connections()
      }
  ```
</CodeGroup>
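
The metrics endpoint relies on helpers such as `request_counter`, `calculate_rpm()`, and `calculate_error_rate()` that are not shown on this page. One possible in-process implementation is sketched below. The `RequestMetrics` class and its method names are hypothetical, counting only 5xx responses as errors is an assumption of this example, and all state is lost when the process restarts.

```python
import time
from collections import deque


class RequestMetrics:
    """Hypothetical in-process collector for request counts and timings."""

    def __init__(self, clock=time.time):
        self.clock = clock
        self.total = 0
        self.errors = 0
        self.total_duration = 0.0
        self.recent = deque()  # timestamps of requests seen in the last 60 seconds

    def record(self, duration_seconds, status_code):
        now = self.clock()
        self.total += 1
        self.total_duration += duration_seconds
        if status_code >= 500:
            self.errors += 1
        self.recent.append(now)
        self._trim(now)

    def _trim(self, now):
        # Drop timestamps that fell out of the 60-second window
        while self.recent and now - self.recent[0] > 60:
            self.recent.popleft()

    def requests_per_minute(self):
        self._trim(self.clock())
        return len(self.recent)

    def average_response_time(self):
        return self.total_duration / self.total if self.total else 0.0

    def error_rate(self):
        return self.errors / self.total if self.total else 0.0


metrics = RequestMetrics()
metrics.record(0.25, 200)
metrics.record(0.75, 500)
```

A natural place to call `record` is an HTTP middleware that times `call_next` and passes the response status code. For multi-process deployments, an external metrics store would be needed instead.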

## Deployment-Specific Configuration

### Railway Configuration

<CodeGroup>
  ```json railway.json theme={null}
  {
    "build": {
      "builder": "NIXPACKS"
    },
    "deploy": {
      "restartPolicyType": "ON_FAILURE",
      "restartPolicyMaxRetries": 10,
      "healthcheckPath": "/api/v1/health",
      "healthcheckTimeout": 30
    }
  }
  ```

  ```toml nixpacks.toml theme={null}
  [phases.setup]
  nixPkgs = ["python3", "pip"]

  [phases.install]
  cmds = ["pip install -r requirements.txt"]

  [phases.build]
  cmds = ["echo 'Build complete'"]

  [start]
  cmd = "python main.py"
  ```
</CodeGroup>

### Docker and Kubernetes Configuration

<CodeGroup>
  ```dockerfile Production Dockerfile theme={null}
  FROM python:3.11-slim

  # Install system dependencies
  RUN apt-get update && apt-get install -y \
      curl \
      && rm -rf /var/lib/apt/lists/*

  # Create non-root user
  RUN useradd --create-home --shell /bin/bash app

  # Set working directory
  WORKDIR /home/app

  # Copy requirements and install dependencies
  COPY requirements.txt .
  RUN pip install --no-cache-dir -r requirements.txt

  # Copy application code
  COPY --chown=app:app . .

  # Switch to non-root user
  USER app

  # Health check
  HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:${PORT:-8001}/api/v1/health || exit 1

  # Run the application
  CMD ["python", "main.py"]
  ```

  ```yaml K8s ConfigMap theme={null}
  apiVersion: v1
  kind: ConfigMap
  metadata:
    name: financial-mcp-config
  data:
    PORT: "8001"
    HOST: "0.0.0.0"
    LOG_LEVEL: "INFO"
    CORS_ORIGINS: "https://yourdomain.com"
    REQUEST_TIMEOUT: "30"
    CACHE_TTL: "300"
    MAX_CONNECTIONS: "100"
  ```
</CodeGroup>

## Next Steps

<CardGroup cols={2}>
  <Card title="Deployment Guide" icon="rocket" href="/guides/deployment">
    Deploy your configured server to production
  </Card>

  <Card title="Troubleshooting" icon="wrench" href="/guides/troubleshooting">
    Solve common configuration issues
  </Card>

  <Card title="API Reference" icon="book" href="/api-reference/introduction">
    Explore all available endpoints
  </Card>

  <Card title="Architecture" icon="sitemap" href="/concepts/architecture">
    Understand the server architecture
  </Card>
</CardGroup>
