<html>
<head>
    <base href="https://wiki.asterisk.org/wiki">
            <link rel="stylesheet" href="/wiki/s/2041/1/7/_/styles/combined.css?spaceKey=TOP&amp;forWysiwyg=true" type="text/css">
    </head>
<body style="background: white;" bgcolor="white" class="email-body">
<div id="pageContent">
<div id="notificationFormat">
<div class="wiki-content">
<div class="email">
    <h2><a href="https://wiki.asterisk.org/wiki/display/TOP/Work+Queues+and+Thread+Pools">Work Queues and Thread Pools</a></h2>
    <h4>Page <b>edited</b> by             <a href="https://wiki.asterisk.org/wiki/display/~mmichelson">Mark Michelson</a>
    </h4>
        <div id="versionComment">
        <b>Comment:</b>
        Fixed wordings, changed interfaces to be up-to-date. Next comes editing the sequence diagram of the pool...<br />
    </div>
        <br/>
                         <h4>Changes (23)</h4>
                                 
    
<div id="page-diffs">
                    <table class="diff" cellpadding="0" cellspacing="0">
    
            <tr><td class="diff-snipped" >...<br></td></tr>
            <tr><td class="diff-unchanged" >{warning} <br> <br></td></tr>
            <tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h1. What&#39;s up, doc? <br></td></tr>
            <tr><td class="diff-added-lines" style="background-color: #dfd;">In a distributed system like Asterisk SCF, related tasks may originate from a variety of sources. For instance, tasks pertaining to a {{Session}} may come from an Ice thread calling one of the {{Session}}&#39;s member operations. A {{Session}} may also be acted upon due to the reception of a message over the protocol the session gateway speaks. These tasks could run in the thread that originates the work, but this can lead to potential pitfalls, such as resource contention and potential deadlocks. Work Queues and Thread Pools offer the ability to group like work together, allowing tasks to be run serially rather than concurrently. <br></td></tr>
            <tr><td class="diff-unchanged" > <br></td></tr>
            <tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">In a system like Asterisk SCF, related tasks may originate from a variety of sources. For instance, tasks pertaining to a {{Session}} may come from an Ice thread calling member operation. A {{Session}} may also be acted upon due to the reception of a message over the protocol the session gateway speaks. These tasks could run in the thread that originates the work, but this leads to potential pitfalls, such as resource contention and potential deadlocks if programmed sloppily. Work Queues and Thread Pools offer the ability to group like work together, ensuring that all tasks are run subsequently by a single thread. <br> <br></td></tr>
            <tr><td class="diff-unchanged" >h1. Work Queue <br> <br></td></tr>
            <tr><td class="diff-changed-lines" >A work queue is the simplest of the tools for task serialization. Here is its slice <span class="diff-changed-words">interface<span class="diff-added-chars"style="background-color: #dfd;">.</span></span> <br></td></tr>
            <tr><td class="diff-unchanged" > <br>{code: language=slice} <br></td></tr>
            <tr><td class="diff-snipped" >...<br></td></tr>
            <tr><td class="diff-unchanged" >    }; <br> <br></td></tr>
            <tr><td class="diff-changed-lines" ><span class="diff-added-words"style="background-color: #dfd;">local</span> sequence&lt;Work&gt; WorkSeq; <br></td></tr>
            <tr><td class="diff-unchanged" > <br>    local interface Queue <br>    { <br>        void enqueueWork(Work item); <br></td></tr>
            <tr><td class="diff-changed-lines" >void <span class="diff-changed-words">enqueueWork<span class="diff-added-chars"style="background-color: #dfd;">Seq</span>(WorkSeq</span> items); <br></td></tr>
            <tr><td class="diff-unchanged" >        void cancelWork(Work item); <br> <br></td></tr>
            <tr><td class="diff-snipped" >...<br></td></tr>
            <tr><td class="diff-unchanged" >{code} <br> <br></td></tr>
            <tr><td class="diff-changed-lines" >A {{Queue}} <span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">accepts</span> <span class="diff-added-words"style="background-color: #dfd;">may enqueue</span> {{Work}} items <span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">to be enqueued</span> and accepts requests to execute the work that has been enqueued. The {{QueueListener}} is informed of changes in the {{Queue}}. Below is a simple sequence diagram illustrating a potential implementation of a {{Queue}}. <br></td></tr>
            <tr><td class="diff-unchanged" > <br>!WorkQueue.png! <br></td></tr>
            <tr><td class="diff-snipped" >...<br></td></tr>
            <tr><td class="diff-unchanged" >h1. Suspendable Work Queue <br> <br></td></tr>
            <tr><td class="diff-changed-lines" >One potential issue with work queues is that a piece of work may need to call out asynchronously to another method. In such a case, the work is not finished, so it may be incorrect or dangerous to handle the next item on the queue. However, there&#39;s no reason for the consumer of the queue to <span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">necessarily</span> be blocked waiting for the asynchronous operation to return. In such a case, a suspendable work queue <span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">is handy.</span> <span class="diff-added-words"style="background-color: #dfd;">may be used.</span> <br></td></tr>
            <tr><td class="diff-unchanged" > <br>{code: language=slice} <br></td></tr>
            <tr><td class="diff-snipped" >...<br></td></tr>
            <tr><td class="diff-unchanged" >    }; <br> <br></td></tr>
            <tr><td class="diff-changed-lines" ><span class="diff-added-words"style="background-color: #dfd;">local</span> sequence&lt;SuspendableWork&gt; SuspendableWorkSeq; <br></td></tr>
            <tr><td class="diff-unchanged" > <br>    local interface SuspendableQueue <br>    { <br>        void enqueueWork(SuspendableWork item); <br></td></tr>
            <tr><td class="diff-changed-lines" >void <span class="diff-changed-words">enqueueWork<span class="diff-added-chars"style="background-color: #dfd;">Seq</span>(SuspendableWorkSeq</span> items); <br></td></tr>
            <tr><td class="diff-unchanged" >        void cancelWork(SuspendableWork item); <br> <br></td></tr>
            <tr><td class="diff-snipped" >...<br></td></tr>
            <tr><td class="diff-unchanged" >!SuspendableWorkQueue.png! <br> <br></td></tr>
            <tr><td class="diff-changed-lines" >Operation begins similarly to the <span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">regular work queue.</span> <span class="diff-added-words"style="background-color: #dfd;">{{WorkQueue}}.</span> What&#39;s different is the way the second {{SuspendableWork}} is handled. Its execution initially calls out to a remote object <span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">asynchronously and so the</span> <span class="diff-added-words"style="background-color: #dfd;">asynchronously. The</span> {{SuspendableWork}} returns that its work is suspended. Because the work has been suspended, the {{SuspendableQueue}} returns false to the consumer, indicating that there&#39;s no work to immediately be executed. Once the remote object has finished doing what it needs to do, it returns and through some <span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">undefined</span> operation lets the consumer know there is once again work to be done on the queue. <span class="diff-added-words"style="background-color: #dfd;">The method by which the remote object notifies the consumer is outside the scope of this document.</span> At this point, the consumer calls {{executeWork}} again. This time the {{SuspendableWork}} can be completed. The {{SuspendableQueue}} returns false to the consumer again, but this time <span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">it&#39;s</span> because the queue is empty. <br></td></tr>
            <tr><td class="diff-unchanged" > <br>h1. Thread Pool <br></td></tr>
            <tr><td class="diff-snipped" >...<br></td></tr>
            <tr><td class="diff-unchanged" > <br>{code: language=slice} <br></td></tr>
            <tr><td class="diff-changed-lines" >local interface <span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">PoolManager</span> <span class="diff-added-words"style="background-color: #dfd;">PoolListener</span> <br></td></tr>
            <tr><td class="diff-unchanged" >    { <br></td></tr>
            <tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">        void setPoolSize(int size); <br></td></tr>
            <tr><td class="diff-added-lines" style="background-color: #dfd;">        void stateChanged(Pool pool, int activeThreads, int idleThreads, int zombieThreads); <br>        void queueWorkAdded(Pool pool, int newWorkCount, bool wasEmpty); <br>        void queueEmptied(Pool pool); <br></td></tr>
            <tr><td class="diff-unchanged" >    }; <br> <br></td></tr>
            <tr><td class="diff-changed-lines" >local interface <span class="diff-changed-words">Pool<span class="diff-deleted-chars"style="color:#999;background-color:#fdd;text-decoration:line-through;">Listener</span></span> <br></td></tr>
            <tr><td class="diff-unchanged" >    { <br></td></tr>
            <tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">        void stateChanged(PoolManager manager, int activeThreads, int idleThreads, int zombieThreads); <br>        void queueWorkAdded(PoolManager manager, int newWorkCount, bool wasEmpty); <br>        void queueEmptied(PoolManager manager); <br></td></tr>
            <tr><td class="diff-added-lines" style="background-color: #dfd;">        void setSize(int size); <br>        AsteriskSCF::System::WorkQueue::V1::Queue getQueue(); <br></td></tr>
            <tr><td class="diff-unchanged" >    }; <br> <br></td></tr>
            <tr><td class="diff-changed-lines" >local interface <span class="diff-changed-words">Pool<span class="diff-added-chars"style="background-color: #dfd;">Factory</span></span> <br></td></tr>
            <tr><td class="diff-unchanged" >    { <br></td></tr>
            <tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">        void setWorkQueue(AsteriskSCF::System::WorkQueue::V1::Queue queue); <br>        void setListener(PoolListener listener); <br></td></tr>
            <tr><td class="diff-added-lines" style="background-color: #dfd;">        Pool createPool(PoolListener listener, AsteriskSCF::System::Workqueue::V1::Queue queue); <br></td></tr>
            <tr><td class="diff-unchanged" >    }; <br> <br>{code} <br> <br></td></tr>
            <tr><td class="diff-changed-lines" >As can be assumed here, a {{Pool}} has, at its heart, a <span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">work queue.</span> <span class="diff-added-words"style="background-color: #dfd;">{{WorkQueue}}.</span> The {{Pool}} can relay similar listener information as a work queue, such as when work has been added and when its queue has become empty. The {{Pool}} also reports changes in state as well, such as when a thread finishes its work and becomes idle, or if threads are destroyed. The {{PoolListener}} is always given a <span class="diff-changed-words">{{Pool<span class="diff-deleted-chars"style="color:#999;background-color:#fdd;text-decoration:line-through;">Manager</span>}}</span> as <span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">an argument</span> <span class="diff-added-words"style="background-color: #dfd;">a parameter</span> in case the {{PoolListener}} wishes to change a property of the {{Pool}}. Currently, the only property that can be modified is the number of threads currently in use by the {{Pool}}. <br></td></tr>
            <tr><td class="diff-unchanged" > <br>!ThreadPool.png! <br> <br></td></tr>
            <tr><td class="diff-changed-lines" >Things begin pretty straightforward, with a {{Queue}} and {{Pool}} being created. <span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">The {{Pool}} creates a {{PoolManager}} so that it may be passed to the {{PoolListener}}.</span> When the {{Queue}} is set as the work queue for the {{Pool}}, this implementation of the {{Pool}} sets itself as the listener of the queue. The producer then queues two tasks. The {{PoolListener}}, upon being informed there are two tasks, decides to create two consumer threads. each consumer thread, upon being created, executes work in the {{Pool}}&#39;s {{Queue}}. As the threads change from being idle to active, state changes are reported to the {{PoolListener}}. At the end, once all work is complete, the fact that the queue is empty is reported to the {{PoolListener}}. <br></td></tr>
            <tr><td class="diff-unchanged" > <br>h1. Concurrency in thread pools <br> <br></td></tr>
            <tr><td class="diff-changed-lines" ><span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">One thing you&#39;ll notice</span> <span class="diff-added-words"style="background-color: #dfd;">Notice</span> in the thread pool example <span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">is</span> that the consumer threads were running the queued tasks concurrently, <span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">thus</span> seemingly negating the benefit of queuing tasks <span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">in the first place.</span> <span class="diff-added-words"style="background-color: #dfd;">at all.</span> Care must be taken with thread pools to ensure that if tasks are not intended to run concurrently, that they are not. One method of handling this is to make the work items queued to a thread pool&#39;s work queue actually themselves be work queues. This way, each thread in the pool is given a <span class="diff-changed-words">sub<span class="diff-added-chars"style="background-color: #dfd;">-</span>queue</span> of work to perform whenever it attempts to execute work. This <span class="diff-changed-words">sub<span class="diff-added-chars"style="background-color: #dfd;">-</span>queue</span> that the thread is given has work items that must be executed serially. <br></td></tr>
    
            </table>
    </div>                            <h4>Full Content</h4>
                    <div class="notificationGreySide">
        <div class='panelMacro'><table class='warningMacro'><colgroup><col width='24'><col></colgroup><tr><td valign='top'><img src="/wiki/images/icons/emoticons/forbidden.gif" width="16" height="16" align="absmiddle" alt="" border="0"></td><td>This page is not complete yet.</td></tr></table></div>

<p>In a distributed system like Asterisk SCF, related tasks may originate from a variety of sources. For instance, tasks pertaining to a <tt>Session</tt> may come from an Ice thread calling one of the <tt>Session</tt>'s member operations. A <tt>Session</tt> may also be acted upon due to the reception of a message over the protocol the session gateway speaks. These tasks could run in the thread that originates the work, but this can lead to potential pitfalls, such as resource contention and potential deadlocks. Work Queues and Thread Pools offer the ability to group like work together, allowing tasks to be run serially rather than concurrently.</p>

<h1><a name="WorkQueuesandThreadPools-WorkQueue"></a>Work Queue</h1>

<p>A work queue is the simplest of the tools for task serialization. Here is its slice interface.</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<script type="syntaxhighlighter" class="toolbar: false; theme: Confluence; brush: java; gutter: false"><![CDATA[
    local interface QueueListener
    {
        void workAdded(bool wasEmpty);
        void emptied();
    };

    local interface Work
    {
        void execute();
    };

    local sequence&lt;Work&gt; WorkSeq;

    local interface Queue
    {
        void enqueueWork(Work item);
        void enqueueWorkSeq(WorkSeq items);
        void cancelWork(Work item);

        /* return value indicates whether queue contains more work
           that can be executed immediately
        */
        bool executeWork();
        /* this is a snapshot and should only be used as a hint */
        int workCount();
        
        void setListener(QueueListener listener);
    };
]]></script>
</div></div>

<p>A <tt>Queue</tt> may enqueue <tt>Work</tt> items and accepts requests to execute the work that has been enqueued. The <tt>QueueListener</tt> is informed of changes in the <tt>Queue</tt>. Below is a simple sequence diagram illustrating a potential implementation of a <tt>Queue</tt>.</p>

<p><span class="image-wrap" style=""><img src="/wiki/download/attachments/12550328/WorkQueue.png?version=1&amp;modificationDate=1298669354539" style="border: 0px solid black" /></span></p>

<p>At the beginning of the sequence, <tt>Queue</tt> is empty. The producer thread creates two <tt>Work</tt> items and enqueues them. The <tt>QueueListener</tt>, when informed of the addition of work to the queue, pokes a consumer thread. The consumer then calls <tt>Queue::executeWork</tt> until a false return is received. At the end, the <tt>Queue</tt> is empty again and notifies the <tt>QueueListener</tt>.</p>

<p>Note that the practice of having the consumer set the <tt>QueueListener</tt> is not required; this is only a demonstration.</p>

<h1><a name="WorkQueuesandThreadPools-SuspendableWorkQueue"></a>Suspendable Work Queue</h1>

<p>One potential issue with work queues is that a piece of work may need to call out asynchronously to another method. In such a case, the work is not finished, so it may be incorrect or dangerous to handle the next item on the queue. However, there's no reason for the consumer of the queue to be blocked waiting for the asynchronous operation to return. In such a case, a suspendable work queue may be used.</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<script type="syntaxhighlighter" class="toolbar: false; theme: Confluence; brush: java; gutter: false"><![CDATA[
    enum SuspendableWorkResult
    {
        Complete,
        Suspended
    };

    local interface SuspendableWork
    {
        SuspendableWorkResult execute();
    };

    local sequence&lt;SuspendableWork&gt; SuspendableWorkSeq;

    local interface SuspendableQueue
    {
        void enqueueWork(SuspendableWork item);
        void enqueueWorkSeq(SuspendableWorkSeq items);
        void cancelWork(SuspendableWork item);

        /* return value indicates whether queue contains more work
           that can be executed immediately
        */
        bool executeWork();
        /* this is a snapshot and should only be used as a hint */
        int workCount();

        void setListener(QueueListener listener);
    };
]]></script>
</div></div>

