<html>
<head>
<base href="https://wiki.asterisk.org/wiki">
<link rel="stylesheet" href="/wiki/s/en/2160/1/7/_/styles/combined.css?spaceKey=TOP&forWysiwyg=true" type="text/css">
</head>
<body style="background: white;" bgcolor="white" class="email-body">
<div id="pageContent">
<div id="notificationFormat">
<div class="wiki-content">
<div class="email">
<h2><a href="https://wiki.asterisk.org/wiki/display/TOP/Uses+of+ThreadPools+and+WorkQueues+in+Asterisk+SCF+Components">Uses of ThreadPools and WorkQueues in Asterisk SCF Components</a></h2>
<h4>Page <b>edited</b> by <a href="https://wiki.asterisk.org/wiki/display/~kpfleming">Kevin P. Fleming</a>
</h4>
<div id="versionComment">
<b>Comment:</b>
lots of editorial changes; added summary section<br />
</div>
<br/>
<h4>Changes (20)</h4>
<div id="page-diffs">
<table class="diff" cellpadding="0" cellspacing="0">
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">{warning} <br>This is a work in progress <br>{warning} <br> <br></td></tr>
<tr><td class="diff-unchanged" >h1. Introduction <br> <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">Asterisk SCF provides three interfaces (as well as implementations in C\+\+) for assisting with task execution and concurrency. The {{Queue}} interface ( {{WorkQueue}} implementation in C++) provides a method to execute tasks sequentially. The {{SuspendableQueue}} interface ( {{SuspendableWorkQueue}} implementation in C\+\+) provides similar functionality. The differences are that the SuspendableQueue is more restrictive with regards to task execution. One task must fully complete before the next task may be executed. In addition, a task may finish in a suspended state, indicating that it cannot execute any further at the moment but that it will complete later. Finally, there's the {{Pool}} interface ( {{ThreadPool}} implementation in C\+\+). The {{ThreadPool}} uses a {{WorkQueue}} internally to organize tasks. The {{ThreadPool}} provides methods to set a number of worker threads to grab tasks from the queue and execute them. Since a {{WorkQueue}} is the structure used internally, the worker threads grab tasks in the order that they were entered into the queue, but it is possible for multiple threads to execute tasks concurrently. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">Asterisk SCF provides three interfaces (as well as implementations in C++) for assisting with task execution and concurrency. <br>* The {{Queue}} interface ({{WorkQueue}} implementation in C++) provides a method to execute tasks sequentially. <br>* The {{SuspendableQueue}} interface ({{SuspendableWorkQueue}} implementation in C++) provides similar functionality; it differs primarily in that it does not allow concurrent execution of tasks from the queue. In addition, a task may finish in a suspended state, indicating that it cannot execute any further at the moment but that it will complete later. <br>* The {{Pool}} interface ({{ThreadPool}} implementation in C++). The {{ThreadPool}} uses a {{WorkQueue}} internally to organize tasks. The {{ThreadPool}} provides methods to control the number of worker threads created to grab tasks from the queue and execute them. Since a {{WorkQueue}} is the structure used internally, the worker threads grab tasks in the order that they were entered into the queue, but if the {{ThreadPool}} has multiple worker threads, then it is likely that multiple tasks will be executed concurrently. <br>For more information on these interfaces, see the&nbsp;[TOP:Work Queues and Thread Pools]&nbsp;page. <br></td></tr>
<tr><td class="diff-unchanged" > <br>What follows are some examples of how they can be used in concert to mold execution to follow concurrency models necessary for specific components. <br> <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h1. ThreadPool Usage <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">h3. Thread Pool Usage <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-changed-lines" >A common approach for components will be to farm tasks out to multiple threads. While setting the size of an Ice thread pool is one approach to the matter, they are not as flexible as the implementation provided by the {{ThreadPool}}. The big reason is that the {{ThreadPool}} is able to inform <span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">a component</span> <span class="diff-added-words"style="background-color: #dfd;">its owner</span> about the current load and can grow or shrink based on <span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">the current</span> <span class="diff-added-words"style="background-color: #dfd;">that</span> load. For many components, this is ideal since with communications <span class="diff-changed-words">platforms<span class="diff-deleted-chars"style="color:#999;background-color:#fdd;text-decoration:line-through;">,</span></span> the <span class="diff-added-words"style="background-color: #dfd;">work</span> load can be <span class="diff-changed-words">inconsistent<span class="diff-deleted-chars"style="color:#999;background-color:#fdd;text-decoration:line-through;"> and can spike at times</span>.</span> <br></td></tr>
<tr><td class="diff-unchanged" > <br>Below is an example of thread pool usage: <br></td></tr>
<tr><td class="diff-snipped" >...<br></td></tr>
<tr><td class="diff-unchanged" >!SimpleThreadPoolUsage.png! <br> <br></td></tr>
<tr><td class="diff-changed-lines" >This shows how operation {{foo()}} is executed when using a {{ThreadPool}}. {{foo()}} is encapsulated as <span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">an</span> <span class="diff-added-words"style="background-color: #dfd;">a</span> queuable operation {{FooOperation}}. This operation is queued on to the {{ThreadPool}}'s {{WorkQueue}}. The {{ThreadPool}} is informed that work has been added and then informs the {{ThreadPoolListener}}. The {{ThreadPoolListener}} is implemented within <span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">our component.</span> <span class="diff-added-words"style="background-color: #dfd;">the component that created the {{ThreadPool}}.</span> In this particular case, the listener decides that a new {{WorkerThread}} should be created, so it calls {{setSize}} on the {{ThreadPool}} so that a new thread <span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">is</span> <span class="diff-added-words"style="background-color: #dfd;">will be</span> created. A {{WorkerThread}} is created, and it executes the work. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">This model works well for cases where all operations are completely independent. A potential component that could use this would be the service locator. For other components, this model is insufficient. Other components will wish to distribute tasks to worker threads, but they will require that like operations be grouped in such a way that they are run sequentially. Following is a description of how this is implemented in the SIP component. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">This model works well for cases where all operations are completely independent. A component that could potentially use this model would be the Service Locator. For other components, this model is insufficient. Other components will wish to distribute tasks to worker threads, but they will require that like operations (those affecting common objects, working on shared data) be grouped in such a way that they are run sequentially. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h1. SIP's Thread Pool Usage <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">h3. Thread Pool Usage in the SIP Session Gateway <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">The SIP component presents an interesting challenge. The SIP component can receive messages from multiple sources: either from PJSIP via registered callbacks or via Ice RPCs. The messages may arrive at any time and may require access to common objects. The SIP component is not as tolerant of delays due to RPCs as other components may be due to the high rate of messages that may be received. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">The SIP Session Gateway presents an interesting challenge. It can receive messages from multiple sources: from the PJSIP stack (as a result of SIP requests or responses arriving from the network) or from the Ice runtime (as a result of operations being invoked on its servant objects). The messages may arrive at any time and may require access to common objects. In addition, it is important for the component to be able to respond to these messages as quickly as possible; due to the potentially high volume of messages, it's not practical for a thread in the SIP Session Gateway to *block* waiting for another operation to complete. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">In the case of the SIP session gateway, the {{SipSession}} is involved in the vast majority of operations. Since operations originate from multiple sources, and operations on a {{SipSession}} should execute in the order in which they arrive, it is an obvious choice to have a {{WorkQueue}} on each SIP session. We actually use a {{SuspendableWorkQueue}} so that if operation of tasks on a {{SipSession}} needs to be suspended, it can be. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">In the SIP Session Gateway, the {{SipSession}} is involved in the vast majority of operations. Since operations originate from multiple sources, and operations on a {{SipSession}} should execute in the order in which they arrive, it is an obvious choice to have a {{WorkQueue}} on each SIP session. We actually use a {{SuspendableWorkQueue}} so that if an operation on the {{SipSession}} needs to enter a _suspended_ state (waiting for another operation elsewhere in the system to complete), it can do so, without blocking the worker thread that is executing the operation. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">{{ThreadPool}} and {{SuspendableWorkQueue}} usage can be combined. Each {{SipSession}} contains an object called {{SessionWork}}. {{SessionWork}} contains a {{SuspendableWorkQueue}} so that all operations on a particular session run sequentially. In addition, the {{SessionWork}} will itself be a a queueable item. The {{SessionWork}} can be placed into the {{ThreadPool}}'s {{WorkQueue}}. Using this method, whenever a worker thread pulls a task from the thread pool, it actually is pulling an entire {{SuspendableWorkQueue}} of work. This allows for the worker thread to run multiple tasks, and it prevents other worker threads from attempting to run tasks related to a specific {{SipSession}}. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">{{ThreadPool}} and {{SuspendableWorkQueue}} usage can be combined. Each {{SipSession}} contains a member variable called {{SessionWork}}. {{SessionWork}} contains a {{SuspendableWorkQueue}} which is used to hold the pending operations on that session and ensure that they will be executed serially. In addition, the {{SessionWork}} is itself a queueable item, which means that the {{SessionWork}} can be placed into the {{ThreadPool}}'s {{WorkQueue}}. Using this method, whenever a worker thread pulls a task from the thread pool, it actually is pulling an entire {{SuspendableWorkQueue}} of work. This allows for the worker thread to execute multiple pending tasks on that {{SipSession}}, and it prevents other worker threads from attempting to run tasks related to that {{SipSession}}. <br></td></tr>
<tr><td class="diff-unchanged" > <br>!ThreadPoolUsage.png! <br> <br></td></tr>
<tr><td class="diff-changed-lines" >In this example, an Asterisk SCF component calls operation {{foo()}} provided by the {{SipSession}}. A <span class="diff-changed-words">{{Work}}<span class="diff-added-chars"style="background-color: #dfd;">\</span>-derived</span> object, {{FooOperation}} is created and enqueued on the {{SessionWork}}'s internal {{SuspendableWorkQueue}}. The {{SessionWork}} acts as a listener for the <span class="diff-changed-words">{{Suspendable<span class="diff-added-chars"style="background-color: #dfd;">Work</span>Queue}}</span> and reacts to the work being enqueued by enqueuing itself onto the {{ThreadPool}}'s {{WorkQueue}}. The {{WorkQueue}} notifies the {{ThreadPool}}, which then notifies a {{ThreadPoolListener}} of the work added. In this particular scenario, the {{ThreadPoolListener}} signals the {{ThreadPool}} to create a {{WorkerThread}} to handle the work. The {{WorkerThread}} then initiates a series of {{executeWork}} and {{execute}} calls in order to run the {{FooOperation}}. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">While this approach is used exclusively in the SIP component, it will likely fit well into other components as well. For instance, the bridging component may use this approach to group all operations relating to a specific bridge together while still allowing for concurrent execution of tasks. The media component may similarly use this approach to group operations relating to a media session together. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">h3. Summary <br> <br>Using the Slice-defined {{Pool}}, {{Queue}} and {{SuspendableQueue}} interfaces provides a straightforward way to manage complex workloads in an Asterisk SCF component, while minimizing contention for shared data (and the resulting performance problems it can cause). <br></td></tr>
</table>
</div> <h4>Full Content</h4>
<div class="notificationGreySide">
<h1><a name="UsesofThreadPoolsandWorkQueuesinAsteriskSCFComponents-Introduction"></a>Introduction</h1>
<p>Asterisk SCF provides three interfaces (as well as implementations in C++) for assisting with task execution and concurrency.</p>
<ul>
        <li>The <tt>Queue</tt> interface (<tt>WorkQueue</tt> implementation in C++) provides a method to execute tasks sequentially.</li>
        <li>The <tt>SuspendableQueue</tt> interface (<tt>SuspendableWorkQueue</tt> implementation in C++) provides similar functionality; it differs primarily in that it does not allow concurrent execution of tasks from the queue. In addition, a task may finish in a suspended state, indicating that it cannot execute any further at the moment but that it will complete later.</li>
        <li>The <tt>Pool</tt> interface (<tt>ThreadPool</tt> implementation in C++). The <tt>ThreadPool</tt> uses a <tt>WorkQueue</tt> internally to organize tasks. The <tt>ThreadPool</tt> provides methods to control the number of worker threads created to grab tasks from the queue and execute them. Since a <tt>WorkQueue</tt> is the structure used internally, the worker threads grab tasks in the order that they were entered into the queue, but if the <tt>ThreadPool</tt> has multiple worker threads, then it is likely that multiple tasks will be executed concurrently.<br/>
