Adapter request and response for the Kafka adapter
This topic describes the adapter request and response in the Kafka adapter.
When you use a context item as an input for an adapter request, you must enclose the adapter request in <request-data> elements. However, when you create a static request, <request-data> is not required, and the adapter request starts with the <kafka-adapter-request> element.
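The wrapping rule above can be sketched in a few lines of Python. This is only an illustration: the helper name wrap_request is hypothetical, and the exact nesting (a <kafka-adapter-request> element placed directly inside <request-data>) is an assumption drawn from the description above, not from product output.

```python
import xml.etree.ElementTree as ET


def wrap_request(request: ET.Element, from_context_item: bool) -> ET.Element:
    """Return the element to hand to the adapter (hypothetical helper)."""
    if not from_context_item:
        # Static request: starts directly with <kafka-adapter-request>.
        return request
    # Context-item input: enclose the request in <request-data>.
    wrapper = ET.Element("request-data")
    wrapper.append(request)
    return wrapper


request = ET.Element("kafka-adapter-request")
print(ET.tostring(wrap_request(request, True), encoding="unicode"))
```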
Send Messages
Sends messages to a Kafka topic.
The following table describes the input elements for the adapter wrapper process.
| Input | Description | Required |
|---|---|---|
| adapter name | Specifies the name of the adapter as configured on the Grid Manager. If no adapter name is specified, the default name KafkaActorAdapter is used. | No |
| url | Specifies the URL of the Kafka broker in hostname:port or IPAddress:port format. To specify a broker cluster, enter comma-separated broker URLs. This input element sets the bootstrap.servers connection property in Kafka. Do not specify the bootstrap.servers property in the connection properties input element. | Conditional; required if not provided in the adapter configuration. |
| topic | Specifies the Kafka topic to which you want to send messages. You must specify the url element to use the topic element. | Conditional; required if not provided in the adapter configuration. |
| partition | Specifies the partition number. You must specify the topic before specifying the partition. If not specified, the default value is 0. If partition is specified without topic, the partition value is discarded and the value from the adapter configuration is used. | Conditional; required if not provided in the adapter configuration. |
| connection properties | Contains the connection properties required to connect to Kafka, including the <key.serializer> and <value.serializer> elements. Use this parameter only if the url, topic, and partition elements are specified in the adapter request. Within the <connection-properties> element, you can specify other properties that the Apache Kafka interface supports. For more information, see the Apache Kafka documentation. | Conditional; required if not provided in the adapter configuration. |
| messages | Specifies the messages to be sent to the Kafka topic. Example: <entries> <entry> <key>request1</key> <value>Hello World!</value> </entry> <entry> <key>request2</key> <value>Hello Universe!</value> </entry> </entries> | Yes |
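The url format in the table above (hostname:port or IPAddress:port, comma-separated for a cluster) can be checked before a request is sent. The following Python sketch shows one way to do that. The function name parse_broker_urls is hypothetical, and the check is deliberately loose: it does not handle bracketed IPv6 literals and does not verify that a broker is reachable.

```python
def parse_broker_urls(url: str) -> list[tuple[str, int]]:
    """Split a comma-separated broker list into (host, port) pairs."""
    brokers = []
    for part in url.split(","):
        part = part.strip()
        # Split on the last colon so the port is always the final field.
        host, sep, port = part.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {part!r}")
        port_number = int(port)
        if not 0 < port_number < 65536:
            raise ValueError(f"port out of range in {part!r}")
        brokers.append((host, port_number))
    return brokers


# Placeholder broker addresses, used only for illustration.
print(parse_broker_urls("broker1:9092, 10.0.0.5:9093"))
```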
If you use the Call Adapter activity in the Development Studio, the following table describes the input elements for the adapter request.
| Name | Description |
|---|---|
| <target> | Specifies the dynamic target. |
| <url> | Specifies the URL of the Kafka broker in hostname:port or IPAddress:port format. To specify a broker cluster, enter comma-separated broker URLs. |
| <topic> | Specifies the Kafka topic to which you want to send messages. You must specify the <url> element to use the <topic> element. |
| <partition> | Specifies the partition number. You must specify the topic before specifying the partition. If not specified, the default value is 0. If <partition> is specified without <topic>, the partition value is discarded and the value from the adapter configuration is used. |
| <data> | Contains either a single <entry> element or an <entries> element. |
| <entries> | Contains multiple <entry> elements, each holding a key-value pair. |
| <entry> | Contains a single entry as a key-value pair. To specify multiple entries, use the <entries> element with one <entry> element per message. |
| <key> | Specifies the key of the message to be published. |
| <value> | Specifies the value of the message to be published. This can be a string or XML. JSON data is treated as a string. |
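To illustrate how the elements in the table above fit together, the following Python sketch assembles a request from a list of key-value pairs, using a bare <entry> for one message and an <entries> wrapper for several. The function name build_request and the broker address are hypothetical, and the sketch omits <connection-properties> on the assumption that those are supplied by the adapter configuration.

```python
import xml.etree.ElementTree as ET


def build_request(url, topic, partition, messages):
    """Build a <kafka-adapter-request> from (key, value) pairs."""
    if not messages:
        raise ValueError("at least one message is required")
    request = ET.Element("kafka-adapter-request")
    target = ET.SubElement(request, "target")
    ET.SubElement(target, "url").text = url
    ET.SubElement(target, "topic").text = topic
    ET.SubElement(target, "partition").text = str(partition)
    data = ET.SubElement(request, "data")
    # One message goes in a bare <entry>; several are grouped under <entries>.
    parent = data if len(messages) == 1 else ET.SubElement(data, "entries")
    for key, value in messages:
        entry = ET.SubElement(parent, "entry")
        ET.SubElement(entry, "key").text = key
        ET.SubElement(entry, "value").text = value
    return request


# "broker1:9092" is a placeholder address, not a real broker.
request = build_request(
    "broker1:9092", "adapterTest", 0,
    [("request1", "Hello World!"), ("request2", "Hello India!")],
)
print(ET.tostring(request, encoding="unicode"))
```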
The following figure shows a sample adapter request with a single <entry>.
<kafka-adapter-request>
<target>
<url>[host]:[port]</url>
<topic>adapterTest</topic>
<partition>0</partition>
<character-set></character-set>
<connection-properties>
<acks>all</acks>
<retries>1</retries>
<batch.size>16384</batch.size>
<linger.ms>1</linger.ms>
<key.serializer>org.apache.kafka.common.serialization.StringSerializer</key.serializer>
<value.serializer>org.apache.kafka.common.serialization.StringSerializer</value.serializer>
</connection-properties>
</target>
<data>
<entry>
<key>request1</key>
<value>Hello World!</value>
</entry>
</data>
</kafka-adapter-request>
The following figure shows the adapter response.
<kafka-adapter-response>
<metadata>
<status>success</status>
</metadata>
<kafka-publish-responses>
<topic>adapterTest</topic>
<partition>0</partition>
<kafka-record>
<kafka-record-request>
<key>request1</key>
<value>Hello World!</value>
</kafka-record-request>
<kafka-record-response>
<status>success</status>
<topic>adapterTest</topic>
<partition>0</partition>
<offset>0</offset>
<timestamp>1571805524304</timestamp>
</kafka-record-response>
</kafka-record>
</kafka-publish-responses>
</kafka-adapter-response>
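A caller typically needs the offset and timestamp of each published record. The following Python sketch reads them from a response shaped like the one above. The function name read_records is hypothetical, the sketch assumes every record succeeded (a failed record has no <offset>), and it assumes <timestamp> is expressed in milliseconds since the Unix epoch, as Kafka record timestamps are.

```python
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Abbreviated copy of the single-entry response shown above.
SAMPLE_RESPONSE = """
<kafka-adapter-response>
  <metadata><status>success</status></metadata>
  <kafka-publish-responses>
    <kafka-record>
      <kafka-record-request><key>request1</key><value>Hello World!</value></kafka-record-request>
      <kafka-record-response>
        <status>success</status>
        <offset>0</offset>
        <timestamp>1571805524304</timestamp>
      </kafka-record-response>
    </kafka-record>
  </kafka-publish-responses>
</kafka-adapter-response>
"""


def read_records(xml_text):
    """Return key, offset, and send time for each <kafka-record>."""
    root = ET.fromstring(xml_text.strip())
    records = []
    for record in root.iter("kafka-record"):
        response = record.find("kafka-record-response")
        millis = int(response.findtext("timestamp"))
        records.append({
            "key": record.findtext("kafka-record-request/key"),
            "offset": int(response.findtext("offset")),
            # Assumption: the timestamp is epoch milliseconds.
            "sent_at": EPOCH + timedelta(milliseconds=millis),
        })
    return records


for item in read_records(SAMPLE_RESPONSE):
    print(item["key"], item["offset"], item["sent_at"].isoformat())
```

Under the epoch-milliseconds assumption, the sample timestamp 1571805524304 corresponds to 2019-10-23 04:38:44 UTC.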
The following figure shows a sample adapter request with multiple entries using the <entries> element.
<kafka-adapter-request>
<data>
<entries>
<entry>
<key>request1</key>
<value>Hello World!</value>
</entry>
<entry>
<key>request2</key>
<value>Hello India!</value>
</entry>
</entries>
</data>
</kafka-adapter-request>
The following figure shows the adapter response.
<kafka-adapter-response>
<metadata>
<status>success</status>
</metadata>
<kafka-publish-responses>
<topic>adapterTest</topic>
<partition>0</partition>
<kafka-record>
<kafka-record-request>
<key>request1</key>
<value>Hello World!</value>
</kafka-record-request>
<kafka-record-response>
<status>success</status>
<topic>adapterTest</topic>
<partition>0</partition>
<offset>1</offset>
<timestamp>1571805769609</timestamp>
</kafka-record-response>
</kafka-record>
<kafka-record>
<kafka-record-request>
<key>request2</key>
<value>Hello India!</value>
</kafka-record-request>
<kafka-record-response>
<status>success</status>
<topic>adapterTest</topic>
<partition>0</partition>
<offset>2</offset>
<timestamp>1571805769609</timestamp>
</kafka-record-response>
</kafka-record>
</kafka-publish-responses>
</kafka-adapter-response>
The following figure shows the adapter response if the request fails because the broker is unavailable.
<kafka-adapter-response>
<metadata>
<status>success</status>
</metadata>
<kafka-publish-responses>
<topic>adapterTest</topic>
<partition>0</partition>
<kafka-record>
<kafka-record-request>
<key>request1</key>
<value>Hello World!</value>
</kafka-record-request>
<kafka-record-response>
<status>failure</status>
<error-message>Error sending message to topic: org.apache.kafka.common.errors.TimeoutException:
Topic adapterTest not present in metadata after 60000 ms.. Error: {2}</error-message>
</kafka-record-response>
</kafka-record>
<kafka-record>
<kafka-record-request>
<key>request2</key>
<value>Hello India!</value>
</kafka-record-request>
<kafka-record-response>
<status>failure</status>
<error-message>Error sending message to topic: org.apache.kafka.common.errors.TimeoutException:
Topic adapterTest not present in metadata after 60000 ms.. Error: {2}</error-message>
</kafka-record-response>
</kafka-record>
</kafka-publish-responses>
</kafka-adapter-response>
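In the sample above, the <status> under <metadata> reads success even though each <kafka-record-response> reports failure, so a caller may want to inspect every record rather than rely on the top-level status alone. The following Python sketch collects the failed records. The function name failed_records is hypothetical, and treating any status other than success as a failure is an assumption.

```python
import xml.etree.ElementTree as ET

# Abbreviated copy of the failure response shown above.
FAILURE_RESPONSE = """
<kafka-adapter-response>
  <metadata><status>success</status></metadata>
  <kafka-publish-responses>
    <kafka-record>
      <kafka-record-request><key>request1</key><value>Hello World!</value></kafka-record-request>
      <kafka-record-response>
        <status>failure</status>
        <error-message>Error sending message to topic: org.apache.kafka.common.errors.TimeoutException:
Topic adapterTest not present in metadata after 60000 ms.. Error: {2}</error-message>
      </kafka-record-response>
    </kafka-record>
  </kafka-publish-responses>
</kafka-adapter-response>
"""


def failed_records(xml_text):
    """Return (key, error message) for each record that did not succeed."""
    root = ET.fromstring(xml_text.strip())
    failures = []
    for record in root.iter("kafka-record"):
        status = record.findtext("kafka-record-response/status")
        if status != "success":
            key = record.findtext("kafka-record-request/key")
            message = record.findtext(
                "kafka-record-response/error-message", default=""
            )
            # Collapse the line break inside the error message.
            failures.append((key, " ".join(message.split())))
    return failures


for key, message in failed_records(FAILURE_RESPONSE):
    print(f"{key}: {message}")
```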
The following table describes the output elements for the process.
| Output element | Description |
|---|---|
| adapter response | Contains the adapter response. |
| status | Indicates the status of the request. Valid values: success, error |
| error message | Contains the error message if the request fails. |