Browse code

dmq: Let the handler know about the sending node

Try to find a node based on the from uri of the incoming request and hand
it to the request handler.

Alex Hermann authored on 28/08/2014 12:21:15
Showing 9 changed files
... ...
@@ -87,7 +87,7 @@ int dlg_dmq_broadcast(str* body) {
87 87
 /**
88 88
 * @brief ht dmq callback
89 89
 */
90
-int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp)
90
+int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node)
91 91
 {
92 92
 	int content_length;
93 93
 	str body;
... ...
@@ -42,7 +42,7 @@ typedef enum {
42 42
 } dlg_dmq_action_t;
43 43
 
44 44
 int dlg_dmq_initialize();
45
-int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp);
45
+int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node);
46 46
 int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock);
47 47
 int dlg_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);
48 48
 #endif
... ...
@@ -187,7 +187,7 @@ int run_init_callbacks() {
187 187
 /**
188 188
  * @brief dmq notification callback
189 189
  */
190
-int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp)
190
+int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node)
191 191
 {
192 192
 	int nodes_recv;
193 193
 	str* response_body = NULL;
... ...
@@ -37,7 +37,7 @@ extern str notification_content_type;
37 37
 extern int *dmq_init_callback_done;
38 38
 
39 39
 int add_notification_peer();
40
-int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp);
40
+int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node);
41 41
 int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg);
42 42
 str* build_notification_body();
43 43
 int build_node_str(dmq_node_t* node, char* buf, int buflen);
... ...
@@ -97,7 +97,7 @@ dmq_peer_t* find_peer(str peer_id)
97 97
 /**
98 98
  * @empty callback
99 99
  */
100
-int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp)
100
+int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node)
101 101
 {
102 102
 	return 0;
103 103
 }
... ...
@@ -28,6 +28,7 @@
28 28
 
29 29
 #include <string.h>
30 30
 #include <stdlib.h>
31
+#include "dmqnode.h"
31 32
 #include "../../lock_ops.h"
32 33
 #include "../../str.h"
33 34
 #include "../../mem/mem.h"
... ...
@@ -41,7 +42,7 @@ typedef struct peer_response {
41 42
 	str body;
42 43
 } peer_reponse_t;
43 44
 
44
-typedef int(*peer_callback_t)(struct sip_msg*, peer_reponse_t* resp);
45
+typedef int(*peer_callback_t)(struct sip_msg*, peer_reponse_t* resp, dmq_node_t* node);
45 46
 typedef int(*init_callback_t)();
46 47
 
47 48
 typedef struct dmq_peer {
... ...
@@ -66,7 +67,7 @@ typedef dmq_peer_t* (*register_dmq_peer_t)(dmq_peer_t*);
66 67
 
67 68
 dmq_peer_t* add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer);
68 69
 dmq_peer_t* find_peer(str peer_id);
69
-int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp);
70
+int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node);
70 71
 
71 72
 #endif
72 73
 
... ...
@@ -29,6 +29,8 @@
29 29
 #include "../../data_lump_rpl.h"
30 30
 #include "../../mod_fix.h"
31 31
 #include "../../sip_msg_clone.h"
32
+#include "../../parser/parse_from.h"
33
+#include "../../parser/parse_to.h"
32 34
 
33 35
 /**
34 36
  * @brief set the body of a response
... ...
@@ -78,6 +80,7 @@ void worker_loop(int id)
78 80
 	dmq_job_t* current_job;
79 81
 	peer_reponse_t peer_response;
80 82
 	int ret_value;
83
+	dmq_node_t *dmq_node = NULL;
81 84
 
82 85
 	worker = &workers[id];
83 86
 	for(;;) {
... ...
@@ -92,7 +95,14 @@ void worker_loop(int id)
92 95
 			current_job = job_queue_pop(worker->queue);
93 96
 			/* job_queue_pop might return NULL if queue is empty */
94 97
 			if(current_job) {
95
-				ret_value = current_job->f(current_job->msg, &peer_response);
98
+				/* extract the from uri */
99
+				if (parse_from_header(current_job->msg) < 0) {
100
+					LM_ERR("bad sip message or missing From hdr\n");
101
+				} else {
102
+					dmq_node = find_dmq_node_uri(node_list, &((struct to_body*)current_job->msg->from->parsed)->uri);
103
+				}
104
+
105
+				ret_value = current_job->f(current_job->msg, &peer_response, dmq_node);
96 106
 				if(ret_value < 0) {
97 107
 					LM_ERR("running job failed\n");
98 108
 					continue;
... ...
@@ -89,7 +89,7 @@ int ht_dmq_broadcast(str* body) {
89 89
 /**
90 90
  * @brief ht dmq callback
91 91
  */
92
-int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp)
92
+int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node)
93 93
 {
94 94
 	int content_length;
95 95
 	str body;
... ...
@@ -40,7 +40,7 @@ typedef enum {
40 40
 } ht_dmq_action_t;
41 41
 
42 42
 int ht_dmq_initialize();
43
-int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp);
43
+int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node);
44 44
 int ht_dmq_replicate_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode);
45 45
 int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode);
46 46
 int ht_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);