Data Modeling Guide
Master OpsHub’s powerful data management capabilities built on Supabase and PostgreSQL. This guide covers everything from basic schema design to advanced data optimization techniques.
Data Architecture Overview
Core Data Models
Fund Structure
OpsHub uses a hierarchical fund structure that supports complex investment vehicles such as master-feeder arrangements:
```sql
-- Core fund hierarchy
CREATE TABLE funds (
  id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  name VARCHAR(255) NOT NULL,
  code VARCHAR(50) UNIQUE NOT NULL,
  type VARCHAR(50), -- 'master', 'feeder', 'standalone'
  parent_fund_id UUID REFERENCES funds(id),
  inception_date DATE,
  base_currency CHAR(3),
  status VARCHAR(20) DEFAULT 'active',
  metadata JSONB,
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Fund share classes
CREATE TABLE share_classes (
  id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  fund_id UUID REFERENCES funds(id),
  name VARCHAR(255),
  code VARCHAR(50),
  currency CHAR(3),
  management_fee DECIMAL(5,4),
  performance_fee DECIMAL(5,4),
  minimum_investment DECIMAL(20,2)
);
```
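Because parent_fund_id self-references funds, a master-feeder tree can be walked with a standard recursive query:

```sql
-- Walk the fund hierarchy from top-level funds down
WITH RECURSIVE fund_tree AS (
  SELECT id, name, type, parent_fund_id, 0 AS depth
  FROM funds
  WHERE parent_fund_id IS NULL
  UNION ALL
  SELECT f.id, f.name, f.type, f.parent_fund_id, ft.depth + 1
  FROM funds f
  JOIN fund_tree ft ON f.parent_fund_id = ft.id
)
SELECT * FROM fund_tree ORDER BY depth, name;
```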
Position & Holdings
```sql
-- Position tracking (one row per fund, security, and date)
CREATE TABLE positions (
  id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  fund_id UUID REFERENCES funds(id),
  security_id UUID REFERENCES securities(id),
  position_date DATE,
  quantity DECIMAL(20,6),
  cost_basis DECIMAL(20,2),
  market_value DECIMAL(20,2),
  unrealized_pnl DECIMAL(20,2),
  weight DECIMAL(5,4),
  -- A table can have only one primary key, so the natural key is
  -- enforced as a unique constraint alongside the surrogate id
  UNIQUE (fund_id, security_id, position_date)
);

-- Create optimized indexes
CREATE INDEX idx_positions_fund_date ON positions(fund_id, position_date);
CREATE INDEX idx_positions_security ON positions(security_id);
```
Transaction Management
```sql
-- Transaction records
CREATE TABLE transactions (
  id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  fund_id UUID REFERENCES funds(id),
  transaction_type VARCHAR(50), -- 'buy', 'sell', 'dividend', etc.
  trade_date DATE,
  settlement_date DATE,
  security_id UUID REFERENCES securities(id),
  quantity DECIMAL(20,6),
  price DECIMAL(20,6),
  gross_amount DECIMAL(20,2),
  fees DECIMAL(20,2),
  net_amount DECIMAL(20,2),
  status VARCHAR(20),
  metadata JSONB
);
```
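A hypothetical buy illustrates how the amount columns relate (the IDs are placeholders, not real rows): gross_amount is quantity * price, and for a buy the net_amount adds fees.

```sql
-- Hypothetical buy: 1,000 shares at 42.50 with 25.00 in fees
-- gross_amount = quantity * price; for a buy, net_amount = gross_amount + fees
INSERT INTO transactions (fund_id, transaction_type, trade_date, settlement_date,
                          security_id, quantity, price, gross_amount, fees,
                          net_amount, status)
VALUES ('<fund-uuid>', 'buy', '2025-01-15', '2025-01-17',
        '<security-uuid>', 1000, 42.50, 42500.00, 25.00, 42525.00, 'settled');
```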
Data Ingestion Patterns
Batch Processing
OpsHub supports three batch ingestion patterns:
- File Upload
- API Integration
- Database Sync
The example below shows the File Upload pattern: a CSV is uploaded to Supabase Storage and a processing workflow is triggered.
```typescript
// CSV file ingestion (assumes a configured Supabase client is in scope)
const ingestCSV = async (file: File) => {
  const { data, error } = await supabase.storage
    .from('data-uploads')
    .upload(`batch/${Date.now()}_${file.name}`, file);

  if (error) {
    throw new Error(`Upload failed: ${error.message}`);
  }

  // Trigger processing workflow
  await triggerWorkflow('process_csv', {
    fileId: data.path,
    mappingProfile: 'standard_positions'
  });
};
```
Real-time Streaming
```typescript
// Real-time market data subscription
const subscribeToMarketData = () => {
  const channel = supabase
    .channel('market-updates')
    .on(
      'postgres_changes',
      {
        event: 'INSERT',
        schema: 'public',
        table: 'market_data'
      },
      (payload) => {
        updateDashboard(payload.new);
        checkAlerts(payload.new);
      }
    )
    .subscribe();

  // Return the channel so callers can unsubscribe later
  return channel;
};
```
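Supabase only delivers postgres_changes events for tables included in the supabase_realtime publication; if market_data has not been added yet, enable it once via SQL:

```sql
-- Enable change events for the market_data table
ALTER PUBLICATION supabase_realtime ADD TABLE market_data;
```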
Data Transformation
ETL Pipelines
```python
# Example transformation pipeline
from datetime import date
from opshub import DataPipeline, Transform

TODAY = date.today()

pipeline = DataPipeline("daily_nav_calculation")

# Extract
pipeline.extract_from_source("positions", date=TODAY)
pipeline.extract_from_source("market_data", date=TODAY)
pipeline.extract_from_source("fx_rates", date=TODAY)

# Transform
pipeline.add_transform(Transform.JOIN,
                       left="positions",
                       right="market_data",
                       on="security_id")
pipeline.add_transform(Transform.CALCULATE,
                       formula="quantity * price * fx_rate",
                       output_column="market_value_base")
pipeline.add_transform(Transform.AGGREGATE,
                       group_by="fund_id",
                       agg_func="SUM",
                       column="market_value_base")

# Load
pipeline.load_to_target("fund_navs")
pipeline.execute()
```
Data Quality Rules
```sql
-- Implement data quality checks
CREATE OR REPLACE FUNCTION check_position_quality()
RETURNS TABLE (
  fund_id UUID,
  issue_type VARCHAR,
  details TEXT
) AS $$
BEGIN
  -- Check for missing prices
  RETURN QUERY
  SELECT
    p.fund_id,
    'missing_price'::VARCHAR,
    'Security ' || s.ticker || ' has no price'
  FROM positions p
  JOIN securities s ON p.security_id = s.id
  LEFT JOIN market_data md ON p.security_id = md.security_id
    AND p.position_date = md.price_date
  WHERE md.price IS NULL;

  -- Check for position mismatches
  -- (columns are table-qualified to avoid clashing with the OUT parameter fund_id)
  RETURN QUERY
  SELECT
    p.fund_id,
    'position_mismatch'::VARCHAR,
    'Total weight is ' || SUM(p.weight)::TEXT
  FROM positions p
  WHERE p.position_date = CURRENT_DATE
  GROUP BY p.fund_id
  HAVING ABS(SUM(p.weight) - 1.0) > 0.001;
END;
$$ LANGUAGE plpgsql;
```
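The function can then be run ad hoc, or wired into a scheduled job, to surface issues before NAV sign-off:

```sql
-- List today's data quality issues across all funds
SELECT * FROM check_position_quality();
```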
Advanced Features
Row-Level Security (RLS)
```sql
-- Enable RLS for multi-tenant data isolation
ALTER TABLE positions ENABLE ROW LEVEL SECURITY;

-- Create policies
CREATE POLICY tenant_isolation ON positions
FOR ALL
USING (fund_id IN (
  SELECT fund_id FROM user_fund_access
  WHERE user_id = current_user_id()
));

-- Function to get current user context
-- (the second argument makes current_setting return NULL instead of
-- raising an error when app.user_id has not been set)
CREATE OR REPLACE FUNCTION current_user_id()
RETURNS UUID AS $$
  SELECT current_setting('app.user_id', true)::UUID;
$$ LANGUAGE SQL STABLE SECURITY DEFINER;
```
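The policy relies on app.user_id being set per session or transaction; a minimal sketch, assuming the application wraps each request in a transaction (the UUID is a placeholder):

```sql
BEGIN;
-- Scope the user context to this transaction only
SET LOCAL app.user_id = '<user-uuid>';
SELECT * FROM positions WHERE position_date = CURRENT_DATE;
COMMIT;
```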
Temporal Data Management
```sql
-- Historical data tracking
-- (PostgreSQL has no SQL:2011 PERIOD clause; validity is tracked with
-- plain columns, and constraints are not copied because history rows
-- repeat the same ids)
CREATE TABLE positions_history (
  LIKE positions INCLUDING DEFAULTS,
  valid_from TIMESTAMPTZ NOT NULL,
  valid_to TIMESTAMPTZ,
  operation CHAR(1),
  user_id UUID
);

-- Trigger for history tracking
CREATE OR REPLACE FUNCTION track_position_history()
RETURNS TRIGGER AS $$
BEGIN
  IF TG_OP = 'UPDATE' THEN
    INSERT INTO positions_history
    SELECT OLD.*, NOW(), NULL, 'U', current_user_id();
  ELSIF TG_OP = 'DELETE' THEN
    INSERT INTO positions_history
    SELECT OLD.*, NOW(), NULL, 'D', current_user_id();
  END IF;
  -- NEW is unassigned on DELETE, so fall back to OLD
  RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;
```
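The function takes effect only once it is attached to the table; the trigger definition would look like this (the trigger name is illustrative):

```sql
-- Record UPDATEs and DELETEs on positions into the history table
CREATE TRIGGER positions_history_trigger
AFTER UPDATE OR DELETE ON positions
FOR EACH ROW EXECUTE FUNCTION track_position_history();
```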
Data Archival Strategy
```sql
-- Automated archival for old data
CREATE OR REPLACE FUNCTION archive_old_data()
RETURNS VOID AS $$
DECLARE
  archive_date DATE := CURRENT_DATE - INTERVAL '2 years';
BEGIN
  -- Move to archive schema
  INSERT INTO archive.positions
  SELECT * FROM public.positions
  WHERE position_date < archive_date;

  -- Remove from main table
  DELETE FROM public.positions
  WHERE position_date < archive_date;

  -- Update statistics
  ANALYZE public.positions;
  ANALYZE archive.positions;
END;
$$ LANGUAGE plpgsql;

-- Schedule monthly archival (requires the pg_cron extension)
SELECT cron.schedule('archive-old-data', '0 2 1 * *',
  'SELECT archive_old_data()');
```
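The archival function assumes the archive schema and table already exist; one way to create them, as an assumption to adapt to your migration tooling:

```sql
-- One-time setup for the archive target
CREATE SCHEMA IF NOT EXISTS archive;
CREATE TABLE IF NOT EXISTS archive.positions (LIKE public.positions INCLUDING ALL);
```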
Performance Optimization
Indexing Strategies
```sql
-- Covering indexes for common queries
CREATE INDEX idx_positions_covering
ON positions(fund_id, position_date)
INCLUDE (security_id, quantity, market_value);

-- Partial indexes for filtered queries
CREATE INDEX idx_active_funds
ON funds(id)
WHERE status = 'active';

-- Expression indexes
CREATE INDEX idx_positions_month
ON positions(fund_id, DATE_TRUNC('month', position_date));
```
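PostgreSQL only uses an expression index when the query repeats the indexed expression verbatim; for example, idx_positions_month serves this rollup (the fund ID is a placeholder):

```sql
-- Month-level rollup that matches the expression index
SELECT fund_id, SUM(market_value) AS month_end_value
FROM positions
WHERE fund_id = '<fund-uuid>'
  AND DATE_TRUNC('month', position_date) = DATE_TRUNC('month', CURRENT_DATE)
GROUP BY fund_id;
```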
Query Optimization
```sql
-- Use materialized views for complex calculations
CREATE MATERIALIZED VIEW fund_performance AS
WITH daily_returns AS (
  SELECT
    fund_id,
    position_date,
    (nav - LAG(nav) OVER (PARTITION BY fund_id ORDER BY position_date))
      / NULLIF(LAG(nav) OVER (PARTITION BY fund_id ORDER BY position_date), 0) AS daily_return
  FROM fund_navs
)
SELECT
  fund_id,
  DATE_TRUNC('month', position_date) AS month,
  AVG(daily_return) * 252 AS annualized_return,
  STDDEV(daily_return) * SQRT(252) AS annualized_volatility,
  AVG(daily_return) / NULLIF(STDDEV(daily_return), 0) * SQRT(252) AS sharpe_ratio
FROM daily_returns
GROUP BY fund_id, DATE_TRUNC('month', position_date);

-- Refresh strategy: CONCURRENTLY avoids locking readers,
-- but requires a unique index on the view
CREATE UNIQUE INDEX ON fund_performance (fund_id, month);
REFRESH MATERIALIZED VIEW CONCURRENTLY fund_performance;
```
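Refreshes can be scheduled the same way as archival, assuming pg_cron is available; a sketch, with the job name and timing as placeholders:

```sql
-- Rebuild monthly performance figures nightly, after NAVs load
SELECT cron.schedule('refresh-fund-performance', '30 1 * * *',
  'REFRESH MATERIALIZED VIEW CONCURRENTLY fund_performance');
```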
Caching Layer
```typescript
// Redis caching for frequently accessed data
// (assumes a configured Supabase client is in scope)
import { Redis } from '@upstash/redis';

const redis = new Redis({
  url: process.env.REDIS_URL,
  token: process.env.REDIS_TOKEN
});

export async function getCachedPositions(fundId: string, date: string) {
  const cacheKey = `positions:${fundId}:${date}`;

  // Check cache first
  const cached = await redis.get(cacheKey);
  if (cached) return cached;

  // Fetch from database
  const { data } = await supabase
    .from('positions')
    .select('*')
    .eq('fund_id', fundId)
    .eq('position_date', date);

  // Cache for 1 hour
  await redis.setex(cacheKey, 3600, JSON.stringify(data));
  return data;
}
```
Data Governance
Audit Trail
```sql
-- Comprehensive audit logging
CREATE TABLE audit_log (
  id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  table_name VARCHAR(50),
  operation VARCHAR(10),
  user_id UUID,
  timestamp TIMESTAMPTZ DEFAULT NOW(),
  old_data JSONB,
  new_data JSONB,
  ip_address INET,
  user_agent TEXT
);

-- Generic audit trigger
CREATE OR REPLACE FUNCTION audit_trigger()
RETURNS TRIGGER AS $$
DECLARE
  v_old JSONB;
  v_new JSONB;
BEGIN
  -- OLD is unassigned on INSERT and NEW on DELETE, so guard both
  IF TG_OP IN ('UPDATE', 'DELETE') THEN
    v_old := to_jsonb(OLD);
  END IF;
  IF TG_OP IN ('INSERT', 'UPDATE') THEN
    v_new := to_jsonb(NEW);
  END IF;

  INSERT INTO audit_log (
    table_name,
    operation,
    user_id,
    old_data,
    new_data
  ) VALUES (
    TG_TABLE_NAME,
    TG_OP,
    current_user_id(),
    v_old,
    v_new
  );
  RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;
```
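The trigger function is generic, so it can be attached to any table that needs a trail; for example (the trigger name is illustrative):

```sql
-- Audit all writes to transactions
CREATE TRIGGER transactions_audit
AFTER INSERT OR UPDATE OR DELETE ON transactions
FOR EACH ROW EXECUTE FUNCTION audit_trigger();
```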
Data Lineage
```python
# Track data lineage
from opshub import DataLineage

lineage = DataLineage()

# Register transformation
# (nav_calculation_sql holds the SQL text of the NAV calculation, defined elsewhere)
lineage.register_transformation(
    source_tables=["positions", "market_data"],
    target_table="fund_navs",
    transformation="calculate_nav",
    sql_query=nav_calculation_sql,
    business_rules=["Apply FX rates", "Include accruals"]
)

# Generate lineage diagram
lineage.visualize("fund_navs")
```
Best Practices
- Schema Design: Use normalized schemas with strategic denormalization for performance.
- Data Validation: Implement constraints and triggers for data integrity (see the sketch below).
- Backup Strategy: Run regular automated backups with point-in-time recovery.
- Documentation: Maintain a data dictionary and column descriptions.
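A minimal sketch of the validation and documentation practices; the constraint names and rules are illustrative, not part of the shipped schema:

```sql
-- Data validation: enforce invariants in the schema itself
ALTER TABLE transactions
  ADD CONSTRAINT chk_transactions_fees CHECK (fees >= 0),
  ADD CONSTRAINT chk_transactions_dates CHECK (settlement_date >= trade_date);

-- Documentation: keep column descriptions in the database catalog
COMMENT ON COLUMN positions.cost_basis IS 'Total acquisition cost in fund base currency';
```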
Next Steps
- Explore Database Schema Reference
- Review API Data Access
- Learn about Workflow Integration
Pro Tip: Use Supabase’s real-time subscriptions for live data updates in dashboards without polling.