Apache Pulsar Consumer Trigger

Apache Pulsar consumer is a Flogo trigger that receives messages published to the configured topic. Each message triggers a new flow. The message gets acknowledged as soon as the Message Ack activity is encountered in the flow or the flow is successful.

Trigger Settings

The Trigger Settings tab has the following fields:

Field Required Description
Pulsar Connection Yes Name of the connection.
Topic No

The name of the topic from which the message is consumed.

The Topic field has the following format : persistent://public/default/foo

The first segment is the type of topic. The topic can be persistent or nonpersistent. With persistent topics, all the messages are persisted on the disk. The second segment is the name of the domain or a tenant. The third segment is the namespace within the domain and the fourth segment is the name of the individual topic.

Topics Pattern No A pattern of topics you can subscribe to, for consuming messages from multiple topics. You can specify the group using a regular expression pattern.
Example: mytopic*
Subscription Name Yes The subscription name is used by the broker to combine subscribers belonging to a single app into a logical group for message delivery. Subscriptions can also be used to deliver backlog messages to a consumer that goes offline for some time.
Subscription Type Yes The following options are available:
  • Exclusive: In the Exclusive mode, only a single consumer is allowed to attach to the subscription. An error occurs if multiple consumers subscribe to a topic by using the same subscription.
  • Shared: In the Shared mode, multiple consumers can attach to the same subscription. Messages are delivered in a round-robin distribution across consumers, and any given message is delivered to only one consumer.
  • Failover: In the Failover mode, multiple consumers can attach to the same subscription. Messages are delivered to the master consumer only. In case of the failover of a master consumer, the next master is selected by the broker and further messages are redirected to the next master.
  • KeyShared: In the KeyShared mode, multiple consumers can attach to the same subscription. Messages are delivered in distribution across consumers and messages with the same key or the same ordering key are delivered to only one consumer.
Note: In TIBCO Cloud™ Integration, in the Exclusive mode, the consumer app should not be scaled more than once. The app must be scaled down before pushing any new updates.
Processing Mode Yes
The trigger processes the messages in one of the following modes:
  • Sync: The Sync mode is the default mode. The handler receives and processes only one message at a time. This mode guarantees the processing order of the messages.
  • Async: The handler receives messages concurrently. This mode does not guarantee the processing order of the messages. The total concurrent messages handled by the trigger are based on the type of the engine runner. You can set the engine runner type by using the FLOGO_RUNNER_TYPE variable. The following values are supported:
    • POOLED: The total concurrent messages handled by the trigger handler. To achieve higher concurrency, set FLOGO_RUNNER_WORKERS to a higher value.

      For an app, the total messages processed concurrently is equal to the number of trigger handlers configured with async processing multiplied by the FLOGO_RUNNER_WORKERS. For example, if two Pulsar trigger handlers are configured with async mode and FLOGO_RUNNER_WORKERS is set to 10, 20 messages would be processed concurrently at a given time.

    • DIRECT: Currently, 200 concurrent messages are handled by the trigger handler.

      For an app, the total messages processed concurrently is equal to the number of trigger handlers configured with async processing multiplied by 200. For example, if two Pulsar trigger handlers are configured with async mode then 400 messages would be processed concurrently at a given time.

Initial Position Yes On the Initial Position field, select Latest to receive the messages that have been published after the subscriber has been connected. Select Earliest to receive all the stored and new messages.
DLQ Topic No The DLQ Topic is available only if the Subscription Type is Shared. If the flow started by the trigger does not complete successfully, the message is negatively acknowledged. If this happens repeatedly, the message effectively blocks processing by the trigger. This field allows you to relocate messages that cannot be processed to another topic. If the DLQ Topic field is not provided, then the dead letter queue (DLQ) processing is not performed.

Note: A delay exists between a negative acknowledgment and reposting the message. So, it can take several minutes for a message to arrive on the DLQ.

DLQ Max Deliveries No The number of times a message is negatively acknowledged before being rerouted to the DLQ Topic field.
Nack Redelivery Delay Yes The delay in seconds after which the message is redelivered when the message is not acknowledged. Default: 60 seconds.
Enable Batch Index Acknowledgment No

This is a checkbox to enable the batch index acknowledgments. Ensure that the batch index acknowledgment is also enabled by setting the acknowledgmentAtBatchIndexLevelEnabled parameter to true at the broker side.

Max Pending Chunked Message No

Specifies the maximum size of the queue that can hold pending message chunks for a consumer.

Default: 100

Expire time of incomplete Chunk No

Specifies the total time interval to expire the incomplete chunks, if the consumer fails to receive all the chunks of a message in the given time period.

Default: 60 sec

Auto Ack Incomplete Chunk No This automatically acknowledges any pending message chunks when the Chunk Max Pending Messages threshold value is reached.
Message Format Yes The Message Format field controls the format of the output schema. The following options are available:
  • String
  • JSON: If JSON is selected, a text editor is provided on the Output Settings tab to accept a JSON document

The consumer created has a name in the following format that can be seen by using the Pulsar Admin REST API: <ApplicationName>-<AppVersion>-<FlowName>-<HostName>

Output Settings

The Output Settings tab has the following fields:

Field Required Description
Message Properties No You can add a property value to the properties field presented on the input schema. Each additional property is presented as a named string to be mapped.
Schema for JSON value No The Schema for JSON value field is only available when the Message Format field on the Triggers Settings tab is JSON. This is a free form text editor that accepts any valid JSON document, which is then presented on the output schema.

Map to Flow Inputs

The Map to Flow Inputs tab has the following fields:

Field Description
payload Either a simple string or a representation of the JSON document provided on the Output Settings tab.
properties An object with a string value for each of the named properties from the Output Settings tab.
topic If the subscriber subscribes to multiple topics, the topic field provides the actual topic on which the message has arrived.
msgid A string representing the message id.
redelivery Count An integer representing the redelivery count of the message.