Browse code

core: new global parameter async_workers_group

- define groups of async worker processes
- async_workers_group="name=abc;workers=N;nonblock=[0|1];usleep=M"
- groups of worker processes can be used now with sworker module

Daniel-Constantin Mierla authored on 26/11/2020 15:04:39
Showing 4 changed files
... ...
@@ -40,13 +40,14 @@
40 40
 #include "ut.h"
41 41
 #include "pt.h"
42 42
 #include "cfg/cfg_struct.h"
43
+#include "parser/parse_param.h"
43 44
 
44 45
 
45 46
 #include "async_task.h"
46 47
 
47 48
 static async_wgroup_t *_async_wgroup_list = NULL;
48 49
 
49
-int async_task_run(int idx);
50
+int async_task_run(async_wgroup_t *awg, int idx);
50 51
 
51 52
 /**
52 53
  *
... ...
@@ -73,19 +74,22 @@ int async_task_workers_active(void)
73 74
 int async_task_init_sockets(void)
74 75
 {
75 76
 	int val;
77
+	async_wgroup_t *awg;
76 78
 
77
-	if (socketpair(PF_UNIX, SOCK_DGRAM, 0, _async_wgroup_list->sockets) < 0) {
78
-		LM_ERR("opening tasks dgram socket pair\n");
79
-		return -1;
80
-	}
79
+	for(awg=_async_wgroup_list; awg!=NULL; awg=awg->next) {
80
+		if (socketpair(PF_UNIX, SOCK_DGRAM, 0, awg->sockets) < 0) {
81
+			LM_ERR("opening tasks dgram socket pair\n");
82
+			return -1;
83
+		}
81 84
 
82
-	if (_async_wgroup_list->nonblock) {
83
-		val = fcntl(_async_wgroup_list->sockets[1], F_GETFL, 0);
84
-		if(val<0) {
85
-			LM_WARN("failed to get socket flags\n");
86
-		} else {
87
-			if(fcntl(_async_wgroup_list->sockets[1], F_SETFL, val | O_NONBLOCK)<0) {
88
-				LM_WARN("failed to set socket nonblock flag\n");
85
+		if (awg->nonblock) {
86
+			val = fcntl(awg->sockets[1], F_GETFL, 0);
87
+			if(val<0) {
88
+				LM_WARN("failed to get socket flags\n");
89
+			} else {
90
+				if(fcntl(awg->sockets[1], F_SETFL, val | O_NONBLOCK)<0) {
91
+					LM_WARN("failed to set socket nonblock flag\n");
92
+				}
89 93
 			}
90 94
 		}
91 95
 	}
... ...
@@ -99,8 +103,13 @@ int async_task_init_sockets(void)
99 103
  */
100 104
 void async_task_close_sockets_child(void)
101 105
 {
106
+	async_wgroup_t *awg;
107
+
102 108
 	LM_DBG("closing the notification socket used by children\n");
103
-	close(_async_wgroup_list->sockets[1]);
109
+
110
+	for(awg=_async_wgroup_list; awg!=NULL; awg=awg->next) {
111
+		close(awg->sockets[1]);
112
+	}
104 113
 }
105 114
 
106 115
 /**
... ...
@@ -108,8 +117,13 @@ void async_task_close_sockets_child(void)
108 117
  */
109 118
 void async_task_close_sockets_parent(void)
110 119
 {
120
+	async_wgroup_t *awg;
121
+
111 122
 	LM_DBG("closing the notification socket used by parent\n");
112
-	close(_async_wgroup_list->sockets[0]);
123
+
124
+	for(awg=_async_wgroup_list; awg!=NULL; awg=awg->next) {
125
+		close(awg->sockets[0]);
126
+	}
113 127
 }
114 128
 
115 129
 /**
... ...
@@ -117,15 +131,23 @@ void async_task_close_sockets_parent(void)
117 131
  */
118 132
 int async_task_init(void)
119 133
 {
134
+	int nrg = 0;
135
+	async_wgroup_t *awg;
136
+
120 137
 	LM_DBG("start initializing asynk task framework\n");
121 138
 	if(_async_wgroup_list==NULL || _async_wgroup_list->workers<=0)
122 139
 		return 0;
123 140
 
141
+	/* overall number of processes */
142
+	for(awg=_async_wgroup_list; awg!=NULL; awg=awg->next) {
143
+		nrg += awg->workers;
144
+	}
145
+
124 146
 	/* advertise new processes to core */
125
-	register_procs(_async_wgroup_list->workers);
147
+	register_procs(nrg);
126 148
 
127 149
 	/* advertise new processes to cfg framework */
128
-	cfg_register_child(_async_wgroup_list->workers);
150
+	cfg_register_child(nrg);
129 151
 
130 152
 	return 0;
131 153
 }
