The blueworx-utils package provides an Apache Kafka consumer to read CDRs from the Apache Kafka blueworx.cdr topic and output them to a PostgreSQL database.
Before continuing, make sure you have the blueworx-utils package installed, as detailed in Installing Blueworx Utilities.
PostgreSQL setup
Before you can use the consumer you need to install the required database and roles into PostgreSQL.
If you are hosting PostgreSQL on Red Hat Enterprise Linux or CentOS, you need to install the PostgreSQL server package before running the SQL scripts to install the database and role required by the consumer. For instructions on how to install the PostgreSQL server package, see the PostgreSQL documentation: https://www.postgresql.org/download/linux/redhat/.
Once you have installed the PostgreSQL package, switch to the postgres user ID:
su - postgres
Change directory to the consumer's SQL directory:
cd /opt/blueworx/utils/consumers/sql
Then run the create database script in this directory:
psql -f cdr-postgresql-create-database.sql
This creates the database structure required by the consumer.
Using the consumer
If your PostgreSQL instance and Apache Kafka broker are located on the same server as your consumer, you can run the consumer without customising any configuration options. If either of these services is located on a different server from the one running the consumer, you need to specify additional configuration options.
Specifying configuration options
All configuration options are set in the /opt/blueworx/utils/consumers/config/cdr-postgresql-consumer.properties properties file.
The available configuration options are:
Name | Mandatory | Type | Default | Description |
---|---|---|---|---|
db_url | no | string | jdbc:postgresql://localhost:5432/blueworx_cdr_records | A JDBC URL pointing to the database. When using a remote database, replace the localhost:5432 part of the URL with the host and port (in host:port format) that the remote database is running on. If no port is specified, the default of 5432 is used. |
db_user | no | string | bwconsumers | The database user ID that owns the database and tables and is used to connect to the PostgreSQL database over JDBC. |
db_pass | no | string | | The password for the db_user specified above. |
poll_rate | no | integer | 1000 | The rate at which the Apache Kafka topic is polled for new messages, specified in milliseconds. |
commit_frequency | no | integer | 1 | How often the consumer commits its position in the Apache Kafka topic to the broker. The default of 1 commits after each record is processed. Every time the consumer commits its position in the topic, a message is sent to the broker. To improve performance, increase this number so that commit messages are sent less frequently. It is recommended that you do this only if you are experiencing performance issues. Note: if you increase this value, the position in the Apache Kafka topic is no longer recorded after every message. If the consumer fails and is restarted before the position has been recorded, a message may be processed twice. Because of constraints on the database tables, a message that is processed again should not add a duplicate entry to the database. |
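As an illustration of the options above, a properties file for a consumer that connects to a remote database might look like the following sketch. The host name db.example.com and the password value are hypothetical placeholders, and the key=value syntax assumes the standard Java properties format. Substitute the values for your own environment.

```properties
# Sketch only: db.example.com and the password are placeholder values.
# Remote PostgreSQL instance; the port can be omitted to use the default of 5432.
db_url=jdbc:postgresql://db.example.com:5432/blueworx_cdr_records
db_user=bwconsumers
db_pass=example-password

# Poll the Apache Kafka topic every 1000 milliseconds (the default).
poll_rate=1000

# Commit the topic position after every 100 records instead of after each record.
commit_frequency=100
```

Any option left out of the file keeps the default listed in the table. With commit_frequency raised as in this sketch, records processed since the last commit may be processed again after a restart.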
In addition to these properties, any standard Apache Kafka consumer property can be specified. This can be used for configuring the consumer based on the security setup of your Kafka environment. Consult the Apache Kafka documentation for available consumer options.
The two Apache Kafka properties you are most likely to override are bootstrap.servers and group.id, which set the list of Apache Kafka brokers and the consumer group ID, respectively. Both are described in the table below.
Name | Mandatory | Type | Default | Description |
---|---|---|---|---|
bootstrap.servers | no | string | localhost:9092 | The address of the Apache Kafka broker(s) to retrieve messages from. If you specify more than one broker (in an Apache Kafka cluster setup), use the format host1:port,host2:port. |
group.id | no | string | Blueworx-CDR-Consumer | Unique ID for the group of consumers handling this workload. Each consumer group will only process a message once, but running multiple consumers in the same consumer group allows for parallel processing of messages. |
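The following sketch shows how these overrides might appear in the properties file for a two-broker cluster. The broker host names, the group ID, and the truststore path and password are hypothetical placeholders. The security.protocol and ssl.truststore.* entries are standard Apache Kafka client properties, included here only to illustrate a TLS-secured environment; whether you need them depends on how your brokers are configured.

```properties
# Sketch only: host names, group ID, and file paths are placeholder values.
bootstrap.servers=kafka1.example.com:9092,kafka2.example.com:9092
group.id=Blueworx-CDR-Consumer-Site2

# Standard Apache Kafka client properties for brokers that require TLS.
security.protocol=SSL
ssl.truststore.location=/path/to/client.truststore.jks
ssl.truststore.password=example-password
```

Consumers that should share the workload must use the same group.id. A consumer started with a different group.id processes the topic independently.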
Running the consumer
Once you have configured the consumer for your environment, and installed the database, you can run the consumer by issuing the following command:
/opt/blueworx/utils/consumers/bin/cdr-postgresql-consumer.sh
Running the consumer as a service
An example service is shown below:
[Unit]
Description=CDR PostgreSQL Daemon

[Service]
ExecStart=/opt/blueworx/utils/consumers/bin/cdr-postgresql-consumer.sh
User=root

[Install]
WantedBy=multi-user.target
To use this example, create a new service file (for example cdr-consumer.service) in /etc/systemd/system/multi-user.target.wants on your Red Hat Enterprise Linux or CentOS system.
You can now use the following commands to control your service.
Check status of the consumer service
To check the status of the CDR consumer, issue the following command:
systemctl status cdr-consumer.service
Start the consumer service
To start the CDR consumer, issue the following command:
systemctl start cdr-consumer.service
Example status output:
# systemctl status cdr-consumer.service
● consumer.service - CDR PostgreSQL Daemon
   Loaded: loaded (/usr/lib/systemd/system/cdr-consumer.service; disabled; vendor preset: disabled)
   Active: active (running) since Thu 2018-08-16 11:35:57 BST; 14s ago
 Main PID: 11578 (cdr-postgresql-)
   CGroup: /system.slice/consumer.service
           ├─11578 /bin/bash /opt/blueworx/utils/consumers/bin/cdr-postgresql-consumer.sh
           └─11580 java -Dconsumer.properties=/opt/blueworx/utils/consumers/config/cdr-postgresql-consumer.properties -Dlog4j.configuration=file:/opt/blueworx/utils/consumers/config/log4j.propertie...

Aug 16 11:35:57 bvr.example.com cdr-postgresql-consumer.sh[11578]: value.deserializer = class com.blueworx.util.consumer.cdr.CDRDeserialiser
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.1.0
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fdcf75ea326b8e07
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: 2018-08-16 11:35:58 INFO PostgreSQLConsumer:297 - Opened database successfully: jdbc:postgresql://local...r_records
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.clients.Metadata - Cluster ID: Cw63-muSRJ-XBjld3txmIg
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consume...ck: null)
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consume...itions []
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consume...ing group
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consume...eration 6
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consume...itions []
Stop the consumer service
To stop the CDR consumer, issue the following command:
systemctl stop cdr-consumer.service
Example status output:
# systemctl status cdr-consumer.service
● consumer.service - CDR PostgreSQL Daemon
   Loaded: loaded (/usr/lib/systemd/system/cdr-consumer.service; disabled; vendor preset: disabled)
   Active: inactive (dead)

Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consume...itions []
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consume...ing group
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consume...eration 6
Aug 16 11:35:58 bvr.example.com cdr-postgresql-consumer.sh[11578]: [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consume...itions []
Aug 16 11:36:25 bvr.example.com systemd[1]: Stopping CDR PostgreSQL Daemon...
Aug 16 11:36:25 bvr.example.com cdr-postgresql-consumer.sh[11578]: 2018-08-16 11:36:25 INFO PostgreSQLConsumer:111 - Starting exit...
Aug 16 11:36:25 bvr.example.com cdr-postgresql-consumer.sh[11578]: 2018-08-16 11:36:25 INFO PostgreSQLConsumer:267 - Closing consumer. Committing current offsets (sync): {}
Aug 16 11:36:25 bvr.example.com cdr-postgresql-consumer.sh[11578]: 2018-08-16 11:36:25 INFO PostgreSQLConsumer:121 - Closed database connection
Aug 16 11:36:25 bvr.example.com cdr-postgresql-consumer.sh[11578]: 2018-08-16 11:36:25 INFO PostgreSQLConsumer:273 - Closed consumer
Aug 16 11:36:25 bvr.example.com systemd[1]: Stopped CDR PostgreSQL Daemon.