Partitioning and "live snapshots" of data in PostgreSQL

Although the theme of partition has been raised previously I want her to come back and tell us about your experience of solving this problem that arose in connection with the need for analytical processing of large amounts of data. In addition to partitioning, I will consider an extremely simplified implementation of "snapshots" of aggregate queries that are automatically updated when the source data changes.

One of the main requirements to the system being developed was the use of free software therefore, the choice fell on PostgreSQL. At the beginning of the work on the project, I'm pretty superficial know PostgreSQL, but was well familiar with the capabilities of Oracle Database. Since it was on analytical treatment, I wanted to have analogues of these options, Oracle as Partitioning and Materialized Views. After getting acquainted with the possibilities PostgreSQL, it became clear that this functionality one way or another, will have to write it yourself.

Of course, it was not about any full implementation of Materialized Views involving query rewriting. For my needs it is quite enough, the possibility of creating automatically updating the aggregated single-table samples (support connection tables will likely be added in the near future). For partitioning, I was planning to use repeatedly described the approach using inheritable tables insert data-driven trigger. I had thought to use for control partitioning, Rulesbut I rejected it because, in my case, prevailed inserting data into single records.

I began, of course, with tables to store metadata

ps_tables.sql
create sequence ps_table_seq;

create table ps_table (
id bigint default nextval('ps_table_seq') not null,
name varchar(50) not null unique,
primary key(id)
);

create sequence ps_column_seq;

create table ps_column (
id bigint default nextval('ps_column_seq') not null,
table_id bigint not null references ps_table(id),
name varchar(50) not null,
parent_name varchar(50),
type_name varchar(8) not null check (type_name in ('date', 'key', 'nullable', 'sum', 'min', 'max', 'cnt')),
unique (table_id, name),
primary key(id)
);

create table ps_range_partition (
table_id bigint not null references ps_table(id),
type_name varchar(10) not null check (type_name in ('day', 'week', 'month', 'year')),
start_value date not null,
end_value date not null,
primary key(table_id, start_value)
);

create table ps_snapshot (
snapshot_id bigint not null references ps_table(id),
table_id bigint not null references ps_table(id),
type_name varchar(10) not null check (type_name in ('day', 'week', 'month', 'year')),
primary key(snapshot_id)
);


Everything here is pretty obvious. The only thing worth to say is column types:
the the the the the the the the
Type
Description
date
the Column containing the calendar date used in the partitioning and aggregation of data (supported types are date and timestamp PostgreSQL)
key
the Key used in the phrase group by, aggregation of the data (supports all integer types PostgreSQL)
nullable
the Key used when aggregation of data may contain a null value
sum
the values
min
the Minimum value
max
Maximum value
cnt
Count non-null values

The basis of all decisions was the function that performs the rebuild of the functions of the triggers for the table that contains the source data:

ps_trigger_regenerate(bigint)
create or replace function ps_trigger_regenerate(in p_table bigint) returns void
as $$
declare
l_sql text;
l_table_name varchar(50);
l_date_column varchar(50);
l_flag boolean;
tabs record;
columns record;
begin
select name into l_table_name
from ps_table where id = p_table;

