Browse code

sbc: support for RTP/RTCP packet logging

Václav Kubart authored on 22/04/2013 10:12:45 • Raphael Coeffic committed on 22/05/2013 19:50:18
Showing 11 changed files
... ...
@@ -152,7 +152,8 @@ SBCCallLeg::SBCCallLeg(const SBCCallProfile& call_profile, AmSipDialog* p_dlg)
152 152
     call_profile(call_profile),
153 153
     cc_timer_id(SBC_TIMER_ID_CALL_TIMERS_START),
154 154
     ext_cc_timer_id(SBC_TIMER_ID_CALL_TIMERS_END + 1),
155
-    cc_started(false)
155
+    cc_started(false),
156
+    logger(NULL)
156 157
 {
157 158
   set_sip_relay_only(false);
158 159
   dlg->setRel100State(Am100rel::REL100_IGNORED);
... ...
@@ -178,7 +179,8 @@ SBCCallLeg::SBCCallLeg(SBCCallLeg* caller, AmSipDialog* p_dlg)
178 179
   : auth(NULL),
179 180
     call_profile(caller->getCallProfile()),
180 181
     CallLeg(caller,p_dlg),
181
-    cc_started(false)
182
+    cc_started(false),
183
+    logger(NULL)
182 184
 {
183 185
   // FIXME: do we want to inherit cc_vars from caller?
184 186
   // Can be pretty dangerous when caller stored pointer to object - we should
... ...
@@ -209,6 +211,8 @@ SBCCallLeg::SBCCallLeg(SBCCallLeg* caller, AmSipDialog* p_dlg)
209 211
   }
210 212
 
211 213
   initCCModules();
214
+
215
+  setLogger(caller->getLogger());
212 216
 }
213 217
 
214 218
 void SBCCallLeg::onStart()
... ...
@@ -433,6 +437,7 @@ SBCCallLeg::~SBCCallLeg()
433 437
 {
434 438
   if (auth)
435 439
     delete auth;
440
+  if (logger) dec_ref(logger);
436 441
 }
437 442
 
438 443
 void SBCCallLeg::onBeforeDestroy()
... ...
@@ -1002,23 +1007,12 @@ bool SBCCallLeg::CCStart(const AmSipRequest& req) {
1002 1007
       return false;
1003 1008
     }
1004 1009
 
1005
-    if (!dlg->getMsgLogger() && !call_profile.msg_logger_path.empty()) {
1010
+    if (!logger && !call_profile.msg_logger_path.empty()) {
1011
+      // open the logger if not already opened
1006 1012
       ParamReplacerCtx ctx;
1007 1013
       string log_path = ctx.replaceParameters(call_profile.msg_logger_path,
1008 1014
 					      "msg_logger_path",req);
1009
-      file_msg_logger* logger = new pcap_logger();
1010
-      if(!logger->open(log_path.c_str())) {
1011
-        req.tt.lock_bucket();
1012
-        const sip_trans* t = req.tt.get_trans();
1013
-        if (t) {
1014
-          sip_msg* msg = t->msg;
1015
-          logger->log(msg->buf,msg->len,&msg->remote_ip,
1016
-              &msg->local_ip,msg->u.request->method_str);
1017
-        }
1018
-        req.tt.unlock_bucket();
1019
-        dlg->setMsgLogger(logger);
1020
-      }
1021
-      else delete logger;
1015
+      if (openLogger(log_path)) logRequest(req);
1022 1016
     }
1023 1017
 
1024 1018
     // evaluate ret
... ...
@@ -1648,3 +1642,50 @@ void SBCCallLeg::createHoldRequest(AmSdp &sdp)
1648 1642
   }
1649 1643
   CallLeg::createHoldRequest(sdp);
1650 1644
 }
