Browse code

cnxcc: added Redis support for distributed cnxcc nodes

- works for all monitoring functions

Carlos Ruiz Diaz authored on 31/10/2014 23:20:09
Showing 16 changed files
... ...
@@ -1,6 +1,6 @@
1 1
 # $Id$
2 2
 #
3
-# example module makefile
3
+# cnxcc module makefile
4 4
 #
5 5
 # 
6 6
 # WARNING: do not run this directly, it should be run by the master Makefile
... ...
@@ -9,6 +9,7 @@ include ../../Makefile.defs
9 9
 auto_gen=
10 10
 NAME=cnxcc.so
11 11
 
12
+LIBS=-lhiredis -levent
12 13
 DEFS+=-DOPENSER_MOD_INTERFACE
13 14
 SERLIBPATH=../../lib
14 15
 SER_LIBS+=$(SERLIBPATH)/kmi/kmi
... ...
@@ -1,10 +1,12 @@
1 1
 cnxcc Module
2 2
 
3
-Carlos Ruiz Diaz
3
+Carlos Ruiz Díaz
4 4
 
5 5
    ConexionGroup S.A.
6 6
 
7
-   Copyright © 2013 Carlos Ruiz Diaz, carlos.ruizdiaz@gmail.com
7
+   Copyright © 2013 Carlos Ruiz Díaz, carlos.ruizdiaz@gmail.com
8
+
9
+   Copyright © 2014 Carlos Ruiz Díaz, carlos@latamvoices.com
8 10
      __________________________________________________________________
9 11
 
10 12
    Table of Contents
... ...
@@ -15,10 +17,11 @@ Carlos Ruiz Diaz
15 15
         2. Dependencies
16 16
 
17 17
               2.1. Modules
18
+              2.2. Libraries
18 19
 
19 20
         3. Parameters
20 21
 
21
-              3.1. dlg_flag (integer)
22
+              3.1. redis (integer)
22 23
               3.2. credit_check_period (integer)
23 24
 
24 25
         4. Functions
... ...
@@ -58,10 +61,11 @@ Chapter 1. Admin Guide
58 58
    2. Dependencies
59 59
 
60 60
         2.1. Modules
61
+        2.2. Libraries
61 62
 
62 63
    3. Parameters
63 64
 
64
-        3.1. dlg_flag (integer)
65
+        3.1. redis (integer)
65 66
         3.2. credit_check_period (integer)
66 67
 
67 68
    4. Functions
... ...
@@ -106,7 +110,7 @@ Chapter 1. Admin Guide
106 106
    that are equal to the cost per second of both calls.
107 107
 
108 108
    If your accounting program does not maintain the state of the call in
109
-   real time, this module can provide you that ability.
109
+   real time, this module can provide you with that ability.
110 110
 
111 111
    Cnxcc can also provide more common means of monitoring, i.e., by time
112 112
    limit or by maximum simultaneous calls.
... ...
@@ -114,25 +118,31 @@ Chapter 1. Admin Guide
114 114
 2. Dependencies
115 115
 
116 116
    2.1. Modules
117
+   2.2. Libraries
117 118
 
118 119
 2.1. Modules
119 120
 
120 121
    The following module must be loaded before this module:
121 122
      * dialog
122 123
 
124
+2.2. Libraries
125
+
126
+   The following module must be loaded before this module:
127
+     * hiredis-devel >= 0.11.0
128
+     * libevent-devel >= 2.0.18-2
129
+
123 130
 3. Parameters
124 131
 
125
-   3.1. dlg_flag (integer)
132
+   3.1. redis (integer)
126 133
    3.2. credit_check_period (integer)
127 134
 
128
-3.1.  dlg_flag (integer)
135
+3.1.  redis (integer)
129 136
 
130
-   Flag to indicate if the dialog must be monitored or not. Messages are
131
-   flagged with this value if we call one of the monitoring functions.
137
+   Redis datasource connection information
132 138
 
133 139
    Example 1.1. dlg_flag
134 140
 ...
135
-modparam("cnxcc", "dlg_flag", 29)
141
+modparam("cnxcc", "redis", "addr=127.0.0.1;port=6379;db=1")
136 142
 ...
137 143
 
138 144
 3.2. credit_check_period (integer)
... ...
@@ -18,7 +18,7 @@
18 18
  *
19 19
  * You should have received a copy of the GNU General Public License
20 20
  * along with this program; if not, write to the Free Software
21
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
21
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22 22
  *
23 23
  */
24 24
 #include <time.h>
... ...
@@ -28,17 +28,17 @@
28 28
 
29 29
 #include "cnxcc.h"
30 30
 
31
-void get_datetime(str *dest)
31
+inline void get_datetime(str *dest)
32 32
 {
33 33
 	timestamp2isodt(dest, get_current_timestamp());
34 34
 }
35 35
 
36
-unsigned int get_current_timestamp()
36
+inline unsigned int get_current_timestamp()
37 37
 {
38 38
 	return time(NULL);
39 39
 }
40 40
 
