Browse code

SBC: CC: redis blacklist call control module

Stefan Sayer authored on 09/01/2012 09:41:13
Showing 6 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,264 @@
1
+/*
2
+ * Copyright (C) 2011 Stefan Sayer
3
+ *
4
+ * This file is part of SEMS, a free SIP media server.
5
+ *
6
+ * SEMS is free software; you can redistribute it and/or modify
7
+ * it under the terms of the GNU General Public License as published by
8
+ * the Free Software Foundation; either version 2 of the License, or
9
+ * (at your option) any later version.
10
+ *
11
+ * For a license to use the SEMS software under conditions
12
+ * other than those described here, or to purchase support for this
13
+ * software, please contact iptel.org by e-mail at the following addresses:
14
+ *    info@iptel.org
15
+ *
16
+ * SEMS is distributed in the hope that it will be useful,
17
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
18
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19
+ * GNU General Public License for more details.
20
+ *
21
+ * You should have received a copy of the GNU General Public License
22
+ * along with this program; if not, write to the Free Software
23
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
24
+ */
25
+
26
+#include "AmPlugIn.h"
27
+#include "log.h"
28
+#include "AmArg.h"
29
+
30
+#include "BLRedis.h"
31
+
32
+#include "ampi/SBCCallControlAPI.h"
33
+
34
+#include <string.h>
35
+
36
+class CCBLRedisFactory : public AmDynInvokeFactory
37
+{
38
+public:
39
+    CCBLRedisFactory(const string& name)
40
+	: AmDynInvokeFactory(name) {}
41
+
42
+    AmDynInvoke* getInstance(){
43
+	return CCBLRedis::instance();
44
+    }
45
+
46
+    int onLoad(){
47
+      if (CCBLRedis::instance()->onLoad())
48
+	return -1;
49
+
50
+      DBG("REDIS blacklist call control loaded.\n");
51
+
52
+      return 0;
53
+    }
54
+};
55
+
56
+EXPORT_PLUGIN_CLASS_FACTORY(CCBLRedisFactory, MOD_NAME);
57
+
58
+CCBLRedis* CCBLRedis::_instance=0;
59
+
60
+CCBLRedis* CCBLRedis::instance()
61
+{
62
+    if(!_instance)
63
+	_instance = new CCBLRedis();
64
+    return _instance;
65
+}
66
+
67
+CCBLRedis::CCBLRedis()
68
+{
69
+}
70
+
71
+CCBLRedis::~CCBLRedis() { }
72
+
73
+int CCBLRedis::onLoad() {
74
+  AmConfigReader cfg;
75
+
76
+  string redis_server = "127.0.0.1";
77
+  string redis_port = "6379";
78
+  string redis_reconnect_timers = "5,10,20,50,100,500,1000";
79
+  string redis_connections = "10";
80
+  string redis_max_conn_wait = "1000";
81
+
82
+  if(cfg.loadPluginConf(MOD_NAME)) {
83
+    INFO(MOD_NAME "configuration  file not found, assuming default "
84
+	 "configuration is fine\n");
85
+  } else {
86
+    redis_server = cfg.getParameter("redis_server", redis_server);
87
+    redis_port = cfg.getParameter("redis_port", redis_port);
88
+    redis_reconnect_timers =
89
+      cfg.getParameter("redis_reconnect_timers", redis_reconnect_timers);
90
+    redis_connections = cfg.getParameter("redis_connections", redis_connections);
91
+    redis_max_conn_wait = cfg.getParameter("redis_max_conn_wait", redis_max_conn_wait);
92
+  }
93
+
94
+  unsigned int i_redis_connections;
95
+  if (str2i(redis_connections, i_redis_connections)) {
96
+    ERROR("could not understand redis_connections=%s\n", redis_connections.c_str());
97
+    return -1;
98
+  }
99
+
100
+  unsigned int i_redis_port;
101
+  if (str2i(redis_port, i_redis_port)) {
102
+    ERROR("could not understand redis_port=%s\n", redis_port.c_str());
103
+    return -1;
104
+  }
105
+
106
+ unsigned int i_redis_max_conn_wait;
107
+  if (str2i(redis_max_conn_wait, i_redis_max_conn_wait)) {
108
+    ERROR("could not understand redis_max_conn_wait=%s\n", redis_max_conn_wait.c_str());
109
+    return -1;
110
+  }
111
+
112
+ std::vector<unsigned int> reconnect_timers;
113
+ std::vector<string> timeouts_v = explode(redis_reconnect_timers, ",");
114
+  for (std::vector<string>::iterator it=
115
+         timeouts_v.begin(); it != timeouts_v.end(); it++) {
116
+    int r;
117
+    if (!str2int(*it, r)) {
118
+      ERROR("REDIS reconnect timeout '%s' not understood\n",
119
+            it->c_str());
120
+      return -1;
121
+    }
122
+    reconnect_timers.push_back(r);
123
+  }
124
+
125
+  connection_pool.set_config(redis_server, i_redis_port,
126
+			     reconnect_timers, i_redis_max_conn_wait);
127
+  connection_pool.add_connections(i_redis_connections);
128
+  connection_pool.start();
129
+
130
+  return 0;
131
+}
132
+
133
+void CCBLRedis::invoke(const string& method, const AmArg& args, AmArg& ret)
134
+{
135
+  DBG("CCBLRedis: %s(%s)\n", method.c_str(), AmArg::print(args).c_str());
136
+
137
+  if(method == "start"){
138
+    // INFO("--------------------------------------------------------------\n");
139
+    // INFO("Got call control start ltag '%s' start_ts %i.%i\n",
140
+    // 	   args.get(0).asCStr(), args[2][0].asInt(), args[2][1].asInt());
141
+    // INFO("---- dumping CC values ----\n");
142
+    // for (AmArg::ValueStruct::const_iterator it =
143
+    // 	     args.get(CC_API_PARAMS_CFGVALUES).begin();
144
+    //               it != args.get(CC_API_PARAMS_CFGVALUES).end(); it++) {
145
+    // 	INFO("    CDR value '%s' = '%s'\n", it->first.c_str(), it->second.asCStr());
146
+    // }
147
+    // INFO("--------------------------------------------------------------\n");
148
+
149
+    // cc_name, ltag, call profile, timestamps, [[key: val], ...], timer_id
150
+    //args.assertArrayFmt("ssoaui");
151
+    //args[CC_API_PARAMS_TIMESTAMPS].assertArrayFmt("iiiiii");
152
+
153
+    SBCCallProfile* call_profile =
154
+      dynamic_cast<SBCCallProfile*>(args[CC_API_PARAMS_CALL_PROFILE].asObject());
155
+
156
+    start(args[CC_API_PARAMS_CC_NAMESPACE].asCStr(),
157
+	  args[CC_API_PARAMS_LTAG].asCStr(),
158
+	  call_profile,
159
+	  args[CC_API_PARAMS_TIMESTAMPS][CC_API_TS_START_SEC].asInt(),
160
+	  args[CC_API_PARAMS_TIMESTAMPS][CC_API_TS_START_USEC].asInt(),
161
+	  args[CC_API_PARAMS_CFGVALUES],
162
+	  args[CC_API_PARAMS_TIMERID].asInt(),  ret);
163
+
164
+  } else if(method == "connect"){
165
+    // INFO("--------------------------------------------------------------\n");
166
+    // INFO("Got CDR connect ltag '%s' other_ltag '%s', connect_ts %i.%i\n",
167
+    // 	   args[CC_API_PARAMS_LTAG].asCStr(),
168
+    //           args[CC_API_PARAMS_OTHERID].asCStr(),
169
+    //           args[CC_API_PARAMS_TIMESTAMPS][CC_API_TS_CONNECT_SEC].asInt(),
170
+    //           args[CC_API_PARAMS_TIMESTAMPS][CC_API_TS_CONNECT_USEC].asInt());
171
+    // INFO("--------------------------------------------------------------\n");
172
+    // cc_name, ltag, call_profile, other_ltag, connect_ts_sec, connect_ts_usec
173
+    // args.assertArrayFmt("ssoas");
174
+    // args[CC_API_PARAMS_TIMESTAMPS].assertArrayFmt("iiiiii");
175
+
176
+    SBCCallProfile* call_profile =
177
+      dynamic_cast<SBCCallProfile*>(args[CC_API_PARAMS_CALL_PROFILE].asObject());
178
+
179
+    connect(args[CC_API_PARAMS_CC_NAMESPACE].asCStr(),
180
+	    args[CC_API_PARAMS_LTAG].asCStr(),
181
+	    call_profile,
182
+	    args[CC_API_PARAMS_OTHERID].asCStr(),
183
+	    args[CC_API_PARAMS_TIMESTAMPS][CC_API_TS_CONNECT_SEC].asInt(),
184
+	    args[CC_API_PARAMS_TIMESTAMPS][CC_API_TS_CONNECT_USEC].asInt());
185
+
186
+  } else if(method == "end"){
187
+    // INFO("--------------------------------------------------------------\n");
188
+    // INFO("Got CDR end ltag %s end_ts %i.%i\n",
189
+    // 	   args[CC_API_PARAMS_LTAG].asCStr(),
190
+    //           args[CC_API_PARAMS_TIMESTAMPS][CC_API_TS_END_SEC].asInt(),
191
+    //           args[CC_API_PARAMS_TIMESTAMPS][CC_API_TS_END_USEC].asInt());
192
+    // INFO("--------------------------------------------------------------\n");
193
+
194
+    // cc_name, ltag, call_profile, end_ts_sec, end_ts_usec
195
+    // args.assertArrayFmt("ssoa"); 
196
+    // args[CC_API_PARAMS_TIMESTAMPS].assertArrayFmt("iiiiii");
197
+
198
+    SBCCallProfile* call_profile =
199
+      dynamic_cast<SBCCallProfile*>(args[CC_API_PARAMS_CALL_PROFILE].asObject());
200
+
201
+    end(args[CC_API_PARAMS_CC_NAMESPACE].asCStr(),
202
+	args[CC_API_PARAMS_LTAG].asCStr(),
203
+	call_profile,
204
+	args[CC_API_PARAMS_TIMESTAMPS][CC_API_TS_END_SEC].asInt(),
205
+	args[CC_API_PARAMS_TIMESTAMPS][CC_API_TS_END_USEC].asInt()
206
+	);
207
+  } else if(method == "_list"){
208
+    ret.push("start");
209
+    ret.push("connect");
210
+    ret.push("end");
211
+  }
212
+  else
213
+    throw AmDynInvoke::NotImplemented(method);
214
+}
215
+
216
+void CCBLRedis::start(const string& cc_name, const string& ltag,
217
+		       SBCCallProfile* call_profile,
218
+		       int start_ts_sec, int start_ts_usec,
219
+		       const AmArg& values, int timer_id, AmArg& res) {
220
+  // start code here
221
+  res.push(AmArg());
222
+  AmArg& res_cmd = res[0];
223
+
224
+  
225
+  redisContext* c = connection_pool.getActiveConnection();
226
+  if (NULL == c) {
227
+   INFO("no connection to REDIS\n");
228
+    res_cmd[SBC_CC_ACTION] = SBC_CC_REFUSE_ACTION;
229
+    res_cmd[SBC_CC_REFUSE_CODE] = 500;
230
+    res_cmd[SBC_CC_REFUSE_REASON] = "Server Internal Error";  
231
+    return;
232
+  }
233
+
234
+  DBG("using redis connection [%p]\n", c);
235
+
236
+  connection_pool.returnConnection(c);
237
+
238
+  // Drop:
239
+  // res_cmd[SBC_CC_ACTION] = SBC_CC_DROP_ACTION;
240
+
241
+  // res_cmd[SBC_CC_ACTION] = SBC_CC_REFUSE_ACTION;
242
+  // res_cmd[SBC_CC_REFUSE_CODE] = 404;
243
+  // res_cmd[SBC_CC_REFUSE_REASON] = "No, not here";  
244
+
245
+  // Set Timer:
246
+  // DBG("my timer ID will be %i\n", timer_id);
247
+  // res_cmd[SBC_CC_ACTION] = SBC_CC_SET_CALL_TIMER_ACTION;
248
+  // res_cmd[SBC_CC_TIMER_TIMEOUT] = 5;
249
+}
250
+
251
+void CCBLRedis::connect(const string& cc_name, const string& ltag,
252
+			 SBCCallProfile* call_profile,
253
+			 const string& other_tag,
254
+			 int connect_ts_sec, int connect_ts_usec) {
255
+  // connect code here
256
+
257
+}
258
+
259
+void CCBLRedis::end(const string& cc_name, const string& ltag,
260
+		     SBCCallProfile* call_profile,
261
+		     int end_ts_sec, int end_ts_usec) {
262
+  // end code here
263
+
264
+}
0 265
new file mode 100644
... ...
@@ -0,0 +1,60 @@
1
+/*
2
+ * Copyright (C) 2012 Stefan Sayer
3
+ *
4
+ * This file is part of SEMS, a free SIP media server.
5
+ *
6
+ * SEMS is free software; you can redistribute it and/or modify
7
+ * it under the terms of the GNU General Public License as published by
8
+ * the Free Software Foundation; either version 2 of the License, or
9
+ * (at your option) any later version.
10
+ *
11
+ * For a license to use the SEMS software under conditions
12
+ * other than those described here, or to purchase support for this
13
+ * software, please contact iptel.org by e-mail at the following addresses:
14
+ *    info@iptel.org
15
+ *
16
+ * SEMS is distributed in the hope that it will be useful,
17
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
18
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19
+ * GNU General Public License for more details.
20
+ *
21
+ * You should have received a copy of the GNU General Public License
22
+ * along with this program; if not, write to the Free Software
23
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
24
+ */
25
+
26
+#ifndef _CC_BL_REDIS_H
27
+#define _CC_BL_REDIS_H
28
+
29
+#include "AmApi.h"
30
+#include "RedisConnectionPool.h"
31
+
32
+#include "SBCCallProfile.h"
33
+
34
+/**
35
+ * REDIS blacklist query call control module
36
+ */
37
+class CCBLRedis : public AmDynInvoke
38
+{
39
+  static CCBLRedis* _instance;
40
+
41
+  void start(const string& cc_name, const string& ltag, SBCCallProfile* call_profile,
42
+	     int start_ts_sec, int start_ts_usec, const AmArg& values,
43
+	     int timer_id, AmArg& res);
44
+  void connect(const string& cc_name, const string& ltag, SBCCallProfile* call_profile,
45
+	       const string& other_ltag,
46
+	       int connect_ts_sec, int connect_ts_usec);
47
+  void end(const string& cc_name, const string& ltag, SBCCallProfile* call_profile,
48
+	   int end_ts_sec, int end_ts_usec);
49
+
50
+  RedisConnectionPool connection_pool;
51
+
52
+ public:
53
+  CCBLRedis();
54
+  ~CCBLRedis();
55
+  static CCBLRedis* instance();
56
+  void invoke(const string& method, const AmArg& args, AmArg& ret);
57
+  int onLoad();
58
+};
59
+
60
+#endif 
0 61
new file mode 100644
... ...
@@ -0,0 +1,13 @@
1
+plug_in_name = cc_bl_redis
2
+sbc_app_path = ../..
3
+
4
+HIREDIS_DIR?=hiredis/
5
+
6
+module_extra_objs = $(HIREDIS_DIR)libhiredis.a 
7
+module_cflags  = -DMOD_NAME=\"$(plug_in_name)\" -I$(sbc_app_path) -I$(HIREDIS_DIR)
8
+
9
+COREPATH =../../../../core
10
+include $(COREPATH)/plug-in/Makefile.app_module
11
+
12
+$(HIREDIS_DIR)libhiredis.a: $(HIREDIS_DIR)
13
+	$(SH) $(MAKE) -C $(HIREDIS_DIR) libhiredis.a
0 14
new file mode 100644
... ...
@@ -0,0 +1,145 @@
1
+#include "RedisConnectionPool.h"
2
+#include "log.h"
3
+
4
+
5
+RedisConnectionPool::RedisConnectionPool() 
6
+  : total_connections(0), failed_connections(0),
7
+    have_active_connection(false), try_connect(true)
8
+{
9
+}
10
+
11
+RedisConnectionPool::~RedisConnectionPool() {
12
+
13
+  //     redisFree(redis_context);
14
+}
15
+
16
+void RedisConnectionPool::set_config(string& server, unsigned int port, 
17
+				     vector<unsigned int> timers,
18
+				     unsigned int max_conn_wait) {
19
+  redis_server = server;
20
+  redis_port = port;
21
+  retry_timers = timers;
22
+  retry_index = 0;
23
+  max_wait = max_conn_wait;
24
+}
25
+
26
+void RedisConnectionPool::add_connections(unsigned int count) {
27
+  connections_mut.lock();
28
+  failed_connections += count;
29
+  total_connections += count;
30
+  connections_mut.unlock();
31
+  try_connect.set(true);
32
+}
33
+
34
+void RedisConnectionPool::returnConnection(redisContext* c) {
35
+  connections_mut.lock();
36
+  connections.push_back(c);
37
+  size_t active_size = connections.size();
38
+  have_active_connection.set(true);
39
+  connections_mut.unlock();
40
+  DBG("Now %zd active connections\n", active_size);
41
+}
42
+
43
+void RedisConnectionPool::returnFailedConnection(redisContext* c) {
44
+  redisFree(c);
45
+  connections_mut.lock();
46
+  failed_connections++;
47
+  unsigned int inactive_size = failed_connections;
48
+  connections_mut.unlock();
49
+
50
+  DBG("Now %u inactive connections\n", inactive_size);
51
+  retry_index = 0;
52
+  try_connect.set(true);
53
+
54
+  // if this was the last active connection returned, alert waiting
55
+  // threads so they get error back
56
+  have_active_connection.set(true);
57
+}
58
+
59
+redisContext* RedisConnectionPool::getActiveConnection() {
60
+  redisContext* res = NULL;
61
+  while (NULL == res) {
62
+
63
+    connections_mut.lock();
64
+    if (connections.size()) {
65
+      res = connections.front();
66
+      connections.pop_front();
67
+      have_active_connection.set(!connections.empty());
68
+    }
69
+    connections_mut.unlock();
70
+
71
+    if (NULL == res) {
72
+      // check if all connections broken -> return null
73
+      connections_mut.lock();
74
+      bool all_inactive = total_connections == failed_connections;
75
+      connections_mut.unlock();
76
+
77
+      if (all_inactive) {
78
+	DBG("all connections inactive - returning NO connection\n");
79
+	return NULL;
80
+      }
81
+
82
+      // wait until a connection is back
83
+      DBG("waiting for an active connection to return\n");
84
+      if (!have_active_connection.wait_for_to(max_wait)) {
85
+	WARN("timeout waiting for an active connection (waited %ums)\n",
86
+	     max_wait);
87
+	break;
88
+      }
89
+    } else {
90
+      DBG("got active connection [%p]\n", res);
91
+    }
92
+  }
93
+
94
+  return res;
95
+}
96
+
97
+
98
+void RedisConnectionPool::run() {
99
+  DBG("RedisConnectionPool thread starting\n");
100
+  try_connect.set(true);
101
+
102
+  while (true) {
103
+    try_connect.wait_for();
104
+    while (true) {
105
+      connections_mut.lock();
106
+      unsigned int m_failed_connections = failed_connections;
107
+      connections_mut.unlock();
108
+
109
+      if (!m_failed_connections)
110
+	break;
111
+
112
+      // add connections until error occurs
113
+      redisContext* context = redisConnect((char*)redis_server.c_str(), redis_port);
114
+      if (!context->err) {
115
+	DBG("successfully connected to server %s:%u [%p]\n",
116
+	    redis_server.c_str(), redis_port, context);
117
+	returnConnection(context);
118
+	retry_index=0;
119
+	connections_mut.lock();
120
+	failed_connections--;
121
+	connections_mut.unlock();
122
+      } else {
123
+	DBG("connection to %s%u failed: '%s'\n",
124
+	    redis_server.c_str(), redis_port, context->errstr);
125
+	redisFree(context);
126
+	if (retry_timers.size()) {
127
+	  DBG("waiting for retry %u ms (index %u)\n",
128
+	      retry_timers[retry_index], retry_index);
129
+	  usleep(retry_timers[retry_index]*1000);
130
+	  if (retry_index<retry_timers.size()-1)
131
+	    retry_index++;
132
+	} else {
133
+	  DBG("waiting for retry 50 ms\n");
134
+	  usleep(50);
135
+	}
136
+      }
137
+    }
138
+  }
139
+}
140
+
141
+void RedisConnectionPool::on_stop() {
142
+
143
+}
144
+
145
+
0 146
new file mode 100644
... ...
@@ -0,0 +1,53 @@
1
+#ifndef _RedisConnectionPool_h_
2
+#define _RedisConnectionPool_h_
3
+
4
+#include "hiredis.h"
5
+#include "AmThread.h"
6
+
7
+#include <string>
8
+#include <list>
9
+#include <vector>
10
+
11
+using std::string;
12
+using std::list;
13
+using std::vector;
14
+
15
+class RedisConnectionPool
16
+: public AmThread
17
+{
18
+
19
+  list<redisContext*> connections;
20
+  unsigned int total_connections;
21
+  unsigned int failed_connections;
22
+  AmMutex connections_mut;
23
+
24
+  AmCondition<bool> have_active_connection;
25
+  AmCondition<bool> try_connect;
26
+
27
+  vector<unsigned int> retry_timers;
28
+  unsigned int retry_index;
29
+
30
+  string redis_server;
31
+  unsigned int redis_port;
32
+  unsigned int max_wait;  
33
+
34
+ public:
35
+  RedisConnectionPool();
36
+  ~RedisConnectionPool();
37
+
38
+  redisContext* getActiveConnection();
39
+  
40
+  void returnConnection(redisContext* c);
41
+
42
+  void returnFailedConnection(redisContext* c);
43
+
44
+  void set_config(string& server, unsigned int port, 
45
+		  vector<unsigned int> timers, unsigned int max_conn_wait);
46
+
47
+  void add_connections(unsigned int count);
48
+
49
+  void run();
50
+  void on_stop();
51
+};
52
+
53
+#endif
0 54
new file mode 100644
... ...
@@ -0,0 +1,12 @@
1
+
2
+# REDIS server, default 127.0.0.1
3
+#redis_server=localhost
4
+
5
+# REDIS port, default 6379
6
+#redis_port=8094
7
+
8
+# reconnect timers (in milliseconds), separated by comma (,)
9
+#redis_reconnect_timers=5,10,20,50,100,500,1000
10
+
11
+# number of connections, default 10
12
+#redis_connections=10
0 13
\ No newline at end of file