Browse code

kazoo: add new functions & params

lazedo authored on 07/11/2019 22:27:56
Showing 4 changed files
... ...
@@ -88,6 +88,7 @@ int dbk_internal_loop_count = 5;
88 88
 int dbk_consumer_loop_count = 10;
89 89
 int dbk_consumer_ack_loop_count = 20;
90 90
 int dbk_include_entity = 1;
91
+int dbk_use_full_entity = 0;
91 92
 int dbk_pua_mode = 1;
92 93
 db_locking_t kz_pua_lock_type = DB_LOCKING_WRITE;
93 94
 int dbk_use_hearbeats = 0;
... ...
@@ -140,6 +141,7 @@ static pv_export_t kz_mod_pvs[] = {
140 140
  */
141 141
 static cmd_export_t cmds[] = {
142 142
     {"kazoo_publish", (cmd_function) kz_amqp_publish, 3, fixup_kz_amqp, fixup_kz_amqp_free, ANY_ROUTE},
143
+    {"kazoo_publish", (cmd_function) kz_amqp_publish_ex, 4, fixup_kz_amqp, fixup_kz_amqp_free, ANY_ROUTE},
143 144
     {"kazoo_query", (cmd_function) kz_amqp_query, 4, fixup_kz_amqp, fixup_kz_amqp_free, ANY_ROUTE},
144 145
     {"kazoo_query", (cmd_function) kz_amqp_query_ex, 3, fixup_kz_amqp, fixup_kz_amqp_free, ANY_ROUTE},
145 146
     {"kazoo_pua_publish", (cmd_function) kz_pua_publish, 1, 0, 0, ANY_ROUTE},
... ...
@@ -156,6 +158,9 @@ static cmd_export_t cmds[] = {
156 156
     {"kazoo_encode", (cmd_function) kz_amqp_encode, 2, fixup_kz_amqp_encode, fixup_kz_amqp_encode_free, ANY_ROUTE},
157 157
 
158 158
     {"kazoo_async_query", (cmd_function) kz_amqp_async_query, 5, fixup_kz_async_amqp, fixup_kz_async_amqp_free, ANY_ROUTE},
159
+    {"kazoo_async_query", (cmd_function) kz_amqp_async_query_ex, 6, fixup_kz_async_amqp, fixup_kz_async_amqp_free, ANY_ROUTE},
160
+    {"kazoo_query_async", (cmd_function) kz_amqp_async_query, 5, fixup_kz_async_amqp, fixup_kz_async_amqp_free, ANY_ROUTE},
161
+    {"kazoo_query_async", (cmd_function) kz_amqp_async_query_ex, 6, fixup_kz_async_amqp, fixup_kz_async_amqp_free, ANY_ROUTE},
159 162
 
160 163
 	{0, 0, 0, 0, 0, 0}
161 164
 };
... ...
@@ -181,6 +186,7 @@ static param_export_t params[] = {
181 181
     {"amqp_consumer_loop_count", INT_PARAM, &dbk_consumer_loop_count},
182 182
     {"amqp_consumer_ack_loop_count", INT_PARAM, &dbk_consumer_ack_loop_count},
183 183
     {"pua_include_entity", INT_PARAM, &dbk_include_entity},
184
+	{"presence_use_full_entity", INT_PARAM, &dbk_use_full_entity},
184 185
     {"presentity_table", PARAM_STR, &kz_presentity_table},
185 186
 	{"db_url", PARAM_STR, &kz_db_url},
186 187
     {"pua_mode", INT_PARAM, &dbk_pua_mode},
... ...
@@ -1213,7 +1213,7 @@ int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_
1213 1213
     return ret;
1214 1214
 }
1215 1215
 
1216
-int kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char* payload)
1216
+int kz_amqp_publish_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* _pub_flags)
1217 1217
 {
1218 1218
 	  str json_s;
1219 1219
 	  str exchange_s;
... ...
@@ -1253,6 +1253,10 @@ int kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char
1253 1253
 
1254 1254
 };
1255 1255
 
1256
+int kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char* payload)
1257
+{
1258
+	return kz_amqp_publish_ex(msg, exchange, routing_key, payload, NULL);
1259
+}
1256 1260
 
1257 1261
 char* last_payload_result = NULL;
1258 1262
 
... ...
@@ -1261,7 +1265,7 @@ int kz_pv_get_last_query_result(struct sip_msg *msg, pv_param_t *param,	pv_value
1261 1261
 	return last_payload_result == NULL ? pv_get_null(msg, param, res) : pv_get_strzval(msg, param, res, last_payload_result);
1262 1262
 }