For more information on these interfaces, see the <a href="/wiki/display/TOP/Work+Queues+and+Thread+Pools" title="Work Queues and Thread Pools">Work Queues and Thread Pools</a> page.</li>
</ul>
<p>What follows are some examples of how they can be used in concert to mold execution to follow concurrency models necessary for specific components.</p>
<h3><a name="UsesofThreadPoolsandWorkQueuesinAsteriskSCFComponents-ThreadPoolUsage"></a>Thread Pool Usage</h3>
<p>A common approach for components will be to farm tasks out to multiple threads. While setting the size of an Ice thread pool is one approach to the matter, they are not as flexible as the implementation provided by the <tt>ThreadPool</tt>. The big reason is that the <tt>ThreadPool</tt> is able to inform its owner about the current load and can grow or shrink based on that load. For many components, this is ideal since with communications platforms the work load can be inconsistent.</p>
<p>Below is an example of thread pool usage:</p>
<p><span class="image-wrap" style=""><img src="/wiki/download/attachments/14352403/SimpleThreadPoolUsage.png?version=2&modificationDate=1307569694973" style="border: 0px solid black" /></span></p>
<p>This shows how operation <tt>foo()</tt> is executed when using a <tt>ThreadPool</tt>. <tt>foo()</tt> is encapsulated as a queuable operation <tt>FooOperation</tt>. This operation is queued on to the <tt>ThreadPool</tt>'s <tt>WorkQueue</tt>. The <tt>ThreadPool</tt> is informed that work has been added and then informs the <tt>ThreadPoolListener</tt>. The <tt>ThreadPoolListener</tt> is implemented within the component that created the <tt>ThreadPool</tt>. In this particular case, the listener decides that a new <tt>WorkerThread</tt> should be created, so it calls <tt>setSize</tt> on the <tt>ThreadPool</tt> so that a new thread will be created. A <tt>WorkerThread</tt> is created, and it executes the work.</p>
<p>This model works well for cases where all operations are completely independent. A component that could potentially use this model would be the Service Locator. For other components, this model is insufficient. Other components will wish to distribute tasks to worker threads, but they will require that like operations (those affecting common objects, working on shared data) be grouped in such a way that they are run sequentially.</p>
<h3><a name="UsesofThreadPoolsandWorkQueuesinAsteriskSCFComponents-ThreadPoolUsageintheSIPSessionGateway"></a>Thread Pool Usage in the SIP Session Gateway</h3>
<p>The SIP Session Gateway presents an interesting challenge. It can receive messages from multiple sources: from the PJSIP stack (as a result of SIP requests or responses arriving from the network) or from the Ice runtime (as a result of operations being invoked on its servant objects). The messages may arrive at any time and may require access to common objects. In addition, it is important for the component to be able to respond to these messages as quickly as possible; due to the potentially high volume of messages, it's not practical for a thread in the SIP Session Gateway to <b>block</b> waiting for another operation to complete.</p>
<p>In the SIP Session Gateway, the <tt>SipSession</tt> is involved in the vast majority of operations. Since operations originate from multiple sources, and operations on a <tt>SipSession</tt> should execute in the order in which they arrive, it is an obvious choice to have a <tt>WorkQueue</tt> on each SIP session. We actually use a <tt>SuspendableWorkQueue</tt> so that if an operation on the <tt>SipSession</tt> needs to enter a <em>suspended</em> state (waiting for another operation elsewhere in the system to complete), it can do so, without blocking the worker thread that is executing the operation.</p>
<p><tt>ThreadPool</tt> and <tt>SuspendableWorkQueue</tt> usage can be combined. Each <tt>SipSession</tt> contains a member variable called <tt>SessionWork</tt>. <tt>SessionWork</tt> contains a <tt>SuspendableWorkQueue</tt> which is used to hold the pending operations on that session and ensure that they will be executed serially. In addition, the <tt>SessionWork</tt> is itself a queueable item, which means that the <tt>SessionWork</tt> can be placed into the <tt>ThreadPool</tt>'s <tt>WorkQueue</tt>. Using this method, whenever a worker thread pulls a task from the thread pool, it actually is pulling an entire <tt>SuspendableWorkQueue</tt> of work. This allows for the worker thread to execute multiple pending tasks on that <tt>SipSession</tt>, and it prevents other worker threads from attempting to run tasks related to that <tt>SipSession</tt>.</p>
<p><span class="image-wrap" style=""><img src="/wiki/download/attachments/14352403/ThreadPoolUsage.png?version=1&modificationDate=1307549003717" style="border: 0px solid black" /></span></p>
<p>In this example, an Asterisk SCF component calls operation <tt>foo()</tt> provided by the <tt>SipSession</tt>. A <tt>Work</tt>-derived object, <tt>FooOperation</tt> is created and enqueued on the <tt>SessionWork</tt>'s internal <tt>SuspendableWorkQueue</tt>. The <tt>SessionWork</tt> acts as a listener for the <tt>SuspendableWorkQueue</tt> and reacts to the work being enqueued by enqueuing itself onto the <tt>ThreadPool</tt>'s <tt>WorkQueue</tt>. The <tt>WorkQueue</tt> notifies the <tt>ThreadPool</tt>, which then notifies a <tt>ThreadPoolListener</tt> of the work added. In this particular scenario, the <tt>ThreadPoolListener</tt> signals the <tt>ThreadPool</tt> to create a <tt>WorkerThread</tt> to handle the work. The <tt>WorkerThread</tt> then initiates a series of <tt>executeWork</tt> and <tt>execute</tt> calls in order to run the <tt>FooOperation</tt>.</p>
<h3><a name="UsesofThreadPoolsandWorkQueuesinAsteriskSCFComponents-Summary"></a>Summary</h3>
<p>Using the Slice-defined <tt>Pool</tt>, <tt>Queue</tt> and <tt>SuspendableQueue</tt> interfaces provides a straightforward way to manage complex workloads in an Asterisk SCF component, while minimizing contention for shared data (and the resulting performance problems it can cause).</p>
</div>
<div id="commentsSection" class="wiki-content pageSection">
<div style="float: right;" class="grey">
<a href="https://wiki.asterisk.org/wiki/users/removespacenotification.action?spaceKey=TOP">Stop watching space</a>
<span style="padding: 0px 5px;">|</span>
<a href="https://wiki.asterisk.org/wiki/users/editmyemailsettings.action">Change email notification preferences</a>
</div>
<a href="https://wiki.asterisk.org/wiki/display/TOP/Uses+of+ThreadPools+and+WorkQueues+in+Asterisk+SCF+Components">View Online</a>
|
<a href="https://wiki.asterisk.org/wiki/pages/diffpagesbyversion.action?pageId=14352403&revisedVersion=10&originalVersion=9">View Changes</a>
|
<a href="https://wiki.asterisk.org/wiki/display/TOP/Uses+of+ThreadPools+and+WorkQueues+in+Asterisk+SCF+Components?showComments=true&showCommentArea=true#addcomment">Add Comment</a>
</div>
</div>
</div>
</div>
</div>
</body>
</html>