1645
+
1646
+void SBCCallLeg::setMediaSession(AmB2BMedia *new_session)
1647
+{
1648
+  if (new_session && call_profile.log_rtp) new_session->setRtpLogger(logger);
1649
+  CallLeg::setMediaSession(new_session);
1650
+}
1651
+
1652
+bool SBCCallLeg::openLogger(const std::string &path)
1653
+{
1654
+  file_msg_logger *log = new pcap_logger();
1655
+
1656
+  if(log->open(path.c_str()) != 0) {
1657
+    // open error
1658
+    delete log;
1659
+    return false;
1660
+  }
1661
+
1662
+  // opened successfully
1663
+  setLogger(log);
1664
+  return true;
1665
+}
1666
+
1667
+void SBCCallLeg::setLogger(msg_logger *_logger)
1668
+{
1669
+  if (logger) dec_ref(logger); // release the old one
1670
+
1671
+  logger = _logger;
1672
+  inc_ref(logger);
1673
+  if (call_profile.log_sip) dlg->setMsgLogger(logger);
1674
+
1675
+  AmB2BMedia *m = getMediaSession();
1676
+  if (m && call_profile.log_rtp) m->setRtpLogger(logger);
1677
+}
1678
+
1679
+void SBCCallLeg::logRequest(const AmSipRequest &req)
1680
+{
1681
+  if (!call_profile.log_sip) return;
1682
+
1683
+  req.tt.lock_bucket();
1684
+  const sip_trans* t = req.tt.get_trans();
1685
+  if (t) {
1686
+    sip_msg* msg = t->msg;
1687
+    logger->log(msg->buf,msg->len,&msg->remote_ip,
1688
+        &msg->local_ip,msg->u.request->method_str);
1689
+  }
1690
+  req.tt.unlock_bucket();
1691
+}
... ...
@@ -63,6 +63,13 @@ class SBCCallLeg : public CallLeg, public CredentialHolder
63 63
   // Measurements
64 64
   list<atomic_int*> rtp_pegs;
65 65
 
66
+  /** common logger for RTP/RTCP and SIP packets */
67
+  msg_logger *logger;
68
+
69
+  bool openLogger(const std::string &path);
70
+  void logRequest(const AmSipRequest &req);
71
+  void setLogger(msg_logger *_logger);
72
+
66 73
   void fixupCCInterface(const string& val, CCInterface& cc_if);
67 74
 
68 75
   /** handler called when the second leg is connected */
... ...
@@ -172,6 +179,8 @@ class SBCCallLeg : public CallLeg, public CredentialHolder
172 179
   // timers accessible from CC modules
173 180
   int startTimer(double timeout) { setTimer(ext_cc_timer_id, timeout); return ext_cc_timer_id++; }
174 181
 
182
+  virtual void setMediaSession(AmB2BMedia *new_session);
183
+
175 184
  protected:
176 185
 
177 186
   void setOtherId(const AmSipReply& reply);
... ...
@@ -192,6 +201,8 @@ class SBCCallLeg : public CallLeg, public CredentialHolder
192 201
   virtual void createHoldRequest(AmSdp &sdp);
193 202
 
194 203
   int applySSTCfg(AmConfigReader& sst_cfg, const AmSipRequest* p_req);
204
+
205
+  msg_logger *getLogger() { return logger; }
195 206
 };
196 207
 
197 208
 #endif
... ...
@@ -362,6 +362,8 @@ bool SBCCallProfile::readFromConfiguration(const string& name,
362 362
   if (!transcoder.readConfig(cfg)) return false;
363 363
 
364 364
   msg_logger_path = cfg.getParameter("msg_logger_path");
365
+  log_rtp = cfg.getParameter("log_rtp","no") == "yes";
366
+  log_sip = cfg.getParameter("log_sip","yes") == "yes";
365 367
 
366 368
   reg_caching = cfg.getParameter("enable_reg_caching","no") == "yes";
367 369
   min_reg_expires = cfg.getParameterInt("min_reg_expires",0);
... ...
@@ -252,6 +252,8 @@ struct SBCCallProfile
252 252
 
253 253
   // message logging feature
254 254
   string msg_logger_path;
255
+  bool log_rtp;
256
+  bool log_sip;
255 257
 
256 258
   SBCCallProfile()
257 259
   : auth_enabled(false),
... ...
@@ -268,7 +270,9 @@ struct SBCCallProfile
268 270
     rtprelay_bw_limit_peak(-1),
269 271
     outbound_interface_value(-1),
270 272
     contact_hiding(false), 
271
-    reg_caching(false)
273
+    reg_caching(false),
274
+    log_rtp(false),
275
+    log_sip(false)
272 276
   { }
273 277
 
274 278
   ~SBCCallProfile()
... ...
@@ -5,6 +5,7 @@
5 5
 #include <strings.h>
6 6
 #include "AmB2BSession.h"
7 7
 #include "AmRtpReceiver.h"
8
+#include "sip/msg_logger.h"
8 9
 
9 10
 #include <algorithm>
10 11
 
... ...
@@ -495,6 +496,11 @@ AmB2BMedia::AmB2BMedia(AmB2BSession *_a, AmB2BSession *_b):
495 496
 { 
496 497
 }
497 498
 
499
+AmB2BMedia::~AmB2BMedia()
500
+{
501
+  if (logger) dec_ref(logger);
502
+}
503
+
498 504
 void AmB2BMedia::changeSession(bool a_leg, AmB2BSession *new_session)
499 505
 {
500 506
   mutex.lock();
... ...
@@ -552,6 +558,9 @@ void AmB2BMedia::changeSessionUnsafe(bool a_leg, AmB2BSession *new_session)
552 558
       }
553 559
     }
