[Asterisk-code-review] stasis_state: Create internal stasis_state_proxy object. (...asterisk[17])

George Joseph asteriskteam at digium.com
Fri Oct 4 09:54:39 CDT 2019


George Joseph has submitted this change and it was merged. ( https://gerrit.asterisk.org/c/asterisk/+/12911 )

Change subject: stasis_state: Create internal stasis_state_proxy object.
......................................................................

stasis_state: Create internal stasis_state_proxy object.

This improves the way in which stasis_state reference counting works.
Since manager->states holds onto the proxy object instead of the real
object, this allows stasis_state objects to be freed when appropriate
without use of a special state_remove function.  Additionally, each
distinct eid associated with the state holds a reference to the state to
prevent early release and potentially allow easier debugging of leaks.

Change-Id: I400e0db4b9afa3d5cb4ac7dad60907897e73f9a9
---
M main/stasis_state.c
1 file changed, 147 insertions(+), 171 deletions(-)

Approvals:
  George Joseph: Looks good to me, approved; Approved for Submit



diff --git a/main/stasis_state.c b/main/stasis_state.c
index aa00f9a..b85462f 100644
--- a/main/stasis_state.c
+++ b/main/stasis_state.c
@@ -26,6 +26,18 @@
 
 /*!
  * \internal
+ * \brief Used to link a stasis_state to its manager
+ */
+struct stasis_state_proxy {
+	AO2_WEAKPROXY();
+	/*! The manager that owns and handles this state */
+	struct stasis_state_manager *manager;
+	/*! A unique id for this state object. */
+	char id[0];
+};
+
+/*!
+ * \internal
  * \brief Associates a stasis topic to its last known published message
  *
  * This object's lifetime is tracked by the number of publishers and subscribers to it.
@@ -38,7 +50,10 @@
 struct stasis_state {
 	/*! The number of state subscribers */
 	unsigned int num_subscribers;
-	/*! The manager that owns and handles this state */
+	/*!
+	 * \brief The manager that owns and handles this state
+	 * \note This reference is owned by stasis_state_proxy
+	 */
 	struct stasis_state_manager *manager;
 	/*! Forwarding information, i.e. this topic to manager's topic */
 	struct stasis_forward *forward;
@@ -52,11 +67,11 @@
 	 */
 	AST_VECTOR(, struct ast_eid) eids;
 	/*! A unique id for this state object. */
-	char id[0];
+	char *id;
 };
 
-AO2_STRING_FIELD_HASH_FN(stasis_state, id);
-AO2_STRING_FIELD_CMP_FN(stasis_state, id);
+AO2_STRING_FIELD_HASH_FN(stasis_state_proxy, id);
+AO2_STRING_FIELD_CMP_FN(stasis_state_proxy, id);
 
 /*! The number of buckets to use for managed states */
 #define STATE_BUCKETS 57
@@ -112,17 +127,28 @@
 	state->topic = NULL;
 	ao2_cleanup(state->msg);
 	state->msg = NULL;
-	ao2_cleanup(state->manager);
-	state->manager = NULL;
 
 	/* All eids should have been removed */
 	ast_assert(AST_VECTOR_SIZE(&state->eids) == 0);
 	AST_VECTOR_FREE(&state->eids);
 }
 
