[asterisk-scf-commits] asterisk-scf/integration/test_channel.git branch "master" created.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Thu Sep 9 12:28:50 CDT 2010


branch "master" has been created
        at  7d19cc48eb249ad3be96d0fd644afb146b760efc (commit)

- Log -----------------------------------------------------------------
commit 7d19cc48eb249ad3be96d0fd644afb146b760efc
Author: Brent Eagles <beagles at digium.com>
Date:   Thu Sep 9 14:54:15 2010 -0230

    Use an alternate method for hooking up the console driver.

diff --git a/src/ConsoleDriver.cpp b/src/ConsoleDriver.cpp
index eadbeba..0785e7a 100644
--- a/src/ConsoleDriver.cpp
+++ b/src/ConsoleDriver.cpp
@@ -102,6 +102,10 @@ void ConsoleDriver::run()
                     {
                         mCommandHandler->clearlog(arg);
                     }
+                    else
+                    {
+                        std::cerr << "unknown command" << std::endl;
+                    }
                 }
                 else
                 {
diff --git a/src/Service.cpp b/src/Service.cpp
index d7fc5c4..e0a9782 100644
--- a/src/Service.cpp
+++ b/src/Service.cpp
@@ -39,7 +39,6 @@ int TestChannelDriver::run(int, char*[])
     mLogger.getTraceStream() << "Launching AsteriskSCF Session-Oriented Test Channel Service." << std::endl;
     mConsoleDriver = new ConsoleDriver;
     mConsoleDriver->start();
-
     IceStorm::TopicManagerPrx topicManager;
     try
     {
@@ -61,7 +60,7 @@ int TestChannelDriver::run(int, char*[])
     Ice::ObjectAdapterPtr adapter = communicator()->createObjectAdapter("AsteriskSCF.TestChannelService");
     adapter->activate();
 
-    AsteriskSCF::Core::Routing::V1::EndpointLocatorPtr servant = AsteriskSCF::TestUtil::TestEndpoint::initialize(adapter, "TestChannel.Locator");
+    AsteriskSCF::Core::Routing::V1::EndpointLocatorPtr servant = AsteriskSCF::TestUtil::TestEndpoint::initialize(mConsoleDriver, adapter, "TestChannel.Locator");
     AsteriskSCF::Core::Routing::V1::EndpointLocatorPrx endpointLocatorPrx = 
         AsteriskSCF::Core::Routing::V1::EndpointLocatorPrx::checkedCast(adapter->add(servant, communicator()->stringToIdentity("TestChannel.Locator")));
 
@@ -71,14 +70,9 @@ int TestChannelDriver::run(int, char*[])
     params->category = "TestChannel";
     service_management->addLocatorParams(params, "");
 
-    CommandsPtr cmd(CommandsPtr::dynamicCast(servant));
-    if(cmd)
-    {
-        mConsoleDriver->setHandler(cmd);
-    }
     communicator()->waitForShutdown();
 
-    // service_management->unregister();
+    service_management->unregister();
     return EXIT_SUCCESS;
 }
 
diff --git a/src/TestEndpoint.cpp b/src/TestEndpoint.cpp
index 55f080a..318f358 100644
--- a/src/TestEndpoint.cpp
+++ b/src/TestEndpoint.cpp
@@ -441,6 +441,11 @@ public:
         return result;
     }
 
+    CommandsPtr getCommandInterface() // Returns mManager viewed through the Commands interface.
+    {
+        return CommandsPtr::dynamicCast(mManager); // NOTE(review): presumably a null handle if mManager does not implement Commands - confirm.
+    }
+
 private:
 
     Ice::ObjectAdapterPtr mAdapter;
@@ -449,15 +454,19 @@ private:
     AsteriskSCF::Core::Endpoint::V1::BaseEndpointPrx mEndpointProxy;
 };
 
+typedef IceUtil::Handle<EndpointLocatorI> EndpointLocatorIPtr;
+
 boost::shared_mutex AsteriskSCF::TestUtil::TestEndpoint::mMutex;
 AsteriskSCF::Core::Routing::V1::EndpointLocatorPtr AsteriskSCF::TestUtil::TestEndpoint::mImpl;
 
-AsteriskSCF::Core::Routing::V1::EndpointLocatorPtr TestEndpoint::initialize(const Ice::ObjectAdapterPtr& adapter, const std::string& id)
+    AsteriskSCF::Core::Routing::V1::EndpointLocatorPtr TestEndpoint::initialize(const ConsoleDriverPtr& console, const Ice::ObjectAdapterPtr& adapter, const std::string& id)
 {
     boost::unique_lock<boost::shared_mutex> lock(mMutex);
     if(mImpl.get() == 0)
     {
-        mImpl = new EndpointLocatorI(adapter, id); 
+        EndpointLocatorIPtr impl(new EndpointLocatorI(adapter, id));
+        console->setHandler(CommandsPtr::dynamicCast(impl->getCommandInterface()));
+        mImpl = AsteriskSCF::Core::Routing::V1::EndpointLocatorPtr::dynamicCast(impl);
     }
     return mImpl;
 }
diff --git a/src/TestEndpoint.h b/src/TestEndpoint.h
index 7eca887..717c785 100644
--- a/src/TestEndpoint.h
+++ b/src/TestEndpoint.h
@@ -11,6 +11,7 @@
 #include <Core/Routing/RoutingIf.h>
 
 #include <boost/thread/shared_mutex.hpp>
