Browse code

kazoo: add basic kemi support

Author Yufei Tao authored on 11/05/2020 13:48:32
Showing 4 changed files
... ...
@@ -297,6 +297,28 @@ modparam("kazoo", "amqp_connection", "kazoo://guest:guest@otherhost:5672")
297 297
         </example>
298 298
     </section>
299 299
 
300
+<section>
301
+  <title><varname>event_callback</varname>(str)</title>
302
+    <para>
303
+        The name of the function in the kemi configuration file (embedded
304
+        scripting language such as Lua, Python, ...) to be executed instead
305
+        of event_route[...] blocks.
306
+    </para>
307
+    <para>
308
+        The function receives a string parameter with the name of the event,
309
+        the values can be: 'kazoo:mod-init', 'kazoo:consumer-event'.
310
+    </para>
311
+
312
+  <example>
313
+    <title>Set <varname>event_callback</varname> parameter</title>
314
+    <programlisting format="linespecific">
315
+    ...
316
+    modparam("kazoo", "event_callback", "ksr_kazoo_event")
317
+    ...
318
+    </programlisting>
319
+  </example>
320
+</section>
321
+
300 322
     
301 323
 
302 324
     
... ...
@@ -31,6 +31,7 @@
31 31
 #include "../../lib/srdb1/db.h"
32 32
 #include "../../core/dprint.h"
33 33
 #include "../../core/cfg/cfg_struct.h"
34
+#include "../../core/kemi.h"
34 35
 
35 36
 #include "kz_amqp.h"
36 37
 #include "kz_json.h"
... ...
@@ -46,6 +47,8 @@
46 46
 static int mod_init(void);
47 47
 static int  mod_child_init(int rank);
48 48
 static int fire_init_event(int rank);
49
+static int fire_init_event_cfg(void);
50
+static int fire_init_event_kemi(void);
49 51
 static void mod_destroy(void);
50 52
 
51 53
 str dbk_node_hostname = { 0, 0 };
... ...
@@ -122,6 +125,9 @@ pv_spec_t kz_query_result_spec;
122 122
 
123 123
 str kz_app_name = str_init(NAME);
124 124
 
125
+str kazoo_event_callback = STR_NULL;
126
+int kazoo_kemi_enabled=0;
127
+
125 128
 MODULE_VERSION
126 129
 
