Browse code

Introduces optional (compile-time) threadpool for signaling support.

to use it, set USE_THREADPOOL in Makefile.defs and configure thread pool
size with session_processor_threads= parameter in sems.conf :

+# compile with session thread pool support?
+# use this for very high concurrent call count
+# applications (e.g. for signaling only)
+# if compiled with thread pool, there will be a
+# thread pool of configurable size processing the
+# signaling and application logic of the calls.
+# if compiled without thread pool support, every
+# session will have its own thread.
+#
+#USE_THREADPOOL = yes



git-svn-id: http://svn.berlios.de/svnroot/repos/sems/trunk@1782 8eb893ce-cfd4-0310-b710-fb5ebe64c474

Stefan Sayer authored on 15/04/2010 12:22:30
Showing 11 changed files
... ...
@@ -22,6 +22,18 @@ CPPFLAGS += -D_DEBUG \
22 22
 #	  -DSUPPORT_IPV6 \
23 23
 #	  -DNO_THREADID_LOG \
24 24
 
25
+
26
+# compile with session thread pool support?
27
+#      use this for very high concurrent call count 
28
+#      applications (e.g. for signaling only)
29
+#      if compiled with thread pool, there will be a 
30
+#      thread pool of configurable size processing the
31
+#      signaling and application logic of the calls.
32
+#      if compiled without thread pool support, every
33
+#      session will have its own thread.
34
+#
35
+#USE_THREADPOOL = yes
36
+
25 37
 # compile with spandsp DTMF detection? see soft-switch.org
26 38
 #   this needs a fairly new version of spandsp - tested with 0.0.4pre11
27 39
 #   will not work with spandsp 0.0.2 .
... ...
@@ -62,6 +74,10 @@ GETARCH=$(COREPATH)/compat/getarch
62 74
 OS   := $(shell $(CC) $(EXTRA_CFLAGS) -o $(GETOS) $(GETOS).c && $(GETOS))
63 75
 ARCH := $(shell $(CC) $(EXTRA_CFLAGS) -o $(GETARCH) $(GETARCH).c && $(GETARCH))
64 76
 
77
+ifdef USE_THREADPOOL
78
+CPPFLAGS += -DSESSION_THREADPOOL
79
+endif
80
+
65 81
 ifdef USE_SPANDSP
66 82
 ifneq ($(spandsp_defs), 1)
67 83
 spandsp_defs=1
... ...
@@ -51,6 +51,7 @@ string       AmConfig::PublicIP                = "";
51 51
 string       AmConfig::PrefixSep               = PREFIX_SEPARATOR;
52 52
 int          AmConfig::RtpLowPort              = RTP_LOWPORT;
53 53
 int          AmConfig::RtpHighPort             = RTP_HIGHPORT;
54
+int          AmConfig::SessionProcessorThreads = NUM_SESSION_PROCESSORS;
54 55
 int          AmConfig::MediaProcessorThreads   = NUM_MEDIA_PROCESSORS;
55 56
 int          AmConfig::LocalSIPPort            = 5060;
56 57
 string       AmConfig::LocalSIPIP              = "";
... ...
@@ -135,6 +136,13 @@ int AmConfig::setStderr(const string& s) {
135 136
   return 1;
136 137
 }		
137 138
 
139
+int AmConfig::setSessionProcessorThreads(const string& th) {
140
+  if(sscanf(th.c_str(),"%u",&SessionProcessorThreads) != 1) {
141
+    return 0;
142
+  }
143
+  return 1;
144
+}
145
+
138 146
 int AmConfig::setMediaProcessorThreads(const string& th) {
139 147
   if(sscanf(th.c_str(),"%u",&MediaProcessorThreads) != 1) {
140 148
     return 0;
... ...
@@ -323,6 +331,25 @@ int AmConfig::readConfiguration()
323 331
     }
324 332
   }
325 333
 
