<html>
<head>
    <base href="https://wiki.asterisk.org/wiki">
            <link rel="stylesheet" href="/wiki/s/en/2160/1/7/_/styles/combined.css?spaceKey=TOP&amp;forWysiwyg=true" type="text/css">
    </head>
<body style="background: white;" bgcolor="white" class="email-body">
<div id="pageContent">
<div id="notificationFormat">
<div class="wiki-content">
<div class="email">
    <h2><a href="https://wiki.asterisk.org/wiki/display/TOP/Work+Queues+and+Thread+Pools">Work Queues and Thread Pools</a></h2>
    <h4>Page <b>edited</b> by             <a href="https://wiki.asterisk.org/wiki/display/~kpfleming">Kevin P. Fleming</a>
    </h4>
        <div id="versionComment">
        <b>Comment:</b>
        remove unsupported 'language=slice' tags<br />
    </div>
        <br/>
                         <h4>Changes (6)</h4>
                                 
    
<div id="page-diffs">
                    <table class="diff" cellpadding="0" cellspacing="0">
    
            <tr><td class="diff-snipped" >...<br></td></tr>
            <tr><td class="diff-unchanged" >A work queue is the simplest of the tools for task serialization. Here is its slice interface. <br> <br></td></tr>
            <tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">{code: language=slice} <br></td></tr>
            <tr><td class="diff-added-lines" style="background-color: #dfd;">{code} <br></td></tr>
            <tr><td class="diff-unchanged" >    local interface QueueListener <br>    { <br></td></tr>
            <tr><td class="diff-snipped" >...<br></td></tr>
            <tr><td class="diff-unchanged" >One potential issue with work queues is that a piece of work may need to call out asynchronously to another method. In such a case, the work is not finished, so it may be incorrect or dangerous to handle the next item on the queue. However, there&#39;s no reason for the consumer of the queue to be blocked waiting for the asynchronous operation to return. In such a case, a suspendable work queue may be used. <br> <br></td></tr>
            <tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">{code: language=slice} <br></td></tr>
            <tr><td class="diff-added-lines" style="background-color: #dfd;">{code} <br></td></tr>
            <tr><td class="diff-unchanged" >    enum SuspendableWorkResult <br>    { <br></td></tr>
            <tr><td class="diff-snipped" >...<br></td></tr>
            <tr><td class="diff-unchanged" >The examples shown so far only have a single consumer executing work in the queue. While it&#39;s certainly possible to have multiple consumers executing work on the queue, there are advantages to encapsulating the work into a thread pool object instead. <br> <br></td></tr>
            <tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">{code: language=slice} <br></td></tr>
            <tr><td class="diff-added-lines" style="background-color: #dfd;">{code} <br></td></tr>
            <tr><td class="diff-unchanged" >    local interface PoolListener <br>    { <br></td></tr>
            <tr><td class="diff-snipped" >...<br></td></tr>
    
            </table>
    </div>                            <h4>Full Content</h4>
                    <div class="notificationGreySide">
        <p>In a distributed system like Asterisk SCF, related tasks may originate from a variety of sources. For instance, tasks pertaining to a <tt>Session</tt> may come from an Ice thread calling one of the <tt>Session</tt>'s member operations. A <tt>Session</tt> may also be acted upon due to the reception of a message over the protocol the session gateway speaks. These tasks could run in the thread that originates the work, but this can lead to potential pitfalls, such as resource contention and potential deadlocks. Work Queues and Thread Pools offer the ability to group like work together, allowing tasks to be run serially rather than concurrently.</p>

<p>Even though many programming languages and libraries offer work queues and thread pools (or similar mechanisms), in order to provide some consistency among Asterisk SCF components, and to support the need for asynchronous work in work queues, Asterisk SCF provides a set of common interfaces for these mechanisms. These interfaces are defined in Slice, like all Asterisk SCF APIs, even though they don't involve remote operations or messaging; instead, Slice is used to define the interfaces so that they will be identical (or nearly so) in each language supported by Asterisk SCF. This will mean that anyone reading the source code of an Asterisk SCF component, even one written in a language that they don't themselves use for writing components, will be able to recognize and understand these mechanisms, even though their implementation in that language may be very different from those in other languages.</p>

<h1><a name="WorkQueuesandThreadPools-WorkQueue"></a>Work Queue</h1>

<p>A work queue is the simplest of the tools for task serialization. Here is its slice interface.</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="theme: Confluence; brush: java; gutter: false">local interface QueueListener
    {
        void workAdded(bool wasEmpty);
        void workResumable();
        void emptied();
    };

    local interface Work
    {
        void execute();
    };

    local sequence&lt;Work&gt; WorkSeq;

    local interface Queue
    {
        void enqueueWork(Work item);
        void enqueueWorkSeq(WorkSeq items);
        void cancelWork(Work item);

        /* return value indicates whether queue contains more work
           that can be executed immediately
        */
        bool executeWork();
        /* this is a snapshot and should only be used as a hint */
        int workCount();

        void setListener(QueueListener listener);
    };</pre>
</div></div>

<p>A producer may enqueue <tt>Work</tt> onto a <tt>Queue</tt>. The <tt>Queue</tt> accepts requests from a consumer to execute the work that has been enqueued. The <tt>QueueListener</tt> is informed of changes in the <tt>Queue</tt>. Below is a simple sequence diagram illustrating a potential implementation of a <tt>Queue</tt>.</p>

<p><span class="image-wrap" style=""><img src="/wiki/download/attachments/12550328/WorkQueue.png?version=1&amp;modificationDate=1299535125331" style="border: 0px solid black" /></span></p>

<p>At the beginning of the sequence, <tt>Queue</tt> is empty. The producer thread creates two <tt>Work</tt> items and enqueues them. The <tt>QueueListener</tt>, when informed of the addition of work to the queue, pokes a consumer thread. The consumer then calls <tt>Queue::executeWork</tt> until a false return is received. At the end, the <tt>Queue</tt> is empty again and notifies the <tt>QueueListener</tt>.</p>

<p>Note that the practice of having the consumer set the <tt>QueueListener</tt> is not required; this is only a demonstration.</p>

<h1><a name="WorkQueuesandThreadPools-SuspendableWorkQueue"></a>Suspendable Work Queue</h1>

<p>One potential issue with work queues is that a piece of work may need to call out asynchronously to another method. In such a case, the work is not finished, so it may be incorrect or dangerous to handle the next item on the queue. However, there's no reason for the consumer of the queue to be blocked waiting for the asynchronous operation to return. In such a case, a suspendable work queue may be used.</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="theme: Confluence; brush: java; gutter: false">enum SuspendableWorkResult
    {
        Complete,
        Suspended
    };

    local interface SuspendableWorkListener
    {
        void workResumable();
    };

    local interface SuspendableWork
    {
        SuspendableWorkResult execute(SuspendableWorkListener listener);
    };

    local sequence&lt;SuspendableWork&gt; SuspendableWorkSeq;

    local interface SuspendableQueue
    {
        void enqueueWork(SuspendableWork item);
        void enqueueWorkSeq(SuspendableWorkSeq items);
        void cancelWork(SuspendableWork item);

        /* return value indicates whether queue contains more work
           that can be executed immediately
        */
        bool executeWork();
        /* this is a snapshot and should only be used as a hint */
        int workCount();

        void setListener(QueueListener listener);
    };</pre>
</div></div>

<p>With this type of work queue, work can be suspended until it can be completed. When a <tt>SuspendableWork</tt> is executed, it can indicate via its return value if the work completed or if it is just suspended. Below is a sequence diagram that shows the potential operation of a <tt>SuspendableQueue</tt>.</p>

<p><span class="image-wrap" style=""><img src="/wiki/download/attachments/12550328/SuspendableWorkQueue2.png?version=2&amp;modificationDate=1299603351795" style="border: 0px solid black" /></span></p>

<p>Operation begins similarly to the <tt>WorkQueue</tt>. What's different is the way the second <tt>SuspendableWork</tt> is handled. Its execution initially calls out to a remote object asynchronously. The <tt>SuspendableWork</tt> returns that its work is suspended. Because the work has been suspended, the <tt>SuspendableQueue</tt> returns false to the consumer, indicating that there's no work to immediately be executed. Once the remote object has finished doing what it needs to do, it notifies the <tt>SuspendableWork</tt> that it has finished. The <tt>SuspendableWork</tt> calls the <tt>SuspendableWorkListener</tt>, who informs the <tt>QueueListener</tt>, who then pokes the consumer. At this point, the consumer calls <tt>executeWork</tt> again. This time the <tt>SuspendableWork</tt> can be completed. The <tt>SuspendableQueue</tt> returns false to the consumer again, but this time because the queue is empty.</p>

<p>One aspect that is left open for implementors is the specifics regarding how the consumer gets notified when suspended work becomes resumable. In this example, the <tt>SuspendableWorkListener</tt> informs the <tt>QueueListener</tt>. It would also be acceptable for the <tt>SuspendableWorkListener</tt> to notify the <tt>SuspendableQueue</tt> if desired.</p>

<p>There is a potential race condition that has to be handled. Specifically, the suspended work may become resumable and listeners may be notified work may be resumed before the initial false return for the <tt>executeWork</tt> call can be communicated to the consumer.  To illustrate, here is a diagram of the race condition.</p>

<p><span class="image-wrap" style=""><img src="/wiki/download/attachments/12550328/Race.png?version=3&amp;modificationDate=1299603351982" style="border: 0px solid black" /></span></p>

<p>If the <tt>SuspendableQueue</tt> is the first to be notified that work may be resumed, then he can be the one to handle this race and change his initial false return to a true return. If the <tt>SuspendableQueue</tt> is bypassed in favor of notifying the <tt>QueueListener</tt>, then the consumer will be responsible for handling the race condition. Making the <tt>SuspendableQueue</tt> handle the race condition is preferable since there may be many consumers that all make use of the same queue implementation. It makes more sense to handle the race condition centrally instead of having to duplicate the race-handling logic in all consumers. A recommendation is to have the <tt>SuspendableQueue</tt> implement the <tt>SuspendableWorkListener</tt> interface so that it may be able to handle the potential race condition itself.</p>

<h1><a name="WorkQueuesandThreadPools-ThreadPool"></a>Thread Pool</h1>

<p>The examples shown so far only have a single consumer executing work in the queue. While it's certainly possible to have multiple consumers executing work on the queue, there are advantages to encapsulating the work into a thread pool object instead.</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="theme: Confluence; brush: java; gutter: false">local interface PoolListener
    {
        void stateChanged(Pool pool, int activeThreads, int idleThreads, int zombieThreads);
        void queueWorkAdded(Pool pool, int newWorkCount, bool wasEmpty);
        void queueEmptied(Pool pool);
    };

    local interface Pool
    {
        void setSize(int size);
        AsteriskSCF::System::WorkQueue::V1::Queue getQueue();
    };

    local interface PoolFactory
    {
        Pool createPool(PoolListener listener, AsteriskSCF::System::Workqueue::V1::Queue queue);
    };</pre>
</div></div>

<p>As can be assumed here, a <tt>Pool</tt> has, at its heart, a <tt>WorkQueue</tt>. The <tt>Pool</tt> can relay similar listener information as a work queue, such as when work has been added and when its queue has become empty. The <tt>Pool</tt> also reports changes in state as well, such as when a thread finishes its work and becomes idle, or if threads are destroyed. The <tt>PoolListener</tt> is always given a <tt>Pool</tt> as a parameter in case the <tt>PoolListener</tt> wishes to change a property of the <tt>Pool</tt>. Currently, the only property that can be modified is the number of threads currently in use by the <tt>Pool</tt>.</p>

<p><span class="image-wrap" style=""><img src="/wiki/download/attachments/12550328/ThreadPool.png?version=1&amp;modificationDate=1299535125458" style="border: 0px solid black" /></span></p>

<p>Things begin pretty straightforward, with a <tt>Queue</tt> and <tt>Pool</tt> being created. When the <tt>Queue</tt> is set as the work queue for the <tt>Pool</tt>, this implementation of the <tt>Pool</tt> sets itself as the listener of the queue. The producer then queues two tasks. The <tt>PoolListener</tt>, upon being informed there are two tasks, decides to create two consumer threads. each consumer thread, upon being created, executes work in the <tt>Pool</tt>'s <tt>Queue</tt>. As the threads change from being idle to active, state changes are reported to the <tt>PoolListener</tt>. At the end, once all work is complete, the fact that the queue is empty is reported to the <tt>PoolListener</tt>.</p>

<h1><a name="WorkQueuesandThreadPools-Concurrencyinthreadpools"></a>Concurrency in thread pools</h1>

<p>Notice in the thread pool example that the consumer threads were running the queued tasks concurrently, seemingly negating the benefit of queuing tasks at all. Care must be taken with thread pools to ensure that if tasks are not intended to run concurrently, that they are not. One method of handling this is to make the work items queued to a thread pool's work queue actually themselves be work queues. This way, each thread in the pool is given a sub-queue of work to perform whenever it attempts to execute work. This sub-queue that the thread is given has work items that must be executed serially.</p>
    </div>
        <div id="commentsSection" class="wiki-content pageSection">
        <div style="float: right;" class="grey">
                        <a href="https://wiki.asterisk.org/wiki/users/removespacenotification.action?spaceKey=TOP">Stop watching space</a>
            <span style="padding: 0px 5px;">|</span>
                <a href="https://wiki.asterisk.org/wiki/users/editmyemailsettings.action">Change email notification preferences</a>
</div>
        <a href="https://wiki.asterisk.org/wiki/display/TOP/Work+Queues+and+Thread+Pools">View Online</a>
        |
        <a href="https://wiki.asterisk.org/wiki/pages/diffpagesbyversion.action?pageId=12550328&revisedVersion=14&originalVersion=13">View Changes</a>
                |
        <a href="https://wiki.asterisk.org/wiki/display/TOP/Work+Queues+and+Thread+Pools?showComments=true&amp;showCommentArea=true#addcomment">Add Comment</a>
            </div>
</div>
</div>
</div>
</div>
</body>
</html>