Build your own aggregate

Before the release of pg_chameleon 2.0 I had to write an upgrade procedure to allow a smooth migration from version 1.8.

An interesting challenge I faced was determining the maximum and minimum positions of MySQL’s binary logs that belong to the same server but are stored with different values in multiple sources.

User-defined aggregates, a feature pretty unique to PostgreSQL, allowed me to solve the problem in a robust and simple way.

MySQL’s binlog coordinates max and min

When configured for replication, MySQL stores data changes in local binary log files, whose names are composed of a prefix specified in the variable log-bin (e.g. log-bin = mysql-bin) and a numerical counter, separated by a dot (e.g. mysql-bin.001915).

When a new event (query, row image, etc.) is generated MySQL stores the information in the binlog file. The offset in bytes within the binlog plus the binlog name are the event’s coordinates.

As new events come in, the binlog fills up until it reaches its size limit, then a new log is created. The binlog size is set by the parameter max_binlog_size. Because transactions are written in single chunks, which are never split between logs, it’s possible to have logs larger than max_binlog_size.
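To see these coordinates on a live server, the statements below are a minimal sketch. They assume a MySQL session with the privileges needed to inspect the binary logs, and they run on the MySQL server, not on PostgreSQL. On recent MySQL releases SHOW MASTER STATUS has been superseded by SHOW BINARY LOG STATUS, so the last statement may need adjusting.

```sql
-- Run on the MySQL server, not on PostgreSQL.

-- The size limit that triggers the switch to a new binlog file.
SHOW VARIABLES LIKE 'max_binlog_size';

-- The binlog files currently available, with their size in bytes.
SHOW BINARY LOGS;

-- The current coordinates: binlog file name and position within it.
SHOW MASTER STATUS;
```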

pg_chameleon version 1 supported, in a way, replicating multiple schemas by configuring multiple sources. This implementation, apart from being inefficient, resulted in schemas from the same MySQL server having different log coordinates stored in the table sch_chameleon.t_replica_batch.

pg_chameleon 2 maps multiple schemas within the same MySQL database with one set of coordinates per MySQL server.

The migration procedure from version 1 to version 2 required grouping the different schemas and determining the minimum and maximum binlog location stored for those schemas.

Without digging into the replica tool’s schema, let’s create a table populated with dummy MySQL log coordinates and schemas.

CREATE TABLE test_binlog
(
    id_source bigint,
    schema_name text,
    binlog_name text,
    binlog_position bigint
);

ALTER TABLE test_binlog ADD CONSTRAINT pk_test_binlog PRIMARY KEY (id_source);

db_test=# INSERT INTO test_binlog
        (
                id_source,
                schema_name,
                binlog_name,
                binlog_position
        )
VALUES
        (1,'sakila1','mysql-bin.001915 ','194'),
        (2,'sakila2','mysql-bin.001915 ','1241344'),
        (3,'sakila3','mysql-bin.001917 ','4494'),
        (4,'sakila4','mysql-bin.001916 ','3194'),
        (5,'sakila5','mysql-bin.001920 ','4');


db_test=# SELECT id_source,schema_name,binlog_name,binlog_position FROM test_binlog;
 id_source | schema_name |    binlog_name    | binlog_position
-----------+-------------+-------------------+-----------------
         1 | sakila1     | mysql-bin.001915  |             194
         2 | sakila2     | mysql-bin.001915  |         1241344
         3 | sakila3     | mysql-bin.001917  |            4494
         4 | sakila4     | mysql-bin.001916  |            3194
         5 | sakila5     | mysql-bin.001920  |               4
(5 rows)

In our example we’ll build the additional aggregates required to get an array of schemas with the max and min coordinates for those schemas.

Writing the binlog_min aggregate

For our binlog_min aggregate we need to write two functions.

The first is a state function, where we evaluate and save the state of our aggregation for each row in the data set. The second is an exit function, also called the final function, which finalises the aggregation and returns the final result.

We also need to decide the startup value for our aggregation which is used when the first row is evaluated.

As we are manipulating a text array we set the startup value to ARRAY['0','0'].

Our state function will have two parameters. The first parameter is the internal state. The second parameter is the next data value. We also need to specify the state’s data type when creating the aggregate.

In our specific case we want an aggregate which accepts a text array composed of the binlog name and the coordinate within that log, and returns a text array with the minimum of the dataset’s values.

So, what’s the minimum of a binlog coordinate then?

We need to find the minimum value of the binlog’s progression number and, within this value, the minimum value of the binlog location. Despite the apparent complexity, we can achieve this with a simple SQL function using the CASE construct.

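CREATE OR REPLACE FUNCTION fn_binlog_min(text[],text[])
RETURNS text[] AS
$BODY$
 -- $1 is the state, $2 is the next value; both are {binlog_name,binlog_position}
 SELECT
  CASE
   -- startup value: take the first row as it is
   WHEN $1=array[0,0]::TEXT[]
    THEN $2
   -- the state's binlog counter is higher: the next value is the new minimum
   WHEN (string_to_array($1[1],'.'))[2]::integer > (string_to_array($2[1],'.'))[2]::integer
    THEN $2
   -- same binlog file: keep the lower position
   WHEN $1[1]=$2[1] and $1[2]::integer>=$2[2]::integer
    THEN $2
   ELSE $1
  END
 ;
$BODY$
LANGUAGE SQL;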

Let’s analyse the CASE construct in detail.

   WHEN $1=array[0,0]::TEXT[]
    THEN $2

The first condition manages the special case of the startup value. The function saves the first evaluated value in the state variable.

   WHEN (string_to_array($1[1],'.'))[2]::integer > (string_to_array($2[1],'.'))[2]::integer --$1[1]>$2[1]
    THEN $2

The second condition is some sort of Swiss army knife SQL.

In order to strip the prefix from the binlog name we are using the string_to_array function which transforms a string into an array using a separator for splitting the string.

The call (string_to_array($1[1],'.'))[2]::integer splits the binlog name stored in the accumulated value (e.g. mysql-bin.00192) into an array whose first element is the prefix and whose second element is the progression number (e.g. {mysql-bin,00192}). The second element of the array is finally cast to an integer. The same operation happens for the evaluated parameter $2. Then both integer values are compared using the operator >.

If the binlog counter stored in the state variable is bigger than the value of the row we are evaluating, then we return the second parameter because it’s smaller than the accumulated value.
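The split and cast can be tried in isolation. The query below is a small sketch using a literal binlog name; the column aliases are only for illustration.

```sql
-- Split a binlog name on the dot and extract the numerical counter.
SELECT
    string_to_array('mysql-bin.001915', '.') AS parts,                     -- {mysql-bin,001915}
    (string_to_array('mysql-bin.001915', '.'))[2] AS counter_text,         -- 001915
    (string_to_array('mysql-bin.001915', '.'))[2]::integer AS counter;     -- 1915
```

Note that this approach assumes the prefix itself contains no dot. With a prefix such as my.server-bin the counter would no longer be the second element of the array.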

   WHEN $1[1]=$2[1] and $1[2]::integer>=$2[2]::integer
    THEN $2
   ELSE $1

The third condition is used to evaluate the binlog position within the same binlog file. If the state value is bigger than or equal to the next value, then we return the next value.

For any other condition we return the state value.
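Because the state function is an ordinary SQL function, each branch of the CASE can be checked by calling it directly. The query below is a sketch that assumes fn_binlog_min has been created as shown earlier; the literal coordinates and the column aliases are made up for the test.

```sql
-- Assumes fn_binlog_min has been created as shown earlier in this post.
-- First argument: the state. Second argument: the next value.
SELECT
    -- startup value: the next value is taken as it is
    fn_binlog_min(ARRAY['0','0'], ARRAY['mysql-bin.001915','194']) AS startup,
    -- the next value sits in a lower binlog file: it wins
    fn_binlog_min(ARRAY['mysql-bin.001917','4494'], ARRAY['mysql-bin.001916','3194']) AS lower_file,
    -- same binlog file, lower position: the next value wins
    fn_binlog_min(ARRAY['mysql-bin.001915','1241344'], ARRAY['mysql-bin.001915','194']) AS same_file,
    -- the next value sits in a higher binlog file: the state is kept
    fn_binlog_min(ARRAY['mysql-bin.001915','194'], ARRAY['mysql-bin.001920','4']) AS state_kept;

-- startup    | {mysql-bin.001915,194}
-- lower_file | {mysql-bin.001916,3194}
-- same_file  | {mysql-bin.001915,194}
-- state_kept | {mysql-bin.001915,194}
```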

The final function is very simple: it returns the last value accumulated by the state function, except when the state is equal to ARRAY['0','0']. In that case the final function returns NULL.

CREATE OR REPLACE FUNCTION fn_binlog_final(text[])
RETURNS text[] as
$BODY$
	SELECT
		CASE
			WHEN $1=array['0','0']
			THEN NULL
		ELSE
			$1
		END;
$BODY$
LANGUAGE sql;

With the two functions created we can create the custom aggregate binlog_min.

CREATE AGGREGATE binlog_min(text[])
(
    SFUNC = fn_binlog_min,
    STYPE = text[],
    FINALFUNC = fn_binlog_final,
    INITCOND = '{0,0}'
);
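A quick way to see the final function at work is to aggregate over an empty set. The sketch below assumes the test_binlog table and the binlog_min aggregate created earlier. With no rows the state never moves from the startup value, so the final function should turn it into NULL.

```sql
-- Assumes test_binlog and binlog_min exist as created earlier in this post.
-- WHERE false filters out every row, so only INITCOND reaches the final function.
SELECT
    binlog_min(ARRAY[binlog_name, binlog_position::text]) IS NULL AS min_is_null
FROM
    test_binlog
WHERE
    false;

-- min_is_null | t
```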

Writing the binlog_max aggregate

The state function for binlog_max is similar to the one for binlog_min, with its logic reversed.

We need to find the maximum value of the binlog’s progression number and, within this value, the maximum value of the binlog location.

CREATE OR REPLACE FUNCTION  fn_binlog_max(text[],text[])
RETURNS text[] AS
$BODY$
	SELECT
		CASE
			WHEN $1=array[0,0]::TEXT[]
			THEN $2
			WHEN (string_to_array($2[1],'.'))[2]::integer>(string_to_array($1[1],'.'))[2]::integer
			THEN $2
			WHEN (string_to_array($2[1],'.'))[2]::integer<(string_to_array($1[1] ,'.'))[2]::integer
			THEN $1
			WHEN (string_to_array($2[1],'.'))[2]::integer=(string_to_array($1[1],'.'))[2]::integer AND $2[2]::integer>=$1[2]::integer
			THEN $2
			ELSE $1
		END
	;
$BODY$
LANGUAGE SQL;

The startup value is evaluated the same way as in fn_binlog_min.

The rest of the logic is the following.

  • When the evaluated binlog number is greater than the state’s, we save the evaluated value $2
  • When the evaluated binlog number is lower than the state’s, we return the state value $1
  • When the evaluated binlog number is the same as the state’s and the evaluated binlog position is greater than or equal to the state’s, we save the evaluated value $2
  • Otherwise we return the state value
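The rules above can be verified by calling the state function directly. The query below is a sketch that assumes fn_binlog_max has been created as shown earlier; the literal coordinates and the column aliases are made up for the test.

```sql
-- Assumes fn_binlog_max has been created as shown earlier in this post.
-- First argument: the state. Second argument: the next value.
SELECT
    -- the next value sits in a higher binlog file: it wins
    fn_binlog_max(ARRAY['mysql-bin.001915','194'], ARRAY['mysql-bin.001917','4494']) AS higher_file,
    -- the next value sits in a lower binlog file: the state is kept
    fn_binlog_max(ARRAY['mysql-bin.001920','4'], ARRAY['mysql-bin.001916','3194']) AS state_kept,
    -- same binlog file, higher position: the next value wins
    fn_binlog_max(ARRAY['mysql-bin.001915','194'], ARRAY['mysql-bin.001915','1241344']) AS same_file;

-- higher_file | {mysql-bin.001917,4494}
-- state_kept  | {mysql-bin.001920,4}
-- same_file   | {mysql-bin.001915,1241344}
```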

The binlog_max aggregate is created in a similar way to binlog_min. The final function is shared between the two aggregates as the exit logic is the same.

CREATE AGGREGATE binlog_max(text[])
(
    SFUNC = fn_binlog_max,
    STYPE = text[],
    FINALFUNC = fn_binlog_final,
    INITCOND = '{0,0}'
);

Testing the aggregates

After the aggregates are in place we can find the binlog max and min using a simple SQL statement.

db_test=> \x
Expanded display is on.

db_test=> SELECT
        array_agg(schema_name) AS schemas,
        binlog_max(
                ARRAY[
                        binlog_name,
                        binlog_position::text
                ]
        ) AS binlog_max,
        binlog_min(
                ARRAY[
                        binlog_name,
                        binlog_position::text
                ]
        ) AS binlog_min
FROM
        test_binlog;

-[ RECORD 1 ]-----------------------------------------
schemas    | {sakila1,sakila2,sakila3,sakila4,sakila5}
binlog_max | {"mysql-bin.001920 ",4}
binlog_min | {"mysql-bin.001915 ",194}
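The same aggregates also work with GROUP BY, which is the shape of the problem described at the beginning: one set of coordinates per MySQL server. The query below is only a sketch. The server_name column and the sample rows are hypothetical and do not reflect the actual pg_chameleon catalogue; the aggregates are assumed to exist as created above.

```sql
-- server_name and the rows below are hypothetical, used only for illustration.
-- Assumes binlog_max and binlog_min exist as created earlier in this post.
WITH t_coords (server_name, schema_name, binlog_name, binlog_position) AS
(
    VALUES
        ('mysql-a', 'sakila1', 'mysql-bin.001915', 194::bigint),
        ('mysql-a', 'sakila2', 'mysql-bin.001917', 4494),
        ('mysql-b', 'world1',  'mysql-bin.000042', 120),
        ('mysql-b', 'world2',  'mysql-bin.000042', 98)
)
SELECT
    server_name,
    array_agg(schema_name ORDER BY schema_name) AS schemas,
    binlog_max(ARRAY[binlog_name, binlog_position::text]) AS binlog_max,
    binlog_min(ARRAY[binlog_name, binlog_position::text]) AS binlog_min
FROM
    t_coords
GROUP BY
    server_name
ORDER BY
    server_name;

-- mysql-a | {sakila1,sakila2} | {mysql-bin.001917,4494} | {mysql-bin.001915,194}
-- mysql-b | {world1,world2}   | {mysql-bin.000042,120}  | {mysql-bin.000042,98}
```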

Wrap up

Features like user-defined aggregates are little gems that make PostgreSQL an amazing RDBMS.

I reckon I could have done the same aggregation using Python.

However, having the database logic managed by the database itself is more appropriate and far more robust than any hand-crafted method on the application side.

For more information about custom aggregates I warmly recommend this blog post by Josh Berkus. It is the post that helped me understand this powerful and quite unique PostgreSQL feature.

There is also Alexey Bashtanov’s talk presented at the Brighton PostgreSQL Meetup. The video covers the grouping and aggregation infrastructure giving the best practices for working effectively with grouping and aggregations.

Because aggregations are in general CPU-intensive operations, I warmly recommend watching the video.

Thanks for reading.

Cubes, copyright Federico Campoli