Browse code

nats: move nats connection handling into a struct in order to extend features

Emmanuel Schmidbauer authored on 19/11/2021 12:24:29
Showing 2 changed files
... ...
@@ -29,6 +29,7 @@ MODULE_VERSION
29 29
 init_nats_sub_ptr _init_nats_sc = NULL;
30 30
 init_nats_server_ptr _init_nats_srv = NULL;
31 31
 nats_consumer_worker_t *nats_workers = NULL;
32
+nats_connection_ptr _nats_connection = NULL;
32 33
 int _nats_proc_count;
33 34
 char *eventData = NULL;
34 35
 
... ...
@@ -103,60 +104,10 @@ static void closedCB(natsConnection *nc, void *closure)
103 104
 }
104 105
 
105 106
 void nats_consumer_worker_proc(
106
-		nats_consumer_worker_t *worker, const char *servers[])
107
+		nats_consumer_worker_t *worker, nats_connection_ptr c)
107 108
 {
108
-	natsStatus s;
109
-	bool closed = false;
110
-
111
-	LM_INFO("nats worker connecting to subject [%s] queue group [%s]\n",
112
-			worker->subject, worker->queue_group);
109
+	natsStatus s = NATS_OK;
113 110
 
114
-	s = natsOptions_Create(&worker->opts);
115
-	if(s != NATS_OK) {
116
-		LM_ERR("could not create nats options [%s]\n", natsStatus_GetText(s));
117
-		return;
118
-	}
119
-	// use these defaults
120
-	natsOptions_SetAllowReconnect(worker->opts, true);
121
-	natsOptions_SetSecure(worker->opts, false);
122
-	natsOptions_SetMaxReconnect(worker->opts, 10000);
123
-	natsOptions_SetReconnectWait(worker->opts, 2 * 1000);	  // 2s
124
-	natsOptions_SetPingInterval(worker->opts, 2 * 60 * 1000); // 2m
125
-	natsOptions_SetMaxPingsOut(worker->opts, 2);
126
-	natsOptions_SetIOBufSize(worker->opts, 32 * 1024); // 32 KB
127
-	natsOptions_SetMaxPendingMsgs(worker->opts, 65536);
128
-	natsOptions_SetTimeout(worker->opts, 2 * 1000);					// 2s
129
-	natsOptions_SetReconnectBufSize(worker->opts, 8 * 1024 * 1024); // 8 MB;
130
-	natsOptions_SetReconnectJitter(worker->opts, 100, 1000); // 100ms, 1s;
131
-	s = natsOptions_SetServers(worker->opts, servers, 1);
132
-	if(s != NATS_OK) {
133
-		LM_ERR("could not set nats server [%s]\n", natsStatus_GetText(s));
134
-	}
135
-	s = natsOptions_SetDisconnectedCB(worker->opts, disconnectedCb, NULL);
136
-	if(s != NATS_OK) {
137
-		LM_ERR("could not set disconnect callback [%s]\n",
138
-				natsStatus_GetText(s));
139
-	}
140
-	s = natsOptions_SetReconnectedCB(worker->opts, reconnectedCb, NULL);
141
-	if(s != NATS_OK) {
142
-		LM_ERR("could not set reconnect callback [%s]\n",
143
-				natsStatus_GetText(s));
144
-	}
145
-	s = natsOptions_SetRetryOnFailedConnect(
146
-			worker->opts, true, connectedCB, NULL);
147
-	if(s != NATS_OK) {
148
-		LM_ERR("could not set retry on failed callback [%s]\n",
149
-				natsStatus_GetText(s));
150
-	}
151
-	s = natsOptions_SetClosedCB(worker->opts, closedCB, (void *)&closed);
152
-	if(s != NATS_OK) {
153
-		LM_ERR("could not set closed callback [%s]\n", natsStatus_GetText(s));
154
-	}
155
-
156
-	s = natsConnection_Connect(&worker->conn, worker->opts);
157
-	if(s != NATS_OK) {
158
-		LM_ERR("could not connect [%s]\n", natsStatus_GetText(s));
159
-	}
160 111
 	// create a loop
161 112
 	natsLibuv_Init();
162 113
 	worker->uvLoop = uv_default_loop();
... ...
@@ -165,8 +116,15 @@ void nats_consumer_worker_proc(
165 116
 	} else {
166 117
 		s = NATS_ERR;
167 118
 	}
119
+	if(s != NATS_OK) {
120
+		LM_ERR("could not set event loop [%s]\n", natsStatus_GetText(s));
121
+	}
122
+	if((s = natsConnection_Connect(&worker->conn, c->opts)) != NATS_OK) {
123
+		LM_ERR("could not connect to nats servers [%s]\n",
124
+				natsStatus_GetText(s));
125
+	}
168 126
 
169
-	s = natsOptions_SetEventLoop(worker->opts, (void *)worker->uvLoop,
127
+	s = natsOptions_SetEventLoop(c->opts, (void *)worker->uvLoop,
170 128
 			natsLibuv_Attach, natsLibuv_Read, natsLibuv_Write,
171 129
 			natsLibuv_Detach);
172 130
 	if(s != NATS_OK) {
... ...
@@ -206,6 +164,11 @@ static int mod_init(void)
206 164
 		return -1;
207 165
 	}
208 166
 	nats_init_environment();
167
+	_nats_connection = _init_nats_connection();
168
+	if(nats_init_connection(_nats_connection) < 0) {
169
+		LM_ERR("failed to init nat connections\n");
170
+		return -1;
171
+	}
209 172
 	register_procs(_nats_proc_count);
210 173
 	nats_workers =
211 174
 			shm_malloc(_nats_proc_count * sizeof(nats_consumer_worker_t));
... ...
@@ -223,10 +186,6 @@ int init_worker(
223 186
 	int buffsize = strlen(subject) + 6;
224 187
 	char routename[buffsize];
225 188
 	int rt;
226
-	int len;
227
-	char *sc;
228
-	int num_servers = 0;
229
-	init_nats_server_ptr s0;
230 189
 
231 190
 	memset(worker, 0, sizeof(*worker));
232 191
 	worker->subject = shm_malloc(strlen(subject) + 1);
... ...
@@ -235,31 +194,10 @@ int init_worker(
235 194
 	worker->queue_group = shm_malloc(strlen(queue_group) + 1);
236 195
 	strcpy(worker->queue_group, queue_group);
237 196
 	worker->queue_group[strlen(queue_group)] = '\0';
238
-	memset(worker->init_nats_servers, 0, sizeof(worker->init_nats_servers));
239 197
 	worker->on_message =
240 198
 			(nats_on_message_ptr)shm_malloc(sizeof(nats_on_message));
241 199
 	memset(worker->on_message, 0, sizeof(nats_on_message));
242 200
 
243
-	s0 = _init_nats_srv;
244
-	while(s0) {
245
-		if(s0->url != NULL && num_servers < NATS_MAX_SERVERS) {
246
-			len = strlen(s0->url);
247
-			sc = shm_malloc(len + 1);
248
-			if(!sc) {
249
-				LM_ERR("no shm memory left\n");
250
-				return -1;
251
-			}
252
-			strcpy(sc, s0->url);
253
-			sc[len] = '\0';
254
-			worker->init_nats_servers[num_servers++] = sc;
255
-		}
256
-		s0 = s0->next;
257
-	}
258
-	if(num_servers == 0) {
259
-		worker->init_nats_servers[0] = NATS_DEFAULT_URL;
260
-		LM_INFO("using default server [%s]\n", NATS_DEFAULT_URL);
261
-	}
262
-
263 201
 	snprintf(routename, buffsize, "nats:%s", subject);
264 202
 	routename[buffsize] = '\0';
265 203
 
... ...
@@ -273,10 +211,10 @@ int init_worker(
273 211
 	return 0;
274 212
 }
275 213
 
276
-void worker_loop(int id)
214
+void worker_loop(int id, nats_connection_ptr c)
277 215
 {
278 216
 	nats_consumer_worker_t *worker = &nats_workers[id];
279
-	nats_consumer_worker_proc(worker, (const char **)worker->init_nats_servers);
217
+	nats_consumer_worker_proc(worker, c);
280 218
 	for(;;) {
281 219
 		sleep(1000);
282 220
 	}
... ...
@@ -301,12 +239,6 @@ static int mod_child_init(int rank)
301 239
 			n = n->next;
302 240
 			i++;
303 241
 		}
304
-		if(nats_cleanup_init_sub() < 0) {
305
-			LM_INFO("could not cleanup init data\n");
306
-		}
307
-		if(nats_cleanup_init_servers() < 0) {
308
-			LM_INFO("could not cleanup init server data\n");
309
-		}
310 242
 		return 0;
311 243
 	}
312 244
 
... ...
@@ -317,7 +249,7 @@ static int mod_child_init(int rank)
317 249
 				LM_ERR("failed to fork worker process %d\n", i);
318 250
 				return -1;
319 251
 			} else if(newpid == 0) {
320
-				worker_loop(i);
252
+				worker_loop(i, _nats_connection);
321 253
 			} else {
322 254
 				nats_workers[i].pid = newpid;
323 255
 			}
... ...
@@ -348,6 +280,97 @@ int nats_cleanup_init_sub()
348 280
 	return 0;
349 281
 }
350 282
 
283
+int nats_init_connection(nats_connection_ptr c)
284
+{
285
+	natsStatus s = NATS_OK;
286
+	bool closed = false;
287
+	int len;
288
+	char *sc;
289
+	int num_servers = 0;
290
+	init_nats_server_ptr s0;
291
+
292
+	s0 = _init_nats_srv;
293
+	while(s0) {
294
+		if(s0->url != NULL && num_servers < NATS_MAX_SERVERS) {
295
+			len = strlen(s0->url);
296
+			sc = shm_malloc(len + 1);
297
+			if(!sc) {
298
+				LM_ERR("no shm memory left\n");
299
+				return -1;
300
+			}
301
+			strcpy(sc, s0->url);
302
+			sc[len] = '\0';
303
+			c->servers[num_servers++] = sc;
304
+			LM_INFO("adding server [%s] [%d]\n", sc, num_servers);
305
+		}
306
+		s0 = s0->next;
307
+	}
308
+	if(num_servers == 0) {
309
+		len = strlen(NATS_DEFAULT_URL);
310
+		sc = shm_malloc(len + 1);
311
+		if(!sc) {
312
+			LM_ERR("no shm memory left\n");
313
+			return -1;
314
+		}
315
+		strcpy(sc, NATS_DEFAULT_URL);
316
+		sc[len] = '\0';
317
+		c->servers[0] = sc;
318
+		LM_INFO("using default server [%s]\n", sc);
319
+	}
320
+
321
+	// nats create options
322
+	if((s = natsOptions_Create(&c->opts)) != NATS_OK) {
323
+		LM_ERR("could not create nats options [%s]\n", natsStatus_GetText(s));
324
+		return -1;
325
+	}
326
+
327
+	// use these defaults
328
+	natsOptions_SetAllowReconnect(c->opts, true);
329
+	natsOptions_SetSecure(c->opts, false);
330
+	natsOptions_SetMaxReconnect(c->opts, 10000);
331
+	natsOptions_SetReconnectWait(c->opts, 2 * 1000);	 // 2s
332
+	natsOptions_SetPingInterval(c->opts, 2 * 60 * 1000); // 2m
333
+	natsOptions_SetMaxPingsOut(c->opts, 2);
334
+	natsOptions_SetIOBufSize(c->opts, 32 * 1024); // 32 KB
335
+	natsOptions_SetMaxPendingMsgs(c->opts, 65536);
336
+	natsOptions_SetTimeout(c->opts, 2 * 1000);				   // 2s
337
+	natsOptions_SetReconnectBufSize(c->opts, 8 * 1024 * 1024); // 8 MB;
338
+	natsOptions_SetReconnectJitter(c->opts, 100, 1000);		   // 100ms, 1s;
339
+
340
+	// nats set servers and options
341
+	if((s = natsOptions_SetServers(
342
+				c->opts, (const char **)c->servers, num_servers))
343
+			!= NATS_OK) {
344
+		LM_ERR("could not set nats server[%s]\n", natsStatus_GetText(s));
345
+		return -1;
346
+	}
347
+
348
+	// nats set callbacks
349
+	s = natsOptions_SetDisconnectedCB(c->opts, disconnectedCb, NULL);
350
+	if(s != NATS_OK) {
351
+		LM_ERR("could not set disconnect callback [%s]\n",
352
+				natsStatus_GetText(s));
353
+	}
354
+
355
+	s = natsOptions_SetReconnectedCB(c->opts, reconnectedCb, NULL);
356
+	if(s != NATS_OK) {
357
+		LM_ERR("could not set reconnect callback [%s]\n",
358
+				natsStatus_GetText(s));
359
+	}
360
+
361
+	s = natsOptions_SetRetryOnFailedConnect(c->opts, true, connectedCB, NULL);
362
+	if(s != NATS_OK) {
363
+		LM_ERR("could not set retry on failed callback [%s]\n",
364
+				natsStatus_GetText(s));
365
+	}
366
+
367
+	s = natsOptions_SetClosedCB(c->opts, closedCB, (void *)&closed);
368
+	if(s != NATS_OK) {
369
+		LM_ERR("could not set closed callback [%s]\n", natsStatus_GetText(s));
370
+	}
371
+	return 0;
372
+}
373
+
351 374
 int nats_cleanup_init_servers()
352 375
 {
353 376
 	init_nats_server_ptr s0;
... ...
@@ -358,17 +381,35 @@ int nats_cleanup_init_servers()
358 381
 		if(s0->url != NULL) {
359 382
 			shm_free(s0->url);
360 383
 		}
384
+
361 385
 		shm_free(s0);
362 386
 		s0 = s1;
363 387
 	}
388
+
389
+	// To silence valgrind reports of memory still in use
390
+	nats_Close();
391
+
364 392
 	_init_nats_srv = NULL;
365 393
 	return 0;
366 394
 }