l_sql := 
'create or replace function ps_' || l_table_name || '_insert_trigger() returns trigger' ||
'as $'|| '$ ' ||
'begin ';
for tabs in
select a.snapshot_id as id,
b.name as table_name,
a.type_name as snapshot_type
from ps_snapshot a, ps_table b
where a.table_id = p_table
and b.id = a.snapshot_id
loop
l_flag = FALSE;
l_sql := l_sql ||
'update' || tabs.table_name || '  set  ';
for columns in
select name, parent_name, type_name
from ps_column
where table_id = tabs.id
and not type_name in ('date', 'key', 'nullable')
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
if columns.type_name = 'sum' then
l_sql := l_sql ||
columns.name || ' = ' || columns.name || ' + coalesce(NEW.' || columns.parent_name || ', 0) ';
end if;
if columns.type_name = 'min' then
l_sql := l_sql ||
columns.name || ' = least(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) ';
end if;
if columns.type_name = 'max' then
l_sql := l_sql ||
columns.name || ' = greatest(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) ';
end if;
if columns.type_name = 'cnt' then
l_sql := l_sql ||
columns.name || ' = ' || columns.name || ' + case when NEW.' || columns.parent_name || ' is null then 0 else 1 end ';
end if;
end loop;
l_flag = FALSE;
l_sql := l_sql || 'where ';
for columns in
select name, parent_name, type_name
from ps_column
where table_id = tabs.id
and type_name in ('date', 'key', 'nullable')
loop
if l_flag then
l_sql := l_sql || 'and ';
end if;
l_flag := TRUE;
if columns.type_name = 'date' then
l_sql := l_sql ||
columns.name || ' = date_trunc(lower("' || tabs.snapshot_type || "'), NEW.' || columns.parent_name || ') ';
end if;
if columns.type_name = 'key' then
l_sql := l_sql ||
columns.name || ' = NEW.' || columns.parent_name || ' ';
end if;
if columns.type_name = 'nullable' then
l_sql := l_sql ||
columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';
end if;
end loop;
l_sql := l_sql || '; ' ||
'if not FOUND then' ||
'insert into' || tabs.table_name || '(';
l_flag = FALSE;
for columns in
select name, type_name
from ps_column
where table_id = tabs.id
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
l_sql := l_sql || columns.name;
end loop;
l_sql := l_sql || ') values (';
l_flag = FALSE;
for columns in
select name, parent_name, type_name
from ps_column
where table_id = tabs.id
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
if columns.type_name = 'date' then
l_sql := l_sql || 'date_trunc(lower("' || tabs.snapshot_type || "'), NEW.' || columns.parent_name || ')';
elsif columns.type_name = 'cnt' then
l_sql := l_sql || 'case when NEW.' || columns.parent_name || 'is null then 0 else 1 end';
elsif columns.type_name in ('nullable', 'sum') then
l_sql := l_sql || 'coalesce(NEW.' || columns.parent_name || ', 0)';
else
l_sql := l_sql || 'NEW.' || columns.parent_name;
end if;
end loop;
l_sql := l_sql || '); ' ||
'end if; ';
end loop;
select name into l_date_column
from ps_column
where table_id = p_table
and type_name = 'date';
for tabs in
select to_char(start_value, 'YYYYMMDD') as start_value,
to_char(end_value, 'YYYYMMDD') as end_value,
type_name
from ps_range_partition
where table_id = p_table
order by start_value desc
loop
l_sql := l_sql ||
'if NEW.' || l_date_column || ' >= to_date("' || tabs.start_value || "', "YYYYMMDD") and NEW.' || l_date_column || ' < to_date("' || tabs.end_value || "', "YYYYMMDD") then ' ||
'insert into' || l_table_name || '_' || tabs.start_value || ' values (NEW.*); '||
'return null;' ||
'end if; ';
end loop;
l_sql := l_sql ||
'return NEW; '||
'end; '||
'$'||'$ language plpgsql';
execute l_sql;

l_sql := 
'create or replace function ps_' || l_table_name || '_raise_trigger() returns trigger' ||
'as $'|| '$ ' ||
'begin' ||
'raise EXCEPTION, "Can""t support % on MIN or MAX aggregate", TG_OP;' ||
'end; '||
'$'||'$ language plpgsql';
execute l_sql;

