6.4. Load Data

6.4.1. Loading Data from a File
6.4.2. Loading Data from a PostgreSQL Table, View or Function
6.4.3. Migrating a Database Schema

Data loading is an important part of migrating to Shardman. You can use external utilities to load data, but we recommend our shardmanctl load command, which speeds up data loading by running in multiple threads and by being aware of where and how the data is stored. It offers the following methods of loading data: from a file, from a PostgreSQL table, view, or function, and by migrating a whole database schema together with its data.

Now let's look at these methods.

6.4.1. Loading Data from a File

To load data into a Shardman cluster, run the following command:

$ shardmanctl --store-endpoints http://etcdserver:2379 load --file=/var/load/data.tsv --table=mytable --source file --format text -j 8

In this example, data is loaded from the /var/load/data.tsv data file (tab-delimited) into the mytable table in eight parallel threads. You can use schema.table as the table name. The data can also be provided via stdin (using the --file=- option).

The shardmanctl load command supports text and csv data formats.

The system automatically detects gzip archives and unpacks them before loading data, so you can use files like somefile.tsv.gz as source data.
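Loading via stdin lets you chain the loader with other tools. For example, data exported by psql can be piped directly into shardmanctl load; the sourcedb database name below is hypothetical:

$ psql -d sourcedb -c "\copy mytable TO STDOUT" | shardmanctl --store-endpoints http://etcdserver:2379 load --file=- --table=mytable --source file --format text -j 8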

To optimize loading from a file or table, use the --distributed-keys command-line option, whose value is used to determine the destination node for each row. The option value is a comma-separated list of pairs, each consisting of a field number (starting with zero) and a type, separated by a colon. The following types are supported: bool, char, float4, float8, int2, int4, int8, name, text, varchar and uuid (for example: --distributed-keys=0:int8,1:varchar).

6.4.1.1. Input Data Example

Destination table:

CREATE TABLE users (
    id uuid NOT NULL PRIMARY KEY,
    name VARCHAR(255)
) WITH (distributed_by='id', num_parts=16);

Source file:

f7007387-4a83-416f-a07f-4564736c53e3	Mike
32bac248-3376-4011-8bd1-1528cd67b9c3	Alexander
28601c80-4a9a-496c-8f4e-cde107aad55b	Dmitry
928c4f9b-52d0-4bb4-bcc1-841ebcc4a59f	Nikita
a9a09b3c-27c2-4ce7-951b-eefb55ff3e2b	Pavel
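Assuming the source file above is saved as /var/load/users.tsv (a hypothetical path), it can be loaded into the users table with the --distributed-keys option pointing at the uuid key column, so the loader can route each row to its destination node:

$ shardmanctl --store-endpoints http://etcdserver:2379 load --file=/var/load/users.tsv --table=users --source file --format text --distributed-keys=0:uuid -j 8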

6.4.2. Loading Data from a PostgreSQL Table, View or Function

To load data from one table to another, the source and destination tables must have the same number of columns, and the column types must be compatible. Types are compatible if the strings generated by the type output function on the source are acceptable to the type input function on the destination.
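For example, the following (hypothetical) source and destination tables are compatible even though the column types differ, because the text representation of int4 and date values is accepted by the int8 and text input functions:

-- on the source PostgreSQL server
CREATE TABLE src_events (id int4, happened_at date);

-- in the Shardman cluster
CREATE TABLE dst_events (
    id int8 NOT NULL PRIMARY KEY,
    happened_at text
) WITH (distributed_by='id', num_parts=16);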

To load data into a Shardman cluster from a PostgreSQL table, run the following command:

$ shardmanctl --store-endpoints http://etcdserver:2379 load -t desttable --source postgres --source-connstr "dbname=db host=srchost port=srcport user=login password=passwd" --source-table sourcetable -j 8

In this example, data is loaded from the sourcetable table (which can also be a view or a function call) into the desttable table in eight parallel threads. You can use schema.table as table names.
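The source can also be a set-returning function call passed directly in --source-table; the function name and argument below are hypothetical:

$ shardmanctl --store-endpoints http://etcdserver:2379 load -t desttable --source postgres --source-connstr "dbname=db host=srchost port=srcport user=login password=passwd" --source-table "public.get_recent_rows(1000)" -j 8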

6.4.3. Migrating a Database Schema

The loader can create the destination database schema and then load the selected data. In this case, you need to describe the structure of the migrated tables in a specification file (load_schema.yaml). This file has the following format:

    version: "1.0"
    migrate:
      connstr: "dbname=workdb host=workhost port=workport user=workuser password=workpassword"
      jobs: 8
      batch: 1000
      options:
        - create_schema
        - create_table
        - create_index
        - create_sequence
        - create_foreign_key
        - create_role
        - copy_ownership
        - copy_grants
        - truncate_table
        - skip_no_pkey_tables
        - skip_create_index_error
        - skip_create_extension_error
        - skip_load_errors
        - skip_create_foreign_key_error
        - skip_create_role_error
        - skip_copy_grants_error
        - skip_copy_ownership_error
      schemas:
        - name: public
          all: false
          tables:
            - name: table1
              type: sharded
              partitions: 6
              distributedby: id
              priority: 3
            - name: table2
              type: global
            - name: table3
              type: sharded
              partitions: 6
              distributedby: field_id
              colocatewith: table1
            - name: table4
              type: global
              source: schema.view
              source_pk: field_id
            - name: table5
              type: global
              source: schema.func(arg)
              source_pk: field_id
        - name: schema2
          all: false
          default_type: sharded
          default_partitions: 6
          tables:
            - name: table1
              distributedby: field_id
              priority: 2
            - name: table2
              type: global
            - name: table3
              source: schema.view
              distributedby: field_id
              priority: 3
            - name: table4
              distributedby: field_id
              source: schema.func(arg)
            - name: table5
              source: schema."complex.""table.name"
              distributedby: field_id
        - name: schema3
          all: true
          skip_tables: [table1, table2, table3]
      roles:
        - name: test_user1
          password: test_password
        - name: test_user2  

The migrate.jobs parameter defines the number of parallel data loader processes.

The migrate.batch parameter is the number of rows in one batch (the recommended value is 1000).

The migrate.schemas section defines an array of source database schemas to be processed. All other schemas are skipped.

If the all parameter is set to true, all tables from the current schema are migrated (with the global type by default). If a table is listed in the migrate.schemas.tables array, the target table type must be explicitly specified for it. Two table types are currently supported: global and sharded. Global tables are loaded first, then sharded tables, and finally sharded tables with the colocatewith parameter. The loading order of tables of the same type can be changed using the priority option.

The migrate.schemas.skip_tables section defines an array of table names that will be skipped when the schema is loaded even if the all parameter is set to true.

For sharded tables, the following attributes must be set: distributedby (specifies the name of the column to use for the table partitioning) and partitions (number of partitions that will be created for this table). Optionally, for sharded tables, the colocatewith parameter can be set (name of the table to colocate with). Shardman will try to place partitions of the created table with the same partition key on the same nodes as partitions of the colocation table.

You can specify the default_type option for a schema's tables: global or sharded (default: global). For the sharded type, you can also specify the default_partitions option (default: 20). If you set default_type to sharded, you must specify the distributedby option for each table.

The source option for a table specifies the data source qualified with a schema: schema.source. The source can be a table, view or function, for example: public.table, public.view, public.func(arg). If you set a view or function as the source of a global table, you must specify source_pk to set the primary key for this table. If source is not specified or contains the name of a table, you can also specify source_pk to create a primary key or override the existing one.

The priority option for a table determines the order in which tables of the same type are loaded. Tables with a higher priority are loaded earlier. The default priority value is 0.

The migrate.roles section defines an array of role names and passwords that are copied from the source database if the create_role option is specified.

The specification file supports the following options:

  • create_schema — create database schemas if they do not exist.

  • create_table — create tables if they do not exist.

  • create_index — create indexes after creating tables.

  • create_sequence — create sequences if they do not exist.

  • create_foreign_key — create foreign keys after creating tables.

  • truncate_table — truncate tables before data load.

  • create_role — create global roles defined in migrate.roles and copy role parameters from the source database.

  • copy_grants — copy access privileges from source database.

  • copy_ownership — change table owners to match the owners in the source database.

  • skip_no_pkey_tables — skip tables without primary keys.

  • skip_create_index_error — skip index creation errors.

  • skip_create_extension_error — skip extension creation errors.

  • skip_load_errors — continue loading if errors occur.

  • skip_create_foreign_key_error — skip foreign key creation errors.

  • skip_create_role_error — skip role creation errors.

  • skip_copy_ownership_error — skip table owner changing errors.

  • skip_copy_grants_error — skip grant errors.

Now, to load the data together with the schema into a Shardman cluster from PostgreSQL, run the following command:

$ shardmanctl --store-endpoints http://etcdserver:2379 load --schema load_schema.yaml

During loading, the nodes on which the data should be located are automatically determined and data is loaded directly to them.