Browse code

core: framework for creating asynchronous pool of workers

- dedicated group of processes that can get tasks from other processes
via memory pipe
- react immediately, no time based polling
- should reduce the need for other components to create extra processes
for special handling

Daniel-Constantin Mierla authored on 14/04/2014 12:13:36
Showing 2 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,220 @@
0
+/**
1
+ * Copyright (C) 2014 Daniel-Constantin Mierla (asipto.com)
2
+ *
3
+ * This file is part of Extensible SIP Router, a free SIP server.
4
+ *
5
+ * Permission to use, copy, modify, and distribute this software for any
6
+ * purpose with or without fee is hereby granted, provided that the above
7
+ * copyright notice and this permission notice appear in all copies.
8
+ *
9
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
10
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
11
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
12
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
13
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
14
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
15
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16
+ */
17
+
18
+#include <stdio.h>
19
+#include <unistd.h>
20
+#include <stdlib.h>
21
+#include <string.h>
22
+
23
+#include <sys/socket.h>
24
+#include <sys/types.h>
25
+#include <sys/un.h>
26
+#include <netinet/in.h>
27
+#include <arpa/inet.h>
28
+#include <fcntl.h>
29
+#include <errno.h>
30
+
31
+#include "dprint.h"
32
+#include "sr_module.h"
33
+#include "ut.h"
34
+#include "pt.h"
35
+#include "cfg/cfg_struct.h"
36
+
37
+
38
+#include "async_task.h"
39
+
40
+static int _async_task_workers = 0;
41
+static int _async_task_sockets[2];
42
+
43
+int async_task_run(int idx);
44
+
45
+/**
46
+ *
47
+ */
48
+int async_task_init_sockets(void)
49
+{
50
+	if (socketpair(PF_UNIX, SOCK_DGRAM, 0, _async_task_sockets) < 0) {
51
+		LM_ERR("opening tasks dgram socket pair\n");
52
+		return -1;
53
+	}
54
+	LM_DBG("inter-process event notification sockets initialized\n");
55
+	return 0;
56
+}
57
+
58
+/**
59
+ *
60
+ */
61
+void async_task_close_sockets_child(void)
62
+{
63
+	LM_DBG("closing the notification socket used by children\n");
64
+	close(_async_task_sockets[1]);
65
+}
66
+
67
+/**
68
+ *
69
+ */
70
+void async_task_close_sockets_parent(void)
71
+{
72
+	LM_DBG("closing the notification socket used by parent\n");
73
+	close(_async_task_sockets[0]);
74
+}
75
+
76
+/**
77
+ *
78
+ */
79
+int async_task_init(void)
80
+{
81
+	LM_DBG("start initializing asynk task framework\n");
82
+	if(_async_task_workers<=0)
83
+		return 0;
84
+
85
+	/* advertise new processes to core */
86
+	register_procs(_async_task_workers);
87
+
88
+	/* advertise new processes to cfg framework */
89
+	cfg_register_child(_async_task_workers);
90
+
91
+	return 0;
92
+}
93
+
94
+/**
95
+ *
96
+ */
97
+int async_task_initialized(void)
98
+{
99
+	if(_async_task_workers<=0)
100
+		return 0;
101
+	return 1;
102
+}
103
+
104
+/**
105
+ *
106
+ */
107
+int async_task_child_init(int rank)
108
+{
109
+	int pid;
110
+	int i;
111
+
112
+	LM_DBG("child initializing asynk task framework\n");
113
+
114
+	if(_async_task_workers<=0)
115
+		return 0;
116
+
117
+	if (rank==PROC_INIT) {
118
+		if(async_task_init_sockets()<0) {
119
+			LM_ERR("failed to initialize tasks sockets\n");
120
+			return -1;
121
+		}
122
+		return 0;
123
+	}
124
+
125
+	if(rank>0) {
126
+		async_task_close_sockets_parent();
127
+		return 0;
128
+	}
129
+	if (rank!=PROC_MAIN)
130
+		return 0;
131
+
132
+	for(i=0; i<_async_task_workers; i++) {
133
+		pid=fork_process(PROC_RPC, "Async Task Worker", 1);
134
+		if (pid<0)
135
+			return -1; /* error */
136
+		if(pid==0) {
137
+			/* child */
138
+
139
+			/* initialize the config framework */
140
+			if (cfg_child_init())
141
+				return -1;
142
+			/* main function for workers */
143
+			if(async_task_run(i+1)<0) {
144
+				LM_ERR("failed to initialize task worker process: %d\n", i);
145
+				return -1;
146
+			}
147
+		}
148
+	}
149
+
150
+	return 0;
151
+}
152
+
153
+/**
154
+ *
155
+ */
156
+int async_task_set_workers(int n)
157
+{
158
+	if(_async_task_workers>0) {
159
+		LM_WARN("task workers already set\n");
160
+		return 0;
161
+	}
162
+	if(n<=0)
163
+		return 0;
164
+
165
+	_async_task_workers = n;
166
+
167
+	return 0;
168
+}
169
+
170
+/**
171
+ *
172
+ */
173
+int async_task_push(async_task_t *task)
174
+{
175
+	int len;
176
+
177
+	if(_async_task_workers<=0)
178
+		return 0;
179
+
180
+	len = write(_async_task_sockets[1], &task, sizeof(async_task_t*));
181
+	if(len<=0) {
182
+		LM_ERR("failed to pass the task to asynk workers\n");
183
+		return -1;
184
+	}
185
+	LM_DBG("task sent [%p]\n", task);
186
+	return 0;
187
+}
188
+
189
+/**
190
+ *
191
+ */
192
+int async_task_run(int idx)
193
+{
194
+	async_task_t *ptask;
195
+	int received;
196
+
197
+	LM_DBG("async task worker %d ready\n", idx);
198
+
199
+	for( ; ; ) {
200
+		if ((received = recvfrom(_async_task_sockets[0],
201
+							&ptask, sizeof(async_task_t*),
202
+							0, NULL, 0)) < 0) {
203
+			LM_ERR("failed to received task (%d: %s)\n", errno, strerror(errno));
204
+			continue;
205
+		}
206
+		if(received != sizeof(async_task_t*)) {
207
+			LM_ERR("invalid task size %d\n", received);
208
+			continue;
209
+		}
210
+		if(ptask->exec!=NULL) {
211
+			LM_DBG("task executed [%p] (%p/%p)\n", ptask,
212
+					ptask->exec, ptask->param);
213
+			ptask->exec(ptask->param);
214
+		}
215
+		shm_free(ptask);
216
+	}
217
+
218
+	return 0;
219
+}
0 220
new file mode 100644
... ...
@@ -0,0 +1,36 @@
0
+/**
1
+ * Copyright (C) 2014 Daniel-Constantin Mierla (asipto.com)
2
+ *
3
+ * This file is part of Extensible SIP Router, a free SIP server.
4
+ *
5
+ * Permission to use, copy, modify, and distribute this software for any
6
+ * purpose with or without fee is hereby granted, provided that the above
7
+ * copyright notice and this permission notice appear in all copies.
8
+ *
9
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
10
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
11
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
12
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
13
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
14
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
15
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16
+ */
17
+
18
+
19
+#ifndef _ASYNC_TASK_H_
20
+#define _ASYNC_TASK_H_
21
+
22
+typedef void (*async_cbe_t)(void *p);
23
+
24
+typedef struct _async_task {
25
+	async_cbe_t exec;
26
+	void *param;
27
+} async_task_t;
28
+
29
+int async_task_init(void);
30
+int async_task_child_init(int rank);
31
+int async_task_initialized(void);
32
+int async_task_set_workers(int n);
33
+int async_task_push(async_task_t *task);
34
+
35
+#endif