[Asterisk-code-review] stasis_state: Create internal stasis_state_proxy object. (...asterisk[17])
George Joseph
asteriskteam at digium.com
Fri Oct 4 09:54:39 CDT 2019
George Joseph has submitted this change and it was merged. ( https://gerrit.asterisk.org/c/asterisk/+/12911 )
Change subject: stasis_state: Create internal stasis_state_proxy object.
......................................................................
stasis_state: Create internal stasis_state_proxy object.
This improves the way which stasis_state reference counting works.
Since manager->states holds onto the proxy object instead of the real
object this allows stasis_state objects to be freed when appropriate
without use of a special state_remove function. Additionally each
distinct eid associated with the state holds a reference to the state to
prevent early release and potentially allow easier debug of leaks.
Change-Id: I400e0db4b9afa3d5cb4ac7dad60907897e73f9a9
---
M main/stasis_state.c
1 file changed, 147 insertions(+), 171 deletions(-)
Approvals:
George Joseph: Looks good to me, approved; Approved for Submit
diff --git a/main/stasis_state.c b/main/stasis_state.c
index aa00f9a..b85462f 100644
--- a/main/stasis_state.c
+++ b/main/stasis_state.c
@@ -26,6 +26,18 @@
/*!
* \internal
+ * \brief Used to link a stasis_state to it's manager
+ */
+struct stasis_state_proxy {
+ AO2_WEAKPROXY();
+ /*! The manager that owns and handles this state */
+ struct stasis_state_manager *manager;
+ /*! A unique id for this state object. */
+ char id[0];
+};
+
+/*!
+ * \internal
* \brief Associates a stasis topic to its last known published message
*
* This object's lifetime is tracked by the number of publishers and subscribers to it.
@@ -38,7 +50,10 @@
struct stasis_state {
/*! The number of state subscribers */
unsigned int num_subscribers;
- /*! The manager that owns and handles this state */
+ /*!
+ * \brief The manager that owns and handles this state
+ * \note This reference is owned by stasis_state_proxy
+ */
struct stasis_state_manager *manager;
/*! Forwarding information, i.e. this topic to manager's topic */
struct stasis_forward *forward;
@@ -52,11 +67,11 @@
*/
AST_VECTOR(, struct ast_eid) eids;
/*! A unique id for this state object. */
- char id[0];
+ char *id;
};
-AO2_STRING_FIELD_HASH_FN(stasis_state, id);
-AO2_STRING_FIELD_CMP_FN(stasis_state, id);
+AO2_STRING_FIELD_HASH_FN(stasis_state_proxy, id);
+AO2_STRING_FIELD_CMP_FN(stasis_state_proxy, id);
/*! The number of buckets to use for managed states */
#define STATE_BUCKETS 57
@@ -112,17 +127,28 @@
state->topic = NULL;
ao2_cleanup(state->msg);
state->msg = NULL;
- ao2_cleanup(state->manager);
- state->manager = NULL;
/* All eids should have been removed */
ast_assert(AST_VECTOR_SIZE(&state->eids) == 0);
AST_VECTOR_FREE(&state->eids);
}
+static void state_proxy_dtor(void *obj) {
+ struct stasis_state_proxy *proxy = obj;
+
+ ao2_cleanup(proxy->manager);
+}
+
+static void state_proxy_sub_cb(void *obj, void *data)
+{
+ struct stasis_state_proxy *proxy = obj;
+
+ ao2_unlink(proxy->manager->states, proxy);
+}
+
/*!
* \internal
- * \brief Allocate a stasis state object.
+ * \brief Allocate a stasis state object and add it to the manager.
*
* Create and initialize a state structure. It's required that either a state
* topic, or an id is specified. If a state topic is not given then one will be
@@ -134,45 +160,16 @@
*
* \return A stasis_state object or NULL
* \return NULL on error
+ *
+ * \pre manager->states must be locked.
+ * \pre manager->states does not contain an object matching key \a id.
*/
static struct stasis_state *state_alloc(struct stasis_state_manager *manager,
- struct stasis_topic *state_topic, const char *id)
+ struct stasis_topic *state_topic, const char *id,
+ const char *file, int line, const char *func)
{
- struct stasis_state *state;
-
- if (!state_topic) {
- char *name;
-
- /* If not given a state topic, then an id is required */
- ast_assert(id != NULL);
-
- /*
- * To provide further detail and to ensure that the topic is unique within the
- * scope of the system we prefix it with the manager's topic name, which should
- * itself already be unique.
- */
- if (ast_asprintf(&name, "%s/%s", stasis_topic_name(manager->all_topic), id) < 0) {
- ast_log(LOG_ERROR, "Unable to create state topic name '%s/%s'\n",
- stasis_topic_name(manager->all_topic), id);
- return NULL;
- }
-
- state_topic = stasis_topic_create(name);
-
- if (!state_topic) {
- ast_log(LOG_ERROR, "Unable to create state topic '%s'\n", name);
- ast_free(name);
- return NULL;
- }
- ast_free(name);
- } else {
- /*
- * Since the state topic was passed in, go ahead and bump its reference.
- * By doing this here first, it allows us to consistently decrease the reference on
- * state allocation error.
- */
- ao2_ref(state_topic, +1);
- }
+ struct stasis_state_proxy *proxy = NULL;
+ struct stasis_state *state = NULL;
if (!id) {
/* If not given an id, then a state topic is required */
@@ -182,77 +179,87 @@
id = state_id_by_topic(manager->all_topic, state_topic);
}
- state = ao2_alloc(sizeof(*state) + strlen(id) + 1, state_dtor);
+ state = __ao2_alloc(sizeof(*state), state_dtor, AO2_ALLOC_OPT_LOCK_MUTEX, id, file, line, func);
if (!state) {
- ast_log(LOG_ERROR, "Unable to allocate state '%s' in manager '%s'\n",
- id, stasis_topic_name(manager->all_topic));
- ao2_ref(state_topic, -1);
- return NULL;
+ goto error_return;
}
- strcpy(state->id, id); /* Safe */
- state->topic = state_topic; /* ref already bumped above */
+ if (!state_topic) {
+ char *name;
+
+ /*
+ * To provide further detail and to ensure that the topic is unique within the
+ * scope of the system we prefix it with the manager's topic name, which should
+ * itself already be unique.
+ */
+ if (ast_asprintf(&name, "%s/%s", stasis_topic_name(manager->all_topic), id) < 0) {
+ goto error_return;
+ }
+
+ state->topic = stasis_topic_create(name);
+
+ ast_free(name);
+ if (!state->topic) {
+ goto error_return;
+ }
+ } else {
+ /*
+ * Since the state topic was passed in, go ahead and bump its reference.
+ * By doing this here first, it allows us to consistently decrease the reference on
+ * state allocation error.
+ */
+ ao2_ref(state_topic, +1);
+ state->topic = state_topic;
+ }
+
+ proxy = ao2_t_weakproxy_alloc(sizeof(*proxy) + strlen(id) + 1, state_proxy_dtor, id);
+ if (!proxy) {
+ goto error_return;
+ }
+
+ strcpy(proxy->id, id); /* Safe */
+
+ state->id = proxy->id;
+ proxy->manager = ao2_bump(manager);
+ state->manager = proxy->manager; /* state->manager is owned by the proxy */
state->forward = stasis_forward_all(state->topic, manager->all_topic);
if (!state->forward) {
- ast_log(LOG_ERROR, "Unable to add state '%s' forward in manager '%s'\n",
- id, stasis_topic_name(manager->all_topic));
- ao2_ref(state, -1);
- return NULL;
+ goto error_return;
}
if (AST_VECTOR_INIT(&state->eids, 2)) {
- ast_log(LOG_ERROR, "Unable to initialize eids for state '%s' in manager '%s'\n",
- id, stasis_topic_name(manager->all_topic));
- ao2_ref(state, -1);
- return NULL;
+ goto error_return;
}
- state->manager = ao2_bump(manager);
+ if (ao2_t_weakproxy_set_object(proxy, state, OBJ_NOLOCK, "weakproxy link")) {
+ goto error_return;
+ }
+
+ if (ao2_weakproxy_subscribe(proxy, state_proxy_sub_cb, NULL, OBJ_NOLOCK)) {
+ goto error_return;
+ }
+
+ if (!ao2_link_flags(manager->states, proxy, OBJ_NOLOCK)) {
+ goto error_return;
+ }
+
+ ao2_ref(proxy, -1);
return state;
-}
-/*!
- * \internal
- * \brief Create a state object, and add it to the manager.
- *
- * \note Locking on the states container is specifically not done here, thus
- * appropriate locks should be applied prior to this function being called.
- *
- * \param manager The manager to be added to
- * \param state_topic A state topic to be managed (if NULL id is required)
- * \param id The unique id for the state (if NULL state_topic is required)
- *
- * \return The added state object
- * \return NULL on error
- */
-static struct stasis_state *state_add(struct stasis_state_manager *manager,
- struct stasis_topic *state_topic, const char *id)
-{
- struct stasis_state *state = state_alloc(manager, state_topic, id);
-
- if (!state) {
- return NULL;
- }
-
- if (!ao2_link_flags(manager->states, state, OBJ_NOLOCK)) {
- ast_log(LOG_ERROR, "Unable to add state '%s' to manager '%s'\n",
- state->id ? state->id : "", stasis_topic_name(manager->all_topic));
- ao2_ref(state, -1);
- return NULL;
- }
-
- return state;
+error_return:
+ ast_log(LOG_ERROR, "Unable to allocate state '%s' in manager '%s'\n",
+ id, stasis_topic_name(manager->all_topic));
+ ao2_cleanup(state);
+ ao2_cleanup(proxy);
+ return NULL;
}
/*!
* \internal
* \brief Find a state by id, or create one if not found and add it to the manager.
*
- * \note Locking on the states container is specifically not done here, thus
- * appropriate locks should be applied prior to this function being called.
- *
* \param manager The manager to be added to
* \param state_topic A state topic to be managed (if NULL id is required)
* \param id The unique id for the state (if NULL state_topic is required)
@@ -260,18 +267,26 @@
* \return The added state object
* \return NULL on error
*/
-static struct stasis_state *state_find_or_add(struct stasis_state_manager *manager,
- struct stasis_topic *state_topic, const char *id)
+#define state_find_or_add(mgr, top, id) __state_find_or_add(mgr, top, id, __FILE__, __LINE__, __PRETTY_FUNCTION__)
+static struct stasis_state *__state_find_or_add(struct stasis_state_manager *manager,
+ struct stasis_topic *state_topic, const char *id,
+ const char *file, int line, const char *func)
{
struct stasis_state *state;
+ ao2_lock(manager->states);
if (ast_strlen_zero(id)) {
id = state_id_by_topic(manager->all_topic, state_topic);
}
- state = ao2_find(manager->states, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ state = ao2_weakproxy_find(manager->states, id, OBJ_SEARCH_KEY | OBJ_NOLOCK, "");
+ if (!state) {
+ state = state_alloc(manager, state_topic, id, file, line, func);
+ }
- return state ? state : state_add(manager, state_topic, id);
+ ao2_unlock(manager->states);
+
+ return state;
}
static void state_manager_dtor(void *obj)
@@ -317,7 +332,7 @@
}
manager->states = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
- STATE_BUCKETS, stasis_state_hash_fn, NULL, stasis_state_cmp_fn);
+ STATE_BUCKETS, stasis_state_proxy_hash_fn, NULL, stasis_state_proxy_cmp_fn);
if (!manager->states) {
ao2_ref(manager, -1);
return NULL;
@@ -356,10 +371,7 @@
struct stasis_topic *topic;
struct stasis_state *state;
- ao2_lock(manager->states);
state = state_find_or_add(manager, NULL, id);
- ao2_unlock(manager->states);
-
if (!state) {
return NULL;
}
@@ -369,53 +381,6 @@
return topic;
}
-/*!
- * \internal
- * \brief Remove a state from the stasis manager
- *
- * State should only be removed from the manager under the following conditions:
- *
- * There are no more subscribers to it
- * There are no more explicit publishers publishing to it
- * There are no more implicit publishers publishing to it
- *
- * Subscribers and explicit publishers hold a reference to the state object itself, so
- * once a state's reference count drops to 2 (1 for the manager, 1 passed in) then we
- * know there are no more subscribers or explicit publishers. Implicit publishers are
- * tracked by eids, so once that container is empty no more implicit publishers exist
- * for the state either. Only then can a state be removed.
- *
- * \param state The state to remove
- */
-static void state_remove(struct stasis_state *state)
-{
- ao2_lock(state);
-
- /*
- * The manager's state container also needs to be locked here prior to checking
- * the state's reference count, and potentially removing since we don't want its
- * count to possibly increase between the check and unlinking.
- */
- ao2_lock(state->manager->states);
-
- /*
- * If there are only 2 references left then it's the one owned by the manager,
- * and the one passed in to this function. However, before removing it from the
- * manager we need to also check that no eid is associated with the given state.
- * If an eid still remains then this means that an implicit publisher is still
- * publishing to this state.
- */
- if (ao2_ref(state, 0) == 2 && AST_VECTOR_SIZE(&state->eids) == 0) {
- ao2_unlink_flags(state->manager->states, state, 0);
- }
-
- ao2_unlock(state->manager->states);
- ao2_unlock(state);
-
- /* Now it's safe to remove the reference that is held on the given state */
- ao2_ref(state, -1);
-}
-
struct stasis_state_subscriber {
/*! The stasis state subscribed to */
struct stasis_state *state;
@@ -441,7 +406,7 @@
--sub->state->num_subscribers;
ao2_unlock(sub->state);
- state_remove(sub->state);
+ ao2_ref(sub->state, -1);
}
struct stasis_state_subscriber *stasis_state_add_subscriber(
@@ -457,14 +422,11 @@
return NULL;
}
- ao2_lock(manager->states);
sub->state = state_find_or_add(manager, NULL, id);
if (!sub->state) {
- ao2_unlock(manager->states);
ao2_ref(sub, -1);
return NULL;
}
- ao2_unlock(manager->states);
ao2_lock(sub->state);
++sub->state->num_subscribers;
@@ -563,7 +525,7 @@
{
struct stasis_state_publisher *pub = obj;
- state_remove(pub->state);
+ ao2_ref(pub->state, -1);
}
struct stasis_state_publisher *stasis_state_add_publisher(
@@ -578,14 +540,11 @@
return NULL;
}
- ao2_lock(manager->states);
pub->state = state_find_or_add(manager, NULL, id);
if (!pub->state) {
- ao2_unlock(manager->states);
ao2_ref(pub, -1);
return NULL;
}
- ao2_unlock(manager->states);
return pub;
}
@@ -639,7 +598,10 @@
}
if (i == AST_VECTOR_SIZE(&state->eids)) {
- AST_VECTOR_APPEND(&state->eids, *eid);
+ if (!AST_VECTOR_APPEND(&state->eids, *eid)) {
+ /* This ensures state cannot be freed if it has any eids */
+ ao2_ref(state, +1);
+ }
}
}
@@ -666,6 +628,8 @@
for (i = 0; i < AST_VECTOR_SIZE(&state->eids); ++i) {
if (!ast_eid_cmp(AST_VECTOR_GET_ADDR(&state->eids, i), eid)) {
AST_VECTOR_REMOVE_UNORDERED(&state->eids, i);
+ /* Balance the reference from state_find_or_add_eid */
+ ao2_ref(state, -1);
return;
}
}
@@ -676,10 +640,7 @@
{
struct stasis_state *state;
- ao2_lock(manager->states);
state = state_find_or_add(manager, NULL, id);
- ao2_unlock(manager->states);
-
if (!state) {
return;
}
@@ -697,7 +658,7 @@
void stasis_state_remove_publish_by_id(struct stasis_state_manager *manager,
const char *id, const struct ast_eid *eid, struct stasis_message *msg)
{
- struct stasis_state *state = ao2_find(manager->states, id, OBJ_SEARCH_KEY);
+ struct stasis_state *state = ao2_weakproxy_find(manager->states, id, OBJ_SEARCH_KEY, "");
if (!state) {
/*
@@ -721,7 +682,7 @@
state_find_and_remove_eid(state, eid);
ao2_unlock(state);
- state_remove(state);
+ ao2_ref(state, -1);
}
int stasis_state_add_observer(struct stasis_state_manager *manager,
@@ -744,10 +705,8 @@
AST_VECTOR_RW_UNLOCK(&manager->observers);
}
-static int handle_stasis_state(void *obj, void *arg, void *data, int flags)
+static int handle_stasis_state(struct stasis_state *state, on_stasis_state handler, void *data)
{
- struct stasis_state *state = obj;
- on_stasis_state handler = arg;
struct stasis_message *msg;
int res;
@@ -764,24 +723,41 @@
return res;
}
+static int handle_stasis_state_proxy(void *obj, void *arg, void *data, int flags)
+{
+ struct stasis_state *state = ao2_weakproxy_get_object(obj, 0);
+
+ if (state) {
+ int res;
+ res = handle_stasis_state(state, arg, data);
+ ao2_ref(state, -1);
+ return res;
+ }
+
+ return 0;
+}
+
void stasis_state_callback_all(struct stasis_state_manager *manager, on_stasis_state handler,
void *data)
{
ast_assert(handler != NULL);
ao2_callback_data(manager->states, OBJ_MULTIPLE | OBJ_NODATA,
- handle_stasis_state, handler, data);
+ handle_stasis_state_proxy, handler, data);
}
static int handle_stasis_state_subscribed(void *obj, void *arg, void *data, int flags)
{
- struct stasis_state *state = obj;
+ struct stasis_state *state = ao2_weakproxy_get_object(obj, 0);
+ int res = 0;
- if (state->num_subscribers) {
- return handle_stasis_state(obj, arg, data, flags);
+ if (state && state->num_subscribers) {
+ res = handle_stasis_state(state, arg, data);
}
- return 0;
+ ao2_cleanup(state);
+
+ return res;
}
void stasis_state_callback_subscribed(struct stasis_state_manager *manager, on_stasis_state handler,
--
To view, visit https://gerrit.asterisk.org/c/asterisk/+/12911
To unsubscribe, or for help writing mail filters, visit https://gerrit.asterisk.org/settings
Gerrit-Project: asterisk
Gerrit-Branch: 17
Gerrit-Change-Id: I400e0db4b9afa3d5cb4ac7dad60907897e73f9a9
Gerrit-Change-Number: 12911
Gerrit-PatchSet: 3
Gerrit-Owner: Corey Farrell <git at cfware.com>
Gerrit-Reviewer: Corey Farrell <git at cfware.com>
Gerrit-Reviewer: Friendly Automation
Gerrit-Reviewer: George Joseph <gjoseph at digium.com>
Gerrit-Reviewer: Joshua Colp <jcolp at digium.com>
Gerrit-Reviewer: Kevin Harwell <kharwell at digium.com>
Gerrit-MessageType: merged
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.digium.com/pipermail/asterisk-code-review/attachments/20191004/9ef7a077/attachment-0001.html>
More information about the asterisk-code-review
mailing list