+#include "ConsoleDriver.h"
 
 namespace AsteriskSCF
 {
@@ -19,7 +20,7 @@ namespace TestUtil
     class TestEndpoint
     {
     public:
-        static AsteriskSCF::Core::Routing::V1::EndpointLocatorPtr initialize(const Ice::ObjectAdapterPtr& adapter, const std::string& id);
+        static AsteriskSCF::Core::Routing::V1::EndpointLocatorPtr initialize(const ConsoleDriverPtr& driver, const Ice::ObjectAdapterPtr& adapter, const std::string& id);
     private:
         static boost::shared_mutex mMutex;
         static AsteriskSCF::Core::Routing::V1::EndpointLocatorPtr mImpl;

commit 6a15d6bce50aa60a36b00f162b5fbf167e4ec523
Author: Brent Eagles <beagles at digium.com>
Date:   Thu Sep 9 14:36:17 2010 -0230

    Fixing up some more of the code and connecting the console driver to the command handler.

diff --git a/src/Commands.h b/src/Commands.h
new file mode 100644
index 0000000..208882f
--- /dev/null
+++ b/src/Commands.h
@@ -0,0 +1,70 @@
+/*
+* Asterisk Scalable Communications Framework
+*
+* Copyright (C) 2010 -- Digium, Inc.
+*
+* All rights reserved.
+*/
+
+#pragma once
+
+//
+// Interface for the console driver (or other UIs) to interact
+// with the session endpoint.
+//
+
+#include <string>
+#include <vector>
+#include <IceUtil/Shared.h>
+#include <IceUtil/Handle.h>
+
+//
+// Command set invoked by UIs such as the console driver; each call names its target by id.
+// TODO: Add more commands.
+class Commands : public virtual IceUtil::Shared
+{
+public:
+    virtual ~Commands() {}
+
+    //
+    // Echo any cached media frames.
+    //
+    virtual void echo(const std::string& id) = 0;
+    
+    //
+    // Turn on automatic (immediate) echo of media frames.
+    //
+    virtual void echoOn(const std::string& id) = 0;
+
+    //
+    // Turn off automatic echo of media frames.
+    //
+    virtual void echoOff(const std::string& id) = 0;
+
+    //
+    // Answer an invite.
+    //
+    virtual void answer(const std::string& id) = 0;
+
+    //
+    // Hang up.
+    //
+    virtual void hangup(const std::string& id) = 0;
+
+    //
+    // Die horribly.
+    //
+    virtual void die(const std::string& id) = 0;
+
+    //
+    // Dump log: the caller passes a vector and reads the log entries back from it.
+    //
+    virtual void getlog(const std::string& id, std::vector<std::string>& log) = 0;
+
+    //
+    // Clear log
+    //
+    virtual void clearlog(const std::string& id) = 0;
+};
+
+typedef IceUtil::Handle<Commands> CommandsPtr;
diff --git a/src/ConsoleDriver.cpp b/src/ConsoleDriver.cpp
index ca75569..eadbeba 100644
--- a/src/ConsoleDriver.cpp
+++ b/src/ConsoleDriver.cpp
@@ -30,21 +30,85 @@ ConsoleDriver::ConsoleDriver() :
 {
 }
 
+void ConsoleDriver::setHandler(const CommandsPtr& handler) // Installs the handler that run() dispatches parsed commands to.
+{
+    IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
+    mCommandHandler = handler;
+    mMonitor.notify(); // wakes run() if it is parked in timedWait()
+}
+
 void ConsoleDriver::run()
 {
     std::cerr << "Console driver started" << std::endl;
     IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
-    while(!mDone && _kbhit() == 0)
-    {
-        mMonitor.timedWait(IceUtil::Time::milliSeconds(1000));
-    }
-    if(!mDone)
+    while(!mDone)
     {
-        std::string command;
-        std::getline(std::cin, command);
-        //
-        // TODO: do stuff!!! :)
-        //
+        while(!mDone && _kbhit() == 0)
+        {
+            mMonitor.timedWait(IceUtil::Time::milliSeconds(1000));
+        }
+        if(!mDone)
+        {
+            std::string command;
+            std::getline(std::cin, command);
+            //
+            // TODO: do stuff!!! :)
+            //
+            if(!mCommandHandler)
+            {
+                std::cout << "TODO: Implement commands! Echoing: " << command << std::endl;
+            }
+            else
+            {
+                size_t split = command.find(" ");
+                if(split != std::string::npos)
+                {
+                    std::string cmd = command.substr(0, split);
+                    std::string arg = command.substr(split+1);
+                    if(cmd == "echo")
+                    {
+                        mCommandHandler->echo(arg);
+                    }
+                    else if(cmd == "echoon")
+                    {
+                        mCommandHandler->echoOn(arg);
+                    }
+                    else if(cmd == "echooff")
+                    {
+                        mCommandHandler->echoOff(arg);
+                    }
+                    else if(cmd == "answer")
+                    {
+                        mCommandHandler->answer(arg);
+                    }
+                    else if(cmd == "hangup")
+                    {
+                        mCommandHandler->hangup(arg);
+                    }
+                    else if(cmd == "die")
+                    {
+                        mCommandHandler->die(arg);
+                    }
+                    else if(cmd == "getlog")
+                    {
+                        std::vector<std::string> result;
+                        mCommandHandler->getlog(arg, result);
+                        for(std::vector<std::string>::iterator i = result.begin(); i != result.end(); ++i)
+                        {
+                            std::cerr << *i << std::endl;
+                        }
+                    }
+                    else if(cmd == "clearlog")
+                    {
+                        mCommandHandler->clearlog(arg);
+                    }
+                }
+                else
+                {
+                    std::cerr << "parsing error" << std::endl;
+                }
+            }
+        }
     }
 }
 
diff --git a/src/ConsoleDriver.h b/src/ConsoleDriver.h
index 4ff5103..465306a 100644
--- a/src/ConsoleDriver.h
+++ b/src/ConsoleDriver.h
@@ -11,6 +11,7 @@
 #include <IceUtil/Thread.h>
 #include <string>
 #include "Logger.h"
+#include "Commands.h"
 
 class ConsoleDriver : public IceUtil::Thread
 {
@@ -18,12 +19,14 @@ public:
     ConsoleDriver();
     
     void run();
+    void setHandler(const CommandsPtr& handler);
     void write(const std::string& msg);
     void destroy();
 private:
     IceUtil::Monitor<IceUtil::Mutex> mMonitor;
     bool mDone;
     AsteriskSCF::TestUtil::Logger mLogger;
+    CommandsPtr mCommandHandler;
 };
 
 typedef IceUtil::Handle<ConsoleDriver> ConsoleDriverPtr;
diff --git a/src/InternalExceptions.h b/src/InternalExceptions.h
index 4c094fc..a9c5e38 100644
--- a/src/InternalExceptions.h
+++ b/src/InternalExceptions.h
@@ -8,7 +8,7 @@
 
 namespace AsteriskSCF
 {
-namespace BridgeService
+namespace TestUtil
 {
     class ConfigException : public std::exception
     {
diff --git a/src/ListenerManager.h b/src/ListenerManager.h
index b9051da..6754533 100644
--- a/src/ListenerManager.h
+++ b/src/ListenerManager.h
@@ -9,7 +9,7 @@
 
 namespace AsteriskSCF
 {
-namespace BridgeService
+namespace TestUtil
 {
     //
     // Helper template for classes that need to implement listener style interfaces.
diff --git a/src/MediaEchoThread.cpp b/src/MediaEchoThread.cpp
index 1cc555a..155e59f 100644
--- a/src/MediaEchoThread.cpp
+++ b/src/MediaEchoThread.cpp
@@ -8,27 +8,29 @@
 #include "MediaEchoThread.h"
 
 MediaEchoThread::MediaEchoThread():
-    mDone(false)
+    mDone(false),
+    mPaused(true)
 {
 }
 
 void MediaEchoThread::run()
 {
     IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
-    while(!mDone && mFrames.size() && mSink != 0)
+    while(!mDone)
     {
         mMonitor.wait();
-    }
-    if(!mDone)
-    {
-        try
-        {
-            mSink->write(mFrames);
-            mFrames.clear();
-        }
-        catch(const Ice::Exception&)
+
+        if(!mDone && mFrames.size() > 0 && mSink != 0)
         {
-            // TODO: log and continue
+            try
+            {
+                mSink->write(mFrames);
+                mFrames.clear();
+            }
+            catch(const Ice::Exception&)
+            {
+                // TODO: log and continue
+            }
         }
     }
 }
@@ -37,7 +39,10 @@ void MediaEchoThread::pushFrames(const AsteriskSCF::Media::V1::FrameSeq& newFram
 {
     IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
     mFrames.insert(mFrames.end(), newFrames.begin(), newFrames.end());
-    mMonitor.notify();
+    if(!mPaused)
+    {
+        mMonitor.notify();
+    }
 }
 
 void MediaEchoThread::destroy()
@@ -51,5 +56,27 @@ void MediaEchoThread::setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& prx)
 {
     IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
     mSink = prx;
+    if(!mPaused)
+    {
+        mMonitor.notify();
+    }
+}
+
+void MediaEchoThread::echo() // One-shot echo: wakes run() without changing mPaused.
+{
+    IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
+    mMonitor.notify(); // run() then writes any queued frames to mSink, provided a sink is set
+}
+
+void MediaEchoThread::pause() // While paused, pushFrames()/setSink() no longer wake run(); frames keep accumulating in mFrames.
+{
+    IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
+    mPaused = true;
+}
+
+void MediaEchoThread::resume() // Re-enables automatic echo and wakes run() right away.
+{
+    IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
+    mPaused = false;
     mMonitor.notify();
 }
diff --git a/src/MediaEchoThread.h b/src/MediaEchoThread.h
index edd0fee..9f350b5 100644
--- a/src/MediaEchoThread.h
+++ b/src/MediaEchoThread.h
@@ -23,10 +23,18 @@ public:
     void destroy();
     void setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& prx);
 
+    //
+    // Use when paused... causes the loop to run once.
+    //
+    void echo();
+    void pause();
+    void resume();
+
 private:
     AsteriskSCF::Media::V1::StreamSinkPrx mSink;
     IceUtil::Monitor<IceUtil::Mutex> mMonitor;
     AsteriskSCF::Media::V1::FrameSeq mFrames;
     bool mDone;
+    bool mPaused;
 };
 typedef IceUtil::Handle<MediaEchoThread> MediaEchoThreadPtr;
diff --git a/src/MediaSession.cpp b/src/MediaSession.cpp
index 6704824..1a45cc5 100644
--- a/src/MediaSession.cpp
+++ b/src/MediaSession.cpp
@@ -6,125 +6,23 @@
 * All rights reserved.
 */
 #include "MediaSession.h"
-#include "MediaEchoThread.h"
 
 //
 // The test media session echoes media received on its sink object to its
 // associated source object.
 //
 
-//
-// Test Sink.
-//
-class SinkI : public AsteriskSCF::Media::V1::StreamSink
-{
-public:
-    SinkI(const std::string& id) :
-        mId(id),
-        mThread(new MediaEchoThread)
-    {
-    }
-
-    void write(const AsteriskSCF::Media::V1::FrameSeq& frames, const Ice::Current&)
-    {
-        mThread->pushFrames(frames);
-    }
-
-    void setSource(const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&)
-    {
-        mSource = source;
-    }
-
-    AsteriskSCF::Media::V1::StreamSourcePrx getSource(const Ice::Current&)
-    {
-        return mSource;
-    }
-
-    AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&)
-    {
-        return mFormats;
-    }
-
-    std::string getId(const Ice::Current&)
-    {
-        return mId;
-    }
-
-    void requestFormat(const AsteriskSCF::Media::V1::FormatPtr& format, const Ice::Current&)
-    {
-        // XXX
-    }
-
-    void setRelaySink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink)
-    {
-        mThread->setSink(sink);
-    }
-
-private:
-    AsteriskSCF::Media::V1::StreamSourcePrx mSource;
-    AsteriskSCF::Media::V1::FormatSeq mFormats;
-    std::string mId;
-    MediaEchoThreadPtr mThread;
-};
-typedef IceUtil::Handle<SinkI> SinkIPtr;
-
-//
-// Test source.
-//
-class SourceI : public AsteriskSCF::Media::V1::StreamSource
-{
-public:
-
-    SourceI(const std::string& id, const SinkIPtr& sinkServant) :
-        mId(id),
-        mSinkServant(sinkServant)
-    {
-    }
-
-    void setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
-    {
-        mSink = sink;
-        mSinkServant->setRelaySink(mSink);
-    }
-
-    AsteriskSCF::Media::V1::StreamSinkPrx getSink(const Ice::Current&)
-    {
-        return mSink;
-    }
-
-    AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&)
-    {
-        return mFormats;
-    }
-
-    std::string getId(const Ice::Current&)
-    {
-        return mId;
-    }
-
-    void requestFormat(const AsteriskSCF::Media::V1::FormatPtr& format, const Ice::Current&)
-    {
-        // XXX
-    }
-
-private:
-    AsteriskSCF::Media::V1::StreamSinkPrx mSink;
-    AsteriskSCF::Media::V1::FormatSeq mFormats;
-    std::string mId;
-    SinkIPtr mSinkServant;
-};
-
 MediaSessionI::MediaSessionI(const std::string& id, const Ice::ObjectAdapterPtr& adapter) :
     mId(id),
     mAdapter(adapter)
 {
-    SinkIPtr sinkServant = new SinkI(mId + "Sink");
-    mSinks.push_back(AsteriskSCF::Media::V1::StreamSinkPrx::uncheckedCast(mAdapter->add(sinkServant,
+    mSink = new SinkI(mId + "Sink");
+    mSinks.push_back(AsteriskSCF::Media::V1::StreamSinkPrx::uncheckedCast(mAdapter->add(mSink,
                             mAdapter->getCommunicator()->stringToIdentity(mId + "Sink"))));
+    mSource = new SourceI(mId + "Source", mSink);
     mSources.push_back(
         AsteriskSCF::Media::V1::StreamSourcePrx::uncheckedCast(mAdapter->add(
-                    new SourceI(mId + "Source", sinkServant),
-                            mAdapter->getCommunicator()->stringToIdentity(mId + "Source"))));
+                    mSource, mAdapter->getCommunicator()->stringToIdentity(mId + "Source"))));
 }
 
 MediaSessionI::~MediaSessionI()
@@ -154,3 +52,18 @@ std::string MediaSessionI::getId(const Ice::Current&)
 {
     return mId;
 }
+
+void MediaSessionI::echoOn() // Internal test hook: forwards to the sink servant, which resumes its echo thread.
+{
+    mSink->echoOn();
+}
+
+void MediaSessionI::echoOff() // Internal test hook: forwards to the sink servant, which pauses its echo thread.
+{
+    mSink->echoOff();
+}
+
+void MediaSessionI::echo() // Internal test hook: forwards to the sink servant for a one-shot echo of queued frames.
+{
+    mSink->echo();
+}
diff --git a/src/MediaSession.h b/src/MediaSession.h
index d1a3c09..a49c0d1 100644
--- a/src/MediaSession.h
+++ b/src/MediaSession.h
@@ -10,6 +10,137 @@
 #include <Media/MediaIf.h>
 #include <boost/thread/shared_mutex.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
+#include "MediaEchoThread.h"
+
+//
+// TODO: Inline methods should be moved into CPP file.
+//
+
+//
+// Test Sink: frames written here are queued on a MediaEchoThread, which writes them to the sink given to setRelaySink().
+//
+class SinkI : public AsteriskSCF::Media::V1::StreamSink
+{
+public:
+    SinkI(const std::string& id) :
+        mId(id),
+        mThread(new MediaEchoThread) // NOTE(review): no start() call on mThread is visible in this class - confirm the echo thread actually runs.
+    {
+    }
+
+    ~SinkI()
+    {
+        if(mThread)
+        {
+            mThread->destroy(); // NOTE(review): presumably tells the echo loop to exit - confirm in MediaEchoThread::destroy().
+        }
+    }
+
+    void write(const AsteriskSCF::Media::V1::FrameSeq& frames, const Ice::Current&) // Queues the frames for the echo thread.
+    {
+        mThread->pushFrames(frames);
+    }
+
+    void setSource(const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&) // Stored only; returned by getSource().
+    {
+        mSource = source;
+    }
+
+    AsteriskSCF::Media::V1::StreamSourcePrx getSource(const Ice::Current&)
+    {
+        return mSource;
+    }
+
+    AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&) // mFormats is never populated in this class.
+    {
+        return mFormats;
+    }
+
+    std::string getId(const Ice::Current&)
+    {
+        return mId;
+    }
+
+    void requestFormat(const AsteriskSCF::Media::V1::FormatPtr& format, const Ice::Current&)
+    {
+        // XXX not implemented: the requested format is ignored.
+    }
+
+    void setRelaySink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink) // Local (non-Ice) call: sets where the echo thread writes frames.
+    {
+        mThread->setSink(sink);
+    }
+
+    void echo() // One-shot echo of whatever is queued.
+    {
+        mThread->echo();
+    }
+
+    void echoOn() // Automatic echo on.
+    {
+        mThread->resume();
+    }
+
+    void echoOff() // Automatic echo off.
+    {
+        mThread->pause();
+    }
+
+private:
+    AsteriskSCF::Media::V1::StreamSourcePrx mSource;
+    AsteriskSCF::Media::V1::FormatSeq mFormats;
+    std::string mId;
+    MediaEchoThreadPtr mThread;
+};
+typedef IceUtil::Handle<SinkI> SinkIPtr;
+
+//
+// Test source: remembers the sink it is given and passes it to the paired SinkI servant as the relay target.
+//
+class SourceI : public AsteriskSCF::Media::V1::StreamSource
+{
+public:
+
+    SourceI(const std::string& id, const SinkIPtr& sinkServant) :
+        mId(id),
+        mSinkServant(sinkServant)
+    {
+    }
+
+    void setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
+    {
+        mSink = sink;
+        mSinkServant->setRelaySink(mSink); // frames written to the paired sink servant are relayed to this proxy
+    }
+
+    AsteriskSCF::Media::V1::StreamSinkPrx getSink(const Ice::Current&)
+    {
+        return mSink;
+    }
+
+    AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&) // mFormats is never populated in this class.
+    {
+        return mFormats;
+    }
+
+    std::string getId(const Ice::Current&)
+    {
+        return mId;
+    }
+
+    void requestFormat(const AsteriskSCF::Media::V1::FormatPtr& format, const Ice::Current&)
+    {
+        // XXX not implemented: the requested format is ignored.
+    }
+
+private:
+    AsteriskSCF::Media::V1::StreamSinkPrx mSink;
+    AsteriskSCF::Media::V1::FormatSeq mFormats;
+    std::string mId;
+    SinkIPtr mSinkServant;
+};
+
+typedef IceUtil::Handle<SourceI> SourceIPtr;
 
 class MediaSessionI : public AsteriskSCF::Media::V1::Session
 {
@@ -20,12 +151,24 @@ public:
     AsteriskSCF::Media::V1::StreamSourceSeq getSources(const Ice::Current&);
     AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
     std::string getId(const Ice::Current&);
+
+    //
+    // internal test methods.
+    //
+    void echoOn();
+    void echoOff();
+    void echo();
     
 private:
     boost::shared_mutex mMutex;
     std::string mId;
     Ice::ObjectAdapterPtr mAdapter;
 
+    SinkIPtr mSink;
+    SourceIPtr mSource;
+
     AsteriskSCF::Media::V1::StreamSourceSeq mSources;
     AsteriskSCF::Media::V1::StreamSinkSeq mSinks;
 };
