[asterisk-commits] mjordan: branch mjordan/trunk-astdb-cluster r432635 - in /team/mjordan/trunk-...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Mon Mar 9 09:13:40 CDT 2015


Author: mjordan
Date: Mon Mar  9 09:13:38 2015
New Revision: 432635

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=432635
Log:
Initial commit

It mostly lives, breathes, and works. At least with PJSIP.

Modified:
    team/mjordan/trunk-astdb-cluster/funcs/func_db.c
    team/mjordan/trunk-astdb-cluster/include/asterisk/astdb.h
    team/mjordan/trunk-astdb-cluster/include/asterisk/utils.h
    team/mjordan/trunk-astdb-cluster/main/db.c
    team/mjordan/trunk-astdb-cluster/main/utils.c
    team/mjordan/trunk-astdb-cluster/res/res_pjsip_publish_asterisk.c
    team/mjordan/trunk-astdb-cluster/tests/test_db.c

Modified: team/mjordan/trunk-astdb-cluster/funcs/func_db.c
URL: http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/funcs/func_db.c?view=diff&rev=432635&r1=432634&r2=432635
==============================================================================
--- team/mjordan/trunk-astdb-cluster/funcs/func_db.c (original)
+++ team/mjordan/trunk-astdb-cluster/funcs/func_db.c Mon Mar  9 09:13:38 2015
@@ -1,7 +1,7 @@
 /*
  * Asterisk -- An open source telephony toolkit.
  *
- * Copyright (C) 2005-2006, Russell Bryant <russelb at clemson.edu> 
+ * Copyright (C) 2005-2015, Russell Bryant <russelb at clemson.edu> 
  *
  * func_db.c adapted from the old app_db.c, copyright by the following people 
  * Copyright (C) 2005, Mark Spencer <markster at digium.com>
@@ -23,6 +23,7 @@
  * \brief Functions for interaction with the Asterisk database
  *
  * \author Russell Bryant <russelb at clemson.edu>
+ * \author Matt Jordan <mjordan at digium.com>
  *
  * \ingroup functions
  */
@@ -52,6 +53,30 @@
 		<syntax argsep="/">
 			<parameter name="family" required="true" />
 			<parameter name="key" required="true" />
+		</syntax>
+		<description>
+			<para>This function will read from or write a value to the Asterisk database.  On a
+			read, this function returns the corresponding value from the database, or blank
+			if it does not exist.  Reading a database value will also set the variable
+			DB_RESULT.  If you wish to find out if an entry exists, use the DB_EXISTS
+			function.</para>
+		</description>
+		<see-also>
+			<ref type="application">DBdel</ref>
+			<ref type="function">DB_DELETE</ref>
+			<ref type="application">DBdeltree</ref>
+			<ref type="function">DB_EXISTS</ref>
+		</see-also>
+	</function>
+	<function name="DB_SHARED" language="en_US">
+		<synopsis>
+			Create or delete a shared family in the Asterisk database.
+		</synopsis>
+		<syntax argsep="/">
+			<parameter name="action" required="true">
+			</parameter>
+			<parameter name="type">
+			</parameter>
 		</syntax>
 		<description>
 			<para>This function will read from or write a value to the Asterisk database.  On a
@@ -200,14 +225,14 @@
 	buf[0] = '\0';
 
 	if (ast_strlen_zero(parse)) {
-		ast_log(LOG_WARNING, "DB_EXISTS requires an argument, DB(<family>/<key>)\n");
+		ast_log(LOG_WARNING, "DB_EXISTS requires an argument, DB_EXISTS(<family>/<key>)\n");
 		return -1;
 	}
 
 	AST_NONSTANDARD_APP_ARGS(args, parse, '/');
 
 	if (args.argc < 2) {
-		ast_log(LOG_WARNING, "DB_EXISTS requires an argument, DB(<family>/<key>)\n");
+		ast_log(LOG_WARNING, "DB_EXISTS requires an argument, DB_EXISTS(<family>/<key>)\n");
 		return -1;
 	}
 
@@ -335,6 +360,106 @@
 	.write = function_db_delete_write,
 };
 
