Operational Analytics
Overview
Operational analytics combines transactional processing with real-time analytical capabilities. This use case focuses on maintaining a columnstore copy of your rowstore table that remains relatively up-to-date for analytical queries.
Note: pg_mooncake v0.1 is not fully optimized for real-time operational analytics. This capability will be significantly enhanced in v0.2. The approach described below provides a workable solution for v0.1.
Implementation Pattern
To achieve near real-time analytics with pg_mooncake v0.1, you can implement the following pattern:
1. Create a Staging Table
First, create an intermediate rowstore table that will temporarily hold new data:
-- Create a staging table with the same schema as your source table
CREATE TABLE orders_staging (
order_id INT,
customer_id INT,
product_id INT,
quantity INT,
unit_price DECIMAL(10,2),
order_date TIMESTAMP,
-- Add any other columns from your source table
PRIMARY KEY (order_id)
);
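For reference, the steps that follow assume a source rowstore table named orders that shares this schema. A minimal, hypothetical version compatible with the examples might look like this:
-- Hypothetical source rowstore table; substitute your actual table
CREATE TABLE orders (
order_id INT PRIMARY KEY,
customer_id INT,
product_id INT,
quantity INT,
unit_price DECIMAL(10,2),
order_date TIMESTAMP
);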
2. Set Up a Trigger on the Source Table
Create a trigger that captures changes to your source table and copies them to the staging table:
CREATE OR REPLACE FUNCTION copy_to_staging()
RETURNS TRIGGER AS $$
BEGIN
-- Insert the new row into the staging table
INSERT INTO orders_staging VALUES (NEW.*)
ON CONFLICT (order_id) DO UPDATE
SET
customer_id = EXCLUDED.customer_id,
product_id = EXCLUDED.product_id,
quantity = EXCLUDED.quantity,
unit_price = EXCLUDED.unit_price,
order_date = EXCLUDED.order_date;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER orders_to_staging
AFTER INSERT OR UPDATE ON orders
FOR EACH ROW
EXECUTE FUNCTION copy_to_staging();
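To verify the trigger is working, insert a sample row into the source table and check that it appears in staging (the values below are purely illustrative):
-- Insert a sample order; the trigger copies it into orders_staging
INSERT INTO orders VALUES (1, 42, 7, 3, 19.99, NOW());
-- Confirm the row was captured
SELECT * FROM orders_staging;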
3. Create the Columnstore Table
Next, create the analytical copy of the table using the columnstore access method:
CREATE TABLE orders_analytics (
order_id INT,
customer_id INT,
product_id INT,
quantity INT,
unit_price DECIMAL(10,2),
order_date TIMESTAMP,
-- Add any other columns from your source table
last_updated TIMESTAMP
) USING columnstore;
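Once data lands in it, the columnstore table serves typical analytical queries efficiently. As an illustration, assuming the schema above, a daily revenue roll-up might look like this:
-- Example: daily revenue per product from the columnstore copy
SELECT
product_id,
date_trunc('day', order_date) AS day,
SUM(quantity * unit_price) AS revenue
FROM orders_analytics
GROUP BY product_id, day
ORDER BY day, product_id;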
4. Set Up a Periodic Sync Job
Create a function to sync data from the staging table to the columnstore table:
CREATE OR REPLACE FUNCTION sync_to_columnstore()
RETURNS void AS $$
DECLARE
sync_count INT;
BEGIN
-- Insert new records from staging into columnstore
-- (rows whose order_id already exists in the columnstore are skipped)
INSERT INTO orders_analytics
SELECT
s.*,
NOW() AS last_updated
FROM
orders_staging s
LEFT JOIN orders_analytics c ON s.order_id = c.order_id
WHERE
c.order_id IS NULL;
-- Get count of synced records
GET DIAGNOSTICS sync_count = ROW_COUNT;
-- Log the sync operation
RAISE NOTICE 'Synced % new records to columnstore', sync_count;
-- Clear the staging table after successful sync
TRUNCATE orders_staging;
-- Optional: Run a vacuum operation to optimize storage
-- VACUUM orders_analytics;
RETURN;
END;
$$ LANGUAGE plpgsql;
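You can invoke the function manually to test it:
SELECT sync_to_columnstore();
One caveat: between the INSERT's snapshot and the TRUNCATE, rows committed to staging by concurrent transactions can be truncated without ever being synced, and updates to already-synced order_ids are dropped by the anti-join. If that matters for your workload, one possible variant (a sketch, not part of the documented pattern) consumes staging rows atomically with DELETE ... RETURNING:
-- Move rows out of staging and into the columnstore in one statement,
-- so nothing can slip in between the copy and the cleanup
WITH moved AS (
DELETE FROM orders_staging
RETURNING *
)
INSERT INTO orders_analytics
SELECT m.*, NOW() AS last_updated
FROM moved m
LEFT JOIN orders_analytics c ON m.order_id = c.order_id
WHERE c.order_id IS NULL;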
5. Schedule the Sync Job
Use pg_cron to run the sync job at your desired frequency:
-- Install pg_cron if not already installed
CREATE EXTENSION IF NOT EXISTS pg_cron;
-- Schedule the sync job to run every minute
SELECT cron.schedule('sync-to-columnstore', '* * * * *', 'SELECT sync_to_columnstore()');
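pg_cron also lets you inspect and manage the job. For example (unscheduling by name requires a reasonably recent pg_cron release):
-- List scheduled jobs
SELECT jobid, jobname, schedule, command FROM cron.job;
-- Review recent runs of the sync job
SELECT * FROM cron.job_run_details ORDER BY start_time DESC LIMIT 10;
-- Stop the sync if you no longer need it
SELECT cron.unschedule('sync-to-columnstore');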
Considerations and Limitations
- Append-Only Operations: This approach works best for append-only workloads, where you're primarily inserting new data rather than updating existing records; updates captured in staging are not applied to rows already present in the columnstore.
- Sync Frequency: The frequency of your sync job determines how "real-time" your analytics will be. More frequent syncs provide more up-to-date data but increase system load.