l_sql := 
'create or replace function ps_' || l_table_name || '_delete_trigger() returns trigger' ||
'as $'|| '$ ' ||
'begin ';
for tabs in
select a.snapshot_id as id,
b.name as table_name,
a.type_name as snapshot_type
from ps_snapshot a, ps_table b
where a.table_id = p_table
and b.id = a.snapshot_id
loop
l_flag = FALSE;
l_sql := l_sql ||
'update' || tabs.table_name || ' set ';
for columns in
select name, parent_name, type_name
from ps_column
where table_id = tabs.id
and type_name in ('sum', 'cnt')
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
if columns.type_name = 'sum' then
l_sql := l_sql ||
columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' ';
end if;
if columns.type_name = 'cnt' then
l_sql := l_sql ||
columns.name || ' = ' || columns.name || ' - case when OLD.' || columns.parent_name || ' is null then 0 else 1 end ';
end if;
end loop;
l_flag = FALSE;
l_sql := l_sql || 'where ';
for columns in
select name, parent_name, type_name
from ps_column
where table_id = tabs.id
and type_name in ('date', 'key', 'nullable')
loop
if l_flag then
l_sql := l_sql || 'and ';
end if;
l_flag := TRUE;
if columns.type_name = 'date' then
l_sql := l_sql ||
columns.name || ' = date_trunc(lower("' || tabs.snapshot_type || "'), NEW.' || columns.parent_name || ') ';
end if;
if columns.type_name = 'key' then
l_sql := l_sql ||
columns.name || ' = NEW.' || columns.parent_name || ' ';

if columns.type_name = 'nullable' then
l_sql := l_sql ||
columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';
end if;
end loop;
l_sql := l_sql || '; ';
end loop;
l_sql := l_sql ||
'return null; '||
'end; '||
'$'||'$ language plpgsql';
execute l_sql;

l_sql := 
'create or replace function ps_' || l_table_name || '_update_trigger() returns trigger' ||
'as $'|| '$ ' ||
'begin ';
for tabs in
select a.snapshot_id as id,
b.name as table_name,
a.type_name as snapshot_type
from ps_snapshot a, ps_table b
where a.table_id = p_table
and b.id = a.snapshot_id
loop
l_flag = FALSE;
l_sql := l_sql ||
'update' || tabs.table_name || ' set ';
for columns in
select name, parent_name, type_name
from ps_column
where table_id = tabs.id
and type_name in ('sum', 'cnt')
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
if columns.type_name = 'sum' then
l_sql := l_sql ||
columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' + NEW.' || columns.parent_name || ' ';
end if;
if columns.type_name = 'cnt' then
l_sql := l_sql ||
columns.name || ' = ' || columns.name ||
'- case when OLD.' || columns.parent_name || 'is null then 0 else 1 end' ||
'+ case when NEW.' || columns.parent_name || ' is null then 0 else 1 end ';
end if;
end loop;
l_flag = FALSE;
l_sql := l_sql || 'where ';
for columns in
select name, parent_name, type_name
from ps_column
where table_id = tabs.id
and type_name in ('date', 'key', 'nullable')
loop
if l_flag then
l_sql := l_sql || 'and ';
end if;
l_flag := TRUE;
if columns.type_name = 'date' then
l_sql := l_sql ||
columns.name || ' = date_trunc(lower("' || tabs.snapshot_type || "'), NEW.' || columns.parent_name || ') ';
end if;
if columns.type_name = 'key' then
l_sql := l_sql ||
columns.name || ' = NEW.' || columns.parent_name || ' ';
end if;
if columns.type_name = 'nullable' then
l_sql := l_sql ||
columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';
end if;
end loop;
l_sql := l_sql || '; ';
end loop;
l_sql := l_sql ||
'return null; '||
'end; '||
'$'||'$ language plpgsql';
execute l_sql;
end;
$$ language plpgsql;


Despite his frightening appearance, this function is quite simple. Its goal is to develop (based on existing metadata), four functions are used to construct triggers:

the
    the
  • ps_TABLE_insert_trigger() — Function management panel data
  • the
  • ps_TABLE_update_trigger () Function manages updating of data
  • the
  • ps_TABLE_delete_trigger() — a Function of managing deletion of data
  • the
  • ps_TABLE_raise_trigger () is a Function of prohibiting updating, and deleting data

Here is the TABLE populated with the name of the table containing the source data. A typical function definition ps_TABLE_insert_trigger() will look as follows:

the
create or replace function ps_data_insert_trigger() returns trigger
as $$
begin
update set data_month
sum_field = sum_field + NEW.sum_field
, min_field = least(min_field, NEW.min_field)
where date_field = date_trunc('month', NEW.date_field)
and key_field = NEW.key_field;
if not FOUND then
insert into data_month(date_field, key_field, sum_field, min_field)
values (date_trunc('month', NEW.date_field), NEW.key_field, NEW.sum_field, NEW.min_field);
end if;
if NEW.date_field > = to_date('20130101', 'YYYYMMDD') and 
NEW.date_field < to_date('20130201', 'YYYYMMDD') then
insert into data_20130101 values (NEW.*);
return null;
end if;
return NEW;
end;
$$ language plpgsql;

