[asterisk-scf-commits] asterisk-scf/release/bridging.git branch "bridge-replication" created.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Mon Feb 14 10:04:32 CST 2011
branch "bridge-replication" has been created
at d416f9ab138874980cf332c1dbd5a4e8ec354b62 (commit)
- Log -----------------------------------------------------------------
commit d416f9ab138874980cf332c1dbd5a4e8ec354b62
Merge: 37173c7 8802a77
Author: Brent Eagles <beagles at digium.com>
Date: Mon Feb 14 12:32:45 2011 -0330
Initial swipe at replication!
diff --cc src/BridgeManagerImpl.cpp
index 4c78169,26f5580..e0cd0e8
--- a/src/BridgeManagerImpl.cpp
+++ b/src/BridgeManagerImpl.cpp
@@@ -17,9 -17,24 +17,25 @@@
#include <Ice/Ice.h>
#include <IceUtil/UUID.h>
#include <boost/thread/locks.hpp>
+#include <AsteriskSCF/logger.h>
#include "BridgeListenerMgr.h"
#include "BridgeManagerListenerMgr.h"
+ #include <AsteriskSCF/logger.h>
+ #include "BridgeImpl.h"
+ #include "BridgeManagerListenerMgr.h"
+ #include "BridgeReplicatorIf.h"
+ #include <AsteriskSCF/System/ExceptionsIf.h>
+ #include <AsteriskSCF/System/Component/ComponentServiceIf.h>
+
+ using namespace AsteriskSCF::System::Logging;
+ using namespace AsteriskSCF::System;
+ using namespace AsteriskSCF::System::V1;
+ using namespace AsteriskSCF::System::Component::V1;
+ using namespace AsteriskSCF::SessionCommunications::V1;
+ using namespace AsteriskSCF::Core::Discovery::V1;
+ using namespace AsteriskSCF::BridgeService;
+ using namespace AsteriskSCF::Bridge::V1;
+ using namespace std;
//
// Compiled in constants.
@@@ -53,25 -141,24 +142,23 @@@ private
const Ice::Current mCurrent;
};
- } // End of namespace BridgeService
- } // End of namespace AsteriskSCF
- AsteriskSCF::BridgeService::BridgeManagerImpl::BridgeManagerImpl(const Ice::ObjectAdapterPtr& adapter,
- const std::string& name, const AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx& prx,
- const AsteriskSCF::System::Logging::Logger& logger) :
-
+ BridgeManagerImpl::BridgeManagerImpl(const Ice::ObjectAdapterPtr& adapter, const string& name, const ReplicatorSmartPrx& replicator,
+ const Logger& logger) :
mName(name),
- mShuttingDown(false),
- mSuspended(false),
+ mActivated(false),
mAdapter(adapter),
- mSourceProxy(prx),
- mLogger(logger)
+ mReplicator(replicator),
+ mLogger(logger),
+ mState(new BridgeManagerStateItem)
{
mLogger(Info) << "Created AsteriskSCF Session-Oriented Bridge Manager." ;
- mListeners =
- new AsteriskSCF::BridgeService::BridgeManagerListenerMgr(mAdapter->getCommunicator(), mName, mSourceProxy);
+ mListeners = new BridgeManagerListenerMgr(mAdapter->getCommunicator(), mName, mSourceProxy);
+ mState->runningState = ServiceState::Running;
+ mState->key = name;
}
- AsteriskSCF::BridgeService::BridgeManagerImpl::~BridgeManagerImpl()
+ BridgeManagerImpl::~BridgeManagerImpl()
{
if (mListeners)
{
diff --cc src/BridgeManagerImpl.h
index 43671fe,5f768ae..f466000
--- a/src/BridgeManagerImpl.h
+++ b/src/BridgeManagerImpl.h
@@@ -24,59 -26,51 +26,52 @@@
namespace AsteriskSCF
{
+
namespace BridgeService
{
- class BridgeManagerImpl : public SessionCommunications::V1::BridgeManager
+
+ class BridgeManagerServant : public virtual AsteriskSCF::SessionCommunications::V1::BridgeManager
{
public:
+ virtual bool destroyed() = 0;
- BridgeManagerImpl(const Ice::ObjectAdapterPtr& adapter, const std::string& name, const AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx& prx,
- const AsteriskSCF::System::Logging::Logger& logger);
- ~BridgeManagerImpl();
+ /**
+ *
+ * Replication helper methods
+ * TODO: complete documentation.
+ *
+ **/
- //
- // AsteriskSCF::SessionCommunications::V1::BridgeManager Interface
- //
- SessionCommunications::V1::BridgePrx createBridge(
- const SessionCommunications::V1::SessionSeq& endpoints,
- const SessionCommunications::V1::BridgeListenerPrx& listener,
- const Ice::Current& current);
+ virtual AsteriskSCF::Bridge::V1::BridgeManagerStateItemPtr getState() = 0;
- void addListener(const SessionCommunications::V1::BridgeManagerListenerPrx& listener, const Ice::Current& current);
- void removeListener(const SessionCommunications::V1::BridgeManagerListenerPrx& listener, const Ice::Current& current);
+ virtual void updateState(const AsteriskSCF::Bridge::V1::BridgeManagerStateItemPtr& state) = 0;
- void addDefaultBridgeListener(const SessionCommunications::V1::BridgeListenerPrx& newListener, const Ice::Current& current);
- void removeDefaultBridgeListener(const SessionCommunications::V1::BridgeListenerPrx& toRemove, const Ice::Current& current);
+ virtual std::vector<BridgeServantPtr> getBridges() = 0;
- SessionCommunications::V1::BridgeSeq listBridges(const Ice::Current& current);
- void shutdown(const Ice::Current& current);
+ virtual void activate() = 0;
- struct BridgeInfo
- {
- BridgeImplPtr servant;
- SessionCommunications::V1::BridgePrx proxy;
- };
+ virtual std::string id() = 0;
- private:
+ virtual void createBridgeReplica(const AsteriskSCF::Bridge::V1::BridgeStateItemPtr& bridgeState) = 0;
+ };
- boost::shared_mutex mLock;
- std::string mName;
- std::vector<BridgeInfo> mBridges;
- bool mShuttingDown;
- bool mSuspended;
- Ice::ObjectAdapterPtr mAdapter;
- AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx mSourceProxy;
- BridgeManagerListenerMgrPtr mListeners;
- AsteriskSCF::System::Logging::Logger mLogger;
- std::vector<AsteriskSCF::SessionCommunications::V1::BridgeListenerPrx> mDefaultBridgeListeners;
+ typedef IceUtil::Handle<BridgeManagerServant> BridgeManagerServantPtr;
- void reap();
- void statePreCheck(const std::string&);
- };
+ /**
+ *
+ * Factory method for creating BridgeManager objects.
+ *
+ * @param adapter The Ice object adapter for the BridgeManager all of the servants it will create.
+ *
+ * @param name A name for this servant. This will be used to construct the new BridgeManager object's Ice::Identity.
+ * It is used to construct relative object ids for all objects that the bridge manager creates.
+ *
+ * @param logger The logger the BridgeManager should use.
+ *
+ **/
+ BridgeManagerServantPtr createBridgeManager(const Ice::ObjectAdapterPtr& adapter,
+ const std::string& name, const ReplicatorSmartPrx& replicator,
+ const AsteriskSCF::System::Logging::Logger& logger);
- typedef IceUtil::Handle<BridgeManagerImpl> BridgeManagerImplPtr;
};
};
commit 8802a77df53c3b5063fb7b8c0e974ad268bd5e6a
Author: Brent Eagles <beagles at digium.com>
Date: Mon Feb 14 12:12:28 2011 -0330
* Some more refactoriing around the replication code.
* Added a dump function for bridge state items so the details of replicated
state can be inspected in the logs.
TODO: Test driver needs work.
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 3d3a0db..9d70895 100644
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -24,6 +24,7 @@
#include <algorithm>
#include "MediaSplicer.h"
#include "ServiceUtil.h"
+#include "DebugUtil.h"
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::SessionCommunications::V1;
@@ -555,6 +556,7 @@ void BridgeImpl::addSessions(const SessionSeq& sessions, const Ice::Current& cur
SessionWrapper s(newSession);
s.setConnector(mSplicer.connect(*i, mLogger));
+ newSession->currentState = BridgedSessionState::Connected;
}
mState->bridgedSessions.push_back(newSession);
@@ -907,7 +909,10 @@ void BridgeImpl::activate(const BridgePrx& proxy)
BridgeStateItemPtr BridgeImpl::getState()
{
mLogger(Debug) << FUNLOG;
- return new BridgeStateItem(*mState.get());
+ BridgeStateItemPtr result(new BridgeStateItem);
+ *result = *mState;
+ dumpState(cerr, result, mObjAdapter->getCommunicator());
+ return result;
}
void BridgeImpl::updateState(const BridgeStateItemPtr& state)
@@ -1056,7 +1061,15 @@ AsteriskSCF::BridgeService::BridgeServant::create(const Ice::ObjectAdapterPtr& o
const ReplicatorSmartPrx& replicator,
const Logger& logger)
{
- return new BridgeImpl(objectAdapter, listeners, listenerMgr, replicator, new AsteriskSCF::Bridge::V1::BridgeStateItem, logger);
+ BridgeStateItemPtr state(new AsteriskSCF::Bridge::V1::BridgeStateItem);
+ state->runningState = Running;
+ //
+ // TODO: "replicate" is the only replication policy currently supported by the bridge service.
+ // In the future it may be possible for the bridge replica to reconstruct its media operations
+ // allowing localized resources to be used.
+ //
+ state->mediaReplicationPolicy = MediaOperationReplicationPolicy::Replicate;
+ return new BridgeImpl(objectAdapter, listeners, listenerMgr, replicator, state, logger);
}
IceUtil::Handle<AsteriskSCF::BridgeService::BridgeServant>
diff --git a/src/BridgeManagerImpl.cpp b/src/BridgeManagerImpl.cpp
index 32f9421..26f5580 100644
--- a/src/BridgeManagerImpl.cpp
+++ b/src/BridgeManagerImpl.cpp
@@ -369,6 +369,19 @@ void BridgeManagerImpl::createBridgeReplica(const BridgeStateItemPtr& state)
mLogger(Debug) << FUNLOG;
boost::unique_lock<boost::shared_mutex> lock(mLock);
Ice::Identity id(mAdapter->getCommunicator()->stringToIdentity(state->bridgeId));
+ BridgePrx prx(BridgePrx::uncheckedCast(mAdapter->createProxy(id)));
+ BridgeListenerMgrPtr mgr(new BridgeListenerMgr(mAdapter->getCommunicator(), state->bridgeId, prx));
+
+ BridgeServantPtr bridge = BridgeServant::create(mAdapter, mgr, mReplicator, mLogger, state);
+ Ice::ObjectPrx obj = mAdapter->add(bridge, id);
+
+ mLogger(Info) << ": creating bridge replica " << obj->ice_toString() << "." ;
+ BridgeInfo info;
+ info.servant = bridge;
+ info.proxy = BridgePrx::uncheckedCast(obj);
+
+ bridge->activate(info.proxy);
+ mBridges.push_back(info);
}
void BridgeManagerImpl::reap()
diff --git a/src/BridgeReplicatorStateListenerI.cpp b/src/BridgeReplicatorStateListenerI.cpp
index 049dfbe..ed2121c 100644
--- a/src/BridgeReplicatorStateListenerI.cpp
+++ b/src/BridgeReplicatorStateListenerI.cpp
@@ -18,6 +18,7 @@
#include "BridgeReplicatorIf.h"
#include <AsteriskSCF/StateReplicator.h>
#include "BridgeServiceConfig.h"
+#include "DebugUtil.h"
#include <map>
#include <string>
@@ -74,6 +75,7 @@ public:
BridgeStateItemPtr bridgeItem = BridgeStateItemPtr::dynamicCast((*i));
if (bridgeItem)
{
+ dumpState(cerr, bridgeItem, current.adapter->getCommunicator());
vector<BridgeServantPtr> bridges = mManager->getBridges();
bool found = false;
for (vector<BridgeServantPtr>::iterator b = bridges.begin(); b != bridges.end(); ++b)
diff --git a/src/DebugUtil.h b/src/DebugUtil.h
new file mode 100644
index 0000000..2551441
--- /dev/null
+++ b/src/DebugUtil.h
@@ -0,0 +1,147 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <Ice/Ice.h>
+#include <ostream>
+#include "BridgeReplicatorIf.h"
+
+namespace AsteriskSCF
+{
+namespace BridgeService
+{
+
+inline
+std::ostream& dumpState(std::ostream& os, const std::string& prefix,
+ const AsteriskSCF::Bridge::V1::BridgedSessionPtr& session,
+ const Ice::CommunicatorPtr& comm)
+{
+#ifndef _NDEBUG
+
+ if (!session)
+ {
+ os << prefix << "(null)\n";
+ return os;
+ }
+
+ os << prefix << "Id: " << comm->identityToString(session->session->ice_getIdentity()) << '\n';
+ os << prefix << "State: ";
+ switch (session->currentState)
+ {
+ case AsteriskSCF::Bridge::V1::BridgedSessionState::Added:
+ os << "added\n";
+ break;
+ case AsteriskSCF::Bridge::V1::BridgedSessionState::Connected:
+ os << "connected\n";
+ break;
+ case AsteriskSCF::Bridge::V1::BridgedSessionState::Disconnected:
+ os << "disconnected\n";
+ break;
+ default:
+ os << "(invalid)\n";
+ }
+ int index = 0;
+ for (AsteriskSCF::Bridge::V1::MediaPairingSeq::const_iterator i = session->mediaPairings.begin();
+ i != session->mediaPairings.end(); ++i)
+ {
+ if (index == 0)
+ {
+ os << prefix << "Pairings:\n";
+ }
+ os << prefix << "\t" << index << ". source: " << comm->identityToString((*i)->source->ice_getIdentity()) << " to sink : " <<
+ comm->identityToString((*i)->sink->ice_getIdentity()) << '\n';
+ ++index;
+ }
+
+ return os;
+#else
+ return os;
+#endif
+}
+
+inline
+std::ostream& dumpState(std::ostream& os, const AsteriskSCF::Bridge::V1::BridgeStateItemPtr& state,
+ const Ice::CommunicatorPtr& comm)
+{
+#ifndef _NDEBUG
+ if (!state)
+ {
+ os << "(null)\n";
+ return os;
+ }
+ os << "BRIDGE STATE DUMP <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n";
+ os << "key : " << state->key << "\nbridge id : " << state->bridgeId << "\n";
+ switch (state->runningState)
+ {
+ case AsteriskSCF::Bridge::V1::ServiceState::Running:
+ os << "running\n";
+ break;
+ case AsteriskSCF::Bridge::V1::ServiceState::Paused:
+ os << "paused\n";
+ break;
+ case AsteriskSCF::Bridge::V1::ServiceState::ShuttingDown:
+ os << "shutting down\n";
+ break;
+ case AsteriskSCF::Bridge::V1::ServiceState::Destroyed:
+ os << "destroyed\n";
+ break;
+ default:
+ os << "(invalid)\n";
+ }
+ int index = 0;
+ for (AsteriskSCF::Bridge::V1::BridgedSessionSeq::const_iterator i = state->bridgedSessions.begin();
+ i != state->bridgedSessions.end(); ++i)
+ {
+ if (index == 0)
+ {
+ os << "Bridged sessions (" << state->bridgedSessions.size() << "):\n";
+ index = 1;
+ }
+ dumpState(os, "\t", *i, comm);
+ }
+ index = 0;
+ for (AsteriskSCF::Bridge::V1::BridgeListenerSeq::const_iterator i = state->listeners.begin();
+ i != state->listeners.end(); ++i)
+ {
+ if (index == 0)
+ {
+ os << "Bridge listeners (" << state->listeners.size() << "):\n";
+ }
+ os << index << ". " << comm->identityToString((*i)->ice_getIdentity()) << '\n';
+ ++index;
+ }
+ index = 0;
+ os << "Media replication policy: ";
+ switch (state->mediaReplicationPolicy)
+ {
+ case AsteriskSCF::Bridge::V1::MediaOperationReplicationPolicy::Replicate:
+ os << "replicate\n";
+ break;
+ case AsteriskSCF::Bridge::V1::MediaOperationReplicationPolicy::Reconstruct:
+ os << "reconstruct\n";
+ break;
+ default:
+ os << "(invalid)\n";
+ }
+ os << "END BRIDGE STATE DUMP >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n";
+ return os;
+#else
+ return os;
+#endif
+}
+
+} /* End of namespace BridgeService */
+} /* End of namespace AsteriskSCF */
diff --git a/test/TestBridging.cpp b/test/TestBridging.cpp
index 1cc7ec3..3ffb41d 100644
--- a/test/TestBridging.cpp
+++ b/test/TestBridging.cpp
@@ -36,6 +36,7 @@
//
using namespace AsteriskSCF::BridgingTest;
+using namespace std;
/* Cache the command line arguments so that Ice can be initialized within the global fixture. */
namespace
@@ -63,7 +64,7 @@ public:
~TestEnvironment()
{
- if(mArgc > 0)
+ if (mArgc > 0)
{
for(size_t i = 0; i < mArgc; ++i)
{
@@ -128,7 +129,7 @@ public:
{
mCommunicator->destroy();
}
- catch(...)
+ catch (...)
{
std::cerr << "exception on destroy!" << std::endl;
}
@@ -164,7 +165,7 @@ public:
{
mCommunicator->destroy();
}
- catch(...)
+ catch (...)
{
}
}
@@ -200,7 +201,7 @@ bool find(const std::vector<std::string>& list, const std::string& value)
{
for(std::vector<std::string>::const_iterator i = list.begin(); i != list.end(); ++i)
{
- if(i->find(value) != std::string::npos)
+ if (i->find(value) != std::string::npos)
{
return true;
}
@@ -260,18 +261,18 @@ public:
BOOST_CHECK(servant->createCalls() == 1);
}
- catch(const Ice::Exception& ex)
+ catch (const Ice::Exception& ex)
{
std::ostringstream msg;
msg << "Unexpected Ice exception " << ex.what();
BOOST_FAIL(msg.str());
}
- catch(...)
+ catch (...)
{
BOOST_FAIL("Unexpected exception");
}
}
- catch(...)
+ catch (...)
{
BOOST_FAIL("Unexpected exception");
}
@@ -346,18 +347,18 @@ public:
channel.commands()->getlog(idB, log);
BOOST_CHECK(find(log, "stop"));
}
- catch(const Ice::Exception& ex)
+ catch (const Ice::Exception& ex)
{
std::ostringstream msg;
msg << "Unexpected Ice exception " << ex.what();
BOOST_FAIL(msg.str());
}
- catch(...)
+ catch (...)
{
BOOST_FAIL("Unexpected exception");
}
}
- catch(...)
+ catch (...)
{
BOOST_FAIL("Unexpected exception");
}
@@ -440,18 +441,18 @@ public:
channel.commands()->getlog(idB, log);
BOOST_CHECK(find(log, "stop"));
}
- catch(const Ice::Exception& ex)
+ catch (const Ice::Exception& ex)
{
std::ostringstream msg;
msg << "Unexpected Ice exception " << ex.what();
BOOST_FAIL(msg.str());
}
- catch(...)
+ catch (...)
{
BOOST_FAIL("Unexpected exception");
}
}
- catch(...)
+ catch (...)
{
BOOST_FAIL("Unexpected exception");
}
@@ -492,18 +493,18 @@ public:
BOOST_CHECK(servant->createCalls() == 2);
bridge->shutdown();
}
- catch(const Ice::Exception& ex)
+ catch (const Ice::Exception& ex)
{
std::ostringstream msg;
msg << "Unexpected Ice exception " << ex.what();
BOOST_FAIL(msg.str());
}
- catch(...)
+ catch (...)
{
BOOST_FAIL("Unexpected exception");
}
}
- catch(...)
+ catch (...)
{
BOOST_FAIL("Unexpected exception");
}
@@ -565,23 +566,33 @@ public:
bridge->shutdown();
}
- catch(const Ice::Exception& ex)
+ catch (const Ice::Exception& ex)
{
std::ostringstream msg;
msg << "Unexpected Ice exception " << ex.what();
BOOST_FAIL(msg.str());
}
- catch(...)
+ catch (...)
{
BOOST_FAIL("Unexpected exception");
}
}
- catch(...)
+ catch (...)
{
BOOST_FAIL("Unexpected exception");
}
}
+ //
+ // Technically speaking the replication tests *violate* the intended operation of
+ // replicas. Specifically, we treat them both as active for these tests. The alternative
+ // are to additional queries to the slice interfaces or make functional level tests that
+ // terminate the processes. However, the purpose of these tests is to verify that replication
+ // occurs, not that it functions EXACTLY correct in a live environment. That's outside
+ // of the scope of these particular tests. On that note, THAT's A BIG TODO:
+ //
+ // Add system level tests to verify replication in a simulated live environment.
+ //
void testBridgeManagerListenerReplication()
{
try
@@ -612,29 +623,43 @@ public:
AsteriskSCF::SessionCommunications::V1::SessionSeq sessions;
AsteriskSCF::SessionCommunications::V1::BridgePrx bridge(mgrPrx->createBridge(sessions, 0));
servant->wait(5000);
+ if (servant->createCalls() != 1)
+ {
+ cerr << "Listener received : "+ servant->createCalls() << " create calls\n";
+ }
BOOST_CHECK(servant->createCalls() == 1);
mgrPrx->removeListener(listenerPrx);
bridge = mgrPrx->createBridge(sessions, 0);
- servant->wait(5000);
+ if (!servant->wait(5000))
+ {
+ BOOST_MESSAGE("Wait for event expired");
+ }
+ if (servant->createCalls() != 1)
+ {
+ cerr << "Listener received : " << servant->createCalls() << " create calls\n";
+ }
BOOST_CHECK(servant->createCalls() == 1);
- mgrPrx->addListener(listenerPrx);
- bridge = mgrPrx->createBridge(sessions, 0);
+ mgrPrx->removeListener(listenerPrx);
servant->wait(5000);
+ if (servant->createCalls() != 2)
+ {
+ cerr << "Listener received : " << servant->createCalls() << " create calls\n";
+ }
BOOST_CHECK(servant->createCalls() == 2);
bridge->shutdown();
}
- catch(const Ice::Exception& ex)
+ catch (const Ice::Exception& ex)
{
std::ostringstream msg;
msg << "Unexpected Ice exception " << ex.what();
BOOST_FAIL(msg.str());
}
- catch(...)
+ catch (...)
{
BOOST_FAIL("Unexpected exception");
}
}
- catch(...)
+ catch (...)
{
BOOST_FAIL("Unexpected exception");
}
@@ -659,6 +684,8 @@ bool init_unit_test()
add(BOOST_TEST_CASE(boost::bind(&BridgeTester::bridgeManagerListeners, bridgeTester)));
framework::master_test_suite().
add(BOOST_TEST_CASE(boost::bind(&BridgeTester::testReplaceSession, bridgeTester)));
+ framework::master_test_suite().
+ add(BOOST_TEST_CASE(boost::bind(&BridgeTester::testBridgeManagerListenerReplication, bridgeTester)));
return true;
}
@@ -676,7 +703,7 @@ public:
initData.properties = globalTestEnvironment->communicator()->getProperties();
Ice::CommunicatorPtr t = Ice::initialize(initData);
IceBox::ServiceManagerPrx mgr = IceBox::ServiceManagerPrx::checkedCast(t->propertyToProxy("IceBoxMgr.Proxy"));
- if(mgr)
+ if (mgr)
{
mgr->shutdown();
}
commit 53eb34cefe2543d77ebf5e808e125bc4924d0aae
Author: Brent Eagles <beagles at digium.com>
Date: Fri Feb 11 16:17:41 2011 -0330
Fix some merge/conflict/bugs.
diff --git a/config/test_bridging.conf.in b/config/test_bridging.conf.in
index c9e92e0..22c0c4a 100644
--- a/config/test_bridging.conf.in
+++ b/config/test_bridging.conf.in
@@ -19,7 +19,9 @@ TopicManager.Proxy=AsteriskSCFIceStorm/TopicManager:default -p 55555
IceBox.InheritProperties=1
IceBox.Service.Logger=${logger_bindir}/server/src at logging-service:createLoggingService
+IceBox.Service.Replicator=../src at BridgeReplicator::create
IceBox.Service.TestBridge=../src at bridgeservice:create
+IceBox.Service.TestBridge2=../src/@bridgeservice:create
IceBox.Service.TestServiceLocator=${service_locator_bindir}/src at service_locator:create
IceBox.Service.TestChannel=${test_channel_bindir}/src at test_channel:create
IceBox.Service.TestDriver=../test at bridging_unit_test:create
@@ -27,6 +29,9 @@ IceBox.Service.TestDriver=../test at bridging_unit_test:create
TestBridge.InstanceName=TestBridge
TestBridge.BridgeService.Endpoints=default -p 55561
TestBridge.Proxy=BridgeManager:default -p 55561
+TestBridge2.InstanceName=TestBridge2
+TestBridge2.BridgeService.Endpoints=default -p 55562
+TestBridge2.Proxy=BridgeManager:default -p 55562
TestChannel.Proxy=TestChannel:default -p 55560
TestUtilAdapter.Endpoints=default -p 55562
Commands.Proxy=TestChannel.Locator.Commands:default -p 55560
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 7a213fa..3d3a0db 100644
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -23,7 +23,6 @@
#include <memory>
#include <algorithm>
#include "MediaSplicer.h"
-#include "BridgeServiceConfig.h"
#include "ServiceUtil.h"
using namespace AsteriskSCF::System::Logging;
@@ -136,9 +135,11 @@ class BridgeImpl : virtual public BridgeServant
{
public:
BridgeImpl(const Ice::ObjectAdapterPtr& objAdapter,
- const vector<BridgeListenerPrx>& listeners,
- const BridgeListenerMgrPtr& listenerMgr,
- const Logger& logger);
+ const vector<BridgeListenerPrx>& listeners,
+ const BridgeListenerMgrPtr& listenerMgr,
+ const ReplicatorSmartPrx& replicator,
+ const BridgeStateItemPtr& state,
+ const Logger& logger);
~BridgeImpl();
@@ -191,6 +192,7 @@ private:
Ice::ObjectAdapterPtr mObjAdapter;
BridgeListenerMgrPtr mListeners;
+ ReplicatorSmartPrx mReplicator;
SessionListenerPtr mSessionListener;
SessionListenerPrx mSessionListenerPrx;
BridgePrx mPrx;
@@ -199,6 +201,7 @@ private:
IceUtil::Handle<IceUtil::Thread> mShutdownThread;
void statePreCheck();
+ void update();
};
typedef IceUtil::Handle<BridgeImpl> BridgeImplPtr;
@@ -446,12 +449,16 @@ private:
} // End of anonymous namespace
BridgeImpl::BridgeImpl(const Ice::ObjectAdapterPtr& adapter,
- const vector<BridgeListenerPrx>& listeners,
- const BridgeListenerMgrPtr& listenerMgr,
- const Logger& logger) :
+ const vector<BridgeListenerPrx>& listeners,
+ const BridgeListenerMgrPtr& listenerMgr,
+ const ReplicatorSmartPrx& replicator,
+ const BridgeStateItemPtr& state,
+ const Logger& logger) :
mActivated(false),
+ mState(state),
mObjAdapter(adapter),
mListeners(listenerMgr),
+ mReplicator(replicator),
mSessionListener(new SessionListenerImpl(this, logger)),
mLogger(logger)
{
@@ -553,6 +560,7 @@ void BridgeImpl::addSessions(const SessionSeq& sessions, const Ice::Current& cur
addedSessions.push_back(*i);
}
+ update();
}
if (addedSessions.size())
{
@@ -603,6 +611,7 @@ void BridgeImpl::removeSessions(
//
}
}
+ update();
}
if (removedSessions.size() != 0)
@@ -673,6 +682,7 @@ void BridgeImpl::shutdown(const Ice::Current& current)
//
mObjAdapter->remove(mSessionListenerPrx->ice_getIdentity());
mSessionListener = 0;
+ update();
}
void BridgeImpl::destroy(const Ice::Current& current)
@@ -698,16 +708,21 @@ void BridgeImpl::destroy(const Ice::Current& current)
// Remove references to the session listener implementation.
//
mObjAdapter->remove(mSessionListenerPrx->ice_getIdentity());
+ update();
}
void BridgeImpl::addListener(const BridgeListenerPrx& listener, const Ice::Current&)
{
mListeners->addListener(listener);
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ update();
}
void BridgeImpl::removeListener(const BridgeListenerPrx& listener, const Ice::Current&)
{
mListeners->removeListener(listener);
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ update();
}
void BridgeImpl::replaceSession(const SessionPrx& oldSession, const SessionSeq& newSessions, const Ice::Current& current)
@@ -797,6 +812,7 @@ void BridgeImpl::replaceSession(const SessionPrx& oldSession, const SessionSeq&
{
mState->bridgedSessions.push_back(*j);
}
+ update();
}
//
@@ -879,6 +895,8 @@ void BridgeImpl::activate(const BridgePrx& proxy)
{
mPrx = proxy;
string listenerId = mObjAdapter->getCommunicator()->identityToString(mPrx->ice_getIdentity());
+ mState->key = listenerId;
+ mState->bridgeId = listenerId;
listenerId += ".sessionListener";
mSessionListenerPrx =
SessionListenerPrx::uncheckedCast(
@@ -889,7 +907,6 @@ void BridgeImpl::activate(const BridgePrx& proxy)
BridgeStateItemPtr BridgeImpl::getState()
{
mLogger(Debug) << FUNLOG;
- boost::shared_lock<boost::shared_mutex> lock(mLock);
return new BridgeStateItem(*mState.get());
}
@@ -898,7 +915,6 @@ void BridgeImpl::updateState(const BridgeStateItemPtr& state)
mLogger(Debug) << FUNLOG;
boost::unique_lock<boost::shared_mutex> lock(mLock);
*mState.get() = *state.get();
-
}
string BridgeImpl::id()
@@ -971,20 +987,6 @@ size_t BridgeImpl::sessionStopped(const SessionPrx& session, const ResponseCodeP
return sizeAfter;
}
-void BridgeImpl::statePreCheck()
-{
- if (mState == ShuttingDown)
- {
- mLogger(Debug) << FUNLOG << ": called when shutting down." ;
- throw AsteriskSCF::System::Component::V1::ShuttingDown();
- }
- if (mState == Destroyed)
- {
- mLogger(Debug) << FUNLOG << ": called when destroyed." ;
- throw Ice::ObjectNotExistException(__FILE__, __LINE__);
- }
-}
-
namespace
{
class ShutdownThread : public IceUtil::Thread
@@ -1026,11 +1028,43 @@ vector<BridgedSessionPtr> BridgeImpl::currentSessions()
return mState->bridgedSessions;
}
+void BridgeImpl::update()
+{
+ ReplicatedStateItemSeq seq;
+ seq.push_back(getState());
+ mReplicator->setState(seq);
+}
+
+void BridgeImpl::statePreCheck()
+{
+ if (mState == ShuttingDown)
+ {
+ mLogger(Debug) << FUNLOG << ": called when shutting down." ;
+ throw AsteriskSCF::System::Component::V1::ShuttingDown();
+ }
+ if (mState == Destroyed)
+ {
+ mLogger(Debug) << FUNLOG << ": called when destroyed." ;
+ throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+ }
+}
+
+IceUtil::Handle<AsteriskSCF::BridgeService::BridgeServant>
+AsteriskSCF::BridgeService::BridgeServant::create(const Ice::ObjectAdapterPtr& objectAdapter,
+ const vector<BridgeListenerPrx>& listeners,
+ const AsteriskSCF::BridgeService::BridgeListenerMgrPtr& listenerMgr,
+ const ReplicatorSmartPrx& replicator,
+ const Logger& logger)
+{
+ return new BridgeImpl(objectAdapter, listeners, listenerMgr, replicator, new AsteriskSCF::Bridge::V1::BridgeStateItem, logger);
+}
+
IceUtil::Handle<AsteriskSCF::BridgeService::BridgeServant>
AsteriskSCF::BridgeService::BridgeServant::create(const Ice::ObjectAdapterPtr& objectAdapter,
- const vector<BridgeListenerPrx>& listeners,
- const AsteriskSCF::BridgeService::BridgeListenerMgrPtr& listenerMgr,
- const Logger& logger)
+ const AsteriskSCF::BridgeService::BridgeListenerMgrPtr& listenerMgr,
+ const ReplicatorSmartPrx& replicator,
+ const Logger& logger,
+ const AsteriskSCF::Bridge::V1::BridgeStateItemPtr& state)
{
- return new BridgeImpl(objectAdapter, listeners, listenerMgr, logger);
+ return new BridgeImpl(objectAdapter, state->listeners, listenerMgr, replicator, state, logger);
}
diff --git a/src/BridgeImpl.h b/src/BridgeImpl.h
index a3da8ad..d20be7a 100644
--- a/src/BridgeImpl.h
+++ b/src/BridgeImpl.h
@@ -20,6 +20,7 @@
#include <vector>
#include "BridgeReplicatorIf.h"
#include "BridgeListenerMgr.h"
+#include "BridgeServiceConfig.h"
namespace AsteriskSCF
{
@@ -116,6 +117,7 @@ public:
static IceUtil::Handle<BridgeServant> create(const Ice::ObjectAdapterPtr& objectAdapter,
const std::vector<AsteriskSCF::SessionCommunications::V1::BridgeListenerPrx>& listeners,
const AsteriskSCF::BridgeService::BridgeListenerMgrPtr& listenerMgr,
+ const ReplicatorSmartPrx& replicator,
const AsteriskSCF::System::Logging::Logger& logger);
/**
@@ -125,6 +127,7 @@ public:
**/
static IceUtil::Handle<BridgeServant> create(const Ice::ObjectAdapterPtr& objectAdapter,
const AsteriskSCF::BridgeService::BridgeListenerMgrPtr& listenerMgr,
+ const ReplicatorSmartPrx& replicator,
const AsteriskSCF::System::Logging::Logger& logger,
const AsteriskSCF::Bridge::V1::BridgeStateItemPtr& state);
};
diff --git a/src/BridgeManagerImpl.cpp b/src/BridgeManagerImpl.cpp
index eb4f8ee..32f9421 100644
--- a/src/BridgeManagerImpl.cpp
+++ b/src/BridgeManagerImpl.cpp
@@ -52,7 +52,8 @@ class BridgeManagerImpl : public BridgeManagerServant
{
public:
- BridgeManagerImpl(const Ice::ObjectAdapterPtr& adapter, const string& name, const Logging::Logger& logger);
+ BridgeManagerImpl(const Ice::ObjectAdapterPtr& adapter, const string& name,
+ const ReplicatorSmartPrx& replicator, const Logging::Logger& logger);
~BridgeManagerImpl();
//
@@ -85,6 +86,13 @@ public:
void activate();
+ string id()
+ {
+ return mName;
+ }
+
+ void createBridgeReplica(const BridgeStateItemPtr& bridgeState);
+
private:
boost::shared_mutex mLock;
@@ -92,6 +100,7 @@ private:
vector<BridgeInfo> mBridges;
bool mActivated;
Ice::ObjectAdapterPtr mAdapter;
+ ReplicatorSmartPrx mReplicator;
BridgeManagerPrx mSourceProxy;
BridgeManagerListenerMgrPtr mListeners;
Logger mLogger;
@@ -101,6 +110,8 @@ private:
void reap();
void statePreCheck(const string&);
+
+ void update();
};
typedef IceUtil::Handle<BridgeManagerImpl> BridgeManagerImplPtr;
@@ -132,16 +143,19 @@ private:
-BridgeManagerImpl::BridgeManagerImpl(const Ice::ObjectAdapterPtr& adapter, const string& name, const Logger& logger) :
+BridgeManagerImpl::BridgeManagerImpl(const Ice::ObjectAdapterPtr& adapter, const string& name, const ReplicatorSmartPrx& replicator,
+ const Logger& logger) :
mName(name),
mActivated(false),
mAdapter(adapter),
+ mReplicator(replicator),
mLogger(logger),
mState(new BridgeManagerStateItem)
{
mLogger(Info) << "Created AsteriskSCF Session-Oriented Bridge Manager." ;
mListeners = new BridgeManagerListenerMgr(mAdapter->getCommunicator(), mName, mSourceProxy);
mState->runningState = ServiceState::Running;
+ mState->key = name;
}
BridgeManagerImpl::~BridgeManagerImpl()
@@ -171,7 +185,7 @@ BridgePrx BridgeManagerImpl::createBridge(const SessionSeq& sessions,
listeners.push_back(listener);
}
- BridgeServantPtr bridge = BridgeServant::create(mAdapter, listeners, mgr, mLogger);
+ BridgeServantPtr bridge = BridgeServant::create(mAdapter, listeners, mgr, mReplicator, mLogger);
Ice::ObjectPrx obj = mAdapter->add(bridge, id);
mLogger(Info) << objectIdFromCurrent(current) << ": creating new bridge " << obj->ice_toString() << "." ;
@@ -206,6 +220,7 @@ void BridgeManagerImpl::addListener(const BridgeManagerListenerPrx& listener, co
mListeners = new BridgeManagerListenerMgr(mAdapter->getCommunicator(), mName, mSourceProxy);
}
mListeners->addListener(listener);
+ update();
}
void BridgeManagerImpl::removeListener(const BridgeManagerListenerPrx& listener, const Ice::Current& current)
@@ -219,6 +234,7 @@ void BridgeManagerImpl::removeListener(const BridgeManagerListenerPrx& listener,
mListeners = new BridgeManagerListenerMgr(mAdapter->getCommunicator(), mName, mSourceProxy);
}
mListeners->removeListener(listener);
+ update();
}
void BridgeManagerImpl::addDefaultBridgeListener(const BridgeListenerPrx& newListener,
@@ -237,6 +253,7 @@ void BridgeManagerImpl::addDefaultBridgeListener(const BridgeListenerPrx& newLis
{
mState->defaultBridgeListeners.push_back(newListener);
}
+ update();
}
void BridgeManagerImpl::removeDefaultBridgeListener(const BridgeListenerPrx& toRemove, const Ice::Current& current)
@@ -251,7 +268,8 @@ void BridgeManagerImpl::removeDefaultBridgeListener(const BridgeListenerPrx& toR
boost::unique_lock<boost::shared_mutex> lock(mLock);
statePreCheck(BOOST_CURRENT_FUNCTION);
remove_if(mState->defaultBridgeListeners.begin(), mState->defaultBridgeListeners.end(),
- IdentityComparePredicate<BridgeListenerPrx>(toRemove));
+ IdentityComparePredicate<BridgeListenerPrx>(toRemove));
+ update();
}
BridgeSeq BridgeManagerImpl::listBridges(const Ice::Current& current)
@@ -312,7 +330,6 @@ BridgeManagerStateItemPtr BridgeManagerImpl::getState()
//
// Copy it instead of passing a reference for thread safety.
//
- boost::shared_lock<boost::shared_mutex> lock(mLock);
BridgeManagerStateItemPtr result(new BridgeManagerStateItem(*(mState.get())));
result->listeners = mListeners->getListeners();
return result;
@@ -347,6 +364,13 @@ void BridgeManagerImpl::activate()
}
}
+void BridgeManagerImpl::createBridgeReplica(const BridgeStateItemPtr& state)
+{
+ mLogger(Debug) << FUNLOG;
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ Ice::Identity id(mAdapter->getCommunicator()->stringToIdentity(state->bridgeId));
+}
+
void BridgeManagerImpl::reap()
{
mLogger(Debug) << FUNLOG << ": reaping bridge set of " << mBridges.size() << " bridges." ;
@@ -381,14 +405,21 @@ void BridgeManagerImpl::statePreCheck(const string& caller)
mLogger(Debug) << caller << ": called when suspended." ;
throw AsteriskSCF::System::Component::V1::Suspended();
}
+}
+void BridgeManagerImpl::update()
+{
+ ReplicatedStateItemSeq seq;
+ seq.push_back(getState());
+ mReplicator->setState(seq);
}
} // End of anonymous namespace
BridgeManagerServantPtr
-AsteriskSCF::BridgeService::createBridgeManager(const Ice::ObjectAdapterPtr& adapter, const string& name,
+AsteriskSCF::BridgeService::createBridgeManager(const Ice::ObjectAdapterPtr& adapter, const string& name,
+ const ReplicatorSmartPrx& replicator,
const Logger& logger)
{
- return new BridgeManagerImpl(adapter, name, logger);
+ return new BridgeManagerImpl(adapter, name, replicator, logger);
}
diff --git a/src/BridgeManagerImpl.h b/src/BridgeManagerImpl.h
index 662656d..5f768ae 100644
--- a/src/BridgeManagerImpl.h
+++ b/src/BridgeManagerImpl.h
@@ -48,6 +48,10 @@ public:
virtual std::vector<BridgeServantPtr> getBridges() = 0;
virtual void activate() = 0;
+
+ virtual std::string id() = 0;
+
+ virtual void createBridgeReplica(const AsteriskSCF::Bridge::V1::BridgeStateItemPtr& bridgeState) = 0;
};
typedef IceUtil::Handle<BridgeManagerServant> BridgeManagerServantPtr;
@@ -65,7 +69,8 @@ typedef IceUtil::Handle<BridgeManagerServant> BridgeManagerServantPtr;
*
**/
BridgeManagerServantPtr createBridgeManager(const Ice::ObjectAdapterPtr& adapter,
- const std::string& name, const AsteriskSCF::System::Logging::Logger& logger);
+ const std::string& name, const ReplicatorSmartPrx& replicator,
+ const AsteriskSCF::System::Logging::Logger& logger);
};
};
diff --git a/src/BridgeReplicatorStateListenerI.cpp b/src/BridgeReplicatorStateListenerI.cpp
index b152c97..049dfbe 100644
--- a/src/BridgeReplicatorStateListenerI.cpp
+++ b/src/BridgeReplicatorStateListenerI.cpp
@@ -48,18 +48,26 @@ public:
}
}
- void stateSet(const ReplicatedStateItemSeq& items, const Ice::Current&)
+ void stateSet(const ReplicatedStateItemSeq& items, const Ice::Current& current)
{
for (ReplicatedStateItemSeq::const_iterator i = items.begin(); i != items.end(); ++i)
{
+ //
+ // I don't really have a use for these hash table items at the moment.
+ //
+ mItems[(*i)->key] = *i;
+
BridgeManagerStateItemPtr managerItem = BridgeManagerStateItemPtr::dynamicCast((*i));
if (managerItem)
{
- //
- // There is only on bridge per listener instance by design/implementation, so this
- // one is pretty easy.
- //
- mManager->updateState(managerItem);
+ if(managerItem->key == mManager->id())
+ {
+ //
+ // There is only on bridge per listener instance by design/implementation, so this
+ // one is pretty easy.
+ //
+ mManager->updateState(managerItem);
+ }
continue;
}
@@ -67,16 +75,24 @@ public:
if (bridgeItem)
{
vector<BridgeServantPtr> bridges = mManager->getBridges();
+ bool found = false;
for (vector<BridgeServantPtr>::iterator b = bridges.begin(); b != bridges.end(); ++b)
{
if ((*b)->id() == bridgeItem->bridgeId)
{
(*b)->updateState(bridgeItem);
+ found = true;
}
//
// We could break here if we could be sure that there were no other updates.
//
}
+ if (!found)
+ {
+ //
+ // XXX- new bridge!
+ //
+ }
continue;
}
@@ -93,7 +109,7 @@ private:
}
-ReplicatorListenerPtr createStateListener(const Logger& logger, const BridgeManagerServantPtr& manager)
+ReplicatorListenerPtr AsteriskSCF::BridgeService::createStateListener(const Logger& logger, const BridgeManagerServantPtr& manager)
{
return new ListenerI(logger, manager);
}
diff --git a/src/BridgeServiceConfig.h b/src/BridgeServiceConfig.h
index 537b3f9..c6ddc65 100644
--- a/src/BridgeServiceConfig.h
+++ b/src/BridgeServiceConfig.h
@@ -18,6 +18,8 @@
#include <boost/current_function.hpp>
#include <Ice/Ice.h>
#include <string>
+#include <AsteriskSCF/SmartProxy.h>
+#include "BridgeReplicatorIf.h"
//
// Some bridge service wide definitions.
@@ -65,5 +67,8 @@ inline std::string objectIdFromCurrent(const Ice::Current& current)
return current.adapter->getCommunicator()->identityToString(current.id);
}
+typedef AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::Bridge::V1::ReplicatorPrx> ReplicatorSmartPrx;
+
+
} // End of namespace BridgeService
} // End of namespace AsteriskSCF
diff --git a/src/Service.cpp b/src/Service.cpp
index fa2d34e..d773280 100644
--- a/src/Service.cpp
+++ b/src/Service.cpp
@@ -23,7 +23,6 @@
#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
#include <AsteriskSCF/System/Component/ComponentServiceIf.h>
#include <AsteriskSCF/System/Component/ReplicaIf.h>
-#include <AsteriskSCF/SmartProxy.h>
#include "ServiceUtil.h"
#include "BridgeManagerImpl.h"
#include "BridgeReplicatorStateListenerI.h"
@@ -142,7 +141,8 @@ void BridgingApp::start(const std::string& name, const Ice::CommunicatorPtr& com
// which adapter is currently the primary in a group of replicas.
//
mAdapter = communicator->createObjectAdapter(adapterName);
- mInfrastructureAdapter = communicator->createObjectAdapter(adapterName + ".Internal");
+ mInfrastructureAdapter = communicator->createObjectAdapter(adapterName + "Internal");
+ mInfrastructureAdapter->activate();
//
// Configure the AsteriskSCF logger.
@@ -157,6 +157,28 @@ void BridgingApp::start(const std::string& name, const Ice::CommunicatorPtr& com
throw IceBox::FailureException(__FILE__, __LINE__, "Configuration error: Unable to locate property `ServiceLocatorManagementProxy'");
}
+ ServiceLocatorPrx locator = ServiceLocatorPrx::checkedCast(communicator->propertyToProxy("LocatorService.Proxy"));
+ /*
+ * TODO
+
+ BridgeStateReplicatorParamsPtr searchParams = new BridgeStateReplicatorParams;
+ searchParams->category = StateReplicatorDiscoveryCategory;
+ searchParams->name = communicator->getProperties()->getPropertyWithDefault("Bridge.StateReplicatorName", "default");
+ */
+ ServiceLocatorParamsPtr searchParams = new ServiceLocatorParams;
+ searchParams->category = StateReplicatorDiscoveryCategory;
+
+ ReplicatorSmartPrx replicator;
+ try
+ {
+ replicator = ReplicatorSmartPrx(locator, searchParams, logger);
+ }
+ catch (const std::exception& ex)
+ {
+ logger(Error) << "Bridge state replicator lookup failed. Continuing without replication. " << ex.what();
+ }
+
+
//
// TODO: The bridge manager's identity should come from a central configuration and/or a replicator
// service.
@@ -167,9 +189,9 @@ void BridgingApp::start(const std::string& name, const Ice::CommunicatorPtr& com
// It's very important that uncheckedCast's be used here as there are no guarantees
// that the object adapter is activated yet. If it is not, then this can hang.
//
- BridgeManagerServantPtr manager = createBridgeManager(mAdapter, managerName, logger);
+ BridgeManagerServantPtr manager = createBridgeManager(mAdapter, managerName, replicator, logger);
BridgeManagerPrx bridgeManagerPrx = BridgeManagerPrx::uncheckedCast(mAdapter->add(manager,
- mAdapter->getCommunicator()->stringToIdentity(name)));
+ mAdapter->getCommunicator()->stringToIdentity(managerName)));
assert(bridgeManagerPrx != 0);
if (bridgeManagerPrx == 0)
{
@@ -186,28 +208,17 @@ void BridgingApp::start(const std::string& name, const Ice::CommunicatorPtr& com
ReplicaPrx replicaControlPrx =
ReplicaPrx::uncheckedCast(mInfrastructureAdapter->add(mReplicaControl, communicator->stringToIdentity(BridgeReplicaId)));
- ServiceLocatorPrx locator = ServiceLocatorPrx::checkedCast(communicator->propertyToProxy("LocatorService.Proxy"));
- BridgeStateReplicatorParamsPtr searchParams = new BridgeStateReplicatorParams;
- searchParams->category = StateReplicatorDiscoveryCategory;
- searchParams->name = communicator->getProperties()->getPropertyWithDefault("Bridge.StateReplicatorName", "default");
-
- SmartProxy::SmartProxy<ReplicatorPrx> replicator;
- try
- {
- replicator = SmartProxy::SmartProxy<ReplicatorPrx>(locator, searchParams, logger);
- }
- catch (const std::exception& ex)
+ bool onStandby = false;
+ if (replicator)
{
- logger(Error) << "Bridge state replicator lookup failed. Continuing without replication. " << ex.what();
+ onStandby = communicator->getProperties()->getPropertyWithDefault(adapterName + "StateReplicatorListener", "no") == "yes";
}
- if (replicator)
+ if (onStandby)
{
- if (communicator->getProperties()->getPropertyWithDefault("Bridge.StateReplicatorListener", "no") == "yes")
- {
- replicator->addListener(listenerPrx);
- replicaControlPrx->standby();
- }
+ replicator->addListener(listenerPrx);
+ replicaControlPrx->standby();
}
+ manager->activate();
bool registered = false;
try
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 95b8d1a..80e2a80 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -4,6 +4,7 @@ if(NOT integrated_build STREQUAL "true")
endif()
asterisk_scf_component_init(bridging_unit_test CXX)
include_directories("../src")
+include_directories(${utils_dir}/SmartProxy/include)
asterisk_scf_component_add_slice(bridging_unit_test CommandsIf)
asterisk_scf_component_add_file(bridging_unit_test TestBridging.cpp)
asterisk_scf_component_add_file(bridging_unit_test SessionListenerI.h)
diff --git a/test/TestBridging.cpp b/test/TestBridging.cpp
index 6ece8b7..1cc7ec3 100644
--- a/test/TestBridging.cpp
+++ b/test/TestBridging.cpp
@@ -582,6 +582,64 @@ public:
}
}
+ void testBridgeManagerListenerReplication()
+ {
+ try
+ {
+ IceEnvironment testEnv(env()->properties());
+ TestChannelWrapper channel(env()->properties());
+ try
+ {
+ Ice::ObjectAdapterPtr testAdapter = testEnv.communicator()->createObjectAdapter("TestUtilAdapter");
+ testAdapter->activate();
+ BridgeManagerListenerIPtr servant = new BridgeManagerListenerI;
+ AsteriskSCF::SessionCommunications::V1::BridgeManagerListenerPrx listenerPrx;
+ addServant(listenerPrx, testAdapter, servant, testEnv.strToIdent("testBridgeManagerListener"));
+
+ AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx mgrPrx =
+ AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx::checkedCast(
+ testEnv.communicator()->propertyToProxy("TestBridge.Proxy"));
+ BOOST_CHECK(mgrPrx);
+ AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx mgrPrx2 =
+ AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx::checkedCast(
+ testEnv.communicator()->propertyToProxy("TestBridge2.Proxy"));
+ BOOST_CHECK(mgrPrx2);
+
+ mgrPrx->addListener(listenerPrx);
+ BOOST_CHECK(servant->stoppingCalls() == 0);
+ BOOST_CHECK(servant->stoppedCalls() == 0);
+ BOOST_CHECK(servant->createCalls() == 0);
+ AsteriskSCF::SessionCommunications::V1::SessionSeq sessions;
+ AsteriskSCF::SessionCommunications::V1::BridgePrx bridge(mgrPrx->createBridge(sessions, 0));
+ servant->wait(5000);
+ BOOST_CHECK(servant->createCalls() == 1);
+ mgrPrx->removeListener(listenerPrx);
+ bridge = mgrPrx->createBridge(sessions, 0);
+ servant->wait(5000);
+ BOOST_CHECK(servant->createCalls() == 1);
+ mgrPrx->addListener(listenerPrx);
+ bridge = mgrPrx->createBridge(sessions, 0);
+ servant->wait(5000);
+ BOOST_CHECK(servant->createCalls() == 2);
+ bridge->shutdown();
+ }
+ catch(const Ice::Exception& ex)
+ {
+ std::ostringstream msg;
+ msg << "Unexpected Ice exception " << ex.what();
+ BOOST_FAIL(msg.str());
+ }
+ catch(...)
+ {
+ BOOST_FAIL("Unexpected exception");
+ }
+ }
+ catch(...)
+ {
+ BOOST_FAIL("Unexpected exception");
+ }
+ }
+
private:
TestEnvironmentPtr mTestEnvironment;
};
commit cf0d0502fe34e864f6d6dc2581ceccca3f5f3416
Merge: ab41b20 833b075
Author: Brent Eagles <beagles at digium.com>
Date: Thu Feb 10 13:19:55 2011 -0330
Merge branch 'replicator_bridge' of ssh://puffin/home/brent/src/asteriskscf/all/master/gitall/bridging
commit 833b075ffe85a46ff7173b7d3a5ed586a52a3d54
Merge: 0c862d7 d18f951
Author: Brent Eagles <beagles at digium.com>
Date: Thu Feb 10 13:17:56 2011 -0330
Bring replicator branch up to speed with master.
diff --cc src/BridgeManagerImpl.cpp
index cb74c91,682b606..eb4f8ee
--- a/src/BridgeManagerImpl.cpp
+++ b/src/BridgeManagerImpl.cpp
@@@ -297,66 -236,18 +297,66 @@@ void BridgeManagerImpl::shutdown(const
mListeners->stopped();
}
mAdapter->getCommunicator()->shutdown();
+ mState->runningState = ServiceState::Destroyed;
+}
+
+bool BridgeManagerImpl::destroyed()
+{
+ mLogger(Debug) << FUNLOG;
+ return mState->runningState == ServiceState::Destroyed;
+}
+
+BridgeManagerStateItemPtr BridgeManagerImpl::getState()
+{
+ mLogger(Debug) << FUNLOG;
+ //
+ // Copy it instead of passing a reference for thread safety.
+ //
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ BridgeManagerStateItemPtr result(new BridgeManagerStateItem(*(mState.get())));
- result->listeners.swap(mListeners->getListeners());
++ result->listeners = mListeners->getListeners();
+ return result;
+}
+
+void BridgeManagerImpl::updateState(const BridgeManagerStateItemPtr& state)
+{
+ mLogger(Debug) << FUNLOG;
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ *mState.get() = *state.get();
+}
+
+vector<BridgeServantPtr> BridgeManagerImpl::getBridges()
+{
+ mLogger(Debug) << FUNLOG;
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ vector<BridgeServantPtr> result;
+ for (vector<BridgeInfo>::iterator i = mBridges.begin(); i != mBridges.end(); ++i)
+ {
+ result.push_back(i->servant);
+ }
+ return result;
+}
+
+void BridgeManagerImpl::activate()
+{
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mActivated = true;
+ for (BridgeManagerListenerSeq::iterator i = mState->listeners.begin(); i != mState->listeners.end(); ++i)
+ {
+ mListeners->addListener(*i);
+ }
}
-void AsteriskSCF::BridgeService::BridgeManagerImpl::reap()
+void BridgeManagerImpl::reap()
{
- mLogger(Debug) << __FUNCTION__ << ": reaping bridge set of " << mBridges.size() << " bridges." ;
- std::vector<BridgeInfo>::iterator i = mBridges.begin();
- while(i != mBridges.end())
+ mLogger(Debug) << FUNLOG << ": reaping bridge set of " << mBridges.size() << " bridges." ;
+ vector<BridgeInfo>::iterator i = mBridges.begin();
+ while (i != mBridges.end())
{
- if(i->servant->destroyed())
+ if (i->servant->destroyed())
{
i->servant->destroyImpl();
- std::vector<BridgeInfo>::iterator t = i;
+ vector<BridgeInfo>::iterator t = i;
i = mBridges.erase(t);
continue;
}
diff --cc src/BridgeManagerImpl.h
index e46561d,5a386d8..662656d
--- a/src/BridgeManagerImpl.h
+++ b/src/BridgeManagerImpl.h
@@@ -22,8 -20,8 +22,7 @@@
#include <vector>
#include "BridgeServiceConfig.h"
-
#include "BridgeImpl.h"
-#include "BridgeManagerListenerMgr.h"
namespace AsteriskSCF
{
diff --cc src/BridgeServiceConfig.h
index 0ed76af,289892a..537b3f9
--- a/src/BridgeServiceConfig.h
+++ b/src/BridgeServiceConfig.h
@@@ -24,20 -22,26 +24,19 @@@
//
namespace AsteriskSCF
{
-
-namespace System
-{
-namespace Logging
-{
- class Logger;
- //
- // Temporary until the logger is a reference counted entity.
- //
- typedef Logger& LoggerPtr;
-
-} // End of namespace Logging
-} // End of namespace System
-
namespace BridgeService
{
+
+//
+// There are several places where we need to compare the identities of two
+// objects, particulaly when searching through collections. This functor
+// allows us to use std algorithms to scan them without hand coding loops.
+//
template <class T>
-class IdentityComparePred : public std::unary_function<T, bool>
+class IdentityComparePredicate : public std::unary_function<T, bool>
{
public:
- IdentityComparePred(const T& example) :
+ IdentityComparePredicate(const T& example) :
mExample(example)
{
}
diff --cc test/CMakeLists.txt
index 8587fd3,4c33c27..95b8d1a
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@@ -25,11 -21,12 +25,13 @@@ if(NOT logger_dir
endif()
include_directories(${logger_dir}/include)
include_directories(${API_INCLUDE_DIR})
-include_directories(${TEST_CHANNEL_API_INCLUDE_DIR})
-
+if(integrated_build STREQUAL "true")
+ include_directories(${CMAKE_CURRENT_BINARY_DIR}/../../test_channel/local_slice/generated)
+endif()
asterisk_scf_component_build_icebox(bridging_unit_test)
target_link_libraries(bridging_unit_test logging-client)
+ target_link_libraries(bridging_unit_test asterisk-scf-api)
+ target_link_libraries(bridging_unit_test test-channel-api)
# component tests only work for integrated builds
if(integrated_build STREQUAL "true")
commit ab41b20d012625294c767750d9ed30efda04f4d8
Author: Brent Eagles <beagles at digium.com>
Date: Thu Feb 10 12:59:50 2011 -0330
Fix build error
diff --git a/src/BridgeManagerImpl.cpp b/src/BridgeManagerImpl.cpp
index cb74c91..eb4f8ee 100644
--- a/src/BridgeManagerImpl.cpp
+++ b/src/BridgeManagerImpl.cpp
@@ -314,7 +314,7 @@ BridgeManagerStateItemPtr BridgeManagerImpl::getState()
//
boost::shared_lock<boost::shared_mutex> lock(mLock);
BridgeManagerStateItemPtr result(new BridgeManagerStateItem(*(mState.get())));
- result->listeners.swap(mListeners->getListeners());
+ result->listeners = mListeners->getListeners();
return result;
}
commit 0c862d717a6d38c245c600ef866118b6b78e50ec
Author: Brent Eagles <beagles at digium.com>
Date: Thu Feb 10 11:02:31 2011 -0330
More bridge replication changes.
* infrastructure
* listener
* initialization changes
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index f93bc7a..7a213fa 100644
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -24,10 +24,13 @@
#include <algorithm>
#include "MediaSplicer.h"
#include "BridgeServiceConfig.h"
+#include "ServiceUtil.h"
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::SessionCommunications::V1;
using namespace AsteriskSCF::BridgeService;
+using namespace AsteriskSCF::Bridge::V1;
+using namespace AsteriskSCF;
using namespace std;
/**
@@ -44,102 +47,63 @@ using namespace std;
namespace
{
-//
-// TODO: this should be refactored into a utility library.
-//
-class RetryPolicy
-{
-public:
- RetryPolicy(size_t maxRetries, size_t intervalInMilliseconds) :
- mMaxRetries(maxRetries),
- mRetryInterval(intervalInMilliseconds),
- mCounter(0)
- {
- }
-
- bool canRetry()
- {
- return mCounter < mMaxRetries;
- }
-
- bool retry()
- {
- IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(mRetryInterval));
- ++mCounter;
- return canRetry();
- }
-
-private:
- size_t mMaxRetries;
- size_t mRetryInterval;
- size_t mCounter;
-};
/**
- * Encapsulates the state of a bridge session along with
- * the other bridging helper objects.
+ *
+ * A helper class implemented as a wrapper for a BridgeStateItem.
+ *
**/
-class BridgeSession : public IceUtil::Shared
+class SessionWrapper
{
public:
- BridgeSession(const SessionPrx& s, const MediaConnectorPtr& con, bool isConnected) :
- mSession(s),
- mConnector(con),
- mConnected(isConnected)
+ SessionWrapper(const BridgedSessionPtr& s) :
+ mSession(s)
{
}
bool isConnected()
{
- IceUtil::Mutex::Lock lock(mMutex);
- return mConnected;
+ return mSession->currentState == BridgedSessionState::Connected;
}
void connect()
{
- //
- // Only call connect on a session if its not already connected.
- //
- IceUtil::Mutex::Lock lock(mMutex);
- if (mConnected)
+ if (mSession->currentState == BridgedSessionState::Connected)
+ {
return;
+ }
- mSession->connect();
- mConnected = true;
+ mSession->session->connect();
+ mSession->currentState = BridgedSessionState::Connected;
}
void ring()
{
- //
- // Don't relay ring notifications to connected sessions.
- //
- IceUtil::Mutex::Lock lock(mMutex);
- if (mConnected)
+ if (mSession->currentState == BridgedSessionState::Connected)
+ {
return;
- mSession->ring();
+ }
+ mSession->session->ring();
}
void setConnected()
{
- IceUtil::Mutex::Lock lock(mMutex);
- mConnected = true;
+ mSession->currentState = BridgedSessionState::Connected;
}
SessionPrx getSession() const
{
- return mSession;
+ return mSession->session;
}
void setConnector(const MediaConnectorPtr& connector)
{
- IceUtil::Mutex::Lock lock(mMutex);
mConnector = connector;
}
void disconnect()
{
- IceUtil::Mutex::Lock lock(mMutex);
if (mConnector)
{
try
@@ -154,15 +118,13 @@ public:
}
mConnector = 0;
}
+ mSession->currentState = BridgedSessionState::Disconnected;
}
private:
- SessionPrx mSession;
+ BridgedSessionPtr mSession;
MediaConnectorPtr mConnector;
- bool mConnected;
- IceUtil::Mutex mMutex;
};
-typedef IceUtil::Handle<BridgeSession> BridgeSessionPtr;
/**
*
@@ -202,23 +164,26 @@ public:
void destroyImpl();
void shutdownImpl(const Ice::Current& current);
void activate(const BridgePrx& proxy);
+
+ BridgeStateItemPtr getState();
+ void updateState(const BridgeStateItemPtr& state);
+ void activate();
+ string id();
+
void sessionConnected(const SessionPrx& session);
size_t sessionStopped(const SessionPrx& session, const ResponseCodePtr& response);
- vector<BridgeSessionPtr> currentSessions();
+
+
+ vector<BridgedSessionPtr> currentSessions();
void spawnShutdown();
private:
+
boost::shared_mutex mLock;
- enum ServiceStates
- {
- Running,
- ShuttingDown,
- Destroyed
- };
- ServiceStates mState;
- vector<BridgeSessionPtr> mSessions;
+ bool mActivated;
+ BridgeStateItemPtr mState;
MediaSplicer mSplicer;
@@ -237,6 +202,7 @@ private:
};
typedef IceUtil::Handle<BridgeImpl> BridgeImplPtr;
+
void checkSessions(const SessionSeq& sessions)
{
Ice::LongSeq invalidIndexes;
@@ -266,7 +232,7 @@ static const string TopicPrefix("AsteriskSCF.Bridge.");
//
// Functor to support using for_each on shutdown.
//
-class ShutdownImpl : public unary_function<BridgeSessionPtr, void>
+class ShutdownImpl : public unary_function<BridgedSessionPtr, void>
{
public:
ShutdownImpl(const SessionListenerPrx& listener,
@@ -278,13 +244,14 @@ public:
{
}
- void operator()(const BridgeSessionPtr& b)
+ void operator()(const BridgedSessionPtr& b)
{
try
{
- b->getSession()->removeBridge(mListener);
- b->disconnect();
- b->getSession()->stop(mResponse);
+ SessionWrapper s(b);
+ s.getSession()->removeBridge(mListener);
+ s.disconnect();
+ s.getSession()->stop(mResponse);
}
catch (const Ice::ObjectNotExistException& ex)
{
@@ -297,33 +264,34 @@ public:
}
}
- const vector<BridgeSessionPtr>& nonExistentObjects()
+ const vector<BridgedSessionPtr>& nonExistentObjects()
{
return mNonExistent;
}
private:
SessionListenerPrx mListener;
ResponseCodePtr mResponse;
- vector<BridgeSessionPtr> mNonExistent;
+ vector<BridgedSessionPtr> mNonExistent;
Logger mLogger;
};
-class RingImpl : public unary_function<BridgeSessionPtr, void>
+class RingImpl : public unary_function<BridgedSessionPtr, void>
{
public:
RingImpl(const SessionPrx& exclude, const Logger& logger) :
- mExclude(exclude),
+ mExclude(exclude->ice_getIdentity()),
mLogger(logger)
{
}
- void operator()(const BridgeSessionPtr& b)
+ void operator()(const BridgedSessionPtr& b)
{
- if (b->getSession() != mExclude)
+ SessionWrapper s(b);
+ if (s.getSession()->ice_getIdentity() != mExclude)
{
try
{
- b->ring();
+ s.ring();
}
catch (const Ice::ObjectNotExistException& ex)
{
@@ -337,33 +305,34 @@ public:
}
}
- const vector<BridgeSessionPtr>& nonExistentObjects()
+ const vector<BridgedSessionPtr>& nonExistentObjects()
{
return mNonExistent;
}
private:
- SessionPrx mExclude;
- vector<BridgeSessionPtr> mNonExistent;
+ Ice::Identity mExclude;
+ vector<BridgedSessionPtr> mNonExistent;
Logger mLogger;
};
-class ConnectImpl : public unary_function<BridgeSessionPtr, void>
+class ConnectImpl : public unary_function<BridgedSessionPtr, void>
{
public:
ConnectImpl(const SessionPrx& exclude, const Logger& logger) :
- mExclude(exclude),
+ mExclude(exclude->ice_getIdentity()),
mLogger(logger)
{
}
- void operator()(BridgeSessionPtr& b)
+ void operator()(BridgedSessionPtr& b)
{
- if (b->getSession() != mExclude)
+ SessionWrapper s(b);
+ if (s.getSession()->ice_getIdentity() != mExclude)
{
try
{
- b->connect();
+ s.connect();
}
catch (const Ice::ObjectNotExistException& ex)
{
@@ -377,18 +346,18 @@ public:
}
}
- const vector<BridgeSessionPtr>& nonExistentObjects()
+ const vector<BridgedSessionPtr>& nonExistentObjects()
{
return mNonExistent;
}
private:
- SessionPrx mExclude;
- vector<BridgeSessionPtr> mNonExistent;
+ Ice::Identity mExclude;
+ vector<BridgedSessionPtr> mNonExistent;
Logger mLogger;
};
-class FindImpl : public unary_function<BridgeSessionPtr, bool>
+class FindImpl : public unary_function<BridgedSessionPtr, bool>
{
public:
FindImpl(const SessionPrx& prx) :
@@ -396,9 +365,10 @@ public:
{
}
- bool operator()(const BridgeSessionPtr& b)
+ bool operator()(const BridgedSessionPtr& b)
{
- return b->getSession()->ice_getIdentity() == mId;
+ SessionWrapper s(b);
+ return s.getSession()->ice_getIdentity() == mId;
}
private:
Ice::Identity mId;
@@ -429,7 +399,7 @@ public:
mLogger(Debug) << FUNLOG << ": " << ex.what();
throw;
}
- vector<BridgeSessionPtr> sessions(mBridge->currentSessions());
+ vector<BridgedSessionPtr> sessions(mBridge->currentSessions());
for_each(sessions.begin(), sessions.end(), ConnectImpl(source, mLogger));
}
@@ -448,7 +418,7 @@ public:
void ringing(const SessionPrx& source, const Ice::Current&)
{
- vector<BridgeSessionPtr> sessions(mBridge->currentSessions());
+ vector<BridgedSessionPtr> sessions(mBridge->currentSessions());
if (sessions.size() > 0)
{
for_each(sessions.begin(), sessions.end(), RingImpl(source, mLogger));
@@ -479,7 +449,7 @@ BridgeImpl::BridgeImpl(const Ice::ObjectAdapterPtr& adapter,
const vector<BridgeListenerPrx>& listeners,
const BridgeListenerMgrPtr& listenerMgr,
const Logger& logger) :
- mState(Running),
+ mActivated(false),
mObjAdapter(adapter),
mListeners(listenerMgr),
mSessionListener(new SessionListenerImpl(this, logger)),
@@ -522,9 +492,9 @@ void BridgeImpl::addSessions(const SessionSeq& sessions, const Ice::Current& cur
// impossible to guard against race conditions where multiple call paths might want to add a session
// more than once for some reason. We should probably just log it and move on.
//
- vector<BridgeSessionPtr>::iterator j =
- find_if(mSessions.begin(), mSessions.end(), FindImpl(*i));
- if (j != mSessions.end())
+ vector<BridgedSessionPtr>::iterator j =
+ find_if(mState->bridgedSessions.begin(), mState->bridgedSessions.end(), FindImpl(*i));
+ if (j != mState->bridgedSessions.end())
{
mLogger(Debug) << FUNLOG << ": " << (*i)->ice_toString()
<< " is already registered with this bridge.";
@@ -563,18 +533,23 @@ void BridgeImpl::addSessions(const SessionSeq& sessions, const Ice::Current& cur
//
// We need to define these states! Especially the ones that define when start is called or not.
//
+ BridgedSessionPtr newSession(new BridgedSession);
+ newSession->session = *i;
if (info->currentState == "ready")
{
mLogger(Debug) << FUNLOG << ": " << (*i)->ice_toString()
<< " current state is ready (not yet connected), not establishing media connections.";
- mSessions.push_back(new BridgeSession(*i, 0, false));
+ newSession->currentState = BridgedSessionState::Added;
}
else
{
mLogger(Debug) << FUNLOG << ": " << (*i)->ice_toString()
- << " media is expected to be establishing, plugging media into bridge.";
- mSessions.push_back(new BridgeSession(*i, mSplicer.connect(*i, mLogger), false));;
+ << " media is expected to be establishing plugging media into bridge.";
+
... 5052 lines suppressed ...
--
asterisk-scf/release/bridging.git
More information about the asterisk-scf-commits
mailing list