<p>With this type of work queue, work can be suspended until it can be completed. When a <tt>SuspendableWork</tt> is executed, it can indicate via its return value if the work completed or if it is just suspended. Below is a sequence diagram that shows the potential operation of a <tt>SuspendableQueue</tt>.</p>

<p><span class="image-wrap" style=""><img src="/wiki/download/attachments/12550328/SuspendableWorkQueue.png?version=1&amp;modificationDate=1298669354410" style="border: 0px solid black" /></span></p>

<p>Operation begins similarly to the <tt>WorkQueue</tt>. What's different is the way the second <tt>SuspendableWork</tt> is handled. Its execution initially calls out to a remote object asynchronously. The <tt>SuspendableWork</tt> returns that its work is suspended. Because the work has been suspended, the <tt>SuspendableQueue</tt> returns false to the consumer, indicating that there's no work to immediately be executed. Once the remote object has finished doing what it needs to do, it returns and through some operation lets the consumer know there is once again work to be done on the queue. The method by which the remote object notifies the consumer is outside the scope of this document. At this point, the consumer calls <tt>executeWork</tt> again. This time the <tt>SuspendableWork</tt> can be completed. The <tt>SuspendableQueue</tt> returns false to the consumer again, but this time because the queue is empty.</p>

<h1><a name="WorkQueuesandThreadPools-ThreadPool"></a>Thread Pool</h1>

