NAME

incline - a replicator for RDB shards

SYNOPSIS

incline [options] command

DESCRIPTION

Incline is a replicator for MySQL / PostgreSQL with the following characteristics.

  • replicates information within a single database node or between database shards

  • replication rules defined in JSON files

  • synchronous replication within a single database through the use of automatically-generated triggers

  • asynchronous (eventually consistent) replication between database nodes using automatically-generated queue tables and fault-tolerant forwarders

This manual consists of four parts: INSTALLATION, TUTORIAL, COMMAND REFERENCE, and FILE FORMATS. For design documentation and background knowledge, please refer to the URLs listed in the SEE ALSO section.

INSTALLATION

Incline uses autotools and automatically tries to detect the client libraries of MySQL and / or PostgreSQL, so a typical installation procedure will be as follows.

    % ./configure
    % make
    # make install

If configure fails to locate the client libraries, --with-mysql and --with-pgsql options can be used.

    % ./configure --with-mysql=my_mysql_installation_dir

Also, if you have Perl and the DBI drivers installed, it is possible to run the embedded tests using make.

    % make test

TUTORIAL

This tutorial explains how to create a microblog service (like twitter) running on four database shards. Incline (by itself) does not support adding database nodes without stopping the service. If you are interested in such a feature, please refer to the documentation of Pacific after reading this tutorial.

CREATING TABLES

At least four tables are needed to create a microblog service on database shards. Instead of a single table representing the follower <=> followee relationship, each user needs to have a list of his / her followers (or of the users he / she follows) on his / her own database shard. Also, each user needs to have his / her `timeline' table on his / her shard (or else the service would not scale out). The example below is a minimal schema on MySQL. All shards should have the same schema applied. Two tables, `following' and `tweet', will be modified by the application. The `follower' and `timeline' tables will be kept in sync with the former two tables automatically (and eventually) by incline.

    CREATE TABLE following (
      user_id INT UNSIGNED NOT NULL,
      following_id INT UNSIGNED NOT NULL,
      PRIMARY KEY (user_id,following_id)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    CREATE TABLE follower (
      user_id INT UNSIGNED NOT NULL,
      follower_id INT UNSIGNED NOT NULL,
      PRIMARY KEY (user_id,follower_id)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    CREATE TABLE tweet (
      tweet_id INT UNSIGNED NOT NULL AUTO_INCREMENT,
      user_id INT UNSIGNED NOT NULL,
      creation_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
      body VARCHAR(255) NOT NULL,
      PRIMARY KEY (tweet_id),
      KEY (user_id,tweet_id)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    CREATE TABLE timeline (
      user_id INT UNSIGNED NOT NULL,
      tweet_user_id INT UNSIGNED NOT NULL,
      tweet_id INT UNSIGNED NOT NULL,
      creation_time TIMESTAMP NOT NULL,
      PRIMARY KEY (user_id,creation_time,tweet_user_id,tweet_id)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

WRITING THE REPLICATION DEFINITION FILE

To keep the `follower' and `timeline' tables in sync with the other two, replication rules should be defined. The example below shows the definition corresponding to the table schema above.

The first hash defines how the `follower' tables should be kept synchronized with the `following' tables. The `following_id' and `user_id' columns of the `following' tables are mapped to the `user_id' and `follower_id' columns of the `follower' tables, and the `follower' tables are sharded using the `user_id' column.

The second hash defines how the `timeline' tables should be constructed from the `follower' and `tweet' tables. In addition to the definitions of `pk_columns' and `shard-key', the `merge' property of the hash defines how the two source tables should be merged (using an INNER JOIN).

    [
      {
        "destination" : "follower",
        "source"      : "following",
        "pk_columns"  : {
          "following.following_id" : "user_id",
          "following.user_id"      : "follower_id"
        },
        "shard-key"   : "user_id"
      },
      {
        "destination" : "timeline",
        "source"      : [ "follower", "tweet" ],
        "pk_columns"  : {
          "follower.follower_id" : "user_id",
          "tweet.user_id"        : "tweet_user_id",
          "tweet.tweet_id"       : "tweet_id",
          "tweet.creation_time"  : "creation_time"
        },
        "merge"       : {
          "follower.user_id" : "tweet.user_id"
        },
        "shard-key"   : "user_id"
      }
    ]
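
Conceptually, the second rule keeps each user's `timeline' rows equivalent to the result of the following join. The SQL below is only an illustration of what the merged result looks like; incline applies the changes incrementally through triggers and queues rather than re-running such a query.

    SELECT follower.follower_id AS user_id,
           tweet.user_id        AS tweet_user_id,
           tweet.tweet_id       AS tweet_id,
           tweet.creation_time  AS creation_time
      FROM follower
     INNER JOIN tweet ON follower.user_id = tweet.user_id;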

WRITING THE SHARD DEFINITION FILE

Another definition file is required when using incline to synchronize database shards. The following example represents a distributed database with four shards using range partitioning. The first node, with the IP address 10.1.1.1, handles ids from 0 to 9999, the second node (10.1.1.2) handles 10000 to 19999, the third (10.1.1.3) handles 20000 to 29999, and the fourth (10.1.1.4) handles ids equal to or greater than 30000.

    {
      "algorithm" : "range-int",
      "map"       : {
        "0"     : [ {
          "host" : "10.1.1.1"
        } ],
        "10000" : [ {
          "host" : "10.1.1.2"
        } ],
        "20000" : [ {
          "host" : "10.1.1.3"
        } ],
        "30000" : [ {
          "host" : "10.1.1.4"
        } ]
      }
    }

In addition to `range-int', the `hash-int' algorithm is also supported. A hash-based shard definition will look like the example below; you may use either one to run the microblog service described in this tutorial.

    {
      "algorithm" : "hash-int",
      "num"       : 4,
      "nodes"     : [
        [ {
          "host" : "10.1.1.1"
        } ],
        [ {
          "host" : "10.1.1.2"
        } ],
        [ {
          "host" : "10.1.1.3"
        } ],
        [ {
          "host" : "10.1.1.4"
        } ]
      ]
    }

INSTALLING QUEUE TABLES AND TRIGGERS

The next step is to install triggers and to create queue tables using the definition files. The following commands create the queue tables and install the triggers on the database running on 10.1.1.1. The commands should be applied to all of the database shards.

    % incline --rdbms=mysql --database=microblog --host=10.1.1.1 \
     --user=root --password=XXXXXXXX --mode=shard \
     --source=replication.json --shard-source=shard.json create-queue
    % incline --rdbms=mysql --database=microblog --host=10.1.1.1 \
     --user=root --password=XXXXXXXX --mode=shard \
     --source=replication.json --shard-source=shard.json create-trigger

The files `replication.json' and `shard.json' should contain the definitions shown in the sections above.
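
Since every shard needs the same queue tables and triggers, it may be convenient to loop over the shard hosts. The following is only a sketch assuming the four hosts used in this tutorial and a Bourne-style shell.

    % for host in 10.1.1.1 10.1.1.2 10.1.1.3 10.1.1.4; do
        # create the queue tables and install the triggers on each shard
        incline --rdbms=mysql --database=microblog --host=$host \
         --user=root --password=XXXXXXXX --mode=shard \
         --source=replication.json --shard-source=shard.json create-queue
        incline --rdbms=mysql --database=microblog --host=$host \
         --user=root --password=XXXXXXXX --mode=shard \
         --source=replication.json --shard-source=shard.json create-trigger
      done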

RUNNING THE FORWARDER

To transfer modifications between database shards, forwarders should be run attached to each shard. The example below starts a forwarder process attached to 10.1.1.1.

    % incline --rdbms=mysql --database=microblog --host=10.1.1.1 \
     --user=root --password=XXXXXXXX --mode=shard \
     --source=replication.json --shard-source=shard.json forward

You should automatically restart the forwarder when it exits (it exits under certain conditions, for example, when it loses connection to the attached shard, or when the shard definition is being updated).
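
How the forwarder is restarted is up to you; a process supervisor such as daemontools, or even a simple shell loop, is sufficient. The following is a minimal sketch of the latter approach.

    % while true; do
        # run the forwarder, and restart it shortly after it exits
        incline --rdbms=mysql --database=microblog --host=10.1.1.1 \
         --user=root --password=XXXXXXXX --mode=shard \
         --source=replication.json --shard-source=shard.json forward
        sleep 1
      done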

SETUP COMPLETE

Now the whole system is up and running. You can try inserting / updating / deleting rows in the `following' or `tweet' tables and see the other tables updated by incline.

    # User:100 starts following user:10100.  `Follower' table on 10.1.1.2
    # (the shard for user:10100) will be updated
    10.1.1.1> INSERT INTO following (user_id,following_id) VALUES \
              (100,10100);
    10.1.1.2> SELECT * FROM follower WHERE user_id=10100;
    +---------+-------------+
    | user_id | follower_id |
    +---------+-------------+
    |   10100 |         100 |
    +---------+-------------+
    1 row in set (0.00 sec)

    # User:10100 tweets.  `Timeline' table on 10.1.1.1 will be updated.
    10.1.1.2> INSERT INTO tweet (user_id,body) VALUES (10100,'hello');
    10.1.1.1> SELECT * FROM timeline WHERE user_id=100;
    +---------+---------------+----------+---------------------+
    | user_id | tweet_user_id | tweet_id |    creation_time    |
    +---------+---------------+----------+---------------------+
    |     100 |         10100 |        1 | 2009-10-05 20:32:07 |
    +---------+---------------+----------+---------------------+
    1 row in set (0.00 sec)

COMMAND REFERENCE

COMMANDS

create-trigger

Reads the definition files and installs the generated triggers onto the specified database node.

drop-trigger

Reads the definition files and drops the triggers installed from the specified database node.

Reads the definition files and prints the triggers generated in JSON format.

create-queue

Reads the definition files and creates queue tables on the specified database node (only works if --mode is set to either `queue-table' or `shard').

drop-queue

Reads the definition files and drops the queue tables installed from the specified database node (only works if --mode is set to either `queue-table' or `shard').

forward

Reads the definition files and forwards the data from the specified database node to other nodes (only works if --mode is set to either `queue-table' or `shard'). The process will stop when the connection to the specified database closes or when the shard definition file is updated.

COMMAND OPTIONS

--rdbms=mysql|pgsql

RDBMS being used. Currently supports MySQL (5.0 or above) and PostgreSQL (8.x?).

--database=db_name

database (schema) name on the database

--host=db_host

hostname of the database. Should be either a hostname or an IP address (default: 127.0.0.1).

--port=db_port

port number of the database. If omitted, uses the default port number of the RDBMS.

--user=db_user

username of the database (default: root)

--password=db_password

password of the database (default: none)

--mode=standalone|queue-table|shard

standalone

This mode is for running incline on a single database node. All updates are reflected synchronously.
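
For example, installing triggers in standalone mode only requires the replication definition; --shard-source is not used. The invocation below is a sketch assuming the tutorial's database and definition file.

    % incline --rdbms=mysql --database=microblog --host=127.0.0.1 \
     --user=root --password=XXXXXXXX --mode=standalone \
     --source=replication.json create-trigger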

queue-table

This mode is for running incline on a single database node. All updates are queued into the queue tables generated by the `create-queue' command. The queued updates are applied by the `forward' command.
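
A typical queue-table setup therefore creates the queue tables and triggers once, and then keeps a forwarder running against the same node. The following is only a sketch assuming the tutorial's database and definition file.

    % incline --rdbms=mysql --database=microblog --host=127.0.0.1 \
     --user=root --password=XXXXXXXX --mode=queue-table \
     --source=replication.json create-queue
    % incline --rdbms=mysql --database=microblog --host=127.0.0.1 \
     --user=root --password=XXXXXXXX --mode=queue-table \
     --source=replication.json create-trigger
    % incline --rdbms=mysql --database=microblog --host=127.0.0.1 \
     --user=root --password=XXXXXXXX --mode=queue-table \
     --source=replication.json forward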

shard

This mode is for running incline on multiple database nodes (shards). Updates that should be applied to the same shard are applied synchronously. Other updates are pushed into the queue tables generated by the `create-queue' command. The queued updates are applied to other nodes by the `forward' command.

--source=replication_def.json

replication definition to be used

--shard-source=shard_def.json

shard definition to be used. Mandatory if --mode is set to `shard'.

--forwarder-log-file=logfile

when set, the `forward' command logs the transactions into the log file

--version

prints version

--help

prints help

FILE FORMATS

Incline uses two kinds of files: the replication definition file and the shard definition file. Both use JSON (see RFC 4627 for details) to represent their structures.

REPLICATION DEFINITION FILE FORMAT

The definition consists of an array. Each element is a hash representing a single replication definition: from one or more `source' tables to one `destination' table. The hash may contain the following keys.

"destination" : dest_table

name of the destination table

"source" : src_table
"source" : [ src_table_a, src_table_b, ... ]

name of the source table(s)

"pk_columns" : { src_column_a : dest_column_a, ... }

maps columns of the source table(s) to the columns of the destination table that constitute the primary key. The source column should include the table name when using multiple source tables (like: "src_table_a.column").

"npk_columns" : { src_column_a : dest_column_a, ... }

same as pk_columns, but defines mappings to the non-primary-key columns of the destination table (see the example after this list)

"merge" : { src_table_column_a : src_table_column_b, ... }

defines INNER JOIN conditions when using multiple source tables.

"shard-key" : dest_column

when using --mode=shard, defines the column name of the destination table used as the sharding key
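
For example, a single-table rule copying a non-primary-key column might look like the following. The `profile' and `profile_cache' tables are hypothetical and not part of the tutorial schema.

    [
      {
        "destination" : "profile_cache",
        "source"      : "profile",
        "pk_columns"  : {
          "profile.user_id" : "user_id"
        },
        "npk_columns" : {
          "profile.nickname" : "nickname"
        },
        "shard-key"   : "user_id"
      }
    ]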

SHARD DEFINITION FILE FORMAT

The shard definition file is required only if --mode is set to `shard'. The file consists of a single JSON hash. Incline recognizes the following keys in the hash.

"algorithm" : "hash-int" | "range-int" (required)

defines the shard algorithm. Incline supports hash-based partitioning and range-based partitioning of integer columns of 64 bits or smaller.

"num" : number_of_nodes (hash-int only)

defines the number of database shards

"nodes" : [ node_def, ... ] (hash-int only)

list of database shards (the number of elements should match the value of the num property)

"map" : { lower-bound : node_def, ... } (range-int only)

list of database shards (keys specify the lower bounds for each node)

The node definitions in `nodes' or `map' should be a hash or an array of hashes with the following key-value pairs. When using an array of hashes, incline only uses the first element of the array. Other elements in the array may be used by other middleware such as Pacific, for example to define slave database nodes.

"host" : host

hostname or IP address of the database node (required)

"port" : port

port number of the database node (default: uses the default port number of the RDBMS used)

"username" : db_user

username of the database node (default: "root")

"password" : db_password

password of the database node (default: empty password)
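
Putting these keys together, a node definition with non-default connection parameters might look like the following sketch (the values are illustrative only).

    [ {
      "host"     : "10.1.1.1",
      "port"     : 3307,
      "username" : "incline",
      "password" : "XXXXXXXX"
    } ]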

SEE ALSO

Incline & Pacific (in Japanese) http://www.slideshare.net/kazuho/incline-pacific

A Clever Way to Scale-out a Web Application http://www.slideshare.net/kazuho/a-clever-way-to-scaleout-a-web-application

Kazuho@Cybozu Labs: Intruducing Incline - a synchronization tool for RDB shards (outdated) http://developer.cybozu.co.jp/kazuho/2009/07/intruducing-inc.html

AUTHOR

Kazuho Oku <kazuhooku@gmail.com>

LICENSE

The software is licensed under the new BSD license. See COPYING.