Version: 0.1

Operational Analytics

Overview

Operational analytics combines transactional processing with real-time analytical capabilities. This use case focuses on maintaining a columnstore copy of your rowstore table that remains relatively up-to-date for analytical queries.

Note: pg_mooncake v0.1 is not fully optimized for real-time operational analytics. This capability will be significantly enhanced in v0.2. The approach described below provides a workable solution for v0.1.

Implementation Pattern

To achieve near real-time analytics with pg_mooncake v0.1, you can implement the following pattern:

1. Create a Staging Table

First, create an intermediate rowstore table that will temporarily hold new data:

-- Create a staging table with the same schema as your source table
CREATE TABLE orders_staging (
    order_id INT,
    customer_id INT,
    product_id INT,
    quantity INT,
    unit_price DECIMAL(10,2),
    order_date TIMESTAMP,
    -- Add any other columns from your source table
    PRIMARY KEY (order_id)
);

2. Set Up a Trigger on the Source Table

Create a trigger that captures changes to your source table and copies them to the staging table:

CREATE OR REPLACE FUNCTION copy_to_staging()
RETURNS TRIGGER AS $$
BEGIN
    -- Insert the new row into the staging table,
    -- or update the staged row if the order is already present
    INSERT INTO orders_staging VALUES (NEW.*)
    ON CONFLICT (order_id) DO UPDATE
    SET customer_id = EXCLUDED.customer_id,
        product_id  = EXCLUDED.product_id,
        quantity    = EXCLUDED.quantity,
        unit_price  = EXCLUDED.unit_price,
        order_date  = EXCLUDED.order_date;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER orders_to_staging
AFTER INSERT OR UPDATE ON orders
FOR EACH ROW
EXECUTE FUNCTION copy_to_staging();

3. Create the Columnstore Table

CREATE TABLE orders_analytics (
    order_id INT,
    customer_id INT,
    product_id INT,
    quantity INT,
    unit_price DECIMAL(10,2),
    order_date TIMESTAMP,
    -- Add any other columns from your source table
    last_updated TIMESTAMP
) USING columnstore;

4. Set Up a Periodic Sync Job

Create a function to sync data from the staging table to the columnstore table:

CREATE OR REPLACE FUNCTION sync_to_columnstore()
RETURNS void AS $$
DECLARE
    sync_count INT;
BEGIN
    -- Insert new records from staging into columnstore
    INSERT INTO orders_analytics
    SELECT s.*, NOW() AS last_updated
    FROM orders_staging s
    LEFT JOIN orders_analytics c ON s.order_id = c.order_id
    WHERE c.order_id IS NULL;

    -- Get count of synced records
    GET DIAGNOSTICS sync_count = ROW_COUNT;

    -- Log the sync operation
    RAISE NOTICE 'Synced % new records to columnstore', sync_count;

    -- Clear the staging table after a successful sync.
    -- Note: rows staged by concurrent transactions after the INSERT
    -- above takes its snapshot could be discarded here unsynced; if
    -- that matters for your workload, delete only the synced
    -- order_ids instead of truncating.
    TRUNCATE orders_staging;

    -- Note: VACUUM cannot run inside a function. To optimize
    -- storage, run VACUUM orders_analytics as a separate statement.

    RETURN;
END;
$$ LANGUAGE plpgsql;

5. Schedule the Sync Job

Use pg_cron to run the sync job at your desired frequency:

-- Install pg_cron if not already installed
-- (pg_cron must also be listed in shared_preload_libraries)
CREATE EXTENSION IF NOT EXISTS pg_cron;

-- Schedule the sync job to run every minute
SELECT cron.schedule('sync-to-columnstore', '* * * * *', 'SELECT sync_to_columnstore()');
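
To confirm the job is registered, or to stop it later, pg_cron provides a cron.job metadata table and a cron.unschedule function (unscheduling by job name rather than job ID requires a reasonably recent pg_cron release); a short sketch:

-- List scheduled jobs and their cron expressions
SELECT jobid, jobname, schedule, command FROM cron.job;

-- Remove the sync job when it is no longer needed
SELECT cron.unschedule('sync-to-columnstore');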

Considerations and Limitations

  • Append-Only Operations: This approach works best for append-only workloads where you're primarily inserting new data. Although the trigger captures UPDATEs into the staging table, the sync function only inserts rows whose order_id is not yet in the columnstore table, so updates to already-synced rows will not propagate.

  • Sync Frequency: The frequency of your sync job determines how "real-time" your analytics will be. More frequent syncs provide more up-to-date data but increase system load.
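
Once the sync job is running, analytical queries go directly against the columnstore table. As an illustration (the aggregation below is hypothetical, but the column names match the schema defined above), a daily revenue rollup might look like:

-- Daily revenue per product from the columnstore copy
SELECT
    product_id,
    date_trunc('day', order_date) AS order_day,
    SUM(quantity * unit_price)    AS revenue
FROM orders_analytics
GROUP BY product_id, order_day
ORDER BY order_day, revenue DESC;

Because orders_analytics lags the rowstore table by at most one sync interval, results reflect data as of the last completed sync rather than the current instant.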