F.38. pgpro_scheduler

F.38.1.  pgpro_schedulerPostgres Pro Enterprise extension for job scheduling

pgpro_scheduler allows to schedule jobs execution and control their activity in Postgres Pro Enterprise database.

The job is a set of SQL commands. Schedule table could be described as a crontab-like string or as a JSON object. It's possible to use combination of both methods for scheduling settings.

Each job could calculate its next start time. The set of SQL commands could be executed in a single transaction or each command could be executed in individual one. It's possible to set an SQL statement to be executed on failure of main job transaction.

F.38.2. Installation

pgpro_scheduler is a Postgres Pro Enterprise extension and it has no special prerequisites.

To build extension from the source, make sure that the environment variable PATH includes path to pg_config utility. Also make sure that you have developer version of Postgres Pro Enterprise installed or Postgres Pro Enterprise was built from source code.

Install extension as follows:

$ cd pgpro_scheduler
$ make USE_PGXS=1
$ sudo make USE_PGXS=1 install
$ psql dbname -c "CREATE EXTENSION pgpro_scheduler"

F.38.3. Configuration

The extension defines a number of Postgres Pro Enterprise variables (GUC). These variables help to handle scheduler configuration.

schedule.enable (boolean)

This parameter specifies, whether the scheduler is enabled in this system. Default value: false.

schedule.database (text)

This parameter specifies list of database names on which the scheduler is enabled. Database names should be separated by commas. Default value: empty string.

schedule.scheme (text)

This parameter specifies the name of a scheme where the scheduler stores its tables and functions. Changing this parameter requires a server restart. Normally you should not change this parameter but it could be useful if you want to run scheduled jobs on a hot-standby database. So you can define foreign data wrapper on master system to wrap default scheduler schema to another one and use it on replica. Default value: schedule.

schedule.nodename (text)

This parameter specifies a node name of this instance. Default value is master. You should not change or use it if you run single server configuration. But it is necessary to change this name if you run scheduler on hot-standby database.

schedule.max_workers (integer)

This parameter specifies max number of simultaneously running jobs for one database. Default value: 2.

schedule.transaction_state (text)

This is internal parameter. It contains state of executed job. This parameter was designed to use with a next job start time calculation procedure. Possible values are:

  • success — transaction has finished successfully

  • failure — transaction has failed to finish

  • running — transaction is in progress

  • undefined — transaction has not started yet

The last two values normally should not appear inside the user procedure. If you got them probably it indicates an internal scheduler error.

F.38.4. Managing scheduler

You could control the scheduler's behavior by means of Postgres Pro Enterprise variables described above.

For example, suppose we have a fresh Postgres Pro Enterprise installation with the scheduler extension installed. We are going to use the scheduler with databases named database1 and database2. We want database1 to be capable to run 5 jobs in parallel and database2 — 3 jobs.

Put the following string in your postgresql.conf:

shared_preload_libraries = 'pgpro_scheduler'

Then start psql and execute the following commands:

ALTER SYSTEM SET schedule.enable = true;
ALTER SYSTEM SET schedule.database = 'database1,database2';
ALTER DATABASE database1 SET schedule.max_workers = 5;
ALTER DATABASE database2 SET schedule.max_workers = 3;
SELECT pg_reload_conf();

If you do not need the different values in max_workers you could store the same in configuration file. Then ask server to reload configuration. There is no need to restart.

Here is an example of postgresql.conf:

shared_preload_libraries = 'pgpro_scheduler'
schedule.enable = on
schedule.database = 'database1,database2'
schedule.max_workers = 5

The scheduler is implemented as background worker which dynamically starts another bgworkers. That's why you should care about proper value in the max_worker_processes variable. The minimal acceptable value could be calculated using the following formula:

min_processes = 1 + num_databases  + max_workers[1] + ... + max_workers[num_databases]

where:

  • min_processes — the minimal acceptable amount of bgworkers in the system. Note, that other subsystems (e.g. subqueries) need to start background workers too. So you need to adjust the value to take their needs into account.

  • num_databases — the number of databases the scheduler works with

  • max_workers — the value of schedule.max_workers variable in context of each database

F.38.5. SQL Scheme

The extension uses SQL scheme schedule to store its internal tables and functions. Direct access to tables is forbidden. All manipulations should be performed by means of functions defined by extension.

F.38.6. SQL Types

The scheduler defines two SQL types and uses them as types for return values for some of its functions.

F.38.6.1. cron_rec

This type contains information about the job to be scheduled.

