Browse code

core: option to set number of workers per listen socket

- new cfg global parameter: socket_workers - set the number of worker
processes for the next listen socket
- used before listen on udp and sctp socket - overwrites
children/sctp_children value for that socket
- used bofer listen on tcp and tls socket - adds extra tcp workers,
these handling traffic only on that socket
- socket_workers is reset with next listen socket that is added, thus
use it for each listen socket where you want custom number of workers
- if this parameter is not used at all, it is the same behaviour as so
far

Example for udp sockets:

children=4
socket_workers=2
listen=udp:127.0.0.1:5080
listen=udp:127.0.0.1:5070
listen=udp:127.0.0.1:5060

- it will start 2 workers to handle traffic on udp:127.0.0.1:5080 and 4
for each of udp:127.0.0.1:5070 and udp:127.0.0.1:5060. In total there
are 10 worker processes

Example for tcp sockets:

children=4
socket_workers=2
listen=tcp:127.0.0.1:5080
listen=tcp:127.0.0.1:5070
listen=tcp:127.0.0.1:5060

- it will start 2 workers to handle traffic on tcp:127.0.0.1:5080 and 4
to handle traffic on both tcp:127.0.0.1:5070 and tcp:127.0.0.1:5060.
In total there are 6 worker processes

Daniel-Constantin Mierla authored on 12/01/2012 17:55:22
Showing 8 changed files
... ...
@@ -390,6 +390,7 @@ STAT	statistics
390 390
 MAXBUFFER maxbuffer
391 391
 SQL_BUFFER_SIZE sql_buffer_size
392 392
 CHILDREN children
393
+SOCKET_WORKERS socket_workers
393 394
 CHECK_VIA	check_via
394 395
 PHONE2TEL	phone2tel
395 396
 SYN_BRANCH syn_branch
... ...
@@ -782,6 +783,7 @@ IMPORTFILE      "import_file"
782 782
 <INITIAL>{MAXBUFFER}	{ count(); yylval.strval=yytext; return MAXBUFFER; }
783 783
 <INITIAL>{SQL_BUFFER_SIZE}	{ count(); yylval.strval=yytext; return SQL_BUFFER_SIZE; }
784 784
 <INITIAL>{CHILDREN}	{ count(); yylval.strval=yytext; return CHILDREN; }
785
+<INITIAL>{SOCKET_WORKERS}	{ count(); yylval.strval=yytext; return SOCKET_WORKERS; }
785 786
 <INITIAL>{CHECK_VIA}	{ count(); yylval.strval=yytext; return CHECK_VIA; }
786 787
 <INITIAL>{PHONE2TEL}	{ count(); yylval.strval=yytext; return PHONE2TEL; }
787 788
 <INITIAL>{SYN_BRANCH}	{ count(); yylval.strval=yytext; return SYN_BRANCH; }
... ...
@@ -443,6 +443,7 @@ extern char *finame;
443 443
 %token PORT
444 444
 %token STAT
445 445
 %token CHILDREN
446
+%token SOCKET_WORKERS
446 447
 %token CHECK_VIA
447 448
 %token PHONE2TEL
448 449
 %token SYN_BRANCH
... ...
@@ -938,6 +939,8 @@ assign_stm:
938 938
 	| PORT EQUAL error    { yyerror("number expected"); }
939 939
 	| CHILDREN EQUAL NUMBER { children_no=$3; }
940 940
 	| CHILDREN EQUAL error { yyerror("number expected"); }
941
+	| SOCKET_WORKERS EQUAL NUMBER { socket_workers=$3; }
942
+	| SOCKET_WORKERS EQUAL error { yyerror("number expected"); }
941 943
 	| CHECK_VIA EQUAL NUMBER { check_via=$3; }
942 944
 	| CHECK_VIA EQUAL error { yyerror("boolean value expected"); }
943 945
 	| PHONE2TEL EQUAL NUMBER { phone2tel=$3; }
... ...
@@ -998,7 +1001,7 @@ assign_stm:
998 998
 	| TCP_ACCEPT_ALIASES EQUAL error { yyerror("boolean value expected"); }