+static void state_proxy_dtor(void *obj) {
+	struct stasis_state_proxy *proxy = obj;
+
+	ao2_cleanup(proxy->manager);
+}
+
+static void state_proxy_sub_cb(void *obj, void *data)
+{
+	struct stasis_state_proxy *proxy = obj;
+
+	ao2_unlink(proxy->manager->states, proxy);
+}
+
 /*!
  * \internal
- * \brief Allocate a stasis state object.
+ * \brief Allocate a stasis state object and add it to the manager.
  *
  * Create and initialize a state structure. It's required that either a state
  * topic, or an id is specified. If a state topic is not given then one will be
@@ -134,45 +160,16 @@
  *
  * \return A stasis_state object or NULL
  * \return NULL on error
+ *
+ * \pre manager->states must be locked.
+ * \pre manager->states does not contain an object matching key \a id.
  */
 static struct stasis_state *state_alloc(struct stasis_state_manager *manager,
-	struct stasis_topic *state_topic, const char *id)
+	struct stasis_topic *state_topic, const char *id,
+	const char *file, int line, const char *func)
 {
-	struct stasis_state *state;
-
-	if (!state_topic) {
-		char *name;
-
-		/* If not given a state topic, then an id is required */
-		ast_assert(id != NULL);
-
-		/*
-		 * To provide further detail and to ensure that the topic is unique within the
-		 * scope of the system we prefix it with the manager's topic name, which should
-		 * itself already be unique.
-		 */
-		if (ast_asprintf(&name, "%s/%s", stasis_topic_name(manager->all_topic), id) < 0) {
-			ast_log(LOG_ERROR, "Unable to create state topic name '%s/%s'\n",
-					stasis_topic_name(manager->all_topic), id);
-			return NULL;
-		}
-
-		state_topic = stasis_topic_create(name);
-
-		if (!state_topic) {
-			ast_log(LOG_ERROR, "Unable to create state topic '%s'\n", name);
-			ast_free(name);
-			return NULL;
-		}
-		ast_free(name);
-	} else {
-		/*
-		 * Since the state topic was passed in, go ahead and bump its reference.
-		 * By doing this here first, it allows us to consistently decrease the reference on
-		 * state allocation error.
-		 */
-		ao2_ref(state_topic, +1);
-	}
+	struct stasis_state_proxy *proxy = NULL;
+	struct stasis_state *state = NULL;
 
 	if (!id) {
 		/* If not given an id, then a state topic is required */
@@ -182,77 +179,87 @@
 		id = state_id_by_topic(manager->all_topic, state_topic);
 	}
 
-	state = ao2_alloc(sizeof(*state) + strlen(id) + 1, state_dtor);
+	state = __ao2_alloc(sizeof(*state), state_dtor, AO2_ALLOC_OPT_LOCK_MUTEX, id, file, line, func);
 	if (!state) {
-		ast_log(LOG_ERROR, "Unable to allocate state '%s' in manager '%s'\n",
-				id, stasis_topic_name(manager->all_topic));
-		ao2_ref(state_topic, -1);
-		return NULL;
+		goto error_return;
 	}
 
-	strcpy(state->id, id); /* Safe */
-	state->topic = state_topic; /* ref already bumped above */
+	if (!state_topic) {
+		char *name;
+
+		/*
+		 * To provide further detail and to ensure that the topic is unique within the
+		 * scope of the system we prefix it with the manager's topic name, which should
+		 * itself already be unique.
+		 */
+		if (ast_asprintf(&name, "%s/%s", stasis_topic_name(manager->all_topic), id) < 0) {
+			goto error_return;
+		}
+
+		state->topic = stasis_topic_create(name);
+
+		ast_free(name);
+		if (!state->topic) {
+			goto error_return;
+		}
+	} else {
+		/*
+		 * Since the state topic was passed in, go ahead and bump its reference.
+		 * By doing this here first, it allows us to consistently decrease the reference on
+		 * state allocation error.
+		 */
+		ao2_ref(state_topic, +1);
+		state->topic = state_topic;
+	}
+
+	proxy = ao2_t_weakproxy_alloc(sizeof(*proxy) + strlen(id) + 1, state_proxy_dtor, id);
+	if (!proxy) {
+		goto error_return;
+	}
+
+	strcpy(proxy->id, id); /* Safe */
+
+	state->id = proxy->id;
+	proxy->manager = ao2_bump(manager);
+	state->manager = proxy->manager; /* state->manager is owned by the proxy */
 
 	state->forward = stasis_forward_all(state->topic, manager->all_topic);
 	if (!state->forward) {
-		ast_log(LOG_ERROR, "Unable to add state '%s' forward in manager '%s'\n",
-				id, stasis_topic_name(manager->all_topic));
-		ao2_ref(state, -1);
-		return NULL;
+		goto error_return;
 	}
 
 	if (AST_VECTOR_INIT(&state->eids, 2)) {
-		ast_log(LOG_ERROR, "Unable to initialize eids for state '%s' in manager '%s'\n",
-				id, stasis_topic_name(manager->all_topic));
-		ao2_ref(state, -1);
-		return NULL;
+		goto error_return;
 	}
 
-	state->manager = ao2_bump(manager);
+	if (ao2_t_weakproxy_set_object(proxy, state, OBJ_NOLOCK, "weakproxy link")) {
+		goto error_return;
+	}
+
+	if (ao2_weakproxy_subscribe(proxy, state_proxy_sub_cb, NULL, OBJ_NOLOCK)) {
+		goto error_return;
+	}
+
+	if (!ao2_link_flags(manager->states, proxy, OBJ_NOLOCK)) {
+		goto error_return;
+	}
+
+	ao2_ref(proxy, -1);
 
 	return state;
-}
 