<p>The examples shown so far only have a single consumer executing work in the queue. While it's certainly possible to have multiple consumers executing work on the queue, there are advantages to encapsulating the work into a thread pool object instead.</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<script type="syntaxhighlighter" class="toolbar: false; theme: Confluence; brush: java; gutter: false"><![CDATA[
    local interface PoolListener
    {
        void stateChanged(Pool pool, int activeThreads, int idleThreads, int zombieThreads);
        void queueWorkAdded(Pool pool, int newWorkCount, bool wasEmpty);
        void queueEmptied(Pool pool);
    };

    local interface Pool
    {
        void setSize(int size);
        AsteriskSCF::System::WorkQueue::V1::Queue getQueue();
    };

    local interface PoolFactory
    {
        Pool createPool(PoolListener listener, AsteriskSCF::System::Workqueue::V1::Queue queue);
    };

]]></script>
</div></div>

<p>As can be assumed here, a <tt>Pool</tt> has, at its heart, a <tt>WorkQueue</tt>. The <tt>Pool</tt> can relay similar listener information as a work queue, such as when work has been added and when its queue has become empty. The <tt>Pool</tt> also reports changes in state as well, such as when a thread finishes its work and becomes idle, or if threads are destroyed. The <tt>PoolListener</tt> is always given a <tt>Pool</tt> as a parameter in case the <tt>PoolListener</tt> wishes to change a property of the <tt>Pool</tt>. Currently, the only property that can be modified is the number of threads currently in use by the <tt>Pool</tt>.</p>