CREATE TYPE schedule.cron_rec AS(
    id integer,             -- job id
    node text,              -- node name to be executed on
    name text,              -- job name
    comments text,          -- job's comments
    rule jsonb,             -- scheduling rules
    commands text[],        -- SQL commands to be executed
    run_as text,            -- name of executor user
    owner text,             -- name of owner user
    start_date timestamp,   -- lower bound of execution window
                            -- NULL if unbound
    end_date timestamp,     -- upper bound of execution window
                            -- NULL if unbound
    use_same_transaction boolean,   -- true if the set of SQL commands
                                    -- will be executed in a single transaction
    last_start_available interval,  -- max time till scheduled job
                                    -- can wait for execution if all allowed
                                    -- workers are busy
    max_instances int,      -- max number of simultaneous running instances
                            -- of this job
    max_run_time interval,  -- max execution time
    onrollback text,        -- SQL command to be performed on transaction
                            -- failure
    next_time_statement text,   -- SQL command to execute on main
                                -- transaction end to calculate next
                                -- start time
    active boolean,         -- true if job could be scheduled
    broken boolean          -- true if job has errors in configuration
                            -- that prevent it's further execution
);

F.38.6.2. cron_job

This type contains information about job scheduled execution.

CREATE TYPE schedule.cron_job AS(
    cron integer,           -- job id
    node text,              -- node name to be executed on
    scheduled_at timestamp, -- scheduled execution time
    name text,              -- job name
    comments text,          -- job's comments
    commands text[],        -- SQL commands to be executed
    run_as text,            -- name of executor user
    owner text,             -- name of owner user
    use_same_transaction boolean,   -- true if the set of sql commands
                                    -- will be executed in a single transaction
    started timestamp,      -- timestamp of this job execution started
    last_start_available timestamp, -- time until job must be started
    finished timestamp,     -- timestamp of this job execution finished
    max_run_time interval,  -- max execution time
    max_instances int,      -- the number of instances run at the same time
    onrollback text,        -- statement on ROLLBACK
    next_time_statement text,   -- statement to calculate next start time
    status text,            -- status of this task: working, done, error
    message text            -- error message
);

F.38.7. Functions

schedule.create_job(cron text, sql text, node text)

Creates job and sets it active.

Arguments:

  • croncrontab-like string to set schedule

  • sql — SQL statement to execute

  • node — node name, optional

Returns id of the created job.

schedule.create_job(cron text, sqls text[], node text)

Creates job and sets it active.

Arguments:

  • croncrontab-like string to set schedule

  • sqls — set of SQL statements to execute

  • node — node name, optional

Returns id of the created job.

schedule.create_job(date timestamp with time zone, sql text, node text)

Creates job and sets it active.

Arguments:

  • date — exact date of execution

  • sql — SQL statement to execute

  • node — node name, optional

Returns id of the created job.

schedule.create_job(date timestamp with time zone, sqls text[], node text)

Creates job and sets it active.

Arguments:

  • date — exact date of execution

  • sqls — set of SQL statements to execute

  • node — node name, optional

Returns id of the created job.

schedule.create_job(dates timestamp with time zone[], sql text, node text)

Creates job and sets it active.

Arguments:

  • dates — set of execution dates

  • sql — SQL statement to execute

  • node — node name, optional

Returns id of the created job.

schedule.create_job(dates timestamp with time zone[], sqls text[], node text)

Creates job and sets it active.

Arguments:

  • dates — set of execution dates

  • sqls — set of SQL statements to execute

  • node — node name, optional

Returns id of the created job.

schedule.create_job(data jsonb)

Creates job and sets it active.

The only argument is a JSONB object with information about job.

The object can contain the following keys, some of them can be omitted:

  • name — job name;

  • node — node name;

  • comments — job's comments;

  • croncrontab-like string for scheduling settings;

  • rule — scheduling settings as JSONB object (see description below);

  • command — SQL statement to be executed;

  • commands — a set of SQL statements to be executed as an array;

  • run_as — user to execute command(s);

  • start_date — start of interval when scheduled command can be executed, could be NULL;

  • end_date — end of interval when scheduled command can be executed, could be NULL;

  • date — exact date when command will be executed;

  • dates — set of exact dates when command will be executed;

  • use_same_transactiontrue if set of commands will be executed within a single transaction. Default: false;

  • last_start_available — for how long command execution could be postponed if maximum number of allowed workers reached at the scheduled moment. Time set in format of interval. E.g. with '00:02:34' it is possible to wait for 2 minutes 34 seconds. If time is NULL or not set then wait forever. Default value is NULL;

  • max_run_time — how long scheduled job can be executed. Format: interval. If NULL or not set then there is no time limits. Default: NULL;

  • onrollback — SQL statement to be executed on ROLLBACK if main transaction fails. Default: not defined;

  • next_time_statement — SQL statement to calculate next start time.

The rules of scheduling could be set as crontab-like string (key cron) and also as JSONB object (key rule).