+
+typedef IceUtil::Handle<MediaSessionI> MediaSessionIPtr;
diff --git a/src/Service.cpp b/src/Service.cpp
index 65b3924..d7fc5c4 100644
--- a/src/Service.cpp
+++ b/src/Service.cpp
@@ -13,6 +13,7 @@
 #include "Logger.h"
 #include "TestEndpoint.h"
 #include "ConsoleDriver.h"
+#include "Commands.h"
 
 class TestChannelDriver : public Ice::Application
 {
@@ -69,8 +70,12 @@ int TestChannelDriver::run(int, char*[])
     AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsPtr params = new AsteriskSCF::Core::Discovery::V1::ServiceLocatorParams();
     params->category = "TestChannel";
     service_management->addLocatorParams(params, "");
-    
 
+    CommandsPtr cmd(CommandsPtr::dynamicCast(servant));
+    if(cmd)
+    {
+        mConsoleDriver->setHandler(cmd);
+    }
     communicator()->waitForShutdown();
 
     // service_management->unregister();
diff --git a/src/TestEndpoint.cpp b/src/TestEndpoint.cpp
index 1f7f497..55f080a 100644
--- a/src/TestEndpoint.cpp
+++ b/src/TestEndpoint.cpp
@@ -19,10 +19,19 @@
 
 #include "Logger.h"
 #include "MediaSession.h"
+#include "Commands.h"
 
+
+//
+// Forward declarations.
+//
 class EndpointManager;
 typedef IceUtil::Handle<EndpointManager> EndpointManagerPtr;
 
+//
+// Interface that defines the contract between the SessionI session implementation (child)
+// and its parent (EndpointManager).
+//
 class InternalManagerIf : virtual public IceUtil::Shared
 {
 public:
@@ -31,10 +40,216 @@ public:
 };
 typedef IceUtil::Handle<InternalManagerIf> InternalManagerPtr;
 
+
+//
+// Specialization of ListenerManagerT helper template for SessionListeners. Basically takes care of invoking the
+// appropriate method on a publisher.
+//
+class SessionListenerMgr : public AsteriskSCF::TestUtil::ListenerManagerT<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx>
+{
+public:
+    SessionListenerMgr(const Ice::CommunicatorPtr& communicator, const std::string& topicName) :
+        AsteriskSCF::TestUtil::ListenerManagerT<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx>(communicator, topicName)
+    {
+        mPublisher = AsteriskSCF::SessionCommunications::V1::SessionListenerPrx::uncheckedCast(mTopic->getPublisher()); // mTopic comes from the base template
+    }
+
+    // Event forwarders: each method below publishes the named event for mSession through mPublisher.
+    // TODO:
+    //
+    void connected()
+    {
+        mPublisher->connected(mSession);
+    }
+
+    void flashed()
+    {
+        mPublisher->flashed(mSession);
+    }
+
+    void held()
+    {
+        mPublisher->held(mSession);
+    }
+
+    void progressing(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response)
+    {
+        mPublisher->progressing(mSession, response);
+    }
+
+    void ringing()
+    {
+        mPublisher->ringing(mSession);
+    }
+
+    void stopped(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response)
+    {
+        mPublisher->stopped(mSession, response);
+    }
+
+    void unheld()
+    {
+        mPublisher->unheld(mSession);
+    }
+
+    void setProxy(const AsteriskSCF::SessionCommunications::V1::SessionPrx& prx) // Sets the session proxy sent with every event; it is unset until this is called.
+    {
+        mSession = prx;
+    }
+        
+protected:
+    AsteriskSCF::SessionCommunications::V1::SessionListenerPrx mPublisher;
+    AsteriskSCF::SessionCommunications::V1::SessionPrx mSession;
+};
+
+typedef IceUtil::Handle<SessionListenerMgr> SessionListenerMgrPtr;
+
+//
+// Needs some intelligence:
+//   - scheduled ringing
+//   - etc.
+class SessionI : public AsteriskSCF::SessionCommunications::V1::Session // Test session servant: Ice operations only log the call; console commands drive media echo and listener events.
+{
+public:
+    SessionI(const InternalManagerPtr& m, const AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx& prx,  const std::string& id,
+            const Ice::ObjectAdapterPtr& adapter) :
+        mEndpointManager(m),
+        mEndpointPrx(prx),
+        mId(id),
+        mListeners(new SessionListenerMgr(adapter->getCommunicator(), mId)) // the session id doubles as the listener topic name
+    {
+    }
+
+    AsteriskSCF::SessionCommunications::V1::SessionInfoPtr addListener(
+        const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
+                const Ice::Current&)
+    {
+        mEndpointManager->log(mId, __FUNCTION__); // stub: the listener is not registered and a null info is returned
+        return 0;
+    }
+
+    void connect(const Ice::Current&)
+    {
+        mEndpointManager->log(mId, __FUNCTION__);
+    }
+
+    void flash(const Ice::Current&)
+    {
+        mEndpointManager->log(mId, __FUNCTION__);
+    }
+
+    AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx getEndpoint(const Ice::Current&)
+    {
+        mEndpointManager->log(mId, __FUNCTION__);
+        return mEndpointPrx;
+    }
+
+    AsteriskSCF::SessionCommunications::V1::SessionInfoPtr getInfo(const Ice::Current&)
+    {
+        mEndpointManager->log(mId, __FUNCTION__); // stub: always returns a null info
+        return 0;
+    }
+
+    // Creates the media session on first use. The servant is kept in mMediaServant because echo()/echoOn()/echoOff() call through it.
+    AsteriskSCF::Media::V1::SessionPrx getMediaSession(const Ice::Current& current)
+    {
+        mEndpointManager->log(mId, __FUNCTION__);
+        boost::unique_lock<boost::shared_mutex> lock(mMutex);
+        if(!mMediaSession)
+        {
+            mMediaServant = new MediaSessionI(mId + "Media", current.adapter);
+            mMediaSession = AsteriskSCF::Media::V1::SessionPrx::uncheckedCast(current.adapter->add(mMediaServant, current.adapter->getCommunicator()->stringToIdentity(mId + "Media")));
+        }
+        return mMediaSession;
+    }
+
+    void hold(const Ice::Current&)
+    {
+        mEndpointManager->log(mId, __FUNCTION__);
+    }
+
+    void progress(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response, const Ice::Current&)
+    {
+        mEndpointManager->log(mId, __FUNCTION__);
+    }
+
+    void removeListener(const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
+            const Ice::Current&)
+    {
+        mEndpointManager->log(mId, __FUNCTION__);
+    }
+
+    void ring(const Ice::Current&)
+    {
+        mEndpointManager->log(mId, __FUNCTION__);
+    }
+                
+    void start(const Ice::Current&)
+    {
+        mEndpointManager->log(mId, __FUNCTION__);
+    }
+                
+    void stop(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response, const Ice::Current&)
+    {
+        mEndpointManager->log(mId, __FUNCTION__);
+    }
+                
+    void unhold(const Ice::Current&)
+    {
+        mEndpointManager->log(mId, __FUNCTION__);
+    }
+
+    void echo() // NOTE(review): mMediaServant stays null until getMediaSession() has been called for this session.
+    {
+        mMediaServant->echo();
+    }
+    
+    void echoOn() // Same precondition as echo().
+    {
+        mMediaServant->echoOn();
+    }
+
+    void echoOff() // Same precondition as echo().
+    {
+        mMediaServant->echoOff();
+    }
+
+    void answer() // Publishes "connected" for this session to its listeners.
+    {
+        mListeners->connected();
+    }
+
+    void hangup() // Publishes "stopped" with a default-constructed response code.
+    {
+        mListeners->stopped(new AsteriskSCF::SessionCommunications::V1::ResponseCode);
+    }
+
+    void die() // Not implemented.
+    {
+    }
+
+    void setProxy(const AsteriskSCF::SessionCommunications::V1::SessionPrx& prx) // Proxy that listener events are published on behalf of.
+    {
+        mListeners->setProxy(prx);
+    }
+
+private:
+    boost::shared_mutex mMutex;
+    InternalManagerPtr mEndpointManager;
+    AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx mEndpointPrx;
+    const std::string mId;
+    MediaSessionIPtr mMediaServant;
+    AsteriskSCF::Media::V1::SessionPrx mMediaSession;
+    SessionListenerMgrPtr mListeners;
+};
+
+typedef IceUtil::Handle<SessionI> SessionIPtr;
+
 class EndpointManager :
     virtual public IceUtil::Shared,
     virtual public InternalManagerIf,
-    virtual public AsteriskSCF::SessionCommunications::V1::SessionEndpoint
+    virtual public AsteriskSCF::SessionCommunications::V1::SessionEndpoint,
+    virtual public Commands
 {
 public:
     EndpointManager(const Ice::ObjectAdapterPtr& adapter, const std::string& id) :
@@ -50,9 +265,11 @@ public:
     {
         InternalSessionInfo info;
         info.id = mId + "." + destination + "." + IceUtil::generateUUID();
-        AsteriskSCF::SessionCommunications::V1::SessionPtr session = new SessionI(this,
-                AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx::checkedCast(current.adapter->createProxy(current.id)), info.id);
-        info.session =  AsteriskSCF::SessionCommunications::V1::SessionPrx::checkedCast(current.adapter->add(session, current.adapter->getCommunicator()->stringToIdentity(info.id)));
+        info.servant = new SessionI(this,
+                AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx::checkedCast(current.adapter->createProxy(current.id)),
+                info.id, current.adapter);
+        info.session =  AsteriskSCF::SessionCommunications::V1::SessionPrx::checkedCast(current.adapter->add(info.servant, current.adapter->getCommunicator()->stringToIdentity(info.id)));
+        info.servant->setProxy(info.session);
         boost::unique_lock<boost::shared_mutex> lock(mMutex);
         mSessions[info.id] = info;
         return info.session;
@@ -85,120 +302,119 @@ public:
         }
     }
 
-private:
-
-    struct InternalSessionInfo
+    void echo(const std::string& id)
     {
-        AsteriskSCF::SessionCommunications::V1::SessionPrx session;
-        std::string id;
-        std::vector<std::string> log;
-    };
-
-    typedef std::map<std::string, InternalSessionInfo> SessionMap;
-    SessionMap mSessions;
-    boost::shared_mutex mMutex;
-    AsteriskSCF::TestUtil::Logger mLogger;
-    Ice::ObjectAdapterPtr mAdapter;
-    std::string mId;
-
-    class EndpointManagerPtr;
-
-    class SessionI : public AsteriskSCF::SessionCommunications::V1::Session
-    {
-    public:
-        SessionI(const InternalManagerPtr& m, const AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx& prx,  const std::string& id) :
-            mEndpointManager(m),
-            mEndpointPrx(prx),
-            mId(id)
+        boost::unique_lock<boost::shared_mutex> lock(mMutex);
+        SessionMap::iterator i = mSessions.find(id);
+        if(i != mSessions.end())
         {
+            i->second.servant->echo();
+            return;
         }
-
-        AsteriskSCF::SessionCommunications::V1::SessionInfoPtr addListener(
-            const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
-            const Ice::Current&)
+        std::cerr << "Unknown id" << std::endl;
+    }
+    
+    void echoOn(const std::string& id)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mMutex);
+        SessionMap::iterator i = mSessions.find(id);
+        if(i != mSessions.end())
         {
-            mEndpointManager->log(mId, __FUNCTION__);
-            return 0;
+            i->second.servant->echoOn();
+            return;
         }