<p><span class="image-wrap" style=""><img src="/wiki/download/attachments/12550328/ThreadPool.png?version=1&amp;modificationDate=1298669354497" style="border: 0px solid black" /></span></p>

<p>Things begin pretty straightforward, with a <tt>Queue</tt> and <tt>Pool</tt> being created. When the <tt>Queue</tt> is set as the work queue for the <tt>Pool</tt>, this implementation of the <tt>Pool</tt> sets itself as the listener of the queue. The producer then queues two tasks. The <tt>PoolListener</tt>, upon being informed there are two tasks, decides to create two consumer threads. each consumer thread, upon being created, executes work in the <tt>Pool</tt>'s <tt>Queue</tt>. As the threads change from being idle to active, state changes are reported to the <tt>PoolListener</tt>. At the end, once all work is complete, the fact that the queue is empty is reported to the <tt>PoolListener</tt>.</p>

<h1><a name="WorkQueuesandThreadPools-Concurrencyinthreadpools"></a>Concurrency in thread pools</h1>

<p>Notice in the thread pool example that the consumer threads were running the queued tasks concurrently, seemingly negating the benefit of queuing tasks at all. Care must be taken with thread pools to ensure that if tasks are not intended to run concurrently, that they are not. One method of handling this is to make the work items queued to a thread pool's work queue actually themselves be work queues. This way, each thread in the pool is given a sub-queue of work to perform whenever it attempts to execute work. This sub-queue that the thread is given has work items that must be executed serially.</p>
    </div>
        <div id="commentsSection" class="wiki-content pageSection">
        <div style="float: right;">
            <a href="https://wiki.asterisk.org/wiki/users/viewnotifications.action" class="grey">Change Notification Preferences</a>
        </div>
        <a href="https://wiki.asterisk.org/wiki/display/TOP/Work+Queues+and+Thread+Pools">View Online</a>
        |
        <a href="https://wiki.asterisk.org/wiki/pages/diffpagesbyversion.action?pageId=12550328&revisedVersion=6&originalVersion=5">View Changes</a>
                |
        <a href="https://wiki.asterisk.org/wiki/display/TOP/Work+Queues+and+Thread+Pools?showComments=true&amp;showCommentArea=true#addcomment">Add Comment</a>
            </div>
</div>
</div>
</div>
</div>
</body>
</html>