127 130
 static tr_export_t mod_trans[] = {
... ...
@@ -209,6 +215,7 @@ static param_export_t params[] = {
209 209
 	{"pua_lock_type", INT_PARAM, &kz_pua_lock_type},
210 210
     {"amqp_connect_timeout_micro", INT_PARAM, &kz_amqp_connect_timeout_tv.tv_usec},
211 211
     {"amqp_connect_timeout_sec", INT_PARAM, &kz_amqp_connect_timeout_tv.tv_sec},
212
+    {"event_callback",  PARAM_STR,    &kazoo_event_callback},
212 213
     {0, 0, 0}
213 214
 };
214 215
 
... ...
@@ -273,6 +280,7 @@ static int mod_init(void) {
273 273
    		return -1;
274 274
     }
275 275
 
276
+
276 277
     if(kz_timer_ms > 0) {
277 278
     	kz_timer_tv.tv_usec = (kz_timer_ms % 1000) * 1000;
278 279
     	kz_timer_tv.tv_sec = kz_timer_ms / 1000;
... ...
@@ -312,6 +320,16 @@ static int mod_init(void) {
312 312
 		}
313 313
     }
314 314
 
315
+	sr_kemi_eng_t *keng = NULL;
316
+	if(kazoo_event_callback.s!=NULL && kazoo_event_callback.len>0) {
317
+		keng = sr_kemi_eng_get();
318
+		if(keng==NULL) {
319
+			LM_ERR("failed to find kemi engine\n");
320
+			return -1;
321
+		}
322
+		kazoo_kemi_enabled=1;
323
+	}
324
+
315 325
 
316 326
     int total_workers = dbk_consumer_workers + (dbk_consumer_processes * kz_server_counter) + 2;
317 327
 
... ...
@@ -341,6 +359,20 @@ static int mod_init(void) {
341 341
     return 0;
342 342
 }
343 343
 
344
+static sr_kemi_t kazoo_kemi_exports[] = {
345
+	{ str_init("kazoo"), str_init("kazoo_publish"),
346
+		SR_KEMIP_INT, ki_kz_amqp_publish,
347
+		{ SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_STR,
348
+			SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
349
+	},
350
+	{ str_init("kazoo"), str_init("kazoo_subscribe"),
351
+		SR_KEMIP_INT, ki_kz_amqp_subscribe,
352
+		{ SR_KEMIP_STR, SR_KEMIP_NONE, SR_KEMIP_NONE,
353
+			SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
354
+	},
355
+	{ {0, 0}, {0, 0}, 0, NULL, { 0, 0, 0, 0, 0, 0 } }
356
+};
357
+
344 358
 int mod_register(char *path, int *dlflags, void *p1, void *p2)
345 359
 {
346 360
 	if(kz_tr_init_buffers()<0)
... ...
@@ -348,6 +380,9 @@ int mod_register(char *path, int *dlflags, void *p1, void *p2)
348 348
 		LM_ERR("failed to initialize transformations buffers\n");
349 349
 		return -1;
350 350
 	}
351
+
352
+	sr_kemi_modules_add(kazoo_kemi_exports);
353
+
351 354
 	return register_trans_mod(path, mod_trans);
352 355
 }
353 356
 
... ...
@@ -430,16 +465,12 @@ static int mod_child_init(int rank)
430 430
 	return 0;
431 431
 }
432 432
 
433
-static int fire_init_event(int rank)
433
+static int fire_init_event_cfg(void)
434 434
 {
435 435
 	struct sip_msg *fmsg;
436 436
 	struct run_act_ctx ctx;
437 437
 	int rtb, rt;
438 438
 
439
-	LM_DBG("rank is (%d)\n", rank);
440
-	if (rank!=PROC_INIT)
441
-		return 0;
442
-
443 439
 	rt = route_get(&event_rt, "kazoo:mod-init");
444 440
 	if(rt>=0 && event_rt.rlist[rt]!=NULL) {
445 441
 		LM_DBG("executing event_route[kazoo:mod-init] (%d)\n", rt);
... ...
@@ -461,11 +492,50 @@ static int fire_init_event(int rank)
461 461
 	return 0;
462 462
 }
463 463
 
464
+static int fire_init_event_kemi(void)
465
+{
466
+	struct sip_msg *fmsg;
467
+	int rtb;
468
+	sr_kemi_eng_t *keng = NULL;
469
+
470
+	keng = sr_kemi_eng_get();
471
+	if(keng!=NULL) {
472
+		str evrtname = str_init("kazoo:mod-init");
473
+		rtb = get_route_type();
474
+		if(faked_msg_init()<0)
475
+			return -1;
476
+		fmsg = faked_msg_next();
477
+		if(sr_kemi_route(keng, fmsg, EVENT_ROUTE, &kazoo_event_callback, &evrtname)<0) {
478
+			LM_ERR("error running event route kemi callback\n");
479
+		}
480
+		set_route_type(rtb);
481
+	} 
482
+	else {
483
+		LM_ERR("no event route or kemi callback found for execution\n");
484
+	}
485
+
486
+	return 0;
487
+}
488
+
489
+static int fire_init_event(int rank)
490
+{
491
+	LM_DBG("rank is (%d)\n", rank);
492
+	if (rank!=PROC_INIT)
493
+		return 0;
494
+
495
+	if (kazoo_kemi_enabled) {
496
+		return fire_init_event_kemi();
497
+	}
498
+	else {
499
+		return fire_init_event_cfg();
500
+	}
501
+
502
+	return 0;
503
+}
504
+
464 505
 
465 506
 static void mod_destroy(void) {
466 507
 	kz_amqp_destroy();
467 508
     if (kz_worker_pipes_fds) { shm_free(kz_worker_pipes_fds); }
468 509
     if (kz_worker_pipes) { shm_free(kz_worker_pipes); }
469 510
 }
470
-
471
-
... ...
@@ -46,7 +46,7 @@
46 46
 #include "../../core/receive.h"
47 47
 #include "../../core/action.h"
48 48
 #include "../../core/script_cb.h"
49
-
49
+#include "../../core/kemi.h"
50 50
 
51 51
 #include "kz_amqp.h"
52 52
 #include "kz_json.h"
... ...
@@ -81,6 +81,9 @@ extern int kz_amqps_verify_hostname;
81 81
 
82 82
 extern pv_spec_t kz_query_timeout_spec;
83 83
 
84
+extern int kazoo_kemi_enabled;
85
+extern str kazoo_event_callback;
86
+
84 87
 const amqp_bytes_t kz_amqp_empty_bytes = { 0, NULL };
85 88
 const amqp_table_t kz_amqp_empty_table = { 0, NULL };
86 89
 
... ...
@@ -1215,7 +1218,7 @@ int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_
1215 1215
 
1216 1216
 int kz_amqp_publish_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* _pub_flags)
1217 1217
 {
1218
-	  str json_s;
1218
+	  str pl_s;
1219 1219
 	  str exchange_s;
1220 1220
 	  str routing_key_s;
1221 1221
 
... ...
@@ -1229,7 +1232,7 @@ int kz_amqp_publish_ex(struct sip_msg* msg, char* exchange, char* routing_key, c
1229 1229
 			return -1;
1230 1230
 		}
1231 1231
 
1232
-		if (fixup_get_svalue(msg, (gparam_p)payload, &json_s) != 0) {
1232
+		if (fixup_get_svalue(msg, (gparam_p)payload, &pl_s) != 0) {
1233 1233
 			LM_ERR("cannot get json string value : %s\n", payload);
1234 1234
 			return -1;
1235 1235
 		}
... ...
@@ -1239,19 +1242,22 @@ int kz_amqp_publish_ex(struct sip_msg* msg, char* exchange, char* routing_key, c
1239 1239
 			return -1;
1240 1240
 		}
1241 1241
 
1242
-		struct json_object *j = json_tokener_parse(json_s.s);
1243
-
1244
-		if (j==NULL) {
1245
-			LM_ERR("empty or invalid JSON payload : %.*s\n", json_s.len, json_s.s);
1246
-			return -1;
1247
-		}
1248
-
1249
-		json_object_put(j);
1250
-
1251
-		return kz_amqp_pipe_send(&exchange_s, &routing_key_s, &json_s );
1242
+		return ki_kz_amqp_publish(msg, (char*)&exchange_s, (char*)&routing_key_s, (char*)&pl_s);
1243
+};
1252 1244
 
1245
+int ki_kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char* payload)
1246
+{
1247
+	  char *pl = ((str*)payload)->s;
1248
+	  struct json_object *j = json_tokener_parse(pl);
1253 1249
 
1254
-};
1250
+	  if (j==NULL) {
1251
+	  	  LM_ERR("empty or invalid JSON payload : %.*s\n", ((str*)payload)->len, ((str*)payload)->s);
1252
+	  	  return -1;
1253
+	  }
1254
+	  
1255
+	  json_object_put(j);
1256
+	  return kz_amqp_pipe_send((str*)exchange, (str*)routing_key, (str*)payload);
1257
+}
1255 1258
 
