Apache Pulsar Producer Activity

The Apache Pulsar Producer activity maps a string or a JSON object to a message. You can also map message properties, as string:string pairs, and an optional key. The activity sends messages to a single topic and can compress them with any of the supported algorithms.

Settings

On the Settings tab, configure the following settings:

Field Required Description
Pulsar Connection Yes Name of the connection.
Topic Name Yes The name of the topic to which the message is published. The Topic Name field has the following format: persistent://public/default/foo

The first segment is the type of topic, which can be persistent or non-persistent. With persistent topics, all messages are persisted to disk. The second segment is the name of the domain or tenant. The third segment is the namespace within the domain, and the fourth segment is the name of the individual topic.

You can use an abbreviated form of the topic name when only the rightmost segment is needed. If any of the other segments are needed, use the complete topic name.

Send Mode Yes The message can be sent to the brokers in either Sync or Async mode.

Compression Type No The following options are available for the compression type:
  • None
  • LZ4
  • ZLIB
  • ZSTD
Chunking No Select the Chunking checkbox to enable the chunking feature.

Note: If you set Chunking to True, Batching is automatically disabled.

ChunkMaxMessage Size Yes The maximum size, in bytes, of a message chunk that can be sent. Default: 5242880

Batching No Select the Batching checkbox to enable batching. Batching at the producer happens only when the Send Mode on the Settings tab is Async.

Note: If you set Batching to True, Chunking is automatically disabled.

Batching Max Messages Yes The maximum number of messages that can be sent in a single batch. Default: 1000

Batching Max Size Yes The maximum number of bytes permitted in a batch. Default: 128000

Batching Max Publish Delay Yes Specifies the maximum time in milliseconds within which the messages sent can be batched. Default value: 10 ms
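
The rules in the table above interact in a few ways: an abbreviated topic name stands for a complete one, Chunking and Batching switch each other off, batching only takes effect in Async mode, and a batch is bounded by three limits. The following Python sketch restates those rules so they can be checked in isolation. It is not the connector's implementation. ProducerSettings and every function name are hypothetical, the False defaults for the two checkboxes are an assumption, and the expansion of an abbreviated name assumes Pulsar's usual defaults (persistent type, public tenant, default namespace).

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ProducerSettings:
    """Hypothetical mirror of the Settings tab; numeric defaults come from the table."""
    topic_name: str
    send_mode: str                           # "Sync" or "Async"
    chunking: bool = False                   # assumed default
    chunk_max_message_size: int = 5242880    # bytes
    batching: bool = False                   # assumed default
    batching_max_messages: int = 1000
    batching_max_size: int = 128000          # bytes
    batching_max_publish_delay_ms: int = 10


def expand_topic_name(name: str) -> str:
    """Expand a rightmost-segment-only name to the four-segment form."""
    if "://" in name:
        return name  # already a complete topic name
    if "/" in name:
        # Other segments are needed, so the complete name must be used.
        raise ValueError("use the complete topic name, for example "
                         "persistent://public/default/foo")
    return f"persistent://public/default/{name}"


def enable_chunking(s: ProducerSettings) -> ProducerSettings:
    """Turning Chunking on turns Batching off."""
    return replace(s, chunking=True, batching=False)


def enable_batching(s: ProducerSettings) -> ProducerSettings:
    """Turning Batching on turns Chunking off."""
    return replace(s, batching=True, chunking=False)


def batching_is_effective(s: ProducerSettings) -> bool:
    """Batching at the producer happens only when the Send Mode is Async."""
    return s.batching and s.send_mode == "Async"


def batch_is_full(s: ProducerSettings, message_count: int,
                  byte_count: int, elapsed_ms: float) -> bool:
    """A batch is sent once any one of the three limits is reached."""
    return (message_count >= s.batching_max_messages
            or byte_count >= s.batching_max_size
            or elapsed_ms >= s.batching_max_publish_delay_ms)


settings = ProducerSettings(topic_name=expand_topic_name("foo"), send_mode="Async")
settings = enable_batching(settings)
print(settings.topic_name)               # persistent://public/default/foo
print(batching_is_effective(settings))   # True
```

Modeling the two checkboxes as functions that return a new settings object keeps the mutual exclusion in one place, so no combination with both flags set can be constructed through them.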

The producer that the activity creates is named in the following format, which you can view by using the Pulsar Admin REST API: <ApplicationName>-<AppVersion>-<FlowName>-<ActivityName>-<HostName>
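
To make the producer name format concrete, here is a minimal Python sketch that assembles a name from its five parts. The function name producer_name and the sample values are hypothetical, and falling back to the local host name when none is passed is an assumption about how the last segment might be filled in.

```python
import socket


def producer_name(application_name: str, app_version: str, flow_name: str,
                  activity_name: str, host_name: str = None) -> str:
    """Join the five segments with hyphens, in the documented order."""
    # Assumption: when no host name is given, use the local machine's name.
    host = host_name if host_name is not None else socket.gethostname()
    return "-".join([application_name, app_version, flow_name, activity_name, host])


print(producer_name("OrdersApp", "1.0.0", "PublishFlow", "PulsarProducer", "host01"))
# OrdersApp-1.0.0-PublishFlow-PulsarProducer-host01
```

Because the separator is a hyphen and the segments themselves may contain hyphens, a name in this format cannot always be split back into its parts unambiguously.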

Input Settings

The Input Settings tab has the following fields:

Field Required Description
Message Format Yes The following options are available for the message format:
  • String: A simple string can be mapped on the Input schema.
  • JSON: The Input schema is updated to reflect the structure of the JSON document and flow data can be mapped to the fields.
Message Properties No Use the Message Properties field to add named properties to the properties field presented on the Input schema. Each additional property is presented as a named string that can be mapped.
Schema for JSON Value No The Schema for JSON Value field is displayed only if the Message Format on the Input Settings tab is JSON. This is a free-form text editor that accepts any valid JSON document, which is then presented on the Input schema. The receiving app must use the same JSON document to decode the message.
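
As an illustration of how a sample JSON document can determine the fields that appear on the Input schema, the following Python sketch walks a document and reports the name and JSON type of each field. It is only an approximation of the idea, not the product's algorithm. The function describe_structure and the sample document are hypothetical.

```python
import json


def describe_structure(value):
    """Return a nested description of field names and JSON types for a sample value."""
    if isinstance(value, dict):
        return {name: describe_structure(child) for name, child in value.items()}
    if isinstance(value, list):
        # Describe a list by its first element; an empty list has no known item type.
        return [describe_structure(value[0])] if value else []
    if isinstance(value, bool):  # checked before numbers because bool is a subclass of int
        return "boolean"
    if isinstance(value, (int, float)):
        return "number"
    if value is None:
        return "null"
    return "string"


sample = json.loads('{"orderId": "A-100", "quantity": 2, "shipped": false, "tags": ["new"]}')
structure = describe_structure(sample)
print(structure)
# {'orderId': 'string', 'quantity': 'number', 'shipped': 'boolean', 'tags': ['string']}
```

Each key in the result corresponds to a field that flow data could be mapped to, and a receiving app that holds the same sample document can derive the same structure to decode the message.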

Input

The Input tab has the following fields:

Field Description
payload The message to be published to a topic. Either a simple string or a representation of the JSON document provided on the Input Settings tab.
properties An object with a string value for each of the named properties from the Input Settings tab.
key A string value used by the topic compaction function of the broker. In the Key_Shared subscription mode, messages with the same key are always delivered to the same consumer.
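
The three input fields can be pictured as one object per message. The Python sketch below assembles such an object and then shows the effect the key has under topic compaction, where only the latest message for each key is retained. The names build_input and compact are hypothetical, serializing a non-string payload with json.dumps is an assumption about the JSON message format, and the compaction function models only keyed messages.

```python
import json


def build_input(payload, properties=None, key=None):
    """Assemble a hypothetical activity input with payload, properties, and key."""
    props = dict(properties or {})
    for name, value in props.items():
        if not isinstance(value, str):
            raise TypeError(f"property {name!r} must be a string")
    if not isinstance(payload, str):
        # Assumption: a JSON payload is serialized to text before it is sent.
        payload = json.dumps(payload)
    return {"payload": payload, "properties": props, "key": key}


def compact(messages):
    """Keep only the latest message for each key (keyed messages only)."""
    latest = {}
    for message in messages:
        latest[message["key"]] = message  # a later message replaces an earlier one
    return list(latest.values())


messages = [
    build_input({"quantity": 1}, {"source": "web"}, key="order-1"),
    build_input("hello", key="order-2"),
    build_input({"quantity": 3}, key="order-1"),
]
for message in compact(messages):
    print(message["key"], message["payload"])
# order-1 {"quantity": 3}
# order-2 hello
```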

Output Settings

The Output Settings tab has the following field:

Field Description
msgid A string representing the message id.

Output

The Output tab displays the output schema of the activity as a tree structure. The output is read-only. The information in the schema varies based on the fields selected on the Settings tab. The properties that are displayed in the schema correspond to the output of this activity and can be used as input by subsequent activities in the flow.