Outputting CDRs to PostgreSQL

The blueworx-utils package provides an Apache Kafka consumer to read CDRs from the Apache Kafka blueworx.cdr topic and output them to a PostgreSQL database.

Before continuing, make sure you have the blueworx-utils package installed, as detailed here: Installing Blueworx Utilities

PostgreSQL setup

Before you can use the consumer you need to install the required database and roles into PostgreSQL.

If you are hosting PostgreSQL on Red Hat Enterprise Linux or CentOS, you need to install the PostgreSQL server package before running the SQL scripts to install the database and role required by the consumer. For instructions on how to install the PostgreSQL server package, see the PostgreSQL documentation: https://www.postgresql.org/download/linux/redhat/.
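As an illustrative sketch only (package names and the initialization step vary between PostgreSQL versions and distributions, so follow the linked documentation for your system), a typical installation on Red Hat Enterprise Linux 7 or CentOS 7 looks like this:

```shell
# Install the PostgreSQL server package from the distribution repositories
sudo yum install -y postgresql-server

# Initialize the database cluster (RHEL/CentOS 7 wrapper around initdb)
sudo postgresql-setup initdb

# Start PostgreSQL now, and have it start at boot
sudo systemctl start postgresql
sudo systemctl enable postgresql
```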

Once you have installed the PostgreSQL package, switch to the postgres user ID:

su - postgres

Change directory to the consumer's SQL directory:

cd /opt/blueworx/utils/consumers/sql

Then run the create database script in this directory:

psql -f cdr-postgresql-create-database.sql
Note: If your PostgreSQL database is located on a different server from the blueworx-utils package, you will need to specify additional psql flags for the hostname, user name, and password.

This creates the database structure required by the consumer.
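If the database is on a remote server, psql's standard connection flags can be used to run the same script against it. For example (the host name dbhost.example.com and the postgres user below are placeholders for your environment):

```shell
# -h sets the host, -p the port, and -U the user; psql prompts for the
# password, or it can be supplied via the PGPASSWORD environment variable.
psql -h dbhost.example.com -p 5432 -U postgres -f cdr-postgresql-create-database.sql
```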

Using the consumer

If your PostgreSQL instance and Apache Kafka broker are located on the same server as your consumer, you can run the consumer without customising any configuration options. If one or more of these services is located on a different server from the one you are running the consumer on, you will need to specify additional configuration options.

Specifying configuration options

All configuration options are set in the /opt/blueworx/utils/consumers/config/cdr-postgresql-consumer.properties properties file.

The available configuration options are:

Table 1. Apache Kafka CDR Consumer properties

db_url (optional, string; default: jdbc:postgresql://localhost:5432/blueworx_cdr_records)
    A JDBC URL pointing to the database. When using a remote database, change the localhost part of the URL to the host and port (in host:port format) that the remote database is running on. If no port is specified, the default of 5432 is used.

db_user (optional, string; default: bwconsumers)
    The database user ID that owns the database and tables, and that is used to connect to the PostgreSQL database over JDBC.

db_pass (optional, string; no default)
    The password for the db_user specified above.

poll_rate (optional, integer; default: 1000)
    The rate at which the Apache Kafka topic is polled for new messages, specified in milliseconds.

commit_frequency (optional, integer; default: 1)
    How frequently the consumer commits its position in the Apache Kafka topic with the broker. The default of 1 commits after each record is processed. Every time the consumer commits its position in the topic, a message is sent to the broker. To improve performance, increase this number so that commit messages are sent less frequently. It is recommended you only do this if you are experiencing performance issues.

Note: If you increase this value, the consumer's position in the Apache Kafka topic will not be recorded after every message is processed. If the consumer fails and is restarted without the position having been recorded, a message may be processed twice. Due to constraints on the database tables, even if the consumer does process a message again, a duplicate entry should not be added to the database.
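For example, a configuration pointing the consumer at a remote database could look like the following in cdr-postgresql-consumer.properties (assuming the standard key=value Java properties format; the host name db.example.com and the password are placeholders):

```
db_url=jdbc:postgresql://db.example.com:5432/blueworx_cdr_records
db_user=bwconsumers
db_pass=changeme
```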

In addition to these properties, any standard Apache Kafka consumer property can be specified. This can be used for configuring the consumer based on the security setup of your Kafka environment. Consult the Apache Kafka documentation for available consumer options.

The two most likely Apache Kafka properties you will want to override are bootstrap.servers and group.id, to override the list of Apache Kafka brokers and consumer group ID, respectively. This is explained in the table below.

Table 2. Important Apache Kafka properties

bootstrap.servers (optional, string; default: localhost:9092)
    The URI of the Apache Kafka broker(s) to retrieve messages from. If you specify more than one broker (in an Apache Kafka cluster setup), use the format host1:port,host2:port.

group.id (optional, string; default: Blueworx-CDR-Consumer)
    Unique ID for the group of consumers handling this workload. Each consumer group will only process a message once, but running multiple consumers in the same consumer group allows for parallel processing of messages.
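For example, to point the consumer at a remote two-broker cluster under a custom consumer group (the host names and group ID below are placeholders), the same properties file could contain:

```
bootstrap.servers=kafka1.example.com:9092,kafka2.example.com:9092
group.id=My-CDR-Consumers
```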

Running the consumer

Once you have configured the consumer for your environment and installed the database, you can run the consumer by issuing the following command:

/opt/blueworx/utils/consumers/bin/cdr-postgresql-consumer.sh
Note: This will start the consumer in the active shell, and so it will terminate when you log off. To create a long-running consumer task, either:
  1. Run the consumer with the nohup command
  2. Run the consumer as a service
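For the nohup option, a typical invocation looks like the following (the log file path is a placeholder; output that would otherwise go to the terminal is redirected there):

```shell
# Start the consumer immune to hangups and detached from the shell;
# stdout and stderr are appended to the log file
nohup /opt/blueworx/utils/consumers/bin/cdr-postgresql-consumer.sh >> /var/log/cdr-consumer.log 2>&1 &
```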

Running the consumer as a service

Note: This documentation does not cover in full how to create a service to run the consumer, and the service may need to be tailored to your environment.

An example service is shown below:

[Unit]
Description=CDR PostgreSQL Daemon

[Service]
ExecStart=/opt/blueworx/utils/consumers/bin/cdr-postgresql-consumer.sh
User=root

[Install]
WantedBy=multi-user.target

To use this example, create a new service file (for example cdr-consumer.service) in /etc/systemd/system on your Red Hat Enterprise Linux or CentOS system, then enable the service so that it starts at boot.
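After creating or changing the service file, reload systemd's configuration and enable the service so that it starts at boot:

```shell
sudo systemctl daemon-reload
sudo systemctl enable cdr-consumer.service
```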

You can now use the following commands to control your service.

Check status of the consumer service

To check the status of the CDR consumer, issue the following command:

systemctl status cdr-consumer.service

Start the consumer service

To start the CDR consumer, issue the following command:

systemctl start cdr-consumer.service

Example status output:

# systemctl status cdr-consumer.service
● cdr-consumer.service - CDR PostgreSQL Daemon
   Loaded: loaded (/etc/systemd/system/cdr-consumer.service; enabled; vendor preset: disabled)
   Active: active (running) since Thu 2018-08-16 11:35:57 BST; 14s ago
 Main PID: 11578 (cdr-postgresql-)
   CGroup: /system.slice/cdr-consumer.service
           ├─11578 /bin/bash /opt/blueworx/utils/consumers/bin/cdr-postgresql-consumer.sh
           └─11580 java -Dconsumer.properties=/opt/blueworx/utils/consumers/config/cdr-postgresql-consumer.properties -Dlog4j.configuration=file:/opt/blueworx/utils/consumers/config/log4j.propertie...

Aug 16 11:35:57 bvr.example.com cdr-postgresql-consumer.sh[11578]: value.deserializer = class com.blueworx.util.consumer.cdr.CDRDeserialiser
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.1.0
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fdcf75ea326b8e07
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: 2018-08-16 11:35:58 INFO  PostgreSQLConsumer:297 - Opened database successfully: jdbc:postgresql://local...r_records
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.clients.Metadata - Cluster ID: Cw63-muSRJ-XBjld3txmIg
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consume...ck: null)
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consume...itions []
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consume...ing group
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consume...eration 6
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consume...itions []

Stop the consumer service

To stop the CDR consumer, issue the following command:

systemctl stop cdr-consumer.service

Example status output:

# systemctl status cdr-consumer.service
● cdr-consumer.service - CDR PostgreSQL Daemon
   Loaded: loaded (/etc/systemd/system/cdr-consumer.service; enabled; vendor preset: disabled)
   Active: inactive (dead)

Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consume...itions []
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consume...ing group
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consume...eration 6
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consume...itions []
Aug 16 11:36:25 bvr.example.com systemd[1]: Stopping CDR PostgreSQL Daemon...
Aug 16 11:36:25 bvr.example.com cdr-postgresql-consumer.sh[11578]: 2018-08-16 11:36:25 INFO  PostgreSQLConsumer:111 - Starting exit...
Aug 16 11:36:25 bvr.example.com cdr-postgresql-consumer.sh[11578]: 2018-08-16 11:36:25 INFO  PostgreSQLConsumer:267 - Closing consumer. Committing current offsets (sync): {}
Aug 16 11:36:25 bvr.example.com cdr-postgresql-consumer.sh[11578]: 2018-08-16 11:36:25 INFO  PostgreSQLConsumer:121 - Closed database connection
Aug 16 11:36:25 bvr.example.com cdr-postgresql-consumer.sh[11578]: 2018-08-16 11:36:25 INFO  PostgreSQLConsumer:273 - Closed consumer
Aug 16 11:36:25 bvr.example.com systemd[1]: Stopped CDR PostgreSQL Daemon.