<html>
<head>
    <base href="https://wiki.asterisk.org/wiki">
            <link rel="stylesheet" href="/wiki/s/en/2160/1/7/_/styles/combined.css?spaceKey=TOP&amp;forWysiwyg=true" type="text/css">
    </head>
<body style="background: white;" bgcolor="white" class="email-body">
<div id="pageContent">
<div id="notificationFormat">
<div class="wiki-content">
<div class="email">
    <h2><a href="https://wiki.asterisk.org/wiki/display/TOP/Uses+of+ThreadPools+and+WorkQueues+in+Asterisk+SCF+Components">Uses of ThreadPools and WorkQueues in Asterisk SCF Components</a></h2>
    <h4>Page <b>edited</b> by             <a href="https://wiki.asterisk.org/wiki/display/~mmichelson">Mark Michelson</a>
    </h4>
        <br/>
                         <h4>Changes (1)</h4>
                                 
    
<div id="page-diffs">
                    <table class="diff" cellpadding="0" cellspacing="0">
    
            <tr><td class="diff-snipped" >...<br></td></tr>
            <tr><td class="diff-unchanged" >h1. Introduction <br> <br></td></tr>
            <tr><td class="diff-changed-lines" >Asterisk SCF provides three interfaces (as well as implementations in <span class="diff-changed-words">C<span class="diff-added-chars"style="background-color: #dfd;">\</span>+<span class="diff-added-chars"style="background-color: #dfd;">\</span>+)</span> for assisting with task execution and concurrency. The {{Queue}} interface ( {{WorkQueue}} implementation in C++) provides a method to execute tasks sequentially. The {{SuspendableQueue}} interface ( {{SuspendableWorkQueue}} implementation in <span class="diff-changed-words">C<span class="diff-added-chars"style="background-color: #dfd;">\</span>+<span class="diff-added-chars"style="background-color: #dfd;">\</span>+)</span> provides similar functionality. The differences are that the SuspendableQueue is more restrictive with regards to task execution. One task must fully complete before the next task may be executed. In addition, a task may finish in a suspended state, indicating that it cannot execute any further at the moment but that it will complete later. Finally, there&#39;s the {{Pool}} interface ( {{ThreadPool}} implementation in <span class="diff-changed-words">C<span class="diff-added-chars"style="background-color: #dfd;">\</span>+<span class="diff-added-chars"style="background-color: #dfd;">\</span>+).</span> The {{ThreadPool}} uses a {{WorkQueue}} internally to organize tasks. The {{ThreadPool}} provides methods to set a number of worker threads to grab tasks from the queue and execute them. Since a {{WorkQueue}} is the structure used internally, the worker threads grab tasks in the order that they were entered into the queue, but it is possible for multiple threads to execute tasks concurrently. <br></td></tr>
            <tr><td class="diff-unchanged" > <br>What follows are some examples of how they can be used in concert to mold execution to follow concurrency models necessary for specific components. <br></td></tr>
            <tr><td class="diff-snipped" >...<br></td></tr>
    
            </table>
    </div>                            <h4>Full Content</h4>
                    <div class="notificationGreySide">
        <div class='panelMacro'><table class='warningMacro'><colgroup><col width='24'><col></colgroup><tr><td valign='top'><img src="/wiki/images/icons/emoticons/forbidden.gif" width="16" height="16" align="absmiddle" alt="" border="0"></td><td>This is a work in progress</td></tr></table></div>

<h1><a name="UsesofThreadPoolsandWorkQueuesinAsteriskSCFComponents-Introduction"></a>Introduction</h1>

