Browse code

kazoo: Allow optional AMQP-headers in kazoo_query and kazoo_publish - implements GH #2895

mihovilkolaric authored on 09/11/2021 22:25:02 • Henning Westerholt committed on 19/11/2021 11:43:16
Showing 4 changed files
... ...
@@ -560,10 +560,10 @@ modparam("kazoo", "pua_mode", 0)
560 560
 			<section>
561 561
 				<title>
562 562
 					<function moreinfo="none">kazoo_publish(exchange, routing_key,
563
-						json_payload)</function>
563
+						json_payload [, amqp_headers])</function>
564 564
 				</title>
565 565
 				<para>The function publishes a json payload to rabbitmq. The routing_key parameter
566
-					should be encoded.</para>
566
+					should be encoded. Optional AMQP-Headers are specified in the format key1=value1;key2=value2</para>
567 567
 				<para>This function can be used from ANY ROUTE.</para>
568 568
 
569 569
 				<example>
... ...
@@ -581,12 +581,12 @@ kazoo_publish("callmgr", $var(amqp_routing_key), $var(amqp_payload_request));
581 581
 			<section>
582 582
 				<title>
583 583
 					<function moreinfo="none">kazoo_query(exchange, routing_key, json_payload [,
584
-						target_var])</function>
584
+						target_var] [, amqp_headers])</function>
585 585
 				</title>
586 586
 				<para>The function publishes a json payload to rabbitmq, waits for a correlated
587 587
 					messageand puts the result in target_var. The routing_key parameter should be
588 588
 					encoded. target_var is optional as the function also puts the result in
589
-					pseudo-variable $kzR.</para>
589
+					pseudo-variable $kzR. Optional AMQP-Headers are specified in the format key1=value1;key2=value2</para>
590 590
 				<para>This function can be used from ANY ROUTE.</para>
591 591
 
592 592
 				<example>
