Browse code

modules_k/*: moved k modules in directory modules/

Daniel-Constantin Mierla authored on 20/01/2013 11:57:52
Showing 1 changed files
1 1
deleted file mode 100644
... ...
@@ -1,1129 +0,0 @@
1
-/* $Id$
2
- *
3
- * Copyright (C) 2006-2007 VozTelecom Sistemas S.L
4
- *
5
- * This file is part of Kamailio, a free SIP server.
6
- *
7
- * Kamailio is free software; you can redistribute it and/or modify
8
- * it under the terms of the GNU General Public License as published by
9
- * the Free Software Foundation; either version 2 of the License, or
10
- * (at your option) any later version
11
- *
12
- * Kamailio is distributed in the hope that it will be useful,
13
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
- * GNU General Public License for more details.
16
- *
17
- * You should have received a copy of the GNU General Public License 
18
- * along with this program; if not, write to the Free Software 
19
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
20
- */
21
-
22
-#include <sys/types.h>/*setsockopt,bind,accept,fork,pid_t*/
23
-#include <sys/socket.h>/*setsockopt,bind,accept,listen*/
24
-#include <netinet/tcp.h>/*TCP_NODELAY*/
25
-#include <string.h>/*strcmp,memset*/
26
-#include <errno.h>/*errno*/
27
-#include <unistd.h>/*close(),read(),pipe,fork,pid_t*/
28
-#include <sys/poll.h>/*poll*/
29
-#include <signal.h>/*signal*/
30
-#include <time.h>/*time*/
31
-#include <string.h>/*memcmp*/
32
-#include <sys/types.h>/*waitpid*/
33
-#include <sys/wait.h>/*waitpid*/
34
-
35
-#include "../../ip_addr.h" /*sockaddr_union, ip_addr*/
36
-#include "../../hashes.h" /*T_TABLE_POWER*/
37
-#include "../../mem/mem.h" /*pkg_malloc*/
38
-#include "../../mem/shm_mem.h" /*shm_malloc*/
39
-#include "../../dprint.h" /*LM_**/
40
-#include "../../locking.h"
41
-#include "../../cfg/cfg_struct.h"
42
-
43
-#include "seas.h"
44
-#include "ha.h"
45
-#include "cluster.h"
46
-#include "seas_action.h"
47
-#include "statistics.h"
48
-#include "event_dispatcher.h"
49
-
50
-#define PING_OVER_FACTOR 2
51
-#define MAX_WRITE_TRIES 10
52
-
53
-char *action_names[]={"NONE",
54
-     "PROVISIONAL_REPLY",
55
-     "FINAL_REPLY",
56
-     "REPLY_FIN_DLG",
57
-     "UAC_REQ",
58
-     "AC_RES_FAIL",
59
-     "STATELESS_MSG",
60
-     "AC_CANCEL",
61
-     "JAIN_PONG"};
62
-
63
-
64
-struct unc_as unc_as_t[2*MAX_UNC_AS_NR];
65
-
66
-/*this is for the Action Dispatcher Process */
67
-struct as_entry *my_as;
68
-extern int process_no;
69
-extern int sig_flag;
70
-
71
-static int process_event_reply(as_p as);
72
-static int handle_as_data(int fd);
73
-static inline int print_sock_info(char *buffer,int wheremax,int *idx,struct socket_info *s,enum sip_protos type);
74
-static inline int send_sockinfo(int fd);
75
-static inline int add_new_as(int event_idx,int action_idx,struct as_entry *as);
76
-static int dispatch_relay();
77
-static int new_as_connect(int fd,char which);
78
-static inline int read_name(int sock,char *dst,int dstlen);
79
-static int handle_unc_as_data(int fd);
80
-static int open_server_sockets(struct ip_addr *address,unsigned short port,int *fd);
81
-
82
-
83
-
84
-/** Main loop for the Event Dispatcher process.
85
- * 
86
- */
87
-int dispatcher_main_loop(void)
88
-{
89
-   struct pollfd poll_fds[3+MAX_AS_NR],*poll_tmp;
90
-   int clean_index,i,j,k,fd,poll_events=0,socks[2],chld_status;
91
-   int as_nr,unc_as_nr;
92
-   pid_t chld;
93
-   struct timeval last_ping,now;
94
-   struct as_entry *as;
95
-
96
-   sig_flag=0;
97
-   is_dispatcher=1;
98
-   as_nr=0;
99
-
100
-   timerclear(&last_ping);
101
-   timerclear(&now);
102
-   signal(SIGCHLD,seas_sighandler);
103
-   signal(SIGTERM,seas_sighandler);
104
-   signal(SIGUSR1,seas_sighandler);
105
-   signal(SIGINT, seas_sighandler);
106
-   signal(SIGKILL,seas_sighandler);
107
-
108
-   strcpy(whoami,"Seas Event Dispatcher process");
109
-   /*I set process_no to -1 because otherwise, the logging process confuses this process with another from SER
110
-    * (see LM_*() and dprint() and my_pid())*/
111
-   process_no = -1;
112
-
113
-   if(open_server_sockets(seas_listen_ip,seas_listen_port,socks)==-1){
114
-      LM_ERR("unable to open server sockets on dispatcher\n");
115
-      return -1;
116
-   }
117
-   for(i=0;i<2;i++){
118
-      poll_fds[i].fd=socks[i];
119
-      poll_fds[i].revents=0;
120
-      poll_fds[i].events=POLLIN;
121
-   }
122
-   poll_fds[2].fd=read_pipe;
123
-   poll_fds[2].revents=0;
124
-   poll_fds[2].events=POLLIN;/*pollhup ?*/
125
-
126
-   poll_events=0;
127
-   unc_as_nr=0;
128
-
129
-   if(use_ha)
130
-      spawn_pinger();
131
-
132
-   while(1){
133
-		/* update the local config framework structures */
134
-		cfg_update();
135
-
136
-      if(sig_flag==SIGCHLD){
137
-	 while ((chld=waitpid( -1, &chld_status, WNOHANG ))>0) {
138
-	    if (WIFEXITED(chld_status)){
139
-	       LM_INFO("child process %d exited normally, status=%d\n",
140
-				   chld,WEXITSTATUS(chld_status));
141
-	    }else if (WIFSIGNALED(chld_status)) {
142
-	       LM_INFO("child process %d exited by a signal %d\n",
143
-				   chld,WTERMSIG(chld_status));
144
-	    }else if (WIFSTOPPED(chld_status)) 
145
-	       LM_INFO("child process %d stopped by a signal %d\n",
146
-				   chld,WSTOPSIG(chld_status));
147
-	    for (as=as_list;as;as=as->next) {
148
-	       if(as->type!=AS_TYPE)
149
-		  continue;
150
-	       if(as->u.as.action_pid==chld){
151
-		  for(i=0;i<as_nr && ((poll_fds[3+i].fd)!=(as->u.as.event_fd));i++)
152
-		     ;
153
-		  if(i==as_nr){
154
-		     LM_ERR("Either the pinger has died or BUG found..\n");
155
-		     continue;
156
-		  }
157
-		  /*overwrite the obsolete 'i' position with the next position*/
158
-		  for(j=3+i;j<(as_nr+unc_as_nr+3-1);i++){
159
-		     poll_fds[j].fd=poll_fds[j+1].fd;
160
-		     poll_fds[j].events=poll_fds[j+1].events;
161
-		     poll_fds[j].revents=poll_fds[j+1].revents;
162
-		  }
163
-		  close(as->u.as.event_fd);/*close the socket fd*/
164
-		  if (as->u.as.ev_buffer.s) {
165
-		     pkg_free(as->u.as.ev_buffer.s);
166
-		     as->u.as.ev_buffer.s=(char *)0;
167
-		     as->u.as.ev_buffer.len=0;
168
-		  }
169
-		  as->u.as.event_fd=as->u.as.action_fd=-1;
170
-		  as->connected=0;
171
-		  destroy_pingtable(&as->u.as.jain_pings);
172
-		  destroy_pingtable(&as->u.as.servlet_pings);
173
-		  as_nr--;
174
-		  LM_WARN("client [%.*s] leaving (Action Dispatcher Process died !)\n",
175
-				  as->name.len,as->name.s);
176
-		  break;
177
-	       }/*if(action_pid==chld)*/
178
-	    }/*for(as=as_list;as;as=as->next)*/
179
-	 }/*while(waitpid(-1)>0)*/
180
-      }else if (sig_flag) {
181
-	 LM_WARN("received signal != sigchld(%d)\n",sig_flag);
182
-	  }
183
-      sig_flag=0;
184
-      clean_index=0;
185
-      LM_INFO("polling [2 ServSock] [1 pipe] [%d App Servers]"
186
-			  " [%d Uncomplete AS]\n",as_nr,unc_as_nr);
187
-      poll_events = poll(poll_fds,3+unc_as_nr+as_nr,-1);
188
-      if (poll_events == -1) {
189
-	 if(errno==EINTR){
190
-	    /*handle the case a child has died.
191
-	     * It will be done in the next iteration in if(seas_sigchld_received)*/
192
-	    continue;
193
-	 }
194
-	 if(errno==EBADF){
195
-	    LM_ERR("invalid file descriptor passed to poll (%s)\n",
196
-				strerror(errno));
197
-	    return -1;/*??*/
198
-	 }
199
-	 /* errors */
200
-	 LM_ERR("poll'ing:%s\n",strerror(errno));
201
-	 poll_events=0;
202
-	 continue;
203
-      } else if (poll_events == 0) {/*timeout*/
204
-	 continue;
205
-      } else {/*there are events !*/
206
-	 /*handle connections from server sockets*/
207
-	 for(i=0;i<2;i++){
208
-	    if(poll_fds[i].revents)
209
-	       poll_events--;
210
-	    if(poll_fds[i].revents & POLLIN){
211
-	       poll_fds[i].revents &= (~POLLIN);
212
-	       if((fd=new_as_connect(socks[i],i==0?'e':'a'))>=0){
213
-		  poll_tmp=&poll_fds[3+as_nr+unc_as_nr];
214
-		  poll_tmp->fd=fd;
215
-		  poll_tmp->events=POLLIN|POLLHUP;
216
-		  unc_as_nr++;
217
-		  LM_DBG("Have new %s client\n",i==0?"event":"action");
218
-	       }else{
219
-		  LM_ERR("accepting connection from AS\n");
220
-	       }
221
-	    }
222
-	 }
223
-	 /*handle data from pipe*/
224
-	 if(poll_fds[2].revents & POLLIN){
225
-	    poll_fds[2].revents &= (~POLLIN);
226
-	    poll_events--;
227
-	    if(dispatch_relay()<0){
228
-	       LM_ERR("dispatch_relay returned -1"
229
-		     "should clean-up table\n");
230
-	    }
231
-	 }
232
-	 /*now handle receive data from completed AS*/
233
-	 clean_index=0;
234
-	 LM_DBG("Scanning data from %d AS\n",as_nr);
235
-	 for(i=0;(i<as_nr) && poll_events;i++){
236
-	    clean_index=0;
237
-	    poll_tmp=&poll_fds[3+i];
238
-	    if(poll_tmp->revents)
239
-	       poll_events--;
240
-	    if(poll_tmp->revents & POLLIN){
241
-	       LM_DBG("POLLIN found in AS #%i\n",i);
242
-	       poll_tmp->revents &= (~POLLIN);
243
-	       switch(handle_as_data(poll_tmp->fd)){
244
-		  case -2:/*read returned 0 bytes, an AS client is leaving*/
245
-		     clean_index=1;
246
-		     break;
247
-		  case -1:/*shouldnt happen*/
248
-		     LM_ERR("reading from AS socket\n");
249
-		     break;
250
-		  case 0:/* event_response received and processed*/
251
-		     break;
252
-		  default:
253
-		     LM_WARN("unknown return type from handle_as_data\n");
254
-	       }
255
-	    }
256
-	    if(clean_index || (poll_tmp->revents & POLLHUP)){
257
-	       LM_DBG("POLHUP or read==0 found in %i AS \n",i);
258
-	       clean_index=0;
259
-	       poll_tmp->revents = 0;
260
-	       for(as=as_list;as;as=as->next){
261
-		  if(as->type==CLUSTER_TYPE)
262
-		     continue;
263
-		  if(as->connected && (as->u.as.event_fd == poll_tmp->fd)){
264
-		     close(poll_tmp->fd);/*close the socket fd*/
265
-		     /*TODO  we should send a signal to the Action Dispatcher !!!*/
266
-		     as->connected=0;
267
-		     as_nr--;
268
-		     /*overwrite the obsolete 'i' position with the next position*/
269
-		     for(k=i;k<(as_nr+unc_as_nr);k++){
270
-			j=3+k;
271
-			poll_fds[j].fd=poll_fds[j+1].fd;
272
-			poll_fds[j].events=poll_fds[j+1].events;
273
-			poll_fds[j].revents=poll_fds[j+1].revents;
274
-		     }
275
-		     --i;
276
-		     LM_WARN("client %.*s leaving !!!\n",as->name.len,as->name.s);
277
-		     break;
278
-		  }
279
-	       }
280
-	       if (!as) {
281
-		  LM_ERR("the leaving client was not found in the as_list\n");
282
-		   }
283
-	    }
284
-	 }
285
-	 /*now handle data sent from uncompleted AS*/
286
-	 LM_DBG("Scanning data from %d uncomplete AS \n",unc_as_nr);
287
-	 clean_index=0;
288
-	 for(i=0;i<unc_as_nr && poll_events;i++){
289
-	    poll_tmp=&poll_fds[3+as_nr+i];
290
-	    if(poll_tmp->revents)
291
-	       poll_events--;
292
-	    if(poll_tmp->revents & POLLIN){
293
-	       LM_DBG("POLLIN found in %d uncomplete AS \n",i);
294
-	       poll_tmp->revents &= (~POLLIN);
295
-	       fd=handle_unc_as_data(poll_tmp->fd);
296
-	       if(fd>0){
297
-		  /* there's a new AS, push the uncomplete poll_fds up and set the AS */
298
-		  for(k=i;k>0;k--){
299
-		     j=3+as_nr+k;
300
-		     poll_fds[j].fd=poll_fds[j-1].fd;
301
-		     poll_fds[j].events=poll_fds[j-1].events;
302
-		     poll_fds[j].revents=poll_fds[j-1].revents;
303
-		  }
304
-		  poll_fds[3+as_nr].fd=fd;
305
-		  poll_fds[3+as_nr].events=POLLIN|POLLHUP;
306
-		  poll_fds[3+as_nr].revents=0;
307
-		  as_nr++;/*not very sure if this is thread-safe*/
308
-		  unc_as_nr--;
309
-	       }else if(fd<=0){/* pull the upper set of uncomplete AS down and take this one out*/
310
-		  poll_tmp->revents=0;
311
-		  for(k=i;k<(unc_as_nr-1);k++){
312
-		     j=3+as_nr+k;
313
-		     poll_fds[j].fd=poll_fds[j+1].fd;
314
-		     poll_fds[j].events=poll_fds[j+1].events;
315
-		     poll_fds[j].revents=poll_fds[j+1].revents;
316
-		  }
317
-		  unc_as_nr--;
318
-		  /** we decrement i so that pulling down the upper part of the unc_as array so that
319
-		   * it doesn't affect our for loop */
320
-		  i--;
321
-	       }
322
-	    }
323
-	    if(poll_tmp->revents & POLLHUP){
324
-	       LM_DBG("POLLHUP found in %d uncomplete AS \n",i);
325
-	       close(poll_tmp->fd);
326
-	       for(k=i;k<(unc_as_nr-1);k++){
327
-		  j=3+as_nr+k;
328
-		  poll_fds[j].fd=poll_fds[j+1].fd;
329
-		  poll_fds[j].events=poll_fds[j+1].events;
330
-		  poll_fds[j].revents=poll_fds[j+1].revents;
331
-	       }
332
-	       unc_as_nr--;
333
-	       i--;
334
-	       poll_tmp->revents = 0;
335
-	    }
336
-	 }/*for*/
337
-      }/*else ...(poll_events>0)*/
338
-   }/*while(1)*/
339
-}
340
-
341
-
342
-/**
343
- * opens the server socket, which attends (accepts) the clients, that is: 
344
- * params:
345
- * address:
346
- * 	address to which to listen
347
- * port:
348
- * 	base port to which to listen. then port+1 will be the socket
349
- * 	for action's delivery.
350
- * fds:
351
- * 	in fd[0] the action socket will be put.
352
- * 	in fd[1] the event socket will be put.
353
- *
354
- * returns 0 on exit, <0 on fail
355
- *
356
- */
357
-static int open_server_sockets(struct ip_addr *address,unsigned short port,int *fd)
358
-{
359
-
360
-   /*using sockaddr_union enables ipv6..*/
361
-   union sockaddr_union su;
362
-   int i,optval;
363
-
364
-   fd[0]=fd[1]=-1;
365
-
366
-   if(address->af!=AF_INET && address->af!=AF_INET6){
367
-      LM_ERR("Only ip and ipv6 allowed socket types\n");
368
-      return -1;
369
-   }
370
-
371
-   for(i=0;i<2;i++){
372
-      if(init_su(&su,address,port+i)<0){
373
-	 LM_ERR("unable to init sockaddr_union\n");
374
-	 return -1;
375
-      }
376
-      if((fd[i]=socket(AF2PF(su.s.sa_family), SOCK_STREAM, 0))==-1){
377
-	 LM_ERR("trying to open server %s socket (%s)\n",i==0?"event":"action",strerror(errno));
378
-	 goto error;
379
-      }
380
-      optval=1;
381
-      if (setsockopt(fd[i], SOL_SOCKET, SO_REUSEADDR, (void*)&optval, sizeof(optval))==-1) {
382
-	 LM_ERR("setsockopt (%s)\n",strerror(errno));
383
-	 goto error;
384
-      }
385
-      if ((bind(fd[i],(struct sockaddr *)&(su.s),sizeof(struct sockaddr_in)))==-1){
386
-	 LM_ERR( "bind (%s)\n",strerror(errno));
387
-	 goto error;
388
-      }
389
-      if (listen(fd[i], 10)==-1){
390
-	 LM_ERR( "listen (%s)\n",strerror(errno));
391
-	 goto error;
392
-      }
393
-   }
394
-   return 0;
395
-
396
-error:
397
-   for(i=0;i<2;i++)
398
-      if(fd[i]!=-1){
399
-	 close(fd[i]);
400
-	 fd[i]=-1;
401
-      }
402
-   return -1;
403
-}
404
-
405
-
406
-union helper{
407
-   as_msg_p ptr;
408
-   char bytes[sizeof(as_msg_p)];
409
-};
410
-
411
-/**
412
- * Sends event 
413
- *
414
- * returns
415
- * 	 0  OK
416
- * 	-1 couldn't read the event from the pipe
417
- * 	-2 couldn't send the event
418
- * TODO this should be FAR more generic... for example, there might be events
419
- * which are not related to any transaction (finish event, or error event...)
420
- * we should separate event-specific handling in different functions...
421
- */
422
-static int dispatch_relay(void)
423
-{
424
-   int i,j,retval,tries;
425
-   union helper thepointer;
426
-
427
-   i=j=0;
428
-   retval=0;
429
-read_again:
430
-   i=read(read_pipe,thepointer.bytes+j,sizeof(as_msg_p)-j);
431
-   if(i<0){
432
-      if(errno==EINTR){
433
-	 goto read_again;
434
-      }else{
435
-	 LM_ERR("Dispatcher Process received unknown error"
436
-			 " reading from pipe (%s)\n",strerror(errno));
437
-	 retval=-1;
438
-	 goto error;
439
-      }
440
-   }else if(i==0){
441
-	 LM_ERR("Dispatcher Process "
442
-	       "received 0 while reading from pipe\n");
443
-	 goto error;
444
-   }else{
445
-      j+=i;
446
-      if(j<sizeof(as_msg_p))
447
-	 goto read_again;
448
-   }
449
-
450
-   if (!thepointer.ptr) {
451
-      LM_ERR("Received Corrupted pointer to event !!\n");
452
-      retval=0;
453
-      goto error;
454
-   }
455
-   /*the message*/
456
-   if(use_stats && thepointer.ptr->transaction)
457
-      event_stat(thepointer.ptr->transaction);
458
-   if(thepointer.ptr->as == NULL || !thepointer.ptr->as->connected || thepointer.ptr->as->type==CLUSTER_TYPE){
459
-      LM_WARN("tryied to send an event to an App Server"
460
-			  " that is scheduled to die!!\n");
461
-      retval=-2;
462
-      goto error;
463
-   }
464
-   j=0;
465
-   tries=0;
466
-write_again:
467
-   i=write(thepointer.ptr->as->u.as.event_fd,thepointer.ptr->msg+j,thepointer.ptr->len-j);
468
-   if(i==-1){
469
-      switch(errno){
470
-	 case EINTR:
471
-	    if(!thepointer.ptr->as->connected){
472
-	       LM_WARN("tryied to send an event to an App Server"
473
-				   " that is scheduled to die!!\n");
474
-	       retval=-2;
475
-	       goto error;
476
-	    }
477
-	    goto write_again;
478
-	 case EPIPE:
479
-	    LM_ERR("AS [%.*s] closed "
480
-		  "the socket !\n",thepointer.ptr->as->u.as.name.len,thepointer.ptr->as->u.as.name.s);
481
-	    retval=-2;
482
-	    goto error;
483
-	 default:
484
-	    LM_ERR("unknown error while trying to write to AS socket(%s)\n",
485
-				strerror(errno));
486
-	    retval=-2;
487
-	    goto error;
488
-      }
489
-   }else if(i>0){
490
-      j+=i;
491
-      if(j<thepointer.ptr->len)
492
-	 goto write_again;
493
-   }else if(i==0){
494
-      if (tries++ > MAX_WRITE_TRIES) { 
495
-	 LM_ERR("MAX WRITE TRIES !!!\n");
496
-	 goto error;
497
-      }else
498
-	 goto write_again;
499
-   }
500
-   LM_DBG("Event relaied to %.*s AS\n",thepointer.ptr->as->u.as.name.len,
501
-		   thepointer.ptr->as->u.as.name.s);
502
-   LM_DBG("Event type %s \n",action_names[thepointer.ptr->type]);
503
-   retval=0;
504
-error:
505
-   if(thepointer.ptr){
506
-      if(thepointer.ptr->msg)
507
-	 shm_free(thepointer.ptr->msg);
508
-      shm_free(thepointer.ptr);
509
-   }
510
-   return retval;
511
-}
512
-
513
-/**
514
- * receives 2 indexes in unc_as_t which correspond one to
515
- * the events socket and the other to the actions socket
516
- *
517
- * returns
518
- * 	0 on success
519
- * 	-1 on error
520
- */
521
-static inline int add_new_as(int event_idx,int action_idx,struct as_entry *as)
522
-{
523
-   struct unc_as *ev,*ac;
524
-   int j;
525
-   as_p the_as=0;
526
-   struct as_entry *tmp;
527
-
528
-   ev=&unc_as_t[event_idx];
529
-   ac=&unc_as_t[action_idx];
530
-
531
-   the_as=&(as->u.as);
532
-
533
-   the_as->action_fd=ac->fd;
534
-   the_as->event_fd=ev->fd;
535
-   the_as->name.len = strlen(ev->name);
536
-   if(use_ha){
537
-      if(jain_ping_timeout){
538
-	 if (0>init_pingtable(&the_as->jain_pings,jain_ping_timeout,(jain_ping_timeout/jain_ping_period+1)*PING_OVER_FACTOR)){
539
-	    LM_ERR("Unable to init jain pinging table...\n");
540
-	    goto error;
541
-	 }
542
-      }
543
-      if(servlet_ping_timeout){
544
-	 if (0>init_pingtable(&the_as->servlet_pings,servlet_ping_timeout,(servlet_ping_timeout/servlet_ping_period+1)*PING_OVER_FACTOR)){
545
-	    LM_ERR("Unable to init servlet pinging table...\n");
546
-	    goto error;
547
-	 }
548
-      }
549
-   }
550
-   /*TODO attention, this is pkg_malloc because only the Event_Dispatcher process 
551
-    * has to use it !!*/
552
-   if(!(the_as->ev_buffer.s = pkg_malloc(AS_BUF_SIZE))){
553
-      LM_ERR("unable to alloc pkg mem for the event buffer\n");
554
-      goto error;
555
-   }
556
-   the_as->ev_buffer.len=0;
557
-   as->connected=1;
558
-   the_as->action_pid=0;
559
-   for(tmp=as_list;tmp;tmp=tmp->next){
560
-      if(tmp->type==AS_TYPE)
561
-	 continue;
562
-      for (j=0;j<tmp->u.cs.num;j++) {
563
-	 if (tmp->u.cs.as_names[j].len == the_as->name.len && 
564
-	       !memcmp(tmp->u.cs.as_names[j].s,the_as->name.s,the_as->name.len)) { 
565
-	    if(tmp->u.cs.num==tmp->u.cs.registered){
566
-	       LM_ERR("AS %.*s belongs to cluster %.*s which is already completed\n",
567
-		     the_as->name.len,the_as->name.s,tmp->name.len,tmp->name.s);
568
-	       break;
569
-	    }
570
-	    tmp->u.cs.registered++;
571
-	    break;
572
-	 }
573
-      }
574
-   }
575
-   if(0>spawn_action_dispatcher(as)){
576
-      LM_ERR("Unable to spawn Action Dispatcher for as %s\n",ev->name);
577
-      goto error;
578
-   }
579
-   if(send_sockinfo(the_as->event_fd)==-1){
580
-      LM_ERR("Unable to send socket info to as %s\n",ev->name);
581
-      goto error;
582
-   }
583
-   return 0;
584
-error:
585
-   if(the_as->ev_buffer.s){
586
-      pkg_free(the_as->ev_buffer.s);
587
-      the_as->ev_buffer.s=(char*)0;
588
-   }
589
-   if(the_as->action_pid)
590
-      kill(the_as->action_pid,SIGTERM);
591
-   if(jain_ping_timeout)
592
-      destroy_pingtable(&the_as->jain_pings);
593
-   if(servlet_ping_timeout)
594
-      destroy_pingtable(&the_as->servlet_pings);
595
-   return -1;
596
-}
597
-
598
-
599
-
600
-
601
-
602
-
603
-/**prints available sockets in SER to the App Server.
604
- * format is:
605
- * 1: transport identifier (u for UDP, t for TCP, s for TLS)
606
- * 1: length of socket name (sip.voztele.com or whatever)
607
- * N: name
608
- * 1: length of IP address (192.168.1.2)
609
- * N: ip address in ascii
610
- * 2: port nubmer in NBO
611
- *
612
- * returns
613
- * 	-1 on error
614
- * 	0  on success
615
- */
616
-static inline int send_sockinfo(int fd)
617
-{
618
-   struct socket_info *s;
619
-   unsigned char i;
620
-   char buffer[300];
621
-   int k=0,j;
622
-   buffer[k++]=16;/*This used to be T_TABLE_POWER in Kamailio 1.0.1, now its hardcoded in config.h*/
623
-   for(i=0,s=udp_listen;s;s=s->next,i++);
624
-#ifdef USE_TCP
625
-   for(s=tcp_listen;s;s=s->next,i++);
626
-#endif
627
-#ifdef USE_TLS
628
-   for(s=tls_listen;s;s=s->next,i++);
629
-#endif
630
-   if(i==0){
631
-      LM_ERR("no udp|tcp|tls sockets ?!!\n");
632
-      return -1;
633
-   }
634
-   buffer[k++]=i;
635
-   for(s=udp_listen;s;s=s->next){
636
-      if(print_sock_info(buffer,300,&k,s,PROTO_UDP)==-1)
637
-	 return -1;
638
-   }
639
-#ifdef USE_TCP
640
-   for(s=tcp_listen;s;s=s->next){
641
-      if(print_sock_info(buffer,300,&k,s,PROTO_TCP)==-1)
642
-	 return -1;
643
-   }
644
-#endif
645
-#ifdef USE_TLS
646
-   for(s=tls_listen;s;s=s->next){
647
-      if(print_sock_info(buffer,300,&k,s,PROTO_TLS)==-1)
648
-	 return -1;
649
-   }
650
-#endif
651
-write_again:
652
-   j=write(fd,buffer,k);
653
-   if(j==-1){
654
-      if(errno==EINTR)
655
-	 goto write_again;
656
-      else
657
-	 return -1;
658
-   }
659
-   return 0;
660
-}
661
-
662
-/* prints sock info into the byte array where
663
- * returns 0 on success, -1 on err
664
- * the message sent is as follows:
665
- * 1: protocol type (0=NONE,1=UDP, 2=TCP, 3=TLS)
666
- * 1: name length
667
- * N: name
668
- * 1: address string length
669
- * N: address
670
- * 2: NBO unsigned shor int port number
671
- *
672
- * TODO buffer overflow risk
673
- */
674
-static inline int print_sock_info(char *buffer,int wheremax,int *idx,struct socket_info *s,enum sip_protos type)
675
-{
676
-   int k;
677
-   unsigned char i;
678
-   unsigned short int j;
679
-   if((wheremax-*idx)<49)/*31*name+17*ipv6+2*port+1*type*/
680
-      return -1;
681
-   k=*idx;
682
-   buffer[k++]=(char)type;
683
-   if((i=(unsigned char)s->name.len)>30){
684
-      LM_ERR("name too long\n");
685
-      return -1;
686
-   }
687
-   buffer[k++]=i;
688
-   memcpy(&buffer[k],s->name.s,i);
689
-   k+=i;
690
-   i=(unsigned char)s->address_str.len;
691
-   buffer[k++]=i;
692
-   memcpy(&buffer[k],s->address_str.s,i);
693
-   k+=i;
694
-   j=htons(s->port_no);
695
-   memcpy(&buffer[k],&j,2);
696
-   k+=2;
697
-   *idx=k;
698
-   return 0;
699
-}
700
-
701
-/**
702
- * Handles data from an AppServer. First searches in the AS table which was the AS
703
- * that sent the data (we dont already know it because this comes from a poll_fd
704
- * struct). When the one is found, it calls process_event_reply, which in turn
705
- * looks if there's a complete event in the buffer, and if there is, processes it.
706
- *
707
- * returns
708
- * 	-1 on error
709
- * 	-2 on read()==0 (the socket has been closed by the other end)
710
- *  	0 on success
711
- */
712
-static int handle_as_data(int fd)
713
-{
714
-   int j,k;
715
-   struct as_entry *as;
716
-   for(as=as_list;as;as=as->next)
717
-      if(as->type == AS_TYPE && as->connected && (as->u.as.event_fd==fd))
718
-	 break;
719
-   if(!as){
720
-      LM_ERR("AS not found\n");
721
-      return -1;
722
-   }
723
-   k=AS_BUF_SIZE-(as->u.as.ev_buffer.len);
724
-again:
725
-   if((j=read(fd,as->u.as.ev_buffer.s+as->u.as.ev_buffer.len,k))<0){
726
-      LM_ERR("reading data for as %.*s\n",as->name.len,as->name.s);
727
-      if(errno==EINTR)
728
-	 goto again;
729
-      else
730
-	 return -1;
731
-   }else if(j==0){
732
-      LM_ERR("AS client leaving (%.*s)\n",as->name.len,as->name.s);
733
-      return -2;
734
-   }
735
-   as->u.as.ev_buffer.len+=j;
736
-   LM_DBG("read %d bytes from AS (total = %d)\n",j,as->u.as.ev_buffer.len);
737
-   if(as->u.as.ev_buffer.len>10)
738
-      process_event_reply(&as->u.as);
739
-   return 0;
740
-}
741
-
742
-/**
743
- * This function processess the Application Server buffer. We do buffered
744
- * processing because it increases performance quite a bit. Any message
745
- * sent from the AS comes with the first 2 bytes as an NBO unsigned short int 
746
- * which says the length of the following message (header and payload).
747
- * This way, we avoid multiple small reads() to the socket, which (as we know), consumes
748
- * far more processor because of the kernel read(2) system call. The drawback
749
- * is the added complexity of mantaining a buffer, the bytes read, and looking
750
- * if there is a complete message already prepared.
751
- *
752
- * Actions are supposed to be small, that's why BUF_SIZE is 2000 bytes length. 
753
- * Most of the actions will be that size or less. That is why the 4 bytes telling the
754
- * length of the Action payload are included in its size. This way you can use a fixed size
755
- * buffer to receive the Actions and not need to be pkb_malloc'ing for each new event.
756
- * If there is a particular bigger packet, for example one carrying a picture (a JPG can
757
- * easily surpass the 2000 byte limit) then a pkg_malloc will be required. This is left TODO
758
- *
759
- * returns
760
- * 	-1 on error (packet too big)
761
- * 	0  on success
762
- */
763
-static int process_event_reply(as_p as)
764
-{
765
-   unsigned int ev_len;
766
-   unsigned char processor_id,type;
767
-   unsigned int flags;
768
-
769
-   ev_len=(as->ev_buffer.s[0]<<24)|(as->ev_buffer.s[1]<<16)|(as->ev_buffer.s[2]<<8)|((as->ev_buffer.s[3])&0xFF);
770
-   type=as->ev_buffer.s[4];
771
-   processor_id=as->ev_buffer.s[5];
772
-   flags=(as->ev_buffer.s[6]<<24)|(as->ev_buffer.s[7]<<16)|(as->ev_buffer.s[8]<<8)|((as->ev_buffer.s[9])&0xFF);
773
- 
774
-   /*if ev_len > BUF_SIZE then a flag should be put on the AS so that the whole length
775
-    * of the action is skipped, until a mechanism for handling big packets is implemented*/
776
-   if(ev_len>AS_BUF_SIZE){
777
-      LM_WARN("Packet too big (%d)!!! should be skipped"
778
-			  " and an error returned!\n",ev_len);
779
-      return -1;
780
-   }
781
-   if((as->ev_buffer.len<ev_len) || as->ev_buffer.len<4)
782
-      return 0;
783
-
784
-   while (as->ev_buffer.len>=ev_len) {
785
-      switch(type){
786
-         case BIND_AC:
787
-            LM_DBG("Processing a BIND action from AS (length=%d): %.*s\n",
788
-                  ev_len,as->name.len,as->name.s);
789
-            process_bind_action(as,processor_id,flags,&as->ev_buffer.s[10],ev_len-10);
790
-            break;
791
-         case UNBIND_AC:
792
-            LM_DBG("Processing a UNBIND action from AS (length=%d): %.*s\n",
793
-                  ev_len,as->name.len,as->name.s);
794
-            process_unbind_action(as,processor_id,flags,&as->ev_buffer.s[10],ev_len-10);
795
-            break;
796
-         default:
797
-            LM_DBG("Unknown action type %d (len=%d,proc=%d,flags=%d)\n",type,ev_len,(int)processor_id,flags);
798
-            return 0;
799
-      }
800
-      memmove(as->ev_buffer.s,&(as->ev_buffer.s[ev_len]),(as->ev_buffer.len)-ev_len);
801
-      (as->ev_buffer.len)-=ev_len;
802
-      if(as->ev_buffer.len>10){
803
-         ev_len=(as->ev_buffer.s[0]<<24)|(as->ev_buffer.s[1]<<16)|(as->ev_buffer.s[2]<<8)|((as->ev_buffer.s[3])&0xFF);
804
-         type=as->ev_buffer.s[4];
805
-         processor_id=as->ev_buffer.s[5];
806
-         flags=(as->ev_buffer.s[6]<<24)|(as->ev_buffer.s[7]<<16)|(as->ev_buffer.s[8]<<8)|((as->ev_buffer.s[9])&0xFF);
807
-      }else{
808
-         return 0;
809
-      }
810
-   }
811
-
812
-   return 0;
813
-}
814
-
815
-
816
-/**
817
- * processes a BIND event type from the AS.
818
- * Bind events follow this form:
819
- * 1:Address Family
820
- * 1:address length in bytes (16 for ipv6, 4 for ipv4) in NETWORK BYTE ORDER (fortunately, ip_addr struct stores it in NBO)
821
- * [16|4]:the IP address
822
- * 1:protocol used (UDP,TCP or TLS);
823
- * 2:NBO port
824
- *
825
- */
826
-int process_bind_action(as_p as,unsigned char processor_id,unsigned int flags,char *payload,int len)
827
-{
828
-   struct socket_info *si,*xxx_listen;
829
-   struct ip_addr my_addr;
830
-   int i,k,proto;
831
-   unsigned short port;
832
-   char buffer[300],*proto_s;
833
-   k=0;
834
-   *buffer=0;
835
-   proto_s="NONE";
836
-   for(i=0;i<MAX_BINDS;i++){
837
-      if(as->bound_processor[i]==0)
838
-	 break;
839
-   }
840
-   if(i==MAX_BINDS){
841
-      LM_ERR("No more bindings allowed. Ignoring bind request for processor %d\n",processor_id);
842
-      return -1;
843
-   }
844
-   memset(&my_addr,0,sizeof(struct ip_addr));
845
-   my_addr.af=payload[k++];
846
-   my_addr.len=payload[k++];
847
-   memcpy(my_addr.u.addr,payload+k,my_addr.len);
848
-   k+=my_addr.len;
849
-   proto=payload[k++];
850
-   memcpy(&port,payload+k,2);
851
-   k+=2;
852
-   port=ntohs(port);
853
-   print_ip_buf(&my_addr,buffer,300);
854
-   switch(proto){
855
-      case PROTO_UDP:
856
-	 proto_s="UDP";
857
-	 xxx_listen=udp_listen;
858
-	 break;
859
-#ifdef USE_TCP
860
-      case PROTO_TCP:
861
-	 proto_s="TCP";
862
-	 xxx_listen=tcp_listen;
863
-	 break;
864
-#endif
865
-#ifdef USE_TLS
866
-      case PROTO_TLS:
867
-	 proto_s="TLS";
868
-	 xxx_listen=tls_listen;
869
-	 break;
870
-#endif
871
-      default:
872
-	 goto error;
873
-   }
874
-   for(si=xxx_listen;si;si=si->next){
875
-      if(my_addr.af==si->address.af && 
876
-	    my_addr.len==si->address.len && 
877
-	    !memcmp(si->address.u.addr,my_addr.u.addr,my_addr.len) && 
878
-	    port == si->port_no){
879
-	 as->binds[i]=si;
880
-	 as->bound_processor[i]=processor_id;
881
-	 as->num_binds++;
882
-	 LM_DBG("AS processor with id: %d bound to %s %s %d\n",processor_id,proto_s,buffer,port);
883
-	 return 0;
884
-      }
885
-   }
886
-error:
887
-   LM_ERR("Cannot bind to %s %s %d !!!\n",proto_s,buffer,port);
888
-   return -1;
889
-}
890
-
891
-/**
892
- * processes a UNBIND event type from the AS.
893
- * Unbind events follow this form:
894
- * 1:processor_id
895
- *
896
- */
897
-int process_unbind_action(as_p as,unsigned char processor_id,unsigned int flags,char *payload,int len)
898
-{
899
-   int i,k;
900
-   k=0;
901
-   for(i=0;i<as->num_binds;i++){