<p>Asterisk SCF provides three interfaces (as well as implementations in C&#43;&#43;) for assisting with task execution and concurrency. The <tt>Queue</tt> interface ( <tt>WorkQueue</tt> implementation in C++) provides a method to execute tasks sequentially. The <tt>SuspendableQueue</tt> interface ( <tt>SuspendableWorkQueue</tt> implementation in C&#43;&#43;) provides similar functionality. The differences are that the SuspendableQueue is more restrictive with regards to task execution. One task must fully complete before the next task may be executed. In addition, a task may finish in a suspended state, indicating that it cannot execute any further at the moment but that it will complete later. Finally, there's the <tt>Pool</tt> interface ( <tt>ThreadPool</tt> implementation in C&#43;&#43;). The <tt>ThreadPool</tt> uses a <tt>WorkQueue</tt> internally to organize tasks. The <tt>ThreadPool</tt> provides methods to set a number of worker threads to grab tasks from the queue and execute them. Since a <tt>WorkQueue</tt> is the structure used internally, the worker threads grab tasks in the order that they were entered into the queue, but it is possible for multiple threads to execute tasks concurrently.</p>

<p>What follows are some examples of how they can be used in concert to mold execution to follow concurrency models necessary for specific components.</p>

<h1><a name="UsesofThreadPoolsandWorkQueuesinAsteriskSCFComponents-ThreadPoolUsage"></a>ThreadPool Usage</h1>

<p>A common approach for components will be to farm tasks out to multiple threads. While setting the size of an Ice thread pool is one approach to the matter, they are not as flexible as the implementation provided by the <tt>ThreadPool</tt>. The big reason is that the <tt>ThreadPool</tt> is able to inform a component about the current load and can grow or shrink based on the current load. For many components, this is ideal since with communications platforms, the load can be inconsistent and can spike at times.</p>

<p>Below is an example of thread pool usage:</p>

<p><span class="image-wrap" style=""><img src="/wiki/download/attachments/14352403/SimpleThreadPoolUsage.png?version=2&amp;modificationDate=1307569694973" style="border: 0px solid black" /></span></p>

<p>This shows how operation <tt>foo()</tt> is executed when using a <tt>ThreadPool</tt>. <tt>foo()</tt> is encapsulated as an queuable operation <tt>FooOperation</tt>. This operation is queued on to the <tt>ThreadPool</tt>'s <tt>WorkQueue</tt>. The <tt>ThreadPool</tt> is informed that work has been added and then informs the <tt>ThreadPoolListener</tt>. The <tt>ThreadPoolListener</tt> is implemented within our component. In this particular case, the listener decides that a new <tt>WorkerThread</tt> should be created, so it calls <tt>setSize</tt> on the <tt>ThreadPool</tt> so that a new thread is created. A <tt>WorkerThread</tt> is created, and it executes the work.</p>

<p>This model works well for cases where all operations are completely independent. A potential component that could use this would be the service locator. For other components, this model is insufficient. Other components will wish to distribute tasks to worker threads, but they will require that like operations be grouped in such a way that they are run sequentially. Following is a description of how this is implemented in the SIP component.</p>

<h1><a name="UsesofThreadPoolsandWorkQueuesinAsteriskSCFComponents-SIP%27sThreadPoolUsage"></a>SIP's Thread Pool Usage</h1>

<p>The SIP component presents an interesting challenge. The SIP component can receive messages from multiple sources. Asterisk SCF can receive messages either from PJSIP via registered callbacks or via Ice. The messages may arrive at any time and may require access to common objects. The SIP component is not as tolerant of delays due to RPCs as other components may be due to the high rate of messages that may be received.</p>

<p>In the case of SIP, there is a specific object that is nearly always a target of operations, the <tt>SipSession</tt>. Since operations can arrive from multiple sources, and it is desirable to process the operations in the same order, it is an obvious choice to have a <tt>WorkQueue</tt> on each SIP session. We actually use a <tt>SuspendableWorkQueue</tt> so that if operation of tasks on a <tt>SipSession</tt> needs to be suspended, it can be.</p>

<p><tt>ThreadPool</tt> and <tt>SuspendableWorkQueue</tt> usage can be combined. Each <tt>SipSession</tt> will contain an object called <tt>SessionWork</tt>. <tt>SessionWork</tt> contains a <tt>SuspendableWorkQueue</tt> so that all operations on a particular session run sequentially. In addition, the <tt>SessionWork</tt> will itself be a a queueable item. The <tt>SessionWork</tt> can be placed into the <tt>ThreadPool</tt>'s <tt>WorkQueue</tt>. Using this method, whenever a worker thread pulls a task from the thread pool, it actually is pulling an entire <tt>SuspendableWorkQueue</tt> of work. This allows for the worker thread to run multiple tasks, and it prevents other worker threads from attempting to run tasks related to a specific <tt>SipSession</tt>.</p>

<p><span class="image-wrap" style=""><img src="/wiki/download/attachments/14352403/ThreadPoolUsage.png?version=1&amp;modificationDate=1307549003717" style="border: 0px solid black" /></span></p>

<p>In this example, an Asterisk SCF component calls operation <tt>foo()</tt> provided by the <tt>SipSession</tt>. A <tt>Work</tt>-derived object, <tt>FooOperation</tt> is created and enqueued on the <tt>SessionWork</tt>'s internal <tt>SuspendableWorkQueue</tt>. The <tt>SessionWork</tt> acts as a listener for the <tt>SuspendableQueue</tt> and reacts to the work being enqueued by enqueuing itself onto the <tt>ThreadPool</tt>'s <tt>WorkQueue</tt>. The <tt>WorkQueue</tt> notifies the <tt>ThreadPool</tt>, which then notifies a <tt>ThreadPoolListener</tt> of the work added. In this particular scenario, the <tt>ThreadPoolListener</tt> signals the <tt>ThreadPool</tt> to create a <tt>WorkerThread</tt> to handle the work. The <tt>WorkerThread</tt> then initiates a series of <tt>executeWork</tt> and <tt>execute</tt> calls in order to run the <tt>FooOperation</tt>.</p>

<p>While this approach is used exclusively in the SIP component, it will likely fit well into other components as well. For instance, the bridging component may use this approach to group all operations relating to a specific bridge together while still allowing for concurrent execution of tasks. The media component may similarly use this approach to group operations relating to a media session together.</p>
    </div>
        <div id="commentsSection" class="wiki-content pageSection">
        <div style="float: right;" class="grey">
                        <a href="https://wiki.asterisk.org/wiki/users/removespacenotification.action?spaceKey=TOP">Stop watching space</a>
            <span style="padding: 0px 5px;">|</span>
                <a href="https://wiki.asterisk.org/wiki/users/editmyemailsettings.action">Change email notification preferences</a>
</div>
        <a href="https://wiki.asterisk.org/wiki/display/TOP/Uses+of+ThreadPools+and+WorkQueues+in+Asterisk+SCF+Components">View Online</a>
        |
        <a href="https://wiki.asterisk.org/wiki/pages/diffpagesbyversion.action?pageId=14352403&revisedVersion=8&originalVersion=7">View Changes</a>
                |
        <a href="https://wiki.asterisk.org/wiki/display/TOP/Uses+of+ThreadPools+and+WorkQueues+in+Asterisk+SCF+Components?showComments=true&amp;showCommentArea=true#addcomment">Add Comment</a>
            </div>
</div>
</div>
</div>
</div>
</body>
</html>