pgpro_scheduler —
Postgres Pro Enterprise extension for job scheduling
pgpro_scheduler allows you to schedule job execution and
control job activity in a Postgres Pro Enterprise database.
A job is a set of SQL commands. Its schedule can be
described as a crontab-like string or
as a JSON object, and the two methods can be combined
in the scheduling settings.
Each job can calculate its next start time. The set of SQL commands can be executed in a single transaction, or each command can run in its own transaction. It is also possible to set an SQL statement to be executed if the main job transaction fails.
pgpro_scheduler is a Postgres Pro Enterprise
extension and has no special prerequisites.
To build the extension from source, make sure that the PATH
environment variable includes the path to the pg_config utility.
Also make sure that you have the developer version of
Postgres Pro Enterprise installed or that
Postgres Pro Enterprise was built from source code.
Install the extension as follows:
$ cd pgpro_scheduler
$ make USE_PGXS=1
$ sudo make USE_PGXS=1 install
$ psql dbname -c "CREATE EXTENSION pgpro_scheduler"
The extension defines a number of Postgres Pro Enterprise configuration parameters (GUC variables). These variables control the scheduler configuration.
schedule.enable (boolean)
This parameter specifies whether the scheduler is enabled in this system.
Default value: false.
schedule.database (text)
This parameter specifies the list of database names for which the scheduler is enabled. Database names should be separated by commas. Default value: empty string.
schedule.scheme (text)
This parameter specifies the name of the schema where the scheduler stores
its tables and functions. Changing this parameter requires a server restart.
Normally you should not change this parameter, but it can be useful
if you want to run scheduled jobs on a hot-standby database:
you can define a foreign data wrapper on the master system
to wrap the default scheduler schema into another one and use it on the replica.
Default value: schedule.
schedule.nodename (text)
This parameter specifies the node name of this instance.
Default value: master. You should not change or use it
in a single-server configuration, but you must change this name
if you run the scheduler on a hot-standby database.
schedule.max_workers (integer)
This parameter specifies the maximum number of simultaneously running jobs for one database.
Default value: 2.
schedule.transaction_state (text)
This is an internal parameter. It contains the state of the executed job and is intended for use in the procedure that calculates the next job start time. Possible values are:
success — the transaction finished successfully
failure — the transaction failed
running — the transaction is in progress
undefined — the transaction has not started yet
The last two values should not normally appear inside a user procedure. If you encounter them, it probably indicates an internal scheduler error.
You can control the scheduler's behavior by means of the Postgres Pro Enterprise variables described above.
For example, suppose we have a fresh Postgres Pro Enterprise
installation with the scheduler extension installed.
We are going to use the scheduler with databases named
database1 and database2.
We want database1 to be able
to run 5 jobs in parallel, and database2 — 3 jobs.
Put the following line in your postgresql.conf:
shared_preload_libraries = 'pgpro_scheduler'
Then start psql and execute the following commands:
ALTER SYSTEM SET schedule.enable = true;
ALTER SYSTEM SET schedule.database = 'database1,database2';
ALTER DATABASE database1 SET schedule.max_workers = 5;
ALTER DATABASE database2 SET schedule.max_workers = 3;
SELECT pg_reload_conf();
If you do not need different max_workers values for each database,
you can set the same value in the configuration file instead.
Then ask the server to reload the configuration. There is no need to restart.
Here is an example of postgresql.conf:
shared_preload_libraries = 'pgpro_scheduler'
schedule.enable = on
schedule.database = 'database1,database2'
schedule.max_workers = 5
The scheduler is implemented as a background worker that dynamically starts
other background workers. That is why you should take care to set a proper value for
the max_worker_processes variable.
The minimal acceptable value could be calculated using the following formula:
min_processes = 1 + num_databases + max_workers[1] + ... + max_workers[num_databases]
where:
min_processes — the minimal acceptable number
of background workers in the system. Note that other subsystems (e.g. parallel queries) need
to start background workers too, so you need to adjust the value to
take their needs into account.
num_databases — the number of databases the scheduler
works with
max_workers — the value of the schedule.max_workers
variable in the context of each database
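For the two-database example above, the formula gives 1 + 2 + (5 + 3) = 11 background workers as the minimum. A postgresql.conf fragment with some headroom might look like this (the exact value is an illustration, not a recommendation):

```
# minimum for the example: 1 + 2 + (5 + 3) = 11 background workers,
# plus spare capacity for parallel queries and other subsystems
max_worker_processes = 16
```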
The extension uses the schedule SQL schema to store
its internal tables and functions. Direct access to the tables is forbidden;
all manipulations should be performed by means of the functions defined by the extension.
The scheduler defines two SQL types, which are used as return types by some of its functions.
This type contains information about the job to be scheduled.
CREATE TYPE schedule.cron_rec AS(
id integer, -- job id
node text, -- node name to be executed on
name text, -- job name
comments text, -- job's comments
rule jsonb, -- scheduling rules
commands text[], -- SQL commands to be executed
run_as text, -- name of executor user
owner text, -- name of owner user
start_date timestamp, -- lower bound of execution window
-- NULL if unbound
end_date timestamp, -- upper bound of execution window
-- NULL if unbound
use_same_transaction boolean, -- true if the set of SQL commands
-- will be executed in a single transaction
last_start_available interval, -- max time the scheduled job
-- can wait for execution if all allowed
-- workers are busy
max_instances int, -- max number of simultaneously running instances
-- of this job
max_run_time interval, -- max execution time
onrollback text, -- SQL command to be performed on transaction
-- failure
next_time_statement text, -- SQL command to execute on main
-- transaction end to calculate next
-- start time
active boolean, -- true if job could be scheduled
broken boolean -- true if job has errors in configuration
-- that prevent its further execution
);
This type contains information about a scheduled job execution.
CREATE TYPE schedule.cron_job AS(
cron integer, -- job id
node text, -- node name to be executed on
scheduled_at timestamp, -- scheduled execution time
name text, -- job name
comments text, -- job's comments
commands text[], -- SQL commands to be executed
run_as text, -- name of executor user
owner text, -- name of owner user
use_same_transaction boolean, -- true if the set of SQL commands
-- will be executed in a single transaction
started timestamp, -- when this job execution started
last_start_available timestamp, -- time until which the job must be started
finished timestamp, -- when this job execution finished
max_run_time interval, -- max execution time
max_instances int, -- max number of instances running at the same time
onrollback text, -- statement on ROLLBACK
next_time_statement text, -- statement to calculate next start time
status text, -- status of this task: working, done, error
message text -- error message
);
schedule.create_job(cron text,
sql text, node text)
Creates a job and sets it active.
Arguments:
cron — crontab-like string that sets the schedule
sql — SQL statement to execute
node — node name, optional
Returns the ID of the created job.
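For example, the following sketch creates a job that runs every night at 03:30 (the command and schedule are illustrative; it assumes the extension is installed and enabled for the current database):

```sql
SELECT schedule.create_job('30 3 * * *', 'VACUUM ANALYZE');
```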
schedule.create_job(cron text,
sqls text[], node text)
Creates a job and sets it active.
Arguments:
cron — crontab-like string that sets the schedule
sqls — set of SQL statements to execute
node — node name, optional
Returns the ID of the created job.
schedule.create_job(date timestamp with time zone,
sql text,
node text)
Creates a job and sets it active.
Arguments:
date — exact date of execution
sql — SQL statement to execute
node — node name, optional
Returns the ID of the created job.
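For instance, a one-shot job for a specific moment might look like this sketch (the timestamp and the materialized view name are illustrative):

```sql
SELECT schedule.create_job(
    '2030-12-31 23:50:00'::timestamptz,
    'REFRESH MATERIALIZED VIEW yearly_report'  -- hypothetical view
);
```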
schedule.create_job(date timestamp with time zone,
sqls text[],
node text)
Creates a job and sets it active.
Arguments:
date — exact date of execution
sqls — set of SQL statements to execute
node — node name, optional
Returns the ID of the created job.
schedule.create_job(dates timestamp with time zone[],
sql text,
node text)
Creates a job and sets it active.
Arguments:
dates — set of execution dates
sql — SQL statement to execute
node — node name, optional
Returns the ID of the created job.
schedule.create_job(dates timestamp with time zone[],
sqls text[],
node text)
Creates a job and sets it active.
Arguments:
dates — set of execution dates
sqls — set of SQL statements to execute
node — node name, optional
Returns the ID of the created job.
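A sketch combining several dates with several statements (the dates and the table name are illustrative):

```sql
SELECT schedule.create_job(
    ARRAY['2030-01-01 01:00'::timestamptz,
          '2030-02-01 01:00'::timestamptz],
    ARRAY['TRUNCATE staging_tmp',   -- hypothetical table
          'ANALYZE staging_tmp']
);
```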
schedule.create_job(data jsonb)
Creates a job and sets it active.
The only argument is a JSONB object describing the job.
The object can contain the following keys; some of them can be omitted:
name — job name;
node — node name;
comments — job's comments;
cron — crontab-like string for scheduling settings;
rule — scheduling settings as JSONB object (see description below);
command — SQL statement to be executed;
commands — a set of SQL statements to be executed as an array;
run_as — user to execute command(s);
start_date — start of the interval when the scheduled command can be executed; can be NULL;
end_date — end of the interval when the scheduled command can be executed; can be NULL;
date — exact date when the command will be executed;
dates — set of exact dates when the command will be executed;
use_same_transaction — true if the set of commands is to be executed within a single transaction.
Default: false;
last_start_available — how long command execution can be postponed
if the maximum number of allowed workers is reached at the scheduled moment.
The time is set in interval format; e.g. with '00:02:34' the job can wait for
2 minutes 34 seconds. If the time is NULL or not set, the job waits forever. Default value: NULL;
max_run_time — how long the scheduled job is allowed to run. Format: interval.
If NULL or not set, there is no time limit. Default: NULL;
onrollback — SQL statement to be executed on ROLLBACK if the main transaction fails.
Default: not defined;
next_time_statement — SQL statement to calculate the next start time.
The scheduling rules can be set as a crontab-like string (key cron)
or as a JSONB object (key rule).
This object can contain the following keys:
minutes — minutes; array of integers in range 0 .. 59;
hours — hours; array of integers in range 0 .. 23;
days — days of month; array of integers in range 1 .. 31;
months — months; array of integers in range 1 .. 12;
wdays — days of week; array of integers in range 0 .. 6 (0 is Sunday);
onstart — integer value 0 or 1; if the value equals 1,
the job will be executed only once, at scheduler start;
A job can also be scheduled for an exact date or a set of dates;
use the date and dates keys accordingly.
All scheduling methods can be combined, but at least one of them must be used.
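Putting these keys together, a JSONB job definition might look like the following sketch (the job name, schedule, and commands are illustrative):

```sql
SELECT schedule.create_job('{
    "name": "nightly cleanup",
    "cron": "0 1 * * *",
    "commands": ["DELETE FROM app_log WHERE ts < now() - interval ''30 days''",
                 "VACUUM app_log"],
    "use_same_transaction": true,
    "max_run_time": "01:00:00"
}'::jsonb);
```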
The next_time_statement field may contain an SQL statement to be executed
after the main transaction to calculate the next start time. If this key is defined,
the first start time is calculated by the methods described above, but
subsequent start times are derived from this statement. The statement must
return a record whose first field contains a value of type timestamp with time zone.
If the returned value is of a different type, or if executing the statement produces an error,
the job is marked as broken and its further execution is cancelled.
This statement is executed regardless of the main transaction's execution state.
The state of the main transaction is available
in the Postgres Pro Enterprise variable
schedule.transaction_state.
The possible values are:
success — the transaction finished successfully
failure — the transaction failed
running — the transaction is in progress
undefined — the transaction has not started yet
The last two values should not appear within the user procedure specified in the next_time_statement key.
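For instance, a next_time_statement could consult this variable to retry failed runs sooner (a sketch; the intervals are illustrative):

```sql
-- Retry in 5 minutes after a failure, otherwise run again in 1 hour
SELECT CASE current_setting('schedule.transaction_state')
           WHEN 'failure' THEN now() + interval '5 minutes'
           ELSE now() + interval '1 hour'
       END;
```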
The SQL statements to be executed can be set with the command or commands key.
The first specifies a single statement, the second a set of statements.
In fact, the command key can contain a set of commands in one string, separated by semicolons.
In this case they are all executed in a single transaction, regardless of the use_same_transaction value.
For a set of statements it is therefore better to use the commands key, as it gives you more control over execution.
Returns the ID of the created job.
schedule.set_job_attributes(job_id integer,
data jsonb)
Updates properties of an existing job.
Arguments:
job_id — identifier of the existing job;
data — JSONB object with the properties to be edited.
The keys and their structure are shown in the schedule.create_job
function description.
The function returns a boolean value:
true — the properties were updated successfully;
false — the properties were not updated.
To update the properties of a job, the user must be a superuser or the owner of the job.
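For example (the job ID and property values are illustrative):

```sql
SELECT schedule.set_job_attributes(42,
    '{"max_run_time": "00:30:00", "comments": "tightened time limit"}'::jsonb);
```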
schedule.set_job_attribute(job_id integer,
name text,
value text || anyarray)
Updates a single property of an existing job.
Arguments:
job_id — identifier of the existing job;
name — property name;
value — property value.
The full list of properties can be found in the schedule.create_job function description. Some values are of array types and should be passed as arrays; if the value cannot be an array, an exception is raised.
This function returns a boolean value — true on success
and false on failure.
To update the properties of a job, the user must be a superuser or the owner of the job.
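For example (the job ID and value are illustrative):

```sql
SELECT schedule.set_job_attribute(42, 'max_instances', '1');
```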
schedule.deactivate_job(job_id integer)
Deactivates a job, suspending its further scheduling and execution.
Arguments:
job_id — identifier of the existing job.
Returns true on success, false on failure.
schedule.activate_job(job_id integer)
Activates a job, resuming its scheduling and execution.
Arguments:
job_id — identifier of the existing job.
Returns true on success, false on failure.
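A typical sketch for temporarily suspending and then resuming a job (the job ID is illustrative):

```sql
SELECT schedule.deactivate_job(42);  -- suspend scheduling
-- ... maintenance window ...
SELECT schedule.activate_job(42);    -- resume scheduling
```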
schedule.drop_job(job_id integer)
Deletes a job.
Arguments:
job_id — identifier of the existing job.
Returns true on success, false on failure.
schedule.get_job(job_id integer)
Retrieves information about a job.
Arguments:
job_id — identifier of the existing job.
The return value is of type cron_rec.
Description of the type can be found in Section F.38.6.
schedule.get_user_owned_cron(username text)
Retrieves the list of jobs owned by the specified user.
Arguments:
username — user name, optional
Returns a set of records of type cron_rec.
These records contain information about the jobs owned by the user.
If the user name is omitted, the session user name is used.
Only a superuser can retrieve jobs owned by another user.
Description of the cron_rec type can be found in Section F.38.6.
schedule.get_user_cron(username text)
Retrieves the list of jobs executed by the specified user.
Arguments:
username — user name, optional
Returns a set of records of type cron_rec.
These records contain information about the jobs executed by the user.
If the user name is omitted, the session user name is used.
Only a superuser can retrieve jobs executed by another user.
Description of the cron_rec type can be found in Section F.38.6.
schedule.get_user_active_jobs(username text)
Returns the list of jobs currently being executed by the specified user.
Arguments:
username — user name, optional
If the user name is omitted, the session user name is used. Only a superuser can retrieve jobs executed by another user.
The return value is a set of records of type cron_job.
Description of the type can be found in Section F.38.6.
schedule.get_active_jobs()
Returns the list of jobs currently being executed. Only a superuser can call this function.
The return value is a set of records of type cron_job.
Description of the type can be found in Section F.38.6.
schedule.get_log()
Returns the list of all completed jobs. Only a superuser can call this function.
The return value is a set of records of type cron_job.
Description of the type can be found in Section F.38.6.
schedule.get_user_log(username text)
Returns the list of completed jobs executed by the specified user.
Arguments:
username — user name, optional
If the user name is omitted, the session user name is used. Only a superuser can retrieve the list of jobs executed by another user.
The return value is a set of records of type cron_job.
Description of the type can be found in Section F.38.6.
schedule.clean_log()
Deletes all records about completed jobs. Only a superuser can call this function.
Returns the number of records deleted.
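For example, to inspect the most recent runs of your own jobs (a sketch; the field names come from the cron_job type):

```sql
SELECT cron, started, finished, status, message
FROM schedule.get_user_log()
ORDER BY finished DESC
LIMIT 10;
```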