334
+  if(cfg.hasParameter("session_processor_threads")){
335
+#ifdef SESSION_THREADPOOL
336
+    if(!setSessionProcessorThreads(cfg.getParameter("session_processor_threads"))){
337
+      ERROR("invalid session_processor_threads value specified\n");
338
+      return -1;
339
+    }
340
+    if (SessionProcessorThreads<1) {
341
+      ERROR("invalid session_processor_threads value specified."
342
+	    " need at least one thread\n");
343
+      return -1;
344
+    }
345
+#else
346
+    WARN("session_processor_threads specified in sems.conf,\n");
347
+    WARN("but SEMS is compiled without SESSION_THREADPOOL support.\n");
348
+    WARN("set USE_THREADPOOL in Makefile.defs to enable session thread pool.\n");
349
+    WARN("SEMS will start now, but every call will have its own thread.\n");    
350
+#endif
351
+  }
352
+
326 353
   if(cfg.hasParameter("media_processor_threads")){
327 354
     if(!setMediaProcessorThreads(cfg.getParameter("media_processor_threads"))){
328 355
       ERROR("invalid media_processor_threads value specified");
... ...
@@ -330,6 +357,7 @@ int AmConfig::readConfiguration()
330 357
     }
331 358
   }
332 359
 
360
+
333 361
   // single codec in 200 OK
334 362
   if(cfg.hasParameter("single_codec_in_ok")){
335 363
     SingleCodecInOK = (cfg.getParameter("single_codec_in_ok") == "yes");
... ...
@@ -75,7 +75,9 @@ struct AmConfig
75 75
   static int RtpLowPort;
76 76
   /** Highest local RTP port */
77 77
   static int RtpHighPort;
78
-  /** number of session scheduler threads */
78
+  /** number of session (signaling/application) processor threads */
79
+  static int SessionProcessorThreads;
80
+  /** number of media processor threads */
79 81
   static int MediaProcessorThreads;
80 82
   /** the interface SIP requests are sent from - needed for registrar_client */
81 83
   static string LocalSIPIP;
... ...
@@ -153,6 +155,8 @@ struct AmConfig
153 155
   static int setFork(const string& fork);
154 156
   /** Setter for parameter stderr, returns 0 on invalid value */
155 157
   static int setStderr(const string& s);
158
+  /** Setter for parameter SessionProcessorThreads, returns 0 on invalid value */
159
+  static int setSessionProcessorThreads(const string& th);
156 160
   /** Setter for parameter MediaProcessorThreads, returns 0 on invalid value */
157 161
   static int setMediaProcessorThreads(const string& th);
158 162
   /** Setter for parameter DeadRtpTime, returns 0 on invalid value */
... ...
@@ -29,8 +29,11 @@
29 29
 #include "log.h"
30 30
 #include "AmConfig.h"
31 31
 
32
+#include <typeinfo>
32 33
 AmEventQueue::AmEventQueue(AmEventHandler* handler)
33
-  : handler(handler),ev_pending(false)
34
+  : handler(handler),
35
+    wakeup_handler(NULL),
36
+    ev_pending(false)
34 37
 {
35 38
 }
36 39
 
... ...
@@ -53,6 +56,8 @@ void AmEventQueue::postEvent(AmEvent* event)
53 56
   if(event)
54 57
     ev_queue.push(event);
55 58
   ev_pending.set(true);
59
+  if (NULL != wakeup_handler)
60
+    wakeup_handler->notify(this);
56 61
   m_queue.unlock();
57 62
 
58 63
   if (AmConfig::LogEvents) 
... ...
@@ -70,10 +75,12 @@ void AmEventQueue::processEvents()
70 75
     m_queue.unlock();
71 76
 
72 77
     if (AmConfig::LogEvents) 
73
-      DBG("before processing event\n");
78
+      DBG("before processing event (%s)\n",
79
+	  typeid(*event).name());
74 80
     handler->process(event);
75 81
     if (AmConfig::LogEvents) 
76
-      DBG("event processed\n");
82
+      DBG("event processed (%s)\n",
83
+	  typeid(*event).name());
77 84
     delete event;
78 85
     m_queue.lock();
79 86
   }
... ...
@@ -117,3 +124,10 @@ void AmEventQueue::processSingleEvent()
117 124
   m_queue.unlock();
118 125
 }
119 126
 