-/*!
- * \internal
- * \brief Create a state object, and add it to the manager.
- *
- * \note Locking on the states container is specifically not done here, thus
- * appropriate locks should be applied prior to this function being called.
- *
- * \param manager The manager to be added to
- * \param state_topic A state topic to be managed (if NULL id is required)
- * \param id The unique id for the state (if NULL state_topic is required)
- *
- * \return The added state object
- * \return NULL on error
- */
-static struct stasis_state *state_add(struct stasis_state_manager *manager,
-	struct stasis_topic *state_topic, const char *id)
-{
-	struct stasis_state *state = state_alloc(manager, state_topic, id);
-
-	if (!state) {
-		return NULL;
-	}
-
-	if (!ao2_link_flags(manager->states, state, OBJ_NOLOCK)) {
-		ast_log(LOG_ERROR, "Unable to add state '%s' to manager '%s'\n",
-				state->id ? state->id : "", stasis_topic_name(manager->all_topic));
-		ao2_ref(state, -1);
-		return NULL;
-	}
-
-	return state;
+error_return:
+	ast_log(LOG_ERROR, "Unable to allocate state '%s' in manager '%s'\n",
+			id, stasis_topic_name(manager->all_topic));
+	ao2_cleanup(state);
+	ao2_cleanup(proxy);
+	return NULL;
 }
 
 /*!
  * \internal
  * \brief Find a state by id, or create one if not found and add it to the manager.
  *
- * \note Locking on the states container is specifically not done here, thus
- * appropriate locks should be applied prior to this function being called.
- *
  * \param manager The manager to be added to
  * \param state_topic A state topic to be managed (if NULL id is required)
  * \param id The unique id for the state (if NULL state_topic is required)
@@ -260,18 +267,26 @@
  * \return The added state object
  * \return NULL on error
  */
-static struct stasis_state *state_find_or_add(struct stasis_state_manager *manager,
-	struct stasis_topic *state_topic, const char *id)
+#define state_find_or_add(mgr, top, id) __state_find_or_add(mgr, top, id, __FILE__, __LINE__, __PRETTY_FUNCTION__)
+static struct stasis_state *__state_find_or_add(struct stasis_state_manager *manager,
+	struct stasis_topic *state_topic, const char *id,
+	const char *file, int line, const char *func)
 {
 	struct stasis_state *state;
 
+	ao2_lock(manager->states);
 	if (ast_strlen_zero(id)) {
 		id = state_id_by_topic(manager->all_topic, state_topic);
 	}
 
-	state = ao2_find(manager->states, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+	state = ao2_weakproxy_find(manager->states, id, OBJ_SEARCH_KEY | OBJ_NOLOCK, "");
+	if (!state) {
+		state = state_alloc(manager, state_topic, id, file, line, func);
+	}
 
-	return state ? state : state_add(manager, state_topic, id);
+	ao2_unlock(manager->states);
+
+	return state;
 }
 
 static void state_manager_dtor(void *obj)
@@ -317,7 +332,7 @@
 	}
 
 	manager->states = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
-		STATE_BUCKETS, stasis_state_hash_fn, NULL, stasis_state_cmp_fn);
+		STATE_BUCKETS, stasis_state_proxy_hash_fn, NULL, stasis_state_proxy_cmp_fn);
 	if (!manager->states) {
 		ao2_ref(manager, -1);
 		return NULL;
@@ -356,10 +371,7 @@
 	struct stasis_topic *topic;
 	struct stasis_state *state;
 
-	ao2_lock(manager->states);
 	state = state_find_or_add(manager, NULL, id);
-	ao2_unlock(manager->states);
-
 	if (!state) {
 		return NULL;
 	}
@@ -369,53 +381,6 @@
 	return topic;
 }
 
