Browse code

kazoo: fix blocked sockets & zones

lazedo authored on 09/02/2018 21:38:03
Showing 2 changed files
... ...
@@ -74,6 +74,8 @@ struct timeval kz_amqp_tv = (struct timeval){0,100000};
74 74
 struct timeval kz_qtimeout_tv = (struct timeval){2,0};
75 75
 struct timeval kz_ack_tv = (struct timeval){0,100000};
76 76
 struct timeval kz_timer_tv = (struct timeval){0,200000};
77
+struct timeval kz_amqp_connect_timeout_tv = (struct timeval){0,200000};
78
+
77 79
 int kz_timer_ms = 200;
78 80
 
79 81
 str kz_json_escape_str = str_init("%");
... ...
@@ -199,6 +201,8 @@ static param_export_t params[] = {
199 201
     {"amqps_verify_peer", INT_PARAM, &kz_amqps_verify_peer},
200 202
     {"amqps_verify_hostname", INT_PARAM, &kz_amqps_verify_hostname},
201 203
 	{"pua_lock_type", INT_PARAM, &kz_pua_lock_type},
204
+    {"amqp_connect_timeout_micro", INT_PARAM, &kz_amqp_connect_timeout_tv.tv_usec},
205
+    {"amqp_connect_timeout_sec", INT_PARAM, &kz_amqp_connect_timeout_tv.tv_sec},
202 206
     {0, 0, 0}
203 207
 };
204 208
 
... ...
@@ -71,6 +71,7 @@ extern int kz_cmd_pipe;
71 71
 extern struct timeval kz_amqp_tv;
72 72
 extern struct timeval kz_qtimeout_tv;
73 73
 extern struct timeval kz_timer_tv;
74
+extern struct timeval kz_amqp_connect_timeout_tv;
74 75
 
75 76
 extern str kz_amqps_ca_cert;
76 77
 extern str kz_amqps_cert;
... ...
@@ -841,7 +842,7 @@ int kz_amqp_connection_open_ssl(kz_amqp_conn_ptr rmq) {
841 842
     amqp_ssl_socket_set_verify_hostname(rmq->socket, kz_amqps_verify_hostname);
842 843
 #endif
843 844
 
844
-    if (amqp_socket_open(rmq->socket, rmq->server->connection->info.host, rmq->server->connection->info.port)) {
845
+    if (amqp_socket_open_noblock(rmq->socket, rmq->server->connection->info.host, rmq->server->connection->info.port, &kz_amqp_connect_timeout_tv)) {
845 846
 	LM_ERR("Failed to open SSL socket to AMQP broker : %s : %i\n",
846 847
 	rmq->server->connection->info.host, rmq->server->connection->info.port);
847 848
 	goto nosocket;
... ...
@@ -895,7 +896,7 @@ int kz_amqp_connection_open(kz_amqp_conn_ptr rmq) {
895 896
     	goto nosocket;
896 897
     }
897 898
 
898
-    if (amqp_socket_open(rmq->socket, rmq->server->connection->info.host, rmq->server->connection->info.port)) {
899
+    if (amqp_socket_open_noblock(rmq->socket, rmq->server->connection->info.host, rmq->server->connection->info.port, &kz_amqp_connect_timeout_tv)) {
899 900
     	LM_DBG("Failed to open TCP socket to AMQP broker\n");
900 901
     	goto nosocket;
901 902
     }
... ...
@@ -2656,12 +2657,13 @@ void kz_amqp_reconnect_cb(int fd, short event, void *arg)
2656 2657
 	LM_DBG("attempting to reconnect now.\n");
2657 2658
 	kz_amqp_conn_ptr connection = (kz_amqp_conn_ptr)arg;
2658 2659
 
2659
-	if (connection->state == KZ_AMQP_CONNECTION_OPEN) {
2660
-		LM_WARN("trying to connect an already connected server.\n");
2661
-		return;
2662
-	}
2663
-
2664 2660
 	kz_amqp_timer_destroy(&connection->reconnect);
2661
+
2662
+//	if (connection->state == KZ_AMQP_CONNECTION_OPEN) {
2663
+//		LM_WARN("trying to connect an already connected server.\n");
2664
+//		return;
2665
+//	}
2666
+
2665 2667
 	kz_amqp_connect(connection);
2666 2668
 }
2667 2669
 
... ...
@@ -2684,9 +2686,10 @@ int kz_amqp_publisher_send(kz_amqp_cmd_ptr cmd)
2684 2686
 	int sent = 0;
2685 2687
 	kz_amqp_zone_ptr g;
2686 2688
 	kz_amqp_server_ptr s;
2689
+	kz_amqp_zone_ptr primary = kz_amqp_get_primary_zone();
2687 2690
 	for (g = kz_amqp_get_zones(); g != NULL && sent == 0; g = g->next) {
2688 2691
 		for (s = g->servers->head; s != NULL && sent == 0; s = s->next) {
2689
-			if(cmd->server_id == s->id || cmd->server_id == 0) {
2692
+			if(cmd->server_id == s->id || (cmd->server_id == 0 && g == primary)) {
2690 2693
 				if(s->producer->state == KZ_AMQP_CONNECTION_OPEN) {
2691 2694
 					if(cmd->type == KZ_AMQP_CMD_PUBLISH
2692 2695
 							|| cmd->type == KZ_AMQP_CMD_PUBLISH_BROADCAST
... ...
@@ -3130,20 +3133,19 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
3130 3133
 			amqp_rpc_reply_t reply = amqp_consume_message(consumer->conn, &envelope, NULL, 0);
3131 3134
 			switch(reply.reply_type) {
3132 3135
 			case AMQP_RESPONSE_LIBRARY_EXCEPTION:
3136
+				OK=0;
3133 3137
 				switch(reply.library_error) {
3134 3138
 				case AMQP_STATUS_HEARTBEAT_TIMEOUT:
3135 3139
 					LM_ERR("AMQP_STATUS_HEARTBEAT_TIMEOUT\n");
3136
-					OK=0;
3137 3140
 					break;
3138 3141
 				case AMQP_STATUS_TIMEOUT:
3139 3142
 					break;
3140 3143
 				case AMQP_STATUS_UNEXPECTED_STATE:
3141 3144
 					LM_DBG("AMQP_STATUS_UNEXPECTED_STATE\n");
3142
-					OK = kz_amqp_consume_error(consumer);
3145
+					kz_amqp_consume_error(consumer);
3143 3146
 					break;
3144 3147
 				default:
3145 3148
 					LM_ERR("AMQP_RESPONSE_LIBRARY_EXCEPTION %i\n", reply.library_error);
3146
-					OK = 0;
3147 3149
 					break;
3148 3150
 				};
3149 3151
 				break;
... ...
@@ -3326,6 +3328,7 @@ void kz_amqp_heartbeat_proc(int fd, short event, void *arg)
3326 3328
 	LM_DBG("sending heartbeat to zone : %s , connection id : %d\n", connection->server->zone->zone, connection->server->id);
3327 3329
 	if (connection->state != KZ_AMQP_CONNECTION_OPEN) {
3328 3330
 		kz_amqp_timer_destroy(&connection->heartbeat);
3331
+		kz_amqp_handle_server_failure(connection);
3329 3332
 		return;
3330 3333
 	}
3331 3334
 	heartbeat.channel = 0;