... ...
@@ -149,6 +149,7 @@ static cmd_export_t cmds[] = {
149 149
     {"kazoo_publish", (cmd_function) kz_amqp_publish, 3, fixup_kz_amqp, fixup_kz_amqp_free, ANY_ROUTE},
150 150
     {"kazoo_publish", (cmd_function) kz_amqp_publish_ex, 4, fixup_kz_amqp, fixup_kz_amqp_free, ANY_ROUTE},
151 151
     {"kazoo_query", (cmd_function) kz_amqp_query, 4, fixup_kz_amqp, fixup_kz_amqp_free, ANY_ROUTE},
152
+    {"kazoo_query", (cmd_function) kz_amqp_query, 5, fixup_kz_amqp, fixup_kz_amqp_free, ANY_ROUTE},
152 153
     {"kazoo_query", (cmd_function) kz_amqp_query_ex, 3, fixup_kz_amqp, fixup_kz_amqp_free, ANY_ROUTE},
153 154
     {"kazoo_pua_publish", (cmd_function) kz_pua_publish, 1, 0, 0, ANY_ROUTE},
154 155
     {"kazoo_pua_publish_mwi", (cmd_function) kz_pua_publish_mwi, 1, 0, 0, ANY_ROUTE},
... ...
@@ -352,6 +352,8 @@ void kz_amqp_free_pipe_cmd(kz_amqp_cmd_ptr cmd)
352 352
 		shm_free(cmd->cb_route);
353 353
 	if (cmd->err_route)
354 354
 		shm_free(cmd->err_route);
355
+	if (cmd->headers)
356
+		shm_free(cmd->headers);
355 357
 	lock_release(&cmd->lock);
356 358
 	lock_destroy(&cmd->lock);
357 359
 	shm_free(cmd);
... ...
@@ -1035,7 +1037,7 @@ void kz_amqp_add_payload_common_properties(json_obj_ptr json_obj, char* server_i
1035 1037
 
1036 1038
 }
1037 1039
 
1038
-int kz_amqp_pipe_send(str *str_exchange, str *str_routing_key, str *str_payload)
1040
+int kz_amqp_pipe_send(str *str_exchange, str *str_routing_key, str *str_payload, str *str_headers)
1039 1041
 {
1040 1042
 	int ret = 1;
1041 1043
     json_obj_ptr json_obj = NULL;
... ...
@@ -1109,6 +1111,9 @@ int kz_amqp_pipe_send(str *str_exchange, str *str_routing_key, str *str_payload)
1109 1111
 	lock_get(&cmd->lock);
1110 1112
 	cmd->type = KZ_AMQP_CMD_PUBLISH;
1111 1113
 	cmd->consumer = getpid();
1114
+	if (str_headers != NULL) {
1115
+		cmd->headers = kz_amqp_str_dup(str_headers);
1116
+	}
1112 1117
 	if (write(kz_cmd_pipe, &cmd, sizeof(cmd)) != sizeof(cmd)) {
1113 1118
 		LM_ERR("failed to publish message to amqp in process %d, write to command pipe: %s\n", getpid(), strerror(errno));
1114 1119
 	} else {
... ...
@@ -1130,7 +1135,7 @@ int kz_amqp_pipe_send(str *str_exchange, str *str_routing_key, str *str_payload)
1130 1135
 	return ret;
1131 1136
 }
1132 1137
 
1133
-int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_payload, struct timeval* kz_timeout, json_obj_ptr* json_ret )
1138
+int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_payload, struct timeval* kz_timeout, json_obj_ptr* json_ret , str* str_headers )
1134 1139
 {
1135 1140
 	int ret = 1;
1136 1141
     json_obj_ptr json_obj = NULL;
... ...
@@ -1172,6 +1177,10 @@ int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_
1172 1177
 	cmd->payload = kz_amqp_string_dup(payload);
1173 1178
 	cmd->message_id = kz_str_dup(&unique_string);
1174 1179
 
1180
+	if (str_headers != NULL) {
1181
+		cmd->headers = kz_amqp_str_dup(str_headers);
1182
+	}
1183
+
1175 1184
 	cmd->timeout = *kz_timeout;
1176 1185
 
1177 1186
 	if(cmd->payload == NULL || cmd->routing_key == NULL || cmd->exchange == NULL) {
... ...
@@ -1216,11 +1225,12 @@ int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_
1216 1225
     return ret;
1217 1226
 }
1218 1227
 
1219
-int kz_amqp_publish_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* _pub_flags)
1228
+int kz_amqp_publish_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* headers)
1220 1229
 {
1221 1230
 	  str pl_s;
1222 1231
 	  str exchange_s;
1223 1232
 	  str routing_key_s;
1233
+	  str headers_s;
1224 1234
 
1225 1235
 		if (fixup_get_svalue(msg, (gparam_p)exchange, &exchange_s) != 0) {
1226 1236
 			LM_ERR("cannot get exchange string value\n");
... ...
@@ -1242,10 +1252,20 @@ int kz_amqp_publish_ex(struct sip_msg* msg, char* exchange, char* routing_key, c
1242 1252
 			return -1;
1243 1253
 		}
1244 1254
 
1245
-		return ki_kz_amqp_publish(msg, &exchange_s, &routing_key_s, &pl_s);
1255
+		if (headers != NULL) {
1256
+			if (fixup_get_svalue(msg, (gparam_p)headers, &headers_s) != 0) {
1257
+				LM_ERR("cannot get amqp_headers string value\n");
1258
+				return -1;
1259
+			}
1260
+		} else {
1261
+			headers_s.len = 0;
1262
+			headers_s.s = "";
1263
+		}
1264
+
1265
+		return ki_kz_amqp_publish(msg, &exchange_s, &routing_key_s, &pl_s, &headers_s);
1246 1266
 };
1247 1267
 
1248
-int ki_kz_amqp_publish(sip_msg_t* msg, str* exchange, str* routing_key, str* payload)
1268
+int ki_kz_amqp_publish(sip_msg_t* msg, str* exchange, str* routing_key, str* payload, str* headers)
1249 1269
 {
1250 1270
 	  char *pl = ((str*)payload)->s;
1251 1271
 	  struct json_object *j = json_tokener_parse(pl);
... ...
@@ -1256,7 +1276,7 @@ int ki_kz_amqp_publish(sip_msg_t* msg, str* exchange, str* routing_key, str* pay
1256 1276
 	  }
1257 1277
 
1258 1278
 	  json_object_put(j);
1259
-	  return kz_amqp_pipe_send(exchange, routing_key, payload);
1279
+	  return kz_amqp_pipe_send(exchange, routing_key, payload, headers);
1260 1280
 }
1261 1281
 
1262 1282
 int kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char* payload)
... ...
@@ -1438,11 +1458,12 @@ void kz_amqp_set_last_result(char* json)
1438 1458
 	last_payload_result = value;
1439 1459
 }
1440 1460
 