This object can contain the following keys:

  • minutes — minutes; array of integers in range 0 .. 59;

  • hours — hours; array of integers in range 0 .. 23;

  • days — days of month; array of integers in range 1 .. 31;

  • months — months; array of integers in range 1 .. 12;

  • wdays — days of week; array of integers in range 0 .. 6 (0 is Sunday);

  • onstart — integer value 0 or 1; if value equals to 1 then the job will be executed on scheduler start only once;

The job also could be scheduled on exact date or set of dates. Use date and dates keys accordingly.

All scheduling methods could be combined but the use of at least one of them is mandatory.

The next_time_statement field may contain SQL statement to be executed after the main transaction to calculate next start time. If the key is defined, the first start time will be calculated by methods described above but successive start times will be derived from this statement. The statement must return record with the first field containing value of type timestamp with time zone. If returning value be of the different type or statement execution produce an error the job will be marked as broken and further execution will be cancelled.

This statement will be executed in spite of main transaction execution state. It's possible to get state of main transaction from Postgres Pro Enterprise variable schedule.transaction_state.

The possible values are:

  • success — transaction is successful

  • failure — transaction is failed

  • running — transaction is in progress

  • undefined — transaction has not been started yet

The last two values should not appear within user procedure specified in the next_time_statement key.

SQL statement to be executed could be set in command or commands key. The first one specifies the single statement, the second — the set of statements. In fact the first key can contain the set of commands in one string separated by semicolons. In this case they all will be executed in a single transaction in spite of the value use_same_transaction. So for a set of the statements is better to use the commands key as you get more control on execution.

Returns id of the created job.

schedule.set_job_attributes(job_id integer, data jsonb)

Updates properties of the existing job.

Arguments:

  • job_id — identifier of the existing job;

  • data — JSONB object with properties to be edited. The keys and their structure shown in the schedule.create_job function description.

The function returns boolean value:

  • true — properties updated successfully;

  • false — properties not updated.

To update the properties of the job, the user must be superuser or the the owner of the job.

schedule.set_job_attribute(job_id integer, name text, value text || anyarray)

Updates a property of the existing job.

Arguments:

  • job_id — identifier of the existing job;

  • name — property name

  • value — property value

The full list of the properties could be found in schedule.create_job function description. Some values are of array types. They should be passed as an array, but if the value could not be an array, the exception will be raised.

This function returns boolean value — true on success and false on failure.

To update the properties of the job, the user must be superuser or the the owner of the job.

schedule.deactivate_job(job_id integer)

Deactivates job and suspends its further scheduling and execution.

Arguments:

  • job_id — identifier of the existing job;

Returns true on success, false on failure.

schedule.activate_job(job_id integer)

Activates job and starts its scheduling and execution.

Arguments:

  • job_id — identifier of the existing job;

Returns true on success, false on failure.

schedule.drop_job(job_id integer)

Deletes a job.

Arguments:

  • job_id — identifier of the existing job;

Returns true on success, false on failure.

schedule.get_job(job_id integer)

Deletes a job.

Arguments:

  • job_id — identifier of the existing job;

The return value is of type cron_rec. Description of the type can be found in Section F.38.6.

schedule.get_user_owned_cron(username text)

Retrieves list of jobs owned by the specified user.

Arguments:

  • username — user name, optional

Returns a set of records of type cron_rec. These records contain information about jobs owned by user. If user name is omitted the session user name is used. Only superuser can retrieve jobs owned by another user.

Description of the cron_rec type can be found in Section F.38.6.

schedule.get_user_cron(username text)

Retrieves list of jobs executed by the specified user.

Arguments:

  • username — user name, optional

Returns a set of records of type cron_rec. These records contain information about jobs executed by user. If user name is omitted the session user name is used. Only superuser can retrieve jobs executed by another user.

Description of the cron_rec type can be found in Section F.38.6.

schedule.get_user_active_jobs(username text)

Returns a list of jobs executed in this very moment by the specified user.

Arguments:

  • username — user name, optional

If user name is omitted then the session user name is used. Only superuser can retrieve jobs executed by another user.

The return value is a set of records of type cron_job. Description of the type can be found in Section F.38.6.

schedule.get_active_jobs()

Returns a list of jobs being executed at that very moment. Only superuser can call this function.

The return value is a set of records of type cron_job. Description of the type can be found in Section F.38.6.

schedule.get_log()

Returns a list of all the completed jobs. Only superuser can call this function.

The return value is a set of records of type cron_job. Description of the type can be found in Section F.38.6.

schedule.get_user_log()

Returns a list of the completed jobs executed by the specified user.

Arguments:

  • username — user name, optional

If user name is omitted then the session user name is used. Only superuser can retrieve list of jobs executed by another user.

The return value is a set of records of type cron_job. Description of the type can be found in Section F.38.6.

schedule.clean_log()

Deletes all records with information about completed jobs. Only superuser can call this function.

Returns the number of records deleted.