[asterisk-commits] mjordan: branch mjordan/trunk-astdb-cluster r432635 - in /team/mjordan/trunk-...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Mon Mar 9 09:13:40 CDT 2015
Author: mjordan
Date: Mon Mar 9 09:13:38 2015
New Revision: 432635
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=432635
Log:
Initial commit
It mostly lives, breathes, and works. At least with PJSIP.
Modified:
team/mjordan/trunk-astdb-cluster/funcs/func_db.c
team/mjordan/trunk-astdb-cluster/include/asterisk/astdb.h
team/mjordan/trunk-astdb-cluster/include/asterisk/utils.h
team/mjordan/trunk-astdb-cluster/main/db.c
team/mjordan/trunk-astdb-cluster/main/utils.c
team/mjordan/trunk-astdb-cluster/res/res_pjsip_publish_asterisk.c
team/mjordan/trunk-astdb-cluster/tests/test_db.c
Modified: team/mjordan/trunk-astdb-cluster/funcs/func_db.c
URL: http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/funcs/func_db.c?view=diff&rev=432635&r1=432634&r2=432635
==============================================================================
--- team/mjordan/trunk-astdb-cluster/funcs/func_db.c (original)
+++ team/mjordan/trunk-astdb-cluster/funcs/func_db.c Mon Mar 9 09:13:38 2015
@@ -1,7 +1,7 @@
/*
* Asterisk -- An open source telephony toolkit.
*
- * Copyright (C) 2005-2006, Russell Bryant <russelb at clemson.edu>
+ * Copyright (C) 2005-2015, Russell Bryant <russelb at clemson.edu>
*
* func_db.c adapted from the old app_db.c, copyright by the following people
* Copyright (C) 2005, Mark Spencer <markster at digium.com>
@@ -23,6 +23,7 @@
* \brief Functions for interaction with the Asterisk database
*
* \author Russell Bryant <russelb at clemson.edu>
+ * \author Matt Jordan <mjordan at digium.com>
*
* \ingroup functions
*/
@@ -52,6 +53,30 @@
<syntax argsep="/">
<parameter name="family" required="true" />
<parameter name="key" required="true" />
+ </syntax>
+ <description>
+ <para>This function will read from or write a value to the Asterisk database. On a
+ read, this function returns the corresponding value from the database, or blank
+ if it does not exist. Reading a database value will also set the variable
+ DB_RESULT. If you wish to find out if an entry exists, use the DB_EXISTS
+ function.</para>
+ </description>
+ <see-also>
+ <ref type="application">DBdel</ref>
+ <ref type="function">DB_DELETE</ref>
+ <ref type="application">DBdeltree</ref>
+ <ref type="function">DB_EXISTS</ref>
+ </see-also>
+ </function>
+ <function name="DB_SHARED" language="en_US">
+ <synopsis>
+ Create or delete a shared family in the Asterisk database.
+ </synopsis>
+ <syntax argsep="/">
+ <parameter name="action" required="true">
+ </parameter>
+ <parameter name="type">
+ </parameter>
</syntax>
<description>
<para>This function will read from or write a value to the Asterisk database. On a
@@ -200,14 +225,14 @@
buf[0] = '\0';
if (ast_strlen_zero(parse)) {
- ast_log(LOG_WARNING, "DB_EXISTS requires an argument, DB(<family>/<key>)\n");
+ ast_log(LOG_WARNING, "DB_EXISTS requires an argument, DB_EXISTS(<family>/<key>)\n");
return -1;
}
AST_NONSTANDARD_APP_ARGS(args, parse, '/');
if (args.argc < 2) {
- ast_log(LOG_WARNING, "DB_EXISTS requires an argument, DB(<family>/<key>)\n");
+ ast_log(LOG_WARNING, "DB_EXISTS requires an argument, DB_EXISTS(<family>/<key>)\n");
return -1;
}
@@ -335,6 +360,106 @@
.write = function_db_delete_write,
};
+static int function_db_shared_exists_read(struct ast_channel *chan,
+ const char *cmd, char *parse, char *buf, size_t len)
+{
+ AST_DECLARE_APP_ARGS(args,
+ AST_APP_ARG(family);
+ );
+
+ buf[0] = '\0';
+
+ if (ast_strlen_zero(parse)) {
+ ast_log(LOG_WARNING, "DB_SHARED_EXISTS requires an argument, DB_SHARED_EXISTS(<family>)\n");
+ return -1;
+ }
+
+ AST_STANDARD_APP_ARGS(args, parse);
+
+ if (args.argc != 1) {
+ ast_log(LOG_WARNING, "DB_SHARED_EXISTS requires an argument, DB_SHARED_EXISTS(<family>)\n");
+ return -1;
+ }
+
+ if (ast_db_is_shared(args.family)) {
+ ast_copy_string(buf, "1", len);
+ } else {
+ ast_copy_string(buf, "0", len);
+ }
+ pbx_builtin_setvar_helper(chan, "DB_RESULT", buf);
+
+ return 0;
+}
+
+static struct ast_custom_function db_shared_exists_function = {
+ .name = "DB_SHARED_EXISTS",
+ .read = function_db_shared_exists_read,
+};
+
+static int function_db_shared_write(struct ast_channel *chan, const char *cmd, char *parse,
+ const char *value)
+{
+ enum ast_db_shared_type share_type;
+
+ AST_DECLARE_APP_ARGS(args,
+ AST_APP_ARG(action);
+ AST_APP_ARG(type);
+ );
+
+ if (ast_strlen_zero(parse)) {
+ ast_log(LOG_WARNING, "DB_SHARED requires an argument, DB_SHARED(<action>[,<type>])=<family>\n");
+ return -1;
+ }
+
+ AST_STANDARD_APP_ARGS(args, parse);
+
+ if (args.argc < 1) {
+ ast_log(LOG_WARNING, "DB_SHARED requires an argument, DB_SHARED(<action>[,<type>])=<family>\n");
+ return -1;
+ }
+
+ if (ast_strlen_zero(value)) {
+ ast_log(LOG_WARNING, "DB_SHARED requires a value, DB_SHARED(<action>[,<type>])=<family>\n");
+ return -1;
+ }
+
+ if (!strcasecmp(args.action, "put")) {
+ if (ast_strlen_zero(args.type) || !strcasecmp(args.type, "global")) {
+ share_type = SHARED_DB_TYPE_GLOBAL;
+ } else if (!strcasecmp(args.type, "unique")) {
+ share_type = SHARED_DB_TYPE_UNIQUE;
+ } else {
+ ast_log(LOG_WARNING, "DB_SHARED: Unknown 'type' %s\n", args.type);
+ return -1;
+ }
+
+ if (ast_db_put_shared(value, share_type)) {
+ /* Generally, failure is benign (key exists) */
+ ast_debug(2, "Failed to create shared family '%s'\n", value);
+ } else {
+ ast_verb(4, "Created %s shared family '%s'",
+ share_type == SHARED_DB_TYPE_GLOBAL ? "GLOBAL" : "UNIQUE",
+ value);
+ }
+ } else if (!strcasecmp(args.action, "delete")) {
+ if (ast_db_del_shared(value)) {
+ /* Generally, failure is benign (key doesn't exist) */
+ ast_debug(2, "Failed to delete shared family '%s'\n", value);
+ } else {
+ ast_verb(4, "Deleted shared family '%s'\n", value);
+ }
+ } else {
+ ast_log(LOG_WARNING, "DB_SHARED: Unknown 'action' %s\n", args.action);
+ }
+
+ return 0;
+}
+
+static struct ast_custom_function db_shared_function = {
+ .name = "DB_SHARED",
+ .write = function_db_shared_write,
+};
+
static int unload_module(void)
{
int res = 0;
@@ -343,6 +468,8 @@
res |= ast_custom_function_unregister(&db_exists_function);
res |= ast_custom_function_unregister(&db_delete_function);
res |= ast_custom_function_unregister(&db_keys_function);
+ res |= ast_custom_function_unregister(&db_shared_function);
+ res |= ast_custom_function_unregister(&db_shared_exists_function);
return res;
}
@@ -355,6 +482,8 @@
res |= ast_custom_function_register(&db_exists_function);
res |= ast_custom_function_register_escalating(&db_delete_function, AST_CFE_READ);
res |= ast_custom_function_register(&db_keys_function);
+ res |= ast_custom_function_register_escalating(&db_shared_function, AST_CFE_WRITE);
+ res |= ast_custom_function_register(&db_shared_exists_function);
return res;
}
Modified: team/mjordan/trunk-astdb-cluster/include/asterisk/astdb.h
URL: http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/include/asterisk/astdb.h?view=diff&rev=432635&r1=432634&r2=432635
==============================================================================
--- team/mjordan/trunk-astdb-cluster/include/asterisk/astdb.h (original)
+++ team/mjordan/trunk-astdb-cluster/include/asterisk/astdb.h Mon Mar 9 09:13:38 2015
@@ -28,11 +28,73 @@
extern "C" {
#endif
+#include "asterisk/utils.h"
+
+enum ast_db_shared_type {
+ /* Items in the shared family are common across all Asterisk instances */
+ SHARED_DB_TYPE_GLOBAL = 0,
+ /*! Items in the shared family are made unique across all Asterisk instances */
+ SHARED_DB_TYPE_UNIQUE,
+};
+
struct ast_db_entry {
struct ast_db_entry *next;
char *key;
char data[0];
};
+
+struct stasis_topic;
+struct stasis_message_type;
+
+struct ast_db_shared_family {
+ /*! How the family is shared */
+ enum ast_db_shared_type share_type;
+ /*! Entries in the family, if appropriate */
+ struct ast_db_entry *entries;
+ /*! The name of the shared family */
+ char name[0];
+};
+
+struct ast_db_entry *ast_db_entry_create(const char *key, const char *value);
+
+struct ast_db_shared_family *ast_db_shared_family_alloc(const char *family, enum ast_db_shared_type share_type);
+
+int ast_db_publish_shared_message(struct stasis_message_type *type, struct ast_db_shared_family *shared_family, struct ast_eid *eid);
+
+void ast_db_refresh_shared(void);
+
+/*! \addtogroup StasisTopicsAndMessages
+ * @{
+ */
+
+struct stasis_topic *ast_db_cluster_topic(void);
+
+/*!
+ * \since 14
+ * \brief Message type for an RTCP message sent from this Asterisk instance
+ *
+ * \retval A stasis message type
+ */
+struct stasis_message_type *ast_db_put_shared_type(void);
+
+/*!
+ * \since 14
+ * \brief Message type for an RTCP message received from some external source
+ *
+ * \retval A stasis message type
+ */
+struct stasis_message_type *ast_db_del_shared_type(void);
+
+/* }@ */
+
+/*!
+ * \brief @@@@
+ */
+int ast_db_put_shared(const char *family, enum ast_db_shared_type);
+
+int ast_db_del_shared(const char *family);
+
+int ast_db_is_shared(const char *family);
/*! \brief Get key value specified by family/key */
int ast_db_get(const char *family, const char *key, char *value, int valuelen);
Modified: team/mjordan/trunk-astdb-cluster/include/asterisk/utils.h
URL: http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/include/asterisk/utils.h?view=diff&rev=432635&r1=432634&r2=432635
==============================================================================
--- team/mjordan/trunk-astdb-cluster/include/asterisk/utils.h (original)
+++ team/mjordan/trunk-astdb-cluster/include/asterisk/utils.h Mon Mar 9 09:13:38 2015
@@ -922,7 +922,7 @@
* \brief Convert an EID to a string
* \since 1.6.1
*/
-char *ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid);
+char *ast_eid_to_str(char *s, int maxlen, const struct ast_eid *eid);
/*!
* \brief Convert a string into an EID
Modified: team/mjordan/trunk-astdb-cluster/main/db.c
URL: http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/main/db.c?view=diff&rev=432635&r1=432634&r2=432635
==============================================================================
--- team/mjordan/trunk-astdb-cluster/main/db.c (original)
+++ team/mjordan/trunk-astdb-cluster/main/db.c Mon Mar 9 09:13:38 2015
@@ -1,7 +1,7 @@
/*
* Asterisk -- An open source telephony toolkit.
*
- * Copyright (C) 1999 - 2005, Digium, Inc.
+ * Copyright (C) 1999 - 2015, Digium, Inc.
*
* Mark Spencer <markster at digium.com>
*
@@ -22,9 +22,6 @@
*
* \author Mark Spencer <markster at digium.com>
*
- * \note DB3 is licensed under Sleepycat Public License and is thus incompatible
- * with GPL. To avoid having to make another exception (and complicate
- * licensing even further) we elect to use DB1 which is BSD licensed
*/
/*** MODULEINFO
@@ -45,11 +42,12 @@
#include <dirent.h>
#include <sqlite3.h>
-#include "asterisk/channel.h"
#include "asterisk/file.h"
+#include "asterisk/utils.h"
+#include "asterisk/astdb.h"
+#include "asterisk/stasis.h"
+#include "asterisk/stasis_message_router.h"
#include "asterisk/app.h"
-#include "asterisk/dsp.h"
-#include "asterisk/astdb.h"
#include "asterisk/cli.h"
#include "asterisk/utils.h"
#include "asterisk/manager.h"
@@ -114,7 +112,16 @@
static int doexit;
static int dosync;
+/*! \brief A container of families to share across Asterisk instances */
+static struct ao2_container *shared_families;
+
+static struct stasis_topic *db_cluster_topic;
+
+static struct stasis_message_router *message_router;
+
static void db_sync(void);
+
+#define SHARED_FAMILY "__asterisk_shared_family"
#define DEFINE_SQL_STATEMENT(stmt,sql) static sqlite3_stmt *stmt; \
const char stmt##_sql[] = sql;
@@ -199,6 +206,76 @@
return res;
}
+
+struct ast_db_entry *ast_db_entry_create(const char *key, const char *value)
+{
+ struct ast_db_entry *entry;
+
+ entry = ast_malloc(sizeof(*entry) + strlen(key) + strlen(value) + 2);
+ if (!entry) {
+ return NULL;
+ }
+ entry->next = NULL;
+ entry->key = entry->data + strlen(value) + 1;
+ strcpy(entry->data, value); /* safe */
+ strcpy(entry->key, key); /* safe */
+
+ return entry;
+}
+
+static void shared_db_family_dtor(void *obj)
+{
+ struct ast_db_shared_family *family = obj;
+
+ ast_db_freetree(family->entries);
+}
+
+struct ast_db_shared_family *ast_db_shared_family_alloc(const char *family, enum ast_db_shared_type share_type)
+{
+ struct ast_db_shared_family *shared_family;
+
+ shared_family = ao2_alloc_options(sizeof(*shared_family) + strlen(family) + 1,
+ shared_db_family_dtor, OBJ_NOLOCK);
+ if (!shared_family) {
+ return NULL;
+ }
+ strcpy(shared_family->name, family); /* safe */
+ shared_family->share_type = share_type;
+
+ return shared_family;
+}
+
+static struct ast_db_shared_family *db_shared_family_clone(const struct ast_db_shared_family *shared_family)
+{
+ struct ast_db_shared_family *clone;
+
+ clone = ast_db_shared_family_alloc(shared_family->name, shared_family->share_type);
+
+ return clone;
+}
+
+static int db_shared_family_sort_fn(const void *obj_left, const void *obj_right, int flags)
+{
+ const struct ast_db_shared_family *left = obj_left;
+ const struct ast_db_shared_family *right = obj_right;
+ const char *right_key = obj_right;
+ int cmp;
+
+ switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
+ default:
+ case OBJ_POINTER:
+ right_key = right->name;
+ /* Fall through */
+ case OBJ_KEY:
+ cmp = strcmp(left->name, right_key);
+ break;
+ case OBJ_PARTIAL_KEY:
+ cmp = strncmp(left->name, right_key, strlen(right_key));
+ break;
+ }
+ return cmp;
+}
+
static int db_create_astdb(void)
{
@@ -308,7 +385,152 @@
return db_execute_sql("ROLLBACK", NULL, NULL);
}
-int ast_db_put(const char *family, const char *key, const char *value)
+static int db_put_common(const char *family, const char *key, const char *value, int share);
+
+int ast_db_put_shared(const char *family, enum ast_db_shared_type share_type)
+{
+ struct ast_db_shared_family *shared_family;
+
+ if (ast_strlen_zero(family)) {
+ return -1;
+ }
+
+ ao2_lock(shared_families);
+
+ shared_family = ao2_find(shared_families, family, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (shared_family) {
+ ao2_ref(shared_family, -1);
+ ao2_unlock(shared_families);
+ return -1;
+ }
+
+ shared_family = ast_db_shared_family_alloc(family, share_type);
+ if (!shared_family) {
+ ao2_unlock(shared_families);
+ return -1;
+ }
+
+ ao2_link_flags(shared_families, shared_family, OBJ_NOLOCK);
+
+ db_put_common(SHARED_FAMILY, shared_family->name,
+ share_type == SHARED_DB_TYPE_UNIQUE ? "UNIQUE" : "GLOBAL", 0);
+
+ ao2_ref(shared_family, -1);
+
+ ao2_unlock(shared_families);
+
+ return 0;
+}
+
+int ast_db_is_shared(const char *family)
+{
+ struct ast_db_shared_family *shared_family;
+ int res = 0;
+
+ shared_family = ao2_find(shared_families, family, OBJ_SEARCH_KEY);
+ if (shared_family) {
+ res = 1;
+ ao2_ref(shared_family, -1);
+ }
+
+ return res;
+}
+
+static int db_put_shared(const char *family, const char *key, const char *value)
+{
+ struct ast_db_shared_family *shared_family;
+ struct ast_db_shared_family *clone;
+
+ /* See if we are shared */
+ shared_family = ao2_find(shared_families, family, OBJ_SEARCH_PARTIAL_KEY);
+ if (!shared_family) {
+ return 0;
+ }
+
+ /* Create a Stasis message for the new item */
+ clone = db_shared_family_clone(shared_family);
+ if (!clone) {
+ ao2_ref(shared_family, -1);
+ return -1;
+ }
+ clone->entries = ast_db_entry_create(key, value);
+ if (!clone->entries) {
+ ao2_ref(shared_family, -1);
+ ao2_ref(clone, -1);
+ return -1;
+ }
+
+ /* Publish */
+ ast_db_publish_shared_message(ast_db_put_shared_type(), clone, NULL);
+
+ ao2_ref(shared_family, -1);
+
+ return 0;
+}
+
+static int db_del_shared(const char *family, const char *key)
+{
+ struct ast_db_shared_family *shared_family;
+ struct ast_db_shared_family *clone;
+
+ /* See if we are shared */
+ shared_family = ao2_find(shared_families, family, OBJ_SEARCH_PARTIAL_KEY);
+ if (!shared_family) {
+ return 0;
+ }
+
+ if (ast_strlen_zero(key)) {
+ clone = ao2_bump(shared_family);
+ } else {
+ clone = db_shared_family_clone(shared_family);
+ if (!clone) {
+ ao2_ref(shared_family, -1);
+ return -1;
+ }
+ clone->entries = ast_db_entry_create(key, "");
+ if (!clone->entries) {
+ ao2_ref(shared_family, -1);
+ ao2_ref(clone, -1);
+ return -1;
+ }
+ }
+
+ /* Publish */
+ ast_db_publish_shared_message(ast_db_del_shared_type(), clone, NULL);
+
+ ao2_ref(shared_family, -1);
+
+ return 0;
+}
+
+static int db_del_common(const char *family, const char *key, int share);
+
+int ast_db_del_shared(const char *family)
+{
+ struct ast_db_shared_family *shared_family;
+ int res = 0;
+
+ if (ast_strlen_zero(family)) {
+ return -1;
+ }
+
+ ao2_lock(shared_families);
+
+ shared_family = ao2_find(shared_families, family, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (shared_family) {
+ ao2_unlink_flags(shared_families, shared_family, OBJ_NOLOCK);
+ db_del_common(SHARED_FAMILY, shared_family->name, 0);
+ ao2_ref(shared_family, -1);
+ } else {
+ res = -1;
+ }
+
+ ao2_unlock(shared_families);
+
+ return res;
+}
+
+static int db_put_common(const char *family, const char *key, const char *value, int share)
{
char fullkey[MAX_DB_FIELD];
size_t fullkey_len;
@@ -335,9 +557,17 @@
sqlite3_reset(put_stmt);
db_sync();
+ if (share) {
+ db_put_shared(family, key, value);
+ }
ast_mutex_unlock(&dblock);
return res;
+}
+
+int ast_db_put(const char *family, const char *key, const char *value)
+{
+ return db_put_common(family, key, value, 1);
}
/*!
@@ -410,7 +640,7 @@
return db_get_common(family, key, out, -1);
}
-int ast_db_del(const char *family, const char *key)
+static int db_del_common(const char *family, const char *key, int share)
{
char fullkey[MAX_DB_FIELD];
size_t fullkey_len;
@@ -433,12 +663,20 @@
}
sqlite3_reset(del_stmt);
db_sync();
+ if (share) {
+ db_del_shared(family, key);
+ }
ast_mutex_unlock(&dblock);
- return res;
-}
-
-int ast_db_deltree(const char *family, const char *keytree)
+ return res;
+}
+
+int ast_db_del(const char *family, const char *key)
+{
+ return db_del_common(family, key, 1);
+}
+
+static int db_deltree_common(const char *family, const char *keytree, int share)
{
sqlite3_stmt *stmt = deltree_stmt;
char prefix[MAX_DB_FIELD];
@@ -468,9 +706,17 @@
res = sqlite3_changes(astdb);
sqlite3_reset(stmt);
db_sync();
+ if (share) {
+ db_del_shared(prefix, NULL);
+ }
ast_mutex_unlock(&dblock);
- return res;
+ return res;
+}
+
+int ast_db_deltree(const char *family, const char *keytree)
+{
+ return db_deltree_common(family, keytree, 1);
}
struct ast_db_entry *ast_db_gettree(const char *family, const char *keytree)
@@ -508,13 +754,10 @@
if (!(value_s = (const char *) sqlite3_column_text(stmt, 1))) {
break;
}
- if (!(cur = ast_malloc(sizeof(*cur) + strlen(key_s) + strlen(value_s) + 2))) {
+ cur = ast_db_entry_create(key_s, value_s);
+ if (!cur) {
break;
}
- cur->next = NULL;
- cur->key = cur->data + strlen(value_s) + 1;
- strcpy(cur->data, value_s);
- strcpy(cur->key, key_s);
if (last) {
last->next = cur;
} else {
@@ -748,14 +991,26 @@
while (sqlite3_step(showkey_stmt) == SQLITE_ROW) {
const char *key_s, *value_s;
+ char *family_s;
+ char *delim;
+
if (!(key_s = (const char *) sqlite3_column_text(showkey_stmt, 0))) {
break;
}
if (!(value_s = (const char *) sqlite3_column_text(showkey_stmt, 1))) {
break;
}
+ family_s = ast_strdup(key_s);
+ if (!family_s) {
+ break;
+ }
+ delim = strchr(family_s + 1, '/');
+ *delim = '\0';
+
++counter;
- ast_cli(a->fd, "%-50s: %-25s\n", key_s, value_s);
+ ast_cli(a->fd, "%-50s: %-25s %s\n", key_s, value_s,
+ ast_db_is_shared(family_s + 1) ? "(S)" : "");
+ ast_free(family_s);
}
sqlite3_reset(showkey_stmt);
ast_mutex_unlock(&dblock);
@@ -984,6 +1239,197 @@
return NULL;
}
+int ast_db_publish_shared_message(struct stasis_message_type *type, struct ast_db_shared_family *shared_family, struct ast_eid *eid)
+{
+ struct stasis_message *message;
+
+ /* Aggregate doesn't really apply to the AstDB; as such, if we aren't
+ * provided an EID use our own.
+ */
+ if (!eid) {
+ eid = &ast_eid_default;
+ }
+
+ message = stasis_message_create_full(type, shared_family, eid);
+ if (!message) {
+ return -1;
+ }
+
+ stasis_publish(ast_db_cluster_topic(), message);
+
+ return 0;
+}
+
+void ast_db_refresh_shared(void)
+{
+ struct ao2_iterator it_shared_families;
+ struct ast_db_shared_family *shared_family;
+
+ it_shared_families = ao2_iterator_init(shared_families, 0);
+ while ((shared_family = ao2_iterator_next(&it_shared_families))) {
+ struct ast_db_shared_family *clone;
+
+ clone = db_shared_family_clone(shared_family);
+ if (!clone) {
+ ao2_ref(shared_family, -1);
+ continue;
+ }
+
+ clone->entries = ast_db_gettree(shared_family->name, "");
+ if (!clone->entries) {
+ ao2_ref(clone, -1);
+ ao2_ref(shared_family, -1);
+ continue;
+ }
+
+ ast_db_publish_shared_message(ast_db_put_shared_type(), clone, NULL);
+
+ ao2_ref(clone, -1);
+ ao2_ref(shared_family, -1);
+ }
+ ao2_iterator_destroy(&it_shared_families);
+}
+
+static struct ast_event *db_del_shared_type_to_event(struct stasis_message *message)
+{
+ return NULL;
+}
+
+static struct ast_json *db_entries_to_json(struct ast_db_entry *entry)
+{
+ struct ast_json *json;
+ struct ast_db_entry *cur;
+
+ json = ast_json_array_create();
+ if (!json) {
+ return NULL;
+ }
+
+ for (cur = entry; cur; cur = cur->next) {
+ struct ast_json *json_entry;
+
+ json_entry = ast_json_pack("{s: s, s: s}",
+ "key", cur->key,
+ "data", cur->data);
+ if (!json_entry) {
+ ast_json_unref(json);
+ return NULL;
+ }
+
+ if (ast_json_array_append(json, json_entry)) {
+ ast_json_unref(json);
+ return NULL;
+ }
+ }
+
+ return json;
+}
+
+static struct ast_json *db_shared_family_to_json(struct stasis_message *message,
+ const struct stasis_message_sanitizer *sanitize)
+{
+ struct stasis_message_type *type = stasis_message_type(message);
+ struct ast_db_shared_family *shared_family;
+
+ shared_family = stasis_message_data(message);
+ if (!shared_family) {
+ return NULL;
+ }
+
+ return ast_json_pack("{s: s, s: s, s: s, s: o}",
+ "verb", type == ast_db_put_shared_type() ? "put" : "delete",
+ "family", shared_family->name,
+ "share_type", shared_family->share_type == SHARED_DB_TYPE_UNIQUE ? "unique" : "global",
+ "entries", shared_family->entries ? db_entries_to_json(shared_family->entries) : ast_json_null());
+}
+
+static struct ast_event *db_put_shared_type_to_event(struct stasis_message *message)
+{
+ return NULL;
+}
+
+struct stasis_topic *ast_db_cluster_topic(void)
+{
+ return db_cluster_topic;
+}
+
+STASIS_MESSAGE_TYPE_DEFN(ast_db_put_shared_type,
+ .to_event = db_put_shared_type_to_event,
+ .to_json = db_shared_family_to_json,
+ );
+STASIS_MESSAGE_TYPE_DEFN(ast_db_del_shared_type,
+ .to_event = db_del_shared_type_to_event,
+ .to_json = db_shared_family_to_json,
+ );
+
+static void db_put_shared_msg_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
+{
+ struct ast_db_shared_family *shared_family;
+ struct ast_db_shared_family *shared_check;
+ struct ast_db_entry *cur;
+ const struct ast_eid *eid;
+ char *family_id;
+
+ shared_family = stasis_message_data(message);
+ if (!shared_family) {
+ return;
+ }
+
+ eid = stasis_message_eid(message);
+ if (!eid || !ast_eid_cmp(eid, &ast_eid_default)) {
+ return;
+ }
+
+ /* Don't update if we don't have this area shared on this server */
+ shared_check = ao2_find(shared_families, shared_family->name, OBJ_KEY);
+ if (!shared_check) {
+ return;
+ }
+ ao2_ref(shared_check, -1);
+
+ if (shared_family->share_type == SHARED_DB_TYPE_UNIQUE) {
+ char eid_workspace[20];
+
+ /* Length is family + '/' + EID length (20) + 1 */
+ family_id = ast_alloca(strlen(shared_family->name) + 22);
+ ast_eid_to_str(eid_workspace, sizeof(eid_workspace), eid);
+ sprintf(family_id, "%s/%s", eid_workspace, shared_family->name); /* safe */
+ } else {
+ family_id = shared_family->name;
+ }
+
+ for (cur = shared_family->entries; cur; cur = cur->next) {
+ db_put_common(family_id, cur->key, cur->data, 0);
+ }
+}
+
+static void db_del_shared_msg_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
+{
+ struct ast_db_shared_family *shared_family;
+ struct ast_db_entry *cur;
+ const struct ast_eid *eid;
+
+ shared_family = stasis_message_data(message);
+ if (!shared_family) {
+ return;
+ }
+
+ eid = stasis_message_eid(message);
+ if (!eid || !ast_eid_cmp(eid, &ast_eid_default)) {
+ return;
+ }
+
+ cur = shared_family->entries;
+ if (!cur) {
+ db_deltree_common(shared_family->name, NULL, 0);
+ return;
+ }
+
+ for (; cur; cur = cur->next) {
+ db_del_common(shared_family->name, cur->key, 0);
+ }
+}
+
/*!
* \internal
* \brief Clean up resources on Asterisk shutdown
@@ -995,6 +1441,11 @@
ast_manager_unregister("DBPut");
ast_manager_unregister("DBDel");
ast_manager_unregister("DBDelTree");
+
+ ao2_cleanup(db_cluster_topic);
+ db_cluster_topic = NULL;
+ STASIS_MESSAGE_TYPE_CLEANUP(ast_db_put_shared_type);
+ STASIS_MESSAGE_TYPE_CLEANUP(ast_db_del_shared_type);
/* Set doexit to 1 to kill thread. db_sync must be called with
* mutex held. */
@@ -1005,6 +1456,10 @@
pthread_join(syncthread, NULL);
ast_mutex_lock(&dblock);
+
+ ao2_ref(shared_families, -1);
+ shared_families = NULL;
+
clean_statements();
if (sqlite3_close(astdb) == SQLITE_OK) {
astdb = NULL;
@@ -1014,12 +1469,43 @@
int astdb_init(void)
{
+ shared_families = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
+ AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT | AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
+ db_shared_family_sort_fn, NULL);
+ if (!shared_families) {
+ return -1;
+ }
+
+ db_cluster_topic = stasis_topic_create("ast_db_cluster_topic");
+ if (!db_cluster_topic) {
+ ao2_ref(shared_families, -1);
+ return -1;
+ }
+
+ STASIS_MESSAGE_TYPE_INIT(ast_db_put_shared_type);
+ STASIS_MESSAGE_TYPE_INIT(ast_db_del_shared_type);
+
+ message_router = stasis_message_router_create_pool(ast_db_cluster_topic());
+ if (!message_router) {
+ ao2_ref(db_cluster_topic, -1);
+ ao2_ref(shared_families, -1);
+ return -1;
+ }
+ stasis_message_router_add(message_router, ast_db_put_shared_type(),
+ db_put_shared_msg_cb, NULL);
+ stasis_message_router_add(message_router, ast_db_del_shared_type(),
+ db_del_shared_msg_cb, NULL);
+
if (db_init()) {
+ ao2_ref(db_cluster_topic, -1);
+ ao2_ref(shared_families, -1);
return -1;
}
ast_cond_init(&dbcond, NULL);
if (ast_pthread_create_background(&syncthread, NULL, db_sync_thread, NULL)) {
+ ao2_ref(db_cluster_topic, -1);
+ ao2_ref(shared_families, -1);
return -1;
}
Modified: team/mjordan/trunk-astdb-cluster/main/utils.c
URL: http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/main/utils.c?view=diff&rev=432635&r1=432634&r2=432635
==============================================================================
--- team/mjordan/trunk-astdb-cluster/main/utils.c (original)
+++ team/mjordan/trunk-astdb-cluster/main/utils.c Mon Mar 9 09:13:38 2015
@@ -2691,7 +2691,7 @@
}
#endif /* defined(AST_DEVMODE) */
-char *ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
+char *ast_eid_to_str(char *s, int maxlen, const struct ast_eid *eid)
{
int x;
char *os = s;
Modified: team/mjordan/trunk-astdb-cluster/res/res_pjsip_publish_asterisk.c
URL: http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/res/res_pjsip_publish_asterisk.c?view=diff&rev=432635&r1=432634&r2=432635
==============================================================================
--- team/mjordan/trunk-astdb-cluster/res/res_pjsip_publish_asterisk.c (original)
+++ team/mjordan/trunk-astdb-cluster/res/res_pjsip_publish_asterisk.c Mon Mar 9 09:13:38 2015
@@ -36,6 +36,7 @@
#include "asterisk/module.h"
#include "asterisk/logger.h"
#include "asterisk/app.h"
+#include "asterisk/astdb.h"
/*** DOCUMENTATION
<configInfo name="res_pjsip_publish_asterisk" language="en_US">
@@ -58,6 +59,9 @@
<configOption name="mailboxstate_publish">
<synopsis>Optional name of a publish item that can be used to publish a request for full mailbox state information.</synopsis>
</configOption>
+ <configOption name="dbstate_publish">
+ <synopsis>Optional name of a publish item that can be used to publish a request for full AstDB state information.</synopsis>
+ </configOption>
<configOption name="device_state" default="no">
<synopsis>Whether we should permit incoming device state events.</synopsis>
</configOption>
@@ -70,6 +74,12 @@
<configOption name="mailbox_state_filter">
<synopsis>Optional regular expression used to filter what mailboxes we accept events for.</synopsis>
</configOption>
+ <configOption name="db_state" default="no">
+ <synopsis>Whether we should permit incoming AstDB state events.</synopsis>
+ </configOption>
+ <configOption name="db_state_filter">
+ <synopsis>Optional regular expression used to filter what AstDB families we accept events for.</synopsis>
+ </configOption>
<configOption name="type">
<synopsis>Must be of type 'asterisk-publication'.</synopsis>
</configOption>
@@ -102,6 +112,18 @@
unsigned int mailbox_state_filter;
};
+/*! \brief Structure which contains Asterisk AstDB publisher state information */
+struct asterisk_db_publisher_state {
+ /*! \brief The publish client to send PUBLISH messages on */
+ struct ast_sip_outbound_publish_client *client;
+ /*! \brief AstDB subscription */
+ struct stasis_subscription *db_state_subscription;
+ /*! \brief Regex used for filtering outbound db families */
+ regex_t db_state_regex;
+ /*! \brief AstDB families should be filtered */
+ unsigned int db_state_filter;
+};
+
/*! \brief Structure which contains Asterisk publication information */
struct asterisk_publication_config {
/*! \brief Sorcery object details */
@@ -112,6 +134,8 @@
AST_STRING_FIELD(devicestate_publish);
/*! \brief Optional name of a mailbox state publish item, used to request the remote side update us */
AST_STRING_FIELD(mailboxstate_publish);
+ /*! \brief Optional name of an AstDB publish item, used to request the remote side update us */
+ AST_STRING_FIELD(dbstate_publish);
);
/*! \brief Accept inbound device state events */
unsigned int device_state;
@@ -125,6 +149,12 @@
regex_t mailbox_state_regex;
/*! \brief Mailbox state should be filtered */
unsigned int mailbox_state_filter;
+ /*! \brief Accept inbound AstDB state events */
+ unsigned int db_state;
+ /*! \brief Regex used for filtering inbound AstDB state */
+ regex_t db_state_regex;
+ /*! \brief AstDB state should be filtered */
+ unsigned int db_state_filter;
};
/*! \brief Destroy callback for Asterisk devicestate publisher state information from datastore */
@@ -281,6 +311,78 @@
ast_json_unref(json);
}
+/*!
+ * \brief Callback function for db state events
+ * \param ast_event
+ * \param data void pointer to ast_client structure
+ * \return void
+ */
+static void asterisk_publisher_dbstate_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
+{
+ struct ast_datastore *datastore = data;
+ struct asterisk_db_publisher_state *publisher_state = datastore->data;
+ struct ast_json *json_db;
+ struct ast_json *json;
+ const struct ast_eid *eid;
+ char eid_str[20];
+ struct ast_db_shared_family *shared_family;
+ char *text;
+ struct ast_sip_body body = {
+ .type = "application",
+ .subtype = "json",
+ };
+
+ if (!stasis_subscription_is_subscribed(sub)) {
+ return;
+ }
+
+ eid = stasis_message_eid(msg);
+ if (!eid || ast_eid_cmp(&ast_eid_default, eid)) {
+ /* If the event is aggregate, unknown, or didn't originate from this
+ * server, don't send it out. */
+ return;
+ }
+
+ shared_family = stasis_message_data(msg);
+ if (!shared_family) {
+ return;
+ }
+
+ if (publisher_state->db_state_filter && regexec(&publisher_state->db_state_regex, shared_family->name, 0, NULL, 0)) {
+ /* Outgoing AstDB state is filtered and the family wasn't allowed */
+ return;
+ }
+
+ json_db = stasis_message_to_json(msg, NULL);
+ if (!json_db) {
+ return;
+ }
+
+
+ ast_eid_to_str(eid_str, sizeof(eid_str), &ast_eid_default);
+ json = ast_json_pack(
+ "{ s: s, s: s, s: o }",
+ "type", "dbstate",
+ "eid", eid_str,
+ "dbstate", json_db);
+ if (!json) {
+ ast_json_unref(json_db);
+ return;
+ }
+
+ text = ast_json_dump_string(json);
+ if (!text) {
+ ast_json_unref(json);
+ return;
+ }
+ body.body_text = text;
+
+ ast_sip_publish_client_send(publisher_state->client, &body);
+
+ ast_json_free(text);
+ ast_json_unref(json);
+}
+
static int cached_devstate_cb(void *obj, void *arg, int flags)
{
struct stasis_message *msg = obj;
@@ -469,6 +571,76 @@
.stop_publishing = asterisk_stop_mwi_publishing,
};
+static int asterisk_start_db_publishing(struct ast_sip_outbound_publish *configuration,
+ struct ast_sip_outbound_publish_client *client)
+{
+ RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
+ struct asterisk_db_publisher_state *publisher_state;
+ const char *value;
+
+ datastore = ast_sip_publish_client_alloc_datastore(&asterisk_mwi_publisher_state_datastore, "asterisk-db-publisher");
+ if (!datastore) {
+ return -1;
+ }
+
+ publisher_state = ast_calloc(1, sizeof(*publisher_state));
+ if (!publisher_state) {
+ return -1;
+ }
+ datastore->data = publisher_state;
+
+ value = ast_sorcery_object_get_extended(configuration, "db_state_filter");
+ if (!ast_strlen_zero(value)) {
+ if (build_regex(&publisher_state->db_state_regex, value)) {
+ return -1;
+ }
+ publisher_state->db_state_filter = 1;
+ }
+
+ publisher_state->client = ao2_bump(client);
+
+ if (ast_sip_publish_client_add_datastore(client, datastore)) {
+ return -1;
+ }
+
+ publisher_state->db_state_subscription = stasis_subscribe(ast_db_cluster_topic(),
+ asterisk_publisher_dbstate_cb, ao2_bump(datastore));
+ if (!publisher_state->db_state_subscription) {
+ ast_sip_publish_client_remove_datastore(client, "asterisk-db-publisher");
+ ao2_ref(datastore, -1);
+ return -1;
+ }
+
+ return 0;
+}
+
+static int asterisk_stop_db_publishing(struct ast_sip_outbound_publish_client *client)
+{
+ RAII_VAR(struct ast_datastore *, datastore, ast_sip_publish_client_get_datastore(client, "asterisk-db-publisher"),
+ ao2_cleanup);
+ struct asterisk_db_publisher_state *publisher_state;
+
+ if (!datastore) {
+ return 0;
+ }
+
+ publisher_state = datastore->data;
+ if (publisher_state->db_state_subscription) {
+ stasis_unsubscribe_and_join(publisher_state->db_state_subscription);
+ ao2_ref(datastore, -1);
+ }
+
+ ast_sip_publish_client_remove_datastore(client, "asterisk-db-publisher");
+
+ return 0;
+}
+
+struct ast_sip_event_publisher_handler asterisk_db_publisher_handler = {
+ .event_name = "asterisk-db",
+ .start_publishing = asterisk_start_db_publishing,
+ .stop_publishing = asterisk_stop_db_publishing,
+};
+
static int asterisk_publication_new(struct ast_sip_endpoint *endpoint, const char *resource, const char *event_configuration)
{
RAII_VAR(struct asterisk_publication_config *, config, ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "asterisk-publication",
@@ -545,6 +717,114 @@
mailbox = strsep(&item_id, "@");
ast_publish_mwi_state_full(mailbox, item_id, new_msgs, old_msgs, NULL, pubsub_eid);
+
+ return 0;
+}
+
+static int asterisk_publication_dbstate(struct ast_sip_publication *pub, struct asterisk_publication_config *config,
+ struct ast_eid *pubsub_eid, struct ast_json *json)
+{
+ struct ast_json *json_db = ast_json_object_get(json, "dbstate");
+ struct ast_json *json_entries;
+ struct stasis_message_type *type;
+ struct ast_db_shared_family *shared_family;
+ struct ast_db_entry *entry = NULL;
+ struct ast_db_entry *cur = NULL;
+ enum ast_db_shared_type share_type;
+ const char *family;
+ const char *verb;
+ const char *str_share_type;
+ int i;
+
+ if (!json_db) {
+ ast_debug(2, "Received AstDB state event with no 'dbstate' body\n");
+ return 0;
+ }
+
+ if (!config->db_state) {
+ ast_debug(2, "Received AstDB state event for resource '%s' but it is not configured to accept them\n",
+ ast_sorcery_object_get_id(config));
+ return 0;
+ }
+
+ family = ast_json_string_get(ast_json_object_get(json_db, "family"));
+ if (ast_strlen_zero(family)) {
+ ast_debug(1, "Received incomplete AstDB state event for resource '%s': missing 'family'\n",
+ ast_sorcery_object_get_id(config));
+ return -1;
+ }
+
+ verb = ast_json_string_get(ast_json_object_get(json_db, "verb"));
+ if (ast_strlen_zero(verb)) {
+ ast_debug(1, "Received incomplete AstDB state event for resource '%s': missing 'verb'\n",
+ ast_sorcery_object_get_id(config));
+ return -1;
+ } else if (!strcasecmp(verb, "put")) {
+ type = ast_db_put_shared_type();
+ } else if (!strcasecmp(verb, "delete")) {
+ type = ast_db_del_shared_type();
+ } else {
+ ast_debug(1, "Received bad AstDB state event for resource '%s': unknown verb '%s'\n",
+ ast_sorcery_object_get_id(config), verb);
+ return -1;
+ }
+
+ str_share_type = ast_json_string_get(ast_json_object_get(json_db, "share_type"));
+ if (ast_strlen_zero(str_share_type)) {
+ ast_debug(1, "Received incomplete AstDB state event for resource '%s': missing 'share_type'\n",
+ ast_sorcery_object_get_id(config));
+ return -1;
+ } else if (!strcasecmp(str_share_type, "global")) {
+ share_type = SHARED_DB_TYPE_GLOBAL;
+ } else if (!strcasecmp(str_share_type, "unique")) {
+ share_type = SHARED_DB_TYPE_UNIQUE;
+ } else {
+ ast_debug(1, "Received bad AstDB state event for resource '%s': unknown verb '%s'\n",
+ ast_sorcery_object_get_id(config), str_share_type);
+ return -1;
+ }
+
+ json_entries = ast_json_object_get(json_db, "entries");
+ for (i = 0; i < ast_json_array_size(json_entries); i++) {
+ struct ast_db_entry *temp;
+ struct ast_json *json_entry;
+ const char *key;
+ const char *data;
+
+ json_entry = ast_json_array_get(json_entries, i);
+ if (!json_entry) {
+ continue;
+ }
+ key = ast_json_string_get(ast_json_object_get(json_entry, "key"));
+ data = ast_json_string_get(ast_json_object_get(json_entry, "data"));
+
+ if (ast_strlen_zero(key) || !data) {
+ continue;
+ }
+
+ temp = ast_db_entry_create(key, data);
+ if (!temp) {
+ ast_db_freetree(entry);
+ return -1;
+ }
+
+ if (cur) {
+ cur->next = temp;
+ cur = temp;
+ } else {
+ entry = cur = temp;
+ }
+ }
+
+ shared_family = ast_db_shared_family_alloc(family, share_type);
+ if (!shared_family) {
+ ast_db_freetree(entry);
+ return -1;
+ }
+ shared_family->entries = entry;
+
+ ast_db_publish_shared_message(type, shared_family, pubsub_eid);
+ ao2_ref(shared_family, -1);
return 0;
}
@@ -733,6 +1013,75 @@
return res;
}
+static int asterisk_publication_db_refresh(struct ast_sip_publication *pub,
+ struct asterisk_publication_config *config, struct ast_eid *pubsub_eid, struct ast_json *json)
+{
+ if (ast_strlen_zero(config->dbstate_publish)) {
+ return 0;
+ }
+
+ ast_db_refresh_shared();
+
+ return 0;
+}
+
+static int asterisk_publication_db_state_change(struct ast_sip_publication *pub, pjsip_msg_body *body,
+ enum ast_sip_publish_state state)
+{
+ RAII_VAR(struct asterisk_publication_config *, config, ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "asterisk-publication",
+ ast_sip_publication_get_event_configuration(pub)), ao2_cleanup);
+ RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+ const char *eid, *type;
+ struct ast_eid pubsub_eid;
+ int res = -1;
+
+ /* If no configuration exists for this publication it has most likely been removed, so drop this immediately */
+ if (!config) {
+ return -1;
+ }
+
+ /* If no body exists this is a refresh and can be ignored */
+ if (!body) {
+ return 0;
+ }
+
[... 780 lines stripped ...]
More information about the asterisk-commits
mailing list