[asterisk-scf-commits] asterisk-scf/integration/test_channel.git branch "master" created.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Thu Sep 9 12:28:50 CDT 2010
branch "master" has been created
at 7d19cc48eb249ad3be96d0fd644afb146b760efc (commit)
- Log -----------------------------------------------------------------
commit 7d19cc48eb249ad3be96d0fd644afb146b760efc
Author: Brent Eagles <beagles at digium.com>
Date: Thu Sep 9 14:54:15 2010 -0230
Use an alternate method for hooking up the console driver.
diff --git a/src/ConsoleDriver.cpp b/src/ConsoleDriver.cpp
index eadbeba..0785e7a 100644
--- a/src/ConsoleDriver.cpp
+++ b/src/ConsoleDriver.cpp
@@ -102,6 +102,10 @@ void ConsoleDriver::run()
{
mCommandHandler->clearlog(arg);
}
+ else
+ {
+ std::cerr << "unknown command" << std::endl;
+ }
}
else
{
diff --git a/src/Service.cpp b/src/Service.cpp
index d7fc5c4..e0a9782 100644
--- a/src/Service.cpp
+++ b/src/Service.cpp
@@ -39,7 +39,6 @@ int TestChannelDriver::run(int, char*[])
mLogger.getTraceStream() << "Launching AsteriskSCF Session-Oriented Test Channel Service." << std::endl;
mConsoleDriver = new ConsoleDriver;
mConsoleDriver->start();
-
IceStorm::TopicManagerPrx topicManager;
try
{
@@ -61,7 +60,7 @@ int TestChannelDriver::run(int, char*[])
Ice::ObjectAdapterPtr adapter = communicator()->createObjectAdapter("AsteriskSCF.TestChannelService");
adapter->activate();
- AsteriskSCF::Core::Routing::V1::EndpointLocatorPtr servant = AsteriskSCF::TestUtil::TestEndpoint::initialize(adapter, "TestChannel.Locator");
+ AsteriskSCF::Core::Routing::V1::EndpointLocatorPtr servant = AsteriskSCF::TestUtil::TestEndpoint::initialize(mConsoleDriver, adapter, "TestChannel.Locator");
AsteriskSCF::Core::Routing::V1::EndpointLocatorPrx endpointLocatorPrx =
AsteriskSCF::Core::Routing::V1::EndpointLocatorPrx::checkedCast(adapter->add(servant, communicator()->stringToIdentity("TestChannel.Locator")));
@@ -71,14 +70,9 @@ int TestChannelDriver::run(int, char*[])
params->category = "TestChannel";
service_management->addLocatorParams(params, "");
- CommandsPtr cmd(CommandsPtr::dynamicCast(servant));
- if(cmd)
- {
- mConsoleDriver->setHandler(cmd);
- }
communicator()->waitForShutdown();
- // service_management->unregister();
+ service_management->unregister();
return EXIT_SUCCESS;
}
diff --git a/src/TestEndpoint.cpp b/src/TestEndpoint.cpp
index 55f080a..318f358 100644
--- a/src/TestEndpoint.cpp
+++ b/src/TestEndpoint.cpp
@@ -441,6 +441,11 @@ public:
return result;
}
+ CommandsPtr getCommandInterface()
+ {
+ return CommandsPtr::dynamicCast(mManager);
+ }
+
private:
Ice::ObjectAdapterPtr mAdapter;
@@ -449,15 +454,19 @@ private:
AsteriskSCF::Core::Endpoint::V1::BaseEndpointPrx mEndpointProxy;
};
+typedef IceUtil::Handle<EndpointLocatorI> EndpointLocatorIPtr;
+
boost::shared_mutex AsteriskSCF::TestUtil::TestEndpoint::mMutex;
AsteriskSCF::Core::Routing::V1::EndpointLocatorPtr AsteriskSCF::TestUtil::TestEndpoint::mImpl;
-AsteriskSCF::Core::Routing::V1::EndpointLocatorPtr TestEndpoint::initialize(const Ice::ObjectAdapterPtr& adapter, const std::string& id)
+ AsteriskSCF::Core::Routing::V1::EndpointLocatorPtr TestEndpoint::initialize(const ConsoleDriverPtr& console, const Ice::ObjectAdapterPtr& adapter, const std::string& id)
{
boost::unique_lock<boost::shared_mutex> lock(mMutex);
if(mImpl.get() == 0)
{
- mImpl = new EndpointLocatorI(adapter, id);
+ EndpointLocatorIPtr impl(new EndpointLocatorI(adapter, id));
+ console->setHandler(CommandsPtr::dynamicCast(impl->getCommandInterface()));
+ mImpl = AsteriskSCF::Core::Routing::V1::EndpointLocatorPtr::dynamicCast(impl);
}
return mImpl;
}
diff --git a/src/TestEndpoint.h b/src/TestEndpoint.h
index 7eca887..717c785 100644
--- a/src/TestEndpoint.h
+++ b/src/TestEndpoint.h
@@ -11,6 +11,7 @@
#include <Core/Routing/RoutingIf.h>
#include <boost/thread/shared_mutex.hpp>
+#include "ConsoleDriver.h"
namespace AsteriskSCF
{
@@ -19,7 +20,7 @@ namespace TestUtil
class TestEndpoint
{
public:
- static AsteriskSCF::Core::Routing::V1::EndpointLocatorPtr initialize(const Ice::ObjectAdapterPtr& adapter, const std::string& id);
+ static AsteriskSCF::Core::Routing::V1::EndpointLocatorPtr initialize(const ConsoleDriverPtr& driver, const Ice::ObjectAdapterPtr& adapter, const std::string& id);
private:
static boost::shared_mutex mMutex;
static AsteriskSCF::Core::Routing::V1::EndpointLocatorPtr mImpl;
commit 6a15d6bce50aa60a36b00f162b5fbf167e4ec523
Author: Brent Eagles <beagles at digium.com>
Date: Thu Sep 9 14:36:17 2010 -0230
Fixing up some more of the code and connecting the console driver to the command handler.
diff --git a/src/Commands.h b/src/Commands.h
new file mode 100644
index 0000000..208882f
--- /dev/null
+++ b/src/Commands.h
@@ -0,0 +1,70 @@
+/*
+* Asterisk Scalable Communications Framework
+*
+* Copyright (C) 2010 -- Digium, Inc.
+*
+* All rights reserved.
+*/
+
+#pragma once
+
+//
+// Interface for the console driver (or other UIs) to interact
+// with the session endpoint.
+//
+
+#include <string>
+#include <vector>
+#include <IceUtil/Shared.h>
+#include <IceUtil/Handle.h>
+
+//
+// Add more commands.
+//
+class Commands : public virtual IceUtil::Shared
+{
+public:
+ virtual ~Commands() {}
+
+ //
+ // Echo any cached media frames.
+ //
+ virtual void echo(const std::string& id) = 0;
+
+ //
+ // Turn on automatic (immediate) echo of media frames.
+ //
+ virtual void echoOn(const std::string& id) = 0;
+
+ //
+ // Turn off automatic echo of media frames.
+ //
+ virtual void echoOff(const std::string& id) = 0;
+
+ //
+ // Answer an invite.
+ //
+ virtual void answer(const std::string& id) = 0;
+
+ //
+ // Hangup!
+ //
+ virtual void hangup(const std::string& id) = 0;
+
+ //
+ // Die horribly.
+ //
+ virtual void die(const std::string& id) = 0;
+
+ //
+ // Dump log
+ //
+ virtual void getlog(const std::string& id, std::vector<std::string>& log) = 0;
+
+ //
+ // Clear log
+ //
+ virtual void clearlog(const std::string& id) = 0;
+};
+
+typedef IceUtil::Handle<Commands> CommandsPtr;
diff --git a/src/ConsoleDriver.cpp b/src/ConsoleDriver.cpp
index ca75569..eadbeba 100644
--- a/src/ConsoleDriver.cpp
+++ b/src/ConsoleDriver.cpp
@@ -30,21 +30,85 @@ ConsoleDriver::ConsoleDriver() :
{
}
+void ConsoleDriver::setHandler(const CommandsPtr& handler)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
+ mCommandHandler = handler;
+ mMonitor.notify();
+}
+
void ConsoleDriver::run()
{
std::cerr << "Console driver started" << std::endl;
IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
- while(!mDone && _kbhit() == 0)
- {
- mMonitor.timedWait(IceUtil::Time::milliSeconds(1000));
- }
- if(!mDone)
+ while(!mDone)
{
- std::string command;
- std::getline(std::cin, command);
- //
- // TODO: do stuff!!! :)
- //
+ while(!mDone && _kbhit() == 0)
+ {
+ mMonitor.timedWait(IceUtil::Time::milliSeconds(1000));
+ }
+ if(!mDone)
+ {
+ std::string command;
+ std::getline(std::cin, command);
+ //
+ // TODO: do stuff!!! :)
+ //
+ if(!mCommandHandler)
+ {
+ std::cout << "TODO: Implement commands! Echoing: " << command << std::endl;
+ }
+ else
+ {
+ size_t split = command.find(" ");
+ if(split != std::string::npos)
+ {
+ std::string cmd = command.substr(0, split);
+ std::string arg = command.substr(split+1);
+ if(cmd == "echo")
+ {
+ mCommandHandler->echo(arg);
+ }
+ else if(cmd == "echoon")
+ {
+ mCommandHandler->echoOn(arg);
+ }
+ else if(cmd == "echooff")
+ {
+ mCommandHandler->echoOff(arg);
+ }
+ else if(cmd == "answer")
+ {
+ mCommandHandler->answer(arg);
+ }
+ else if(cmd == "hangup")
+ {
+ mCommandHandler->hangup(arg);
+ }
+ else if(cmd == "die")
+ {
+ mCommandHandler->die(arg);
+ }
+ else if(cmd == "getlog")
+ {
+ std::vector<std::string> result;
+ mCommandHandler->getlog(arg, result);
+ for(std::vector<std::string>::iterator i = result.begin(); i != result.end(); ++i)
+ {
+ std::cerr << *i << std::endl;
+ }
+ }
+ else if(cmd == "clearlog")
+ {
+ mCommandHandler->clearlog(arg);
+ }
+ }
+ else
+ {
+ std::cerr << "parsing error" << std::endl;
+ }
+ }
+ }
}
}
diff --git a/src/ConsoleDriver.h b/src/ConsoleDriver.h
index 4ff5103..465306a 100644
--- a/src/ConsoleDriver.h
+++ b/src/ConsoleDriver.h
@@ -11,6 +11,7 @@
#include <IceUtil/Thread.h>
#include <string>
#include "Logger.h"
+#include "Commands.h"
class ConsoleDriver : public IceUtil::Thread
{
@@ -18,12 +19,14 @@ public:
ConsoleDriver();
void run();
+ void setHandler(const CommandsPtr& handler);
void write(const std::string& msg);
void destroy();
private:
IceUtil::Monitor<IceUtil::Mutex> mMonitor;
bool mDone;
AsteriskSCF::TestUtil::Logger mLogger;
+ CommandsPtr mCommandHandler;
};
typedef IceUtil::Handle<ConsoleDriver> ConsoleDriverPtr;
diff --git a/src/InternalExceptions.h b/src/InternalExceptions.h
index 4c094fc..a9c5e38 100644
--- a/src/InternalExceptions.h
+++ b/src/InternalExceptions.h
@@ -8,7 +8,7 @@
namespace AsteriskSCF
{
-namespace BridgeService
+namespace TestUtil
{
class ConfigException : public std::exception
{
diff --git a/src/ListenerManager.h b/src/ListenerManager.h
index b9051da..6754533 100644
--- a/src/ListenerManager.h
+++ b/src/ListenerManager.h
@@ -9,7 +9,7 @@
namespace AsteriskSCF
{
-namespace BridgeService
+namespace TestUtil
{
//
// Helper template for classes that need to implement listener style interfaces.
diff --git a/src/MediaEchoThread.cpp b/src/MediaEchoThread.cpp
index 1cc555a..155e59f 100644
--- a/src/MediaEchoThread.cpp
+++ b/src/MediaEchoThread.cpp
@@ -8,27 +8,29 @@
#include "MediaEchoThread.h"
MediaEchoThread::MediaEchoThread():
- mDone(false)
+ mDone(false),
+ mPaused(true)
{
}
void MediaEchoThread::run()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
- while(!mDone && mFrames.size() && mSink != 0)
+ while(!mDone)
{
mMonitor.wait();
- }
- if(!mDone)
- {
- try
- {
- mSink->write(mFrames);
- mFrames.clear();
- }
- catch(const Ice::Exception&)
+
+ if(!mDone && mFrames.size() > 0 && mSink != 0)
{
- // TODO: log and continue
+ try
+ {
+ mSink->write(mFrames);
+ mFrames.clear();
+ }
+ catch(const Ice::Exception&)
+ {
+ // TODO: log and continue
+ }
}
}
}
@@ -37,7 +39,10 @@ void MediaEchoThread::pushFrames(const AsteriskSCF::Media::V1::FrameSeq& newFram
{
IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
mFrames.insert(mFrames.end(), newFrames.begin(), newFrames.end());
- mMonitor.notify();
+ if(!mPaused)
+ {
+ mMonitor.notify();
+ }
}
void MediaEchoThread::destroy()
@@ -51,5 +56,27 @@ void MediaEchoThread::setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& prx)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
mSink = prx;
+ if(!mPaused)
+ {
+ mMonitor.notify();
+ }
+}
+
+void MediaEchoThread::echo()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
+ mMonitor.notify();
+}
+
+void MediaEchoThread::pause()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
+ mPaused = true;
+}
+
+void MediaEchoThread::resume()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
+ mPaused = false;
mMonitor.notify();
}
diff --git a/src/MediaEchoThread.h b/src/MediaEchoThread.h
index edd0fee..9f350b5 100644
--- a/src/MediaEchoThread.h
+++ b/src/MediaEchoThread.h
@@ -23,10 +23,18 @@ public:
void destroy();
void setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& prx);
+ //
+ // Use when paused... causes the loop to run once.
+ //
+ void echo();
+ void pause();
+ void resume();
+
private:
AsteriskSCF::Media::V1::StreamSinkPrx mSink;
IceUtil::Monitor<IceUtil::Mutex> mMonitor;
AsteriskSCF::Media::V1::FrameSeq mFrames;
bool mDone;
+ bool mPaused;
};
typedef IceUtil::Handle<MediaEchoThread> MediaEchoThreadPtr;
diff --git a/src/MediaSession.cpp b/src/MediaSession.cpp
index 6704824..1a45cc5 100644
--- a/src/MediaSession.cpp
+++ b/src/MediaSession.cpp
@@ -6,125 +6,23 @@
* All rights reserved.
*/
#include "MediaSession.h"
-#include "MediaEchoThread.h"
//
// The test media session echoes media received on its sink object to its
// associated source object.
//
-//
-// Test Sink.
-//
-class SinkI : public AsteriskSCF::Media::V1::StreamSink
-{
-public:
- SinkI(const std::string& id) :
- mId(id),
- mThread(new MediaEchoThread)
- {
- }
-
- void write(const AsteriskSCF::Media::V1::FrameSeq& frames, const Ice::Current&)
- {
- mThread->pushFrames(frames);
- }
-
- void setSource(const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&)
- {
- mSource = source;
- }
-
- AsteriskSCF::Media::V1::StreamSourcePrx getSource(const Ice::Current&)
- {
- return mSource;
- }
-
- AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&)
- {
- return mFormats;
- }
-
- std::string getId(const Ice::Current&)
- {
- return mId;
- }
-
- void requestFormat(const AsteriskSCF::Media::V1::FormatPtr& format, const Ice::Current&)
- {
- // XXX
- }
-
- void setRelaySink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink)
- {
- mThread->setSink(sink);
- }
-
-private:
- AsteriskSCF::Media::V1::StreamSourcePrx mSource;
- AsteriskSCF::Media::V1::FormatSeq mFormats;
- std::string mId;
- MediaEchoThreadPtr mThread;
-};
-typedef IceUtil::Handle<SinkI> SinkIPtr;
-
-//
-// Test source.
-//
-class SourceI : public AsteriskSCF::Media::V1::StreamSource
-{
-public:
-
- SourceI(const std::string& id, const SinkIPtr& sinkServant) :
- mId(id),
- mSinkServant(sinkServant)
- {
- }
-
- void setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
- {
- mSink = sink;
- mSinkServant->setRelaySink(mSink);
- }
-
- AsteriskSCF::Media::V1::StreamSinkPrx getSink(const Ice::Current&)
- {
- return mSink;
- }
-
- AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&)
- {
- return mFormats;
- }
-
- std::string getId(const Ice::Current&)
- {
- return mId;
- }
-
- void requestFormat(const AsteriskSCF::Media::V1::FormatPtr& format, const Ice::Current&)
- {
- // XXX
- }
-
-private:
- AsteriskSCF::Media::V1::StreamSinkPrx mSink;
- AsteriskSCF::Media::V1::FormatSeq mFormats;
- std::string mId;
- SinkIPtr mSinkServant;
-};
-
MediaSessionI::MediaSessionI(const std::string& id, const Ice::ObjectAdapterPtr& adapter) :
mId(id),
mAdapter(adapter)
{
- SinkIPtr sinkServant = new SinkI(mId + "Sink");
- mSinks.push_back(AsteriskSCF::Media::V1::StreamSinkPrx::uncheckedCast(mAdapter->add(sinkServant,
+ mSink = new SinkI(mId + "Sink");
+ mSinks.push_back(AsteriskSCF::Media::V1::StreamSinkPrx::uncheckedCast(mAdapter->add(mSink,
mAdapter->getCommunicator()->stringToIdentity(mId + "Sink"))));
+ mSource = new SourceI(mId + "Source", mSink);
mSources.push_back(
AsteriskSCF::Media::V1::StreamSourcePrx::uncheckedCast(mAdapter->add(
- new SourceI(mId + "Source", sinkServant),
- mAdapter->getCommunicator()->stringToIdentity(mId + "Source"))));
+ mSource, mAdapter->getCommunicator()->stringToIdentity(mId + "Source"))));
}
MediaSessionI::~MediaSessionI()
@@ -154,3 +52,18 @@ std::string MediaSessionI::getId(const Ice::Current&)
{
return mId;
}
+
+void MediaSessionI::echoOn()
+{
+ mSink->echoOn();
+}
+
+void MediaSessionI::echoOff()
+{
+ mSink->echoOff();
+}
+
+void MediaSessionI::echo()
+{
+ mSink->echo();
+}
diff --git a/src/MediaSession.h b/src/MediaSession.h
index d1a3c09..a49c0d1 100644
--- a/src/MediaSession.h
+++ b/src/MediaSession.h
@@ -10,6 +10,137 @@
#include <Media/MediaIf.h>
#include <boost/thread/shared_mutex.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
+#include "MediaEchoThread.h"
+
+//
+// TODO: Inline methods should be moved into CPP file.
+//
+
+//
+// Test Sink.
+//
+class SinkI : public AsteriskSCF::Media::V1::StreamSink
+{
+public:
+ SinkI(const std::string& id) :
+ mId(id),
+ mThread(new MediaEchoThread)
+ {
+ }
+
+ ~SinkI()
+ {
+ if(mThread)
+ {
+ mThread->destroy();
+ }
+ }
+
+ void write(const AsteriskSCF::Media::V1::FrameSeq& frames, const Ice::Current&)
+ {
+ mThread->pushFrames(frames);
+ }
+
+ void setSource(const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&)
+ {
+ mSource = source;
+ }
+
+ AsteriskSCF::Media::V1::StreamSourcePrx getSource(const Ice::Current&)
+ {
+ return mSource;
+ }
+
+ AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&)
+ {
+ return mFormats;
+ }
+
+ std::string getId(const Ice::Current&)
+ {
+ return mId;
+ }
+
+ void requestFormat(const AsteriskSCF::Media::V1::FormatPtr& format, const Ice::Current&)
+ {
+ // XXX
+ }
+
+ void setRelaySink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink)
+ {
+ mThread->setSink(sink);
+ }
+
+ void echo()
+ {
+ mThread->echo();
+ }
+
+ void echoOn()
+ {
+ mThread->resume();
+ }
+
+ void echoOff()
+ {
+ mThread->pause();
+ }
+
+private:
+ AsteriskSCF::Media::V1::StreamSourcePrx mSource;
+ AsteriskSCF::Media::V1::FormatSeq mFormats;
+ std::string mId;
+ MediaEchoThreadPtr mThread;
+};
+typedef IceUtil::Handle<SinkI> SinkIPtr;
+
+//
+// Test source.
+//
+class SourceI : public AsteriskSCF::Media::V1::StreamSource
+{
+public:
+
+ SourceI(const std::string& id, const SinkIPtr& sinkServant) :
+ mId(id),
+ mSinkServant(sinkServant)
+ {
+ }
+
+ void setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
+ {
+ mSink = sink;
+ mSinkServant->setRelaySink(mSink);
+ }
+
+ AsteriskSCF::Media::V1::StreamSinkPrx getSink(const Ice::Current&)
+ {
+ return mSink;
+ }
+
+ AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&)
+ {
+ return mFormats;
+ }
+
+ std::string getId(const Ice::Current&)
+ {
+ return mId;
+ }
+
+ void requestFormat(const AsteriskSCF::Media::V1::FormatPtr& format, const Ice::Current&)
+ {
+ // XXX
+ }
+
+private:
+ AsteriskSCF::Media::V1::StreamSinkPrx mSink;
+ AsteriskSCF::Media::V1::FormatSeq mFormats;
+ std::string mId;
+ SinkIPtr mSinkServant;
+};
+
+typedef IceUtil::Handle<SourceI> SourceIPtr;
class MediaSessionI : public AsteriskSCF::Media::V1::Session
{
@@ -20,12 +151,24 @@ public:
AsteriskSCF::Media::V1::StreamSourceSeq getSources(const Ice::Current&);
AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
std::string getId(const Ice::Current&);
+
+ //
+ // internal test methods.
+ //
+ void echoOn();
+ void echoOff();
+ void echo();
private:
boost::shared_mutex mMutex;
std::string mId;
Ice::ObjectAdapterPtr mAdapter;
+ SinkIPtr mSink;
+ SourceIPtr mSource;
+
AsteriskSCF::Media::V1::StreamSourceSeq mSources;
AsteriskSCF::Media::V1::StreamSinkSeq mSinks;
};
+
+typedef IceUtil::Handle<MediaSessionI> MediaSessionIPtr;
diff --git a/src/Service.cpp b/src/Service.cpp
index 65b3924..d7fc5c4 100644
--- a/src/Service.cpp
+++ b/src/Service.cpp
@@ -13,6 +13,7 @@
#include "Logger.h"
#include "TestEndpoint.h"
#include "ConsoleDriver.h"
+#include "Commands.h"
class TestChannelDriver : public Ice::Application
{
@@ -69,8 +70,12 @@ int TestChannelDriver::run(int, char*[])
AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsPtr params = new AsteriskSCF::Core::Discovery::V1::ServiceLocatorParams();
params->category = "TestChannel";
service_management->addLocatorParams(params, "");
-
+ CommandsPtr cmd(CommandsPtr::dynamicCast(servant));
+ if(cmd)
+ {
+ mConsoleDriver->setHandler(cmd);
+ }
communicator()->waitForShutdown();
// service_management->unregister();
diff --git a/src/TestEndpoint.cpp b/src/TestEndpoint.cpp
index 1f7f497..55f080a 100644
--- a/src/TestEndpoint.cpp
+++ b/src/TestEndpoint.cpp
@@ -19,10 +19,19 @@
#include "Logger.h"
#include "MediaSession.h"
+#include "Commands.h"
+
+//
+// Forward declarations.
+//
class EndpointManager;
typedef IceUtil::Handle<EndpointManager> EndpointManagerPtr;
+//
+// Interface that defines the contract between the SessionI session implementation (child)
+// and its parent (EndpointManager).
+//
class InternalManagerIf : virtual public IceUtil::Shared
{
public:
@@ -31,10 +40,216 @@ public:
};
typedef IceUtil::Handle<InternalManagerIf> InternalManagerPtr;
+
+//
+// Specialization of ListenerManagerT helper template for SessionListeners. Basically takes care of invoking the
+// appropriate method on a publisher.
+//
+class SessionListenerMgr : public AsteriskSCF::TestUtil::ListenerManagerT<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx>
+{
+public:
+ SessionListenerMgr(const Ice::CommunicatorPtr& communicator, const std::string& topicName) :
+ AsteriskSCF::TestUtil::ListenerManagerT<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx>(communicator, topicName)
+ {
+ mPublisher = AsteriskSCF::SessionCommunications::V1::SessionListenerPrx::uncheckedCast(mTopic->getPublisher());
+ }
+
+ //
+ // TODO:
+ //
+ void connected()
+ {
+ mPublisher->connected(mSession);
+ }
+
+ void flashed()
+ {
+ mPublisher->flashed(mSession);
+ }
+
+ void held()
+ {
+ mPublisher->held(mSession);
+ }
+
+ void progressing(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response)
+ {
+ mPublisher->progressing(mSession, response);
+ }
+
+ void ringing()
+ {
+ mPublisher->ringing(mSession);
+ }
+
+ void stopped(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response)
+ {
+ mPublisher->stopped(mSession, response);
+ }
+
+ void unheld()
+ {
+ mPublisher->unheld(mSession);
+ }
+
+ void setProxy(const AsteriskSCF::SessionCommunications::V1::SessionPrx& prx)
+ {
+ mSession = prx;
+ }
+
+protected:
+ AsteriskSCF::SessionCommunications::V1::SessionListenerPrx mPublisher;
+ AsteriskSCF::SessionCommunications::V1::SessionPrx mSession;
+};
+
+typedef IceUtil::Handle<SessionListenerMgr> SessionListenerMgrPtr;
+
+//
+// Needs some intelligence:
+// - scheduled ringing
+// - etc.
+class SessionI : public AsteriskSCF::SessionCommunications::V1::Session
+{
+public:
+ SessionI(const InternalManagerPtr& m, const AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx& prx, const std::string& id,
+ const Ice::ObjectAdapterPtr& adapter) :
+ mEndpointManager(m),
+ mEndpointPrx(prx),
+ mId(id),
+ mListeners(new SessionListenerMgr(adapter->getCommunicator(), mId))
+ {
+ }
+
+ AsteriskSCF::SessionCommunications::V1::SessionInfoPtr addListener(
+ const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
+ const Ice::Current&)
+ {
+ mEndpointManager->log(mId, __FUNCTION__);
+ return 0;
+ }
+
+ void connect(const Ice::Current&)
+ {
+ mEndpointManager->log(mId, __FUNCTION__);
+ }
+
+ void flash(const Ice::Current&)
+ {
+ mEndpointManager->log(mId, __FUNCTION__);
+ }
+
+ AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx getEndpoint(const Ice::Current&)
+ {
+ mEndpointManager->log(mId, __FUNCTION__);
+ return mEndpointPrx;
+ }
+
+ AsteriskSCF::SessionCommunications::V1::SessionInfoPtr getInfo(const Ice::Current&)
+ {
+ mEndpointManager->log(mId, __FUNCTION__);
+ return 0;
+ }
+
+ AsteriskSCF::Media::V1::SessionPrx getMediaSession(const Ice::Current& current)
+ {
+ mEndpointManager->log(mId, __FUNCTION__);
+ boost::unique_lock<boost::shared_mutex> lock(mMutex);
+ if(!mMediaSession)
+ {
+ mMediaSession = AsteriskSCF::Media::V1::SessionPrx::uncheckedCast(
+ current.adapter->add(new MediaSessionI(mId + "Media", current.adapter),
+ current.adapter->getCommunicator()->stringToIdentity(mId + "Media")));
+ }
+ return mMediaSession;
+ };
+
+ void hold(const Ice::Current&)
+ {
+ mEndpointManager->log(mId, __FUNCTION__);
+ }
+
+ void progress(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response, const Ice::Current&)
+ {
+ mEndpointManager->log(mId, __FUNCTION__);
+ }
+
+ void removeListener(const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
+ const Ice::Current&)
+ {
+ mEndpointManager->log(mId, __FUNCTION__);
+ }
+
+ void ring(const Ice::Current&)
+ {
+ mEndpointManager->log(mId, __FUNCTION__);
+ }
+
+ void start(const Ice::Current&)
+ {
+ mEndpointManager->log(mId, __FUNCTION__);
+ }
+
+ void stop(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response, const Ice::Current&)
+ {
+ mEndpointManager->log(mId, __FUNCTION__);
+ }
+
+ void unhold(const Ice::Current&)
+ {
+ mEndpointManager->log(mId, __FUNCTION__);
+ }
+
+ void echo()
+ {
+ mMediaServant->echo();
+ }
+
+ void echoOn()
+ {
+ mMediaServant->echoOn();
+ }
+
+ void echoOff()
+ {
+ mMediaServant->echoOff();
+ }
+
+ void answer()
+ {
+ mListeners->connected();
+ }
+
+ void hangup()
+ {
+ mListeners->stopped(new AsteriskSCF::SessionCommunications::V1::ResponseCode);
+ }
+
+ void die()
+ {
+ }
+
+ void setProxy(const AsteriskSCF::SessionCommunications::V1::SessionPrx& prx)
+ {
+ mListeners->setProxy(prx);
+ }
+
+private:
+ boost::shared_mutex mMutex;
+ InternalManagerPtr mEndpointManager;
+ AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx mEndpointPrx;
+ const std::string mId;
+ MediaSessionIPtr mMediaServant;
+ AsteriskSCF::Media::V1::SessionPrx mMediaSession;
+ SessionListenerMgrPtr mListeners;
+};
+
+typedef IceUtil::Handle<SessionI> SessionIPtr;
+
class EndpointManager :
virtual public IceUtil::Shared,
virtual public InternalManagerIf,
- virtual public AsteriskSCF::SessionCommunications::V1::SessionEndpoint
+ virtual public AsteriskSCF::SessionCommunications::V1::SessionEndpoint,
+ virtual public Commands
{
public:
EndpointManager(const Ice::ObjectAdapterPtr& adapter, const std::string& id) :
@@ -50,9 +265,11 @@ public:
{
InternalSessionInfo info;
info.id = mId + "." + destination + "." + IceUtil::generateUUID();
- AsteriskSCF::SessionCommunications::V1::SessionPtr session = new SessionI(this,
- AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx::checkedCast(current.adapter->createProxy(current.id)), info.id);
- info.session = AsteriskSCF::SessionCommunications::V1::SessionPrx::checkedCast(current.adapter->add(session, current.adapter->getCommunicator()->stringToIdentity(info.id)));
+ info.servant = new SessionI(this,
+ AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx::checkedCast(current.adapter->createProxy(current.id)),
+ info.id, current.adapter);
+ info.session = AsteriskSCF::SessionCommunications::V1::SessionPrx::checkedCast(current.adapter->add(info.servant, current.adapter->getCommunicator()->stringToIdentity(info.id)));
+ info.servant->setProxy(info.session);
boost::unique_lock<boost::shared_mutex> lock(mMutex);
mSessions[info.id] = info;
return info.session;
@@ -85,120 +302,119 @@ public:
}
}
-private:
-
- struct InternalSessionInfo
+ void echo(const std::string& id)
{
- AsteriskSCF::SessionCommunications::V1::SessionPrx session;
- std::string id;
- std::vector<std::string> log;
- };
-
- typedef std::map<std::string, InternalSessionInfo> SessionMap;
- SessionMap mSessions;
- boost::shared_mutex mMutex;
- AsteriskSCF::TestUtil::Logger mLogger;
- Ice::ObjectAdapterPtr mAdapter;
- std::string mId;
-
- class EndpointManagerPtr;
-
- class SessionI : public AsteriskSCF::SessionCommunications::V1::Session
- {
- public:
- SessionI(const InternalManagerPtr& m, const AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx& prx, const std::string& id) :
- mEndpointManager(m),
- mEndpointPrx(prx),
- mId(id)
+ boost::unique_lock<boost::shared_mutex> lock(mMutex);
+ SessionMap::iterator i = mSessions.find(id);
+ if(i != mSessions.end())
{
+ i->second.servant->echo();
+ return;
}
-
- AsteriskSCF::SessionCommunications::V1::SessionInfoPtr addListener(
- const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
- const Ice::Current&)
+ std::cerr << "Unknown id" << std::endl;
+ }
+
+ void echoOn(const std::string& id)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mMutex);
+ SessionMap::iterator i = mSessions.find(id);
+ if(i != mSessions.end())
{
- mEndpointManager->log(mId, __FUNCTION__);
- return 0;
+ i->second.servant->echoOn();
+ return;
}
+ std::cerr << "Unknown id" << std::endl;
+ }
- void connect(const Ice::Current&)
+ void echoOff(const std::string& id)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mMutex);
+ SessionMap::iterator i = mSessions.find(id);
+ if(i != mSessions.end())
{
- mEndpointManager->log(mId, __FUNCTION__);
+ i->second.servant->echoOff();
+ return;
}
+ std::cerr << "Unknown id" << std::endl;
+ }
- void flash(const Ice::Current&)
+ void answer(const std::string& id)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mMutex);
+ SessionMap::iterator i = mSessions.find(id);
+ if(i != mSessions.end())
{
- mEndpointManager->log(mId, __FUNCTION__);
+ i->second.servant->answer();
+ return;
}
-
- AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx getEndpoint(const Ice::Current&)
+ std::cerr << "Unknown id" << std::endl;
+ }
+
+ void hangup(const std::string& id)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mMutex);
+ SessionMap::iterator i = mSessions.find(id);
+ if(i != mSessions.end())
{
- mEndpointManager->log(mId, __FUNCTION__);
- return mEndpointPrx;
+ i->second.servant->hangup();
+ return;
}
+ std::cerr << "Unknown id" << std::endl;
+ }
- AsteriskSCF::SessionCommunications::V1::SessionInfoPtr getInfo(const Ice::Current&)
+ void die(const std::string& id)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mMutex);
+ SessionMap::iterator i = mSessions.find(id);
+ if(i != mSessions.end())
{
- mEndpointManager->log(mId, __FUNCTION__);
- return 0;
+ mAdapter->remove(i->second.session->ice_getIdentity());
+ return;
}
-
- AsteriskSCF::Media::V1::SessionPrx getMediaSession(const Ice::Current& current)
- {
- mEndpointManager->log(mId, __FUNCTION__);
- boost::unique_lock<boost::shared_mutex> lock(mMutex);
- if(!mMediaSession)
- {
- mMediaSession = AsteriskSCF::Media::V1::SessionPrx::uncheckedCast(
- current.adapter->add(new MediaSessionI(mId + "Media", current.adapter),
- current.adapter->getCommunicator()->stringToIdentity(mId + "Media")));
- }
- return mMediaSession;
- };
-
- void hold(const Ice::Current&)
+ std::cerr << "Unknown id" << std::endl;
+ }
+
+ void getlog(const std::string& id, std::vector<std::string>& log)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mMutex);
+ SessionMap::iterator i = mSessions.find(id);
+ if(i != mSessions.end())
{
- mEndpointManager->log(mId, __FUNCTION__);
+ log = i->second.log;
+ return;
}
+ std::cerr << "Unknown id" << std::endl;
+ }
- void progress(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response, const Ice::Current&)
+ void clearlog(const std::string& id)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mMutex);
+ SessionMap::iterator i = mSessions.find(id);
+ if(i != mSessions.end())
{
- mEndpointManager->log(mId, __FUNCTION__);
+ i->second.log.clear();
+ return;
}
+ std::cerr << "Unknown id" << std::endl;
+ }
- void removeListener(const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
- const Ice::Current&)
- {
- mEndpointManager->log(mId, __FUNCTION__);
- }
+private:
- void ring(const Ice::Current&)
- {
- mEndpointManager->log(mId, __FUNCTION__);
- }
-
- void start(const Ice::Current&)
- {
- mEndpointManager->log(mId, __FUNCTION__);
- }
-
- void stop(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response, const Ice::Current&)
- {
- mEndpointManager->log(mId, __FUNCTION__);
- }
-
- void unhold(const Ice::Current&)
- {
- mEndpointManager->log(mId, __FUNCTION__);
- }
-
- private:
- boost::shared_mutex mMutex;
- InternalManagerPtr mEndpointManager;
- AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx mEndpointPrx;
- const std::string mId;
- AsteriskSCF::Media::V1::SessionPrx mMediaSession;
+ struct InternalSessionInfo
+ {
+ AsteriskSCF::SessionCommunications::V1::SessionPrx session;
+ SessionIPtr servant;
+ std::string id;
+ std::vector<std::string> log;
};
+
+ typedef std::map<std::string, InternalSessionInfo> SessionMap;
+ SessionMap mSessions;
+ boost::shared_mutex mMutex;
+ AsteriskSCF::TestUtil::Logger mLogger;
+ Ice::ObjectAdapterPtr mAdapter;
+ std::string mId;
+
};
namespace AsteriskSCF
commit 2e3fb4b95e34c6c8b6c18e098ef535323612bf18
Author: Brent Eagles <beagles at digium.com>
Date: Thu Sep 9 12:33:51 2010 -0230
Initial commit of test channel driver component.
diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..eba8938
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,6 @@
+[submodule "cmake"]
+ path = cmake
+ url = ssh://git@git.asterisk.org/asterisk-scf/release/cmake
+[submodule "slice"]
+ path = slice
+ url = ssh://git@git.asterisk.org/asterisk-scf/integration/slice
diff --git a/CMakeLists.txt b/CMakeLists.txt
new file mode 100644
index 0000000..9c7be7c
--- /dev/null
+++ b/CMakeLists.txt
@@ -0,0 +1,17 @@
+# Service locator build system
+
+# Minimum we require is 2.6, any lower and stuff would fail horribly
+cmake_minimum_required(VERSION 2.6)
+
+# Include common Hydra build infrastructure
+include(cmake/Hydra_v4.cmake)
+
+# This project is C++ based and requires a minimum of 3.4
+hydra_project("Test Channel" 3.4 CXX)
+
+# Take care of slice definitions
+add_subdirectory(slice EXCLUDE_FROM_ALL)
+
+# Take care of the source code for this project
+add_subdirectory(src)
+
diff --git a/cmake b/cmake
new file mode 160000
index 0000000..1e7a172
--- /dev/null
+++ b/cmake
@@ -0,0 +1 @@
+Subproject commit 1e7a1725fe9e2d176a2c26820d6bcd96c6c3939e
diff --git a/slice b/slice
new file mode 160000
index 0000000..14413db
--- /dev/null
+++ b/slice
@@ -0,0 +1 @@
+Subproject commit 14413db47bfae3d1ff57d36e80cfe700d755ae0b
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
new file mode 100644
index 0000000..91c0483
--- /dev/null
+++ b/src/CMakeLists.txt
@@ -0,0 +1,15 @@
+hydra_component_init(test_channel CXX)
+hydra_component_add_slice(test_channel EndpointIf)
+hydra_component_add_slice(test_channel ComponentServiceIf)
+hydra_component_add_slice(test_channel SessionCommunicationsIf)
+hydra_component_add_slice(test_channel RoutingIf)
+hydra_component_add_file(test_channel Service.cpp)
+hydra_component_add_file(test_channel MediaEchoThread.cpp)
+hydra_component_add_file(test_channel MediaSession.cpp)
+hydra_component_add_file(test_channel ConsoleDriver.cpp)
+hydra_component_add_file(test_channel TestEndpoint.cpp)
+hydra_component_add_ice_libraries(test_channel IceStorm)
+hydra_component_add_boost_libraries(test_channel thread)
+hydra_component_add_boost_libraries(test_channel date_time)
+hydra_component_build_standalone(test_channel)
+hydra_component_install(test_channel RUNTIME bin "Test Channel Emulator." Core)
diff --git a/src/ConsoleDriver.cpp b/src/ConsoleDriver.cpp
new file mode 100644
index 0000000..ca75569
--- /dev/null
+++ b/src/ConsoleDriver.cpp
@@ -0,0 +1,62 @@
+/*
+* Asterisk Scalable Communications Framework
+*
+* Copyright (C) 2010 -- Digium, Inc.
+*
+* All rights reserved.
+*/
+#include <iostream>
+#include <IceUtil/Thread.h>
+
+#include "ConsoleDriver.h"
+
+#ifndef _WIN32
+#include <unistd.h>
+int _kbhit()
+{
+ struct timeval tv;
+ fd_set fds;
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ FD_ZERO(&fds);
+ FD_SET(STDIN_FILENO, &fds);
+ select(STDIN_FILENO + 1, &fds, 0, 0, &tv);
+ return FD_ISSET(0, &fds);
+}
+#endif
+
+ConsoleDriver::ConsoleDriver() :
+ mDone(false)
+{
+}
+
+void ConsoleDriver::run()
+{
+ std::cerr << "Console driver started" << std::endl;
+ IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
+ while(!mDone && _kbhit() == 0)
+ {
+ mMonitor.timedWait(IceUtil::Time::milliSeconds(1000));
+ }
+ if(!mDone)
+ {
+ std::string command;
+ std::getline(std::cin, command);
+ //
+ // TODO: do stuff!!! :)
+ //
+ }
+}
+
+void ConsoleDriver::write(const std::string& message)
+{
+ std::cout << message << std::endl;
+}
+
+void ConsoleDriver::destroy()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
+ mLogger.getDebugStream() << __FUNCTION__ << " : destroyed. " << std::endl;
+ mDone = true;
+ mMonitor.notifyAll();
+}
diff --git a/src/ConsoleDriver.h b/src/ConsoleDriver.h
new file mode 100644
index 0000000..4ff5103
--- /dev/null
+++ b/src/ConsoleDriver.h
@@ -0,0 +1,29 @@
+/*
+* Asterisk Scalable Communications Framework
+*
+* Copyright (C) 2010 -- Digium, Inc.
+*
+* All rights reserved.
+*/
+#pragma once
+
+#include <IceUtil/Monitor.h>
+#include <IceUtil/Thread.h>
+#include <string>
+#include "Logger.h"
+
+class ConsoleDriver : public IceUtil::Thread
+{
+public:
+ ConsoleDriver();
+
+ void run();
+ void write(const std::string& msg);
+ void destroy();
+private:
+ IceUtil::Monitor<IceUtil::Mutex> mMonitor;
+ bool mDone;
+ AsteriskSCF::TestUtil::Logger mLogger;
+};
+
+typedef IceUtil::Handle<ConsoleDriver> ConsoleDriverPtr;
diff --git a/src/InternalExceptions.h b/src/InternalExceptions.h
new file mode 100644
index 0000000..4c094fc
--- /dev/null
+++ b/src/InternalExceptions.h
@@ -0,0 +1,46 @@
+#pragma once
+
+#include <exception>
+#include <string>
+#include <sstream>
+
+// TODO: Some exceptions that might be better off in the shared codebase.
+
+namespace AsteriskSCF
+{
+namespace BridgeService
+{
+ class ConfigException : public std::exception
+ {
+ public:
+
+ ConfigException(const std::string& propertyName, const std::string& message)
+ {
+ std::stringstream msg;
+ msg << propertyName << " configuration error: ";
+ if(message.size() != 0)
+ {
+ msg << message;
+ }
+ else
+ {
+ msg << "(no message)";
+ }
+ mWhat = msg.str();
+ }
+
+ ~ConfigException() throw()
+ {
+ }
+
+ const char* what() const
+ throw()
+ {
+ return mWhat.c_str();
+ }
+
+ private:
+ std::string mWhat;
+ };
+}
+}
diff --git a/src/ListenerManager.h b/src/ListenerManager.h
new file mode 100644
index 0000000..b9051da
--- /dev/null
+++ b/src/ListenerManager.h
@@ -0,0 +1,140 @@
+#pragma once
+
+#include <Ice/Ice.h>
+#include <IceStorm/IceStorm.h>
+#include <boost/thread/shared_mutex.hpp>
+#include <string>
+#include <vector>
+#include "InternalExceptions.h"
+
+namespace AsteriskSCF
+{
+namespace BridgeService
+{
+ //
+ // Helper template for classes that need to implement listener style interfaces.
+ //
+ template <class T>
+ class ListenerManagerT : public IceUtil::Shared
+ {
+ typedef std::vector<T> ListenerSeq;
+ typename std::vector<T>::iterator ListenerIter;
+ public:
+ ListenerManagerT(const Ice::CommunicatorPtr& communicator, const std::string& topicName) :
+ mCommunicator(communicator),
+ mTopicName(topicName)
+ {
+ //
+ // TODO: While this is being concocted for a single component, it would make more sense
+ // to have the topic manager passed in during construction for more general usage.
+ //
+ const std::string propertyName = "TopicManager.Proxy";
+ std::string topicManagerProperty = mCommunicator->getProperties()->getProperty(propertyName);
+ if(topicManagerProperty.size() == 0)
+ {
+ throw ConfigException(propertyName, "Topic manager proxy property missing. "
+ "Unable to initialize listener support.");
+ }
+
+ IceStorm::TopicManagerPrx topicManager;
+ try
+ {
+ IceStorm::TopicManagerPrx::checkedCast(mCommunicator->stringToProxy(topicManagerProperty));
+ }
+ catch(const Ice::Exception&)
+ {
+ }
+ if(!topicManager)
+ {
+ throw ConfigException(propertyName, "Topic manager proxy is not a valid proxy or is unreachable");
+ }
+
+ try
+ {
+ mTopic = topicManager->retrieve(mTopicName);
+ }
+ catch(const IceStorm::NoSuchTopic&)
+ {
+ }
+
+ if(!mTopic)
+ {
+ try
+ {
+ mTopic = topicManager->create(mTopicName);
+ }
+ catch(const IceStorm::TopicExists&)
+ {
+ //
+ // In case there is a race condition when creating the topic.
+ //
+ mTopic = topicManager->retrieve(mTopicName);
+ }
+ }
+
+ if(!mTopic)
+ {
+ throw ConfigException(propertyName,
+ std::string("unable to create topic with the provided configuration :") + mTopicName);
+ }
+ }
+
+ virtual ~ListenerManagerT()
+ {
+ //
+ // TODO: Destroy topic.
+ //
+ }
+
+ //
+ // NOTE: The current implementation is a little fast and loose here. Inconsistent conditions
+ // and whatnot are not flagged.
+ //
+ void addListener(const T& listener)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ if(mListeners.find(listener) == mListeners.end())
+ {
+ mListeners.push_back(listener);
+ }
+ IceStorm::QoS qos;
+ qos["reliability"] = "ordered";
+ try
+ {
+ mTopic->subscribeAndGetPublisher(qos, listener);
+ }
+ catch(const IceStorm::AlreadySubscribed&)
+ {
+ //
+ // This indicates some kind of inconsistent state.
+ //
+ }
+ }
+
+ void removeListener(const T& listener)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ typename std::vector<T>::iterator i = mListeners.find(listener);
+ if(i != mListeners.end())
+ {
+ mListeners.erase(i);
+ mTopic->unsubscribe(listener);
+ }
+ }
+
+ std::vector<T> getListeners()
+ {
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ std::vector<T> result(mListeners);
+ return result;
+ }
+
+ protected:
+ boost::shared_mutex mLock;
+ Ice::CommunicatorPtr mCommunicator;
+ std::string mTopicName;
+ IceStorm::TopicPrx mTopic;
+ ListenerSeq mListeners;
+ };
+}
+}
diff --git a/src/Logger.h b/src/Logger.h
new file mode 100644
index 0000000..f2c11d6
--- /dev/null
+++ b/src/Logger.h
@@ -0,0 +1,53 @@
+/*
+* Asterisk Scalable Communications Framework
+*
+* Copyright (C) 2010 -- Digium, Inc.
+*
+* All rights reserved.
+*/
+#pragma once
+
+#include <iostream>
+
+namespace AsteriskSCF
+{
+namespace TestUtil
+{
+ //
+ // Place holder log stream holder.
+ //
+ class Logger
+ {
+ public:
+ Logger() :
+ mTraceDebug(true)
+ {
+ }
+
+ std::ostream& getTraceStream()
+ {
+ return std::cerr;
+ }
+
+ std::ostream& getErrorStream()
+ {
+ return std::cerr;
+ }
+
+ std::ostream& getDebugStream()
+ {
+ return std::cerr;
+ }
+
+ bool debugTracing() { return mTraceDebug; }
+
+ std::ostream& getInfoStream()
+ {
+ return std::cerr;
+ }
+
+ private:
+ bool mTraceDebug;
+ };
+} // End of namespace TestUtil
+} // End of namespace AsteriskSCF
diff --git a/src/MediaEchoThread.cpp b/src/MediaEchoThread.cpp
new file mode 100644
index 0000000..1cc555a
--- /dev/null
+++ b/src/MediaEchoThread.cpp
@@ -0,0 +1,55 @@
+/*
+* Asterisk Scalable Communications Framework
+*
+* Copyright (C) 2010 -- Digium, Inc.
+*
+* All rights reserved.
+*/
+#include "MediaEchoThread.h"
+
+MediaEchoThread::MediaEchoThread():
+ mDone(false)
+{
+}
+
+void MediaEchoThread::run()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
+ while(!mDone && mFrames.size() && mSink != 0)
+ {
+ mMonitor.wait();
+ }
+ if(!mDone)
+ {
+ try
+ {
+ mSink->write(mFrames);
+ mFrames.clear();
+ }
+ catch(const Ice::Exception&)
+ {
+ // TODO: log and continue
+ }
+ }
+}
+
+void MediaEchoThread::pushFrames(const AsteriskSCF::Media::V1::FrameSeq& newFrames)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
+ mFrames.insert(mFrames.end(), newFrames.begin(), newFrames.end());
+ mMonitor.notify();
+}
+
+void MediaEchoThread::destroy()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
+ mDone = true;
+ mMonitor.notify();
+}
+
+void MediaEchoThread::setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& prx)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
+ mSink = prx;
+ mMonitor.notify();
+}
diff --git a/src/MediaEchoThread.h b/src/MediaEchoThread.h
new file mode 100644
index 0000000..edd0fee
--- /dev/null
+++ b/src/MediaEchoThread.h
@@ -0,0 +1,32 @@
+/*
+* Asterisk Scalable Communications Framework
+*
+* Copyright (C) 2010 -- Digium, Inc.
+*
+* All rights reserved.
+*/
+#pragma once
+#include <Ice/Ice.h>
+#include <Media/MediaIf.h>
+
+//
+// A thread that handles taking frames that were written to the sink
+// implementation and writing them out to another sink (usually the one
+// associated with a related source).
+//
+class MediaEchoThread : public virtual IceUtil::Thread
+{
+public:
+ MediaEchoThread();
+ void run();
+ void pushFrames(const AsteriskSCF::Media::V1::FrameSeq& newFrames);
+ void destroy();
+ void setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& prx);
+
+private:
+ AsteriskSCF::Media::V1::StreamSinkPrx mSink;
+ IceUtil::Monitor<IceUtil::Mutex> mMonitor;
+ AsteriskSCF::Media::V1::FrameSeq mFrames;
+ bool mDone;
+};
+typedef IceUtil::Handle<MediaEchoThread> MediaEchoThreadPtr;
diff --git a/src/MediaSession.cpp b/src/MediaSession.cpp
new file mode 100644
index 0000000..6704824
--- /dev/null
+++ b/src/MediaSession.cpp
@@ -0,0 +1,156 @@
+/*
+* Asterisk Scalable Communications Framework
+*
+* Copyright (C) 2010 -- Digium, Inc.
+*
+* All rights reserved.
+*/
+#include "MediaSession.h"
+#include "MediaEchoThread.h"
+
+//
+// The test media session echoes media received on its sink object to its
+// associated source object.
+//
+
+//
+// Test Sink.
+//
+class SinkI : public AsteriskSCF::Media::V1::StreamSink
+{
+public:
+ SinkI(const std::string& id) :
+ mId(id),
+ mThread(new MediaEchoThread)
+ {
+ }
+
+ void write(const AsteriskSCF::Media::V1::FrameSeq& frames, const Ice::Current&)
+ {
+ mThread->pushFrames(frames);
+ }
+
+ void setSource(const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&)
+ {
+ mSource = source;
+ }
+
+ AsteriskSCF::Media::V1::StreamSourcePrx getSource(const Ice::Current&)
+ {
+ return mSource;
+ }
+
+ AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&)
+ {
+ return mFormats;
+ }
+
+ std::string getId(const Ice::Current&)
+ {
+ return mId;
+ }
+
+ void requestFormat(const AsteriskSCF::Media::V1::FormatPtr& format, const Ice::Current&)
+ {
+ // XXX
+ }
+
+ void setRelaySink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink)
+ {
+ mThread->setSink(sink);
+ }
+
+private:
+ AsteriskSCF::Media::V1::StreamSourcePrx mSource;
+ AsteriskSCF::Media::V1::FormatSeq mFormats;
+ std::string mId;
+ MediaEchoThreadPtr mThread;
+};
+typedef IceUtil::Handle<SinkI> SinkIPtr;
+
+//
+// Test source.
+//
+class SourceI : public AsteriskSCF::Media::V1::StreamSource
+{
+public:
+
+ SourceI(const std::string& id, const SinkIPtr& sinkServant) :
+ mId(id),
+ mSinkServant(sinkServant)
+ {
+ }
+
+ void setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
+ {
+ mSink = sink;
+ mSinkServant->setRelaySink(mSink);
+ }
+
+ AsteriskSCF::Media::V1::StreamSinkPrx getSink(const Ice::Current&)
+ {
+ return mSink;
+ }
+
+ AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&)
+ {
+ return mFormats;
+ }
+
+ std::string getId(const Ice::Current&)
+ {
+ return mId;
+ }
+
+ void requestFormat(const AsteriskSCF::Media::V1::FormatPtr& format, const Ice::Current&)
+ {
+ // XXX
+ }
+
+private:
+ AsteriskSCF::Media::V1::StreamSinkPrx mSink;
+ AsteriskSCF::Media::V1::FormatSeq mFormats;
+ std::string mId;
+ SinkIPtr mSinkServant;
+};
+
+MediaSessionI::MediaSessionI(const std::string& id, const Ice::ObjectAdapterPtr& adapter) :
+ mId(id),
+ mAdapter(adapter)
+{
+ SinkIPtr sinkServant = new SinkI(mId + "Sink");
+ mSinks.push_back(AsteriskSCF::Media::V1::StreamSinkPrx::uncheckedCast(mAdapter->add(sinkServant,
+ mAdapter->getCommunicator()->stringToIdentity(mId + "Sink"))));
+ mSources.push_back(
+ AsteriskSCF::Media::V1::StreamSourcePrx::uncheckedCast(mAdapter->add(
+ new SourceI(mId + "Source", sinkServant),
+ mAdapter->getCommunicator()->stringToIdentity(mId + "Source"))));
+}
+
+MediaSessionI::~MediaSessionI()
+{
+ for(AsteriskSCF::Media::V1::StreamSourceSeq::iterator i = mSources.begin(); i != mSources.end(); ++i)
+ {
+ mAdapter->remove((*i)->ice_getIdentity());
+ }
+
+ for(AsteriskSCF::Media::V1::StreamSinkSeq::iterator i = mSinks.begin(); i != mSinks.end(); ++i)
+ {
+ mAdapter->remove((*i)->ice_getIdentity());
+ }
+}
+
+AsteriskSCF::Media::V1::StreamSourceSeq MediaSessionI::getSources(const Ice::Current&)
+{
+ return mSources;
+}
+
+AsteriskSCF::Media::V1::StreamSinkSeq MediaSessionI::getSinks(const Ice::Current&)
+{
+ return mSinks;
+}
+
+std::string MediaSessionI::getId(const Ice::Current&)
+{
+ return mId;
+}
diff --git a/src/MediaSession.h b/src/MediaSession.h
new file mode 100644
index 0000000..d1a3c09
--- /dev/null
+++ b/src/MediaSession.h
@@ -0,0 +1,31 @@
+/*
+* Asterisk Scalable Communications Framework
+*
+* Copyright (C) 2010 -- Digium, Inc.
+*
+* All rights reserved.
+*/
+#pragma once
+#include <Ice/Ice.h>
+#include <Media/MediaIf.h>
+#include <boost/thread/shared_mutex.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+
+class MediaSessionI : public AsteriskSCF::Media::V1::Session
+{
+public:
+ MediaSessionI(const std::string& id, const Ice::ObjectAdapterPtr& adapter);
+ ~MediaSessionI();
... 401 lines suppressed ...
--
asterisk-scf/integration/test_channel.git
More information about the asterisk-scf-commits
mailing list