554 560
 
561
+    // reset logger (needed if a stream changes)
562
+    i->setLogger(logger);
563
+
555 564
     // return back for processing if needed
556 565
     if (processing_started) {
557 566
       i->a.resumeStreamProcessing();
... ...
@@ -618,7 +627,10 @@ void AmB2BMedia::clearAudio(bool a_leg)
618 627
   // forget sessions to avoid using them once clearAudio is called
619 628
   changeSessionUnsafe(a_leg, NULL);
620 629
 
621
-  if (!a && !b) audio.clear(); // both legs cleared
630
+  if (!a && !b) {
631
+    audio.clear(); // both legs cleared
632
+    // FIXME: release relay_streams!
633
+  }
622 634
 
623 635
   mutex.unlock();
624 636
 }
... ...
@@ -651,6 +663,7 @@ void AmB2BMedia::createStreams(const AmSdp &sdp)
651 663
       if (create_audio) {
652 664
         AudioStreamPair pair(a, b, idx);
653 665
         audio.push_back(pair);
666
+        audio.back().setLogger(logger);
654 667
       }
655 668
       else if (++astreams == audio.end()) create_audio = true; // we went through the last audio stream
656 669
     }
... ...
@@ -663,6 +676,7 @@ void AmB2BMedia::createStreams(const AmSdp &sdp)
663 676
     {
664 677
       if (create_relay) {
665 678
 	relay_streams.push_back(new RelayStreamPair(a, b));
679
+        relay_streams.back()->setLogger(logger);
666 680
       }
667 681
       else if (++rstreams == relay_streams.end()) create_relay = true; // we went through the last relay stream
668 682
     }
... ...
@@ -1147,3 +1161,14 @@ void AmB2BMedia::createHoldAnswer(bool a_leg, const AmSdp &offer, AmSdp &answer,
1147 1161
 
1148 1162
   mutex.unlock();
1149 1163
 }
1164
+
1165
+void AmB2BMedia::setRtpLogger(msg_logger* _logger)
1166
+{
1167
+  if (logger) dec_ref(logger);
1168
+  logger = _logger;
1169
+  if (logger) inc_ref(logger);
1170
+
1171
+  // walk through all the streams and use logger for them
1172
+  for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) i->setLogger(logger);
1173
+  for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); ++j) (*j)->setLogger(logger);
1174
+}
... ...
@@ -188,6 +188,8 @@ class AudioStreamData {
188 188
     void mute(bool set_mute);
189 189
     void setInput(AmAudio *_in) { in = _in; }
190 190
     AmAudio *getInput() { return in; }
191
+
192
+    void setLogger(msg_logger *logger) { if (stream) stream->setLogger(logger); }
191 193
 };
192 194
 