... ...
@@ -148,6 +170,7 @@ int async_task_child_init(int rank)
148 170
 	int pid;
149 171
 	int i;
150 172
 	char pname[64];
173
+	async_wgroup_t *awg;
151 174
 
152 175
 	if(_async_wgroup_list==NULL || _async_wgroup_list->workers<=0)
153 176
 		return 0;
... ...
@@ -169,22 +192,26 @@ int async_task_child_init(int rank)
169 192
 	if (rank!=PROC_MAIN)
170 193
 		return 0;
171 194
 
172
-	snprintf(pname, 62, "Async Task Worker - %s",
173
-			(_async_wgroup_list->name.s)?_async_wgroup_list->name.s:"unknown");
174
-	for(i=0; i<_async_wgroup_list->workers; i++) {
175
-		pid=fork_process(PROC_RPC, pname, 1);
176
-		if (pid<0)
177
-			return -1; /* error */
178
-		if(pid==0) {
179
-			/* child */
180
-
181
-			/* initialize the config framework */
182
-			if (cfg_child_init())
183
-				return -1;
184
-			/* main function for workers */
185
-			if(async_task_run(i+1)<0) {
186
-				LM_ERR("failed to initialize task worker process: %d\n", i);
187
-				return -1;
195
+	for(awg=_async_wgroup_list; awg!=NULL; awg=awg->next) {
196
+		snprintf(pname, 62, "Async Task Worker - %s",
197
+				(awg->name.s)?awg->name.s:"unknown");
198
+		for(i=0; i<awg->workers; i++) {
199
+			pid=fork_process(PROC_RPC, pname, 1);
200
+			if (pid<0) {
201
+				return -1; /* error */
202
+			}
203
+			if(pid==0) {
204
+				/* child */
205
+
206
+				/* initialize the config framework */
207
+				if (cfg_child_init()) {
208
+					return -1;
209
+				}
210
+				/* main function for workers */
211
+				if(async_task_run(awg, i+1)<0) {
212
+					LM_ERR("failed to initialize task worker process: %d\n", i);
213
+					return -1;
214
+				}
188 215
 			}
189 216
 		}
190 217
 	}
... ...
@@ -215,12 +242,12 @@ int async_task_set_workers(int n)
215 242
 		}
216 243
 		memset(_async_wgroup_list, 0, sizeof(async_wgroup_t)
217 244
 				+ (gname.len+1)*sizeof(char));
245
+		_async_wgroup_list->name.s = (char*)_async_wgroup_list
246
+				+ sizeof(async_wgroup_t);
247
+		memcpy(_async_wgroup_list->name.s, gname.s, gname.len);
248
+		_async_wgroup_list->name.len = gname.len;
218 249
 	}
219 250
 	_async_wgroup_list->workers = n;
220
-	_async_wgroup_list->name.s = (char*)_async_wgroup_list
221
-		+ sizeof(async_wgroup_t);
222
-	memcpy(_async_wgroup_list->name.s, gname.s, gname.len);
223
-	_async_wgroup_list->name.len = gname.len;
224 251
 
225 252
 	return 0;
226 253
 }
... ...
@@ -252,6 +279,108 @@ int async_task_set_usleep(int n)
252 279
 	return v;
253 280
 }
254 281
 