367 395
 
396
+int nats_cleanup_connection(nats_connection_ptr c)
397
+{
398
+	if(c->opts != NULL) {
399
+		natsOptions_Destroy(c->opts);
400
+	}
401
+	for(int s = 0; s < NATS_MAX_SERVERS; s++) {
402
+		if(c->servers[s]) {
403
+			shm_free(c->servers[s]);
404
+		}
405
+	}
406
+	shm_free(c);
407
+	return 0;
408
+}
409
+
368 410
 int nats_destroy_workers()
369 411
 {
370 412
 	int i;
371
-	int s;
372 413
 	nats_consumer_worker_t *worker;
373 414
 	for(i = 0; i < _nats_proc_count; i++) {
374 415
 		worker = &nats_workers[i];
... ...
@@ -381,13 +422,9 @@ int nats_destroy_workers()
381 422
 				natsConnection_Close(worker->conn);
382 423
 				natsConnection_Destroy(worker->conn);
383 424
 			}
384
-			if(worker->opts != NULL) {
385
-				natsOptions_Destroy(worker->opts);
386
-			}
387 425
 			if(worker->uvLoop != NULL) {
388 426
 				uv_loop_close(worker->uvLoop);
389 427
 			}
390
-			nats_Close();
391 428
 			if(worker->subject != NULL) {
392 429
 				shm_free(worker->subject);
393 430
 			}
... ...
@@ -397,11 +434,6 @@ int nats_destroy_workers()
397 434
 			if(worker->on_message != NULL) {
398 435
 				shm_free(worker->on_message);
399 436
 			}
400
-			for(s = 0; s < NATS_MAX_SERVERS; s++) {
401
-				if(worker->init_nats_servers[s]) {
402
-					shm_free(worker->init_nats_servers[s]);
403
-				}
404
-			}
405 437
 			shm_free(worker);
406 438
 		}