+static int function_db_shared_exists_read(struct ast_channel *chan,
+	const char *cmd, char *parse, char *buf, size_t len)
+{
+	AST_DECLARE_APP_ARGS(args,
+		AST_APP_ARG(family);
+	);
+
+	buf[0] = '\0';
+
+	if (ast_strlen_zero(parse)) {
+		ast_log(LOG_WARNING, "DB_SHARED_EXISTS requires an argument, DB_SHARED_EXISTS(<family>)\n");
+		return -1;
+	}
+
+	AST_STANDARD_APP_ARGS(args, parse);
+
+	if (args.argc != 1) {
+		ast_log(LOG_WARNING, "DB_SHARED_EXISTS requires an argument, DB_SHARED_EXISTS(<family>)\n");
+		return -1;
+	}
+
+	if (ast_db_is_shared(args.family)) {
+		ast_copy_string(buf, "1", len);
+	} else {
+		ast_copy_string(buf, "0", len);
+	}
+	pbx_builtin_setvar_helper(chan, "DB_RESULT", buf);
+
+	return 0;
+}
+
+static struct ast_custom_function db_shared_exists_function = {
+	.name = "DB_SHARED_EXISTS",
+	.read = function_db_shared_exists_read,
+};
+
+static int function_db_shared_write(struct ast_channel *chan, const char *cmd, char *parse,
+	const char *value)
+{
+	enum ast_db_shared_type share_type;
+
+	AST_DECLARE_APP_ARGS(args,
+		AST_APP_ARG(action);
+		AST_APP_ARG(type);
+	);
+
+	if (ast_strlen_zero(parse)) {
+		ast_log(LOG_WARNING, "DB_SHARED requires an argument, DB_SHARED(<action>[,<type>])=<family>\n");
+		return -1;
+	}
+
+	AST_STANDARD_APP_ARGS(args, parse);
+
+	if (args.argc < 1) {
+		ast_log(LOG_WARNING, "DB_SHARED requires an argument, DB_SHARED(<action>[,<type>])=<family>\n");
+		return -1;
+	}
+
+	if (ast_strlen_zero(value)) {
+		ast_log(LOG_WARNING, "DB_SHARED requires a value, DB_SHARED(<action>[,<type>])=<family>\n");
+		return -1;
+	}
+
+	if (!strcasecmp(args.action, "put")) {
+		if (ast_strlen_zero(args.type) || !strcasecmp(args.type, "global")) {
+			share_type = SHARED_DB_TYPE_GLOBAL;
+		} else if (!strcasecmp(args.type, "unique")) {
+			share_type = SHARED_DB_TYPE_UNIQUE;
+		} else {
+			ast_log(LOG_WARNING, "DB_SHARED: Unknown 'type' %s\n", args.type);
+			return -1;
+		}
+
+		if (ast_db_put_shared(value, share_type)) {
+			/* Generally, failure is benign (key exists) */
+			ast_debug(2, "Failed to create shared family '%s'\n", value);
+		} else {
+			ast_verb(4, "Created %s shared family '%s'",
+				share_type == SHARED_DB_TYPE_GLOBAL ? "GLOBAL" : "UNIQUE",
+				value);
+		}
+	} else if (!strcasecmp(args.action, "delete")) {
+		if (ast_db_del_shared(value)) {
+			/* Generally, failure is benign (key doesn't exist) */
+			ast_debug(2, "Failed to delete shared family '%s'\n", value);
+		} else {
+			ast_verb(4, "Deleted shared family '%s'\n", value);
+		}
+	} else {
+		ast_log(LOG_WARNING, "DB_SHARED: Unknown 'action' %s\n", args.action);
+	}
+
+	return 0;
+}
+
+static struct ast_custom_function db_shared_function = {
+	.name = "DB_SHARED",
+	.write = function_db_shared_write,
+};
+
 static int unload_module(void)
 {
 	int res = 0;
@@ -343,6 +468,8 @@
 	res |= ast_custom_function_unregister(&db_exists_function);
 	res |= ast_custom_function_unregister(&db_delete_function);
 	res |= ast_custom_function_unregister(&db_keys_function);
+	res |= ast_custom_function_unregister(&db_shared_function);
+	res |= ast_custom_function_unregister(&db_shared_exists_function);
 
 	return res;
 }
@@ -355,6 +482,8 @@
 	res |= ast_custom_function_register(&db_exists_function);
 	res |= ast_custom_function_register_escalating(&db_delete_function, AST_CFE_READ);
 	res |= ast_custom_function_register(&db_keys_function);
+	res |= ast_custom_function_register_escalating(&db_shared_function, AST_CFE_WRITE);
+	res |= ast_custom_function_register(&db_shared_exists_function);
 
 	return res;
 }

Modified: team/mjordan/trunk-astdb-cluster/include/asterisk/astdb.h
URL: http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/include/asterisk/astdb.h?view=diff&rev=432635&r1=432634&r2=432635
==============================================================================
--- team/mjordan/trunk-astdb-cluster/include/asterisk/astdb.h (original)
+++ team/mjordan/trunk-astdb-cluster/include/asterisk/astdb.h Mon Mar  9 09:13:38 2015
@@ -28,11 +28,73 @@
 extern "C" {
 #endif
 
+#include "asterisk/utils.h"
+
+enum ast_db_shared_type {
+	/* Items in the shared family are common across all Asterisk instances */
+	SHARED_DB_TYPE_GLOBAL = 0,
+	/*! Items in the shared family are made unique across all Asterisk instances */
+	SHARED_DB_TYPE_UNIQUE,
+};
+
 struct ast_db_entry {
 	struct ast_db_entry *next;
 	char *key;
 	char data[0];
 };
+
+struct stasis_topic;
+struct stasis_message_type;
+
+struct ast_db_shared_family {
+	/*! How the family is shared */
+	enum ast_db_shared_type share_type;
+	/*! Entries in the family, if appropriate */
+	struct ast_db_entry *entries;
+	/*! The name of the shared family */
+	char name[0];
+};
+
+struct ast_db_entry *ast_db_entry_create(const char *key, const char *value);
+
+struct ast_db_shared_family *ast_db_shared_family_alloc(const char *family, enum ast_db_shared_type share_type);
+
+int ast_db_publish_shared_message(struct stasis_message_type *type, struct ast_db_shared_family *shared_family, struct ast_eid *eid);
+
+void ast_db_refresh_shared(void);
+
+/*! \addtogroup StasisTopicsAndMessages
+ * @{
+ */
+
+struct stasis_topic *ast_db_cluster_topic(void);
+
+/*!
+ * \since 14
+ * \brief Message type for an RTCP message sent from this Asterisk instance
+ *
+ * \retval A stasis message type
+ */
+struct stasis_message_type *ast_db_put_shared_type(void);
+
+/*!
+ * \since 14
+ * \brief Message type for an RTCP message received from some external source
+ *
+ * \retval A stasis message type
+ */
+struct stasis_message_type *ast_db_del_shared_type(void);
+
+/* }@ */
+
+/*!
+ * \brief @@@@
+ */
+int ast_db_put_shared(const char *family, enum ast_db_shared_type);
+
+int ast_db_del_shared(const char *family);
+
+int ast_db_is_shared(const char *family);
 
 /*! \brief Get key value specified by family/key */
 int ast_db_get(const char *family, const char *key, char *value, int valuelen);

Modified: team/mjordan/trunk-astdb-cluster/include/asterisk/utils.h
URL: http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/include/asterisk/utils.h?view=diff&rev=432635&r1=432634&r2=432635
==============================================================================
--- team/mjordan/trunk-astdb-cluster/include/asterisk/utils.h (original)
+++ team/mjordan/trunk-astdb-cluster/include/asterisk/utils.h Mon Mar  9 09:13:38 2015
@@ -922,7 +922,7 @@
  * \brief Convert an EID to a string
  * \since 1.6.1
  */
-char *ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid);
+char *ast_eid_to_str(char *s, int maxlen, const struct ast_eid *eid);
 
 /*!
  * \brief Convert a string into an EID

Modified: team/mjordan/trunk-astdb-cluster/main/db.c
URL: http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/main/db.c?view=diff&rev=432635&r1=432634&r2=432635
==============================================================================
--- team/mjordan/trunk-astdb-cluster/main/db.c (original)
+++ team/mjordan/trunk-astdb-cluster/main/db.c Mon Mar  9 09:13:38 2015
@@ -1,7 +1,7 @@
 /*
  * Asterisk -- An open source telephony toolkit.
  *
- * Copyright (C) 1999 - 2005, Digium, Inc.
+ * Copyright (C) 1999 - 2015, Digium, Inc.
  *
  * Mark Spencer <markster at digium.com>
  *
@@ -22,9 +22,6 @@
  *
  * \author Mark Spencer <markster at digium.com>
  *
- * \note DB3 is licensed under Sleepycat Public License and is thus incompatible
- * with GPL.  To avoid having to make another exception (and complicate
- * licensing even further) we elect to use DB1 which is BSD licensed
  */
 
 /*** MODULEINFO
@@ -45,11 +42,12 @@
 #include <dirent.h>
 #include <sqlite3.h>
 
-#include "asterisk/channel.h"
 #include "asterisk/file.h"
+#include "asterisk/utils.h"
+#include "asterisk/astdb.h"
+#include "asterisk/stasis.h"
+#include "asterisk/stasis_message_router.h"
 #include "asterisk/app.h"
-#include "asterisk/dsp.h"
-#include "asterisk/astdb.h"
 #include "asterisk/cli.h"
 #include "asterisk/utils.h"
 #include "asterisk/manager.h"
@@ -114,7 +112,16 @@
 static int doexit;
 static int dosync;
 
+/*! \brief A container of families to share across Asterisk instances */
+static struct ao2_container *shared_families;
+
+static struct stasis_topic *db_cluster_topic;
+
+static struct stasis_message_router *message_router;
+
 static void db_sync(void);
