Browse code

nats: move nats connection handling into a struct in order to extend features

Emmanuel Schmidbauer authored on 19/11/2021 12:24:29
Showing 1 changed file
... ...
@@ -61,17 +61,21 @@ typedef struct _nats_on_message
61 61
 	int rt;
62 62
 } nats_on_message, *nats_on_message_ptr;
63 63
 
64
+typedef struct _nats_connection /* connection-level settings grouped in one struct; also exposed as pointer type nats_connection_ptr */
65
+{
66
+	natsOptions *opts; /* NATS client options handle; the same member is removed from struct nats_consumer_worker in this diff */
67
+	char *servers[NATS_MAX_SERVERS]; /* fixed array of NATS_MAX_SERVERS string pointers, presumably server URLs; takes over from the worker's init_nats_servers[] */
68
+} nats_connection, *nats_connection_ptr;
69
+
64 70
 struct nats_consumer_worker /* per-consumer state; the two '-' lines show opts and init_nats_servers leaving this struct */
65 71
 {
66 72
 	char *subject; /* subject string, presumably the NATS subject this worker subscribes to; TODO confirm */
67 73
 	char *queue_group; /* queue group string, presumably used for the subscription; TODO confirm */
68 74
 	int pid; /* NOTE(review): which process id is stored here is not visible in this header */
69 75
 	natsConnection *conn; /* NATS client connection handle */
70
-	natsOptions *opts; /* removed here; an identical member appears in nats_connection */
71 76
 	natsSubscription *subscription; /* NATS client subscription handle */
72 77
 	uv_loop_t *uvLoop; /* libuv event loop pointer */
73 78
 	nats_on_message_ptr on_message; /* pointer to a nats_on_message (a struct holding int rt) */
74
-	char *init_nats_servers[NATS_MAX_SERVERS]; /* removed here; nats_connection.servers has the same shape */
75 79
 };
76 80
 typedef struct nats_consumer_worker nats_consumer_worker_t; /* shorthand type name used by the prototypes below */
77 81
 
... ...
@@ -86,14 +90,17 @@ int _init_nats_server_url_add(modparam_t type, void *val);
86 90
 init_nats_server_ptr _init_nats_server_list_new(char *url); /* returns an init_nats_server_ptr for url, presumably a newly built list node; TODO confirm */
87 91
 int init_nats_server_url_add(char *url); /* int status; the success/failure convention is not visible here */
88 92
 int nats_cleanup_init_servers(void); /* (void) makes this a real prototype, so stray arguments are rejected at compile time */
93
+int nats_init_connection(nats_connection_ptr c); /* presumably prepares c->opts and c->servers; TODO confirm against the definition */
94
+int nats_cleanup_connection(nats_connection_ptr c); /* presumably releases what nats_init_connection set up; TODO confirm */
89 95
 
90 96
 int _init_nats_sub_add(modparam_t type, void *val); /* (type, val) pair, presumably a module-parameter setter callback; TODO confirm */
97
+nats_connection_ptr _init_nats_connection(void); /* takes no arguments; (void) turns the declaration into a real prototype */
91 98
 init_nats_sub_ptr _init_nats_sub_new(char *sub, char *queue_group); /* returns an init_nats_sub_ptr, presumably a new list node for sub/queue_group; TODO confirm */
92 99
 int init_nats_sub_add(char *sub); /* int status; the success/failure convention is not visible here */
93 100
 int nats_cleanup_init_sub(void); /* (void) makes this a real prototype, so stray arguments are rejected at compile time */
94 101
 
95 102
 void nats_consumer_worker_proc( /* signature change: the second parameter switches from a string array to nats_connection_ptr */
96
-		nats_consumer_worker_t *worker, const char *init_nats_servers[]); /* old form: servers passed as an array of const strings */
103
+		nats_consumer_worker_t *worker, nats_connection_ptr c); /* new form: options and servers travel inside the nats_connection */
97 104
 int nats_pv_get_event_payload(struct sip_msg *, pv_param_t *, pv_value_t *); /* unchanged context line; presumably a pseudo-variable getter, TODO confirm */
98 105
 
99 106
 #endif
Browse code

nats: new nats message consumer module