41
-int timestamp2isodt(str *dest, unsigned int timestamp)
41
+inline int timestamp2isodt(str *dest, unsigned int timestamp)
42 42
 {
43 43
 	time_t  		tim;
44 44
 	struct tm 		*tmPtr;
... ...
@@ -18,7 +18,7 @@
18 18
  *
19 19
  * You should have received a copy of the GNU General Public License
20 20
  * along with this program; if not, write to the Free Software
21
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
21
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22 22
  *
23 23
  */
24 24
 
... ...
@@ -31,9 +31,9 @@
31 31
 #define DATETIME_LENGTH		DATETIME_SIZE - 1
32 32
 
33 33
 
34
-void get_datetime(str *dest);
35
-unsigned int get_current_timestamp();
36
-int timestamp2isodt(str *dest, unsigned int timestamp);
34
+inline void get_datetime(str *dest);
35
+inline unsigned int get_current_timestamp();
36
+inline int timestamp2isodt(str *dest, unsigned int timestamp);
37 37
 double str2double(str *string);
38 38
 
39 39
 #endif /* _CNXCC_H */
... ...
@@ -18,7 +18,7 @@
18 18
  *
19 19
  * You should have received a copy of the GNU General Public License
20 20
  * along with this program; if not, write to the Free Software
21
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
21
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22 22
  *
23 23
  */
24 24
 
... ...
@@ -30,43 +30,41 @@
30 30
 #include "cnxcc_mod.h"
31 31
 #include "cnxcc.h"
32 32
 #include "cnxcc_check.h"
33
+#include "cnxcc_redis.h"
33 34
 
34 35
 extern data_t _data;
35 36
 
36
-void check_calls_by_money(unsigned int ticks, void *param)
37
-{
38
-	struct str_hash_entry *h_entry 	= NULL,
39
-			  *tmp		= NULL;
40
-	call_t *tmp_call		= NULL;
37
+void check_calls_by_money(unsigned int ticks, void *param) {
38
+	struct str_hash_entry *h_entry = NULL,
39
+	                      *tmp = NULL;
40
+	call_t *tmp_call = NULL;
41 41
 	int i;
42 42
 
43 43
 	lock_get(&_data.money.lock);
44
+
44 45
 	if (_data.money.credit_data_by_client->table)
45 46
 		for(i = 0; i < _data.money.credit_data_by_client->size; i++)
46
-			clist_foreach_safe(&_data.money.credit_data_by_client->table[i], h_entry, tmp, next)
47
-			{
48
-				credit_data_t *credit_data	= (credit_data_t *) h_entry->u.p;
49
-				call_t *call			= NULL;
50
-				double total_consumed_money	= 0;
47
+			clist_foreach_safe(&_data.money.credit_data_by_client->table[i], h_entry, tmp, next) {
48
+				credit_data_t *credit_data = (credit_data_t *) h_entry->u.p;
49
+				call_t *call = NULL;
50
+				double total_consumed_money = 0, consumption_diff = 0/*, distributed_consumption = 0*/;
51 51
 
52
-				if (i > SAFE_ITERATION_THRESHOLD)
53
-				{
52
+/*				if (i > SAFE_ITERATION_THRESHOLD) {
54 53
 					LM_ERR("Too many iterations for this loop: %d\n", i);
55 54
 					break;
56
-				}
55
+				}*/
56
+
57 57
 				lock_get(&credit_data->lock);
58 58
 
59
-				clist_foreach_safe(credit_data->call_list, call, tmp_call, next)
60
-				{
59
+				clist_foreach_safe(credit_data->call_list, call, tmp_call, next) {
61 60
 					int consumed_time = 0;
62 61
 
63 62
 					if (!call->confirmed)
64 63
 						continue;
65 64
 
66
-					consumed_time 		= get_current_timestamp() - call->start_timestamp;
65
+					consumed_time = get_current_timestamp() - call->start_timestamp;
67 66
 
68
-					if (consumed_time > call->money_based.initial_pulse)
69
-					{
67
+					if (consumed_time > call->money_based.initial_pulse) {
70 68
 						call->consumed_amount = (call->money_based.cost_per_second * call->money_based.initial_pulse)
71 69
 												+
72 70
 												call->money_based.cost_per_second *
... ...
@@ -74,10 +72,9 @@ void check_calls_by_money(unsigned int ticks, void *param)
74 74
 												call->money_based.final_pulse;
75 75
 					}
76 76
 
77
-					total_consumed_money	+= call->consumed_amount;
77
+					total_consumed_money += call->consumed_amount;
78 78
 
79
-					if (call->consumed_amount > call->max_amount)
80
-					{
79
+					if (call->consumed_amount > call->max_amount) {
81 80
 						LM_ALERT("[%.*s] call has exhausted its credit. Breaking the loop\n", call->sip_data.callid.len, call->sip_data.callid.s);
82 81
 						break;
83 82
 					}
... ...
@@ -89,26 +86,35 @@ void check_calls_by_money(unsigned int ticks, void *param)
89 89
 																			call->consumed_amount
90 90
 																			);
91 91
 				}
92
-				
93
-				if (credit_data->concurrent_calls == 0)
94
-				{
92
+
93
+				if (credit_data->concurrent_calls == 0) {
95 94
 					lock_release(&credit_data->lock);
96 95
 					continue;
97 96
 				}
98 97
 
99
-				credit_data->consumed_amount	= credit_data->ended_calls_consumed_amount + total_consumed_money;
98
+				if (_data.redis) {
99
+					LM_INFO("ec=%f, ca=%f, ca2=%f", credit_data->ended_calls_consumed_amount, total_consumed_money, credit_data->consumed_amount);
100 100
 
101
-				LM_DBG("Client [%.*s] | Ended-Calls-Credit-Spent: %f  TotalCredit/MaxCredit: %f/%f\n", credit_data->call_list->client_id.len, credit_data->call_list->client_id.s,
102
-																									credit_data->ended_calls_consumed_amount,
103
-																									credit_data->consumed_amount,
104
-																									credit_data->max_amount);
101
+					consumption_diff = credit_data->ended_calls_consumed_amount + total_consumed_money - credit_data->consumed_amount;
102
+					if (consumption_diff > 0)
103
+						redis_incr_by_double(credit_data, "consumed_amount", consumption_diff);
104
+				}
105 105
 
106
-				if (credit_data->consumed_amount >= credit_data->max_amount)
107
-				{
108
-					lock_release(&_data.money.lock);
106
+				credit_data->consumed_amount = credit_data->ended_calls_consumed_amount + total_consumed_money /* + distributed_consumption */;
107
+
108
+				LM_DBG("Client [%.*s] | Ended-Calls-Credit-Spent: %f  TotalCredit/MaxCredit: %f/%f\n",
109
+							credit_data->call_list->client_id.len, credit_data->call_list->client_id.s,
110
+							credit_data->ended_calls_consumed_amount,
111
+							credit_data->consumed_amount,
112
+							credit_data->max_amount);
113
+
114
+				if (credit_data->consumed_amount >= credit_data->max_amount) {
109 115
 					terminate_all_calls(credit_data);
116
+
117
+					// make sure the rest of the servers kill the calls belonging to this customer
118
+					redis_publish_to_kill_list(credit_data);
110 119
 					lock_release(&credit_data->lock);
111
-					return;
120
+					break;
112 121
 				}
113 122
 
114 123
 				lock_release(&credit_data->lock);
... ...
@@ -117,43 +123,40 @@ void check_calls_by_money(unsigned int ticks, void *param)
117 117
 	lock_release(&_data.money.lock);
118 118
 }
119 119
 
120
-void check_calls_by_time(unsigned int ticks, void *param)
121
-{
122
-	struct str_hash_entry *h_entry 	= NULL,
123
-			*tmp		= NULL;
124
-	call_t *tmp_call		= NULL;
120
+void check_calls_by_time(unsigned int ticks, void *param) {
121
+	struct str_hash_entry *h_entry = NULL;
122
+	struct str_hash_entry *tmp = NULL;
123
+	call_t *tmp_call = NULL;
125 124
 	int i;
126 125
 
127 126
 	lock_get(&_data.time.lock);
128 127
 
129 128
 	if (_data.time.credit_data_by_client->table)
130 129
 		for(i = 0; i < _data.time.credit_data_by_client->size; i++)
131
-			clist_foreach_safe(&_data.time.credit_data_by_client->table[i], h_entry, tmp, next)
132
-			{
133
-				credit_data_t *credit_data	= (credit_data_t *) h_entry->u.p;
134
-				call_t *call			= NULL;
135
-				int total_consumed_secs		= 0;
130
+			clist_foreach_safe(&_data.time.credit_data_by_client->table[i], h_entry, tmp, next) {
131
+				credit_data_t *credit_data = (credit_data_t *) h_entry->u.p;
132
+				call_t *call = NULL;
133
+				int total_consumed_secs = 0;
134
+				double consumption_diff = 0/*, distributed_consumption = 0*/;
136 135
 
137 136
 				lock_get(&credit_data->lock);
138 137
 
139
-				if (i > SAFE_ITERATION_THRESHOLD)
138
+				/*if (i > SAFE_ITERATION_THRESHOLD)
140 139
 				{
141
-					LM_ERR("Too many iterations for this loop: %d\n", i);
140
+					LM_ERR("Too many iterations for this loop: %d", i);
142 141
 					break;
143
-				}
142
+				} */
144 143
 
145 144
 				LM_DBG("Iterating through calls of client [%.*s]\n", credit_data->call_list->client_id.len, credit_data->call_list->client_id.s);
146 145
 
147
-				clist_foreach_safe(credit_data->call_list, call, tmp_call, next)
148
-				{
146
+				clist_foreach_safe(credit_data->call_list, call, tmp_call, next) {
149 147
 					if (!call->confirmed)
150 148
 						continue;
151 149
 
152
-					call->consumed_amount	= get_current_timestamp() - call->start_timestamp;
150
+					call->consumed_amount = get_current_timestamp() - call->start_timestamp;
153 151
 					total_consumed_secs	+= call->consumed_amount;
154 152
 
155
-					if (call->consumed_amount > call->max_amount)
156
-					{
153
+					if (call->consumed_amount > call->max_amount) {
157 154
 						LM_ALERT("[%.*s] call has exhausted its time. Breaking the loop\n", call->sip_data.callid.len, call->sip_data.callid.s);
158 155
 						break;
159 156
 					}
... ...
@@ -165,25 +168,31 @@ void check_calls_by_time(unsigned int ticks, void *param)
165 165
 																			);
166 166
 				}
167 167
 
168
-				if (credit_data->concurrent_calls == 0)
169
-				{
168
+				if (credit_data->concurrent_calls == 0) {
170 169
 					lock_release(&credit_data->lock);
171 170
 					continue;
172 171
 				}
173 172
 
174
-				credit_data->consumed_amount	= credit_data->ended_calls_consumed_amount + total_consumed_secs;
173
+				if (_data.redis) {
174
+					consumption_diff = credit_data->ended_calls_consumed_amount + total_consumed_secs - credit_data->consumed_amount;
175
+					if (consumption_diff > 0)
176
+						redis_incr_by_double(credit_data, "consumed_amount", consumption_diff);
177
+				}
178
+
179
+				credit_data->consumed_amount = credit_data->ended_calls_consumed_amount + total_consumed_secs /*+ distributed_consumption*/;
175 180
 
176 181
 				LM_DBG("Client [%.*s] | Ended-Calls-Time: %d  TotalTime/MaxTime: %d/%d\n", credit_data->call_list->client_id.len, credit_data->call_list->client_id.s,
177 182
 																									(int) credit_data->ended_calls_consumed_amount,
178 183
 																									(int) credit_data->consumed_amount,
179 184
 																									(int) credit_data->max_amount);
180 185
 
181
-				if (credit_data->consumed_amount >= credit_data->max_amount)
182
-				{				
183
-					lock_release(&_data.time.lock);
186
+				if (credit_data->consumed_amount >= credit_data->max_amount) {
184 187
 					terminate_all_calls(credit_data);
188
+
189
+					// make sure the rest of the servers kill the calls belonging to this customer
190
+					redis_publish_to_kill_list(credit_data);
185 191
 					lock_release(&credit_data->lock);
186
-					return;
192
+					break;
187 193
 				}
188 194
 
189 195
 				lock_release(&credit_data->lock);
... ...
@@ -18,7 +18,7 @@
18 18
  *
19 19
  * You should have received a copy of the GNU General Public License
20 20
  * along with this program; if not, write to the Free Software
21
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
21
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22 22
  *
23 23
  */
24 24
 
... ...
@@ -18,7 +18,7 @@
18 18
  *
19 19
  * You should have received a copy of the GNU General Public License
20 20
  * along with this program; if not, write to the Free Software
21
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
21
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22 22
  *
23 23
  */
24 24
 
... ...
@@ -65,154 +65,155 @@
65 65
 #include "cnxcc_check.h"
66 66
 #include "cnxcc_rpc.h"
67 67
 #include "cnxcc_select.h"
68
+#include "cnxcc_redis.h"
68 69
 
69 70
 MODULE_VERSION
70 71
 
71
-#define HT_SIZE			229
72
-#define MODULE_NAME		"cnxcc"
73
-#define NUMBER_OF_TIMERS	2
72
+#define HT_SIZE 229
73
+#define MODULE_NAME	"cnxcc"
74
+#define NUMBER_OF_TIMERS 2
74 75
 
75
-#define TRUE			1
76
-#define FALSE			0
76
+#define TRUE	1
77
+#define FALSE	!TRUE
77 78
 
78 79
 data_t _data;
79 80
 struct dlg_binds _dlgbinds;
80 81
 
81
-static int fixup_par(void** param, int param_no);
82
+static int __fixup_pvar(void** param, int param_no);
82 83
 
83 84
 /*
84 85
  *  module core functions
85 86
  */
86
-static int mod_init(void);
87
-static int child_init(int);
88
-static int init_hashtable(struct str_hash_table *ht);
87
+static int __mod_init(void);
88
+static int __child_init(int);
89
+static int __init_hashtable(struct str_hash_table *ht);
89 90
 
90 91
 /*
91 92
  * Memory management functions
92 93
  */
93
-static int shm_str_hash_alloc(struct str_hash_table *ht, int size);
94
-static void free_credit_data_hash_entry(struct str_hash_entry *e);
94
+static int __shm_str_hash_alloc(struct str_hash_table *ht, int size);
95
+static void __free_credit_data_hash_entry(struct str_hash_entry *e);
95 96
 
96 97
 /*
97 98
  * PV management functions
98 99
  */
99
-static int pv_parse_calls_param(pv_spec_p sp, str *in);
100
-static int pv_get_calls(struct sip_msg *msg, pv_param_t *param, pv_value_t *res);
101
-//static int get_str_pv(struct sip_msg* msg, str *pv_name, str *pvvalue);
100
+static int __pv_parse_calls_param(pv_spec_p sp, str *in);
101
+static int __pv_get_calls(struct sip_msg *msg, pv_param_t *param, pv_value_t *res);
102 102
 
103 103
 /*
104 104
  * Billing management functions
105 105
  */
106
-static int set_max_time(struct sip_msg* msg, char* number, char* str2);
107
-static int update_max_time(struct sip_msg* msg, char* number, char* str2);
108
-static int set_max_credit(struct sip_msg* msg, char *str_pv_client, char *str_pv_credit, char *str_pv_cps, char *str_pv_inip, char *str_pv_finp);
109
-static int set_max_channels(struct sip_msg* msg, char* str_pv_client, char* str_pv_max_chan);
110
-static int get_channel_count(struct sip_msg* msg, char* str_pv_client, char* str_pv_max_chan);
111
-static int terminate_all(struct sip_msg* msg, char* str_pv_client);
112
-
113
-static void start_billing(str *callid, str *from_uri, str *to_uri, str tags[2]);
114
-static void setup_billing(str *callid, unsigned int h_entry, unsigned int h_id);
115
-static void stop_billing(str *callid);
116
-static int add_call_by_cid(str *cid, call_t *call, credit_type_t type);
117
-static credit_data_t *get_or_create_credit_data_entry(str *client_id, credit_type_t type);
118
-static call_t *alloc_new_call_by_time(credit_data_t *credit_data, struct sip_msg *msg, int max_secs);
119
-static call_t *alloc_new_call_by_money(credit_data_t *credit_data, struct sip_msg *msg, double credit, double cost_per_second, int initial_pulse, int final_pulse);
120
-static void notify_call_termination(sip_data_t *data);
121
-static void free_call(call_t *call);
122
-static int has_to_tag(struct sip_msg *msg);
106
+static int __set_max_time(struct sip_msg* msg, char* number, char* str2);
107
+static int __update_max_time(struct sip_msg* msg, char* number, char* str2);
108
+static int __set_max_credit(struct sip_msg* msg, char *str_pv_client, char *str_pv_credit,
109
+                            char *str_pv_cps, char *str_pv_inip, char *str_pv_finp);
110
+static int __set_max_channels(struct sip_msg* msg, char* str_pv_client, char* str_pv_max_chan);
111
+static int __get_channel_count(struct sip_msg* msg, char* str_pv_client, char* str_pv_max_chan);
112
+static int __terminate_all(struct sip_msg* msg, char* str_pv_client);
113
+
114
+static void __start_billing(str *callid, str *from_uri, str *to_uri, str tags[2]);
115
+static void __setup_billing(str *callid, unsigned int h_entry, unsigned int h_id);
116
+static void __stop_billing(str *callid);
117
+static int __add_call_by_cid(str *cid, call_t *call, credit_type_t type);
118
+static call_t *__alloc_new_call_by_time(credit_data_t *credit_data, struct sip_msg *msg, int max_secs);
119
+static call_t *__alloc_new_call_by_money(credit_data_t *credit_data, struct sip_msg *msg, double credit,
120
+		                                 double cost_per_second, int initial_pulse, int final_pulse);
121
+static void __notify_call_termination(sip_data_t *data);
122
+static void __free_call(call_t *call);
123
+static int __has_to_tag(struct sip_msg *msg);
124
+static credit_data_t *__alloc_new_credit_data(str *client_id, credit_type_t type);
125
+static credit_data_t *__get_or_create_credit_data_entry(str *client_id, credit_type_t type);
123 126
 
124 127
 /*
125 128
  * MI interface
126 129
  */
127
-static struct mi_root *mi_credit_control_stats(struct mi_root *tree, void *param);
130
+static struct mi_root *__mi_credit_control_stats(struct mi_root *tree, void *param);
128 131
 
129 132
 /*
130 133
  * Dialog management callback functions
131 134
  */
132
-static void dialog_terminated_callback(struct dlg_cell *cell, int type, struct dlg_cb_params *params);
133
-static void dialog_confirmed_callback(struct dlg_cell *cell, int type, struct dlg_cb_params *params);
134
-static void dialog_created_callback(struct dlg_cell *cell, int type, struct dlg_cb_params *params);
135
-
136
-static pv_export_t mod_pvs[] =
137
-{
138
-	{ {"cnxcc", sizeof("cnxcc")-1 }, PVT_OTHER, pv_get_calls, 0,
139
-		                pv_parse_calls_param, 0, 0, 0 },
135
+static void __dialog_terminated_callback(struct dlg_cell *cell, int type, struct dlg_cb_params *params);
136
+static void __dialog_confirmed_callback(struct dlg_cell *cell, int type, struct dlg_cb_params *params);
137
+static void __dialog_created_callback(struct dlg_cell *cell, int type, struct dlg_cb_params *params);
138
+
139
+static pv_export_t mod_pvs[] = {
140
+	{ {"cnxcc", sizeof("cnxcc")-1 }, PVT_OTHER, __pv_get_calls, 0, __pv_parse_calls_param, 0, 0, 0 },
140 141
 	{ {0, 0}, 0, 0, 0, 0, 0, 0, 0 }
141 142
 };
142 143
 
143
-static cmd_export_t cmds[] =
144
-{
145
-	{"cnxcc_set_max_time",   (cmd_function) set_max_time, 2, fixup_pvar_pvar, fixup_free_pvar_pvar, ANY_ROUTE},
146
-	{"cnxcc_update_max_time",   (cmd_function) update_max_time, 2, fixup_pvar_pvar, fixup_free_pvar_pvar, ANY_ROUTE},
147
-	{"cnxcc_set_max_credit",   (cmd_function) set_max_credit, 5, fixup_par, NULL, ANY_ROUTE},
148
-	{"cnxcc_set_max_channels",   (cmd_function) set_max_channels, 2, fixup_pvar_pvar, NULL, ANY_ROUTE},
149
-	{"cnxcc_get_channel_count",   (cmd_function) get_channel_count, 2, fixup_pvar_pvar, NULL, ANY_ROUTE},
150
-	{"cnxcc_terminate_all",   (cmd_function) terminate_all, 1, fixup_pvar_null, NULL, ANY_ROUTE},
151
-
144
+static cmd_export_t cmds[] = {
145
+	{"cnxcc_set_max_time", (cmd_function) __set_max_time, 2, fixup_pvar_pvar,
146
+        fixup_free_pvar_pvar, ANY_ROUTE},
147
+    {"cnxcc_update_max_time", (cmd_function) __update_max_time, 2, fixup_pvar_pvar,
148
+        fixup_free_pvar_pvar, ANY_ROUTE},
149
+    {"cnxcc_set_max_credit", (cmd_function) __set_max_credit, 5, __fixup_pvar,
150
+        NULL, ANY_ROUTE},
151
+    {"cnxcc_set_max_channels", (cmd_function) __set_max_channels, 2, fixup_pvar_pvar,
152
+        NULL, ANY_ROUTE},
153
+    {"cnxcc_get_channel_count", (cmd_function) __get_channel_count, 2, fixup_pvar_pvar,
154
+        NULL, ANY_ROUTE},
155
+    {"cnxcc_terminate_all", (cmd_function) __terminate_all, 1, fixup_pvar_null,
156
+        NULL, ANY_ROUTE},
152 157
 	{0,0,0,0,0,0}
153 158
 };
154 159
 
155
-static param_export_t params[] =
156
-{
157
-	{"dlg_flag",  			INT_PARAM,			&_data.ctrl_flag	},
158
-	{"credit_check_period",  	INT_PARAM,			&_data.check_period	},
160
+static param_export_t params[] = {
161
+	{"dlg_flag", INT_PARAM,	&_data.ctrl_flag },
162
+	{"credit_check_period", INT_PARAM,	&_data.check_period },
163
+	{"redis", STR_PARAM, &_data.redis_cnn_str.s },
159 164
 	{ 0, 0, 0 }
160 165
 };
161 166
 
162
-static const char* rpc_active_clients_doc[2] =
163
-{
167
+static const char* rpc_active_clients_doc[2] = {
164 168
 	"List of clients with active calls",
165
-	0
169
+	NULL
166 170
 };
167 171
 
168
-static const char* rpc_check_client_stats_doc[2] =
169
-{
172
+static const char* rpc_check_client_stats_doc[2] = {
170 173
 	"Check specific client calls",
171
-	0
174
+	NULL
172 175
 };
173 176
 
174
-static const char* rpc_kill_call_doc[2] =
175
-{
177
+static const char* rpc_kill_call_doc[2] = {
176 178
 	"Kill call using its call ID",
177
-	0
179
+	NULL
178 180
 };
179 181
 
180
-rpc_export_t ul_rpc[] =
181
-{
182
-    {"cnxcc.active_clients",	rpc_active_clients,	rpc_active_clients_doc,	0},
183
-    {"cnxcc.check_client",		rpc_check_client_stats,	rpc_check_client_stats_doc,	0},
184
-    {"cnxcc.kill_call",			rpc_kill_call,	rpc_kill_call_doc,	0},
185
-    {0, 0, 0, 0}
182
+rpc_export_t ul_rpc[] = {
183
+    { "cnxcc.active_clients", rpc_active_clients, rpc_active_clients_doc,	0},
184
+    { "cnxcc.check_client", rpc_check_client_stats, rpc_check_client_stats_doc,	0},
185
+    { "cnxcc.kill_call", rpc_kill_call, rpc_kill_call_doc, 0},
186
+    { NULL, NULL, NULL, 0}
186 187
 };
187 188
 
188 189
 /* selects declaration */
189 190
 select_row_t sel_declaration[] = {
190
-        { NULL, SEL_PARAM_STR, STR_STATIC_INIT("cnxcc"), sel_root, SEL_PARAM_EXPECTED},
191
-        { sel_root, SEL_PARAM_STR, STR_STATIC_INIT("channels"), sel_channels, SEL_PARAM_EXPECTED|CONSUME_NEXT_STR|FIXUP_CALL},
192
-        { sel_channels, SEL_PARAM_STR, STR_STATIC_INIT("count"), sel_channels_count, 0},
193
-
191
+        { NULL, SEL_PARAM_STR, STR_STATIC_INIT("cnxcc"), sel_root,
192
+        		SEL_PARAM_EXPECTED},
193
+        { sel_root, SEL_PARAM_STR, STR_STATIC_INIT("channels"), sel_channels,
194
+        		SEL_PARAM_EXPECTED|CONSUME_NEXT_STR|FIXUP_CALL},
195
+        { sel_channels, SEL_PARAM_STR, STR_STATIC_INIT("count"), sel_channels_count,
196
+        		0},
194 197
         { NULL, SEL_PARAM_STR, STR_NULL, NULL, 0}
195 198
 };
196 199
 
197 200
 /** module exports */
198
-struct module_exports exports =
199
-{
201
+struct module_exports exports = {
200 202
 	MODULE_NAME,
201
-	DEFAULT_DLFLAGS,/* dlopen flags */
203
+	DEFAULT_DLFLAGS, /* dlopen flags */
202 204
 	cmds,
203 205
 	params,
204
-	0,          	/* exported statistics */
205
-	0, 	    	/* exported MI functions */
206
-	mod_pvs,	/* exported pseudo-variables */
207
-	0,          	/* extra processes */
208
-	mod_init,   	/* module initialization function */
209
-	0,
210
-	0,
211
-	child_init	/* per-child init function */
206
+	NULL,          	/* exported statistics */
207
+	NULL, 		    /* exported MI functions */
208
+	mod_pvs,  	    /* exported pseudo-variables */
209
+	NULL,          	/* extra processes */
210
+	__mod_init,   	/* module initialization function */
211
+	NULL,
212
+	NULL,
213
+	__child_init	/* per-child init function */
212 214
 };
213 215
 
214
-static int fixup_par(void** param, int param_no)
215
-{
216
+static int __fixup_pvar(void** param, int param_no) {
216 217
 	str var;
217 218
 
218 219
 	var.s	= (char *) *param;
... ...
@@ -233,8 +234,10 @@ static int fixup_par(void** param, int param_no)
233 233
 	return 0;
234 234
 }
235 235
 
236
-static int mod_init(void)
237
-{
236
+static int __mod_init(void) {
237
+	int len;
238
+	char *chr;
239
+
238 240
 	LM_INFO("Loading " MODULE_NAME " module\n");
239 241
 
240 242
 	_data.cs_route_number = route_get(&event_rt, "cnxcc:call-shutdown");
... ...
@@ -242,53 +245,53 @@ static int mod_init(void)
242 242
 	if (_data.cs_route_number < 0)
243 243
 		LM_INFO("No cnxcc:call-shutdown event route found\n");
244 244
 
245
-	if (_data.cs_route_number > 0 && event_rt.rlist[_data.cs_route_number] == NULL)
246
-	{
245
+	if (_data.cs_route_number > 0 && event_rt.rlist[_data.cs_route_number] == NULL) {
247 246
 		LM_INFO("cnxcc:call-shutdown route is empty\n");
248 247
 		_data.cs_route_number	= -1;
249 248
 	}
250 249
 
251
-	if (_data.check_period <= 0)
252
-	{
250
+	if (_data.check_period <= 0) {
253 251
 		LM_INFO("credit_check_period cannot be less than 1 second\n");
254 252
 		return -1;
255 253
 	}
256 254
 
257
-	_data.time.credit_data_by_client	= shm_malloc(sizeof(struct str_hash_table));
258
-	_data.time.call_data_by_cid 		= shm_malloc(sizeof(struct str_hash_table));
259
-	_data.money.credit_data_by_client	= shm_malloc(sizeof(struct str_hash_table));
260
-	_data.money.call_data_by_cid 		= shm_malloc(sizeof(struct str_hash_table));
255
+	if (_data.redis_cnn_str.s)
256
+		_data.redis_cnn_str.len = strlen(_data.redis_cnn_str.s);
257
+
258
+	_data.time.credit_data_by_client = shm_malloc(sizeof(struct str_hash_table));
259
+	_data.time.call_data_by_cid = shm_malloc(sizeof(struct str_hash_table));
260
+	_data.money.credit_data_by_client = shm_malloc(sizeof(struct str_hash_table));
261
+	_data.money.call_data_by_cid = shm_malloc(sizeof(struct str_hash_table));
261 262
 	_data.channel.credit_data_by_client	= shm_malloc(sizeof(struct str_hash_table));
262
-	_data.channel.call_data_by_cid 		= shm_malloc(sizeof(struct str_hash_table));
263
+	_data.channel.call_data_by_cid = shm_malloc(sizeof(struct str_hash_table));
263 264
 
264
-	_data.stats				= (stats_t *) shm_malloc(sizeof(stats_t));
265
+	_data.stats = (stats_t *) shm_malloc(sizeof(stats_t));
265 266
 
266
-	if (!_data.stats)
267
-	{
267
+	if (!_data.stats) {
268 268
 		LM_ERR("Error allocating shared memory stats\n");
269 269
 		return -1;
270 270
 	}
271 271
 
272
-	_data.stats->active	= 0;
273
-	_data.stats->dropped	= 0;
274
-	_data.stats->total	= 0;
272
+	_data.stats->active = 0;
273
+	_data.stats->dropped = 0;
274
+	_data.stats->total = 0;
275 275
 
276
-	if (init_hashtable(_data.time.credit_data_by_client) != 0)
276
+	if (__init_hashtable(_data.time.credit_data_by_client) != 0)
277 277
 		return -1;
278 278
 
279
-	if (init_hashtable(_data.time.call_data_by_cid) != 0)
279
+	if (__init_hashtable(_data.time.call_data_by_cid) != 0)
280 280
 		return -1;
281 281
 
282
-	if (init_hashtable(_data.money.credit_data_by_client) != 0)
282
+	if (__init_hashtable(_data.money.credit_data_by_client) != 0)
283 283
 		return -1;
284 284
 
285
-	if (init_hashtable(_data.money.call_data_by_cid) != 0)
285
+	if (__init_hashtable(_data.money.call_data_by_cid) != 0)
286 286
 		return -1;
287 287
 
288
-	if (init_hashtable(_data.channel.credit_data_by_client) != 0)
288
+	if (__init_hashtable(_data.channel.credit_data_by_client) != 0)
289 289
 		return -1;
290 290
 
291
-	if (init_hashtable(_data.channel.call_data_by_cid) != 0)
291
+	if (__init_hashtable(_data.channel.call_data_by_cid) != 0)
292 292
 		return -1;
293 293
 
294 294
 	lock_init(&_data.lock);
... ...
@@ -296,7 +299,7 @@ static int mod_init(void)
296 296
 	lock_init(&_data.money.lock);
297 297
 	lock_init(&_data.channel.lock);
298 298
 
299
-	register_mi_cmd(mi_credit_control_stats, "cnxcc_stats", NULL, NULL, 0);
299
+	register_mi_cmd(__mi_credit_control_stats, "cnxcc_stats", NULL, NULL, 0);
300 300
 
301 301
 	/*
302 302
 	 * One for time based monitoring
... ...
@@ -304,102 +307,154 @@ static int mod_init(void)
304 304
 	 */
305 305
 	register_dummy_timers(NUMBER_OF_TIMERS);
306 306
 
307
-	if (rpc_register_array(ul_rpc) != 0)
308
-	{
307
+	if (rpc_register_array(ul_rpc) != 0) {
309 308
 		LM_ERR("Failed registering RPC commands\n");
310 309
 		return -1;
311 310
 	}
312 311
 
313
-	if (load_dlg_api(&_dlgbinds) != 0)
314
-	{
312
+	if (load_dlg_api(&_dlgbinds) != 0) {
315 313
 		LM_ERR("Error loading dialog API\n");
316 314
 	    return -1;
317 315
 	}
318 316
 
319
-	_dlgbinds.register_dlgcb(NULL, DLGCB_CREATED, dialog_created_callback, NULL, NULL);
317
+	_dlgbinds.register_dlgcb(NULL, DLGCB_CREATED, __dialog_created_callback, NULL, NULL);
320 318
 
321
-	 register_select_table(sel_declaration);
319
+	register_select_table(sel_declaration);
320
+
321
+	// redis configuration setup
322
+	if (_data.redis_cnn_str.len <= 0)
323
+		return 0;
324
+
325
+	// replace ";" for " ", so we can use a simpler pattern in sscanf()
326
+	for(chr = _data.redis_cnn_str.s; *chr; chr++)
327
+		if (*chr == ';')
328
+			*chr = ' ';
329
+
330
+	memset(_data.redis_cnn_info.host, 0, sizeof(_data.redis_cnn_info.host));
331
+	sscanf(_data.redis_cnn_str.s, "addr=%s port=%d db=%d", _data.redis_cnn_info.host,
332
+                                                           &_data.redis_cnn_info.port,
333
+                                                           &_data.redis_cnn_info.db);
334
+
335
+	len = strlen(_data.redis_cnn_info.host);
336
+	//
337
+	// Redis modparam validations
338
+	//
339
+	if (len == 0) {
340
+		LM_ERR("invalid host address [%s]", _data.redis_cnn_info.host);
341
+		return -1;
342
+	}
343
+
344
+	if (_data.redis_cnn_info.port <= 0) {
345
+		LM_ERR("invalid port number [%d]", _data.redis_cnn_info.port);
346
+		return -1;
347
+	}
348
+
349
+	if (_data.redis_cnn_info.db < 0) {
350
+		LM_ERR("invalid db number [%d]",_data.redis_cnn_info.db);
351
+		return -1;
352
+	}
322 353
 
354
+	LM_INFO("Redis connection info: ip=[%s], port=[%d], database=[%d]", _data.redis_cnn_info.host,
355
+			                                                            _data.redis_cnn_info.port,
356
+			                                                            _data.redis_cnn_info.db);
357
+
358
+	register_procs(3/* 2 timers + 1 redis async receiver */);
323 359
 	return 0;
324 360
 }
325 361
 
326
-static int child_init(int rank)
327
-{
362
+static int __child_init(int rank) {
363
+	int pid = 0;
364
+
365
+	if (rank!=PROC_INIT && rank!=PROC_MAIN && rank!=PROC_TCP_MAIN) {
366
+		if (_data.redis_cnn_str.len <= 0)
367
+			return 0;
368
+
369
+		_data.redis = redis_connect(_data.redis_cnn_info.host,
370
+                                        _data.redis_cnn_info.port,
371
+                                        _data.redis_cnn_info.db);
372
+		return (!_data.redis) ? -1 : 0;
373
+	}
374
+
328 375
 	if (rank != PROC_MAIN)
329 376
 		return 0;
330 377
 
378
+	if(fork_dummy_timer(PROC_TIMER, "CNXCC TB TIMER", 1, check_calls_by_money, NULL, _data.check_period) < 0) {
379
+		LM_ERR("Failed registering TB TIMER routine as process\n");
380
+		return -1;
381
+	}
331 382
 
332
-	if(fork_dummy_timer(PROC_TIMER, "CNXCC TB TIMER", 1,
333
-			check_calls_by_money, NULL, _data.check_period) < 0)
334
-	{
335
-		LM_ERR("failed to register TB TIMER routine as process\n");
383
+	if(fork_dummy_timer(PROC_TIMER, "CNXCC MB TIMER", 1, check_calls_by_time, NULL, _data.check_period) < 0) {
384
+		LM_ERR("Failed registering MB TIMER routine as process\n");
336 385
 		return -1;
337 386
 	}
338 387
 
339
-	if(fork_dummy_timer(PROC_TIMER, "CNXCC MB TIMER", 1, check_calls_by_time, NULL, _data.check_period) < 0)
340
-	{
341
-		LM_ERR("failed to register MB TIMER routine as process\n");
388
+	if (_data.redis_cnn_str.len <= 0)
389
+		return 0;
390
+
391
+
392
+	pid = fork_process(PROC_NOCHLDINIT, "Redis Async receiver", 1);
393
+
394
+	if (pid < 0) {
395
+		LM_ERR("error forking Redis receiver\n");
342 396
 		return -1;
343 397
 	}
398
+	else if (pid == 0) {
399
+		_data.redis = redis_connect_async(_data.redis_cnn_info.host,
400
+                                          _data.redis_cnn_info.port,
401
+                                          _data.redis_cnn_info.db);
402
+
403
+		return (!_data.redis) ? -1 : 0;;
404
+	}
344 405
 
345 406
 	return 0;
346 407
 }
347 408
 
348
-static int init_hashtable(struct str_hash_table *ht)
349
-{
350
-	if (shm_str_hash_alloc(ht, HT_SIZE) != 0)
351
-	{
409
+static int __init_hashtable(struct str_hash_table *ht) {
410
+	if (__shm_str_hash_alloc(ht, HT_SIZE) != 0) {
352 411
 		LM_ERR("Error allocating shared memory hashtable\n");
353 412
 		return -1;
354 413
 	}
355 414
 
356 415
 	str_hash_init(ht);
357
-
358 416
 	return 0;
359 417
 }
360 418
 
361
-static void dialog_created_callback(struct dlg_cell *cell, int type, struct dlg_cb_params *params)
362
-{
363
-	struct sip_msg *msg	= NULL;
419
+static void __dialog_created_callback(struct dlg_cell *cell, int type, struct dlg_cb_params *params) {
420
+	struct sip_msg *msg = NULL;
364 421
 
365
-	msg	= params->direction == SIP_REPLY ? params->rpl : params->req;
422
+	msg = params->direction == SIP_REPLY ? params->rpl : params->req;
366 423
 
367
-	if (msg == NULL)
368
-	{
424
+	if (msg == NULL) {
369 425
 		LM_ERR("Error getting direction of SIP msg\n");
370 426
 		return;
371 427
 	}
372 428
 
373
-	if (isflagset(msg, _data.ctrl_flag) == -1)
374
-	{
429
+	if (isflagset(msg, _data.ctrl_flag) == -1) {
375 430
 		LM_DBG("Flag is not set for this message. Ignoring\n");
376 431
 		return;
377 432
 	}
378 433
 
379 434
 	LM_DBG("Dialog created for CID [%.*s]\n", cell->callid.len, cell->callid.s);
380 435
 
381
-	_dlgbinds.register_dlgcb(cell, DLGCB_CONFIRMED, dialog_confirmed_callback, NULL, NULL);
382
-	_dlgbinds.register_dlgcb(cell, DLGCB_TERMINATED|DLGCB_FAILED|DLGCB_EXPIRED, dialog_terminated_callback, NULL, NULL);
436
+	_dlgbinds.register_dlgcb(cell, DLGCB_CONFIRMED, __dialog_confirmed_callback, NULL, NULL);
437
+	_dlgbinds.register_dlgcb(cell, DLGCB_TERMINATED|DLGCB_FAILED|DLGCB_EXPIRED, __dialog_terminated_callback, NULL, NULL);
383 438
 
384
-	setup_billing(&cell->callid, cell->h_entry, cell->h_id);
439
+	__setup_billing(&cell->callid, cell->h_entry, cell->h_id);
385 440
 }
386 441
 
387
-static void dialog_confirmed_callback(struct dlg_cell *cell, int type, struct dlg_cb_params *params)
388
-{
442
+static void __dialog_confirmed_callback(struct dlg_cell *cell, int type, struct dlg_cb_params *params) {
389 443
 	LM_DBG("Dialog confirmed for CID [%.*s]\n", cell->callid.len, cell->callid.s);
390 444
 
391
-	start_billing(&cell->callid, &cell->from_uri, &cell->to_uri, cell->tag);
445
+	__start_billing(&cell->callid, &cell->from_uri, &cell->to_uri, cell->tag);
392 446
 }
393 447
 
394
-static void dialog_terminated_callback(struct dlg_cell *cell, int type, struct dlg_cb_params *params)
395
-{
448
+static void __dialog_terminated_callback(struct dlg_cell *cell, int type, struct dlg_cb_params *params) {
396 449
 	LM_DBG("Dialog terminated for CID [%.*s]\n", cell->callid.len, cell->callid.s);
397 450
 
398
-	stop_billing(&cell->callid);
451
+	__stop_billing(&cell->callid);
399 452
 }
400 453
 
401
-static void notify_call_termination(sip_data_t *data)
402
-{
454
+static void __notify_call_termination(sip_data_t *data) {
403 455
 	struct run_act_ctx ra_ctx;
404 456
 	struct sip_msg *msg;
405 457
 
... ...
@@ -407,8 +462,7 @@ static void notify_call_termination(sip_data_t *data)
407 407
 		return;
408 408
 
409 409
 	if (faked_msg_init_with_dlg_info(&data->callid, &data->from_uri, &data->from_tag,
410
-				&data->to_uri, &data->to_tag, &msg) != 0)
411
-	{
410
+					&data->to_uri, &data->to_tag, &msg) != 0) {
412 411
 		LM_ERR("[%.*s]: error generating faked sip message\n", data->callid.len, data->callid.s);
413 412
 		return;
414 413
 	}
... ...
@@ -420,20 +474,18 @@ static void notify_call_termination(sip_data_t *data)
420 420
 		LM_ERR("Error executing cnxcc:call-shutdown route\n");
421 421
 }
422 422
 
423
-int try_get_credit_data_entry(str *client_id, credit_data_t **credit_data)
424
-{
423
+int try_get_credit_data_entry(str *client_id, credit_data_t **credit_data) {
425 424
 	struct str_hash_entry *cd_entry	= NULL;
426
-	hash_tables_t *hts		= NULL;
427
-	*credit_data			= NULL;
425
+	hash_tables_t *hts = NULL;
426
+	*credit_data = NULL;
428 427
 
429 428
 	/* by money */
430
-	hts				= &_data.money;
429
+	hts = &_data.money;
431 430
 	lock_get(&hts->lock);
432 431
 
433
-	cd_entry			= str_hash_get(hts->credit_data_by_client, client_id->s, client_id->len);
432
+	cd_entry = str_hash_get(hts->credit_data_by_client, client_id->s, client_id->len);
434 433
 
435
-	if (cd_entry != NULL)
436
-	{
434
+	if (cd_entry != NULL) {
437 435
 		*credit_data	= cd_entry->u.p;
438 436
 		lock_release(&hts->lock);
439 437
 		return 0;
... ...
@@ -442,13 +494,12 @@ int try_get_credit_data_entry(str *client_id, credit_data_t **credit_data)
442 442
 	lock_release(&hts->lock);
443 443
 
444 444
 	/* by time */
445
-	hts				= &_data.time;
445
+	hts = &_data.time;
446 446
 	lock_get(&hts->lock);
447 447
 
448
-	cd_entry			= str_hash_get(hts->credit_data_by_client, client_id->s, client_id->len);
448
+	cd_entry = str_hash_get(hts->credit_data_by_client, client_id->s, client_id->len);
449 449
 
450
-	if (cd_entry != NULL)
451
-	{
450
+	if (cd_entry != NULL) {
452 451
 		*credit_data	= cd_entry->u.p;
453 452
 		lock_release(&hts->lock);
454 453
 		return 0;
... ...
@@ -457,37 +508,34 @@ int try_get_credit_data_entry(str *client_id, credit_data_t **credit_data)
457 457
 	lock_release(&hts->lock);
458 458
 
459 459
 	/* by channel */
460
-	hts				= &_data.channel;
460
+	hts = &_data.channel;
461 461
 	lock_get(&hts->lock);
462 462
 
463
-	cd_entry			= str_hash_get(hts->credit_data_by_client, client_id->s, client_id->len);
463
+	cd_entry = str_hash_get(hts->credit_data_by_client, client_id->s, client_id->len);
464 464
 
465
-	if (cd_entry != NULL)
466
-	{
465
+	if (cd_entry != NULL) {
467 466
 		*credit_data	= cd_entry->u.p;
468 467
 		lock_release(&hts->lock);
469 468
 		return 0;
470 469
 	}
471 470
 
472 471
 	lock_release(&hts->lock);
473
-
474 472
 	return -1;
475 473
 }
476 474
 
477
-int try_get_call_entry(str *callid, call_t **call, hash_tables_t **hts)
478
-{
479
-	struct str_hash_entry *call_entry	= NULL;
480
-	*call					= NULL;
475
+int try_get_call_entry(str *callid, call_t **call, hash_tables_t **hts) {
476
+	struct str_hash_entry *call_entry = NULL;
477
+
478
+	*call = NULL;
481 479
 
482 480
 	/* by money */
483
-	*hts				= &_data.money;
481
+	*hts = &_data.money;
484 482
 	lock_get(&(*hts)->lock);
485 483
 
486
-	call_entry			= str_hash_get((*hts)->call_data_by_cid, callid->s, callid->len);
484
+	call_entry = str_hash_get((*hts)->call_data_by_cid, callid->s, callid->len);
487 485
 
488
-	if (call_entry != NULL)
489
-	{
490
-		*call	= call_entry->u.p;
486
+	if (call_entry != NULL) {
487
+		*call = call_entry->u.p;
491 488
 		lock_release(&(*hts)->lock);
492 489
 		return 0;
493 490
 	}
... ...
@@ -495,14 +543,13 @@ int try_get_call_entry(str *callid, call_t **call, hash_tables_t **hts)
495 495
 	lock_release(&(*hts)->lock);
496 496
 
497 497
 	/* by time */
498
-	*hts				= &_data.time;
498
+	*hts = &_data.time;
499 499
 	lock_get(&(*hts)->lock);
500 500
 
501
-	call_entry			= str_hash_get((*hts)->call_data_by_cid, callid->s, callid->len);
501
+	call_entry = str_hash_get((*hts)->call_data_by_cid, callid->s, callid->len);
502 502
 
503
-	if (call_entry != NULL)
504
-	{
505
-		*call	= call_entry->u.p;
503
+	if (call_entry != NULL) {
504
+		*call = call_entry->u.p;
506 505
 		lock_release(&(*hts)->lock);
507 506
 		return 0;
508 507
 	}
... ...
@@ -510,80 +557,73 @@ int try_get_call_entry(str *callid, call_t **call, hash_tables_t **hts)
510 510
 	lock_release(&(*hts)->lock);
511 511
 
512 512
 	/* by channel */
513
-	*hts				= &_data.channel;
513
+	*hts = &_data.channel;
514 514
 	lock_get(&(*hts)->lock);
515 515
 
516
-	call_entry			= str_hash_get((*hts)->call_data_by_cid, callid->s, callid->len);
516
+	call_entry = str_hash_get((*hts)->call_data_by_cid, callid->s, callid->len);
517 517
 
518
-	if (call_entry != NULL)
519
-	{
520
-		*call	= call_entry->u.p;
518
+	if (call_entry != NULL) {
519
+		*call = call_entry->u.p;
521 520
 		lock_release(&(*hts)->lock);
522 521
 		return 0;
523 522
 	}
524 523
 
525 524
 	lock_release(&(*hts)->lock);
526
-
527 525
 	return -1;
528 526
 }
529 527
 
530
-static void stop_billing(str *callid)
531
-{
532
-	struct str_hash_entry *cd_entry		= NULL;
533
-	call_t *call				= NULL;
534
-	hash_tables_t *hts			= NULL;
535
-	credit_data_t *credit_data		= NULL;
528
+static void __stop_billing(str *callid) {
529
+	struct str_hash_entry *cd_entry	= NULL;
530
+	call_t *call			= NULL;
531
+	hash_tables_t *hts		= NULL;
532
+	credit_data_t *credit_data	= NULL;
536 533
 
537 534
 	/*
538 535
 	 * Search call data by call-id
539 536
 	 */
540
-	if (try_get_call_entry(callid, &call, &hts) != 0)
541
-	{
537
+	if (try_get_call_entry(callid, &call, &hts) != 0) {
542 538
 		LM_ERR("Call [%.*s] not found\n", callid->len, callid->s);
543 539
 		return;
544 540
 	}
545 541
 
546
-	if (call == NULL)
547
-	{
542
+	if (call == NULL) {
548 543
 		LM_ERR("[%.*s] call pointer is null\n", callid->len, callid->s);
549 544
 		return;
550 545
 	}
551 546
 
552
-	if (hts == NULL)
553
-	{
547
+	if (hts == NULL) {
554 548
 		LM_ERR("[%.*s] result hashtable pointer is null\n", callid->len, callid->s);
555 549
 		return;
556 550
 	}
557 551
 
558 552
 	lock_get(&hts->lock);
553
+
559 554
 	/*
560 555
 	 * Search credit_data by client_id
561 556
 	 */
562
-	cd_entry	= str_hash_get(hts->credit_data_by_client, call->client_id.s, call->client_id.len);
557
+	cd_entry = str_hash_get(hts->credit_data_by_client, call->client_id.s, call->client_id.len);
563 558
 
564
-	if (cd_entry == NULL)
565
-	{
559
+	if (cd_entry == NULL) {
566 560
 		LM_ERR("Credit data not found for CID [%.*s], client-ID [%.*s]\n", callid->len, callid->s, call->client_id.len, call->client_id.s);
567 561
 		lock_release(&hts->lock);
568 562
 		return;
569 563
 	}
570 564
 
571
-	credit_data	= (credit_data_t *) cd_entry->u.p;
565
+	credit_data = (credit_data_t *) cd_entry->u.p;
572 566
 
573
-	if (credit_data == NULL)
574
-	{
567
+	if (credit_data == NULL) {
575 568
 		LM_ERR("[%.*s]: credit_data pointer is null\n", callid->len, callid->s);
576 569
 		lock_release(&hts->lock);
577 570
 		return;
578 571
 	}
579 572
 
580 573
 	lock_release(&hts->lock);
574
+
581 575
 	/*
582 576
 	 * Update calls statistics
583 577
 	 */
584 578
 	lock_get(&_data.lock);
585 579
 
586
-;
587 580
 	_data.stats->active--;
588 581
 	_data.stats->total--;
589 582
 
... ...
@@ -596,21 +636,26 @@ static void stop_billing(str *callid)
596 596
 	/*
597 597
 	 * This call just ended and we need to remove it from the summ.
598 598
 	 */
599
-	if (call->confirmed)
600
-	{
599
+	if (call->confirmed) {
601 600
 		credit_data->concurrent_calls--;
602 601
 		credit_data->ended_calls_consumed_amount += call->consumed_amount;
602
+
603
+		if (_data.redis) {
604
+			redis_incr_by_int(credit_data, "concurrent_calls", -1);
605
+			redis_incr_by_double(credit_data, "ended_calls_consumed_amount", call->consumed_amount);
606
+		}
603 607
 	}
604 608
 
605 609
 	credit_data->number_of_calls--;
606 610
 
607
-	if (credit_data->concurrent_calls < 0)
608
-	{
611
+	if (_data.redis)
612
+		redis_incr_by_int(credit_data, "number_of_calls", -1);
613
+
614
+	if (credit_data->concurrent_calls < 0) {
609 615
 		LM_ERR("[BUG]: number of concurrent calls dropped to negative value: %d\n", credit_data->concurrent_calls);
610 616
 	}
611 617
 
612
-	if (credit_data->number_of_calls < 0)
613
-	{
618
+	if (credit_data->number_of_calls < 0) {
614 619
 		LM_ERR("[BUG]: number of calls dropped to negative value: %d\n", credit_data->number_of_calls);
615 620
 	}
616 621
 
... ...
@@ -618,17 +663,23 @@ static void stop_billing(str *callid)
618 618
 	 * Remove (and free) the call from the list of calls of the current credit_data
619 619
 	 */
620 620
 	clist_rm(call, next, prev);
621
-	free_call(call);
621
+	__free_call(call);
622 622
 
623 623
 	/*
624 624
 	 * In case there are no active calls for a certain client, we remove the client-id from the hash table.
625 625
 	 * This way, we can save memory for useful clients.
626 626
 	 */
627
-	if (credit_data->number_of_calls == 0)
628
-	{
627
+	if (credit_data->number_of_calls == 0) {
629 628
 		LM_DBG("Removing client [%.*s] and its calls from the list\n", credit_data->call_list->client_id.len, credit_data->call_list->client_id.s);
630 629
 
630
+		credit_data->deallocating = 1;
631 631
 		lock(&hts->lock);
632
+
633
+		if (_data.redis) {
634
+			redis_clean_up_if_last(credit_data);
635
+			shm_free(credit_data->str_id);
636
+		}
637
+
632 638
 		/*
633 639
 		 * Remove the credit_data_t from the hash table
634 640
 		 */
... ...
@@ -650,7 +701,7 @@ static void stop_billing(str *callid)
650 650
 		/*
651 651
 		 * Free the whole entry
652 652
 		 */
653
-		free_credit_data_hash_entry(cd_entry);
653
+		__free_credit_data_hash_entry(cd_entry);
654 654
 
655 655
 		/*
656 656
 		 * return without releasing the acquired lock over credit_data. Why? Because we just freed it.
... ...
@@ -661,8 +712,7 @@ static void stop_billing(str *callid)
661 661
 	lock_release(&credit_data->lock);
662 662
 }
663 663
 
664
-static void setup_billing(str *callid, unsigned int h_entry, unsigned int h_id)
665
-{
664
+static void __setup_billing(str *callid, unsigned int h_entry, unsigned int h_id) {
666 665
 	call_t *call		= NULL;
667 666
 	hash_tables_t *hts	= NULL;
668 667
 
... ...
@@ -673,20 +723,17 @@ static void setup_billing(str *callid, unsigned int h_entry, unsigned int h_id)
673 673
 	/*
674 674
 	 * Search call data by call-id
675 675
 	 */
676
-	if (try_get_call_entry(callid, &call, &hts) != 0)
677
-	{
676
+	if (try_get_call_entry(callid, &call, &hts) != 0) {
678 677
 		LM_ERR("Call [%.*s] not found\n", callid->len, callid->s);
679 678
 		return;
680 679
 	}
681 680
 
682
-	if (call == NULL)
683
-	{
681
+	if (call == NULL) {
684 682
 		LM_ERR("[%.*s] call pointer is null\n", callid->len, callid->s);
685 683
 		return;
686 684
 	}
687 685
 
688
-	if (hts == NULL)
689
-	{
686
+	if (hts == NULL) {
690 687
 		LM_ERR("[%.*s] result hashtable pointer is null\n", callid->len, callid->s);
691 688
 		return;
692 689
 	}
... ...
@@ -703,20 +750,19 @@ static void setup_billing(str *callid, unsigned int h_entry, unsigned int h_id)
703 703
 
704 704
 	lock_get(&call->lock);
705 705
 
706
-	call->dlg_h_entry		= h_entry;
707
-	call->dlg_h_id			= h_id;
706
+	call->dlg_h_entry	= h_entry;
707
+	call->dlg_h_id		= h_id;
708 708
 
709 709
 	LM_DBG("Call [%.*s] from client [%.*s], created\n", callid->len, callid->s, call->client_id.len, call->client_id.s);
710 710
 
711 711
 	lock_release(&call->lock);
712 712
 }
713 713
 
714
-static void start_billing(str *callid, str *from_uri, str *to_uri, str tags[2])
715
-{
716
-	struct str_hash_entry *cd_entry		= NULL;
717
-	call_t *call				= NULL;
718
-	hash_tables_t *hts			= NULL;
719
-	credit_data_t *credit_data		= NULL;
714
+static void __start_billing(str *callid, str *from_uri, str *to_uri, str tags[2]) {
715
+	struct str_hash_entry *cd_entry	= NULL;
716
+	call_t *call			= NULL;
717
+	hash_tables_t *hts		= NULL;
718
+	credit_data_t *credit_data	= NULL;
720 719
 
721 720
 	LM_DBG("Billing started for call [%.*s]\n", callid->len, callid->s);
722 721
 
... ...
@@ -725,21 +771,18 @@ static void start_billing(str *callid, str *from_uri, str *to_uri, str tags[2])
725 725
 	/*
726 726
 	 * Search call data by call-id
727 727
 	 */
728
-	if (try_get_call_entry(callid, &call, &hts) != 0)
729
-	{
728
+	if (try_get_call_entry(callid, &call, &hts) != 0) {
730 729
 		LM_ERR("Call [%.*s] not found\n", callid->len, callid->s);
731 730
 		return;
732 731
 	}
733 732
 
734
-	if (call == NULL)
735
-	{
733
+	if (call == NULL) {
736 734
 		LM_ERR("[%.*s] call pointer is null\n", callid->len, callid->s);
737 735
 		return;
738 736
 	}
739 737
 
740
-	if (hts == NULL)
741
-	{
742
-		LM_ERR("[%.*s] result hashtable pointer is null\n", callid->len, callid->s);
738
+	if (hts == NULL) {
739
+		LM_ERR("[%.*s] result hashtable pointer is null", callid->len, callid->s);
743 740
 		return;
744 741
 	}
745 742
 
... ...
@@ -748,19 +791,17 @@ static void start_billing(str *callid, str *from_uri, str *to_uri, str tags[2])
748 748
 	/*
749 749
 	 * Search credit_data by client_id
750 750
 	 */
751
-	cd_entry			= str_hash_get(hts->credit_data_by_client, call->client_id.s, call->client_id.len);
751
+	cd_entry = str_hash_get(hts->credit_data_by_client, call->client_id.s, call->client_id.len);
752 752
 
753
-	if (cd_entry == NULL)
754
-	{
753
+	if (cd_entry == NULL) {
755 754
 		LM_ERR("Credit data not found for CID [%.*s], client-ID [%.*s]\n", callid->len, callid->s, call->client_id.len, call->client_id.s);
756 755
 		lock_release(&hts->lock);
757 756
 		return;
758 757
 	}
759 758
 
760
-	credit_data	= (credit_data_t *) cd_entry->u.p;
759
+	credit_data = (credit_data_t *) cd_entry->u.p;
761 760
 
762
-	if (credit_data == NULL)
763
-	{
761
+	if (credit_data == NULL) {
764 762
 		LM_ERR("[%.*s]: credit_data pointer is null\n", callid->len, callid->s);
765 763
 		lock_release(&hts->lock);
766 764
 		return;
... ...
@@ -777,21 +818,30 @@ static void start_billing(str *callid, str *from_uri, str *to_uri, str tags[2])
777 777
 	 */
778 778
 	credit_data->concurrent_calls++;
779 779
 
780
-	if (credit_data->max_amount == 0)
780
+	if (_data.redis)
781
+		redis_incr_by_int(credit_data, "concurrent_calls", 1);
782
+
783
+	if (credit_data->max_amount == 0) {
781 784
 		credit_data->max_amount	= call->max_amount; // first time setup
782 785
 
783
-	if (call->max_amount > credit_data->max_amount)
784
-	{
785
-		LM_ALERT("Maximum-speak-time/credit changed, maybe a credit reload? %f > %f. Client [%.*s]\n", call->max_amount, credit_data->max_amount,
786
+		if (_data.redis)
787
+			redis_insert_double_value(credit_data, "max_amount", credit_data->max_amount);
788
+	}
789
+
790
+	if (call->max_amount > credit_data->max_amount) {
791
+		LM_ALERT("Maximum-talk-time/credit changed, maybe a credit reload? %f > %f. Client [%.*s]\n", call->max_amount, credit_data->max_amount,
786 792
 																							call->client_id.len, call->client_id.s);
787 793
 
794
+
795
+		if (_data.redis)
796
+			redis_insert_double_value(credit_data, "max_amount", call->max_amount - credit_data->max_amount);
797
+
788 798
 		credit_data->max_amount += call->max_amount - credit_data->max_amount;
789 799
 	}
790 800
 
791 801
 	/*
792 802
 	 * Update max_amount, discounting what was already consumed by other calls of the same client
793 803
 	 */
794
-
795 804
 	call->max_amount = credit_data->max_amount - credit_data->consumed_amount;
796 805
 
797 806
 	lock_release(&credit_data->lock);
... ...
@@ -801,8 +851,7 @@ static void start_billing(str *callid, str *from_uri, str *to_uri, str tags[2])
801 801
 	/*
802 802
 	 * Store from-tag value
803 803
 	 */
804
-	if (shm_str_dup(&call->sip_data.from_tag, &tags[0]) != 0)
805
-	{
804
+	if (shm_str_dup(&call->sip_data.from_tag, &tags[0]) != 0) {
806 805
 		LM_ERR("No more pkg memory\n");
807 806
 		goto exit;
808 807
 	}
... ...
@@ -810,21 +859,19 @@ static void start_billing(str *callid, str *from_uri, str *to_uri, str tags[2])
810 810
 	/*
811 811
 	 * Store to-tag value
812 812
 	 */
813
-	if (shm_str_dup(&call->sip_data.to_tag, &tags[1]) != 0)
814
-	{
813
+	if (shm_str_dup(&call->sip_data.to_tag, &tags[1]) != 0) {
815 814
 		LM_ERR("No more pkg memory\n");
816 815
 		goto exit;
817 816
 	}
818 817
 
819 818
 	if(shm_str_dup(&call->sip_data.from_uri, from_uri) != 0 ||
820
-	   shm_str_dup(&call->sip_data.to_uri  , to_uri)   != 0)
821
-	{
819
+	   shm_str_dup(&call->sip_data.to_uri  , to_uri)   != 0) {
822 820
 		LM_ERR("No more pkg memory\n");
823 821
 		goto exit;
824 822
 	}
825 823
 
826 824
 	call->start_timestamp	= get_current_timestamp();
827
-	call->confirmed			= TRUE;
825
+	call->confirmed		= TRUE;
828 826
 
829 827
 	LM_DBG("Call [%.*s] from client [%.*s], confirmed. from=<%.*s>;tag=%.*s, to=<%.*s>;tag=%.*s\n",
830 828
 			callid->len, callid->s, call->client_id.len, call->client_id.s,
... ...
@@ -832,26 +879,24 @@ static void start_billing(str *callid, str *from_uri, str *to_uri, str tags[2])
832 832
 			call->sip_data.from_tag.len, call->sip_data.from_tag.s,
833 833
 			call->sip_data.to_uri.len, call->sip_data.to_uri.s,
834 834
 			call->sip_data.to_tag.len, call->sip_data.to_tag.s);
835
-
836 835
 exit:
837 836
 	lock_release(&call->lock);
838 837
 }
839 838
 
839
+// must be called with lock held on credit_data
840
+void terminate_all_calls(credit_data_t *credit_data) {
841
+	call_t *call = NULL,
842
+           *tmp = NULL;