Browse code

sbc: added RTP relay rate limiting and counters

Raphael Coeffic authored on 25/02/2013 13:01:11
Showing 5 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,42 @@
1
+#include "RateLimit.h"
2
+#include "AmAppTimer.h"
3
+
4
+#define min(a,b) ((a) < (b) ? (a) : (b))
5
+
6
+RateLimit::RateLimit(unsigned int rate, unsigned int peak, 
7
+		     unsigned int time_base_seconds)
8
+  : rate(rate),
9
+    peak(peak),
10
+    counter(peak)
11
+{
12
+  // wall_clock has a resolution of 20ms
13
+  time_base = (1000 * time_base_seconds) / 20;
14
+  last_update = AmAppTimer::instance()->wall_clock;
15
+}
16
+
17
+bool RateLimit::limit(unsigned int size)
18
+{
19
+  lock();
20
+
21
+  if(AmAppTimer::instance()->wall_clock - last_update 
22
+     > time_base) {
23
+
24
+    update_limit();
25
+  }
26
+
27
+  if(counter <= 0) {
28
+    unlock();
29
+    return true; // limit reached
30
+  }
31
+
32
+  counter -= size;
33
+  unlock();
34
+
35
+  return false; // do not limit
36
+}
37
+
38
+void RateLimit::update_limit()
39
+{
40
+  counter = min(peak, counter+rate);
41
+  last_update = AmAppTimer::instance()->wall_clock;
42
+}
0 43
new file mode 100644
... ...
@@ -0,0 +1,48 @@
1
+#ifndef _RateLimit_h_
2
+#define _RateLimit_h_
3
+
4
+#include "AmThread.h"
5
+#include "atomic_types.h"
6
+#include <sys/types.h>
7
+
8
+class RateLimit
9
+  : protected AmMutex
10
+{
11
+  u_int32_t last_update;
12
+  int rate;
13
+  int peak;
14
+  int counter;
15
+
16
+  unsigned int time_base;
17
+
18
+  void update_limit();
19
+
20
+public:
21
+  // rate: units/time_base
22
+  // peak: units/time_base
23
+  // time_base: seconds
24
+  RateLimit(unsigned int rate, unsigned int peak, 
25
+	    unsigned int time_base);
26
+
27
+  /**
28
+   * returns true if 'size' should be dropped
29
+   */
30
+  bool limit(unsigned int size);
31
+};
32
+
33
+class RateLimitRefCnt
34
+  : public RateLimit,
35
+    public atomic_ref_cnt
36
+{
37
+public:
38
+  // rate: units/time_base
39
+  // peak: units/time_base
40
+  // time_base: seconds
41
+  RateLimitRefCnt(unsigned int rate, unsigned int peak, int time_base)
42
+    : RateLimit(rate,peak,time_base)
43
+  {}
44
+};
45
+
46
+typedef shared_ref_cnt<RateLimitRefCnt> SharedRateLimit;
47
+
48
+#endif
... ...
@@ -160,6 +160,14 @@ SBCCallLeg::SBCCallLeg(const SBCCallProfile& call_profile, AmSipDialog* p_dlg)
160 160
 
161 161
   memset(&call_connect_ts, 0, sizeof(struct timeval));
162 162
   memset(&call_end_ts, 0, sizeof(struct timeval));
163
+
164
+  if(call_profile.rtprelay_bw_limit_rate > 0 &&
165
+     call_profile.rtprelay_bw_limit_peak > 0) {
166
+
167
+    RateLimit* limit = new RateLimit(call_profile.rtprelay_bw_limit_rate,
168
+				     call_profile.rtprelay_bw_limit_peak, 1);
169
+    rtp_relay_rate_limit.reset(limit);
170
+  }
163 171
 }
164 172
 
165 173
 // B leg constructor (from SBCCalleeSession)
... ...
@@ -186,6 +194,10 @@ SBCCallLeg::SBCCallLeg(SBCCallLeg* caller, AmSipDialog* p_dlg)
186 194
     dlg->setExtLocalTag(caller->dlg->getRemoteTag());
187 195
   }
188 196
 
197
+  if(rtp_relay_rate_limit.get()) {
198
+    rtp_relay_rate_limit.reset(new RateLimit(*rtp_relay_rate_limit.get()));
199
+  }
200
+
189 201
   // CC interfaces and variables should be already "evaluated" by A leg, we just
190 202
   // need to load the DI interfaces for us (later they will be initialized with
191 203
   // original INVITE so it must be done in A leg's thread!)
... ...
@@ -240,6 +252,9 @@ void SBCCallLeg::applyAProfile()
240 252
     else {
241 253
       setRtpRelayMode(RTP_Relay);
242 254
     }