-/*!
- * \internal
- * \brief Remove a state from the stasis manager
- *
- * State should only be removed from the manager under the following conditions:
- *
- *   There are no more subscribers to it
- *   There are no more explicit publishers publishing to it
- *   There are no more implicit publishers publishing to it
- *
- * Subscribers and explicit publishers hold a reference to the state object itself, so
- * once a state's reference count drops to 2 (1 for the manager, 1 passed in) then we
- * know there are no more subscribers or explicit publishers. Implicit publishers are
- * tracked by eids, so once that container is empty no more implicit publishers exist
- * for the state either. Only then can a state be removed.
- *
- * \param state The state to remove
- */
-static void state_remove(struct stasis_state *state)
-{
-	ao2_lock(state);
-
-	/*
-	 * The manager's state container also needs to be locked here prior to checking
-	 * the state's reference count, and potentially removing since we don't want its
-	 * count to possibly increase between the check and unlinking.
-	 */
-	ao2_lock(state->manager->states);
-
-	/*
-	 * If there are only 2 references left then it's the one owned by the manager,
-	 * and the one passed in to this function. However, before removing it from the
-	 * manager we need to also check that no eid is associated with the given state.
-	 * If an eid still remains then this means that an implicit publisher is still
-	 * publishing to this state.
-	 */
-	if (ao2_ref(state, 0) == 2 && AST_VECTOR_SIZE(&state->eids) == 0) {
-		ao2_unlink_flags(state->manager->states, state, 0);
-	}
-
-	ao2_unlock(state->manager->states);
-	ao2_unlock(state);
-
-	/* Now it's safe to remove the reference that is held on the given state */
-	ao2_ref(state, -1);
-}
-
 struct stasis_state_subscriber {
 	/*! The stasis state subscribed to */
 	struct stasis_state *state;
@@ -441,7 +406,7 @@
 	--sub->state->num_subscribers;
 	ao2_unlock(sub->state);
 
-	state_remove(sub->state);
+	ao2_ref(sub->state, -1);
 }
 
 struct stasis_state_subscriber *stasis_state_add_subscriber(
@@ -457,14 +422,11 @@
 		return NULL;
 	}
 
-	ao2_lock(manager->states);
 	sub->state = state_find_or_add(manager, NULL, id);
 	if (!sub->state) {
-		ao2_unlock(manager->states);
 		ao2_ref(sub, -1);
 		return NULL;
 	}
-	ao2_unlock(manager->states);
 
 	ao2_lock(sub->state);
 	++sub->state->num_subscribers;
@@ -563,7 +525,7 @@
 {
 	struct stasis_state_publisher *pub = obj;
 
-	state_remove(pub->state);
+	ao2_ref(pub->state, -1);
 }
 
 struct stasis_state_publisher *stasis_state_add_publisher(
@@ -578,14 +540,11 @@
 		return NULL;
 	}
 
-	ao2_lock(manager->states);
 	pub->state = state_find_or_add(manager, NULL, id);
 	if (!pub->state) {
-		ao2_unlock(manager->states);
 		ao2_ref(pub, -1);
 		return NULL;
 	}
-	ao2_unlock(manager->states);
 
 	return pub;
 }
@@ -639,7 +598,10 @@
 	}
 
 	if (i == AST_VECTOR_SIZE(&state->eids)) {
-		AST_VECTOR_APPEND(&state->eids, *eid);
+		if (!AST_VECTOR_APPEND(&state->eids, *eid)) {
+			/* This ensures state cannot be freed if it has any eids */
+			ao2_ref(state, +1);
+		}
 	}
 }
 
@@ -666,6 +628,8 @@
 	for (i = 0; i < AST_VECTOR_SIZE(&state->eids); ++i) {
 		if (!ast_eid_cmp(AST_VECTOR_GET_ADDR(&state->eids, i), eid)) {
 			AST_VECTOR_REMOVE_UNORDERED(&state->eids, i);
+			/* Balance the reference from state_find_or_add_eid */
+			ao2_ref(state, -1);
 			return;
 		}
 	}