1441
-int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload)
1461
+int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* headers)
1442 1462
 {
1443 1463
 	  str json_s;
1444 1464
 	  str exchange_s;
1445 1465
 	  str routing_key_s;
1466
+	  str headers_s;
1446 1467
 	  struct timeval kz_timeout = kz_qtimeout_tv;
1447 1468
 
1448 1469
 	  if(last_payload_result)
... ...
@@ -1479,6 +1500,16 @@ int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, cha
1479 1500
 
1480 1501
 		json_object_put(j);
1481 1502
 
1503
+		if (headers != NULL) {
1504
+			if (fixup_get_svalue(msg, (gparam_p)headers, &headers_s) != 0) {
1505
+				LM_ERR("cannot get amqp_headers string value\n");
1506
+				return -1;
1507
+			}
1508
+		} else {
1509
+			headers_s.len = 0;
1510
+			headers_s.s = "";
1511
+		}
1512
+
1482 1513
 		if(kz_query_timeout_spec.type != PVT_NONE) {
1483 1514
 			pv_value_t pv_val;
1484 1515
 			if(pv_get_spec_value( msg, &kz_query_timeout_spec, &pv_val) == 0) {
... ...
@@ -1491,7 +1522,7 @@ int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, cha
1491 1522
 		}
1492 1523
 
1493 1524
 		json_obj_ptr ret = NULL;
1494
-		int res = kz_amqp_pipe_send_receive(&exchange_s, &routing_key_s, &json_s, &kz_timeout, &ret );
1525
+		int res = kz_amqp_pipe_send_receive(&exchange_s, &routing_key_s, &json_s, &kz_timeout, &ret , &headers_s);
1495 1526
 
1496 1527
 		if(res != 0) {
1497 1528
 			return -1;
... ...
@@ -1508,13 +1539,13 @@ int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, cha
1508 1539
 		return 1;
1509 1540
 };
1510 1541
 
1511
-int kz_amqp_query(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* dst)
1542
+int kz_amqp_query(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* dst, char* headers)
1512 1543
 {
1513 1544
 
1514 1545
 	  pv_spec_t *dst_pv;
1515 1546
 	  pv_value_t dst_val;
1516 1547
 
1517
-	  int result = kz_amqp_query_ex(msg, exchange, routing_key, payload);
1548
+	  int result = kz_amqp_query_ex(msg, exchange, routing_key, payload, headers);
1518 1549
 	  if(result == -1)
1519 1550
 		  return result;
1520 1551
 
... ...
@@ -2243,6 +2274,11 @@ int kz_amqp_send_ex(kz_amqp_server_ptr srv, kz_amqp_cmd_ptr cmd, kz_amqp_channel
2243 2274
     routing_key = amqp_bytes_malloc_dup(amqp_cstring_bytes(cmd->routing_key));
2244 2275
     payload = amqp_bytes_malloc_dup(amqp_cstring_bytes(cmd->payload));
2245 2276
 
2277
+    int num_headers = 0;
2278
+    if ( (cmd->headers != NULL) &&  (strlen (cmd->headers) > 0 ) ) {
2279
+        num_headers = add_amqp_headers(cmd->headers, &props);
2280
+    }
2281
+    
2246 2282
     json_obj = kz_json_parse(cmd->payload);
2247 2283
     if (json_obj == NULL) {
2248 2284
 	    LM_ERR("error parsing json when publishing %s\n", cmd->payload);
... ...
@@ -2284,10 +2320,71 @@ error:
2284 2320
 		amqp_bytes_free(routing_key);
2285 2321
 	if(payload.bytes)
2286 2322
 		amqp_bytes_free(payload);
2323
+	if (num_headers > 0) {
2324
+		shm_free(props.headers.entries);
2325
+	}
2287 2326
 
2288 2327
 	return ret;
2289 2328
 }
2290 2329
 
2330
+int add_amqp_headers (char * headers, amqp_basic_properties_t * props )
2331
+{
2332
+	int num_headers = 0;
2333
+	const char headers_delim[2] = ";"; // several key/val-pairs separated by ";"
2334
+	const char val_delim[2] = "=";     // key/value separated by "="
2335
+	char * headers_buffer;
2336
+
2337
+	char * kv_pair_str, * header_name, * header_value;
2338
+	char * header_saveptr, * val_saveptr; // savepointers for strtok_r
2339
+
2340
+	headers_buffer = pkg_malloc(strlen(headers) + 1);
2341
+	strcpy(headers_buffer, headers);
2342
+
2343
+	//count correct header/value-pairs
2344
+	kv_pair_str = strtok_r(headers_buffer, headers_delim, &header_saveptr);
2345
+	while( kv_pair_str != NULL) {
2346
+		header_name = strtok_r (kv_pair_str, val_delim, &val_saveptr);
2347
+		if (header_name != NULL) {
2348
+			header_value = strtok_r (NULL, val_delim, &val_saveptr);
2349
+			if (header_value != NULL) {
2350
+				num_headers++;
2351
+			} else {
2352
+				LM_ERR("Header-Value cant be parsed - skipping!\n");
2353
+			}
2354
+		} else {
2355
+			LM_ERR("Header-Name cant be parsed - skipping!\n");
2356
+		}
2357
+		kv_pair_str = strtok_r(NULL, headers_delim, &header_saveptr);
2358
+	}
2359
+	pkg_free(headers_buffer);
2360
+
2361
+	if (num_headers > 0) {
2362
+		headers_buffer = pkg_malloc(strlen(headers) + 1);
2363
+		strcpy(headers_buffer, headers);
2364
+		//allocate size
2365
+		props->headers.num_entries=num_headers;
2366
+		props->headers.entries=shm_malloc(props->headers.num_entries * sizeof(amqp_table_entry_t));
2367
+		num_headers = 0;
2368
+		kv_pair_str = strtok_r(headers_buffer, headers_delim, &header_saveptr);
2369
+		while( kv_pair_str != NULL) {
2370
+			header_name = strtok_r (kv_pair_str, val_delim, &val_saveptr);
2371
+			if (header_name != NULL) {
2372
+				header_value = strtok_r (NULL, val_delim, &val_saveptr);
2373
+				if (header_value != NULL) {
2374
+					props->headers.entries[num_headers].key =  amqp_cstring_bytes(header_name);
2375
+					props->headers.entries[num_headers].value.kind=AMQP_FIELD_KIND_UTF8;
2376
+					props->headers.entries[num_headers].value.value.bytes=amqp_cstring_bytes(header_value);
2377
+					num_headers++;
2378
+				}
2379
+			}
2380
+			kv_pair_str = strtok_r(NULL, headers_delim, &header_saveptr);
2381
+		}
2382
+		props->_flags |= AMQP_BASIC_HEADERS_FLAG;
2383
+		pkg_free(headers_buffer);
2384
+	}
2385
+	return num_headers;
2386
+}
2387
+
2291 2388
 int kz_amqp_send(kz_amqp_server_ptr srv, kz_amqp_cmd_ptr cmd)
2292 2389
 {
2293 2390
 	return kz_amqp_send_ex(srv, cmd, KZ_AMQP_CHANNEL_PUBLISHING , -1);
... ...
@@ -122,6 +122,7 @@ typedef struct {
122 122
 	char* queue;
123 123
 	char* payload;
124 124
 	char* return_payload;
125
+	char* headers;
125 126
 	str* message_id;
126 127
 	int   return_code;
127 128
 	int   consumer;
... ...
@@ -273,10 +274,10 @@ void kz_amqp_destroy();
273 274
 int kz_amqp_add_connection(modparam_t type, void* val);
274 275
 
275 276
 int kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char* payload);
276
-int kz_amqp_publish_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* _pub_flags);
277
-int ki_kz_amqp_publish(sip_msg_t* msg, str* exchange, str* routing_key, str* payload);
278
-int kz_amqp_query(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* dst);
279
-int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload);
277
+int kz_amqp_publish_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* headers);
278
+int ki_kz_amqp_publish(sip_msg_t* msg, str* exchange, str* routing_key, str* payload, str* headers);
279
+int kz_amqp_query(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* dst, char* headers);
280
+int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* headers);
280 281
 int kz_amqp_subscribe(struct sip_msg* msg, char* payload);
281 282
 int ki_kz_amqp_subscribe(sip_msg_t* msg, str* payload);
282 283
 int kz_amqp_subscribe_simple(struct sip_msg* msg, char* exchange, char* exchange_type, char* queue_name, char* routing_key);
... ...
@@ -327,6 +328,8 @@ kz_amqp_queue_ptr kz_amqp_queue_new(str *name);
327 328
 kz_amqp_exchange_ptr kz_amqp_exchange_new(str *name, str* type);
328 329
 kz_amqp_routings_ptr kz_amqp_routing_new(char* routing);
329 330
 
331
+int add_amqp_headers (char * headers, amqp_basic_properties_t * props );
332
+
330 333
 static inline int kz_amqp_error(char const *context, amqp_rpc_reply_t x)
331 334
 {
332 335
 	amqp_connection_close_t *mconn;