+        std::cerr << "Unknown id" << std::endl;
+    }
 
-        void connect(const Ice::Current&)
+    void echoOff(const std::string& id)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mMutex);
+        SessionMap::iterator i = mSessions.find(id);
+        if(i != mSessions.end())
         {
-            mEndpointManager->log(mId, __FUNCTION__);
+            i->second.servant->echoOff();
+            return;
         }
+        std::cerr << "Unknown id" << std::endl;
+    }
 
-        void flash(const Ice::Current&)
+    void answer(const std::string& id)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mMutex);
+        SessionMap::iterator i = mSessions.find(id);
+        if(i != mSessions.end())
         {
-            mEndpointManager->log(mId, __FUNCTION__);
+            i->second.servant->answer();
+            return;
         }
-
-        AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx getEndpoint(const Ice::Current&)
+        std::cerr << "Unknown id" << std::endl;
+    }
+    
+    void hangup(const std::string& id)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mMutex);
+        SessionMap::iterator i = mSessions.find(id);
+        if(i != mSessions.end())
         {
-            mEndpointManager->log(mId, __FUNCTION__);
-            return mEndpointPrx;
+            i->second.servant->hangup();
+            return;
         }
+        std::cerr << "Unknown id" << std::endl;
+    }
 
-        AsteriskSCF::SessionCommunications::V1::SessionInfoPtr getInfo(const Ice::Current&)
+    void die(const std::string& id)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mMutex);
+        SessionMap::iterator i = mSessions.find(id);
+        if(i != mSessions.end())
         {
-            mEndpointManager->log(mId, __FUNCTION__);
-            return 0;
+            mAdapter->remove(i->second.session->ice_getIdentity());
+            return;
         }
