Data Modeling Guide

Master OpsHub’s powerful data management capabilities built on Supabase and PostgreSQL. This guide covers everything from basic schema design to advanced data optimization techniques.

Data Architecture Overview

Core Data Models

Fund Structure

OpsHub uses a hierarchical fund structure that supports complex investment vehicles such as master-feeder arrangements:
-- Core fund hierarchy
CREATE TABLE funds (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    name VARCHAR(255) NOT NULL,
    code VARCHAR(50) UNIQUE NOT NULL,
    type VARCHAR(50), -- 'master', 'feeder', 'standalone'
    parent_fund_id UUID REFERENCES funds(id),
    inception_date DATE,
    base_currency CHAR(3),
    status VARCHAR(20) DEFAULT 'active',
    metadata JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Fund share classes
CREATE TABLE share_classes (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    fund_id UUID REFERENCES funds(id),
    name VARCHAR(255),
    code VARCHAR(50),
    currency CHAR(3),
    management_fee DECIMAL(5,4),
    performance_fee DECIMAL(5,4),
    minimum_investment DECIMAL(20,2)
);

Position & Holdings

-- Position tracking
CREATE TABLE positions (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    fund_id UUID REFERENCES funds(id),
    security_id UUID REFERENCES securities(id),
    position_date DATE,
    quantity DECIMAL(20,6),
    cost_basis DECIMAL(20,2),
    market_value DECIMAL(20,2),
    unrealized_pnl DECIMAL(20,2),
    weight DECIMAL(5,4),
    UNIQUE (fund_id, security_id, position_date) -- one row per fund/security/day
);

-- Create optimized indexes
CREATE INDEX idx_positions_fund_date ON positions(fund_id, position_date);
CREATE INDEX idx_positions_security ON positions(security_id);

Transaction Management

-- Transaction records
CREATE TABLE transactions (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    fund_id UUID REFERENCES funds(id),
    transaction_type VARCHAR(50), -- 'buy', 'sell', 'dividend', etc.
    trade_date DATE,
    settlement_date DATE,
    security_id UUID REFERENCES securities(id),
    quantity DECIMAL(20,6),
    price DECIMAL(20,6),
    gross_amount DECIMAL(20,2),
    fees DECIMAL(20,2),
    net_amount DECIMAL(20,2),
    status VARCHAR(20),
    metadata JSONB
);

Data Ingestion Patterns

Batch Processing

Batch data can arrive through several channels:

  • File Upload
  • API Integration
  • Database Sync

The file-upload path below stores a CSV in Supabase Storage and then kicks off a processing workflow:
// CSV file ingestion (assumes a configured Supabase client and a
// triggerWorkflow helper that starts the processing job)
const ingestCSV = async (file: File) => {
  const { data, error } = await supabase
    .storage
    .from('data-uploads')
    .upload(`batch/${Date.now()}_${file.name}`, file);

  if (error) throw error;

  // Trigger processing workflow
  await triggerWorkflow('process_csv', {
    fileId: data.path,
    mappingProfile: 'standard_positions'
  });
};

Real-time Streaming

// Real-time market data subscription
const subscribeToMarketData = () => {
  const channel = supabase
    .channel('market-updates')
    .on(
      'postgres_changes',
      {
        event: 'INSERT',
        schema: 'public',
        table: 'market_data'
      },
      (payload) => {
        updateDashboard(payload.new);
        checkAlerts(payload.new);
      }
    )
    .subscribe();

  // Return the channel so callers can later clean up with supabase.removeChannel(channel)
  return channel;
};

Data Transformation

ETL Pipelines

# Example transformation pipeline (opshub SDK)
from datetime import date

from opshub import DataPipeline, Transform

TODAY = date.today()

pipeline = DataPipeline("daily_nav_calculation")

# Extract
pipeline.extract_from_source("positions", date=TODAY)
pipeline.extract_from_source("market_data", date=TODAY)
pipeline.extract_from_source("fx_rates", date=TODAY)

# Transform
pipeline.add_transform(Transform.JOIN,
    left="positions",
    right="market_data",
    on="security_id")

pipeline.add_transform(Transform.CALCULATE,
    formula="quantity * price * fx_rate",
    output_column="market_value_base")

pipeline.add_transform(Transform.AGGREGATE,
    group_by="fund_id",
    agg_func="SUM",
    column="market_value_base")

# Load
pipeline.load_to_target("fund_navs")
pipeline.execute()

Data Quality Rules

-- Implement data quality checks
CREATE OR REPLACE FUNCTION check_position_quality()
RETURNS TABLE (
    fund_id UUID,
    issue_type VARCHAR,
    details TEXT
) AS $$
BEGIN
    -- Check for missing prices
    RETURN QUERY
    SELECT
        p.fund_id,
        'missing_price'::VARCHAR,
        'Security ' || s.ticker || ' has no price'
    FROM positions p
    JOIN securities s ON p.security_id = s.id
    LEFT JOIN market_data md ON p.security_id = md.security_id
        AND p.position_date = md.price_date
    WHERE md.price IS NULL;

    -- Check for position mismatches
    -- Check for position mismatches (weights should sum to 1 per fund)
    RETURN QUERY
    SELECT
        p.fund_id,
        'position_mismatch'::VARCHAR,
        'Total weight is ' || SUM(p.weight)::TEXT
    FROM positions p
    WHERE p.position_date = CURRENT_DATE
    GROUP BY p.fund_id
    HAVING ABS(SUM(p.weight) - 1.0) > 0.001;
END;
$$ LANGUAGE plpgsql;
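
The checks can be run ad hoc or wired into a scheduled job; each detected issue comes back as one row:

-- Run the data quality checks
SELECT * FROM check_position_quality();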

Advanced Features

Row-Level Security (RLS)

-- Enable RLS for multi-tenant data isolation
ALTER TABLE positions ENABLE ROW LEVEL SECURITY;

-- Create policies
CREATE POLICY tenant_isolation ON positions
    FOR ALL
    USING (fund_id IN (
        SELECT fund_id FROM user_fund_access
        WHERE user_id = current_user_id()
    ));

-- Function to get current user context
CREATE OR REPLACE FUNCTION current_user_id()
RETURNS UUID AS $$
    -- The second argument makes current_setting return NULL instead of raising when the setting is unset
    SELECT current_setting('app.user_id', true)::UUID;
$$ LANGUAGE SQL STABLE SECURITY DEFINER;
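
The policy only works if app.user_id has been set for the session or transaction. How it gets populated depends on your connection handling; a minimal sketch using set_config (the UUID below is a placeholder) looks like this:

-- Set the user context for the current transaction ('true' scopes it to the transaction)
BEGIN;
SELECT set_config('app.user_id', '9b2f57e0-0000-0000-0000-000000000000', true);
SELECT * FROM positions WHERE position_date = CURRENT_DATE; -- now filtered by the policy
COMMIT;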

Temporal Data Management

-- Historical data tracking (PostgreSQL has no built-in temporal tables,
-- so validity is modelled with explicit valid_from / valid_to columns)
CREATE TABLE positions_history (
    LIKE positions INCLUDING DEFAULTS,  -- copy columns and defaults, but not PK/unique constraints
    valid_from TIMESTAMPTZ NOT NULL,
    valid_to TIMESTAMPTZ,
    operation CHAR(1),
    user_id UUID
);

-- Trigger function for history tracking (fires on UPDATE and DELETE)
CREATE OR REPLACE FUNCTION track_position_history()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'UPDATE' THEN
        INSERT INTO positions_history
        SELECT OLD.*, NOW(), NULL, 'U', current_user_id();
    ELSIF TG_OP = 'DELETE' THEN
        INSERT INTO positions_history
        SELECT OLD.*, NOW(), NULL, 'D', current_user_id();
    END IF;
    -- Return OLD on DELETE (NEW is NULL there); NEW otherwise
    RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;
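
To activate history tracking, attach the function to the positions table (as an AFTER trigger, so the original write is never blocked):

-- Attach the history trigger to positions
CREATE TRIGGER positions_history_trigger
    AFTER UPDATE OR DELETE ON positions
    FOR EACH ROW
    EXECUTE FUNCTION track_position_history();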

Data Archival Strategy

-- Automated archival for old data
CREATE OR REPLACE FUNCTION archive_old_data()
RETURNS VOID AS $$
DECLARE
    archive_date DATE := CURRENT_DATE - INTERVAL '2 years';
BEGIN
    -- Move to archive schema
    INSERT INTO archive.positions
    SELECT * FROM public.positions
    WHERE position_date < archive_date;

    -- Remove from main table
    DELETE FROM public.positions
    WHERE position_date < archive_date;

    -- Update statistics
    ANALYZE public.positions;
    ANALYZE archive.positions;
END;
$$ LANGUAGE plpgsql;

-- Schedule monthly archival (requires the pg_cron extension)
SELECT cron.schedule('archive-old-data', '0 2 1 * *',
    'SELECT archive_old_data()');

Performance Optimization

Indexing Strategies

-- Covering indexes for common queries
CREATE INDEX idx_positions_covering
ON positions(fund_id, position_date)
INCLUDE (security_id, quantity, market_value);

-- Partial indexes for filtered queries
CREATE INDEX idx_active_funds
ON funds(id)
WHERE status = 'active';

-- Expression indexes
CREATE INDEX idx_positions_month
ON positions(fund_id, DATE_TRUNC('month', position_date));

Query Optimization

-- Use materialized views for complex calculations
CREATE MATERIALIZED VIEW fund_performance AS
WITH daily_returns AS (
    SELECT
        fund_id,
        position_date,
        (nav - LAG(nav) OVER (PARTITION BY fund_id ORDER BY position_date))
            / NULLIF(LAG(nav) OVER (PARTITION BY fund_id ORDER BY position_date), 0) AS daily_return
    FROM fund_navs
)
SELECT
    fund_id,
    DATE_TRUNC('month', position_date) AS month,
    AVG(daily_return) * 252 AS annualized_return,
    STDDEV(daily_return) * SQRT(252) AS annualized_volatility,
    AVG(daily_return) / NULLIF(STDDEV(daily_return), 0) * SQRT(252) AS sharpe_ratio
FROM daily_returns
GROUP BY fund_id, DATE_TRUNC('month', position_date);

-- Refresh strategy: a unique index is required before CONCURRENTLY can be used
CREATE UNIQUE INDEX ON fund_performance (fund_id, month);
REFRESH MATERIALIZED VIEW CONCURRENTLY fund_performance;
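
If the numbers only need to be current as of the last pipeline run, the refresh itself can be scheduled (assuming pg_cron, as in the archival example):

-- Refresh the materialized view nightly at 01:00
SELECT cron.schedule('refresh-fund-performance', '0 1 * * *',
    'REFRESH MATERIALIZED VIEW CONCURRENTLY fund_performance');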

Caching Layer

// Redis caching for frequently accessed data
// (assumes the Supabase client is imported from your shared client module)
import { Redis } from '@upstash/redis';

const redis = new Redis({
  url: process.env.REDIS_URL!,
  token: process.env.REDIS_TOKEN!
});

export async function getCachedPositions(fundId: string, date: string) {
  const cacheKey = `positions:${fundId}:${date}`;

  // Check cache first
  const cached = await redis.get(cacheKey);
  if (cached) return cached;

  // Fetch from database
  const { data } = await supabase
    .from('positions')
    .select('*')
    .eq('fund_id', fundId)
    .eq('position_date', date);

  // Cache for 1 hour
  await redis.setex(cacheKey, 3600, JSON.stringify(data));

  return data;
}

Data Governance

Audit Trail

-- Comprehensive audit logging
CREATE TABLE audit_log (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    table_name VARCHAR(50),
    operation VARCHAR(10),
    user_id UUID,
    timestamp TIMESTAMPTZ DEFAULT NOW(),
    old_data JSONB,
    new_data JSONB,
    ip_address INET,
    user_agent TEXT
);

-- Generic audit trigger
CREATE OR REPLACE FUNCTION audit_trigger()
RETURNS TRIGGER AS $$
BEGIN
    -- Branch on the operation so OLD/NEW are only referenced where they exist
    IF TG_OP = 'INSERT' THEN
        INSERT INTO audit_log (table_name, operation, user_id, old_data, new_data)
        VALUES (TG_TABLE_NAME, TG_OP, current_user_id(), NULL, to_jsonb(NEW));
        RETURN NEW;
    ELSIF TG_OP = 'UPDATE' THEN
        INSERT INTO audit_log (table_name, operation, user_id, old_data, new_data)
        VALUES (TG_TABLE_NAME, TG_OP, current_user_id(), to_jsonb(OLD), to_jsonb(NEW));
        RETURN NEW;
    ELSE -- DELETE
        INSERT INTO audit_log (table_name, operation, user_id, old_data, new_data)
        VALUES (TG_TABLE_NAME, TG_OP, current_user_id(), to_jsonb(OLD), NULL);
        RETURN OLD;
    END IF;
END;
$$ LANGUAGE plpgsql;
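
Attach the function to each table that should be audited; the table chosen here is illustrative:

-- Audit all row-level changes to transactions
CREATE TRIGGER audit_transactions
    AFTER INSERT OR UPDATE OR DELETE ON transactions
    FOR EACH ROW
    EXECUTE FUNCTION audit_trigger();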

Data Lineage

# Track data lineage
from opshub import DataLineage

lineage = DataLineage()

# Register transformation
lineage.register_transformation(
    source_tables=["positions", "market_data"],
    target_table="fund_navs",
    transformation="calculate_nav",
    sql_query=nav_calculation_sql,  # SQL text of the NAV query, defined elsewhere
    business_rules=["Apply FX rates", "Include accruals"]
)

# Generate lineage diagram
lineage.visualize("fund_navs")

Best Practices

Schema Design

Use normalized schemas with strategic denormalization for performance
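
The positions table above already does this: market_value and unrealized_pnl are derivable from quantity and prices, but storing them avoids a join against market_data on every portfolio screen. The same idea extends to small summary tables (the table below is an illustrative sketch, not part of the core schema):

-- Denormalized per-fund daily summary, refreshed by the NAV pipeline
CREATE TABLE fund_daily_summary (
    fund_id UUID REFERENCES funds(id),
    summary_date DATE,
    total_market_value DECIMAL(20,2),
    position_count INTEGER,
    PRIMARY KEY (fund_id, summary_date)
);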

Data Validation

Implement constraints and triggers for data integrity
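
A minimal sketch of the constraint side, using the tables defined earlier (the specific rules to enforce are a business decision):

-- Reject obviously invalid rows at write time
ALTER TABLE share_classes
    ADD CONSTRAINT chk_fees_non_negative
    CHECK (management_fee >= 0 AND performance_fee >= 0);

ALTER TABLE transactions
    ADD CONSTRAINT chk_settlement_after_trade
    CHECK (settlement_date >= trade_date);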

Backup Strategy

Regular automated backups with point-in-time recovery

Documentation

Maintain data dictionary and column descriptions
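
Column descriptions can live in the database itself via COMMENT, which keeps the data dictionary next to the schema and visible in most SQL tools:

-- Document tables and columns in the catalog
COMMENT ON TABLE positions IS 'Daily point-in-time holdings per fund';
COMMENT ON COLUMN positions.weight IS 'Position weight as a fraction of fund NAV; weights sum to 1 per fund per day';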

Pro Tip: Use Supabase’s real-time subscriptions for live data updates in dashboards without polling.