193 195
 /** \brief Class for control over media relaying and transcoding in a B2B session.
... ...
@@ -271,11 +273,13 @@ class AmB2BMedia: public AmMediaSession
271 273
       AudioStreamData a, b;
272 274
       int media_idx;
273 275
       AudioStreamPair(AmB2BSession *_a, AmB2BSession *_b, int _media_idx): a(_a), b(_b), media_idx(_media_idx) { }
276
+      void setLogger(msg_logger *logger) { a.setLogger(logger); b.setLogger(logger); }
274 277
     };
275 278
 
276 279
     struct RelayStreamPair {
277 280
       AmRtpStream a, b;
278 281
       RelayStreamPair(AmB2BSession *_a, AmB2BSession *_b);
282
+      void setLogger(msg_logger *logger) { a.setLogger(logger); b.setLogger(logger); }
279 283
     };
280 284
 
281 285
     typedef std::vector<AudioStreamPair>::iterator AudioStreamIterator;
... ...
@@ -324,8 +328,11 @@ class AmB2BMedia: public AmMediaSession
324 328
     void setMuteFlag(bool a_leg, bool set);
325 329
     void changeSessionUnsafe(bool a_leg, AmB2BSession *new_session);
326 330
 
331
+    msg_logger* logger; // log RTP traffic
332
+
327 333
   public:
328 334
     AmB2BMedia(AmB2BSession *_a, AmB2BSession *_b);
335
+    virtual ~AmB2BMedia();
329 336
 
330 337
     void changeSession(bool a_leg, AmB2BSession *new_session);
331 338
 
... ...
@@ -416,6 +423,8 @@ class AmB2BMedia: public AmMediaSession
416 423
 
417 424
     void setFirstStreamInput(bool a_leg, AmAudio *in);
418 425
     void createHoldAnswer(bool a_leg, const AmSdp &offer, AmSdp &answer, bool use_zero_con);
426
+
427
+    void setRtpLogger(msg_logger* _logger);
419 428
 };
420 429
 
421 430
 #endif
... ...
@@ -302,7 +302,7 @@ private:
302 302
     AmB2BMedia *media_session;
303 303
 
304 304
   public:
305
-    void setMediaSession(AmB2BMedia *new_session);
305
+    virtual void setMediaSession(AmB2BMedia *new_session);
306 306
     AmB2BMedia *getMediaSession() { return media_session; }
307 307
 };
308 308
 
... ...
@@ -42,6 +42,8 @@
42 42
 #include <sys/socket.h>
43 43
 #include <arpa/inet.h>
44 44
 
45
+#include "sip/msg_logger.h"
46
+
45 47
 AmRtpPacket::AmRtpPacket()
46 48
   : data_offset(0)
47 49
 {
... ...
@@ -272,3 +274,16 @@ int AmRtpPacket::recv(int sd)
272 274
     
273 275
   return ret;
274 276
 }
277
+
278
+void AmRtpPacket::logReceived(msg_logger *logger, struct sockaddr_storage *laddr)
279
+{
280
+  static const cstring empty;
281
+  logger->log((const char *)buffer, b_size, &addr, laddr, empty);
282
+}
283
+
284
+void AmRtpPacket::logSent(msg_logger *logger, struct sockaddr_storage *laddr)
285
+{
286
+  static const cstring empty;
287
+  logger->log((const char *)buffer, b_size, laddr, &addr, empty);
288
+}
289
+
... ...
@@ -33,6 +33,7 @@
33 33
 #include <netinet/in.h>
34 34
 
35 35
 class AmRtpPacketTracer;
36
+class msg_logger;
36 37
 
37 38
 /** \brief RTP packet implementation */
38 39
 class AmRtpPacket {
... ...
@@ -80,6 +81,8 @@ public:
80 81
   unsigned char* getBuffer();
81 82
   void setBufferSize(unsigned int b) { b_size = b; }
82 83
 
84
+  void logReceived(msg_logger *logger, struct sockaddr_storage *laddr);
85
+  void logSent(msg_logger *logger, struct sockaddr_storage *laddr);
83 86
 };
84 87
 
85 88
 #endif
... ...
@@ -41,6 +41,7 @@
41 41
 
42 42
 #include "sip/resolver.h"
43 43
 #include "sip/ip_util.h"
44
+#include "sip/msg_logger.h"
44 45
 
45 46
 #include "log.h"
46 47
 
... ...
@@ -202,6 +203,9 @@ void AmRtpStream::setLocalPort()
202 203
   AmRtpReceiver::instance()->addStream(l_rtcp_sd, this);
203 204
   DBG("added stream [%p] to RTP receiver (%s:%i/%i)\n", this,
204 205
       get_addr_str((sockaddr_storage*)&l_saddr).c_str(),l_port,l_rtcp_port);
206
+
207
+  memcpy(&l_rtcp_saddr, &l_saddr, sizeof(l_saddr));
208
+  am_set_port(&l_rtcp_saddr, l_rtcp_port);
205 209
 }
206 210
 
207 211
 int AmRtpStream::ping()
... ...
@@ -278,6 +282,8 @@ int AmRtpStream::compile_and_send(const int payload, bool marker, unsigned int t
278 282
     return -1;
279 283
   }
280 284
  
285
+  if (logger) rp.logSent(logger, &l_saddr);
286
+ 
281 287
   return size;
282 288
 }
283 289
 
... ...
@@ -314,7 +320,9 @@ int AmRtpStream::send_raw( char* packet, unsigned int length )
314 320
     ERROR("while sending raw RTP packet.\n");
315 321
     return -1;
316 322
   }
317
- 
323
+
324
+  if (logger) rp.logSent(logger, &l_saddr);
325
+
318 326
   return length;