@@ -676,10 +640,7 @@
 {
 	struct stasis_state *state;
 
-	ao2_lock(manager->states);
 	state = state_find_or_add(manager, NULL, id);
-	ao2_unlock(manager->states);
-
 	if (!state) {
 		return;
 	}
@@ -697,7 +658,7 @@
 void stasis_state_remove_publish_by_id(struct stasis_state_manager *manager,
 	const char *id, const struct ast_eid *eid, struct stasis_message *msg)
 {
-	struct stasis_state *state = ao2_find(manager->states, id, OBJ_SEARCH_KEY);
+	struct stasis_state *state = ao2_weakproxy_find(manager->states, id, OBJ_SEARCH_KEY, "");
 
 	if (!state) {
 		/*
@@ -721,7 +682,7 @@
 	state_find_and_remove_eid(state, eid);
 	ao2_unlock(state);
 
-	state_remove(state);
+	ao2_ref(state, -1);
 }
 
 int stasis_state_add_observer(struct stasis_state_manager *manager,
@@ -744,10 +705,8 @@
 	AST_VECTOR_RW_UNLOCK(&manager->observers);
 }
 
-static int handle_stasis_state(void *obj, void *arg, void *data, int flags)
+static int handle_stasis_state(struct stasis_state *state, on_stasis_state handler, void *data)
 {
-	struct stasis_state *state = obj;
-	on_stasis_state handler = arg;
 	struct stasis_message *msg;
 	int res;
 
@@ -764,24 +723,41 @@
 	return res;
 }
 
+static int handle_stasis_state_proxy(void *obj, void *arg, void *data, int flags)
+{
+	struct stasis_state *state = ao2_weakproxy_get_object(obj, 0);
+
+	if (state) {
+		int res;
+		res = handle_stasis_state(state, arg, data);
+		ao2_ref(state, -1);
+		return res;
+	}
+
+	return 0;
+}
+
 void stasis_state_callback_all(struct stasis_state_manager *manager, on_stasis_state handler,
 	void *data)
 {
 	ast_assert(handler != NULL);
 
 	ao2_callback_data(manager->states, OBJ_MULTIPLE | OBJ_NODATA,
-		handle_stasis_state, handler, data);
+		handle_stasis_state_proxy, handler, data);
 }
 
 static int handle_stasis_state_subscribed(void *obj, void *arg, void *data, int flags)
 {
-	struct stasis_state *state = obj;
+	struct stasis_state *state = ao2_weakproxy_get_object(obj, 0);
+	int res = 0;
 
-	if (state->num_subscribers) {
-		return handle_stasis_state(obj, arg, data, flags);
+	if (state && state->num_subscribers) {
+		res = handle_stasis_state(state, arg, data);
 	}
 
-	return 0;
+	ao2_cleanup(state);
+
+	return res;
 }
 
 void stasis_state_callback_subscribed(struct stasis_state_manager *manager, on_stasis_state handler,

-- 
To view, visit https://gerrit.asterisk.org/c/asterisk/+/12911
To unsubscribe, or for help writing mail filters, visit https://gerrit.asterisk.org/settings

Gerrit-Project: asterisk
Gerrit-Branch: 17
Gerrit-Change-Id: I400e0db4b9afa3d5cb4ac7dad60907897e73f9a9
Gerrit-Change-Number: 12911
Gerrit-PatchSet: 3
Gerrit-Owner: Corey Farrell <git at cfware.com>
Gerrit-Reviewer: Corey Farrell <git at cfware.com>
Gerrit-Reviewer: Friendly Automation
Gerrit-Reviewer: George Joseph <gjoseph at digium.com>
Gerrit-Reviewer: Joshua Colp <jcolp at digium.com>
Gerrit-Reviewer: Kevin Harwell <kharwell at digium.com>
Gerrit-MessageType: merged
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.digium.com/pipermail/asterisk-code-review/attachments/20191004/9ef7a077/attachment-0001.html>


More information about the asterisk-code-review mailing list