Browse code

core: created a structure for async task attributes

Daniel-Constantin Mierla authored on 25/11/2020 08:08:59
Showing 2 changed files
... ...
@@ -44,10 +44,7 @@
44 44
 
45 45
 #include "async_task.h"
46 46
 
47
-static int _async_task_workers = 0;
48
-static int _async_task_sockets[2];
49
-static int _async_task_usleep = 0;
50
-static int _async_nonblock = 0;
47
+static async_wgroup_t *_async_wgroup_list = NULL;
51 48
 
52 49
 int async_task_run(int idx);
53 50
 
... ...
@@ -56,7 +53,7 @@ int async_task_run(int idx);
56 53
  */
57 54
 int async_task_workers_get(void)
58 55
 {
59
-	return _async_task_workers;
56
+	return (_async_wgroup_list)?_async_wgroup_list->workers:0;
60 57
 }
61 58
 
62 59
 /**
... ...
@@ -64,7 +61,7 @@ int async_task_workers_get(void)
64 61
  */
65 62
 int async_task_workers_active(void)
66 63
 {
67
-	if(_async_task_workers<=0)
64
+	if(_async_wgroup_list==NULL || _async_wgroup_list->workers<=0)
68 65
 		return 0;
69 66
 
70 67
 	return 1;
... ...
@@ -77,17 +74,17 @@ int async_task_init_sockets(void)
77 74
 {
78 75
 	int val;
79 76
 
80
-	if (socketpair(PF_UNIX, SOCK_DGRAM, 0, _async_task_sockets) < 0) {
77
+	if (socketpair(PF_UNIX, SOCK_DGRAM, 0, _async_wgroup_list->sockets) < 0) {
81 78
 		LM_ERR("opening tasks dgram socket pair\n");
82 79
 		return -1;
83 80
 	}
84 81
 
85
-	if (_async_nonblock) {
86
-		val = fcntl(_async_task_sockets[1], F_GETFL, 0);
82
+	if (_async_wgroup_list->nonblock) {
83
+		val = fcntl(_async_wgroup_list->sockets[1], F_GETFL, 0);
87 84
 		if(val<0) {
88 85
 			LM_WARN("failed to get socket flags\n");
89 86
 		} else {
90
-			if(fcntl(_async_task_sockets[1], F_SETFL, val | O_NONBLOCK)<0) {
87
+			if(fcntl(_async_wgroup_list->sockets[1], F_SETFL, val | O_NONBLOCK)<0) {
91 88
 				LM_WARN("failed to set socket nonblock flag\n");
92 89
 			}
93 90
 		}
... ...
@@ -103,7 +100,7 @@ int async_task_init_sockets(void)
103 100
 void async_task_close_sockets_child(void)
104 101
 {
105 102
 	LM_DBG("closing the notification socket used by children\n");
106
-	close(_async_task_sockets[1]);
103
+	close(_async_wgroup_list->sockets[1]);
107 104
 }
108 105
 
109 106
 /**
... ...
@@ -112,7 +109,7 @@ void async_task_close_sockets_child(void)
112 109
 void async_task_close_sockets_parent(void)
113 110
 {
114 111
 	LM_DBG("closing the notification socket used by parent\n");
115
-	close(_async_task_sockets[0]);
112
+	close(_async_wgroup_list->sockets[0]);
116 113
 }
117 114
 
118 115
 /**
... ...
@@ -121,14 +118,14 @@ void async_task_close_sockets_parent(void)
121 118
 int async_task_init(void)
122 119
 {
123 120
 	LM_DBG("start initializing asynk task framework\n");
124
-	if(_async_task_workers<=0)
121
+	if(_async_wgroup_list==NULL || _async_wgroup_list->workers<=0)
125 122
 		return 0;
126 123
 
127 124
 	/* advertise new processes to core */
128
-	register_procs(_async_task_workers);
125
+	register_procs(_async_wgroup_list->workers);
129 126
 
130 127
 	/* advertise new processes to cfg framework */
131
-	cfg_register_child(_async_task_workers);
128
+	cfg_register_child(_async_wgroup_list->workers);
132 129
 
133 130
 	return 0;
134 131
 }
... ...
@@ -138,7 +135,7 @@ int async_task_init(void)
138 135
  */
139 136
 int async_task_initialized(void)
140 137
 {
141
-	if(_async_task_workers<=0)
138
+	if(_async_wgroup_list==NULL || _async_wgroup_list->workers<=0)
142 139
 		return 0;
143 140
 	return 1;
144 141
 }
... ...
@@ -150,8 +147,9 @@ int async_task_child_init(int rank)
150 147
 {
151 148
 	int pid;
152 149
 	int i;
150
+	char pname[64];
153 151
 
154
-	if(_async_task_workers<=0)
152
+	if(_async_wgroup_list==NULL || _async_wgroup_list->workers<=0)
155 153
 		return 0;
156 154
 
157 155
 	LM_DBG("child initializing asynk task framework\n");
... ...
@@ -171,8 +169,10 @@ int async_task_child_init(int rank)
171 169
 	if (rank!=PROC_MAIN)
172 170
 		return 0;
173 171
 
174
-	for(i=0; i<_async_task_workers; i++) {
175
-		pid=fork_process(PROC_RPC, "Async Task Worker", 1);
172
+	snprintf(pname, 62, "Async Task Worker - %s",
173
+			(_async_wgroup_list->name.s)?_async_wgroup_list->name.s:"unknown");
174
+	for(i=0; i<_async_wgroup_list->workers; i++) {
175
+		pid=fork_process(PROC_RPC, pname, 1);
176 176
 		if (pid<0)
177 177
 			return -1; /* error */
178 178
 		if(pid==0) {
... ...
@@ -197,14 +197,30 @@ int async_task_child_init(int rank)
197 197
  */
198 198
 int async_task_set_workers(int n)
199 199
 {
200
-	if(_async_task_workers>0) {
200
+	str gname = str_init("default");
201
+
202
+	if(_async_wgroup_list!=NULL && _async_wgroup_list->workers>0) {
201 203
 		LM_WARN("task workers already set\n");
202 204
 		return 0;
203 205
 	}
204 206
 	if(n<=0)
205 207
 		return 0;
206 208
 
207
-	_async_task_workers = n;
209
+	if(_async_wgroup_list==NULL) {
210
+		_async_wgroup_list = (async_wgroup_t*)pkg_malloc(sizeof(async_wgroup_t)
211
+				+ (gname.len+1)*sizeof(char));
212
+		if(_async_wgroup_list==NULL) {
213
+			LM_ERR("failed to create async wgroup\n");
214
+			return -1;
215
+		}
216
+		memset(_async_wgroup_list, 0, sizeof(async_wgroup_t)
217
+				+ (gname.len+1)*sizeof(char));
218
+	}
219
+	_async_wgroup_list->workers = n;
220
+	_async_wgroup_list->name.s = (char*)_async_wgroup_list
221
+		+ sizeof(async_wgroup_t);
222
+	memcpy(_async_wgroup_list->name.s, gname.s, gname.len);
223
+	_async_wgroup_list->name.len = gname.len;
208 224
 
209 225
 	return 0;
210 226
 }
... ...
@@ -214,8 +230,9 @@ int async_task_set_workers(int n)
214 230
  */
215 231
 int async_task_set_nonblock(int n)
216 232
 {
217
-	if(n>0)
218
-		_async_nonblock = 1;
233
+	if(n>0 && _async_wgroup_list!=NULL) {
234
+		_async_wgroup_list->nonblock = 1;
235
+	}
219 236
 
220 237
 	return 0;
221 238
 }
... ...
@@ -225,10 +242,12 @@ int async_task_set_nonblock(int n)
225 242
  */
226 243
 int async_task_set_usleep(int n)
227 244
 {
228
-	int v;
245
+	int v = 0;
229 246
 
230
-	v = _async_task_usleep;
231
-	_async_task_usleep = n;
247
+	if(_async_wgroup_list!=NULL) {
248
+		v = _async_wgroup_list->usleep;
249
+		_async_wgroup_list->usleep = n;
250
+	}
232 251
 
233 252
 	return v;
234 253
 }
... ...
@@ -240,12 +259,12 @@ int async_task_push(async_task_t *task)
240 259
 {
241 260
 	int len;
242 261
 
243
-	if(_async_task_workers<=0) {
262
+	if(_async_wgroup_list==NULL || _async_wgroup_list->workers<=0) {
244 263
 		LM_WARN("async task pushed, but no async workers - ignoring\n");
245 264
 		return 0;
246 265
 	}
247 266
 
248
-	len = write(_async_task_sockets[1], &task, sizeof(async_task_t*));
267
+	len = write(_async_wgroup_list->sockets[1], &task, sizeof(async_task_t*));
249 268
 	if(len<=0) {
250 269
 		LM_ERR("failed to pass the task to asynk workers\n");
251 270
 		return -1;
... ...
@@ -265,8 +284,8 @@ int async_task_run(int idx)
265 284
 	LM_DBG("async task worker %d ready\n", idx);
266 285
 
267 286
 	for( ; ; ) {
268
-		if(unlikely(_async_task_usleep)) sleep_us(_async_task_usleep);
269
-		if ((received = recvfrom(_async_task_sockets[0],
287
+		if(unlikely(_async_wgroup_list->usleep)) sleep_us(_async_wgroup_list->usleep);
288
+		if ((received = recvfrom(_async_wgroup_list->sockets[0],
270 289
 							&ptask, sizeof(async_task_t*),
271 290
 							0, NULL, 0)) < 0) {
272 291
 			LM_ERR("failed to received task (%d: %s)\n", errno, strerror(errno));
... ...
@@ -31,6 +31,15 @@ typedef struct _async_task {
31 31
 	void *param;
32 32
 } async_task_t;
33 33
 
34
+typedef struct _async_wgroup {
35
+	str name;
36
+	int workers;
37
+	int sockets[2];
38
+	int usleep;
39
+	int nonblock;
40
+	struct _async_wgroup *next;
41
+} async_wgroup_t;
42
+
34 43
 int async_task_init(void);
35 44
 int async_task_child_init(int rank);
36 45
 int async_task_initialized(void);