319 327
 }
320 328
 
... ...
@@ -380,6 +388,7 @@ AmRtpStream::AmRtpStream(AmSession* _s, int _if)
380 388
     l_sd(0), 
381 389
     r_ssrc_i(false),
382 390
     session(_s),
391
+    logger(NULL),
383 392
     passive(false),
384 393
     passive_rtcp(false),
385 394
     offer_answer_used(true),
... ...
@@ -416,6 +425,7 @@ AmRtpStream::~AmRtpStream()
416 425
     close(l_sd);
417 426
     close(l_rtcp_sd);
418 427
   }
428
+  if (logger) dec_ref(logger);
419 429
 }
420 430
 
421 431
 int AmRtpStream::getLocalPort()
... ...
@@ -938,6 +948,8 @@ void AmRtpStream::recvPacket(int fd)
938 948
   if(p->recv(l_sd) > 0){
939 949
     int parse_res = 0;
940 950
 
951
+    if (logger) p->logReceived(logger, &l_saddr);
952
+
941 953
     gettimeofday(&p->recv_time,NULL);
942 954
     
943 955
     if(!relay_raw)
... ...
@@ -970,6 +982,10 @@ void AmRtpStream::recvRtcpPacket()
970 982
     return;
971 983
   }
972 984
 
985
+  static const cstring empty;
986
+  if (logger)
987
+    logger->log((const char *)buffer, recved_bytes, &recv_addr, &l_rtcp_saddr, empty);
988
+
973 989
   // clear RTP timer
974 990
   clearRTPTimeout();
975 991
 
... ...
@@ -996,6 +1012,10 @@ void AmRtpStream::recvRtcpPacket()
996 1012
     ERROR("could not relay RTCP packet: %s\n",strerror(errno));
997 1013
     return;
998 1014
   }
1015
+
1016
+  if (logger)
1017
+    logger->log((const char *)buffer, recved_bytes, &relay_stream->l_rtcp_saddr, &rtcp_raddr, empty);
1018
+
999 1019
 }
1000 1020
 
1001 1021
 void AmRtpStream::relay(AmRtpPacket* p)
... ...
@@ -1020,6 +1040,7 @@ void AmRtpStream::relay(AmRtpPacket* p)
1020 1040
 	  get_addr_str(&r_saddr).c_str(),am_get_port(&r_saddr));
1021 1041
   }
1022 1042
   else {
1043
+    if (logger) p->logSent(logger, &l_saddr);
1023 1044
     if(session) session->onAfterRTPRelay(p,&r_saddr);
1024 1045
   }
1025 1046
 }
... ...
@@ -1166,4 +1187,10 @@ inline void PacketMem::clear()
1166 1187
   memset(used, 0, sizeof(used));
1167 1188
   n_used = cur_idx = 0;
1168 1189
 }
1169
- 
1190
+
1191
+void AmRtpStream::setLogger(msg_logger* _logger)
1192
+{
1193
+  if (logger) dec_ref(logger);
1194
+  logger = _logger;
1195
+  if (logger) inc_ref(logger);
1196
+}
... ...
@@ -63,6 +63,7 @@ class  AmAudio;
63 63
 class  AmSession;
64 64
 struct SdpPayload;
65 65
 struct amci_payload_t;
66
+class msg_logger;
66 67
 
67 68
 /**
68 69
  * This provides the memory for the receive buffer.
... ...
@@ -200,6 +201,7 @@ protected:
200 201
    */
201 202
   struct sockaddr_storage r_saddr;
202 203
   struct sockaddr_storage l_saddr;
204
+  struct sockaddr_storage l_rtcp_saddr;
203 205
 
204 206
   /** Local port */
205 207
   unsigned short     l_port;
... ...
@@ -265,6 +267,8 @@ protected:
265 267
   /** Session owning this stream */
266 268
   AmSession*         session;
267 269
 
270
+  msg_logger *logger;
271
+
268 272
   /** Payload provider */
269 273
   AmPayloadProvider* payload_provider;
270 274
 
... ...
@@ -479,6 +483,9 @@ public:
479 483
    * not be reinitialised implicitly (it might be used for media traffic
480 484
    * already). */
481 485
   void changeSession(AmSession *_s) { session = _s; }
486
+
487
+  /** set destination for logging all received/sent RTP and RTCP packets */
488
+  void setLogger(msg_logger *_logger);
482 489
 };
483 490
 
484 491
 #endif