282
+/**
283
+ *
284
+ */
285
+int async_task_set_workers_group(char *data)
286
+{
287
+	str sval;
288
+	param_t* params_list = NULL;
289
+	param_hooks_t phooks;
290
+	param_t *pit=NULL;
291
+	async_wgroup_t awg;
292
+	async_wgroup_t *newg;
293
+
294
+	if(data==NULL) {
295
+		return -1;
296
+	}
297
+	sval.s = data;
298
+	sval.len = strlen(sval.s);
299
+
300
+	if(sval.len<=0) {
301
+		LM_ERR("invalid parameter value\n");
302
+		return -1;
303
+	}
304
+
305
+	if(sval.s[sval.len-1]==';') {
306
+		sval.len--;
307
+	}
308
+	if (parse_params(&sval, CLASS_ANY, &phooks, &params_list)<0) {
309
+		return -1;
310
+	}
311
+	memset(&awg, 0, sizeof(async_wgroup_t));
312
+
313
+	for (pit = params_list; pit; pit=pit->next) {
314
+		if (pit->name.len==4
315
+				&& strncasecmp(pit->name.s, "name", 4)==0) {
316
+			awg.name = pit->body;
317
+		} else if (pit->name.len==7
318
+				&& strncasecmp(pit->name.s, "workers", 7)==0) {
319
+			if (str2sint(&pit->body, &awg.workers) < 0) {
320
+				LM_ERR("invalid workers value: %.*s\n", pit->body.len, pit->body.s);
321
+				return -1;
322
+			}
323
+		} else if (pit->name.len==6
324
+				&& strncasecmp(pit->name.s, "usleep", 6)==0) {
325
+			if (str2sint(&pit->body, &awg.usleep) < 0) {
326
+				LM_ERR("invalid usleep value: %.*s\n", pit->body.len, pit->body.s);
327
+				return -1;
328
+			}
329
+		} else if (pit->name.len==8
330
+				&& strncasecmp(pit->name.s, "nonblock", 8)==0) {
331
+			if (str2sint(&pit->body, &awg.nonblock) < 0) {
332
+				LM_ERR("invalid nonblock value: %.*s\n", pit->body.len, pit->body.s);
333
+				return -1;
334
+			}
335
+		}
336
+	}
337
+
338
+	if(awg.name.len<=0) {
339
+		LM_ERR("invalid name value: [%.*s]\n", sval.len, sval.s);
340
+		return -1;
341
+	}
342
+	if (awg.workers<=0) {
343
+		LM_ERR("invalid workers value: %d\n", awg.workers);
344
+		return -1;
345
+	}
346
+
347
+	if(awg.name.len==7 && strncmp(awg.name.s, "default", 7)==0) {
348
+		if(async_task_set_workers(awg.workers)<0) {
349
+			LM_ERR("failed to create the default group\n");
350
+			return -1;
351
+		}
352
+		async_task_set_nonblock(awg.nonblock);
353
+		async_task_set_usleep(awg.usleep);
354
+		return 0;
355
+	}
356
+	if(_async_wgroup_list==NULL) {
357
+		if(async_task_set_workers(1)<0) {
358
+			LM_ERR("failed to create the initial default group\n");
359
+			return -1;
360
+		}
361
+	}
362
+
363
+	newg = (async_wgroup_t*)pkg_malloc(sizeof(async_wgroup_t)
364
+				+ (awg.name.len+1)*sizeof(char));
365
+	if(newg==NULL) {
366
+		LM_ERR("failed to create async wgroup [%.*s]\n", sval.len, sval.s);
367
+		return -1;
368
+	}
369
+	memset(newg, 0, sizeof(async_wgroup_t)
370
+				+ (awg.name.len+1)*sizeof(char));
371
+	newg->name.s = (char*)newg + sizeof(async_wgroup_t);
372
+	memcpy(newg->name.s, awg.name.s, awg.name.len);
373
+	newg->name.len = awg.name.len;
374
+	newg->workers = awg.workers;
375
+	newg->nonblock = awg.nonblock;
376
+	newg->usleep = awg.usleep;
377
+
378
+	newg->next = _async_wgroup_list->next;
379
+	_async_wgroup_list->next = newg;
380
+
381
+	return 0;
382
+}
383
+
255 384
 /**
256 385
  *
257 386
  */
... ...
@@ -295,7 +424,7 @@ int async_task_group_push(str *gname, async_task_t *task)
295 424
 		LM_WARN("group [%.*s] not found - ignoring\n", gname->len, gname->s);
296 425
 		return 0;
297 426
 	}