999 999
 	| TCP_CHILDREN EQUAL NUMBER {
1000 1000
 		#ifdef USE_TCP
1001
-			tcp_children_no=$3;
1001
+			tcp_cfg_children_no=$3;
1002 1002
 		#else
1003 1003
 			warn("tcp support not compiled in");
1004 1004
 		#endif
... ...
@@ -88,8 +88,10 @@ extern struct socket_info* sendipv6_sctp; /* same as above for ipv6 */
88 88
 extern unsigned int maxbuffer;
89 89
 extern unsigned int sql_buffer_size;
90 90
 extern int children_no;
91
+extern int socket_workers;
91 92
 #ifdef USE_TCP
92 93
 extern int tcp_main_pid;
94
+extern int tcp_cfg_children_no;
93 95
 extern int tcp_children_no;
94 96
 extern int tcp_disable;
95 97
 extern enum poll_types tcp_poll_method;
... ...
@@ -116,6 +116,8 @@ struct socket_info{
116 116
 	char proto; /* tcp or udp*/
117 117
 	str sock_str; /* Socket proto, ip, and port as string */
118 118
 	struct addr_info* addr_info_lst; /* extra addresses (e.g. SCTP mh) */
119
+	int workers; /* number of worker processes for this socket */
120
+	int workers_tcpidx; /* index of workers in tcp children array */
119 121
 };
120 122
 
121 123
 
... ...
@@ -341,9 +341,12 @@ unsigned int maxbuffer = MAX_RECV_BUFFER_SIZE; /* maximum buffer size we do
341 341
 												  be re-configured */
342 342
 unsigned int sql_buffer_size = 65535; /* Size for the SQL buffer. Defaults to 64k. 
343 343
                                          This may be re-configured */
344
-int children_no = 0;			/* number of children processing requests */
344
+int socket_workers = 0;		/* number of workers processing requests for a socket
345
+							   - it's reset everytime with a new listen socket */
346
+int children_no = 0;		/* number of children processing requests */
345 347
 #ifdef USE_TCP
346
-int tcp_children_no = 0;
348
+int tcp_cfg_children_no = 0; /* set via config or command line option */
349
+int tcp_children_no = 0; /* based on socket_workers and tcp_cfg_children_no */
347 350
 int tcp_disable = 0; /* 1 if tcp is disabled */
348 351
 #endif
349 352
 #ifdef USE_TLS
... ...
@@ -1265,6 +1268,7 @@ int main_loop()
1265 1265
 #ifdef EXTRA_DEBUG
1266 1266
 	int r;
1267 1267
 #endif
1268
+	int nrprocs;
1268 1269
 
1269 1270
 	/* one "main" process and n children handling i/o */
1270 1271
 	if (dont_fork){
... ...
@@ -1453,7 +1457,7 @@ int main_loop()
1453 1453
 				sendipv6=si;
1454 1454
 	#endif
1455 1455
 			/* children_no per each socket */
1456
-			cfg_register_child(children_no);
1456
+			cfg_register_child((si->workers>0)?si->workers:children_no);
1457 1457
 		}
1458 1458
 #ifdef USE_RAW_SOCKS
1459 1459
 		/* always try to have a raw socket opened if we are using ipv4 */
... ...
@@ -1507,7 +1511,7 @@ int main_loop()
1507 1507
 					sendipv6_sctp=si;
1508 1508
 		#endif
1509 1509
 				/* sctp_children_no per each socket */
1510
-				cfg_register_child(sctp_children_no);
1510
+				cfg_register_child((si->workers>0)?si->workers:sctp_children_no);
1511 1511
 			}
1512 1512
 		}
1513 1513
 #endif /* USE_SCTP */
... ...
@@ -1589,7 +1593,8 @@ int main_loop()
1589 1589
 
1590 1590
 		/* udp processes */