127
+void AmEventQueue::setEventNotificationSink(AmEventNotificationSink* 
128
+					    _wakeup_handler) {
129
+  // locking actually not necessary - if replacing pointer is atomic 
130
+  m_queue.lock(); 
131
+  wakeup_handler = _wakeup_handler;
132
+  m_queue.unlock();
133
+}
... ...
@@ -40,6 +40,16 @@ class AmEventQueueInterface
40 40
   virtual void postEvent(AmEvent*)=0;
41 41
 };
42 42
 
43
+class AmEventQueue;
44
+/** a receiver for notifications about 
45
+    the fact that events are pending */
46
+class AmEventNotificationSink
47
+{
48
+ public:
49
+  virtual ~AmEventNotificationSink() { }
50
+  virtual void notify(AmEventQueue* sender) = 0;
51
+};
52
+
43 53
 /** 
44 54
  * \brief Asynchronous event queue implementation 
45 55
  * 
... ...
@@ -52,12 +62,13 @@ class AmEventQueue: public AmEventQueueInterface
52 62
 {
53 63
 protected:
54 64
   AmEventHandler*   handler;
65
+  AmEventNotificationSink* wakeup_handler;
55 66
   std::queue<AmEvent*>   ev_queue;
56 67
   AmMutex           m_queue;
57 68
   AmCondition<bool> ev_pending;
58 69
 
59 70
 public:
60
-  AmEventQueue(AmEventHandler*);
71
+  AmEventQueue(AmEventHandler* handler);
61 72
   virtual ~AmEventQueue();
62 73
 
63 74
   void postEvent(AmEvent*);
... ...
@@ -65,6 +76,8 @@ public:
65 76
   void waitForEvent();
66 77
   void wakeup();
67 78
   void processSingleEvent();
79
+
80
+  void setEventNotificationSink(AmEventNotificationSink* _wakeup_handler);
68 81
 };
69 82
 
70 83
 #endif
... ...
@@ -33,6 +33,7 @@
33 33
 #include "AmPlugIn.h"
34 34
 #include "AmApi.h"
35 35
 #include "AmSessionContainer.h"
36
+#include "AmSessionProcessor.h"
36 37
 #include "AmMediaProcessor.h"
37 38
 #include "AmDtmfDetector.h"
38 39
 #include "AmPlayoutBuffer.h"
... ...
@@ -57,17 +58,22 @@ AmMutex AmSession::session_num_mut;
57 58
 
58 59
 
59 60
 AmSession::AmSession()
60
-  : AmEventQueue(this), // AmDialogState(),
61
+  : AmEventQueue(this),
61 62
     dlg(this),
62 63
     detached(true),
63 64
     sess_stopped(false),rtp_str(this),negotiate_onreply(false),
64 65
     input(0), output(0), local_input(0), local_output(0),
65 66
     m_dtmfDetector(this), m_dtmfEventQueue(&m_dtmfDetector),
66 67
     m_dtmfDetectionEnabled(true),
67
-    accept_early_session(false)
68
+    accept_early_session(false),
69
+    processing_status(SESSION_PROCESSING_EVENTS)
68 70
 #ifdef WITH_ZRTP
69 71
   ,  zrtp_session(NULL), zrtp_audio(NULL), enable_zrtp(true)
70 72
 #endif
73
+
74
+#ifdef SESSION_THREADPOOL
75
+  , _pid(this)
76
+#endif
71 77
 {
72 78
   use_local_audio[AM_AUDIO_IN] = false;
73 79
   use_local_audio[AM_AUDIO_OUT] = false;
... ...
@@ -83,6 +89,8 @@ AmSession::~AmSession()
83 89
 #ifdef WITH_ZRTP
84 90
   AmZRTP::freeSession(zrtp_session);
85 91
 #endif
92
+
93
+  DBG("AmSession destructor finished\n");
86 94
 }
87 95
 
88 96
 void AmSession::setCallgroup(const string& cg) {
... ...
@@ -296,8 +304,41 @@ void AmSession::negotiate(const string& sdp_body,
296 304
     sdp.genResponse(advertisedIP(), rtp_str.getLocalPort(), *sdp_reply, AmConfig::SingleCodecInOK);
297 305
 }
298 306
 
299
-void AmSession::run()
300
-{
307
+#ifdef SESSION_THREADPOOL
308
+void AmSession::start() {
309
+  AmSessionProcessorThread* processor_thread = 
310
+    AmSessionProcessor::getProcessorThread();
311
+  if (NULL == processor_thread) 
312
+    throw string("no processing thread available");
313
+
314
+  // have the thread register and start us
315
+  processor_thread->startSession(this);
316
+}
317
+
318
+bool AmSession::is_stopped() {
319
+  return processing_status == SESSION_ENDED_DISCONNECTED;
320
+}
321
+#else
322
+// in this case every session has its own thread 
323
+// - this is the main processing loop
324
+void AmSession::run() {
325
+  DBG("startup session\n");
326
+  if (!startup())
327
+    return;
328
+
329
+  DBG("running session event loop\n");
330
+  while (true) {
331
+    waitForEvent();
332
+    if (!processingCycle())
333
+      break;
334
+  }
335
+
336
+  DBG("session event loop ended, finalizing session\n");
337
+  finalize();
338
+}
339
+#endif
340
+
341
+bool AmSession::startup() {
301 342
 #ifdef WITH_ZRTP
302 343
   if (enable_zrtp) {
303 344
     zrtp_session = (zrtp_conn_ctx_t*)malloc(sizeof(zrtp_conn_ctx_t));
... ...
@@ -315,7 +356,7 @@ void AmSession::run()
315 356
 						   &profile, 
316 357
 						   AmZRTP::zrtp_instance_zid) ) {
317 358
 	ERROR("initializing ZRTP session context\n");
318
-	return;
359
+	return false;
319 360
       }
320 361
       
321 362
       zrtp_audio = zrtp_attach_stream(zrtp_session, rtp_str.get_ssrc());
... ...
@@ -323,7 +364,7 @@ void AmSession::run()
323 364
       
324 365
       if (NULL == zrtp_audio) {
325 366
 	ERROR("attaching zrtp stream.\n");
326
-	return;
367
+	return false;
327 368
       }
328 369
       
329 370
       DBG("initialized ZRTP session context OK\n");
... ...
@@ -340,32 +381,7 @@ void AmSession::run()
340 381
 
341 382
       onStart();
342 383
 
343
-      while (!sess_stopped.get() || 
344
-	     (dlg.getStatus() == AmSipDialog::Disconnecting)//  ||
345
-	     // (dlg.getUACTransPending())
346
-	     ){
347
-
348
-	waitForEvent();
349
-	processEvents();
350
-
351
-	DBG("%s dlg.getUACTransPending() = %i\n",
352
-	    dlg.callid.c_str(),dlg.getUACTransPending());
353
-      }
354
-	    
355
-      if ( dlg.getStatus() != AmSipDialog::Disconnected ) {
356
-		
357
-	DBG("dlg '%s' not terminated: sending bye\n",dlg.callid.c_str());
358
-	if(dlg.bye() == 0){
359
-	  while ( dlg.getStatus() != AmSipDialog::Disconnected ){
360
-	    waitForEvent();
361
-	    processEvents();
362
-	  }
363
-	}
364
-	else {
365
-	  WARN("failed to terminate call properly\n");
366
-	}
367
-      }
368
-    }
384
+    } 
369 385
     catch(const AmSession::Exception& e){ throw e; }
370 386
     catch(const string& str){
371 387
       ERROR("%s\n",str.c_str());
... ...
@@ -374,28 +390,128 @@ void AmSession::run()
374 390
     catch(...){
375 391
       throw AmSession::Exception(500,"unexpected exception.");
376 392
     }
393
+    
394
+  } catch(const AmSession::Exception& e){
395
+    ERROR("%i %s\n",e.code,e.reason.c_str());
396
+    onBeforeDestroy();
397
+    destroy();
398
+    
399
+    session_num_mut.lock();
400
+    session_num--;
401
+    session_num_mut.unlock();
402
+
403
+    return false;
377 404
   }
378
-  catch(const AmSession::Exception& e){
405
+
406
+  return true;
407
+}
408
+
409
+bool AmSession::processEventsCatchExceptions() {
410
+  try {
411
+    try {	
412
+      processEvents();
413
+    } 
414
+    catch(const AmSession::Exception& e){ throw e; }
415
+    catch(const string& str){
416
+      ERROR("%s\n",str.c_str());
417
+      throw AmSession::Exception(500,"unexpected exception.");
418
+    } 
419
+    catch(...){
420
+      throw AmSession::Exception(500,"unexpected exception.");
421
+    }    
422
+  } catch(const AmSession::Exception& e){
379 423
     ERROR("%i %s\n",e.code,e.reason.c_str());
424
+    return false;
380 425
   }
426
+  return true;
427
+}
428
+
429
+/** one cycle of the event processing loop. 
430
+    this should be called until it returns false. */
431
+bool AmSession::processingCycle() {
381 432
 
433
+  switch (processing_status) {
434
+  case SESSION_PROCESSING_EVENTS: 
435
+    {
436
+      if (!processEventsCatchExceptions())
437
+	return false; // exception occured, stop processing
438
+      
439
+      int dlg_status = dlg.getStatus();
440
+      bool s_stopped = sess_stopped.get();
441
+      
442
+      DBG("%s/%s: %s, %s, %i UACTransPending\n",
443
+	  dlg.callid.c_str(),getLocalTag().c_str(), 
444
+	  AmSipDialog::status2str[dlg_status],
445
+	  s_stopped?"stopped":"running",
446
+	  dlg.getUACTransPending());
447
+      
448
+      // session running?
449
+      if (!s_stopped || (dlg_status == AmSipDialog::Disconnecting))
450
+	return true;
451
+      
452
+      // session stopped?
453
+      if (s_stopped &&
454
+	  (dlg_status == AmSipDialog::Disconnected)) {
455
+	processing_status = SESSION_ENDED_DISCONNECTED;
456
+	return false;
457
+      }
458
+      
459
+      // wait for session's status to be disconnected
460
+      // todo: set some timer to tear down the session anyway,
461
+      //       or react properly on negative reply to BYE (e.g. timeout)
462
+      processing_status = SESSION_WAITING_DISCONNECTED;
463
+      
464
+      if (dlg_status != AmSipDialog::Disconnected) {
465
+	// app did not send BYE - do that for the app
466
+	if (dlg.bye() != 0) {
467
+	  processing_status = SESSION_ENDED_DISCONNECTED;
468
+	  // BYE sending failed - don't wait for dlg status to go disconnected
469
+	  return false;
470
+	}
471
+      }
472
+      
473
+      return true;
474
+      
475
+    } break;
476
+    
477
+  case SESSION_WAITING_DISCONNECTED: {
478
+    // processing events until dialog status is Disconnected 
479
+    
480
+    if (!processEventsCatchExceptions()) {
481
+      processing_status = SESSION_ENDED_DISCONNECTED;
482
+      return false; // exception occured, stop processing
483
+    }
484
+    bool res = dlg.getStatus() != AmSipDialog::Disconnected;
485
+    if (!res)
486
+      processing_status = SESSION_ENDED_DISCONNECTED;
487
+    return res;
488
+  }; break;
489
+
490
+  default: {
491
+    ERROR("unknown session processing state\n");
492
+    return false; // stop processing      
493
+  }
494
+  }
495
+}
496
+
497
+void AmSession::finalize() {
498
+  DBG("running finalize sequence...\n");
382 499
   onBeforeDestroy();
383 500
   destroy();
384
-
501
+  
385 502
   session_num_mut.lock();
386 503
   session_num--;
387 504
   session_num_mut.unlock();
388
-    
389
-  // wait at least until session is out of RtpScheduler
390
-  //detached.wait_for();
391 505
 
392 506
   DBG("session is stopped.\n");
393 507
 }