In fact, the function looks more complicated because in a special way processed null values. But, as an illustration, the above example is quite adequate. The logic of this code is obvious:

the
    the
  • When you insert in the source table data, trying to update counters in an aggregated view, data_month
  • the
  • If it failed (entry not found in data_month), add a new record
  • Forth, check the contact date range for each partition (in the example one section), and if successful, insert the record into the appropriate section (as the section is inherited from the main table, you can safely use the asterisk) and return null to prevent the insert of a record in the main table the

  • If none of the sections is not suitable to be returned NEW, allowing to insert into a main table


The last point leads to the fact that if an appropriate partition is not found, data is appended to the main table. In practice, this is quite convenient. Even if we do not create partition in advance or receive data with incorrect date, insertion of data is successful. You can subsequently analyze the contents of the main table by running the query:

the
select * from data only

Then, create the missing section (as will be shown below, the data are automatically transferred from the main table in the created section). In such cases, the number of records that did not appear in his section, as a rule, is not large and the costs for data transfer are insignificant.
Now we have to make a harness. Let's start with creating a new partition:

ps_add_range_partition(varchar, varchar, varchar, date)
create or replace function ps_add_range_partition(in p_table varchar, in p_column varchar, 
in p_type varchar, in p_start date) returns void
as $$
declare
l_sql text;
l_end date;
l_start_str varchar(10);
l_end_str varchar(10);
l_table bigint;
l_flag boolean;
columns record;
begin
perform 1
from ps_table a, b ps_column
where a.id = b.table_id and lower(a.name) = lower(p_table)
and b.type_name = 'date' and lower(b.name) <> lower(p_column);
if FOUND then
raise EXCEPTION 'Conflict DATE columns';
end if;

l_end := p_start + ('1' || p_type)::INTERVAL;

perform 1
from ps_table a, b ps_range_partition
where a.id = b.table_id and lower(a.name) = lower(p_table)
and (( p_start >= b.start_value and p_start < b.end_value ) or
( b.start_value > = p_start and b.start_value < l_end ));
if FOUND then
raise EXCEPTION 'Range intervals intersects';
end if;

perform 1
from ps_table
where lower(name) = lower(p_table);
if not FOUND then
insert into ps_table(name) values (lower(p_table));
end if;

select id into l_table
from ps_table
where lower(name) = lower(p_table);

perform 1
from ps_column
where table_id = l_table and type_name = 'date'
and lower(name) = lower(p_column);
if not FOUND then
insert into ps_column(table_id, name, type_name)
values (l_table, lower(p_column), 'date');
end if;

insert into ps_range_partition(table_id, type_name, start_value, end_value)
values (l_table, p_type, p_start, l_end);

l_start_str = to_char(p_start, 'YYYYMMDD');
l_end_str = to_char(l_end, 'YYYYMMDD');

l_sql :=
'create table' || p_table || '_' || l_start_str || '(' ||
'check (' || p_column || ' >= to_date("' || l_start_str || "', "YYYYMMDD") and ' ||
p_column || ' < to_date("' || l_end_str || "', "YYYYMMDD")), ' ||
'primary key (';

l_flag := FALSE;
for columns in
select f.name as name
from ( select ps_array_to_set(a.conkey) as nn
from pg_constraint a, pg_class b
where b.oid = a.conrelid
and a.contype = 'p'
and b.relname = p_table ) c, 
( select d.attname as name, d.attnum as nn
d from pg_attribute, pg_class e
where e.oid = d.attrelid
and e.relname = p_table ) f
where f.nn = c.nn
order by f.nn
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
l_sql := l_sql || columns.name;
end loop;

l_sql := l_sql ||
')) inherits (' || p_table || ')';
execute l_sql;