1591 1591
 		for(si=udp_listen; si; si=si->next){
1592
-			for(i=0;i<children_no;i++){
1592
+			nrprocs = (si->workers>0)?si->workers:children_no;
1593
+			for(i=0;i<nrprocs;i++){
1593 1594
 				if(si->address.af==AF_INET6) {
1594 1595
 					snprintf(si_desc, MAX_PT_DESC, "udp receiver child=%d "
1595 1596
 						"sock=[%s]:%s",
... ...
@@ -1620,7 +1625,8 @@ int main_loop()
1620 1620
 		/* sctp processes */
1621 1621
 		if (!sctp_disable){
1622 1622
 			for(si=sctp_listen; si; si=si->next){
1623
-				for(i=0;i<sctp_children_no;i++){
1623
+				nrprocs = (si->workers>0)?si->workers:sctp_children_no;
1624
+				for(i=0;i<nrprocs;i++){
1624 1625
 					if(si->address.af==AF_INET6) {
1625 1626
 						snprintf(si_desc, MAX_PT_DESC, "sctp receiver child=%d "
1626 1627
 								"sock=[%s]:%s",
... ...
@@ -1762,17 +1768,33 @@ static int calc_proc_no(void)
1762 1762
 {
1763 1763
 	int udp_listeners;
1764 1764
 	struct socket_info* si;
1765
+#ifdef USE_TCP
1766
+	int tcp_listeners;
1767
+	int tcp_e_listeners;
1768
+#endif
1765 1769
 #ifdef USE_SCTP
1766 1770
 	int sctp_listeners;
1767 1771
 #endif
1768 1772
 
1769
-	for (si=udp_listen, udp_listeners=0; si; si=si->next, udp_listeners++);
1773
+	for (si=udp_listen, udp_listeners=0; si; si=si->next)
1774
+		udp_listeners += (si->workers>0)?si->workers:children_no;
1775
+#ifdef USE_TCP
1776
+	for (si=tcp_listen, tcp_listeners=0, tcp_e_listeners=0; si; si=si->next) {
1777
+		if(si->workers>0)
1778
+			tcp_listeners += si->workers;
1779
+		else
1780
+			 tcp_e_listeners = tcp_cfg_children_no;
1781
+	}
1782
+	tcp_listeners += tcp_e_listeners;
1783
+	tcp_children_no = tcp_listeners;
1784
+#endif
1770 1785
 #ifdef USE_SCTP
1771
-	for (si=sctp_listen, sctp_listeners=0; si; si=si->next, sctp_listeners++);
1786
+	for (si=sctp_listen, sctp_listeners=0; si; si=si->next)
1787
+		sctp_listeners += (si->workers>0)?si->workers:sctp_children_no;
1772 1788
 #endif
1773 1789
 	return
1774 1790
 		     /* receivers and attendant */
1775
-		(dont_fork ? 1 : children_no * udp_listeners + 1)
1791
+		(dont_fork ? 1 : udp_listeners + 1)
1776 1792
 		     /* timer process */
1777 1793
 		+ 1 /* always, we need it in most cases, and we can't tell here
1778 1794
 		       & now if we don't need it */
... ...
@@ -1780,10 +1802,10 @@ static int calc_proc_no(void)
1780 1780
 		+ 1 /* slow timer process */
1781 1781
 #endif
1782 1782
 #ifdef USE_TCP
1783
-		+((!tcp_disable)?( 1/* tcp main */ + tcp_children_no ):0)
1783
+		+((!tcp_disable)?( 1/* tcp main */ + tcp_listeners ):0)
1784 1784
 #endif
1785 1785
 #ifdef USE_SCTP
1786
-		+((!sctp_disable)?sctp_children_no*sctp_listeners:0)
1786
+		+((!sctp_disable)?sctp_listeners:0)
1787 1787
 #endif
1788 1788
 		;
1789 1789
 }
... ...
@@ -2142,7 +2164,7 @@ try_again:
2142 2142
 					break;
2143 2143
 			case 'N':
2144 2144
 				#ifdef USE_TCP
2145
-					tcp_children_no=strtol(optarg, &tmp, 10);
2145
+					tcp_cfg_children_no=strtol(optarg, &tmp, 10);
2146 2146
 					if ((tmp==0) ||(*tmp)){
2147 2147
 						fprintf(stderr, "bad process number: -N %s\n",
2148 2148
 									optarg);
... ...
@@ -2274,7 +2296,8 @@ try_again:
2274 2274
 	if (children_no<=0) children_no=CHILD_NO;
2275 2275
 #ifdef USE_TCP
2276 2276
 	if (!tcp_disable){
2277
-		if (tcp_children_no<=0) tcp_children_no=children_no;
2277
+		if (tcp_cfg_children_no<=0) tcp_cfg_children_no=children_no;
2278
+		tcp_children_no = tcp_cfg_children_no;
2278 2279
 	}
2279 2280
 #endif
2280 2281
 #ifdef USE_SCTP
... ...
@@ -622,6 +622,10 @@ static struct socket_info* new_sock2list(char* name, struct name_lst* addr_l,
622 622
 		LOG(L_ERR, "ERROR: new_sock2list: new_sock_info failed\n");
623 623
 		goto error;
624 624
 	}
625
+	if(socket_workers>0) {
626
+		si->workers = socket_workers;
627
+		socket_workers = 0;
628
+	}
625 629
 	sock_listadd(list, si);
626 630
 	return si;
627 631
 error:
... ...
@@ -50,6 +50,7 @@ struct tcp_child{
50 50
 	int proc_no; /* ser proc_no, for debugging */
51 51
 	int unix_sock; /* unix "read child" sock fd */
52 52
 	int busy;
53
+	struct socket_info *mysocket; /* listen socket to handle traffic on it */
53 54
 	int n_reqs; /* number of requests serviced so far */
54 55
 };
55 56
 
... ...
@@ -268,7 +268,7 @@ struct tcp_conn_alias** tcpconn_aliases_hash=0;
268 268
 struct tcp_connection** tcpconn_id_hash=0;
269 269
 gen_lock_t* tcpconn_lock=0;
270 270
 
271
-struct tcp_child* tcp_children;
271
+struct tcp_child* tcp_children=0;
272 272
 static int* connection_id=0; /*  unique for each connection, used for 
273 273
 								quickly finding the corresponding connection
274 274
 								for a reply */
... ...
@@ -282,6 +282,12 @@ static io_wait_h io_h;
282 282
 static struct local_timer tcp_main_ltimer;
283 283
 static ticks_t tcp_main_prev_ticks;
284 284
 
285
+/* tell if there are tcp workers that should handle only specific socket
286
+ * - used to optimize the search of least loaded worker for a tcp socket
287
+ * - 0 - no workers per tcp sockets have been set
288
+ * - 1 + generic_workers - when there are workers per tcp sockets
289
+ */
290
+static int tcp_sockets_gworkers = 0;
285 291
 
286 292
 static ticks_t tcpconn_main_timeout(ticks_t , struct timer_ln* , void* );
287 293
 
... ...
@@ -3890,24 +3896,63 @@ inline static int send2child(struct tcp_connection* tcpconn)
3890 3890
 	int i;
3891 3891
 	int min_busy;
3892 3892
 	int idx;
3893
+	int wfirst;
3894
+	int wlast;
3893 3895
 	static int crt=0; /* current child */
3894 3896
 	int last;
3895 3897
 	
3896
-	min_busy=tcp_children[0].busy;
3897
-	idx=0;
3898
-	last=crt+tcp_children_no;
3899
-	for (; crt<last; crt++){
3900
-		i=crt%tcp_children_no;
3901
-		if (!tcp_children[i].busy){
3902
-			idx=i;
3903
-			min_busy=0;
3904
-			break;
3905
-		}else if (min_busy>tcp_children[i].busy){
3906
-			min_busy=tcp_children[i].busy;
3907
-			idx=i;
3898
+	if(likely(tcp_sockets_gworkers==0)) {
3899
+		/* no child selection based on received socket
3900
+		 * - use least loaded over all */
3901
+		min_busy=tcp_children[0].busy;
3902
+		idx=0;
3903
+		last=crt+tcp_children_no;
3904
+		for (; crt<last; crt++){
3905
+			i=crt%tcp_children_no;
3906
+			if (!tcp_children[i].busy){
3907
+				idx=i;
3908
+				min_busy=0;
3909
+				break;
3910
+			}else if (min_busy>tcp_children[i].busy){
3911
+				min_busy=tcp_children[i].busy;
3912
+				idx=i;
3913
+			}
3914
+		}
3915
+		crt=idx+1; /* next time we start with crt%tcp_children_no */
3916
+	} else {
3917
+		/* child selection based on received socket
3918
+		 * - use least loaded per received socket, starting with the first
3919
+		 *   in its group */
3920
+		if(tcpconn->rcv.bind_address->workers>0) {
3921
+			wfirst = tcpconn->rcv.bind_address->workers_tcpidx;
3922
+			wlast = wfirst + tcpconn->rcv.bind_address->workers;
3923
+			LM_DBG("===== checking per-socket specific workers (%d/%d..%d/%d) [%s]\n",
3924
+					tcp_children[wfirst].pid, tcp_children[wfirst].proc_no,
3925
+					tcp_children[wlast-1].pid, tcp_children[wlast-1].proc_no,
3926
+					tcpconn->rcv.bind_address->sock_str.s);
3927
+		} else {
3928
+			wfirst = 0;
3929
+			wlast = tcp_sockets_gworkers - 1;
3930
+			LM_DBG("+++++ checking per-socket generic workers (%d/%d..%d/%d) [%s]\n",
3931
+					tcp_children[wfirst].pid, tcp_children[wfirst].proc_no,
3932
+					tcp_children[wlast-1].pid, tcp_children[wlast-1].proc_no,
3933
+					tcpconn->rcv.bind_address->sock_str.s);
3934
+		}
3935
+		idx = wfirst;
3936
+		min_busy = tcp_children[idx].busy;
3937
+		for(i=wfirst; i<wlast; i++) {
3938
+			if (!tcp_children[i].busy){
3939
+				idx=i;
3940
+				min_busy=0;
3941
+				break;
3942
+			} else {
3943
+				if (min_busy>tcp_children[i].busy) {
3944
+					min_busy=tcp_children[i].busy;
3945
+					idx=i;
3946
+				}
3947
+			}
3908 3948
 		}
3909 3949
 	}
3910
-	crt=idx+1; /* next time we start with crt%tcp_children_no */
3911 3950
 	
3912 3951
 	tcp_children[idx].busy++;
3913 3952
 	tcp_children[idx].n_reqs++;
... ...
@@ -3916,9 +3961,9 @@ inline static int send2child(struct tcp_connection* tcpconn)
3916 3916
 				" connection passed to the least busy one (%d)\n",
3917 3917
 				min_busy);
3918 3918
 	}
3919
-	DBG("send2child: to tcp child %d %d(%ld), %p\n", idx, 
3920
-					tcp_children[idx].proc_no,
3921
-					(long)tcp_children[idx].pid, tcpconn);
3919
+	LM_DBG("selected tcp worker %d %d(%ld) for activity on [%s], %p\n",
3920
+			idx, tcp_children[idx].proc_no, (long)tcp_children[idx].pid,
3921
+			tcpconn->rcv.bind_address->sock_str.s, tcpconn);
3922 3922
 	/* first make sure this child doesn't have pending request for
3923 3923
 	 * tcp_main (to avoid a possible deadlock: e.g. child wants to
3924 3924
 	 * send a release command, but the master fills its socket buffer
... ...
@@ -4838,9 +4883,10 @@ int tcp_fix_child_sockets(int* fd)
4838 4838
 /* starts the tcp processes */
4839 4839
 int tcp_init_children()
4840 4840
 {
4841
-	int r;
4841
+	int r, i;
4842 4842
 	int reader_fd_1; /* for comm. with the tcp children read  */
4843 4843
 	pid_t pid;
4844
+	char si_desc[MAX_PT_DESC];
4844 4845
 	struct socket_info *si;
4845 4846
 	
4846 4847
 	/* estimate max fd. no:
... ...
@@ -4867,13 +4913,32 @@ int tcp_init_children()
4867 4867
 			LOG(L_ERR, "ERROR: tcp_init_children: out of memory\n");
4868 4868
 			goto error;
4869 4869
 	}
4870
+	memset(tcp_children, 0, sizeof(struct tcp_child)*tcp_children_no);
4871
+	/* assign own socket for tcp workers, if it is the case
4872
+	 * - add them from end to start of tcp children array
4873
+	 * - thus, have generic tcp workers at beginning */
4874
+	i = tcp_children_no-1;
4875
+	for(si=tcp_listen; si; si=si->next) {
4876
+		if(si->workers>0) {
4877
+			si->workers_tcpidx = i - si->workers + 1;
4878
+			for(r=0; r<si->workers; r++) {
4879
+				tcp_children[i].mysocket = si;
4880
+				i--;
4881
+			}
4882
+		}
4883
+	}
4884
+	tcp_sockets_gworkers = (i != tcp_children_no-1)?(1 + i + 1):0;
4885
+
4870 4886
 	/* create the tcp sock_info structures */
4871 4887
 	/* copy the sockets --moved to main_loop*/
4872 4888
 	
4873 4889
 	/* fork children & create the socket pairs*/
4874 4890
 	for(r=0; r<tcp_children_no; r++){
4875 4891
 		child_rank++;
4876
-		pid=fork_tcp_process(child_rank, "tcp receiver", r, &reader_fd_1);
4892
+		snprintf(si_desc, MAX_PT_DESC, "tcp receiver (%s)",
4893
+				(tcp_children[r].mysocket!=NULL)?
4894
+					tcp_children[r].mysocket->sock_str.s:"generic");
4895
+		pid=fork_tcp_process(child_rank, si_desc, r, &reader_fd_1);
4877 4896
 		if (pid<0){
4878 4897
 			LOG(L_ERR, "ERROR: tcp_main: fork failed: %s\n",
4879 4898
 					strerror(errno));