Emmanuel Schmidbauer authored on 27/06/2021 11:58:58
Showing 1 changed file
1 1
new file mode 100644
... ...
@@ -0,0 +1,99 @@
1
+/*
2
+ * NATS module interface
3
+ *
4
+ * Copyright (C) 2021 Voxcom Inc
5
+ *
6
+ * This file is part of Kamailio, a free SIP server.
7
+ *
8
+ * Kamailio is free software; you can redistribute it and/or modify
9
+ * it under the terms of the GNU General Public License as published by
10
+ * the Free Software Foundation; either version 2 of the License, or
11
+ * (at your option) any later version.
12
+ *
13
+ * Kamailio is distributed in the hope that it will be useful,
14
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
+ * GNU General Public License for more details.
17
+ *
18
+ * You should have received a copy of the GNU General Public License
19
+ * along with this program; if not, write to the Free Software
20
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
21
+ *
22
+ *
23
+ */
24
+
25
+#ifndef __NATS_MOD_H_
26
+#define __NATS_MOD_H_
27
+
28
+#include <stdio.h>
29
+#include <nats/nats.h>
30
+#include <nats/adapters/libuv.h>
31
+#include "../json/api.h"
32
+#include "../../core/cfg/cfg_struct.h"
33
+#include "../../core/fmsg.h"
34
+
35
+#define NATS_DEFAULT_URL "nats://localhost:4222"
36
+#define NATS_MAX_SERVERS 10
37
+#define NATS_URL_MAX_SIZE 256
38
+
39
+typedef struct _nats_evroutes /* pair of int slots named after connection events */
40
+{
41
+	int connected; /* presumably the route index run on the "connected" event; TODO confirm */
42
+	int disconnected; /* presumably the route index run on the "disconnected" event; TODO confirm */
43
+} nats_evroutes_t;
44
+static nats_evroutes_t _nats_rts; /* NOTE(review): file-scope static object inside the include guard, so every including translation unit gets its own copy; confirm that is intended */
45
+
46
+typedef struct _init_nats_sub /* self-referential node (see next); also exposed as pointer type init_nats_sub_ptr */
47
+{
48
+	char *sub; /* subscription string, presumably a NATS subject; TODO confirm */
49
+	char *queue_group; /* queue group string paired with sub */
50
+	struct _init_nats_sub *next; /* link to another node of the same type */
51
+} init_nats_sub, *init_nats_sub_ptr;
52
+
53
+typedef struct _init_nats_server /* self-referential node (see next); also exposed as pointer type init_nats_server_ptr */
54
+{
55
+	char *url; /* server URL string; NATS_DEFAULT_URL above shows the "nats://host:port" form */
56
+	struct _init_nats_server *next; /* link to another node of the same type */
57
+} init_nats_server, *init_nats_server_ptr;
58
+
59
+typedef struct _nats_on_message /* single-field wrapper; also exposed as pointer type nats_on_message_ptr */
60
+{
61
+	int rt; /* presumably a config route index (compare nats_run_cfg_route(int rt)); TODO confirm */
62
+} nats_on_message, *nats_on_message_ptr;
63
+
64
+struct nats_consumer_worker /* per-consumer state: subscription strings, client handles and the server list */
65
+{
66
+	char *subject; /* subject string, presumably the NATS subject this worker subscribes to; TODO confirm */
67
+	char *queue_group; /* queue group string, presumably used for the subscription; TODO confirm */
68
+	int pid; /* NOTE(review): which process id is stored here is not visible in this header */
69
+	natsConnection *conn; /* NATS client connection handle */
70
+	natsOptions *opts; /* NATS client options handle */
71
+	natsSubscription *subscription; /* NATS client subscription handle */
72
+	uv_loop_t *uvLoop; /* libuv event loop pointer */
73
+	nats_on_message_ptr on_message; /* pointer to a nats_on_message (a struct holding int rt) */
74
+	char *init_nats_servers[NATS_MAX_SERVERS]; /* fixed array of NATS_MAX_SERVERS string pointers, presumably server URLs; TODO confirm */
75
+};
76
+typedef struct nats_consumer_worker nats_consumer_worker_t; /* shorthand type name used by the prototypes below */
77
+
78
+static int mod_init(void); /* NOTE(review): static prototype inside the include guard; a translation unit that includes this without defining it may get an unused/undefined-static warning, confirm */
79
+static int mod_child_init(int); /* takes one int; its meaning is not visible in this header */
80
+static void mod_destroy(void); /* no arguments, no return value */
81
+
82
+int nats_run_cfg_route(int rt); /* rt: presumably a config route index; the int return convention is not visible here */
83
+void nats_init_environment(void); /* takes no arguments; (void) turns the declaration into a real prototype */
84
+
85
+int _init_nats_server_url_add(modparam_t type, void *val); /* (type, val) pair, presumably a module-parameter setter callback; TODO confirm */
86
+init_nats_server_ptr _init_nats_server_list_new(char *url); /* returns an init_nats_server_ptr for url, presumably a newly built list node; TODO confirm */
87
+int init_nats_server_url_add(char *url); /* int status; the success/failure convention is not visible here */
88
+int nats_cleanup_init_servers(void); /* (void) makes this a real prototype, so stray arguments are rejected at compile time */
89
+
90
+int _init_nats_sub_add(modparam_t type, void *val); /* (type, val) pair, presumably a module-parameter setter callback; TODO confirm */
91
+init_nats_sub_ptr _init_nats_sub_new(char *sub, char *queue_group); /* returns an init_nats_sub_ptr, presumably a new list node for sub/queue_group; TODO confirm */
92
+int init_nats_sub_add(char *sub); /* int status; the success/failure convention is not visible here */
93
+int nats_cleanup_init_sub(void); /* (void) makes this a real prototype, so stray arguments are rejected at compile time */
94
+
95
+void nats_consumer_worker_proc( /* worker entry point, judging by the name; the body is not visible here */
96
+		nats_consumer_worker_t *worker, const char *init_nats_servers[]); /* worker state plus an array of const server strings */
97
+int nats_pv_get_event_payload(struct sip_msg *, pv_param_t *, pv_value_t *); /* presumably a pseudo-variable getter for the event payload; TODO confirm */
98
+
99
+#endif