l_sql := 
'create index' || p_table || '_' || l_start_str || '_date on' || p_table || '_' || l_start_str || '(' || p_column || ')';
execute l_sql;

perform ps_trigger_regenerate(l_table);

execute 'drop trigger if exists ps_' || p_table || '_before_insert on' || p_table;
execute 'drop trigger if exists ps_' || p_table || '_after_update on' || p_table;
execute 'drop trigger if exists ps_' || p_table || '_after_delete on' || p_table;

l_sql := 
'insert into' || p_table || '_' || l_start_str || '' ||
'select * from' || p_table || 'where' ||
p_column || ' >= to_date("' || l_start_str || "', "YYYYMMDD") and ' ||
p_column || ' < to_date("' || l_end_str || "', "YYYYMMDD")';
execute l_sql;

l_sql := 
'delete from only' || p_table || 'where' ||
p_column || ' >= to_date("' || l_start_str || "', "YYYYMMDD") and ' ||
p_column || ' < to_date("' || l_end_str || "', "YYYYMMDD")';
execute l_sql;

l_sql := 
'create trigger ps_' || p_table || '_before_insert' ||
'before insert on' || p_table || 'for each row' ||
'execute procedure ps_' || p_table || '_insert_trigger()';
execute l_sql;
perform 1
from ps_snapshot a, b ps_column
where b.table_id = a.snapshot_id and a.table_id = l_table
and b.type_name in ('min', 'max');
if FOUND then
l_sql := 
'create trigger ps_' || p_table || '_after_update' ||
'after update on' || p_table || 'for each row' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
l_sql := 
'create trigger ps_' || p_table || '_after_delete' ||
'after delete on' || p_table || 'for each row' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
l_sql := 
'create trigger ps_' || p_table || '_' || l_start_str || '_after_update' ||
'after update on' || p_table || '_' || l_start_str || 'for each row' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
l_sql := 
'create trigger ps_' || p_table || '_' || l_start_str || '_after_delete' ||
'after delete on' || p_table || '_' || l_start_str || 'for each row' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
else
l_sql := 
'create trigger ps_' || p_table || '_after_update' ||
'after update on' || p_table || 'for each row' ||
'execute procedure ps_' || p_table || '_update_trigger()';
execute l_sql;
l_sql := 
'create trigger ps_' || p_table || '_after_delete' ||
'after delete on' || p_table || 'for each row' ||
'execute procedure ps_' || p_table || '_delete_trigger()';
execute l_sql;
l_sql := 
'create trigger ps_' || p_table || '_' || l_start_str || '_after_update' ||
'after update on' || p_table || '_' || l_start_str || 'for each row' ||
'execute procedure ps_' || p_table || '_update_trigger()';
execute l_sql;
l_sql := 
'create trigger ps_' || p_table || '_' || l_start_str || '_after_delete' ||
'after delete on' || p_table || '_' || l_start_str || 'for each row' ||
'execute procedure ps_' || p_table || '_delete_trigger()';
execute l_sql;
end if;
end;
$$ language plpgsql;


Here, after checking the validity of input data, we add the necessary metadata, and then create inherited table. Then, we recreate the trigger function call ps_trigger_regenerate, and then move the data under the condition that the partition created partition dynamic query and recreate the triggers themselves.

