... | ... |
@@ -27,6 +27,7 @@ |
27 | 27 |
|
28 | 28 |
#include <nats/nats.h> |
29 | 29 |
#include <uv.h> |
30 |
+#include "../../core/str.h" |
|
30 | 31 |
|
31 | 32 |
#define NATS_DEFAULT_URL "nats://localhost:4222" |
32 | 33 |
#define NATS_MAX_SERVERS 10 |
... | ... |
@@ -63,6 +64,8 @@ typedef struct _init_nats_server |
63 | 64 |
typedef struct _nats_on_message |
64 | 65 |
{ |
65 | 66 |
int rt; |
67 |
+ char *_evname; |
|
68 |
+ str evname; |
|
66 | 69 |
} nats_on_message, *nats_on_message_ptr; |
67 | 70 |
|
68 | 71 |
struct nats_consumer_worker |
... | ... |
@@ -173,6 +173,37 @@ modparam("nats", "subject_queue_group", "Kamailio-World:2020") |
173 | 173 |
modparam("nats", "subject_queue_group", "Kamailio-World:2021") // this will create two processes for the Kamailio-World subject |
174 | 174 |
modparam("nats", "subject_queue_group", "MyQueue1:2021") |
175 | 175 |
modparam("nats", "subject_queue_group", "MyQueue2:2021") |
176 |
+... |
|
177 |
+ </programlisting> |
|
178 |
+ </example> |
|
179 |
+ </section> |
|
180 |
+ <section> |
|
181 |
+ <title> |
|
182 |
+ <varname>event_callback</varname> |
|
183 |
+ (str) |
|
184 |
+ </title> |
|
185 |
+ <para> |
|
186 |
+ Name of the KEMI function to be executed instead of the event route. |
|
187 |
+ </para> |
|
188 |
+ <para> |
|
189 |
+ <emphasis>Default value is not set.</emphasis> |
|
190 |
+ </para> |
|
191 |
+ <example> |
|
192 |
+ <title> |
|
193 |
+ Set |
|
194 |
+ <varname>event_callback</varname> |
|
195 |
+ parameter |
|
196 |
+ </title> |
|
197 |
+ <programlisting format="linespecific"> |
|
198 |
+... |
|
199 |
+modparam("nats", "event_callback", "ksr_nats_event") |
|
200 |
+ |
|
201 |
+-- event callback function implemented in Lua |
|
202 |
+function ksr_nats_event(evname) |
|
203 |
+ KSR.info("===== nats module received event: " .. evname .. |
|
204 |
+ ", data:" .. KSR.pv.gete('$natsData') .. "\n"); |
|
205 |
+ return 1; |
|
206 |
+end |
|
176 | 207 |
... |
177 | 208 |
</programlisting> |
178 | 209 |
</example> |
... | ... |
@@ -254,4 +285,4 @@ event_route[nats:MyQueue1] |
254 | 285 |
</section> |
255 | 286 |
|
256 | 287 |
|
257 |
-</chapter> |
|
258 | 288 |
\ No newline at end of file |
289 |
+</chapter> |
... | ... |
@@ -24,6 +24,8 @@ |
24 | 24 |
|
25 | 25 |
#include "defs.h" |
26 | 26 |
#include "nats_mod.h" |
27 |
+#include "nats_pub.h" |
|
28 |
+#include "../../core/kemi.h" |
|
27 | 29 |
|
28 | 30 |
MODULE_VERSION |
29 | 31 |
|
... | ... |
@@ -38,6 +40,7 @@ char *eventData = NULL; |
38 | 40 |
|
39 | 41 |
int *nats_pub_worker_pipes_fds = NULL; |
40 | 42 |
int *nats_pub_worker_pipes = NULL; |
43 |
+static str nats_event_callback = STR_NULL; |
|
41 | 44 |
|
42 | 45 |
static nats_evroutes_t _nats_rts; |
43 | 46 |
|
... | ... |
@@ -50,7 +53,10 @@ static param_export_t params[] = { |
50 | 53 |
{"nats_url", PARAM_STRING | USE_FUNC_PARAM, (void *)_init_nats_server_url_add}, |
51 | 54 |
{"num_publish_workers", INT_PARAM, &nats_pub_workers_num}, |
52 | 55 |
{"subject_queue_group", PARAM_STRING | USE_FUNC_PARAM, |
53 |
- (void *)_init_nats_sub_add}}; |
|
56 |
+ (void *)_init_nats_sub_add}, |
|
57 |
+ {"event_callback", PARAM_STR, &nats_event_callback}, |
|
58 |
+ {0, 0, 0} |
|
59 |
+}; |
|
54 | 60 |
|
55 | 61 |
static cmd_export_t cmds[] = {{"nats_publish", (cmd_function)w_nats_publish_f, |
56 | 62 |
2, fixup_publish_get_value, |
... | ... |
@@ -73,16 +79,9 @@ static void onMsg( |
73 | 79 |
natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure) |
74 | 80 |
{ |
75 | 81 |
nats_on_message_ptr on_message = (nats_on_message_ptr)closure; |
76 |
- char *s = (char *)natsMsg_GetSubject(msg); |
|
77 | 82 |
char *data = (char *)natsMsg_GetData(msg); |
78 |
- if(on_message->rt < 0 || event_rt.rlist[on_message->rt] == NULL) { |
|
79 |
- LM_INFO("event-route [nats:%s] does not exist\n", s); |
|
80 |
- goto end; |
|
81 |
- } |
|
82 | 83 |
eventData = data; |
83 |
- nats_run_cfg_route(on_message->rt); |
|
84 |
- |
|
85 |
-end: |
|
84 |
+ nats_run_cfg_route(on_message->rt, &on_message->evname); |
|
86 | 85 |
eventData = NULL; |
87 | 86 |
natsMsg_Destroy(msg); |
88 | 87 |
} |
... | ... |
@@ -90,22 +89,25 @@ end: |
90 | 89 |
static void connectedCB(natsConnection *nc, void *closure) |
91 | 90 |
{ |
92 | 91 |
char url[NATS_URL_MAX_SIZE]; |
92 |
+ str evname = str_init("nats:connected"); |
|
93 | 93 |
natsConnection_GetConnectedUrl(nc, url, sizeof(url)); |
94 |
- nats_run_cfg_route(_nats_rts.connected); |
|
94 |
+ nats_run_cfg_route(_nats_rts.connected, &evname); |
|
95 | 95 |
} |
96 | 96 |
|
97 | 97 |
static void disconnectedCb(natsConnection *nc, void *closure) |
98 | 98 |
{ |
99 | 99 |
char url[NATS_URL_MAX_SIZE]; |
100 |
+ str evname = str_init("nats:disconnected"); |
|
100 | 101 |
natsConnection_GetConnectedUrl(nc, url, sizeof(url)); |
101 |
- nats_run_cfg_route(_nats_rts.disconnected); |
|
102 |
+ nats_run_cfg_route(_nats_rts.disconnected, &evname); |
|
102 | 103 |
} |
103 | 104 |
|
104 | 105 |
static void reconnectedCb(natsConnection *nc, void *closure) |
105 | 106 |
{ |
106 | 107 |
char url[NATS_URL_MAX_SIZE]; |
108 |
+ str evname = str_init("nats:connected"); |
|
107 | 109 |
natsConnection_GetConnectedUrl(nc, url, sizeof(url)); |
108 |
- nats_run_cfg_route(_nats_rts.connected); |
|
110 |
+ nats_run_cfg_route(_nats_rts.connected, &evname); |
|
109 | 111 |
} |
110 | 112 |
|
111 | 113 |
static void closedCB(natsConnection *nc, void *closure) |
... | ... |
@@ -247,9 +249,13 @@ int init_worker( |
247 | 249 |
if(rt < 0 || event_rt.rlist[rt] == NULL) { |
248 | 250 |
LM_INFO("route [%s] does not exist\n", routename); |
249 | 251 |
worker->on_message->rt = -1; |
250 |
- return 0; |
|
252 |
+ } else { |
|
253 |
+ worker->on_message->rt = rt; |
|
251 | 254 |
} |
252 |
- worker->on_message->rt = rt; |
|
255 |
+ worker->on_message->_evname = malloc(buffsize); |
|
256 |
+ strcpy(worker->on_message->_evname, routename); |
|
257 |
+ worker->on_message->evname.s = worker->on_message->_evname; |
|
258 |
+ worker->on_message->evname.len = strlen(worker->on_message->_evname); |
|
253 | 259 |
worker->nc = nc; |
254 | 260 |
return 0; |
255 | 261 |
} |
... | ... |
@@ -565,6 +571,9 @@ int nats_destroy_workers() |
565 | 571 |
} |
566 | 572 |
} |
567 | 573 |
if(worker->on_message != NULL) { |
574 |
+ if (worker->on_message->_evname) { |
|
575 |
+ free(worker->on_message->_evname); |
|
576 |
+ } |
|
568 | 577 |
shm_free(worker->on_message); |
569 | 578 |
} |
570 | 579 |
shm_free(worker); |
... | ... |
@@ -657,15 +666,18 @@ int _init_nats_sub_add(modparam_t type, void *val) |
657 | 666 |
/** |
658 | 667 |
* Invoke a event route block |
659 | 668 |
*/ |
660 |
-int nats_run_cfg_route(int rt) |
|
669 |
+int nats_run_cfg_route(int rt, str *evname) |
|
661 | 670 |
{ |
662 | 671 |
struct run_act_ctx ctx; |
672 |
+ sr_kemi_eng_t *keng = NULL; |
|
663 | 673 |
sip_msg_t *fmsg; |
664 | 674 |
sip_msg_t tmsg; |
665 | 675 |
|
676 |
+ keng = sr_kemi_eng_get(); |
|
677 |
+ |
|
666 | 678 |
// check for valid route pointer |
667 |
- if(rt < 0) { |
|
668 |
- return 0; |
|
679 |
+ if(rt < 0 || !event_rt.rlist[rt]) { |
|
680 |
+ if (keng == NULL) return 0; |
|
669 | 681 |
} |
670 | 682 |
|
671 | 683 |
fmsg = faked_msg_next(); |
... | ... |
@@ -673,6 +685,13 @@ int nats_run_cfg_route(int rt) |
673 | 685 |
fmsg = &tmsg; |
674 | 686 |
set_route_type(EVENT_ROUTE); |
675 | 687 |
init_run_actions_ctx(&ctx); |
688 |
+ if (rt < 0 && keng) { |
|
689 |
+ if (sr_kemi_route(keng, fmsg, EVENT_ROUTE, |
|
690 |
+ &nats_event_callback, evname) < 0) { |
|
691 |
+ LM_ERR("error running event route kemi callback\n"); |
|
692 |
+ } |
|
693 |
+ return 0; |
|
694 |
+ } |
|
676 | 695 |
run_top_route(event_rt.rlist[rt], fmsg, 0); |
677 | 696 |
return 0; |
678 | 697 |
} |
... | ... |
@@ -791,3 +810,35 @@ int nats_pv_get_event_payload( |
791 | 810 |
return eventData == NULL ? pv_get_null(msg, param, res) |
792 | 811 |
: pv_get_strzval(msg, param, res, eventData); |
793 | 812 |
} |
813 |
+ |
|
814 |
+/** |
|
815 |
+ * |
|
816 |
+ */ |
|
817 |
+int ki_nats_publish(sip_msg_t *msg, str *subject, str *payload) |
|
818 |
+{ |
|
819 |
+ return w_nats_publish(msg, *subject, *payload); |
|
820 |
+} |
|
821 |
+ |
|
822 |
+/** |
|
823 |
+ * |
|
824 |
+ */ |
|
825 |
+/* clang-format off */ |
|
826 |
+static sr_kemi_t sr_kemi_nats_exports[] = { |
|
827 |
+ { str_init("nats"), str_init("publish"), |
|
828 |
+ SR_KEMIP_INT, ki_nats_publish, |
|
829 |
+ { SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_NONE, |
|
830 |
+ SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE } |
|
831 |
+ }, |
|
832 |
+ |
|
833 |
+ { {0, 0}, {0, 0}, 0, NULL, { 0, 0, 0, 0, 0, 0 } } |
|
834 |
+}; |
|
835 |
+/* clang-format on */ |
|
836 |
+ |
|
837 |
+/** |
|
838 |
+ * |
|
839 |
+ */ |
|
840 |
+int mod_register(char *path, int *dlflags, void *p1, void *p2) |
|
841 |
+{ |
|
842 |
+ sr_kemi_modules_add(sr_kemi_nats_exports); |
|
843 |
+ return 0; |
|
844 |
+} |
... | ... |
@@ -40,7 +40,7 @@ extern int fixup_publish_get_value(void **param, int param_no); |
40 | 40 |
extern int fixup_publish_get_value_free(void **param, int param_no); |
41 | 41 |
extern void _nats_pub_worker_cb(uv_poll_t *handle, int status, int events); |
42 | 42 |
|
43 |
-int nats_run_cfg_route(int rt); |
|
43 |
+int nats_run_cfg_route(int rt, str *evname); |
|
44 | 44 |
void nats_init_environment(); |
45 | 45 |
|
46 | 46 |
int _init_nats_server_url_add(modparam_t type, void *val); |
... | ... |
@@ -90,6 +90,11 @@ int w_nats_publish_f(sip_msg_t *msg, char *subj, char *payload) |
90 | 90 |
return -1; |
91 | 91 |
} |
92 | 92 |
|
93 |
+ return w_nats_publish(msg, subj_s, payload_s); |
|
94 |
+} |
|
95 |
+ |
|
96 |
+int w_nats_publish(sip_msg_t *msg, str subj_s, str payload_s) |
|
97 |
+{ |
|
93 | 98 |
// round-robin pub workers |
94 | 99 |
pub_worker++; |
95 | 100 |
if(pub_worker >= nats_pub_workers_num) { |
... | ... |
@@ -38,6 +38,7 @@ typedef struct _nats_pub_delivery |
38 | 38 |
nats_pub_delivery_ptr _nats_pub_delivery_new(str subject, str payload); |
39 | 39 |
void nats_pub_free_delivery_ptr(nats_pub_delivery_ptr ptr); |
40 | 40 |
int w_nats_publish_f(sip_msg_t *msg, char *subj, char *payload); |
41 |
+int w_nats_publish(sip_msg_t *msg, str subj_s, str payload_s); |
|
41 | 42 |
int fixup_publish_get_value(void **param, int param_no); |
42 | 43 |
int fixup_publish_get_value_free(void **param, int param_no); |
43 | 44 |
|