298
-	len = write(_async_wgroup_list->sockets[1], &task, sizeof(async_task_t*));
427
+	len = write(awg->sockets[1], &task, sizeof(async_task_t*));
299 428
 	if(len<=0) {
300 429
 		LM_ERR("failed to pass the task [%p] to group [%.*s]\n", task,
301 430
 				gname->len, gname->s);
... ...
@@ -308,16 +437,17 @@ int async_task_group_push(str *gname, async_task_t *task)
308 437
 /**
309 438
  *
310 439
  */
311
-int async_task_run(int idx)
440
+int async_task_run(async_wgroup_t *awg, int idx)
312 441
 {
313 442
 	async_task_t *ptask;
314 443
 	int received;
315 444
 
316
-	LM_DBG("async task worker %d ready\n", idx);
445
+	LM_DBG("async task worker [%.*s] idx [%d] ready\n", awg->name.len,
446
+			awg->name.s, idx);
317 447
 
318 448
 	for( ; ; ) {
319
-		if(unlikely(_async_wgroup_list->usleep)) sleep_us(_async_wgroup_list->usleep);
320
-		if ((received = recvfrom(_async_wgroup_list->sockets[0],
449
+		if(unlikely(awg->usleep)) sleep_us(awg->usleep);
450
+		if ((received = recvfrom(awg->sockets[0],
321 451
 							&ptask, sizeof(async_task_t*),
322 452
 							0, NULL, 0)) < 0) {
323 453
 			LM_ERR("failed to received task (%d: %s)\n", errno, strerror(errno));
... ...
@@ -45,6 +45,7 @@ int async_task_child_init(int rank);
45 45
 int async_task_initialized(void);
46 46
 int async_task_set_workers(int n);
47 47
 int async_task_set_nonblock(int n);
48
+int async_task_set_workers_group(char *data);
48 49
 int async_task_push(async_task_t *task);
49 50
 int async_task_set_usleep(int n);
50 51
 int async_task_workers_get(void);
... ...
@@ -358,6 +358,7 @@ SOCKET_WORKERS socket_workers
358 358
 ASYNC_WORKERS async_workers
359 359
 ASYNC_USLEEP async_usleep
360 360
 ASYNC_NONBLOCK async_nonblock
361
+ASYNC_WORKERS_GROUP async_workers_group
361 362
 CHECK_VIA	check_via
362 363
 PHONE2TEL	phone2tel
363 364
 MEMLOG		"memlog"|"mem_log"
... ...
@@ -815,6 +816,7 @@ IMPORTFILE      "import_file"
815 816
 <INITIAL>{ASYNC_WORKERS}	{ count(); yylval.strval=yytext; return ASYNC_WORKERS; }
816 817
 <INITIAL>{ASYNC_USLEEP}	{ count(); yylval.strval=yytext; return ASYNC_USLEEP; }
817 818
 <INITIAL>{ASYNC_NONBLOCK}	{ count(); yylval.strval=yytext; return ASYNC_NONBLOCK; }
819
+<INITIAL>{ASYNC_WORKERS_GROUP}	{ count(); yylval.strval=yytext; return ASYNC_WORKERS_GROUP; }
818 820
 <INITIAL>{CHECK_VIA}	{ count(); yylval.strval=yytext; return CHECK_VIA; }
819 821
 <INITIAL>{PHONE2TEL}	{ count(); yylval.strval=yytext; return PHONE2TEL; }
820 822
 <INITIAL>{MEMLOG}	{ count(); yylval.strval=yytext; return MEMLOG; }
... ...
@@ -383,6 +383,7 @@ extern char *default_routename;
383 383
 %token ASYNC_WORKERS
384 384
 %token ASYNC_USLEEP
385 385
 %token ASYNC_NONBLOCK
386
+%token ASYNC_WORKERS_GROUP
386 387
 %token CHECK_VIA
387 388
 %token PHONE2TEL
388 389
 %token MEMLOG
... ...
@@ -943,6 +944,8 @@ assign_stm:
943 944
 	| ASYNC_USLEEP EQUAL error { yyerror("number expected"); }
944 945
 	| ASYNC_NONBLOCK EQUAL NUMBER { async_task_set_nonblock($3); }
945 946
 	| ASYNC_NONBLOCK EQUAL error { yyerror("number expected"); }
947
+	| ASYNC_WORKERS_GROUP EQUAL STRING { async_task_set_workers_group($3); }
948
+	| ASYNC_WORKERS_GROUP EQUAL error { yyerror("string expected"); }
946 949
 	| CHECK_VIA EQUAL NUMBER { check_via=$3; }
947 950
 	| CHECK_VIA EQUAL error { yyerror("boolean value expected"); }
948 951
 	| PHONE2TEL EQUAL NUMBER { phone2tel=$3; }