255
+
256
+    // copy stats counters
257
+    rtp_pegs = call_profile.aleg_rtp_counters;
243 258
   }
244 259
 }
245 260
 
... ...
@@ -334,10 +349,13 @@ void SBCCallLeg::applyBProfile()
334 349
     DBG("%s\n",rtp_relay_force_symmetric_rtp ?
335 350
 	"forcing symmetric RTP (passive mode)":
336 351
 	"disabled symmetric RTP (normal mode)");
337
-  }
338 352
 
339
-  setRtpRelayTransparentSeqno(call_profile.rtprelay_transparent_seqno);
340
-  setRtpRelayTransparentSSRC(call_profile.rtprelay_transparent_ssrc);
353
+    setRtpRelayTransparentSeqno(call_profile.rtprelay_transparent_seqno);
354
+    setRtpRelayTransparentSSRC(call_profile.rtprelay_transparent_ssrc);
355
+
356
+    // copy stats counters
357
+    rtp_pegs = call_profile.bleg_rtp_counters;
358
+  }
341 359
 
342 360
   // was read from caller but reading directly from profile now
343 361
   if (!call_profile.callid.empty()) 
... ...
@@ -1166,6 +1184,24 @@ void SBCCallLeg::onBLegRefused(const AmSipReply& reply)
1166 1184
   }
1167 1185
 }
1168 1186
 
1187
+
1188
+bool SBCCallLeg::onBeforeRTPRelay(AmRtpPacket* p, sockaddr_storage* remote_addr)
1189
+{
1190
+  if(rtp_relay_rate_limit.get() &&
1191
+     rtp_relay_rate_limit->limit(p->getBufferSize()))
1192
+    return false; // drop
1193
+
1194
+  return true; // relay
1195
+}
1196
+
1197
+void SBCCallLeg::onAfterRTPRelay(AmRtpPacket* p, sockaddr_storage* remote_addr)
1198
+{
1199
+  for(list<atomic_int*>::iterator it = rtp_pegs.begin();
1200
+      it != rtp_pegs.end(); ++it) {
1201
+    (*it)->inc(p->getBufferSize());
1202
+  }
1203
+}
1204
+
1169 1205
 //////////////////////////////////////////////////////////////////////////////////////////
1170 1206
 // body filtering
1171 1207
 
... ...
@@ -4,6 +4,8 @@
4 4
 #include "SBC.h"
5 5
 #include "ExtendedCCInterface.h"
6 6
 #include "sbc_events.h"
7
+#include "RateLimit.h"
8
+#include "MeasCounter.h"
7 9
 
8 10
 class PayloadIdMapping
9 11
 {
... ...
@@ -55,6 +57,12 @@ class SBCCallLeg : public CallLeg, public CredentialHolder
55 57
    * only when CCStart was called */
56 58
   bool cc_started;
57 59
 
60
+  // Rate limiting
61
+  auto_ptr<RateLimit> rtp_relay_rate_limit;
62
+  
63
+  // Measurements
64
+  list<atomic_int*> rtp_pegs;
65
+
58 66
   void fixupCCInterface(const string& val, CCInterface& cc_if);
59 67
 
60 68
   /** handler called when the second leg is connected */
... ...
@@ -30,6 +30,7 @@
30 30
 #include "HeaderFilter.h"
31 31
 #include "ampi/UACAuthAPI.h"
32 32
 #include "ParamReplacer.h"
33
+#include "atomic_types.h"
33 34
 
34 35
 #include <set>
35 36
 #include <string>
... ...
@@ -162,6 +163,12 @@ struct SBCCallProfile
162 163
   string aleg_rtprelay_interface;
163 164
   int aleg_rtprelay_interface_value;
164 165
 
166
+  int rtprelay_bw_limit_rate;
167
+  int rtprelay_bw_limit_peak;
168
+
169
+  list<atomic_int*> aleg_rtp_counters;
170
+  list<atomic_int*> bleg_rtp_counters;
171
+
165 172
   string outbound_interface;
166 173
   int outbound_interface_value;
167 174
 
... ...
@@ -255,6 +262,8 @@ struct SBCCallProfile
255 262
     rtprelay_transparent_ssrc(true),
256 263
     rtprelay_interface_value(-1),
257 264
     aleg_rtprelay_interface_value(-1),
265
+    rtprelay_bw_limit_rate(-1),
266
+    rtprelay_bw_limit_peak(-1),
258 267
     outbound_interface_value(-1),
259 268
     contact_hiding(false), 
260 269
     reg_caching(false)