394
-
395
-void AmSession::on_stop()
508
+#ifndef SESSION_THREADPOOL
509
+void AmSession::on_stop() 
510
+#else
511
+void AmSession::stop()
512
+#endif  
396 513
 {
397
-  //sess_stopped.set(true);
398
-  DBG("AmSession::on_stop()\n");
514
+  DBG("AmSession::stop()\n");
399 515
 
400 516
   if (!getDetached())
401 517
     AmMediaProcessor::instance()->clearSession(this);
... ...
@@ -66,10 +66,13 @@ class AmDtmfEvent;
66 66
  * 
67 67
  * The session is identified by Call-ID, From-Tag and To-Tag.
68 68
  */
69
-class AmSession : public AmThread,
70
-		  public AmEventQueue, 
71
-		  public AmEventHandler,
72
-		  public AmSipDialogEventHandler
69
+class AmSession : 
70
+#ifndef SESSION_THREADPOOL
71
+  public AmThread,
72
+#endif
73
+  public AmEventQueue, 
74
+  public AmEventHandler,
75
+  public AmSipDialogEventHandler
73 76
 {
74 77
   AmMutex      audio_mut;
75 78
   // remote (to/from RTP) audio inout
... ...
@@ -95,11 +98,40 @@ private:
95 98
   AmDtmfEventQueue m_dtmfEventQueue;
96 99
   bool m_dtmfDetectionEnabled;
97 100
 
101
+  enum ProcessingStatus { 
102
+    SESSION_PROCESSING_EVENTS,
103
+    SESSION_WAITING_DISCONNECTED,
104
+    SESSION_ENDED_DISCONNECTED
105
+  };
106
+  ProcessingStatus processing_status;
107
+
108
+#ifndef SESSION_THREADPOOL
98 109
   /** @see AmThread::run() */
99 110
   void run();
100
-
101
-  /** @see AmThread::on_stop() */
102 111
   void on_stop();
112
+#else
113
+public:
114
+  void start();
115
+  bool is_stopped();
116
+
117
+private:
118
+  void stop();
119
+  void* _pid;
120
+#endif
121
+
122
+  /** @return whether startup was successful */
123
+  bool startup();
124
+
125
+  /** @return whether session continues running */
126
+  bool processingCycle();
127
+
128
+  /** clean up session */
129
+  void finalize();
130
+
131
+  /** process pending events,  
132
+      @return whether everything went smoothly */
133
+  bool processEventsCatchExceptions();
134
+
103 135
 
104 136
   AmCondition<bool> sess_stopped;
105 137
   AmCondition<bool> detached;
... ...
@@ -111,6 +143,7 @@ private:
111 143
   friend class AmMediaProcessorThread;
112 144
   friend class AmSessionContainer;
113 145
   friend class AmSessionFactory;
146
+  friend class AmSessionProcessorThread;
114 147
 	
115 148
 protected:
116 149
   AmSdp               sdp;
... ...
@@ -90,11 +90,11 @@ bool AmSessionContainer::clean_sessions() {
90 90
 	
91 91
 	MONITORING_MARK_FINISHED(cur_session->getLocalTag().c_str());
92 92
 
93
-	DBG("session %p has been destroyed'\n",(void*)cur_session->_pid);
93
+	DBG("session [%p] has been destroyed\n",(void*)cur_session->_pid);
94 94
 	delete cur_session;
95 95
       }
96 96
       else {
97
-	DBG("session %p still running\n",(void*)cur_session->_pid);
97
+	DBG("session [%p] still running\n",(void*)cur_session->_pid);
98 98
 	n_sessions.push(cur_session);
99 99
       }
100 100
       
... ...
@@ -191,9 +191,10 @@ void AmSessionContainer::destroySession(AmSession* s)
191 191
 }
192 192
 
193 193
 AmSession* AmSessionContainer::startSessionUAC(AmSipRequest& req, AmArg* session_params) {
194
+
194 195
   AmSession* session = NULL;
195 196
   try {
196
-    if((session = createSession(req, session_params)) != 0){
197
+    if((session = createSession(req, session_params)) != 0) {
197 198
       session->dlg.updateStatusFromLocalRequest(req); // sets local tag as well
198 199
       session->setCallgroup(req.from_tag);
199 200
 
... ...
@@ -228,8 +229,18 @@ AmSession* AmSessionContainer::startSessionUAC(AmSipRequest& req, AmArg* session
228 229
 	INFO("Starting UAC session %s app %s\n",
229 230
 	     session->getLocalTag().c_str(), req.cmd.c_str());
230 231
       }
231
-
232
-      session->start();
232
+      
233
+      try {
234
+	session->start();
235
+      } catch (const string& err) {
236
+	AmEventDispatcher::instance()->
237
+	  delEventQueue(session->getLocalTag(),
238
+			session->getCallID(),
239
+			session->getRemoteTag());
240
+	
241
+	delete session;
242
+	throw;
243
+      }
233 244
 
234 245
     }
235 246
   } 
... ...
@@ -280,7 +291,17 @@ void AmSessionContainer::startSessionUAS(AmSipRequest& req)
280 291
 			"to", req.to.c_str(),
281 292
 			"ruri", req.r_uri.c_str());
282 293
 
283
-	session->start();
294
+	try {
295
+	  session->start();
296
+	} catch (const string& err) {
297
+	  AmEventDispatcher::instance()->
298
+	    delEventQueue(session->getLocalTag(),
299
+			  session->getCallID(),
300
+			  session->getRemoteTag());
301
+	  
302
+	  delete session;
303
+	  throw;
304
+	}
284 305
 
285 306
 	session->postEvent(new AmSipRequestEvent(req));
286 307
       }
... ...
@@ -230,6 +230,16 @@ loglevel=2
230 230
 ############################################################
231 231
 # tuning
232 232
 
233
+# optional parameter: session_processor_threads=<num_value>
234
+# 
235
+# - controls how many threads should be created that
236
+#   process the application logic and in-dialog signaling. 
237
+#   This is only available if compiled with threadpool support!
238
+#   (set USE_THREADPOOL in Makefile.defs)
239
+#   Defaults to 10
240
+#
241
+# session_processor_threads=50
242
+
233 243
 # optional parameter: media_processor_threads=<num_value>
234 244
 # 
235 245
 # - controls how many threads should be created that
... ...
@@ -30,7 +30,7 @@
30 30
 #include "AmConfig.h"
31 31
 #include "AmPlugIn.h"
32 32
 #include "AmSessionContainer.h"
33
-//#include "AmServer.h"
33
+#include "AmSessionProcessor.h"
34 34
 #include "AmMediaProcessor.h"
35 35
 #include "AmRtpReceiver.h"
36 36
 #include "AmEventDispatcher.h"
... ...
@@ -99,7 +99,6 @@ static void sig_usr_un(int signo)
99 99
 
100 100
     clean_up_mut.lock();
101 101
     if(need_clean.get()) {
102
-
103 102
       need_clean.set(false);
104 103
 
105 104
       AmSessionContainer::dispose();
... ...
@@ -111,7 +110,6 @@ static void sig_usr_un(int signo)
111 110
       AmMediaProcessor::dispose();
112 111
 
113 112
       AmEventDispatcher::dispose();
114
-
115 113
     } 
116 114
 
117 115
     clean_up_mut.unlock();
... ...
@@ -413,6 +411,11 @@ int main(int argc, char* argv[])
413 411
 
414 412
   DBG("Starting session container\n");
415 413
   AmSessionContainer::instance()->start();
414
+  
415
+#ifdef SESSION_THREADPOOL
416
+  DBG("starting session processor threads\n");
417
+  AmSessionProcessor::addThreads(AmConfig::SessionProcessorThreads);
418
+#endif 
416 419
 
417 420
   DBG("Starting media processor\n");
418 421
   AmMediaProcessor::instance()->init();
... ...
@@ -52,6 +52,9 @@
52 52
 #define SESSION_EXPIRES              60 // seconds
53 53
 #define MINIMUM_TIMER                5   //seconds
54 54
 
55
+// threads to start for signaling/application
56
+#define NUM_SESSION_PROCESSORS 10    
57
+// threads to start for RTP processing
55 58
 #define NUM_MEDIA_PROCESSORS 1
56 59
 
57 60
 #define MAX_NET_DEVICES     32