Controlling Mule Application Flow Thread Concurrency

There is a page in the Mulesoft documentation regarding the use of threads and thread pools in an Anypoint application. That documentation is here.

The following picture is from that page and illustrates where threads are used during the execution of an Anypoint application:

thread_pooling
There is a lot being shown in the picture. In particular, note that if a workflow is a:

  • synchronous process, then the receiver thread is used to execute the entire flow from the inbound endpoint up to the outbound endpoint,
  • asynchronous process, then the inbound endpoint, the flow and the outbound endpoint each have a pool of threads and phase (or stage) is executed in its own thread

An endpoint’s connector configuration contains the thread pooling information in the receiver-threading-profile and dispatcher-threading-profile child elements of the connector configuration. For example, a pair of VM inbound and outbound endpoints might have their threading limited to four concurrently running threads by the following code:

<vm:connector name="SFDC-Q" validateConnections="true" doc:name="SFDC-Q">
  <receiver-threading-profile
    maxThreadsActive="4"
    poolExhaustedAction="WAIT"
    threadWaitTimeout="-1"/>
  <dispatcher-threading-profile
    maxThreadsActive="4"
    poolExhaustedAction="WAIT"
    threadWaitTimeout="-1"/>
</vm:connector>
<flow name="collectInput" doc:name="collectInput">
  ...
  <vm:outbound-endpoint exchange-pattern="one-way"
    doc:name="toSFDC-Q" connector-ref="SFDC-Q" address="vm://sfdcQ" />
</flow>
<flow name="processFlow" doc:name="processFlow">
 <vm:inbound-endpoint exchange-pattern="one-way"
 doc:name="fromSFDC-Q" connector-ref="SFDC-Q" address="vm://sfdcQ" />
  ...
</flow>

This defines the pool of available threads going out to an endpoint and coming into an endpoint, but does not necessarily define the pool of threads that run the flow. For synchronous flows, the receiver thread is used to execute the flow components. For asynchronous flows, a thread from the flow’s thread pool is used to execute flow component’s. So for asynchronous flows, the flow’s thread pool is used to control the number of concurrently executing threads. The flow’s processingStrategy is where the minimum and maximum available threads are configured. The processingStrategy for a flow is specified as a reference to a Global Element (the Global Processing Strategies) in application. An example of setting the maximum number of currently running threads for a flow to four is shown below:

<queued-asynchronous-processing-strategy doc:name="Queued Asynchronous Processing Strategy"
  name="Queued_Asynchronous_Processing_Strategy" 
  maxThreads="4" 
  minThreads="4"/>
<flow name="processFlow" doc:name="processFlow"
    processingStrategy="Queued_Asynchronous_Processing_Strategy" 
    initialState="started">
  <vm:inbound-endpoint exchange-pattern="one-way" doc:name="fromSFDC-Q"
    connector-ref="SFDC-Q" address="vm://sfdcQ" />
  ...
</flow>

More later…

Advertisement