-
-        AsteriskSCF::Media::V1::SessionPrx getMediaSession(const Ice::Current& current)
-        {
-            mEndpointManager->log(mId, __FUNCTION__);
-            boost::unique_lock<boost::shared_mutex> lock(mMutex);
-            if(!mMediaSession)
-            {
-                mMediaSession = AsteriskSCF::Media::V1::SessionPrx::uncheckedCast(
-                    current.adapter->add(new MediaSessionI(mId + "Media", current.adapter),
-                            current.adapter->getCommunicator()->stringToIdentity(mId + "Media")));
-            }
-            return mMediaSession;
-        };
-
-        void hold(const Ice::Current&)
+        std::cerr << "Unknown id" << std::endl;
+    }
+    
+    void getlog(const std::string& id, std::vector<std::string>& log)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mMutex);
+        SessionMap::iterator i = mSessions.find(id);
+        if(i != mSessions.end())
         {
-            mEndpointManager->log(mId, __FUNCTION__);
+            log = i->second.log;
+            return;
         }
+        std::cerr << "Unknown id" << std::endl;
+    }
 
-        void progress(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response, const Ice::Current&)
+    void clearlog(const std::string& id)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mMutex);
+        SessionMap::iterator i = mSessions.find(id);
+        if(i != mSessions.end())
         {
-            mEndpointManager->log(mId, __FUNCTION__);
+            i->second.log.clear();
+            return;
         }
+        std::cerr << "Unknown id" << std::endl;
+    }
 
-        void removeListener(const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
-                const Ice::Current&)
-        {
-            mEndpointManager->log(mId, __FUNCTION__);
-        }
+private:
 
-        void ring(const Ice::Current&)
-        {
-            mEndpointManager->log(mId, __FUNCTION__);
-        }
-                
-        void start(const Ice::Current&)
-        {
-            mEndpointManager->log(mId, __FUNCTION__);
-        }
-                
-        void stop(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response, const Ice::Current&)
-        {
-            mEndpointManager->log(mId, __FUNCTION__);
-        }
-                
-        void unhold(const Ice::Current&)
-        {
-            mEndpointManager->log(mId, __FUNCTION__);
-        }
-  
-    private:
-        boost::shared_mutex mMutex;
-        InternalManagerPtr mEndpointManager;
-        AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx mEndpointPrx;
-        const std::string mId;
-        AsteriskSCF::Media::V1::SessionPrx mMediaSession;
+    struct InternalSessionInfo
+    {
+        AsteriskSCF::SessionCommunications::V1::SessionPrx session;
+        SessionIPtr servant;
+        std::string id;
+        std::vector<std::string> log;
     };
+
+    typedef std::map<std::string, InternalSessionInfo> SessionMap;
+    SessionMap mSessions;
+    boost::shared_mutex mMutex;
+    AsteriskSCF::TestUtil::Logger mLogger;
+    Ice::ObjectAdapterPtr mAdapter;
+    std::string mId;
+
 };
 
 namespace AsteriskSCF

commit 2e3fb4b95e34c6c8b6c18e098ef535323612bf18
Author: Brent Eagles <beagles at digium.com>
Date:   Thu Sep 9 12:33:51 2010 -0230

    Initial commit of test channel driver component.

diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..eba8938
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,6 @@
+[submodule "cmake"]
+	path = cmake
+	url = ssh://git@git.asterisk.org/asterisk-scf/release/cmake
+[submodule "slice"]
+	path = slice
+	url = ssh://git@git.asterisk.org/asterisk-scf/integration/slice
diff --git a/CMakeLists.txt b/CMakeLists.txt
new file mode 100644
index 0000000..9c7be7c
--- /dev/null
+++ b/CMakeLists.txt
@@ -0,0 +1,17 @@
+# Test channel build system
+
+# Minimum we require is 2.6, any lower and stuff would fail horribly
+cmake_minimum_required(VERSION 2.6)
+
+# Include common Hydra build infrastructure
+include(cmake/Hydra_v4.cmake)
+
+# This project is C++ based and requires a minimum of 3.4
+hydra_project("Test Channel" 3.4 CXX)
+
+# Take care of slice definitions
+add_subdirectory(slice EXCLUDE_FROM_ALL)
+
+# Take care of the source code for this project
+add_subdirectory(src)
+
diff --git a/cmake b/cmake
new file mode 160000
index 0000000..1e7a172
--- /dev/null
+++ b/cmake
@@ -0,0 +1 @@
+Subproject commit 1e7a1725fe9e2d176a2c26820d6bcd96c6c3939e
diff --git a/slice b/slice
new file mode 160000
index 0000000..14413db
--- /dev/null
+++ b/slice
@@ -0,0 +1 @@
+Subproject commit 14413db47bfae3d1ff57d36e80cfe700d755ae0b
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
new file mode 100644
index 0000000..91c0483
--- /dev/null
+++ b/src/CMakeLists.txt
@@ -0,0 +1,15 @@
+hydra_component_init(test_channel CXX)
+hydra_component_add_slice(test_channel EndpointIf)
+hydra_component_add_slice(test_channel ComponentServiceIf)
+hydra_component_add_slice(test_channel SessionCommunicationsIf)
+hydra_component_add_slice(test_channel RoutingIf)
+hydra_component_add_file(test_channel Service.cpp)
+hydra_component_add_file(test_channel MediaEchoThread.cpp)
+hydra_component_add_file(test_channel MediaSession.cpp)
+hydra_component_add_file(test_channel ConsoleDriver.cpp)
+hydra_component_add_file(test_channel TestEndpoint.cpp)
+hydra_component_add_ice_libraries(test_channel IceStorm)
+hydra_component_add_boost_libraries(test_channel thread)
+hydra_component_add_boost_libraries(test_channel date_time)
+hydra_component_build_standalone(test_channel)
+hydra_component_install(test_channel RUNTIME bin "Test Channel Emulator." Core)
diff --git a/src/ConsoleDriver.cpp b/src/ConsoleDriver.cpp
new file mode 100644
index 0000000..ca75569
--- /dev/null
+++ b/src/ConsoleDriver.cpp
@@ -0,0 +1,62 @@
+/*
+* Asterisk Scalable Communications Framework
+*
+* Copyright (C) 2010 -- Digium, Inc.
+*
+* All rights reserved.
+*/
+#include <iostream>
+#include <IceUtil/Thread.h>
+
+#include "ConsoleDriver.h"
+
+#ifndef _WIN32
+#include <sys/select.h>
+#include <unistd.h>
+//
+// Non-Windows implementation of _kbhit(): polls standard input without
+// blocking. Returns 1 if standard input is ready for reading, 0 otherwise.
+//
+int _kbhit()
+{
+    // A zeroed timeout makes select() return immediately.
+    struct timeval tv;
+    tv.tv_sec = 0;
+    tv.tv_usec = 0;
+    fd_set fds;
+    FD_ZERO(&fds);
+    FD_SET(STDIN_FILENO, &fds);
+    //
+    // When select() fails it leaves the descriptor set untouched, which means
+    // standard input would still look "set". Only consult the set when select()
+    // reports at least one ready descriptor.
+    //
+    if(select(STDIN_FILENO + 1, &fds, 0, 0, &tv) <= 0)
+    {
+        return 0;
+    }
+    return FD_ISSET(STDIN_FILENO, &fds) ? 1 : 0;
+}
+#endif
+
+//
+// The driver starts out "not done"; destroy() is what sets the flag.
+//
+ConsoleDriver::ConsoleDriver() : mDone(false)
+{
+}
+
+//
+// Thread body: polls (at most one second per wait) until either console input
+// is available or destroy() has been called, then reads a single line from
+// standard input.
+//
+void ConsoleDriver::run()
+{
+    std::cerr << "Console driver started" << std::endl;
+    bool done = false;
+    {
+        //
+        // Hold the monitor only while waiting. destroy() locks the same
+        // monitor, so keeping it locked across the blocking getline() below
+        // would stall destroy() until a line had been entered.
+        //
+        IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
+        while(!mDone && _kbhit() == 0)
+        {
+            mMonitor.timedWait(IceUtil::Time::milliSeconds(1000));
+        }
+        done = mDone;
+    }
+    if(!done)
+    {
+        std::string command;
+        std::getline(std::cin, command);
+        //
+        // TODO: do stuff!!! :)
+        //
+    }
+}
+
+//
+// Prints the message on standard output, followed by a newline and a flush.
+//
+void ConsoleDriver::write(const std::string& message)
+{
+    std::cout << message;
+    std::cout << std::endl;
+}
+
+//
+// Marks the driver as done and wakes every thread waiting on the monitor so
+// that run() can observe the flag.
+//
+void ConsoleDriver::destroy()
+{
+    typedef IceUtil::Monitor<IceUtil::Mutex>::Lock MonitorLock;
+    MonitorLock guard(mMonitor);
+    mLogger.getDebugStream() << __FUNCTION__ << " : destroyed. " << std::endl;
+    mDone = true;
+    mMonitor.notifyAll();
+}
diff --git a/src/ConsoleDriver.h b/src/ConsoleDriver.h
new file mode 100644
index 0000000..4ff5103
--- /dev/null
+++ b/src/ConsoleDriver.h
@@ -0,0 +1,29 @@
+/*
+* Asterisk Scalable Communications Framework
+*
+* Copyright (C) 2010 -- Digium, Inc.
+*
+* All rights reserved.
+*/
+#pragma once
+
+#include <IceUtil/Monitor.h>
+#include <IceUtil/Thread.h>
+#include <string>
+#include "Logger.h"
+
+//
+// Thread that waits for input on the console.
+//
+class ConsoleDriver : public IceUtil::Thread
+{
+public:
+    ConsoleDriver();
+
+    // Thread entry point.
+    virtual void run();
+
+    // Writes a message to standard output.
+    void write(const std::string& message);
+
+    // Flags the thread as done and wakes it up.
+    void destroy();
+
+private:
+    IceUtil::Monitor<IceUtil::Mutex> mMonitor;
+    bool mDone;
+    AsteriskSCF::TestUtil::Logger mLogger;
+};
+
+typedef IceUtil::Handle<ConsoleDriver> ConsoleDriverPtr;
diff --git a/src/InternalExceptions.h b/src/InternalExceptions.h
new file mode 100644
index 0000000..4c094fc
--- /dev/null
+++ b/src/InternalExceptions.h
@@ -0,0 +1,46 @@
+#pragma once
+
+#include <exception>
+#include <string>
+#include <sstream>
+
+// TODO: Some exceptions that might be better off in the shared codebase.
+
+namespace AsteriskSCF
+{
+namespace BridgeService
+{
+    //
+    // Reports a problem with a configuration property. what() yields
+    // "<propertyName> configuration error: <message>", with "(no message)"
+    // standing in for an empty message.
+    //
+    class ConfigException : public std::exception
+    {
+    public:
+
+        ConfigException(const std::string& propertyName, const std::string& message) :
+            mWhat(propertyName + " configuration error: " +
+                    (message.empty() ? std::string("(no message)") : message))
+        {
+        }
+
+        ~ConfigException() throw()
+        {
+        }
+
+        const char* what() const throw()
+        {
+            return mWhat.c_str();
+        }
+
+    private:
+        // Fully formatted text handed out by what().
+        std::string mWhat;
+    };
+}
+}
diff --git a/src/ListenerManager.h b/src/ListenerManager.h
new file mode 100644
index 0000000..b9051da
--- /dev/null
+++ b/src/ListenerManager.h
@@ -0,0 +1,140 @@
+#pragma once
+
+#include <Ice/Ice.h>
+#include <IceStorm/IceStorm.h>
+#include <boost/thread/shared_mutex.hpp>
+#include <algorithm>
+#include <string>
+#include <vector>
+#include "InternalExceptions.h"
+
+namespace AsteriskSCF
+{
+namespace BridgeService
+{
+    //
+    // Helper template for classes that need to implement listener style interfaces.
+    // T is the listener type handed to the IceStorm topic when subscribing. Listeners
+    // are tracked in a local list and subscribed to a topic that is retrieved, or
+    // created, during construction.
+    //
+    template <class T>
+    class ListenerManagerT : public IceUtil::Shared
+    {
+        typedef std::vector<T> ListenerSeq;
+        typedef typename ListenerSeq::iterator ListenerIter;
+    public:
+        //
+        // Resolves the topic manager named by the "TopicManager.Proxy" property and
+        // retrieves (or creates) the topic called topicName. Throws ConfigException
+        // if the property is missing, the topic manager proxy is invalid or
+        // unreachable, or the topic cannot be obtained.
+        //
+        ListenerManagerT(const Ice::CommunicatorPtr& communicator, const std::string& topicName) :
+            mCommunicator(communicator),
+            mTopicName(topicName)
+        {
+            //
+            // TODO: While this is being concocted for a single component, it would make more sense
+            // to have the topic manager passed in during construction for more general usage.
+            //
+            const std::string propertyName = "TopicManager.Proxy";
+            std::string topicManagerProperty = mCommunicator->getProperties()->getProperty(propertyName);
+            if(topicManagerProperty.empty())
+            {
+                throw ConfigException(propertyName, "Topic manager proxy property missing. "
+                        "Unable to initialize listener support.");
+            }
+
+            IceStorm::TopicManagerPrx topicManager;
+            try
+            {
+                //
+                // The result of the cast has to be kept; otherwise topicManager stays
+                // null and the check below rejects every configuration.
+                //
+                topicManager = IceStorm::TopicManagerPrx::checkedCast(mCommunicator->stringToProxy(topicManagerProperty));
+            }
+            catch(const Ice::Exception&)
+            {
+                //
+                // Reported through the null check below.
+                //
+            }
+            if(!topicManager)
+            {
+                throw ConfigException(propertyName, "Topic manager proxy is not a valid proxy or is unreachable");
+            }
+
+            try
+            {
+                mTopic = topicManager->retrieve(mTopicName);
+            }
+            catch(const IceStorm::NoSuchTopic&)
+            {
+                //
+                // Not an error: the topic is created below.
+                //
+            }
+
+            if(!mTopic)
+            {
+                try
+                {
+                    mTopic = topicManager->create(mTopicName);
+                }
+                catch(const IceStorm::TopicExists&)
+                {
+                    //
+                    // In case there is a race condition when creating the topic.
+                    //
+                    mTopic = topicManager->retrieve(mTopicName);
+                }
+            }
+
+            if(!mTopic)
+            {
+                throw ConfigException(propertyName,
+                        std::string("unable to create topic with the provided configuration :") + mTopicName);
+            }
+        }
+
+        virtual ~ListenerManagerT()
+        {
+            //
+            // TODO: Destroy topic.
+            //
+        }
+
+        //
+        // Records the listener (if it is not already in the list) and subscribes it to
+        // the topic with "ordered" reliability.
+        //
+        // NOTE: The current implementation is a little fast and loose here. Inconsistent conditions
+        // and whatnot are not flagged.
+        //
+        void addListener(const T& listener)
+        {
+            boost::unique_lock<boost::shared_mutex> lock(mLock);
+            // std::vector has no find() member, so search with the std::find algorithm.
+            if(std::find(mListeners.begin(), mListeners.end(), listener) == mListeners.end())
+            {
+                mListeners.push_back(listener);
+            }
+            IceStorm::QoS qos;
+            qos["reliability"] = "ordered";
+            try
+            {
+                mTopic->subscribeAndGetPublisher(qos, listener);
+            }
+            catch(const IceStorm::AlreadySubscribed&)
+            {
+                //
+                // This indicates some kind of inconsistent state.
+                //
+            }
+        }
+
+        //
+        // Forgets the listener and unsubscribes it from the topic. Listeners that are
+        // not in the list are ignored.
+        //
+        void removeListener(const T& listener)
+        {
+            boost::unique_lock<boost::shared_mutex> lock(mLock);
+            ListenerIter i = std::find(mListeners.begin(), mListeners.end(), listener);
+            if(i != mListeners.end())
+            {
+                mListeners.erase(i);
+                mTopic->unsubscribe(listener);
+            }
+        }
+
+        //
+        // Returns a copy of the current listener list, taken under a shared lock.
+        //
+        std::vector<T> getListeners()
+        {
+            boost::shared_lock<boost::shared_mutex> lock(mLock);
+            std::vector<T> result(mListeners);
+            return result;
+        }
+
+    protected:
+        boost::shared_mutex mLock;
+        Ice::CommunicatorPtr mCommunicator;
+        std::string mTopicName;
+        IceStorm::TopicPrx mTopic;
+        ListenerSeq mListeners;
+    };
+}
+}
diff --git a/src/Logger.h b/src/Logger.h
new file mode 100644
index 0000000..f2c11d6
--- /dev/null
+++ b/src/Logger.h
@@ -0,0 +1,53 @@
+/*
+* Asterisk Scalable Communications Framework
+*
+* Copyright (C) 2010 -- Digium, Inc.
+*
+* All rights reserved.
+*/
+#pragma once
+
+#include <iostream>
+
+namespace AsteriskSCF
+{
+namespace TestUtil 
+{
+    //
+    // Place holder log stream holder. Every stream accessor currently hands
+    // back std::cerr.
+    //
+    class Logger
+    {
+    public:
+        Logger() : mTraceDebug(true) {}
+
+        std::ostream& getTraceStream() { return std::cerr; }
+
+        std::ostream& getErrorStream() { return std::cerr; }
+
+        std::ostream& getDebugStream() { return std::cerr; }
+
+        std::ostream& getInfoStream() { return std::cerr; }
+
+        // Reports the debug tracing flag, which the constructor sets to true.
+        bool debugTracing()
+        {
+            return mTraceDebug;
+        }
+
+    private:
+        bool mTraceDebug;
+    };
+} // End of namespace TestUtil
+} // End of namespace AsteriskSCF
diff --git a/src/MediaEchoThread.cpp b/src/MediaEchoThread.cpp
new file mode 100644
index 0000000..1cc555a
--- /dev/null
+++ b/src/MediaEchoThread.cpp
@@ -0,0 +1,55 @@
+/*
+* Asterisk Scalable Communications Framework
+*
+* Copyright (C) 2010 -- Digium, Inc.
+*
+* All rights reserved.
+*/
+#include "MediaEchoThread.h"
+
+//
+// The thread starts out "not done"; destroy() is what sets the flag.
+//
+MediaEchoThread::MediaEchoThread() : mDone(false)
+{
+}
+
+//
+// Thread body: waits until there are frames to relay and a sink to relay them
+// to, writes the pending frames to that sink, and repeats until destroy() is
+// called. Frames whose write fails are dropped.
+//
+void MediaEchoThread::run()
+{
+    for(;;)
+    {
+        AsteriskSCF::Media::V1::FrameSeq pending;
+        AsteriskSCF::Media::V1::StreamSinkPrx sink;
+        {
+            IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
+            //
+            // Sleep while there is nothing to relay: either no frames are
+            // queued or no sink has been set yet.
+            //
+            while(!mDone && (mFrames.empty() || !mSink))
+            {
+                mMonitor.wait();
+            }
+            if(mDone)
+            {
+                return;
+            }
+            //
+            // Take the queued frames and a copy of the sink so the write can
+            // be made after the monitor has been released.
+            //
+            pending.swap(mFrames);
+            sink = mSink;
+        }
+        try
+        {
+            //
+            // Invoked without the monitor so pushFrames() and destroy() are
+            // not held up behind the call.
+            //
+            sink->write(pending);
+        }
+        catch(const Ice::Exception&)
+        {
+            // TODO: log and continue
+        }
+    }
+}
+
+//
+// Appends the given frames to the pending queue and signals the monitor.
+//
+void MediaEchoThread::pushFrames(const AsteriskSCF::Media::V1::FrameSeq& newFrames)
+{
+    typedef IceUtil::Monitor<IceUtil::Mutex>::Lock MonitorLock;
+    MonitorLock guard(mMonitor);
+    mFrames.insert(mFrames.end(), newFrames.begin(), newFrames.end());
+    mMonitor.notify();
+}
+
+//
+// Sets the done flag and signals the monitor.
+//
+void MediaEchoThread::destroy()
+{
+    typedef IceUtil::Monitor<IceUtil::Mutex>::Lock MonitorLock;
+    MonitorLock guard(mMonitor);
+    mDone = true;
+    mMonitor.notify();
+}
+
+//
+// Replaces the sink that frames are written to and signals the monitor.
+//
+void MediaEchoThread::setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& prx)
+{
+    typedef IceUtil::Monitor<IceUtil::Mutex>::Lock MonitorLock;
+    MonitorLock guard(mMonitor);
+    mSink = prx;
+    mMonitor.notify();
+}
diff --git a/src/MediaEchoThread.h b/src/MediaEchoThread.h
new file mode 100644
index 0000000..edd0fee
--- /dev/null
+++ b/src/MediaEchoThread.h
@@ -0,0 +1,32 @@
+/*
+* Asterisk Scalable Communications Framework
+*
+* Copyright (C) 2010 -- Digium, Inc.
+*
+* All rights reserved.
+*/
+#pragma once
+#include <Ice/Ice.h>
+#include <Media/MediaIf.h>
+
+//
+// A thread that takes the frames that were written to the sink
+// implementation and writes them out to another sink (usually the one
+// associated with a related source).
+//
+class MediaEchoThread : public virtual IceUtil::Thread
+{
+public:
+    MediaEchoThread();
+
+    // Thread entry point.
+    virtual void run();
+
+    // Queues frames to be written to the sink.
+    void pushFrames(const AsteriskSCF::Media::V1::FrameSeq& newFrames);
+
+    // Sets the done flag and signals the monitor.
+    void destroy();
+
+    // Sets the sink that queued frames are written to.
+    void setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& prx);
+
+private:
+    AsteriskSCF::Media::V1::StreamSinkPrx mSink;
+    IceUtil::Monitor<IceUtil::Mutex> mMonitor;
+    AsteriskSCF::Media::V1::FrameSeq mFrames;
+    bool mDone;
+};
+typedef IceUtil::Handle<MediaEchoThread> MediaEchoThreadPtr;
diff --git a/src/MediaSession.cpp b/src/MediaSession.cpp
new file mode 100644
index 0000000..6704824
--- /dev/null
+++ b/src/MediaSession.cpp
@@ -0,0 +1,156 @@
+/*
+* Asterisk Scalable Communications Framework
+*
+* Copyright (C) 2010 -- Digium, Inc.
+*
+* All rights reserved.
+*/
+#include "MediaSession.h"
+#include "MediaEchoThread.h"
+
+//
+// The test media session echoes media received on its sink object to its
+// associated source object.
+//
+
+//
+// Test Sink. Frames written to it are handed to a MediaEchoThread, which is
+// given the sink to relay them to through setRelaySink().
+//
+// NOTE(review): nothing in this class calls start() or destroy() on the echo
+// thread -- confirm whether that is done elsewhere.
+//
+class SinkI : public AsteriskSCF::Media::V1::StreamSink
+{
+public:
+    SinkI(const std::string& id) :
+        mId(id),
+        mThread(new MediaEchoThread)
+    {
+    }
+
+    // Queues the frames on the echo thread.
+    void write(const AsteriskSCF::Media::V1::FrameSeq& frames, const Ice::Current&)
+    {
+        mThread->pushFrames(frames);
+    }
+
+    void setSource(const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&)
+    {
+        mSource = source;
+    }
+
+    AsteriskSCF::Media::V1::StreamSourcePrx getSource(const Ice::Current&)
+    {
+        return mSource;
+    }
+
+    // Nothing in this class ever adds a format, so the sequence is returned as constructed.
+    AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&)
+    {
+        return mFormats;
+    }
+
+    std::string getId(const Ice::Current&)
+    {
+        return mId;
+    }
+
+    void requestFormat(const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&)
+    {
+        // XXX
+    }
+
+    // Local (non-Ice) operation: tells the echo thread where to write frames.
+    void setRelaySink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink)
+    {
+        mThread->setSink(sink);
+    }
+
+private:
+    AsteriskSCF::Media::V1::StreamSourcePrx mSource;
+    AsteriskSCF::Media::V1::FormatSeq mFormats;
+    std::string mId;
+    MediaEchoThreadPtr mThread;
+};
+typedef IceUtil::Handle<SinkI> SinkIPtr;
+
+//
+// Test source. Setting its sink also forwards that sink to the associated
+// sink servant as the relay sink.
+//
+class SourceI : public AsteriskSCF::Media::V1::StreamSource
+{
+public:
+
+    SourceI(const std::string& id, const SinkIPtr& sinkServant) :
+        mId(id),
+        mSinkServant(sinkServant)
+    {
+    }
+
+    // Remembers the sink and passes it on to the sink servant as its relay sink.
+    void setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
+    {
+        mSink = sink;
+        mSinkServant->setRelaySink(sink);
+    }
+
+    AsteriskSCF::Media::V1::StreamSinkPrx getSink(const Ice::Current&)
+    {
+        return mSink;
+    }
+
+    // Nothing in this class ever adds a format, so the sequence is returned as constructed.
+    AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&)
+    {
+        return mFormats;
+    }
+
+    std::string getId(const Ice::Current&)
+    {
+        return mId;
+    }
+
+    void requestFormat(const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&)
+    {
+        // XXX
+    }
+
+private:
+    AsteriskSCF::Media::V1::StreamSinkPrx mSink;
+    AsteriskSCF::Media::V1::FormatSeq mFormats;
+    std::string mId;
+    SinkIPtr mSinkServant;
+};
+
+//
+// Creates one sink servant and one source servant, registers them with the
+// adapter under the identities "<id>Sink" and "<id>Source", and records their
+// proxies. The source servant is given the sink servant.
+//
+MediaSessionI::MediaSessionI(const std::string& id, const Ice::ObjectAdapterPtr& adapter) :
+    mId(id),
+    mAdapter(adapter)
+{
+    const std::string sinkId = mId + "Sink";
+    SinkIPtr sinkServant = new SinkI(sinkId);
+    Ice::ObjectPrx sinkObject =
+        mAdapter->add(sinkServant, mAdapter->getCommunicator()->stringToIdentity(sinkId));
+    mSinks.push_back(AsteriskSCF::Media::V1::StreamSinkPrx::uncheckedCast(sinkObject));
+
+    const std::string sourceId = mId + "Source";
+    Ice::ObjectPrx sourceObject =
+        mAdapter->add(new SourceI(sourceId, sinkServant),
+                mAdapter->getCommunicator()->stringToIdentity(sourceId));
+    mSources.push_back(AsteriskSCF::Media::V1::StreamSourcePrx::uncheckedCast(sourceObject));
+}
+
+//
+// Removes the source and sink servants registered by the constructor from the
+// object adapter.
+//
+MediaSessionI::~MediaSessionI()
+{
+    //
+    // ObjectAdapter::remove() can throw (for example when the servant is no
+    // longer registered or the adapter has been deactivated). A destructor must
+    // not let that escape, so every removal is attempted on its own and
+    // failures are ignored.
+    //
+    for(AsteriskSCF::Media::V1::StreamSourceSeq::iterator i = mSources.begin(); i != mSources.end(); ++i)
+    {
+        try
+        {
+            mAdapter->remove((*i)->ice_getIdentity());
+        }
+        catch(const Ice::Exception&)
+        {
+        }
+    }
+
+    for(AsteriskSCF::Media::V1::StreamSinkSeq::iterator i = mSinks.begin(); i != mSinks.end(); ++i)
+    {
+        try
+        {
+            mAdapter->remove((*i)->ice_getIdentity());
+        }
+        catch(const Ice::Exception&)
+        {
+        }
+    }
+}
+
+// Returns a copy of the source proxies collected by the constructor.
+AsteriskSCF::Media::V1::StreamSourceSeq MediaSessionI::getSources(const Ice::Current&)
+{
+    AsteriskSCF::Media::V1::StreamSourceSeq sources(mSources);
+    return sources;
+}
+
+// Returns a copy of the sink proxies collected by the constructor.
+AsteriskSCF::Media::V1::StreamSinkSeq MediaSessionI::getSinks(const Ice::Current&)
+{
+    AsteriskSCF::Media::V1::StreamSinkSeq sinks(mSinks);
+    return sinks;
+}
+
+// Returns the id this media session was constructed with.
+std::string MediaSessionI::getId(const Ice::Current&)
+{
+    std::string id(mId);
+    return id;
+}
diff --git a/src/MediaSession.h b/src/MediaSession.h
new file mode 100644
index 0000000..d1a3c09
--- /dev/null
+++ b/src/MediaSession.h
@@ -0,0 +1,31 @@
+/*
+* Asterisk Scalable Communications Framework
+*
+* Copyright (C) 2010 -- Digium, Inc.
+*
+* All rights reserved.
+*/
+#pragma once
+#include <Ice/Ice.h>
+#include <Media/MediaIf.h>
+#include <boost/thread/shared_mutex.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+
+class MediaSessionI : public AsteriskSCF::Media::V1::Session
+{
+public:
+    MediaSessionI(const std::string& id, const Ice::ObjectAdapterPtr& adapter);
+    ~MediaSessionI();
... 401 lines suppressed ...


-- 
asterisk-scf/integration/test_channel.git



More information about the asterisk-scf-commits mailing list