407 439
 	}
... ...
@@ -416,6 +448,15 @@ static void mod_destroy(void)
416 448
 	if(nats_destroy_workers() < 0) {
417 449
 		LM_ERR("could not cleanup workers\n");
418 450
 	}
451
+	if(nats_cleanup_init_sub() < 0) {
452
+		LM_INFO("could not cleanup init data\n");
453
+	}
454
+	if(nats_cleanup_connection(_nats_connection) < 0) {
455
+		LM_INFO("could not cleanup connection\n");
456
+	}
457
+	if(nats_cleanup_init_servers() < 0) {
458
+		LM_INFO("could not cleanup init server data\n");
459
+	}
419 460
 }
420 461
 
421 462
 int _init_nats_server_url_add(modparam_t type, void *val)
... ...
@@ -511,6 +552,7 @@ init_nats_server_ptr _init_nats_server_list_new(char *url)
511 552
 	return p;
512 553
 }
513 554
 
555
+
514 556
 int init_nats_server_url_add(char *url)
515 557
 {
516 558
 	init_nats_server_ptr n;
... ...
@@ -524,6 +566,14 @@ int init_nats_server_url_add(char *url)
524 566
 	return 0;
525 567
 }
526 568
 
569
+nats_connection_ptr _init_nats_connection()
570
+{
571
+	nats_connection_ptr p =
572
+			(nats_connection_ptr)shm_malloc(sizeof(nats_connection));
573
+	memset(p, 0, sizeof(nats_connection));
574
+	return p;
575
+}
576
+
527 577
 init_nats_sub_ptr _init_nats_sub_new(char *sub, char *queue_group)
