<html>
<head>
<base href="https://wiki.asterisk.org/wiki">
<link rel="stylesheet" href="/wiki/s/2041/1/7/_/styles/combined.css?spaceKey=TOP&forWysiwyg=true" type="text/css">
</head>
<body style="background: white;" bgcolor="white" class="email-body">
<div id="pageContent">
<div id="notificationFormat">
<div class="wiki-content">
<div class="email">
<h2><a href="https://wiki.asterisk.org/wiki/display/TOP/Work+Queues+and+Thread+Pools">Work Queues and Thread Pools</a></h2>
<h4>Page <b>edited</b> by <a href="https://wiki.asterisk.org/wiki/display/~mmichelson">Mark Michelson</a>
</h4>
<br/>
<h4>Changes (4)</h4>
<div id="page-diffs">
<table class="diff" cellpadding="0" cellspacing="0">
<tr><td class="diff-snipped" >...<br></td></tr>
<tr><td class="diff-unchanged" > { <br> void workAdded(bool wasEmpty); <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;"> void workResumable(); <br></td></tr>
<tr><td class="diff-unchanged" > void emptied(); <br> }; <br></td></tr>
<tr><td class="diff-snipped" >...<br></td></tr>
<tr><td class="diff-unchanged" >!SuspendableWorkQueue2.png! <br> <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">Operation begins similarly to the {{WorkQueue}}. What's different is the way the second {{SuspendableWork}} is handled. Its execution initially calls out to a remote object asynchronously. The {{SuspendableWork}} returns that its work is suspended. Because the work has been suspended, the {{SuspendableQueue}} returns false to the consumer, indicating that there's no work to immediately be executed. Once the remote object has finished doing what it needs to do, it returns and through some operation lets the consumer know there is once again work to be done on the queue. The method by which the remote object notifies the consumer is outside the scope of this document. At this point, the consumer calls {{executeWork}} again. This time the {{SuspendableWork}} can be completed. The {{SuspendableQueue}} returns false to the consumer again, but this time because the queue is empty. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">Operation begins similarly to the {{WorkQueue}}. What's different is the way the second {{SuspendableWork}} is handled. Its execution initially calls out to a remote object asynchronously. The {{SuspendableWork}} returns that its work is suspended. Because the work has been suspended, the {{SuspendableQueue}} returns false to the consumer, indicating that there's no work to immediately be executed. Once the remote object has finished doing what it needs to do, it notifies the {{SuspendableWork}} that it has finished. The {{SuspendableWork}}, who was given a reference to the {{SuspendableQueue}} when it was constructed, then notifies the {{SuspendableQueue}}, who informs the {{QueueListener}}, who then pokes the consumer. At this point, the consumer calls {{executeWork}} again. This time the {{SuspendableWork}} can be completed. The {{SuspendableQueue}} returns false to the consumer again, but this time because the queue is empty. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">One aspect that is left open for implementors is the specifics regarding how the consumer gets notified when suspended work becomes resumable. In this example, the {{SuspendableWork}} notifies the {{SuspendableQueue}}. It would also be acceptable for the {{SuspendableWork}} to notify the {{QueueListener}} directly if desired. The problem is that there is a potential race condition that has to be handled. Specifically, the suspended work may become resumable and listeners may be notified work may be resumed before the initial false return for the {{executeWork}} call can be communicated to the consumer. To illustrate, here is a diagram of the race condition. <br> <br>!Race.png! <br> <br>If the {{SuspendableQueue}} is the first to be notified that work may be resumed, then he can be the one to handle this race and change his initial false return to a true return. If the {{SuspendableQueue}} is bypassed in favor of notifying the {{QueueListener}} directly, then the consumer will be responsible for handling the race condition. This example favors the approach of making the {{SuspendableQueue}} handle the race condition under the logic that since there may likely be many consumers that all use the same queue implementation, it makes more sense to handle the race condition centrally instead of having to duplicate the race-handling logic in all consumers. <br> <br></td></tr>
<tr><td class="diff-unchanged" >h1. Thread Pool <br> <br></td></tr>
<tr><td class="diff-snipped" >...<br></td></tr>
</table>
</div> <h4>Full Content</h4>
<div class="notificationGreySide">
<div class='panelMacro'><table class='warningMacro'><colgroup><col width='24'><col></colgroup><tr><td valign='top'><img src="/wiki/images/icons/emoticons/forbidden.gif" width="16" height="16" align="absmiddle" alt="" border="0"></td><td>This page is not complete yet.</td></tr></table></div>
<p>In a distributed system like Asterisk SCF, related tasks may originate from a variety of sources. For instance, tasks pertaining to a <tt>Session</tt> may come from an Ice thread calling one of the <tt>Session</tt>'s member operations. A <tt>Session</tt> may also be acted upon due to the reception of a message over the protocol the session gateway speaks. These tasks could run in the thread that originates the work, but this can lead to potential pitfalls, such as resource contention and potential deadlocks. Work Queues and Thread Pools offer the ability to group like work together, allowing tasks to be run serially rather than concurrently.</p>
<h1><a name="WorkQueuesandThreadPools-WorkQueue"></a>Work Queue</h1>
<p>A work queue is the simplest of the tools for task serialization. Here is its slice interface.</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<script type="syntaxhighlighter" class="toolbar: false; theme: Confluence; brush: java; gutter: false"><![CDATA[
local interface QueueListener
{
void workAdded(bool wasEmpty);
void workResumable();
void emptied();
};
local interface Work
{
void execute();
};
local sequence<Work> WorkSeq;
local interface Queue
{
void enqueueWork(Work item);
void enqueueWorkSeq(WorkSeq items);
void cancelWork(Work item);
/* return value indicates whether queue contains more work
that can be executed immediately
*/
bool executeWork();
/* this is a snapshot and should only be used as a hint */
int workCount();
void setListener(QueueListener listener);
};
]]></script>
</div></div>
<p>A <tt>Queue</tt> may enqueue <tt>Work</tt> items and accepts requests to execute the work that has been enqueued. The <tt>QueueListener</tt> is informed of changes in the <tt>Queue</tt>. Below is a simple sequence diagram illustrating a potential implementation of a <tt>Queue</tt>.</p>
<p><span class="image-wrap" style=""><img src="/wiki/download/attachments/12550328/WorkQueue.png?version=1&modificationDate=1299535125331" style="border: 0px solid black" /></span></p>
<p>At the beginning of the sequence, <tt>Queue</tt> is empty. The producer thread creates two <tt>Work</tt> items and enqueues them. The <tt>QueueListener</tt>, when informed of the addition of work to the queue, pokes a consumer thread. The consumer then calls <tt>Queue::executeWork</tt> until a false return is received. At the end, the <tt>Queue</tt> is empty again and notifies the <tt>QueueListener</tt>.</p>
<p>Note that the practice of having the consumer set the <tt>QueueListener</tt> is not required; this is only a demonstration.</p>
<h1><a name="WorkQueuesandThreadPools-SuspendableWorkQueue"></a>Suspendable Work Queue</h1>
<p>One potential issue with work queues is that a piece of work may need to call out asynchronously to another method. In such a case, the work is not finished, so it may be incorrect or dangerous to handle the next item on the queue. However, there's no reason for the consumer of the queue to be blocked waiting for the asynchronous operation to return. In such a case, a suspendable work queue may be used.</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<script type="syntaxhighlighter" class="toolbar: false; theme: Confluence; brush: java; gutter: false"><![CDATA[
enum SuspendableWorkResult
{
Complete,
Suspended
};
local interface SuspendableWork
{
SuspendableWorkResult execute();
};
local sequence<SuspendableWork> SuspendableWorkSeq;
local interface SuspendableQueue
{
void enqueueWork(SuspendableWork item);
void enqueueWorkSeq(SuspendableWorkSeq items);
void cancelWork(SuspendableWork item);
/* return value indicates whether queue contains more work
that can be executed immediately
*/
bool executeWork();
/* this is a snapshot and should only be used as a hint */
int workCount();
void setListener(QueueListener listener);
};
]]></script>
</div></div>
<p>With this type of work queue, work can be suspended until it can be completed. When a <tt>SuspendableWork</tt> is executed, it can indicate via its return value if the work completed or if it is just suspended. Below is a sequence diagram that shows the potential operation of a <tt>SuspendableQueue</tt>.</p>
<p><span class="image-wrap" style=""><img src="/wiki/download/attachments/12550328/SuspendableWorkQueue2.png?version=1&modificationDate=1299535133697" style="border: 0px solid black" /></span></p>
<p>Operation begins similarly to the <tt>WorkQueue</tt>. What's different is the way the second <tt>SuspendableWork</tt> is handled. Its execution initially calls out to a remote object asynchronously. The <tt>SuspendableWork</tt> returns that its work is suspended. Because the work has been suspended, the <tt>SuspendableQueue</tt> returns false to the consumer, indicating that there's no work to immediately be executed. Once the remote object has finished doing what it needs to do, it notifies the <tt>SuspendableWork</tt> that it has finished. The <tt>SuspendableWork</tt>, who was given a reference to the <tt>SuspendableQueue</tt> when it was constructed, then notifies the <tt>SuspendableQueue</tt>, who informs the <tt>QueueListener</tt>, who then pokes the consumer. At this point, the consumer calls <tt>executeWork</tt> again. This time the <tt>SuspendableWork</tt> can be completed. The <tt>SuspendableQueue</tt> returns false to the consumer again, but this time because the queue is empty.</p>
<p>One aspect that is left open for implementors is the specifics regarding how the consumer gets notified when suspended work becomes resumable. In this example, the <tt>SuspendableWork</tt> notifies the <tt>SuspendableQueue</tt>. It would also be acceptable for the <tt>SuspendableWork</tt> to notify the <tt>QueueListener</tt> directly if desired. The problem is that there is a potential race condition that has to be handled. Specifically, the suspended work may become resumable and listeners may be notified work may be resumed before the initial false return for the <tt>executeWork</tt> call can be communicated to the consumer. To illustrate, here is a diagram of the race condition.</p>
<p><span class="error">Unable to render embedded object: File (Race.png) not found.</span></p>
<p>If the <tt>SuspendableQueue</tt> is the first to be notified that work may be resumed, then he can be the one to handle this race and change his initial false return to a true return. If the <tt>SuspendableQueue</tt> is bypassed in favor of notifying the <tt>QueueListener</tt> directly, then the consumer will be responsible for handling the race condition. This example favors the approach of making the <tt>SuspendableQueue</tt> handle the race condition under the logic that since there may likely be many consumers that all use the same queue implementation, it makes more sense to handle the race condition centrally instead of having to duplicate the race-handling logic in all consumers.</p>
<h1><a name="WorkQueuesandThreadPools-ThreadPool"></a>Thread Pool</h1>
<p>The examples shown so far only have a single consumer executing work in the queue. While it's certainly possible to have multiple consumers executing work on the queue, there are advantages to encapsulating the work into a thread pool object instead.</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<script type="syntaxhighlighter" class="toolbar: false; theme: Confluence; brush: java; gutter: false"><![CDATA[
local interface PoolListener
{
void stateChanged(Pool pool, int activeThreads, int idleThreads, int zombieThreads);
void queueWorkAdded(Pool pool, int newWorkCount, bool wasEmpty);
void queueEmptied(Pool pool);
};
local interface Pool
{
void setSize(int size);
AsteriskSCF::System::WorkQueue::V1::Queue getQueue();
};
local interface PoolFactory
{
Pool createPool(PoolListener listener, AsteriskSCF::System::Workqueue::V1::Queue queue);
};
]]></script>
</div></div>
<p>As can be assumed here, a <tt>Pool</tt> has, at its heart, a <tt>WorkQueue</tt>. The <tt>Pool</tt> can relay similar listener information as a work queue, such as when work has been added and when its queue has become empty. The <tt>Pool</tt> also reports changes in state as well, such as when a thread finishes its work and becomes idle, or if threads are destroyed. The <tt>PoolListener</tt> is always given a <tt>Pool</tt> as a parameter in case the <tt>PoolListener</tt> wishes to change a property of the <tt>Pool</tt>. Currently, the only property that can be modified is the number of threads currently in use by the <tt>Pool</tt>.</p>
<p><span class="image-wrap" style=""><img src="/wiki/download/attachments/12550328/ThreadPool.png?version=1&modificationDate=1299535125458" style="border: 0px solid black" /></span></p>
<p>Things begin pretty straightforward, with a <tt>Queue</tt> and <tt>Pool</tt> being created. When the <tt>Queue</tt> is set as the work queue for the <tt>Pool</tt>, this implementation of the <tt>Pool</tt> sets itself as the listener of the queue. The producer then queues two tasks. The <tt>PoolListener</tt>, upon being informed there are two tasks, decides to create two consumer threads. each consumer thread, upon being created, executes work in the <tt>Pool</tt>'s <tt>Queue</tt>. As the threads change from being idle to active, state changes are reported to the <tt>PoolListener</tt>. At the end, once all work is complete, the fact that the queue is empty is reported to the <tt>PoolListener</tt>.</p>
<h1><a name="WorkQueuesandThreadPools-Concurrencyinthreadpools"></a>Concurrency in thread pools</h1>
<p>Notice in the thread pool example that the consumer threads were running the queued tasks concurrently, seemingly negating the benefit of queuing tasks at all. Care must be taken with thread pools to ensure that if tasks are not intended to run concurrently, that they are not. One method of handling this is to make the work items queued to a thread pool's work queue actually themselves be work queues. This way, each thread in the pool is given a sub-queue of work to perform whenever it attempts to execute work. This sub-queue that the thread is given has work items that must be executed serially.</p>
</div>
<div id="commentsSection" class="wiki-content pageSection">
<div style="float: right;">
<a href="https://wiki.asterisk.org/wiki/users/viewnotifications.action" class="grey">Change Notification Preferences</a>
</div>
<a href="https://wiki.asterisk.org/wiki/display/TOP/Work+Queues+and+Thread+Pools">View Online</a>
|
<a href="https://wiki.asterisk.org/wiki/pages/diffpagesbyversion.action?pageId=12550328&revisedVersion=8&originalVersion=7">View Changes</a>
|
<a href="https://wiki.asterisk.org/wiki/display/TOP/Work+Queues+and+Thread+Pools?showComments=true&showCommentArea=true#addcomment">Add Comment</a>
</div>
</div>
</div>
</div>
</div>
</body>
</html>