+
+#define SHARED_FAMILY "__asterisk_shared_family"
 
 #define DEFINE_SQL_STATEMENT(stmt,sql) static sqlite3_stmt *stmt; \
 	const char stmt##_sql[] = sql;
@@ -199,6 +206,76 @@
 
 	return res;
 }
+
+struct ast_db_entry *ast_db_entry_create(const char *key, const char *value)
+{
+	struct ast_db_entry *entry;
+
+	entry = ast_malloc(sizeof(*entry) + strlen(key) + strlen(value) + 2);
+	if (!entry) {
+		return NULL;
+	}
+	entry->next = NULL;
+	entry->key = entry->data + strlen(value) + 1;
+	strcpy(entry->data, value); /* safe */
+	strcpy(entry->key, key); /* safe */
+
+	return entry;
+}
+
+static void shared_db_family_dtor(void *obj)
+{
+	struct ast_db_shared_family *family = obj;
+
+	ast_db_freetree(family->entries);
+}
+
+struct ast_db_shared_family *ast_db_shared_family_alloc(const char *family, enum ast_db_shared_type share_type)
+{
+	struct ast_db_shared_family *shared_family;
+
+	shared_family = ao2_alloc_options(sizeof(*shared_family) + strlen(family) + 1,
+		shared_db_family_dtor, OBJ_NOLOCK);
+	if (!shared_family) {
+		return NULL;
+	}
+	strcpy(shared_family->name, family); /* safe */
+	shared_family->share_type = share_type;
+
+	return shared_family;
+}
+
+static struct ast_db_shared_family *db_shared_family_clone(const struct ast_db_shared_family *shared_family)
+{
+	struct ast_db_shared_family *clone;
+
+	clone = ast_db_shared_family_alloc(shared_family->name, shared_family->share_type);
+
+	return clone;
+}
+
+static int db_shared_family_sort_fn(const void *obj_left, const void *obj_right, int flags)
+{
+	const struct ast_db_shared_family *left = obj_left;
+	const struct ast_db_shared_family *right = obj_right;
+	const char *right_key = obj_right;
+	int cmp;
+
+	switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
+	default:
+	case OBJ_POINTER:
+		right_key = right->name;
+		/* Fall through */
+	case OBJ_KEY:
+		cmp = strcmp(left->name, right_key);
+		break;
+	case OBJ_PARTIAL_KEY:
+		cmp = strncmp(left->name, right_key, strlen(right_key));
+		break;
+	}
+	return cmp;
+}
+
 
 static int db_create_astdb(void)
 {
@@ -308,7 +385,152 @@
 	return db_execute_sql("ROLLBACK", NULL, NULL);
 }
 
