Browse code

Merge dd86ed8d88ad1e25a76534d4820a9dd6f88103a3 into 5e617b5e2130ed96f624e027023329433f8814ee

Stefan Mititelu authored on 19/11/2021 14:15:46 • GitHub committed on 19/11/2021 14:15:46
Showing 3 changed files
... ...
@@ -97,7 +97,7 @@
97 97
 				(str)
98 98
 			</title>
99 99
 			<para>
100
-				The nats url.
100
+				The nats url. More than one url can be configured. For each url, there will be one separate nats connection, either for subscriber or publisher.
101 101
 			</para>
102 102
 			<para>
103 103
 				Usage: nats related.
... ...
@@ -204,4 +204,4 @@ event_route[nats:MyQueue1]
204 204
 	</section>
205 205
 
206 206
 
207
-</chapter>
208 207
\ No newline at end of file
208
+</chapter>
... ...
@@ -107,6 +107,7 @@ void nats_consumer_worker_proc(
107 107
 {
108 108
 	natsStatus s;
109 109
 	bool closed = false;
110
+	int i;
110 111
 
111 112
 	LM_INFO("nats worker connecting to subject [%s] queue group [%s]\n",
112 113
 			worker->subject, worker->queue_group);
... ...
@@ -128,10 +129,6 @@ void nats_consumer_worker_proc(
128 129
 	natsOptions_SetTimeout(worker->opts, 2 * 1000);					// 2s
129 130
 	natsOptions_SetReconnectBufSize(worker->opts, 8 * 1024 * 1024); // 8 MB;
130 131
 	natsOptions_SetReconnectJitter(worker->opts, 100, 1000); // 100ms, 1s;
131
-	s = natsOptions_SetServers(worker->opts, servers, 1);
132
-	if(s != NATS_OK) {
133
-		LM_ERR("could not set nats server [%s]\n", natsStatus_GetText(s));
134
-	}
135 132
 	s = natsOptions_SetDisconnectedCB(worker->opts, disconnectedCb, NULL);
136 133
 	if(s != NATS_OK) {
137 134
 		LM_ERR("could not set disconnect callback [%s]\n",
... ...
@@ -153,9 +150,17 @@ void nats_consumer_worker_proc(
153 150
 		LM_ERR("could not set closed callback [%s]\n", natsStatus_GetText(s));
154 151
 	}
155 152
 
156
-	s = natsConnection_Connect(&worker->conn, worker->opts);
157
-	if(s != NATS_OK) {
158
-		LM_ERR("could not connect [%s]\n", natsStatus_GetText(s));
153
+	i = 0;
154
+	while (servers[i] != NULL) {
155
+		s = natsOptions_SetServers(worker->opts, &servers[i], 1);
156
+		if(s != NATS_OK) {
157
+			LM_ERR("could not set nats server %s [%s]\n", servers[i], natsStatus_GetText(s));
158
+		}
159
+		s = natsConnection_Connect(&worker->conn[i], worker->opts);
160
+		if(s != NATS_OK) {
161
+			LM_ERR("could not connect %s [%s]\n", servers[i], natsStatus_GetText(s));
162
+		}
163
+		i++;
159 164
 	}
160 165
 	// create a loop
161 166
 	natsLibuv_Init();
... ...
@@ -177,10 +182,14 @@ void nats_consumer_worker_proc(
177 182
 		LM_ERR("error setting options [%s]\n", natsStatus_GetText(s));
178 183
 	}
179 184
 
180
-	s = natsConnection_QueueSubscribe(&worker->subscription, worker->conn,
181
-			worker->subject, worker->queue_group, onMsg, worker->on_message);
182
-	if(s != NATS_OK) {
183
-		LM_ERR("could not subscribe [%s]\n", natsStatus_GetText(s));
185
+	i = 0;
186
+	while (servers[i] != NULL) {
187
+		s = natsConnection_QueueSubscribe(&worker->subscription, worker->conn[i],
188
+				worker->subject, worker->queue_group, onMsg, worker->on_message);
189
+		if(s != NATS_OK) {
190
+			LM_ERR("could not subscribe %s [%s]\n", servers[i], natsStatus_GetText(s));
191
+		}
192
+		i++;
184 193
 	}
185 194
 
186 195
 	s = natsSubscription_SetPendingLimits(worker->subscription, -1, -1);
... ...
@@ -367,7 +376,7 @@ int nats_cleanup_init_servers()
367 376
 
368 377
 int nats_destroy_workers()
369 378
 {
370
-	int i;
379
+	int i, j;
371 380
 	int s;
372 381
 	nats_consumer_worker_t *worker;
373 382
 	for(i = 0; i < _nats_proc_count; i++) {
... ...
@@ -377,9 +386,11 @@ int nats_destroy_workers()
377 386
 				natsSubscription_Unsubscribe(worker->subscription);
378 387
 				natsSubscription_Destroy(worker->subscription);
379 388
 			}
380
-			if(worker->conn != NULL) {
381
-				natsConnection_Close(worker->conn);
382
-				natsConnection_Destroy(worker->conn);
389
+			j = 0;
390
+			while (worker->conn[j] != NULL) {
391
+				natsConnection_Close(worker->conn[j]);
392
+				natsConnection_Destroy(worker->conn[j]);
393
+				j++;
383 394
 			}
384 395
 			if(worker->opts != NULL) {
385 396
 				natsOptions_Destroy(worker->opts);
... ...
@@ -66,7 +66,7 @@ struct nats_consumer_worker
66 66
 	char *subject;
67 67
 	char *queue_group;
68 68
 	int pid;
69
-	natsConnection *conn;
69
+	natsConnection *conn[NATS_MAX_SERVERS];
70 70
 	natsOptions *opts;
71 71
 	natsSubscription *subscription;
72 72
 	uv_loop_t *uvLoop;