528 578
 {
529 579
 	init_nats_sub_ptr p = (init_nats_sub_ptr)shm_malloc(sizeof(init_nats_sub));
... ...
@@ -61,17 +61,21 @@ typedef struct _nats_on_message
61 61
 	int rt;
62 62
 } nats_on_message, *nats_on_message_ptr;
63 63
 
64
+typedef struct _nats_connection /* NATS connection settings built once by nats_init_connection() and passed to the workers */
65
+{
66
+	natsOptions *opts; /* created with natsOptions_Create(), destroyed in nats_cleanup_connection() */
67
+	char *servers[NATS_MAX_SERVERS]; /* shm_malloc()'ed copies of the server URLs; unused slots stay NULL */
68
+} nats_connection, *nats_connection_ptr;
69
+
64 70
 struct nats_consumer_worker
65 71
 {
66 72
 	char *subject;
67 73
 	char *queue_group;
68 74
 	int pid;
69 75
 	natsConnection *conn;
70
-	natsOptions *opts;
71 76
 	natsSubscription *subscription;
72 77
 	uv_loop_t *uvLoop;
73 78
 	nats_on_message_ptr on_message;
74
-	char *init_nats_servers[NATS_MAX_SERVERS];
75 79
 };
76 80
 typedef struct nats_consumer_worker nats_consumer_worker_t;
77 81
 
... ...
@@ -86,14 +90,17 @@ int _init_nats_server_url_add(modparam_t type, void *val);
86 90
 init_nats_server_ptr _init_nats_server_list_new(char *url);
87 91
 int init_nats_server_url_add(char *url);
88 92
 int nats_cleanup_init_servers();
93
+int nats_init_connection(nats_connection_ptr c);
94
+int nats_cleanup_connection(nats_connection_ptr c);
89 95
 
90 96
 int _init_nats_sub_add(modparam_t type, void *val);
97
+nats_connection_ptr _init_nats_connection();
91 98
 init_nats_sub_ptr _init_nats_sub_new(char *sub, char *queue_group);
92 99
 int init_nats_sub_add(char *sub);
93 100
 int nats_cleanup_init_sub();
94 101
 
95 102
 void nats_consumer_worker_proc(
96
-		nats_consumer_worker_t *worker, const char *init_nats_servers[]);
103
+		nats_consumer_worker_t *worker, nats_connection_ptr c);
97 104
 int nats_pv_get_event_payload(struct sip_msg *, pv_param_t *, pv_value_t *);
98 105
 
99 106
 #endif