-int ast_db_put(const char *family, const char *key, const char *value)
+static int db_put_common(const char *family, const char *key, const char *value, int share);
+
+int ast_db_put_shared(const char *family, enum ast_db_shared_type share_type)
+{
+	struct ast_db_shared_family *shared_family;
+
+	if (ast_strlen_zero(family)) {
+		return -1;
+	}
+
+	ao2_lock(shared_families);
+
+	shared_family = ao2_find(shared_families, family, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+	if (shared_family) {
+		ao2_ref(shared_family, -1);
+		ao2_unlock(shared_families);
+		return -1;
+	}
+
+	shared_family = ast_db_shared_family_alloc(family, share_type);
+	if (!shared_family) {
+		ao2_unlock(shared_families);
+		return -1;
+	}
+
+	ao2_link_flags(shared_families, shared_family, OBJ_NOLOCK);
+
+	db_put_common(SHARED_FAMILY, shared_family->name,
+		share_type == SHARED_DB_TYPE_UNIQUE ? "UNIQUE" : "GLOBAL", 0);
+
+	ao2_ref(shared_family, -1);
+
+	ao2_unlock(shared_families);
+
+	return 0;
+}
+
+int ast_db_is_shared(const char *family)
+{
+	struct ast_db_shared_family *shared_family;
+	int res = 0;
+
+	shared_family = ao2_find(shared_families, family, OBJ_SEARCH_KEY);
+	if (shared_family) {
+		res = 1;
+		ao2_ref(shared_family, -1);
+	}
+
+	return res;
+}
+
+static int db_put_shared(const char *family, const char *key, const char *value)
+{
+	struct ast_db_shared_family *shared_family;
+	struct ast_db_shared_family *clone;
+
+	/* See if we are shared */
+	shared_family = ao2_find(shared_families, family, OBJ_SEARCH_PARTIAL_KEY);
+	if (!shared_family) {
+		return 0;
+	}
+
+	/* Create a Stasis message for the new item */
+	clone = db_shared_family_clone(shared_family);
+	if (!clone) {
+		ao2_ref(shared_family, -1);
+		return -1;
+	}
+	clone->entries = ast_db_entry_create(key, value);
+	if (!clone->entries) {
+		ao2_ref(shared_family, -1);
+		ao2_ref(clone, -1);
+		return -1;
+	}
+
+	/* Publish */
+	ast_db_publish_shared_message(ast_db_put_shared_type(), clone, NULL);
+
+	ao2_ref(shared_family, -1);
+
+	return 0;
+}
+
+static int db_del_shared(const char *family, const char *key)
+{
+	struct ast_db_shared_family *shared_family;
+	struct ast_db_shared_family *clone;
+
+	/* See if we are shared */
+	shared_family = ao2_find(shared_families, family, OBJ_SEARCH_PARTIAL_KEY);
+	if (!shared_family) {
+		return 0;
+	}
+
+	if (ast_strlen_zero(key)) {
+		clone = ao2_bump(shared_family);
+	} else {
+		clone = db_shared_family_clone(shared_family);
+		if (!clone) {
+			ao2_ref(shared_family, -1);
+			return -1;
+		}
+		clone->entries = ast_db_entry_create(key, "");
+		if (!clone->entries) {
+			ao2_ref(shared_family, -1);
+			ao2_ref(clone, -1);
+			return -1;
+		}
+	}
+
+	/* Publish */
+	ast_db_publish_shared_message(ast_db_del_shared_type(), clone, NULL);
+
+	ao2_ref(shared_family, -1);
+
+	return 0;
+}
+
+static int db_del_common(const char *family, const char *key, int share);
+
+int ast_db_del_shared(const char *family)
+{
+	struct ast_db_shared_family *shared_family;
+	int res = 0;
+
+	if (ast_strlen_zero(family)) {
+		return -1;
+	}
+
+	ao2_lock(shared_families);
+
+	shared_family = ao2_find(shared_families, family, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+	if (shared_family) {
+		ao2_unlink_flags(shared_families, shared_family, OBJ_NOLOCK);
+		db_del_common(SHARED_FAMILY, shared_family->name, 0);
+		ao2_ref(shared_family, -1);
+	} else {
+		res = -1;
+	}
+
+	ao2_unlock(shared_families);
+
+	return res;
+}
+
+static int db_put_common(const char *family, const char *key, const char *value, int share)
 {
 	char fullkey[MAX_DB_FIELD];
 	size_t fullkey_len;
@@ -335,9 +557,17 @@
 
 	sqlite3_reset(put_stmt);
 	db_sync();
+	if (share) {
+		db_put_shared(family, key, value);
+	}
 	ast_mutex_unlock(&dblock);
 
 	return res;
+}
+
+int ast_db_put(const char *family, const char *key, const char *value)
+{
+	return db_put_common(family, key, value, 1);
 }
 
 /*!
@@ -410,7 +640,7 @@
 	return db_get_common(family, key, out, -1);
 }
 
-int ast_db_del(const char *family, const char *key)
+static int db_del_common(const char *family, const char *key, int share)
 {
 	char fullkey[MAX_DB_FIELD];
 	size_t fullkey_len;
@@ -433,12 +663,20 @@
 	}
 	sqlite3_reset(del_stmt);
 	db_sync();
+	if (share) {
+		db_del_shared(family, key);
+	}
 	ast_mutex_unlock(&dblock);
 
-	return res;
-}
-
-int ast_db_deltree(const char *family, const char *keytree)
+	return res;	
+}
+
+int ast_db_del(const char *family, const char *key)
+{
+	return db_del_common(family, key, 1);
+}
+
+static int db_deltree_common(const char *family, const char *keytree, int share)
 {
 	sqlite3_stmt *stmt = deltree_stmt;
 	char prefix[MAX_DB_FIELD];
@@ -468,9 +706,17 @@
 	res = sqlite3_changes(astdb);
 	sqlite3_reset(stmt);
 	db_sync();
+	if (share) {
+		db_del_shared(prefix, NULL);
+	}
 	ast_mutex_unlock(&dblock);
 
-	return res;
+	return res;	
+}
+
+int ast_db_deltree(const char *family, const char *keytree)
+{
+	return db_deltree_common(family, keytree, 1);
 }
 
 struct ast_db_entry *ast_db_gettree(const char *family, const char *keytree)
@@ -508,13 +754,10 @@
 		if (!(value_s = (const char *) sqlite3_column_text(stmt, 1))) {
 			break;
 		}
-		if (!(cur = ast_malloc(sizeof(*cur) + strlen(key_s) + strlen(value_s) + 2))) {
+		cur = ast_db_entry_create(key_s, value_s);
+		if (!cur) {
 			break;
 		}
-		cur->next = NULL;
-		cur->key = cur->data + strlen(value_s) + 1;
-		strcpy(cur->data, value_s);
-		strcpy(cur->key, key_s);
 		if (last) {
 			last->next = cur;
 		} else {
@@ -748,14 +991,26 @@
 
 	while (sqlite3_step(showkey_stmt) == SQLITE_ROW) {
 		const char *key_s, *value_s;
+		char *family_s;
+		char *delim;
+
 		if (!(key_s = (const char *) sqlite3_column_text(showkey_stmt, 0))) {
 			break;
 		}
 		if (!(value_s = (const char *) sqlite3_column_text(showkey_stmt, 1))) {
 			break;
 		}
+		family_s = ast_strdup(key_s);
+		if (!family_s) {
+			break;
+		}
+		delim = strchr(family_s + 1, '/');
+		*delim = '\0';
+
 		++counter;
-		ast_cli(a->fd, "%-50s: %-25s\n", key_s, value_s);
+		ast_cli(a->fd, "%-50s: %-25s %s\n", key_s, value_s,
+			ast_db_is_shared(family_s + 1) ? "(S)" : "");
+		ast_free(family_s);
 	}
 	sqlite3_reset(showkey_stmt);
 	ast_mutex_unlock(&dblock);
@@ -984,6 +1239,197 @@
 	return NULL;
 }
 
+int ast_db_publish_shared_message(struct stasis_message_type *type, struct ast_db_shared_family *shared_family, struct ast_eid *eid)
+{
+	struct stasis_message *message;
+
+	/* Aggregate doesn't really apply to the AstDB; as such, if we aren't
+	 * provided an EID use our own.
+	 */
+	if (!eid) {
+		eid = &ast_eid_default;
+	}
+
+	message = stasis_message_create_full(type, shared_family, eid);
+	if (!message) {
+		return -1;
+	}
+
+	stasis_publish(ast_db_cluster_topic(), message);
+
+	return 0;
+}
+
+void ast_db_refresh_shared(void)
+{
+	struct ao2_iterator it_shared_families;
+	struct ast_db_shared_family *shared_family;
+
+	it_shared_families = ao2_iterator_init(shared_families, 0);
+	while ((shared_family = ao2_iterator_next(&it_shared_families))) {
+		struct ast_db_shared_family *clone;
+
+		clone = db_shared_family_clone(shared_family);
+		if (!clone) {
+			ao2_ref(shared_family, -1);
+			continue;
+		}
+
+		clone->entries = ast_db_gettree(shared_family->name, "");
+		if (!clone->entries) {
+			ao2_ref(clone, -1);
+			ao2_ref(shared_family, -1);
+			continue;
+		}
+
+		ast_db_publish_shared_message(ast_db_put_shared_type(), clone, NULL);
+
+		ao2_ref(clone, -1);
+		ao2_ref(shared_family, -1);
+	}
+	ao2_iterator_destroy(&it_shared_families);	
+}
+
+static struct ast_event *db_del_shared_type_to_event(struct stasis_message *message)
+{
+	return NULL;
+}
+
+static struct ast_json *db_entries_to_json(struct ast_db_entry *entry)
+{
+	struct ast_json *json;
+	struct ast_db_entry *cur;
+
+	json = ast_json_array_create();
+	if (!json) {
+		return NULL;
+	}
+
+	for (cur = entry; cur; cur = cur->next) {
+		struct ast_json *json_entry;
+
+		json_entry = ast_json_pack("{s: s, s: s}",
+			"key", cur->key,
+			"data", cur->data);
+		if (!json_entry) {
+			ast_json_unref(json);
+			return NULL;
+		}
+
+		if (ast_json_array_append(json, json_entry)) {
+			ast_json_unref(json);
+			return NULL;
+		}
+	}
+
+	return json;
+}
+
+static struct ast_json *db_shared_family_to_json(struct stasis_message *message,
+	const struct stasis_message_sanitizer *sanitize)
+{
+	struct stasis_message_type *type = stasis_message_type(message);
+	struct ast_db_shared_family *shared_family;
+
+	shared_family = stasis_message_data(message);
+	if (!shared_family) {
+		return NULL;
+	}
+
+	return ast_json_pack("{s: s, s: s, s: s, s: o}",
+		"verb", type == ast_db_put_shared_type() ? "put" : "delete",
+		"family", shared_family->name,
+		"share_type", shared_family->share_type == SHARED_DB_TYPE_UNIQUE ? "unique" : "global",
+		"entries", shared_family->entries ? db_entries_to_json(shared_family->entries) : ast_json_null());
+}
+
+static struct ast_event *db_put_shared_type_to_event(struct stasis_message *message)
+{
+	return NULL;
+}
+
+struct stasis_topic *ast_db_cluster_topic(void)
+{
+	return db_cluster_topic;
+}
+
+STASIS_MESSAGE_TYPE_DEFN(ast_db_put_shared_type,
+		.to_event = db_put_shared_type_to_event,
+		.to_json = db_shared_family_to_json,
+	);
+STASIS_MESSAGE_TYPE_DEFN(ast_db_del_shared_type,
+		.to_event = db_del_shared_type_to_event,
+		.to_json = db_shared_family_to_json,
+	);
+
+static void db_put_shared_msg_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
+{
+	struct ast_db_shared_family *shared_family;
+	struct ast_db_shared_family *shared_check;
+	struct ast_db_entry *cur;
+	const struct ast_eid *eid;
+	char *family_id;
+
+	shared_family = stasis_message_data(message);
+	if (!shared_family) {
+		return;
+	}
+
+	eid = stasis_message_eid(message);
+	if (!eid || !ast_eid_cmp(eid, &ast_eid_default)) {
+		return;
+	}
+
+	/* Don't update if we don't have this area shared on this server */
+	shared_check = ao2_find(shared_families, shared_family->name, OBJ_KEY);
+	if (!shared_check) {
+		return;
+	}
+	ao2_ref(shared_check, -1);
+
+	if (shared_family->share_type == SHARED_DB_TYPE_UNIQUE) {
+		char eid_workspace[20];
+
+		/* Length is family + '/' + EID length (20) + 1 */
+		family_id = ast_alloca(strlen(shared_family->name) + 22);
+		ast_eid_to_str(eid_workspace, sizeof(eid_workspace), eid);
+		sprintf(family_id, "%s/%s", eid_workspace, shared_family->name); /* safe */
+	} else {
+		family_id = shared_family->name;
+	}
+
+	for (cur = shared_family->entries; cur; cur = cur->next) {
+		db_put_common(family_id, cur->key, cur->data, 0);
+	}
+}
+
+static void db_del_shared_msg_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
+{
+	struct ast_db_shared_family *shared_family;
+	struct ast_db_entry *cur;
+	const struct ast_eid *eid;
+
+	shared_family = stasis_message_data(message);
+	if (!shared_family) {
+		return;
+	}
+
+	eid = stasis_message_eid(message);
+	if (!eid || !ast_eid_cmp(eid, &ast_eid_default)) {
+		return;
+	}
+
+	cur = shared_family->entries;
+	if (!cur) {
+		db_deltree_common(shared_family->name, NULL, 0);
+		return;
+	}
+
+	for (; cur; cur = cur->next) {
+		db_del_common(shared_family->name, cur->key, 0);
+	}
+}
+
 /*!
  * \internal
  * \brief Clean up resources on Asterisk shutdown
@@ -995,6 +1441,11 @@
 	ast_manager_unregister("DBPut");
 	ast_manager_unregister("DBDel");
 	ast_manager_unregister("DBDelTree");
+
+	ao2_cleanup(db_cluster_topic);
+	db_cluster_topic = NULL;
+	STASIS_MESSAGE_TYPE_CLEANUP(ast_db_put_shared_type);
+	STASIS_MESSAGE_TYPE_CLEANUP(ast_db_del_shared_type);
 
 	/* Set doexit to 1 to kill thread. db_sync must be called with
 	 * mutex held. */
@@ -1005,6 +1456,10 @@
 
 	pthread_join(syncthread, NULL);
 	ast_mutex_lock(&dblock);
+
+	ao2_ref(shared_families, -1);
+	shared_families = NULL;
+
 	clean_statements();
 	if (sqlite3_close(astdb) == SQLITE_OK) {
 		astdb = NULL;
@@ -1014,12 +1469,43 @@
 
 int astdb_init(void)
 {
+	shared_families = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
+		AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT | AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
+		db_shared_family_sort_fn, NULL);
+	if (!shared_families) {
+		return -1;
+	}
+
+	db_cluster_topic = stasis_topic_create("ast_db_cluster_topic");
+	if (!db_cluster_topic) {
+		ao2_ref(shared_families, -1);
+		return -1;
+	}
+
+	STASIS_MESSAGE_TYPE_INIT(ast_db_put_shared_type);
+	STASIS_MESSAGE_TYPE_INIT(ast_db_del_shared_type);
+
+	message_router = stasis_message_router_create_pool(ast_db_cluster_topic());
+	if (!message_router) {
+		ao2_ref(db_cluster_topic, -1);
+		ao2_ref(shared_families, -1);
+		return -1;
+	}
+	stasis_message_router_add(message_router, ast_db_put_shared_type(),
+		db_put_shared_msg_cb, NULL);
+	stasis_message_router_add(message_router, ast_db_del_shared_type(),
+		db_del_shared_msg_cb, NULL);
+
 	if (db_init()) {
+		ao2_ref(db_cluster_topic, -1);
+		ao2_ref(shared_families, -1);
 		return -1;
 	}
 
 	ast_cond_init(&dbcond, NULL);
 	if (ast_pthread_create_background(&syncthread, NULL, db_sync_thread, NULL)) {
+		ao2_ref(db_cluster_topic, -1);
+		ao2_ref(shared_families, -1);
 		return -1;
 	}
 

Modified: team/mjordan/trunk-astdb-cluster/main/utils.c
URL: http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/main/utils.c?view=diff&rev=432635&r1=432634&r2=432635
==============================================================================
--- team/mjordan/trunk-astdb-cluster/main/utils.c (original)
+++ team/mjordan/trunk-astdb-cluster/main/utils.c Mon Mar  9 09:13:38 2015
@@ -2691,7 +2691,7 @@
 }
 #endif	/* defined(AST_DEVMODE) */
 
-char *ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
+char *ast_eid_to_str(char *s, int maxlen, const struct ast_eid *eid)
 {
 	int x;
 	char *os = s;

Modified: team/mjordan/trunk-astdb-cluster/res/res_pjsip_publish_asterisk.c
URL: http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/res/res_pjsip_publish_asterisk.c?view=diff&rev=432635&r1=432634&r2=432635
==============================================================================
--- team/mjordan/trunk-astdb-cluster/res/res_pjsip_publish_asterisk.c (original)
+++ team/mjordan/trunk-astdb-cluster/res/res_pjsip_publish_asterisk.c Mon Mar  9 09:13:38 2015
@@ -36,6 +36,7 @@
 #include "asterisk/module.h"
 #include "asterisk/logger.h"
 #include "asterisk/app.h"
+#include "asterisk/astdb.h"
 
 /*** DOCUMENTATION
 	<configInfo name="res_pjsip_publish_asterisk" language="en_US">
@@ -58,6 +59,9 @@
 				<configOption name="mailboxstate_publish">
 					<synopsis>Optional name of a publish item that can be used to publish a request for full mailbox state information.</synopsis>
 				</configOption>
+				<configOption name="dbstate_publish">
+					<synopsis>Optional name of a publish item that can be used to publish a request for full AstDB state information.</synopsis>
+				</configOption>
 				<configOption name="device_state" default="no">
 					<synopsis>Whether we should permit incoming device state events.</synopsis>
 				</configOption>
@@ -70,6 +74,12 @@
 				<configOption name="mailbox_state_filter">
 					<synopsis>Optional regular expression used to filter what mailboxes we accept events for.</synopsis>
 				</configOption>
+				<configOption name="db_state" default="no">
+					<synopsis>Whether we should permit incoming AstDB state events.</synopsis>
+				</configOption>
+				<configOption name="db_state_filter">
+					<synopsis>Optional regular expression used to filter what AstDB families we accept events for.</synopsis>
+				</configOption>
 				<configOption name="type">
 					<synopsis>Must be of type 'asterisk-publication'.</synopsis>
 				</configOption>
@@ -102,6 +112,18 @@
 	unsigned int mailbox_state_filter;
 };
 
+/*! \brief Structure which contains Asterisk AstDB publisher state information */
+struct asterisk_db_publisher_state {
+	/*! \brief The publish client to send PUBLISH messages on */
+	struct ast_sip_outbound_publish_client *client;
+	/*! \brief AstDB subscription */
+	struct stasis_subscription *db_state_subscription;
+	/*! \brief Regex used for filtering outbound db families */
+	regex_t db_state_regex;
+	/*! \brief AstDB families should be filtered */
+	unsigned int db_state_filter;
+};
+
 /*! \brief Structure which contains Asterisk publication information */
 struct asterisk_publication_config {
 	/*! \brief Sorcery object details */
@@ -112,6 +134,8 @@
 		AST_STRING_FIELD(devicestate_publish);
 		/*! \brief Optional name of a mailbox state publish item, used to request the remote side update us */
 		AST_STRING_FIELD(mailboxstate_publish);
+		/*! \brief Optional name of an AstDB publish item, used to request the remote side update us */
+		AST_STRING_FIELD(dbstate_publish);
 	);
 	/*! \brief Accept inbound device state events */
 	unsigned int device_state;
@@ -125,6 +149,12 @@
 	regex_t mailbox_state_regex;
 	/*! \brief Mailbox state should be filtered */
 	unsigned int mailbox_state_filter;
+	/*! \brief Accept inbound AstDB state events */
+	unsigned int db_state;
+	/*! \brief Regex used for filtering inbound AstDB state */
+	regex_t db_state_regex;
+	/*! \brief AstDB state should be filtered */
+	unsigned int db_state_filter;
 };
 
 /*! \brief Destroy callback for Asterisk devicestate publisher state information from datastore */
@@ -281,6 +311,78 @@
 	ast_json_unref(json);
 }
 
