Интеграция с внешними источниками данных Elasticsearch используется для чтения журналов, записываемых туда утилитой pgpro-otel-collector.
Интеграция включает в себя компоненты, перечисленные ниже.
Агент мониторинга, предоставляющий следующую функциональность:
собирает журналы активности с экземпляров СУБД Postgres Pro
передаёт журналы активности в Elastic APM для Elasticsearch
Система мониторинга производительности приложений на основе Elastic Stack, предоставляющая следующую функциональность:
принимает данные от агента мониторинга и преобразует их в формат документа ES
отправляет преобразованные данные в Elasticsearch
Система хранения журналов активности, предоставляющая следующую функциональность:
принимает журналы активности от системы мониторинга производительности приложений
хранит журналы активности согласно внутренним параметрам хранения
предоставляет интерфейс для получения журналов активности
Система Postgres Pro Enterprise Manager, предоставляющая следующую функциональность:
обращается к системе Elasticsearch для получения журналов активности экземпляров СУБД
предоставляет пользователю интерфейс мониторинга в виде текстовых данных на основе журналов активности
Процесс интеграции состоит из следующих этапов:
Дополнительная настройка агента не требуется.
Установите сервер Elastic APM по стандартной документации.
Интегрируйте сервер Elastic APM с Elastiсsearch по стандартной документации.
Настройте ingest-конвейер (pipeline) pgpro-otel-collector.
Это необходимо для совместимости полей документов (журналов) со схемой именования полей Elastiсsearch Common Schema (ECS).
Пример настройки конвейера (оба запроса следует последовательно выполнить в Kibana Developer Tools):
PUT _ingest/pipeline/postgrespro-otelcol-enrich-logs
{
"description": "Enrich PostgresPro Otel collector logs",
"processors": [
{
"rename": {
"if": "ctx?.labels?.message != null",
"field": "labels.message",
"target_field": "message",
"ignore_failure": true,
"ignore_missing": false,
"override": true
}
},
{
"rename": {
"if": "ctx?.labels?.pid != null",
"field": "labels.pid",
"target_field": "process.pid",
"ignore_failure": true,
"ignore_missing": false,
"override": true
}
},
{
"rename": {
"if": "ctx?.labels?.error_severity != null",
"field": "labels.error_severity",
"target_field": "log.level",
"ignore_failure": true,
"ignore_missing": false,
"override": true
}
},
{
"rename": {
"if": "ctx?.labels?.user != null",
"field": "labels.user",
"target_field": "user.name",
"ignore_failure": true,
"ignore_missing": false,
"override": true
}
},
{
"rename": {
"if": "ctx?.labels?.session_start != null",
"field": "labels.session_start",
"target_field": "session.start_time",
"ignore_failure": true,
"ignore_missing": false,
"override": true
}
},
{
"rename": {
"if": "ctx?.labels?.session_id != null",
"field": "labels.session_id",
"target_field": "session.id",
"ignore_failure": true,
"ignore_missing": false,
"override": true
}
},
{
"rename": {
"if": "ctx?.numeric_labels?.tx_id != null",
"field": "numeric_labels.tx_id",
"target_field": "transaction.id",
"ignore_failure": true,
"ignore_missing": false,
"override": true
}
},
{
"rename": {
"if": "ctx?.labels?.log_file_name != null",
"field": "labels.log_file_name",
"target_field": "log.file.path",
"ignore_failure": true,
"ignore_missing": false,
"override": true
}
},
{
"rename": {
"if": "ctx?.labels?.dbname != null",
"field": "labels.dbname",
"target_field": "db.name",
"ignore_failure": true,
"ignore_missing": false,
"override": true
}
},
{
"gsub": {
"if": "ctx?.service?.node?.name != null",
"field": "service.node.name",
"target_field": "host.name",
"pattern": ":.+$",
"replacement": "",
"ignore_failure": true,
"ignore_missing": false
}
},
{
"remove": {
"field": [
"observer.version",
"observer.hostname",
"service.language.name"
],
"ignore_failure": true
}
},
{
"remove": {
"field": "agent.version",
"if": "ctx?.agent?.version == \"unknown\"",
"ignore_failure": true
}
}
]
}PUT _ingest/pipeline/logs-apm.app@custom
{
"processors": [
{
"pipeline": {
"name": "postgrespro-otelcol-enrich-logs"
}
}
]
}Включите и настройте ресивер filelog.
Пример настройки ресивера для сценария, когда журналы PostgreSQL генерируются в формате JSON:
receivers:
filelog:
include:
- /var/log/postgresql/*.json
operators:
- parse_ints: true
timestamp:
layout: '%Y-%m-%d %H:%M:%S.%L %Z'
layout_type: strptime
parse_from: attributes.timestamp
type: json_parser
- field: attributes.timestamp
type: remove
retry_on_failure:
enabled: true
initial_interval: 1s
max_elapsed_time: 5m
max_interval: 30s
start_at: endНастройте процессоры:
processors:
attributes/convert:
actions:
- action: convert
converted_type: string
key: query_id
- action: convert
converted_type: string
key: pid
resource:
attributes:
- action: upsert
key: service.name
value: postgresql
- action: upsert
key: service.instance.id
value: postgresql-01.example.org:5432Где:
service.name — ключ для именования потока данных (data stream) и, как следствие, индексов.
service.instance.id — ключ для идентификации экземпляра.
Для журналов в формате JSON обязательно конвертировать поле query_id в строку, так как числовое значение некорректно отображается в ES.
Для хранения данных используются так называемые потоки данных (data streams). Целевой поток выбирается автоматически и имеет форматlogs-apm.app.service.name-namespace.
Значение service.name указывается при настройке коллектора в списке processors.resource.attributes элементом key: service.name.
Значение namespace определяется элементом с ключом service.environment. В приведённой настройке он не передаётся, поэтому подставляется значение по умолчанию default. Таким образом, при использовании приведённой настройки журналы активности будут храниться в потоке с именем logs-apm.app.postgresql-default.
Настройте отправку журналов через otlphttpexporter и настройте конвейер:
exporters:
otlphttp/elastic_logs:
compression: gzip
endpoint: https://elasticsearch-apm.example.org
tls:
insecure_skip_verify: false
service:
extensions: []
pipelines:
logs:
receivers:
- filelog
processors:
- resource
- attributes/convert
exporters:
- otlphttp/elastic_logsЗапустите коллектор и проверьте публикацию метрик на стороне коллектора:
# systemctl start pgpro-otel-collector
# systemctl status pgpro-otel-collector
● pgpro-otel-collector.service - PostgresPro OpenTelemetry Collector
Loaded: loaded (/lib/systemd/system/pgpro-otel-collector.service; enabled; preset: enabled)
Active: active (running) since Thu 2025-03-20 01:18:08 MSK; 4h 13min ago
Main PID: 6991 (pgpro-otel-coll)
Tasks: 8 (limit: 3512)
Memory: 119.3M
CPU: 2min 49.311s
CGroup: /system.slice/pgpro-otel-collector.service
└─6991 /usr/bin/pgpro-otel-collector --config /etc/pgpro-otel-collector/basic.yml
Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.366656,"msg":"Setting up own telemetry..."}
Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.367178,"msg":"Skipped telemetry setup."}
Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.3679142,"msg":"Development component. May change in the future.","kind":"receiver","name":"postgrespro","data_type":"metrics"}
Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"warn","ts":1742422688.3494158,"caller":"envprovider@v1.16.0/provider.go:59","msg":"Configuration references unset environment variable","name":"POSTGRESQL_P>
Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.4481084,"msg":"Starting pgpro-otel-collector...","Version":"v0.5.0","NumCPU":1}
Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.4481149,"msg":"Starting extensions..."}
Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"warn","ts":1742422688.4483361,"msg":"Using the 0.0.0.0 address exposes this server to every network interface, which may facilitate Denial of Service attack>
Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.4515307,"msg":"Starting stanza receiver","kind":"receiver","name":"filelog","data_type":"logs"}
Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.451749,"msg":"Everything is ready. Begin running and processing data."}
Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.6523068,"msg":"Started watching file","kind":"receiver","name":"filelog","data_type":"logs","component":"fileconsumer","path":"/var/log/postgresql/postgresql-2025-03-20.json"}После настройки отправки журналов из pgpro-otel-collector убедитесь, что система Elasticsearch получает метрики.
Для проверки можно выполнить запрос к хранилищу с помощью утилиты curl.
Пример запроса:
curl -s -XGET "https://elasticsearch.example.org:9200/logs-apm.app.postgresql-default/_search?size=10" -H 'Content-Type: application/json' -d'
{
"_source": ["message","service.node.name","@timestamp"],
"sort": [
{ "@timestamp": "desc" }
],
"query": {
"bool": {
"filter": [
{ "term":{"service.node.name":"postgresql-01.example.org:5432" }}]
}
}
}'Где:
https://elasticsearch.example.org:9200 — URL системы хранения журналов.
logs-apm.app.postgresql-default — имя потока данных (data stream) для поиска.
size=10 ограничивает количество возвращаемых журналов.
"_source": ["message","service.node.name","@timestamp"] — запрашиваемые поля.
Пример ответа:
{
"took": 18,
"timed_out": false,
"_shards": {
"total": 11,
"successful": 11,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 10000,
"relation": "gte"
},
"max_score": null,
"hits": [
{
"_index": ".ds-logs-apm.app.postgresql-default-2025.03.19-000379",
"_id": "qmuArJUB2PKtie47RffA",
"_score": null,
"_source": {
"message": "checkpoint complete: wrote 2038 buffers (16.6%); 0 WAL file(s) added, 0 removed, 10 recycled; write=269.563 s, sync=1.192 s, total=270.962 s; sync files=246, longest=0.677 s, average=0.005 s; distance=162419 kB, estimate=174180 kB; lsn=6/62000850, redo lsn=6/583C4DD8",
"@timestamp": "2025-03-19T03:44:01.336Z",
"service": {
"node": {
"name": "postgresql-01.example.org:5432"
}
}
},
"sort": [
1742355841336
]
}
]
}
}В навигационной панели перейдите в Инфраструктура → Источники данных → Хранилища сообщений.
В правом верхнем углу страницы нажмите Создать хранилище.
Укажите параметры хранилища журналов (помеченные звёздочкой параметры являются обязательными):
Система хранения сообщений: тип системы хранения журналов.
Выберите Elasticsearch.
Название: уникальное имя хранилища журналов. Например, Elasticsearch.
URL: сетевой адрес для подключения к хранилищу журналов. Например, https://elasticsearch.example.org.
Elasticsearch index: Имя индекса (потока) для поисковых запросов.
Укажите logs-apm.app.postgresql-default.
Пользователь: уникальное имя пользователя, если используется аутентификация.
Пароль: пароль пользователя, если используется аутентификация.
Описание: описание хранилища журналов.
Сделать источником данных по умолчанию: указывает, следует ли использовать хранилище журналов по умолчанию для всех запросов, связанных с получением журналов активности.
В навигационной панели перейдите в Мониторинг → Журнал сообщений.
Отобразится таблица журналов.
Чтобы сбросить фильтры, нажмите Сбросить всё над таблицей.
Проверьте, что новые журналы активности отображаются в таблице.