1256 1259
 int kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char* payload)
1257 1260
 {
... ...
@@ -1794,9 +1800,20 @@ kz_amqp_exchange_binding_ptr kz_amqp_exchange_binding_from_json(json_object* JOb
1794 1794
 
1795 1795
 int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
1796 1796
 {
1797
+	str payload_s = STR_NULL;
1798
+    
1799
+	if (fixup_get_svalue(msg, (gparam_p)payload, &payload_s) != 0) {
1800
+		LM_ERR("cannot get payload value\n");
1801
+		return -1;
1802
+	}
1803
+
1804
+	return ki_kz_amqp_subscribe(msg, (char*)(&payload_s));
1805
+}
1806
+
1807
+int ki_kz_amqp_subscribe(struct sip_msg* msg, char* payload)
1808
+{
1797 1809
 	str exchange_s = STR_NULL;
1798 1810
 	str queue_s = STR_NULL;
1799
-	str payload_s = STR_NULL;
1800 1811
 	str key_s = STR_NULL;
1801 1812
 	str subkey_s = STR_NULL;
1802 1813
 	int no_ack = 1;
... ...
@@ -1816,12 +1833,9 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
1816 1816
 	kz_amqp_bind_ptr bind = NULL;
1817 1817
 	kz_amqp_binding_ptr binding = NULL;
1818 1818
 
1819
-	if (fixup_get_svalue(msg, (gparam_p)payload, &payload_s) != 0) {
1820
-		LM_ERR("cannot get payload value\n");
1821
-		return -1;
1822
-	}
1819
+	char* pl = ((str*)payload)->s;
1820
+	json_obj = kz_json_parse(pl);
1823 1821
 
1824
-	json_obj = kz_json_parse(payload_s.s);
1825 1822
 	if (json_obj == NULL)
1826 1823
 		return -1;
1827 1824
     
... ...
@@ -1873,7 +1887,7 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
1873 1873
 	}
1874 1874
 
1875 1875
 	if(routing == NULL) {
1876
-		LM_INFO("creating empty routing key : %s\n", payload_s.s);
1876
+		LM_INFO("creating empty routing key : %s\n", pl);
1877 1877
 		routing = kz_amqp_routing_new("");
1878 1878
 	}
1879 1879
 
... ...
@@ -2331,63 +2345,99 @@ int kz_amqp_consumer_fire_event(char *eventkey)
2331 2331
 	return 0;
2332 2332
 }
2333 2333
 
2334
-void kz_amqp_consumer_event(kz_amqp_consumer_delivery_ptr Evt)
2334
+static void kz_amqp_consumer_event_cfg(kz_amqp_consumer_delivery_ptr Evt, json_obj_ptr json_obj)
2335 2335
 {
2336
-    json_obj_ptr json_obj = NULL;
2337
-    str ev_name = {0, 0}, ev_category = {0, 0};
2338
-    char buffer[512];
2339
-    char * p;
2336
+	str ev_name = {0, 0}, ev_category = {0, 0};
2337
+	char buffer[512];
2338
+	char * p;
2340 2339
 
2341
-    eventData = Evt->payload;
2342
-    if(Evt->routing_key) {
2343
-    	eventKey = Evt->routing_key->s;
2344
-    }
2345 2340
 
2346
-    json_obj = kz_json_parse(Evt->payload);
2347
-    if (json_obj == NULL)
2348
-		return;
2349
-
2350
-    char* key = (Evt->event_key == NULL ? dbk_consumer_event_key.s : Evt->event_key);
2351
-    char* subkey = (Evt->event_subkey == NULL ? dbk_consumer_event_subkey.s : Evt->event_subkey);
2341
+	char* key = (Evt->event_key == NULL ? dbk_consumer_event_key.s : Evt->event_key);
2342
+	char* subkey = (Evt->event_subkey == NULL ? dbk_consumer_event_subkey.s : Evt->event_subkey);
2352 2343
 
2353
-    json_extract_field(key, ev_category);
2354
-    if(ev_category.len == 0 && Evt->event_key) {
2355
-	    ev_category.s = Evt->event_key;
2356
-	    ev_category.len = strlen(Evt->event_key);
2357
-    }
2344
+	json_extract_field(key, ev_category);
2345
+	if(ev_category.len == 0 && Evt->event_key) {
2346
+		ev_category.s = Evt->event_key;
2347
+		ev_category.len = strlen(Evt->event_key);
2348
+	}
2358 2349
 
2359
-    json_extract_field(subkey, ev_name);
2360
-    if(ev_name.len == 0 && Evt->event_subkey) {
2361
-	    ev_name.s = Evt->event_subkey;
2362
-	    ev_name.len = strlen(Evt->event_subkey);
2363
-    }
2350
+	json_extract_field(subkey, ev_name);
2351
+	if(ev_name.len == 0 && Evt->event_subkey) {
2352
+		ev_name.s = Evt->event_subkey;
2353
+		ev_name.len = strlen(Evt->event_subkey);
2354
+	}
2364 2355
 
2365
-    sprintf(buffer, "kazoo:consumer-event-%.*s-%.*s",ev_category.len, ev_category.s, ev_name.len, ev_name.s);
2366
-    for (p=buffer ; *p; ++p) *p = tolower(*p);
2367
-    for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
2368
-    if(kz_amqp_consumer_fire_event(buffer) != 0) {
2369
-        sprintf(buffer, "kazoo:consumer-event-%.*s",ev_category.len, ev_category.s);
2370
-        for (p=buffer ; *p; ++p) *p = tolower(*p);
2371
-        for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
2372
-        if(kz_amqp_consumer_fire_event(buffer) != 0) {
2373
-            sprintf(buffer, "kazoo:consumer-event-%s-%s", key, subkey);
2374
-            for (p=buffer ; *p; ++p) *p = tolower(*p);
2375
-            for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
2376
-            if(kz_amqp_consumer_fire_event(buffer) != 0) {
2377
-                sprintf(buffer, "kazoo:consumer-event-%s", key);
2378
-                for (p=buffer ; *p; ++p) *p = tolower(*p);
2379
-                for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
2356
+	sprintf(buffer, "kazoo:consumer-event-%.*s-%.*s",ev_category.len, ev_category.s, ev_name.len, ev_name.s);
2357
+	for (p=buffer ; *p; ++p) *p = tolower(*p);
2358
+	for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
2359
+	if(kz_amqp_consumer_fire_event(buffer) != 0) {
2360
+		sprintf(buffer, "kazoo:consumer-event-%.*s",ev_category.len, ev_category.s);
2361
+		for (p=buffer ; *p; ++p) *p = tolower(*p);
2362
+		for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
2363
+		if(kz_amqp_consumer_fire_event(buffer) != 0) {
2364
+			sprintf(buffer, "kazoo:consumer-event-%s-%s", key, subkey);
2365
+			for (p=buffer ; *p; ++p) *p = tolower(*p);
2366
+			for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
2367
+			if(kz_amqp_consumer_fire_event(buffer) != 0) {
2368
+				sprintf(buffer, "kazoo:consumer-event-%s", key);
2369
+				for (p=buffer ; *p; ++p) *p = tolower(*p);
2370
+				for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
2380 2371
 				if(kz_amqp_consumer_fire_event(buffer) != 0) {
2381 2372
 					sprintf(buffer, "kazoo:consumer-event");
2382 2373
 					if(kz_amqp_consumer_fire_event(buffer) != 0) {
2383 2374
 						LM_ERR("kazoo:consumer-event not found\n");
2384 2375
 					}
2385 2376
 				}
2386
-            }
2387
-        }
2388
-    }
2377
+			}
2378
+		}
2379
+	}
2380
+}
2381
+
2382
+static void kz_amqp_consumer_event_kemi(void)
2383
+{
2384
+	sr_kemi_eng_t *keng = NULL;
2385
+    int rtb;
2386
+	
2387
+	keng = sr_kemi_eng_get();
2388
+	if(keng!=NULL) {
2389
+		sip_msg_t *msg;
2390
+		str evrtname = str_init("kazoo:consumer-event");
2391
+
2392
+		rtb = get_route_type();
2393
+		msg = faked_msg_next();
2394
+		if(sr_kemi_route(keng, msg, EVENT_ROUTE, &kazoo_event_callback, &evrtname)<0)                {
2395
+			LM_ERR("error running event route kemi callback\n");
2396
+		}
2397
+
2398
+		set_route_type(rtb);
2399
+	} else {
2400
+		LM_ERR("no event route or kemi callback found for execution\n");
2401
+	}
2402
+
2403
+}
2404
+
2405
+void kz_amqp_consumer_event(kz_amqp_consumer_delivery_ptr Evt)
2406
+{
2407
+	json_obj_ptr json_obj = NULL;
2408
+
2409
+	eventData = Evt->payload;
2410
+	if(Evt->routing_key) {
2411
+		eventKey = Evt->routing_key->s;
2412
+	}
2413
+
2414
+	json_obj = kz_json_parse(Evt->payload);
2415
+	if (json_obj == NULL)
2416
+		return;
2417
+
2418
+	if (kazoo_kemi_enabled) {
2419
+		kz_amqp_consumer_event_kemi();
2420
+	} 
2421
+	else {
2422
+		kz_amqp_consumer_event_cfg(Evt, json_obj);
2423
+	}
2424
+
2389 2425
 	if(json_obj)
2390
-    	json_object_put(json_obj);
2426
+		json_object_put(json_obj);
2391 2427
 
2392 2428
 	eventData = NULL;
2393 2429
 	eventKey = NULL;
... ...
@@ -274,9 +274,11 @@ int kz_amqp_add_connection(modparam_t type, void* val);
274 274
 
275 275
 int kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char* payload);
276 276
 int kz_amqp_publish_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* _pub_flags);
277
+int ki_kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char* payload);
277 278
 int kz_amqp_query(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* dst);
278 279
 int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload);
279 280
 int kz_amqp_subscribe(struct sip_msg* msg, char* payload);
281
+int ki_kz_amqp_subscribe(struct sip_msg* msg, char* payload);
280 282
 int kz_amqp_subscribe_simple(struct sip_msg* msg, char* exchange, char* exchange_type, char* queue_name, char* routing_key);
281 283
 int kz_amqp_encode(struct sip_msg* msg, char* unencoded, char* encoded);
282 284
 int kz_amqp_encode_ex(str* unencoded, pv_value_p dst_val);