+/*!
+ * \brief Callback function for db state events
+ * \param ast_event
+ * \param data void pointer to ast_client structure
+ * \return void
+ */
+static void asterisk_publisher_dbstate_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
+{
+	struct ast_datastore *datastore = data;
+	struct asterisk_db_publisher_state *publisher_state = datastore->data;
+	struct ast_json *json_db;
+	struct ast_json *json;
+	const struct ast_eid *eid;
+	char eid_str[20];
+	struct ast_db_shared_family *shared_family;
+	char *text;
+	struct ast_sip_body body = {
+		.type = "application",
+		.subtype = "json",
+	};
+
+	if (!stasis_subscription_is_subscribed(sub)) {
+		return;
+	}
+
+	eid = stasis_message_eid(msg);
+	if (!eid || ast_eid_cmp(&ast_eid_default, eid)) {
+		/* If the event is aggregate, unknown, or didn't originate from this
+		 * server, don't send it out. */
+		return;		
+	}
+
+	shared_family = stasis_message_data(msg);
+	if (!shared_family) {
+		return;
+	}
+
+	if (publisher_state->db_state_filter && regexec(&publisher_state->db_state_regex, shared_family->name, 0, NULL, 0)) {
+		/* Outgoing AstDB state is filtered and the family wasn't allowed */
+		return;
+	}
+
+	json_db = stasis_message_to_json(msg, NULL);
+	if (!json_db) {
+		return;
+	}
+
+
+	ast_eid_to_str(eid_str, sizeof(eid_str), &ast_eid_default);
+	json = ast_json_pack(
+		"{ s: s, s: s, s: o }",
+		"type", "dbstate",
+		"eid", eid_str,
+		"dbstate", json_db);
+	if (!json) {
+		ast_json_unref(json_db);
+		return;
+	}
+
+	text = ast_json_dump_string(json);
+	if (!text) {
+		ast_json_unref(json);
+		return;
+	}
+	body.body_text = text;
+
+	ast_sip_publish_client_send(publisher_state->client, &body);
+
+	ast_json_free(text);
+	ast_json_unref(json);
+}
+
 static int cached_devstate_cb(void *obj, void *arg, int flags)
 {
 	struct stasis_message *msg = obj;
@@ -469,6 +571,76 @@
 	.stop_publishing = asterisk_stop_mwi_publishing,
 };
 
