Integrating with Apache Kafka


Apache Kafka is an open-source, distributed event store and stream-processing platform. It is used for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

As a tenant administrator, you can optimize the performance, availability, reliability, and reachability of the applications in an IT network. Use the BMC Helix Intelligent Integrations Apache Kafka connector to collect event and metric data from Apache Kafka.

Important

The Apache Kafka connector collects data only in JSON format.

You can view the collected data in various BMC Helix applications and derive the following benefits:

  • BMC Helix Operations Management (Events): Use a centralized event view to monitor and manage events, perform event operations, and filter events. Identify actionable events from a large volume of event data by processing events.
    For more information, see Monitoring events and reducing event noise.
  • BMC Helix Operations Management (Metrics): Use alarm and variate policies to detect anomalies and eliminate false positives for more accurate results while monitoring the health of your system.
    For more information, see Detecting anomalies by using static and dynamic thresholds.
  • BMC Helix Dashboards (Metrics): Create dashboards to get a consolidated view of data collected from third-party products across your environment. Improve the efficiency of your system by monitoring key performance metrics and respond to issues quickly to minimize downtime.
    For more information, see Creating custom dashboards.

As a tenant administrator, perform the following steps to configure a connection with Apache Kafka, verify the connection, and view the collected data in various BMC Helix applications.

ConnectorSteps.png

Supported versions

BMC Helix Intelligent Integrations supports version 3.3.1 of Apache Kafka for data collection.

Task 1: To plan for the connection

Review the following prerequisites to help you plan and configure a connection with Apache Kafka.

Apache Kafka prerequisites

BMC Helix Intelligent Integrations receives information from Apache Kafka in JSON format and applies JSLT mapping to transform the incoming information into events and metrics (in JSON format) that can be understood by BMC Helix Operations Management.
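
For example, a minimal JSLT mapping simply reads fields from the incoming JSON and writes them to the parameter names that BMC Helix Operations Management expects. In this hypothetical sketch, sev and text are illustrative input field names only:

Input JSON received from an Apache Kafka topic (hypothetical)
{
    "sev": "CRITICAL",
    "text": "Disk usage above threshold"
}

JSLT mapping
{
    "severity": .sev,
    "msg": .text
}

Resulting JSON
{
    "severity": "CRITICAL",
    "msg": "Disk usage above threshold"
}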

Prepare the event JSON, JSLT, and event class JSON

Before you start configuring a connection with Apache Kafka to collect events, prepare the event JSON, corresponding JSLT, and the event class JSON if you plan to use an event class other than the existing classes in BMC Helix Operations Management.

A standard event JSLT must contain the following parameters. If you have any additional parameters in the event JSON for which no mapping is available in the standard JSLT and for which you want to send information to BMC Helix Operations Management, include them in the extras section in the JSLT.

JSLT containing mandatory parameters
{
    "severity": "",
    "msg": "",
    "status": "",
    "_ci_id": "",
    "source_hostname": "",
    "source_identifier": "",
    "details": "",
    "source_attributes": {"external_id": ""},
    "creation_time": "",
    "source_unique_event_id": "",
    "ii_version": (.ii_version),
    "class": ""
}
Example 1: A sample event JSON, the corresponding JSLT containing only mandatory parameters, and the event class JSON
Sample data received from Apache Kafka
[
    {
        "message": "SingleStage JSLT 10",
        "severity": "CRITICAL",
        "event_handle": "560",
        "host": "clm-pun-030058.abc.com",
        "date_reception": "1679493939",
        "mc_ueid": "demo12345",
        "mc_ucid": "12345",
        "status": "OPEN",
        "ciType": "Host"                    
    }
]
JSLT corresponding to the Apache Kafka event JSON
[for (.)
let ciExternalId = string("TSOM_"+ .ciType + "-" + .mc_ucid)
{
    "severity": .severity,
    "msg": .msg,
    "status": .status,
    "_ci_id":$ciExternalId,
    "source_hostname":.mc_host,
    "source_identifier": $ciExternalId,
    "details":"",
    "source_attributes": {
        "external_id": $ciExternalId
    },
    "creation_time": string(number(.date_reception) * 1000),
    "source_unique_event_id": "TSOM_" + string(.mc_ueid),
    "ii_version": (.ii_version),
    "class": "CustomEvent"
}
]
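
For reference, applying the preceding JSLT to the sample data would produce an event roughly like the following. The ii_version key is omitted because the sample input contains no ii_version field (JSLT drops keys whose values evaluate to null):

Resulting event JSON (illustrative)
[
    {
        "severity": "CRITICAL",
        "msg": "SingleStage JSLT 10",
        "status": "OPEN",
        "_ci_id": "TSOM_Host-12345",
        "source_hostname": "clm-pun-030058.abc.com",
        "source_identifier": "TSOM_Host-12345",
        "details": "",
        "source_attributes": {
            "external_id": "TSOM_Host-12345"
        },
        "creation_time": "1679493939000",
        "source_unique_event_id": "TSOM_demo12345",
        "class": "CustomEvent"
    }
]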

In the above JSLT, CustomEvent indicates the name of the class in BMC Helix Operations Management that will receive events from Apache Kafka. This class must be a child class of the IIMonitorEvent class. You can use an existing class or create a new class. If you want to create a new class, create the event class JSON, as shown in the following example.

Sample event class JSON
{
  "name": "CustomEvent",
  "parentClassName": "IIMonitorEvent"
}

If you need to include mapping for any additional parameters for which no mapping is available in the standard JSLT, include them in the extras section. For example, EventID and EventType are additional parameters in the following sample event JSON. In the JSLT, these parameters are included in the extras section.

Example 2: A sample event JSON having additional parameters and the corresponding JSLT containing the extras section
Sample data received from Apache Kafka containing EventID and EventType as additional parameters
[
    {
        "message": "SingleStage JSLT 10",
        "severity": "CRITICAL",
        "event_handle": "560",
        "host": "clm-pun-030058.abc.com",
        "date_reception": "1679493939",
        "mc_ueid": "demo12345",
        "mc_ucid": "12345",
        "status": "OPEN",
        "ciType": "Host",
        "EventID": 1235,
        "EventType": "customEventType"                   
    }
]
JSLT corresponding to the Apache Kafka event JSON containing extra parameter
[for (.)
let ciExternalId = string("TSOM_"+ .ciType + "-" + .mc_ucid)
{
    "severity": .severity,
    "msg": .message,
    "status": .status,
    "_ci_id":$ciExternalId,
    "source_hostname":.host,
    "source_identifier": $ciExternalId,
    "details":"",
    "source_attributes": {
        "external_id": $ciExternalId
    },
    "creation_time": string(number(.date_reception) * 1000),
    "source_unique_event_id": "TSOM_" + string(.mc_ueid),
    "ii_version": (.ii_version),
    "class": "CustomEvent",
     "extras": {
    "sourceEventID": string((.EventID)),
    "sourceEventType": (.EventType)
  }
}]
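
For reference, applying this JSLT to the sample data would produce an event roughly like the following; note how the additional parameters travel in the extras section:

Resulting event JSON (illustrative)
[
    {
        "severity": "CRITICAL",
        "msg": "SingleStage JSLT 10",
        "status": "OPEN",
        "_ci_id": "TSOM_Host-12345",
        "source_hostname": "clm-pun-030058.abc.com",
        "source_identifier": "TSOM_Host-12345",
        "details": "",
        "source_attributes": {
            "external_id": "TSOM_Host-12345"
        },
        "creation_time": "1679493939000",
        "source_unique_event_id": "TSOM_demo12345",
        "class": "CustomEvent",
        "extras": {
            "sourceEventID": "1235",
            "sourceEventType": "customEventType"
        }
    }
]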

In the above JSLT, CustomEvent indicates the name of the class in BMC Helix Operations Management that will receive events from Apache Kafka. This class must be a child class of the IIMonitorEvent class. You can use an existing class or create a new class. If you want to create a new class, prepare the event class JSON, as shown in the following example. The JSON contains the attributes section for the additional parameters. 

Sample event class in BMC Helix Operations Management
{
  "name": "CustomEvent",
  "parentClassName": "IIMonitorEvent",
  "attributes": [
    {
      "name": "sourceEventID",
      "dataType": "INTEGER"
    },
    {
      "name": "sourceEventType",
      "dataType": "STRING"
    }
  ]
}

Prepare the metrics JSON and JSLT

Before you start configuring a connection with Apache Kafka to collect metrics, prepare the metrics JSON and the corresponding JSLT mapping.

Example 3: A sample metrics JSON and the corresponding JSLT
Sample data received from Apache Kafka
[
    {
        "body": {
            "version": 1,
            "ts": 1675771980000,
            "samples": [
                {
                    "did": 3140,
                    "dmd": {
                        "uptime": "126 days, 11:04:20.29",
                        "total_storage": 0,
                        "total_ram": 0,
                        "operating_system_version": "5.1.3[Default]",
                        "operating_system": "IOS XR",
                        "model": "ciscoASR9010",
                        "locality": "City West, GD/107/4/B Melbourne, VIC",
                        "hostname": "clm-pun-123abc",
                        "device_type": "Infinera 7100N",
                        "description": "clm-pun-123abc",
                        "dc_stat": "VIC",
                        "dc_name": "MCI Clayton",
                        "dc_code": "MCC",
                        "dc_address": "1822 Dandenong Road, Clayton, VIC 3168",
                        "config_last_updated": "0"
                    },
                    "dsam": [
                        {
                            "ts": 1678902978000,
                            "v": 200,
                            "mmd": {
                                "unit": "pkts/s",
                                "speed": 0,
                                "oper_status": 2,
                                "name": "bundle-ether112_3507",
                                "min": "0",
                                "metric_desc": "Outbound Packets",
                                "metric": "ifOutPkts",
                                "max": "150000000",
                                "mac_addres": "30:78:35:30:38:37",
                                "ip": "",
                                "instance_desc": "",
                                "instance": "bundle-ether112_3507",
                                "ifindex": 139941,
                                "description": "N0043507R",
                                "datatype": "DERIVE",
                                "category_desc": "Network Interface",
                                "category": "interface",
                                "admin_status": 1
                            }
                        },
                        {
                            "ts": 1678902918000,
                            "v": 100,
                            "mmd": {
                                "unit": "pkts/s",
                                "speed": 0,
                                "oper_status": 2,
                                "name": "bundle-ether112_3550",
                                "min": "0",
                                "metric_desc": "Outbound Packets",
                                "metric": "ifInPkts",
                                "max": "150000000",
                                "mac_addres": "30:78:35:30:38:37",
                                "ip": "",
                                "instance_desc": "",
                                "instance": "bundle-ether112_3507",
                                "ifindex": 139941,
                                "description": "N0043507R",
                                "datatype": "DERIVE",
                                "category_desc": "Network Interface",
                                "category": "interface",
                                "admin_status": 1
                            }
                        }
                    ]
                }
            ]
        }
    }
]
Sample JSLT
let convertedData = [
for (fallback(.samples,.body.samples))
let parentBody = {for (.dmd) .key : replace(string(.value),"\\,|\\."," ")}
[
for(.dsam)
let entityName=.mmd.instance
let entityTypeId= .mmd.instance
{
    "labels": {
        "metricName": .mmd.metric,
        "hostname": $parentBody.hostname,
        "entityId":join(["Zabbix",":",($parentBody.hostname),":",$entityTypeId,":",$entityName],""),
        "entityTypeId": $entityTypeId,
        "device_id": $parentBody.hostname,
        "instanceName":$entityName,
        "entityName": $entityName,
        "hostType": fallback(.mmd.hostType,"Server"),
        "isKpi": true,
        "unit": .mmd.unit,
        "source": "Zabbix",
        "external_id": join(["ZABBIX_",.mmd.category,"-",string(.mmd.ifindex)],"")
    },
    "samples": [
        {
            "value": number(.v),
            "timestamp": number(.ts)
        }
    ]
}
]
]
flatten($convertedData)
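
For reference, applying this JSLT to the first dsam entry in the preceding sample would produce a record roughly like the following (the second entry is transformed the same way, and flatten merges all records into a single array):

Resulting metric JSON (illustrative, first record only)
{
    "labels": {
        "metricName": "ifOutPkts",
        "hostname": "clm-pun-123abc",
        "entityId": "Zabbix:clm-pun-123abc:bundle-ether112_3507:bundle-ether112_3507",
        "entityTypeId": "bundle-ether112_3507",
        "device_id": "clm-pun-123abc",
        "instanceName": "bundle-ether112_3507",
        "entityName": "bundle-ether112_3507",
        "hostType": "Server",
        "isKpi": true,
        "unit": "pkts/s",
        "source": "Zabbix",
        "external_id": "ZABBIX_interface-139941"
    },
    "samples": [
        {
            "value": 200,
            "timestamp": 1678902978000
        }
    ]
}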

BMC Helix Intelligent Integrations prerequisites

  • Depending on the location of the third-party product (SaaS, on-premises), choose one or more BMC Helix Intelligent Integrations deployment modes and review the corresponding port requirements. For information about various deployment modes and port requirements, see Deployment-scenarios.
  • Based on the deployment mode, use the BMC Helix Intelligent Integrations SaaS deployment or the BMC Helix Intelligent Integrations on-premises gateway or both. For more information about the gateway, see Deploying-the-BMC-Helix-Intelligent-Integrations-on-premises-gateway.
  • The on-premises gateway must be able to reach the third-party product on the required port (default is 7459).

In the preceding list, third-party product refers to Apache Kafka.

Task 2: To configure the connection with Apache Kafka

  1. Depending on the deployment mode, perform one of the following steps to access BMC Helix Intelligent
    Integrations:
    • BMC Helix Intelligent Integrations SaaS – Log on to BMC Helix Portal, and click Launch on BMC Helix
      Intelligent Integrations.
    • BMC Helix Intelligent Integrations on-premises gateway – Use the following URL to access BMC Helix
      Intelligent Integrations: https://<hostName>:<portNumber>/swpui
  2. On the CONNECTORS tab, click add_icon.png in the SOURCES panel.
  3. Click the Apache Kafka tile.
  4. Specify a unique instance name.

    Best practice

    We recommend that you specify the instance name in the following format:
    <sourceType>_<sourceControllerServerName>_<InstanceQualifier>

    The instance qualifier helps you distinguish between multiple instances configured from the same source server. For example, you can name your instances Kafka_Host_PROD and Kafka_Host_TEST.

  5. Specify the following details for the source connection:
    1. Specify the Apache Kafka host name and port number (the default port number is 7459).
    2. (Optional) Specify the Schema Registry URL of the Apache Kafka host with the HTTP or HTTPS port number, in the following format:
      <protocol>://<hostName>:<portNumber>
      For example, https://hostA:9001.
      You can use this URL to extract the JSON data from an Avro-encoded JSON.
    3. Specify one of the following security protocols:
      • PLAINTEXT (default)
      • SASL_PLAINTEXT
      • SSL
      • SASL_SSL
    4. (Optional) Specify a comma-separated list of additional Apache Kafka brokers in <hostName>:<portNumber> format.
      For example, hostA:7400, hostB:7500.
    5. Specify the additional parameters required for various authentication protocols:

      • Secure Sockets Layer (SSL) key password, if an SSL keystore location is configured
        This is the password of the private key in the keystore file or of the PEM key specified in ssl.keystore.key.
      • SSL truststore file location
      • SSL truststore password for the truststore location
        If a password is not set, the truststore file is still used, but the integrity check is disabled.
      • SSL keystore file location
        You can use this file for two-way authentication for the client.
      • SSL keystore password
      • Simple Authentication and Security Layer (SASL) Kerberos service name
        This is the Kerberos principal name that Kafka runs as.
      • SASL JAAS configuration
        These are the JAAS login context parameters for SASL connections, in the format used by JAAS configuration files. For brokers, the configuration must be prefixed with the listener prefix and the SASL mechanism name in lowercase. Usually, the security protocol, SASL mechanism, and SASL JAAS configuration are specified together.
      • SASL mechanism
        Multiple SASL mechanisms can be enabled simultaneously on a broker, but each client chooses only one mechanism. Apache Kafka supports many SASL mechanisms, for example, GSSAPI (Kerberos authentication), OAUTHBEARER, SCRAM, PLAIN, Delegation Tokens, and LDAP.
      • All properties
        Use this field to specify any additional authentication parameters that are not available on the Source connection page. Specify the parameters in the parameter=value format.
        For example, if you want to use the client DNS lookup parameter, specify client.dns.lookup=use_all_dns_ips in this field. To specify multiple parameters, use a comma-separated list. A combined example follows this list.
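
      For instance, a SASL_SSL connection using the PLAIN mechanism might be specified with values roughly like the following. This is an illustrative sketch only: kafkaUser, kafkaPassword, and the truststore path and password are placeholders, and the JAAS line uses the standard Apache Kafka PlainLoginModule.

      Security protocol: SASL_SSL
      SASL mechanism: PLAIN
      SASL JAAS configuration: org.apache.kafka.common.security.plain.PlainLoginModule required username="kafkaUser" password="kafkaPassword";
      SSL truststore file location: /opt/kafka/client.truststore.jks
      SSL truststore password: kafkaTruststorePassword
      All properties: client.dns.lookup=use_all_dns_ips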
      Caution

      We recommend that you do not select the Allow Unsigned Certificate option in a production environment. You might want to select this option to allow unsigned certificates in a test environment. See the Apache Kafka documentation to learn how to install SSL certificates.

    6. Specify the maximum number of concurrent REST API requests to be executed during a data collection schedule (the default value is 5).
    7. Specify the connection timeout, in seconds, after which no further attempt is made to establish a connection (the default value is 30).
  6. Click VALIDATE AND CREATE.
    The specified connection details are validated and the corresponding source connection is created in the Source Connection list.
  7. Select the source connection that you created from the list if it is not already selected.

    Important

    The destination host connection is created and configured automatically when the source connection is created.

  8. Ensure that the options for the data types for which you want to collect data are selected.
  9. Configure the collectors for the selected data types by clicking the respective data type in the Collectors section and specifying the parameters for the selected data type, as explained below.
    Each parameter description below lists the data types (Apache Kafka Events, Apache Kafka Metrics) to which the parameter applies.

    Collection Schedule (applies to: Apache Kafka Events, Apache Kafka Metrics)

    Select one of the following options to specify the data collection frequency:

    • Duration: When you select this option, data collection happens continuously. Specify the schedule in minutes, hours, or days.
      Default: 5 minutes
      Example: Collection Schedule is set to 5 minutes and the current time is 00:30. If you run the collector just after 00:30, data is collected every 5 minutes, first at 00:30, next at 00:35, and so on.
    • Cron schedule: When you select this option, data collection happens periodically. Specify the schedule by using a cron expression.
      A cron expression is a string consisting of five subexpressions (fields) that describe individual details of the schedule. These fields, separated by blank spaces, can contain any of the allowed values with various combinations of the allowed characters for that field.
      Default: */5 * * * * (evaluates to every 5 minutes)
      Format: Minutes Hours (24-hour format) Day of Month Month Day of Week
      Example: If you specify 10 15 3 7 *, data is collected at 15:10 hours on the third day of July.

    For more information about how this parameter affects data collection, see Data collection schedule.

    Subscription Time (applies to: Apache Kafka Events, Apache Kafka Metrics)

    Specify the duration for which a client (or subscriber) remains connected to Apache Kafka and collects data from the subscribed topic.

    For example, if Collection Schedule and Subscription Time are set to 5 and 2 minutes respectively, every 5 minutes the client (or subscriber) connects to the specified Apache Kafka topic and collects data for 2 minutes.

    Default: 30 seconds

    Important: The subscription time must not exceed the collection schedule.

    Topic (applies to: Apache Kafka Events, Apache Kafka Metrics)

    Specify the name of the Apache Kafka topic to which you have subscribed for collecting data.

    Group ID (applies to: Apache Kafka Events, Apache Kafka Metrics)

    Specify the group ID that allows multiple consumer groups to collect data from an Apache Kafka topic.

    The combination of Base Client ID and Number of clients generates a set of clients, and those clients must be members of the specified Group ID.

    Base Client ID (applies to: Apache Kafka Events, Apache Kafka Metrics)

    Specify the client ID prefix that allows collection of data from the specified Apache Kafka topic.

    The client ID is simply a label associated with an Apache Kafka consumer.

    Number of clients (subscribers) (applies to: Apache Kafka Events, Apache Kafka Metrics)

    Specify the number of clients (or consumers) that can collect data concurrently from the specified Apache Kafka topic.

    For example, if you specify Number of clients (subscribers) as 3 and Base Client ID as ClientA (which is a prefix), only three subscribers whose IDs are ClientA_1, ClientA_2, and ClientA_3 can access the topic, provided they belong to the specified Group ID.

  10. Click CREATE COLLECTORS to create the required collector streams for the selected data types.
  11. Configure the distributors for the selected data types by clicking the respective data type in the Distributors section and specifying the parameters for the selected data type, as explained below:

    Each parameter description below lists the data types (BMC Events, BMC Metrics) to which the parameter applies.

    Batching Control

    Max Batching Size (applies to: BMC Events, BMC Metrics)

    Specify the maximum number of data items to send in a single POST request to the destination API.

    The batch size depends on the destination’s ability to buffer the incoming data.

    Default: 250

    Max Batching Delay (applies to: BMC Events)

    Specify the maximum time (in seconds) to wait before building a batch and processing it.

    Default: 3 seconds

    Base Retry Delay (applies to: BMC Events, BMC Metrics)

    Specify the initial time (in seconds) to wait before retrying to build and process a batch.

    The waiting time increases exponentially in the sequence n, n², n³, and so on, where n is the number of seconds.

    Default: 2 seconds

    Example: Base Retry Delay is set to 2 seconds. Retries are performed after 2, 4, 8, 16, ... seconds.

    Max Intra-Retry Delay (applies to: BMC Events, BMC Metrics)

    Specify the maximum limit for the base retry delay.

    Default: 60 seconds

    Example: Max Intra-Retry Delay is set to 60 seconds and Base Retry Delay is set to 2 seconds. Retries are performed after 2, 4, 8, 16, 32, 60, 60, ... seconds; the delay never exceeds the 60-second limit.

    Max Retry Duration (applies to: BMC Events, BMC Metrics)

    Specify the total time for retrying a delivery. For REST destinations, a delivery is a batch of data items in one POST request.

    Default: 5 minutes

    Example: Max Retry Duration is set to 8 hours and Base Retry Delay is set to 2 seconds. Requests are retried after 2+4+8+16+32+64+128... seconds until the total duration reaches 8 hours. After that, no further attempts are made to retry the delivery.

    The assumption here is that if there is an outage or another issue with the destination tool, recovery should take less time than the value of the Max Retry Duration parameter.

    JSLT

    JSLT Mapping (Events) (applies to: BMC Events)

    Replace the existing JSLT with the JSLT that you prepared according to the guidelines explained in the Apache Kafka prerequisites section.

    Tip: Validate that the JSLT is formed correctly according to the JSON. Otherwise, you might encounter issues later.

    Event Class JSON (applies to: BMC Events)

    Specify the JSON for the event class if you are using an event class other than the existing classes in BMC Helix Operations Management.

    See the Apache Kafka prerequisites section for a sample event class JSON.

    Event Policy JSON (applies to: BMC Events)

    Specify the JSON for the policy that should be applied to the collected events in BMC Helix Operations Management.

    For information about creating event policies, see Defining event policies for enrichment, correlation, notification, and suppression. For exporting policies in JSON format, see Migrating event policies between tenants.

    JSLT Mapping (Metrics) (applies to: BMC Metrics)

    Replace the existing JSLT with the JSLT that you prepared according to the guidelines explained in the Apache Kafka prerequisites section.

    Tip: Validate that the JSLT is formed correctly according to the JSON. Otherwise, you might encounter issues later.

  12. Click CREATE DISTRIBUTORS to create the required distributor streams for the selected data types.
  13. Click one of the following buttons:
    • SAVE STREAM: Click this button if you want to edit the integration details before creating the instance. After you save the stream, the connector that you just created is listed in the SOURCES panel. Move the slider to the right to start the data stream.
    • SAVE AND START STREAM: Click this button if you want to save the integration details and start receiving data immediately.

Important
For a data stream, the Run Latency (max/avg), Items (Avg per Run), and Last Run Status columns on the Streams page might show the status as No Runs during the data collection process. After completion of the process, these columns are updated with an appropriate status.

For more information about the data streams, see Starting-or-stopping-data-streams.

Task 3: To verify the connection

From BMC Helix Intelligent Integrations, on the SOURCES panel, confirm that the data streams for the connection you created are running. Data streaming is indicated by moving colored arrows.

Kafka_stream.png

  • A moving dark blue arrow (EventsStream_Icon.png) indicates that the event stream is running. Event data will be pushed according to the configured Collection Schedule interval.
  • A moving red arrow (MetricsStream_Icon.png) indicates that the metric stream is running. Metric data will be pushed according to the configured Collection Schedule interval.

To view data in BMC Helix applications

View data collected from Apache Kafka in multiple BMC Helix applications.

To view events in BMC Helix Operations Management

  1. In BMC Helix Operations Management, select Monitoring > Events.
  2. Filter the events by the KafkaEvent class (or by the event class that you specified in the JSLT mapping, for example, CustomEvent).

Incoming events from Apache Kafka are processed in BMC Helix Operations Management through a set of rules that determine whether an incoming event contains the results of the same test on the same node; the event is then processed accordingly. For more information, see Event-deduplication-suppression-and-closure-for-reducing-event-noise.

For information about events, see Monitoring and managing events.

To view metrics in BMC Helix Operations Management

  1. In BMC Helix Operations Management, select Monitoring > Devices.
  2. Click the link for the required device.
  3. On the Monitors tab, click the required monitor.
    The Performance Overview tab shows the metrics graph.

For information about metrics, see Viewing collected data.

 
