AI Economics Advisor

Financial Analytics

Real-time economic analysis platform with live market data and vector database for precedent search.

Python FastAPI LanceDB React Chart.js

title: AI Economics Advisor
slug: ai-economics-advisor
description: Real-time economic analysis platform with live market data and vector database for precedent search.
featured: false
hero: false
status: Prototype
published: published
category: AI & Machine Learning
technologies:
  - Python
  - FastAPI
  - LanceDB
  - React
  - Chart.js
date: 2025-01-15

Real-time economic analysis platform combining live market data feeds, AI-powered insights, and vector database precedent search for comprehensive economic advisory.

Overview

An intelligent economics advisory platform that aggregates data from FRED (Federal Reserve Economic Data), Polygon.io (financial market data), and World Bank datasets to provide real-time economic analysis and forecasting. The system uses vector embeddings to search for historical precedents and an LLM to generate contextual insights.

Unlike traditional economic dashboards that simply display data, this platform understands relationships between indicators, identifies similar historical periods, and generates natural language explanations of economic conditions.

Architecture Overview

graph TB
    subgraph "Data Sources"
        FRED[FRED API<br/>Economic Indicators]
        POLY[Polygon.io<br/>Market Data]
        WB[World Bank<br/>Development Indicators]
    end

    subgraph "Backend Services"
        API[FastAPI<br/>REST + WebSocket]
        FEED[Data Feed Engine<br/>Pluggable Sources]
        METRICS[Metrics Processor<br/>Event-Driven]
    end

    subgraph "AI Layer"
        LANCE[(LanceDB<br/>Vector Embeddings)]
        LLM[LLM Analyzer<br/>GPT-4/Claude]
        SEARCH[Precedent Search<br/>Similarity]
    end

    subgraph "Frontend"
        DASH[React Dashboard<br/>Real-time Charts]
        VIZ[Chart.js<br/>Time Series]
        WS[WebSocket Client<br/>Live Updates]
    end

    FRED --> FEED
    POLY --> FEED
    WB --> FEED
    FEED --> METRICS
    METRICS --> API
    METRICS --> LANCE
    API --> LLM
    LANCE --> SEARCH
    SEARCH --> LLM
    LLM --> API
    API --> WS
    WS --> DASH
    DASH --> VIZ

    style LANCE fill:#4f46e5
    style LLM fill:#dc2626
    style METRICS fill:#059669

Core Components

1. Pluggable Data Feed Engine

Multi-Source Architecture:

from abc import ABC, abstractmethod
from typing import List

# DataFrame is assumed to be pandas.DataFrame
from pandas import DataFrame

class DataFeed(ABC):
    """Base class for economic data sources"""

    @abstractmethod
    async def fetch_latest(self, indicators: List[str]) -> DataFrame:
        """Fetch latest values for indicators"""
        pass

    @abstractmethod
    async def fetch_historical(
        self,
        indicators: List[str],
        start_date: str,
        end_date: str
    ) -> DataFrame:
        """Fetch historical time series"""
        pass

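# Note: each concrete feed must also implement fetch_historical (omitted here
# for brevity) and set self.client before it can be instantiated.
class FREDFeed(DataFeed):
    """Federal Reserve Economic Data"""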

    async def fetch_latest(self, indicators: List[str]) -> DataFrame:
        # FRED API: GDP, unemployment, inflation, etc.
        return await self.client.get_series(indicators)

class PolygonFeed(DataFeed):
    """Financial market data"""

    async def fetch_latest(self, indicators: List[str]) -> DataFrame:
        # Stock prices, forex, commodities
        return await self.client.get_aggregates(indicators)

class WorldBankFeed(DataFeed):
    """Global development indicators"""

    async def fetch_latest(self, indicators: List[str]) -> DataFrame:
        # International data: GDP by country, trade, etc.
        return await self.client.get_indicators(indicators)
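
Because both methods on the base class are abstract, a feed only becomes usable once it implements the pair. The sketch below shows a minimal concrete feed under stated assumptions: `InMemoryFeed` and the `Series` alias are hypothetical names, a nested dict stands in for the DataFrame return type, and the sample values are illustrative only. A feed like this can be handy for tests or offline demos.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Dict, List

# Plain-Python stand-in for the DataFrame used above: indicator -> {ISO date: value}
Series = Dict[str, Dict[str, float]]


class DataFeed(ABC):
    """Same two-method contract as the base class above."""

    @abstractmethod
    async def fetch_latest(self, indicators: List[str]) -> Series: ...

    @abstractmethod
    async def fetch_historical(
        self, indicators: List[str], start_date: str, end_date: str
    ) -> Series: ...


class InMemoryFeed(DataFeed):
    """Hypothetical feed backed by a dict (not part of the original project)."""

    def __init__(self, data: Series):
        self.data = data

    async def fetch_latest(self, indicators: List[str]) -> Series:
        # Keep only the most recent observation of each requested indicator
        out: Series = {}
        for name in indicators:
            points = self.data.get(name, {})
            if points:
                latest = max(points)  # ISO dates sort chronologically as strings
                out[name] = {latest: points[latest]}
        return out

    async def fetch_historical(
        self, indicators: List[str], start_date: str, end_date: str
    ) -> Series:
        # Inclusive date-range filter on ISO-formatted date keys
        return {
            name: {
                d: v
                for d, v in sorted(self.data.get(name, {}).items())
                if start_date <= d <= end_date
            }
            for name in indicators
        }


# Illustrative sample values only
feed = InMemoryFeed(
    {"UNRATE": {"2024-01-01": 3.7, "2024-02-01": 3.9, "2024-03-01": 3.8}}
)
latest = asyncio.run(feed.fetch_latest(["UNRATE"]))
history = asyncio.run(feed.fetch_historical(["UNRATE"], "2024-01-01", "2024-02-28"))
```

Keeping the contract this small means a new source only has to answer two questions: what is the latest value, and what happened between two dates.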

Feed Registry:

import asyncio
from typing import Dict, List


class FeedRegistry:
    """Manage multiple data sources"""

    def __init__(self):
        self.feeds = {
            "fred": FREDFeed(),
            "polygon": PolygonFeed(),
            "worldbank": WorldBankFeed()
        }

    async def fetch_indicator(self, source: str, indicator: str):
        feed = self.feeds[source]
        return await feed.fetch_latest([indicator])

    async def fetch_all(self, indicators: Dict[str, List[str]]):
        """Fetch from multiple sources concurrently"""
        tasks = [
            self.feeds[source].fetch_latest(inds)
            for source, inds in indicators.items()
        ]
        return await asyncio.gather(*tasks)
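
As written, `fetch_all` returns a bare list, so the caller has to remember which result belongs to which source, and a single failing API aborts the whole batch. The following is one possible variant, not the project's implementation: `StubFeed` and `fetch_all_by_source` are hypothetical names, and the stub replaces real API clients so the sketch runs offline.

```python
import asyncio
from typing import Any, Dict, List


class StubFeed:
    """Hypothetical stand-in for a DataFeed; returns canned values or raises."""

    def __init__(self, values: Dict[str, float], fail: bool = False):
        self.values = values
        self.fail = fail

    async def fetch_latest(self, indicators: List[str]) -> Dict[str, float]:
        if self.fail:
            raise ConnectionError("upstream unavailable")
        return {name: self.values[name] for name in indicators}


async def fetch_all_by_source(
    feeds: Dict[str, StubFeed], indicators: Dict[str, List[str]]
) -> Dict[str, Any]:
    """Fetch concurrently; map each source to its result or its exception."""
    sources = list(indicators)
    results = await asyncio.gather(
        *(feeds[s].fetch_latest(indicators[s]) for s in sources),
        return_exceptions=True,  # one failing source does not cancel the rest
    )
    # gather() preserves argument order, so zip re-attaches the source names
    return dict(zip(sources, results))


feeds = {
    "fred": StubFeed({"UNRATE": 3.8}),
    "polygon": StubFeed({}, fail=True),
}
out = asyncio.run(
    fetch_all_by_source(feeds, {"fred": ["UNRATE"], "polygon": ["SPY"]})
)
```

Callers can then check `isinstance(result, Exception)` per source and still render whatever data did arrive.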

2. Event-Driven Metrics Processor

Real-Time Processing Pipeline:

from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Dict, List

@dataclass
class MetricEvent:
    indicator: str
    value: float
    timestamp: datetime
    source: str
    metadata: Dict

class MetricsProcessor:
    """Process incoming metric updates"""

    def __init__(self):
        self.subscribers: List[Callable] = []
        self.history: Dict[str, List[MetricEvent]] = {}

    async def process_event(self, event: MetricEvent):
        # Store in history
        self.history.setdefault(event.indicator, []).append(event)

        # Calculate derived metrics
        derived = self.calculate_derived(event)

        # Notify subscribers (WebSocket clients)
        await self.notify_subscribers(event, derived)

        # Update vector database
        await self.update_embeddings(event)

    def calculate_derived(self, event: MetricEvent) -> List[MetricEvent]:
        """Calculate derived indicators"""
        derived = []

        if event.indicator == "GDP":
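            # Quarter-over-quarter growth rate. process_event() has already
            # appended this event, so history[-2] is the previous quarter.
            history = self.history["GDP"][-2:]
            if len(history) == 2 and history[-2].value != 0:
                growth = (event.value - history[-2].value) / history[-2].value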
                derived.append(MetricEvent(
                    indicator="GDP_GROWTH",
                    value=growth,
                    timestamp=event.timestamp,
                    source=event.source,
                    metadata={"period": "quarterly"}
                ))

        return derived
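
The processor above calls `notify_subscribers`, which is not shown. A minimal sketch of how such a fan-out might look follows, together with the growth formula pulled out into a standalone function. The `Notifier` class, the `Subscriber` alias, and `quarterly_growth` are hypothetical names introduced for illustration, and the input numbers are made up.

```python
import asyncio
from typing import Awaitable, Callable, List

Subscriber = Callable[[str, float], Awaitable[None]]


class Notifier:
    """Hypothetical helper showing one way notify_subscribers could work."""

    def __init__(self) -> None:
        self.subscribers: List[Subscriber] = []

    def subscribe(self, callback: Subscriber) -> None:
        self.subscribers.append(callback)

    async def notify(self, indicator: str, value: float) -> int:
        """Call every subscriber concurrently; return how many succeeded."""
        results = await asyncio.gather(
            *(cb(indicator, value) for cb in self.subscribers),
            return_exceptions=True,  # a failing subscriber must not block others
        )
        return sum(1 for r in results if not isinstance(r, Exception))


def quarterly_growth(previous: float, current: float) -> float:
    """Quarter-over-quarter growth as a fraction (same formula as GDP_GROWTH)."""
    if previous == 0:
        raise ValueError("previous value must be non-zero")
    return (current - previous) / previous


received = []


async def record(indicator: str, value: float) -> None:
    received.append((indicator, value))


async def broken(indicator: str, value: float) -> None:
    raise RuntimeError("client went away")


notifier = Notifier()
notifier.subscribe(record)
notifier.subscribe(broken)

growth = quarterly_growth(previous=200.0, current=205.0)  # 5 / 200 = 0.025
delivered = asyncio.run(notifier.notify("GDP_GROWTH", growth))
```

Note that the result is a fraction per quarter; multiply by 100 for a percentage, and annualize separately if the dashboard compares it against annualized figures.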

3. Vector Database for Precedent Search

Historical Period Embeddings:

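import json
from typing import Any, Dict, List

import lancedb
import pyarrow as pa
from sentence_transformers import SentenceTransformer

class EconomicPrecedentSearch:
    """Find similar historical economic periods"""

    def __init__(self):
        self.db = lancedb.connect("./economics.lance")
        self.encoder = SentenceTransformer("all-mpnet-base-v2")
        # LanceDB expects a pyarrow schema (or a LanceModel), not a dict of
        # Python types. all-mpnet-base-v2 produces 768-dimensional vectors.
        # The free-form dicts are stored as JSON strings to keep the schema flat.
        schema = pa.schema([
            pa.field("period", pa.string()),
            pa.field("description", pa.string()),
            pa.field("indicators", pa.string()),
            pa.field("embedding", pa.list_(pa.float32(), 768)),
            pa.field("outcomes", pa.string()),
        ])
        # exist_ok=True reopens the table on later runs instead of raising
        self.table = self.db.create_table("periods", schema=schema, exist_ok=True)

    async def index_period(
        self,
        period: str,
        description: str,
        indicators: Dict[str, float],
        outcomes: Dict[str, Any]
    ):
        """Index historical economic period"""

        # Create textual description
        text = f"{description}. Indicators: {self.format_indicators(indicators)}"

        # Generate embedding
        embedding = self.encoder.encode(text)

        # Store in LanceDB (dict fields serialized to JSON strings)
        self.table.add([{
            "period": period,
            "description": description,
            "indicators": json.dumps(indicators),
            "embedding": embedding.tolist(),
            "outcomes": json.dumps(outcomes)
        }])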

    async def find_similar_periods(
        self,
        current_indicators: Dict[str, float],
        top_k: int = 5
    ) -> List[Dict]:
        """Find historical periods similar to current conditions"""

        # Create query text
        query_text = f"Economic conditions: {self.format_indicators(current_indicators)}"
        query_embedding = self.encoder.encode(query_text)

        # Vector similarity search
        results = (
            self.table
            # Name the vector column explicitly; LanceDB's default is "vector"
            .search(query_embedding.tolist(), vector_column_name="embedding")
            .limit(top_k)
            .to_list()
        )

        return results

    def format_indicators(self, indicators: Dict[str, float]) -> str:
        return ", ".join([
            f"{k}: {v:.2f}" for k, v in indicators.items()
        ])

Example Usage:

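# Run inside an async function (or a REPL that supports top-level await)
precedent_search = EconomicPrecedentSearch()

# Index the 2008 financial crisis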
await precedent_search.index_period(
    period="2008-Q4",
    description="Financial crisis with housing market collapse",
    indicators={
        "unemployment": 7.3,
        "gdp_growth": -8.4,
        "inflation": 0.1,
        "fed_rate": 0.25
    },
    outcomes={
        "recession_duration": "18 months",
        "recovery_speed": "slow",
        "policy_response": "QE + TARP"
    }
)

# Find similar periods to today
current = {
    "unemployment": 3.7,
    "gdp_growth": 2.1,
    "inflation": 3.2,
    "fed_rate": 5.25
}
similar = await precedent_search.find_similar_periods(current, top_k=3)

4. AI-Powered Analysis

LLM Integration:

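import json
from typing import Dict, List

from anthropic import AsyncAnthropic
from pandas import DataFrame

class EconomicAnalyzer:
    """Generate AI-powered economic insights"""

    def __init__(self):
        # The async client is required because messages.create() is awaited below
        self.client = AsyncAnthropic()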
        self.precedent_search = EconomicPrecedentSearch()

    async def analyze_conditions(
        self,
        indicators: Dict[str, float],
        historical_data: DataFrame
    ) -> str:
        """Generate natural language economic analysis"""

        # Find similar historical periods
        precedents = await self.precedent_search.find_similar_periods(indicators)

        # Build context prompt
        prompt = f"""
        Analyze current economic conditions:

        Current Indicators:
        {json.dumps(indicators, indent=2)}

        Historical Context:
        {self.format_precedents(precedents)}

        Recent Trends:
        {self.format_trends(historical_data)}

        Provide:
        1. Current economic state assessment
        2. Key risks and opportunities
        3. Likely near-term trajectory
        4. Historical parallels and lessons
        """

        # Get LLM analysis
        response = await self.client.messages.create(
            model="claude-3-7-sonnet-20250219",
            max_tokens=2000,
            messages=[{"role": "user", "content": prompt}]
        )

        return response.content[0].text

    def format_precedents(self, precedents: List[Dict]) -> str:
        """Format precedent search results"""
        lines = []
        for p in precedents:
            lines.append(f"Period: {p['period']}")
            lines.append(f"  {p['description']}")
            lines.append(f"  Outcomes: {p['outcomes']}")
        return "\n".join(lines)
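
The prompt above also calls `format_trends`, which is not shown. A minimal sketch of what such a helper might produce is given below as a standalone function. It is an assumption, not the project's code: the signature, the `window` parameter, and the use of a plain dict of lists in place of the DataFrame are all hypothetical, and the sample values are illustrative.

```python
from typing import Dict, List


def format_trends(history: Dict[str, List[float]], window: int = 4) -> str:
    """Summarize the change over the last `window` observations of each series."""
    lines = []
    for name, values in history.items():
        recent = values[-window:]
        if len(recent) < 2:
            lines.append(f"{name}: insufficient history")
            continue
        change = recent[-1] - recent[0]
        direction = "up" if change > 0 else "down" if change < 0 else "flat"
        lines.append(
            f"{name}: {recent[0]:.2f} -> {recent[-1]:.2f} "
            f"({direction} {abs(change):.2f})"
        )
    return "\n".join(lines)


summary = format_trends({
    "unemployment": [3.5, 3.6, 3.7, 3.9],
    "fed_rate": [5.25, 5.25, 5.25, 5.25],
    "inflation": [3.1],
})
```

Giving the model a compact, pre-computed summary like this keeps the prompt short and avoids asking the LLM to do arithmetic on raw series.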

5. Real-Time Dashboard

WebSocket Streaming:

from typing import List

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    async def broadcast(self, data: dict):
        for connection in self.active_connections:
            await connection.send_json(data)

manager = ConnectionManager()

@app.websocket("/ws/metrics")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)

    try:
        # Stream metrics as they arrive
        async for event in metrics_stream:
            await websocket.send_json({
                "indicator": event.indicator,
                "value": event.value,
                "timestamp": event.timestamp.isoformat(),
                "analysis": await analyzer.analyze_conditions(
                    get_current_indicators(),
                    get_historical_data()
                )
            })
    except WebSocketDisconnect:
        manager.active_connections.remove(websocket)
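
The endpoint above requests a fresh LLM analysis for every metric event on every connection, which can become slow and costly when events arrive frequently. One possible mitigation is to reuse the most recent analysis for a minimum interval. The sketch below assumes a hypothetical `ThrottledAnalysis` wrapper, with a fake analyzer and an injectable clock so it runs without network access.

```python
import asyncio
import time
from typing import Awaitable, Callable, List, Optional


class ThrottledAnalysis:
    """Reuse the last analysis until `min_interval` seconds have elapsed."""

    def __init__(
        self,
        analyze: Callable[[], Awaitable[str]],
        min_interval: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.analyze = analyze
        self.min_interval = min_interval
        self.clock = clock
        self._cached: Optional[str] = None
        self._computed_at = 0.0
        self._lock = asyncio.Lock()

    async def get(self) -> str:
        async with self._lock:  # concurrent callers share a single refresh
            now = self.clock()
            stale = now - self._computed_at >= self.min_interval
            if self._cached is None or stale:
                self._cached = await self.analyze()
                self._computed_at = now
            return self._cached


calls = 0
fake_now = 0.0


async def fake_analyze() -> str:
    # Stand-in for analyzer.analyze_conditions(...); counts invocations
    global calls
    calls += 1
    return f"analysis #{calls}"


async def demo() -> List[str]:
    global fake_now
    throttle = ThrottledAnalysis(
        fake_analyze, min_interval=60.0, clock=lambda: fake_now
    )
    first = await throttle.get()   # nothing cached yet: computed
    second = await throttle.get()  # within the interval: served from cache
    fake_now = 61.0
    third = await throttle.get()   # interval elapsed: recomputed
    return [first, second, third]


outputs = asyncio.run(demo())
```

In the endpoint, the `"analysis"` field would then come from a shared `throttle.get()` call, so the number of LLM requests is bounded by the interval rather than by event volume or client count.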

React Dashboard:

import { useEffect, useState } from 'react';
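import { Chart } from 'chart.js';

// Shape of one streamed data point (matches the fields set in onmessage below)
interface Metric {
  indicator: string;
  value: number;
  timestamp: Date;
}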

function EconomicsDashboard() {
  const [metrics, setMetrics] = useState<Metric[]>([]);
  const [analysis, setAnalysis] = useState<string>("");

  useEffect(() => {
    // Connect to WebSocket
    const ws = new WebSocket('ws://localhost:8000/ws/metrics');

    ws.onmessage = (event) => {
      const data = JSON.parse(event.data);

      // Update metrics
      setMetrics(prev => [...prev, {
        indicator: data.indicator,
        value: data.value,
        timestamp: new Date(data.timestamp)
      }]);

      // Update analysis
      setAnalysis(data.analysis);
    };

    return () => ws.close();
  }, []);

  return (
    <div className="dashboard">
      <MetricsGrid metrics={metrics} />
      <TimeSeriesChart data={metrics} />
      <AnalysisPanel text={analysis} />
    </div>
  );
}

Key Features

Multi-Source Data Aggregation

Real-Time Streaming

Vector-Based Precedent Search

AI-Powered Insights

Performance Metrics

Technical Stack

Backend

{
  "framework": "FastAPI",
  "async": "asyncio + aiohttp",
  "database": "LanceDB (vector store)",
  "embeddings": "SentenceTransformers",
  "llm": "Claude 3.7 Sonnet"
}

Frontend

{
  "framework": "React",
  "charts": "Chart.js",
  "realtime": "WebSocket API",
  "styling": "Tailwind CSS"
}

Data Sources

{
  "fred": "FRED API (Federal Reserve)",
  "polygon": "Polygon.io REST + WebSocket",
  "worldbank": "World Bank Data API"
}

Use Cases

1. Investment Research

Real-time economic monitoring for portfolio managers and hedge funds.

2. Policy Analysis

Evaluate potential impacts of policy changes based on historical precedents.

3. Economic Forecasting

Combine current data with historical patterns for near-term predictions.

4. Risk Management

Early warning system for economic downturns or regime changes.

Technical Highlights

Limitations & Considerations

Data Latency:

API Costs:

Precedent Search Quality:

Future Enhancements

Status

Prototype demonstrating feasibility of AI-powered economic analysis with vector precedent search. Core architecture proven with FRED, Polygon, and World Bank integration.


Part of MacLeod Labs Financial Analytics Portfolio