+static int asterisk_start_db_publishing(struct ast_sip_outbound_publish *configuration,
+	struct ast_sip_outbound_publish_client *client)
+{
+	RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
+	struct asterisk_db_publisher_state *publisher_state;
+	const char *value;
+
+	datastore = ast_sip_publish_client_alloc_datastore(&asterisk_mwi_publisher_state_datastore, "asterisk-db-publisher");
+	if (!datastore) {
+		return -1;
+	}
+
+	publisher_state = ast_calloc(1, sizeof(*publisher_state));
+	if (!publisher_state) {
+		return -1;
+	}
+	datastore->data = publisher_state;
+
+	value = ast_sorcery_object_get_extended(configuration, "db_state_filter");
+	if (!ast_strlen_zero(value)) {
+		if (build_regex(&publisher_state->db_state_regex, value)) {
+			return -1;
+		}
+		publisher_state->db_state_filter = 1;
+	}
+
+	publisher_state->client = ao2_bump(client);
+
+	if (ast_sip_publish_client_add_datastore(client, datastore)) {
+		return -1;
+	}
+
+	publisher_state->db_state_subscription = stasis_subscribe(ast_db_cluster_topic(),
+		asterisk_publisher_dbstate_cb, ao2_bump(datastore));
+	if (!publisher_state->db_state_subscription) {
+		ast_sip_publish_client_remove_datastore(client, "asterisk-db-publisher");
+		ao2_ref(datastore, -1);
+		return -1;
+	}
+
+	return 0;
+}
+
+static int asterisk_stop_db_publishing(struct ast_sip_outbound_publish_client *client)
+{
+	RAII_VAR(struct ast_datastore *, datastore, ast_sip_publish_client_get_datastore(client, "asterisk-db-publisher"),
+		ao2_cleanup);
+	struct asterisk_db_publisher_state *publisher_state;
+
+	if (!datastore) {
+		return 0;
+	}
+
+	publisher_state = datastore->data;
+	if (publisher_state->db_state_subscription) {
+		stasis_unsubscribe_and_join(publisher_state->db_state_subscription);
+		ao2_ref(datastore, -1);
+	}
+
+	ast_sip_publish_client_remove_datastore(client, "asterisk-db-publisher");
+
+	return 0;
+}
+
+struct ast_sip_event_publisher_handler asterisk_db_publisher_handler = {
+	.event_name = "asterisk-db",
+	.start_publishing = asterisk_start_db_publishing,
+	.stop_publishing = asterisk_stop_db_publishing,
+};
+
 static int asterisk_publication_new(struct ast_sip_endpoint *endpoint, const char *resource, const char *event_configuration)
 {
 	RAII_VAR(struct asterisk_publication_config *, config, ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "asterisk-publication",
@@ -545,6 +717,114 @@
 	mailbox = strsep(&item_id, "@");
 
 	ast_publish_mwi_state_full(mailbox, item_id, new_msgs, old_msgs, NULL, pubsub_eid);
+
+	return 0;
+}
+
+static int asterisk_publication_dbstate(struct ast_sip_publication *pub, struct asterisk_publication_config *config,
+	struct ast_eid *pubsub_eid, struct ast_json *json)
+{
+	struct ast_json *json_db = ast_json_object_get(json, "dbstate");
+	struct ast_json *json_entries;
+	struct stasis_message_type *type;
+	struct ast_db_shared_family *shared_family;
+	struct ast_db_entry *entry = NULL;
+	struct ast_db_entry *cur = NULL;
+	enum ast_db_shared_type share_type;
+	const char *family;
+	const char *verb;
+	const char *str_share_type;
+	int i;
+
+	if (!json_db) {
+		ast_debug(2, "Received AstDB state event with no 'dbstate' body\n");
+		return 0;
+	}
+
+	if (!config->db_state) {
+		ast_debug(2, "Received AstDB state event for resource '%s' but it is not configured to accept them\n",
+			ast_sorcery_object_get_id(config));
+		return 0;
+	}
+
+	family = ast_json_string_get(ast_json_object_get(json_db, "family"));
+	if (ast_strlen_zero(family)) {
+		ast_debug(1, "Received incomplete AstDB state event for resource '%s': missing 'family'\n",
+			ast_sorcery_object_get_id(config));
+		return -1;
+	}
+
+	verb = ast_json_string_get(ast_json_object_get(json_db, "verb"));
+	if (ast_strlen_zero(verb)) {
+		ast_debug(1, "Received incomplete AstDB state event for resource '%s': missing 'verb'\n",
+			ast_sorcery_object_get_id(config));
+		return -1;
+	} else if (!strcasecmp(verb, "put")) {
+		type = ast_db_put_shared_type();
+	} else if (!strcasecmp(verb, "delete")) {
+		type = ast_db_del_shared_type();
+	} else {
+		ast_debug(1, "Received bad AstDB state event for resource '%s': unknown verb '%s'\n",
+			ast_sorcery_object_get_id(config), verb);
+		return -1;
+	}
+
+	str_share_type = ast_json_string_get(ast_json_object_get(json_db, "share_type"));
+	if (ast_strlen_zero(str_share_type)) {
+		ast_debug(1, "Received incomplete AstDB state event for resource '%s': missing 'share_type'\n",
+			ast_sorcery_object_get_id(config));
+		return -1;
+	} else if (!strcasecmp(str_share_type, "global")) {
+		share_type = SHARED_DB_TYPE_GLOBAL;
+	} else if (!strcasecmp(str_share_type, "unique")) {
+		share_type = SHARED_DB_TYPE_UNIQUE;
+	} else {
+		ast_debug(1, "Received bad AstDB state event for resource '%s': unknown verb '%s'\n",
+			ast_sorcery_object_get_id(config), str_share_type);
+		return -1;
+	}
+
+	json_entries = ast_json_object_get(json_db, "entries");
+	for (i = 0; i < ast_json_array_size(json_entries); i++) {
+		struct ast_db_entry *temp;
+		struct ast_json *json_entry;
+		const char *key;
+		const char *data;
+
+		json_entry = ast_json_array_get(json_entries, i);
+		if (!json_entry) {
+			continue;
+		}
+		key = ast_json_string_get(ast_json_object_get(json_entry, "key"));
+		data = ast_json_string_get(ast_json_object_get(json_entry, "data"));
+
+		if (ast_strlen_zero(key) || !data) {
+			continue;
+		}
+
+		temp = ast_db_entry_create(key, data);
+		if (!temp) {
+			ast_db_freetree(entry);
+			return -1;
+		}
+
+		if (cur) {
+			cur->next = temp;
+			cur = temp;
+		} else {
+			entry = cur = temp;
+		}
+	}
+
+	shared_family = ast_db_shared_family_alloc(family, share_type);
+	if (!shared_family) {
+		ast_db_freetree(entry);
+		return -1;
+	}
+	shared_family->entries = entry;
+
+	ast_db_publish_shared_message(type, shared_family, pubsub_eid);
+	ao2_ref(shared_family, -1);
 
 	return 0;
 }
@@ -733,6 +1013,75 @@
 	return res;
 }
 
+static int asterisk_publication_db_refresh(struct ast_sip_publication *pub,
+	struct asterisk_publication_config *config, struct ast_eid *pubsub_eid, struct ast_json *json)
+{
+	if (ast_strlen_zero(config->dbstate_publish)) {
+		return 0;
+	}
+
+	ast_db_refresh_shared();
+
+	return 0;
+}
+
+static int asterisk_publication_db_state_change(struct ast_sip_publication *pub, pjsip_msg_body *body,
+			enum ast_sip_publish_state state)
+{
+	RAII_VAR(struct asterisk_publication_config *, config, ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "asterisk-publication",
+		ast_sip_publication_get_event_configuration(pub)), ao2_cleanup);
+	RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+	const char *eid, *type;
+	struct ast_eid pubsub_eid;
+	int res = -1;
+
+	/* If no configuration exists for this publication it has most likely been removed, so drop this immediately */
+	if (!config) {
+		return -1;
+	}
+
+	/* If no body exists this is a refresh and can be ignored */
+	if (!body) {
+		return 0;
+	}
+

[... 780 lines stripped ...]



More information about the asterisk-commits mailing list