1263 1263
 
1264
-int kz_amqp_async_query(struct sip_msg* msg, char* _exchange, char* _routing_key, char* _payload, char* _cb_route, char* _err_route)
1264
+int kz_amqp_async_query_ex(struct sip_msg* msg, char* _exchange, char* _routing_key, char* _payload, char* _cb_route, char* _err_route, char* _pub_flags)
1265 1265
 {
1266 1266
 	  str json_s;
1267 1267
 	  str exchange_s;
... ...
@@ -1406,6 +1410,11 @@ exit:
1406 1406
 	    return ret;
1407 1407
 };
1408 1408
 
1409
+int kz_amqp_async_query(struct sip_msg* msg, char* _exchange, char* _routing_key, char* _payload, char* _cb_route, char* _err_route)
1410
+{
1411
+	return kz_amqp_async_query_ex(msg, _exchange, _routing_key, _payload, _cb_route, _err_route, NULL);
1412
+}
1413
+
1409 1414
 void kz_amqp_reset_last_result()
1410 1415
 {
1411 1416
 	if(last_payload_result)
... ...
@@ -3230,8 +3239,8 @@ int kz_amqp_consumer_worker_proc(int cmd_pipe)
3230 3230
 	set_non_blocking(cmd_pipe);
3231 3231
 	event_set(&pipe_ev, cmd_pipe, EV_READ | EV_PERSIST, kz_amqp_consumer_worker_cb, &pipe_ev);
3232 3232
 	event_add(&pipe_ev, NULL);
3233
-	event_dispatch();
3234
-	return 0;
3233
+
3234
+	return event_dispatch();
3235 3235
 }
3236 3236
 
3237 3237
 void kz_amqp_timer_destroy(kz_amqp_timer_ptr* pTimer)
... ...
@@ -273,6 +273,7 @@ void kz_amqp_destroy();
273 273
 int kz_amqp_add_connection(modparam_t type, void* val);
274 274
 
275 275
 int kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char* payload);
276
+int kz_amqp_publish_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* _pub_flags);
276 277
 int kz_amqp_query(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* dst);
277 278
 int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload);
278 279
 int kz_amqp_subscribe(struct sip_msg* msg, char* payload);
... ...
@@ -281,6 +282,7 @@ int kz_amqp_encode(struct sip_msg* msg, char* unencoded, char* encoded);
281 281
 int kz_amqp_encode_ex(str* unencoded, pv_value_p dst_val);
282 282
 
283 283
 int kz_amqp_async_query(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* _cb_route, char* _err_route);
284
+int kz_amqp_async_query_ex(struct sip_msg* msg, char* _exchange, char* _routing_key, char* _payload, char* _cb_route, char* _err_route, char* _pub_flags);
284 285
 
285 286
 //void kz_amqp_generic_consumer_loop(int child_no);
286 287
 void kz_amqp_manager_loop(int child_no);
... ...
@@ -108,38 +108,12 @@ int fixup_kz_amqp_encode_free(void** param, int param_no)
108 108
 
109 109
 int fixup_kz_amqp(void** param, int param_no)
110 110
 {
111
-  if (param_no == 1 || param_no == 2 || param_no == 3) {
112
-		return fixup_spve_null(param, 1);
113
-	}
114
-
115
-	if (param_no == 4) {
116
-		if (fixup_pvar_null(param, 1) != 0) {
117
-		    LM_ERR("failed to fixup result pvar\n");
118
-		    return -1;
119
-		}
120
-		if (((pv_spec_t *)(*param))->setf == NULL) {
121
-		    LM_ERR("result pvar is not writeble\n");
122
-		    return -1;
123
-		}
124
-		return 0;
125
-	}
126
-
127
-	LM_ERR("invalid parameter number <%d>\n", param_no);
128
-	return -1;
111
+	return fixup_spve_null(param, 1);
129 112
 }
130 113
 
131 114
 int fixup_kz_amqp_free(void** param, int param_no)
132 115
 {
133
-	if (param_no == 1 || param_no == 2 || param_no == 3) {
134
-		return fixup_free_spve_null(param, 1);
135
-	}
136
-
137
-	if (param_no == 4) {
138
-		return fixup_free_pvar_null(param, 1);
139
-	}
140
-
141
-	LM_ERR("invalid parameter number <%d>\n", param_no);
142
-	return -1;
116
+	return fixup_free_spve_null(param, 1);
143 117
 }
144 118
 
145 119