Difficulties arose from two factors.
    the
  1. we had to suffer with the addition to the launch date month, day, or year (depending on the input parameter p_type:

    the
    l_end := p_start + ('1' || p_type)::INTERVAL;
    

  2. the
  3. Because the primary key is not inherited, it was necessary to write a request to System Catalogs to get a list of columns of the primary key of the source table (stored in its metadata description of a primary key, I felt it inappropriate):

    the
     select f.name as name
    from ( select ps_array_to_set(a.conkey) as nn
    from pg_constraint a, pg_class b
    where b.oid = a.conrelid
    and a.contype = 'p'
    and b.relname = p_table ) c, 
    ( select d.attname as name, d.attnum as nn
    d from pg_attribute, pg_class e
    where e.oid = d.attrelid
    and e.relname = p_table ) f
    where f.nn = c.nn
    order by f.nn
    


Also, it should be noted that before creating index on partition key (created section), it would be worthwhile to pre-test if he isn't the leading column of the primary key (not to create a duplicate index).

The function of deleting a partition is much simpler and needs no special comments:

ps_del_range_partition(varchar, date)
create or replace function ps_del_range_partition(in p_table varchar, in p_start date) 
returns void
as $$
declare
l_sql text;
l_start_str varchar(10);
l_table bigint;
begin
select id into l_table
from ps_table
where lower(name) = lower(p_table);

l_start_str = to_char(p_start, 'YYYYMMDD');

delete from ps_range_partition 
where table_id = l_table
and start_value = p_start;

perform ps_trigger_regenerate(l_table);

l_sql := 
'insert into' || p_table || '' ||
'select * from' || p_table || '_' || l_start_str;
execute l_sql;

perform 1
from ( select 1
from ps_range_partition
where table_id = l_table
union all
select 1
from ps_snapshot
where table_id = l_table );
if not FOUND then
execute 'drop trigger if exists ps_' || p_table || '_before_insert on' || p_table;
execute 'drop trigger if exists ps_' || p_table || '_after_update on' || p_table;
execute 'drop trigger if exists ps_' || p_table || '_after_delete on' || p_table;

execute 'drop function ps_' || p_table || '_insert_trigger() cascade';
execute 'drop function ps_' || p_table || '_raise_trigger() cascade';
execute 'drop function ps_' || p_table || '_update_trigger() cascade';
execute 'drop function ps_' || p_table || '_delete_trigger() cascade';

delete from ps_column where table_id = l_table;
delete from ps_table where id = l_table;
end if;

perform 1
from ps_range_partition
where table_id = l_table;
if not FOUND then
delete from ps_column 
where table_id = l_table
and type_name = 'date';
end if;

execute 'drop table' || p_table || '_' || l_start_str;
end;
$$ language plpgsql;


When you delete a partition the data, of course, is not lost, but transferred to the main table (pre-triggers are removed because, as it turned out, the keyword only does not work in the insert statement).

It remains to add management functions "live" snapshots of the data:

ps_add_snapshot_column(varchar, varchar, varchar, varchar)
create or replace function ps_add_snapshot_column(in p_snapshot varchar, 
in p_column varchar, varchar in p_parent, in p_type varchar) returns void
as $$
declare
l_table bigint;
begin
perform 1
from ps_table
where lower(name) = lower(p_snapshot);
if not FOUND then
insert into ps_table(name) values (lower(p_snapshot));
end if;

select id into l_table
from ps_table
where lower(name) = lower(p_snapshot);

insert into ps_column(table_id, name, parent_name, type_name)
values (l_table, lower(p_column), lower(p_parent), p_type);
end;
$$ language plpgsql;


ps_add_snapshot(varchar, varchar, varchar)
create or replace function ps_add_snapshot(in p_table varchar, in p_snapshot varchar, 
in p_type varchar) returns void
as $$
declare
l_sql text;
l_table bigint;
l_snapshot bigint;
l_flag boolean;
columns record;
begin
select id into l_snapshot
from ps_table
where lower(name) = lower(p_snapshot);

perform 1
from ps_column
where table_id = l_snapshot
and type_name in ('date', 'key');
if not FOUND then
raise EXCEPTION 'Key columns not found';
end if;

perform 1
from ps_column
where table_id = l_snapshot
and not type_name in ('date', 'key', 'nullable');
if not FOUND then
raise EXCEPTION 'Aggregate columns not found';
end if;

perform 1
from ps_table
where lower(name) = lower(p_table);
if not FOUND then
insert into ps_table(name) values (lower(p_table));
end if;

select id into l_table
from ps_table
where lower(name) = lower(p_table);

insert into ps_snapshot(table_id, snapshot_id, type_name)
values (l_table, l_snapshot, p_type);

perform ps_trigger_regenerate(l_table);

l_sql := 'create table' || p_snapshot || ' (';
l_flag := FALSE;
for columns in
select name, type_name
from ps_column
where table_id = l_snapshot
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
if columns.type_name = 'date' then
l_sql := l_sql || columns.name || 'date not null';
else
l_sql := l_sql || columns.name || 'bigint not null';
end if;
end loop;
l_sql := l_sql || ', primary key (';
l_flag := FALSE;
for columns in
select name
from ps_column

and type_name in ('date', 'key', 'nullable')
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
l_sql := l_sql || columns.name;
end loop;
l_sql := l_sql || '))';
execute l_sql;

execute 'drop trigger if exists ps_' || p_table || '_before_insert on' || p_table;
execute 'drop trigger if exists ps_' || p_table || '_after_update on' || p_table;
execute 'drop trigger if exists ps_' || p_table || '_after_delete on' || p_table;

l_sql := 
'create trigger ps_' || p_table || '_before_insert' ||
'before insert on' || p_table || 'for each row' ||
'execute procedure ps_' || p_table || '_insert_trigger()';
execute l_sql;

perform 1
from ps_snapshot a, b ps_column
where b.table_id = a.snapshot_id and a.table_id = l_table
and b.type_name in ('min', 'max');
if FOUND then
l_sql := 
'create trigger ps_' || p_table || '_after_update' ||
'after update on' || p_table || 'for each row' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
l_sql := 
'create trigger ps_' || p_table || '_after_delete' ||
'after delete on' || p_table || 'for each row' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
else
l_sql := 
'create trigger ps_' || p_table || '_after_update' ||
'after update on' || p_table || 'for each row' ||
'execute procedure ps_' || p_table || '_update_trigger()';
execute l_sql;
l_sql := 
'create trigger ps_' || p_table || '_after_delete' ||
'after delete on' || p_table || 'for each row' ||
'execute procedure ps_' || p_table || '_delete_trigger()';
execute l_sql;
end if;

l_sql := 'insert into' || p_snapshot || '(';
l_flag := FALSE;
for columns in
select name
from ps_column
where table_id = l_snapshot
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
l_sql := l_sql || columns.name;
end loop;
l_sql := l_sql || ') select ';
l_flag := FALSE;
for columns in
select parent_name as name, type_name
from ps_column
where table_id = l_snapshot
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
if columns.type_name = 'date' then
l_sql := l_sql || 'date_trunc(lower("' || p_type || "'), ' || columns.name || ')';
end if;
if columns.type_name = 'key' then
l_sql := l_sql || columns.name;
end if;
if columns.type_name = 'nullable' then
l_sql := l_sql || 'coalesce(' || columns.name || ', 0)';
end if;
if columns.type_name = 'sum' then
l_sql := l_sql || 'sum(' || columns.name || ')';
end if;
if columns.type_name = 'min' then
l_sql := l_sql || 'min(' || columns.name || ')';
end if;
if columns.type_name = 'max' then
l_sql := l_sql || 'max(' || columns.name || ')';
end if;
if columns.type_name = 'cnt' then
l_sql := l_sql || 'count(' || columns.name || ')';
end if;
end loop;
l_sql := l_sql || 'from' || p_table || ' group by ';
l_flag := FALSE;
for columns in
select parent_name as name, type_name
from ps_column
where table_id = l_snapshot
and type_name in ('date', 'key', 'nullable')
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
if columns.type_name = 'date' then
l_sql := l_sql || 'date_trunc(lower("' || p_type || "'), ' || columns.name || ')';
else
l_sql := l_sql || columns.name;
end if;
end loop;
execute l_sql;
end;
$$ language plpgsql;


ps_del_snapshot(varchar)
create or replace function ps_del_snapshot(in p_snapshot varchar) returns void
as $$
declare
l_sql text;
p_table varchar(50);
l_table bigint;
l_snapshot bigint;
begin
select a.table_id, c.name into l_table, p_table
from ps_snapshot a, ps_table b, ps_table c
where b.id = a.snapshot_id and c.id = a.table_id
and lower(b.name) = lower(p_snapshot);

select id into l_snapshot
from ps_table
where lower(name) = lower(p_snapshot);

delete from ps_snapshot where snapshot_id = l_snapshot;
delete from ps_column where table_id = l_snapshot;
delete from ps_table where id = l_snapshot;

execute 'drop trigger if exists ps_' || p_table || '_before_insert on' || p_table;
execute 'drop trigger if exists ps_' || p_table || '_after_update on' || p_table;
execute 'drop trigger if exists ps_' || p_table || '_after_delete on' || p_table;

perform 1
from ( select 1
from ps_range_partition
where table_id = l_table
union all
select 1
from ps_snapshot
where table_id = l_table );
if not FOUND then
execute 'drop function if exists ps_' || p_table || '_insert_trigger() cascade';
execute 'drop function if exists ps_' || p_table || '_raise_trigger() cascade';
execute 'drop function if exists ps_' || p_table || '_update_trigger() cascade';
execute 'drop function if exists ps_' || p_table || '_delete_trigger() cascade';
else
perform ps_trigger_regenerate(l_table);

l_sql := 
'create trigger ps_' || p_table || '_before_insert' ||
'before insert on' || p_table || 'for each row' ||
'execute procedure ps_' || p_table || '_insert_trigger()';
execute l_sql;

perform 1
from ps_snapshot a, b ps_column
where b.table_id = a.snapshot_id and a.table_id = l_table
and b.type_name in ('min', 'max');
if FOUND then
l_sql := 
'create trigger ps_' || p_table || '_after_update' ||
'after update on' || p_table || 'for each row' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
l_sql := 
'create trigger ps_' || p_table || '_after_delete' ||
'after delete on' || p_table || 'for each row' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
else
l_sql := 
'create trigger ps_' || p_table || '_after_update' ||
'after update on' || p_table || 'for each row' ||
'execute procedure ps_' || p_table || '_update_trigger()';
execute l_sql;
l_sql := 
'create trigger ps_' || p_table || '_after_delete' ||
'after delete on' || p_table || 'for each row' ||
'execute procedure ps_' || p_table || '_delete_trigger()';
execute l_sql;
end if;
end if;

execute 'drop table if exists' || p_snapshot;
end;
$$ language plpgsql;


Here, too, there is nothing new and the only thing I would like to note is that, in the case of using aggregates 'min' or 'max', when you create triggers, use the function ps_TABLE_raise_trigger () prohibiting removal of and changes to the table on which the snapshot is built. This is because I couldn't think of an adequate performance implementation update these units when you execute the update and delete statements in the source table.
Let's see how it all works. Let's create a test table:

the
create sequence test_seq;

create table test (
id bigint default nextval('test_seq') not null,
event_time timestamp not null,
customer_id bigint not null,
value bigint not null,
primary key(id)
);

Now, to add a section, it is sufficient to run the following query:

the
select ps_add_range_partition('test', 'event_time', 'month', to_date('20130501', 'YYYYMMDD'))

As a result, will create the inherited table test_20130501 that will automatically get all the records for the month of may.

To delete sections, you can run the following query:

the
select ps_del_range_partition('test', to_date('20130501', 'YYYYMMDD'))


The creation of the snapshot is somewhat more complicated, because pre-need to define the columns we are interested in:

the
select ps_add_snapshot_column('test_month', 'customer_id', 'key')
select ps_add_snapshot_column('test_month', 'event_time', 'date')
select ps_add_snapshot_column('test_month', 'value_sum', 'value', 'sum')
select ps_add_snapshot_column('test_month', 'value_cnt', 'value', 'cnt')
select ps_add_snapshot_column('test_month', 'value_max', 'value', 'max')
select ps_add_snapshot('test', 'test_month', 'month')


As a result, will create an automatically updating table based on the following query:

the
select customer_id, date_trunc('month', event_time),
sum(value) as value_sum,
count(value) as value_cnt,
max(value) as value_max
from test
group by customer_id, date_trunc('month', event_time)

To remove the snapshot, you can perform the following query:

the
select ps_del_snapshot('test_month')

This, today, everything. Scripts can be picked up at GitHub.
Article based on information from habrahabr.ru

Комментарии

Популярные сообщения из этого блога

The release of the new version of the module modLivestreet 0.3.0-rc

mSearch: search + filter for MODX Revolution

Emulator data from GNSS receiver NMEA