Browse code

Merge f8b56529978440238f9d0210f013ef249e3bc39c into 27b2ddf926240b1dd269eb1d96767d73edf9c6e6

Sungtae Kim authored on 12/08/2020 11:25:50 • GitHub committed on 12/08/2020 11:25:50
Showing 7 changed files
... ...
@@ -3,7 +3,7 @@
3 3
 include ../../Makefile.defs
4 4
 auto_gen=
5 5
 NAME=rtpengine.so
6
-LIBS=
6
+LIBS=-lz
7 7
 
8 8
 SERLIBPATH=../../lib
9 9
 SER_LIBS+=$(SERLIBPATH)/srdb1/srdb1
10 10
new file mode 100644
... ...
@@ -0,0 +1,73 @@
1
+
2
+#include <zlib.h>
3
+#include <string.h>
4
+#include <errno.h>
5
+
6
+#include "../../core/dprint.h"
7
+
8
+#include "compress.h"
9
+
10
+#define windowBits 15
11
+
12
+#define GZIP_ENCODING 16
13
+#define ENABLE_ZLIB_GZIP 32
14
+
15
+static void init_stream(z_stream *stream);
16
+
17
+static void init_stream(z_stream *stream)
18
+{
19
+	stream->zalloc = Z_NULL;
20
+	stream->zfree = Z_NULL;
21
+	stream->opaque = Z_NULL;
22
+}
23
+
24
+// compress_data compress the data of the given length.
25
+// returns compressed buffer length and replace the original buffer.
26
+int compress_data(char *buf, int len) {
27
+	uLong comp_len = compressBound(len);
28
+	char tmp[comp_len];
29
+	int ret;
30
+
31
+	ret = compress2((Bytef *)tmp, &comp_len, (Bytef *)buf, len, Z_BEST_COMPRESSION);
32
+	if (ret != Z_OK) {
33
+		LM_ERR("Could not compress the message. ret: %d\n", ret);
34
+		return -1;
35
+	}
36
+
37
+	memcpy(buf, tmp, comp_len);
38
+	return comp_len;
39
+}
40
+
41
+// uncompress_data uncompress the compressed data with given length.
42
+// it compare the first byte to check the compressed data or not.
43
+int uncompress_data(char *buf, int len, char *buf_tmp, int buf_size) {
44
+	z_stream stream;
45
+	int ret;
46
+
47
+	init_stream(&stream);
48
+	ret = inflateInit2(&stream, windowBits | ENABLE_ZLIB_GZIP);
49
+	if (ret != Z_OK) {
50
+		LM_ERR("Could not initiate zstream.\n");
51
+		return -1;
52
+	}
53
+
54
+	stream.next_in = (Bytef *)buf;
55
+	stream.avail_in = len;
56
+
57
+	memset(buf_tmp, 0x00, buf_size);
58
+	stream.next_out = (Bytef *)buf_tmp;
59
+	stream.avail_out = buf_size - 1;
60
+
61
+	ret = inflate(&stream, Z_NO_FLUSH);
62
+	if (ret != Z_OK && ret != Z_STREAM_END) {
63
+		inflateEnd(&stream);
64
+		LM_ERR("Could not uncompress the data correctly.\n");
65
+		return -1;
66
+	}
67
+
68
+	memcpy(buf, buf_tmp, buf_size);
69
+	ret = stream.total_out;
70
+	inflateEnd(&stream);
71
+
72
+	return ret;
73
+}
0 74
new file mode 100644
... ...
@@ -0,0 +1,27 @@
1
+#ifndef __CODEC_H__
2
+#define __CODEC_H__
3
+
4
+#define INLINE static inline
5
+
6
+// returns 1 if the given buffer is compressed buffer.
7
+// Level | ZLIB  | GZIP
8
+//   1   | 78 01 | 1F 8B
9
+//   2   | 78 5E | 1F 8B
10
+//   3   | 78 5E | 1F 8B
11
+//   4   | 78 5E | 1F 8B
12
+//   5   | 78 5E | 1F 8B
13
+//   6   | 78 9C | 1F 8B
14
+//   7   | 78 DA | 1F 8B
15
+//   8   | 78 DA | 1F 8B
16
+//   9   | 78 DA | 1F 8B
17
+INLINE int is_compressed(char* buf) {
18
+	if (buf && ((buf[0] == 0x78) || (buf[0] == 0x1F))) {
19
+		return 1;
20
+	}
21
+	return 0;
22
+}
23
+
24
+int uncompress_data(char *buf, int len, char *buf_tmp, int buf_size);
25
+int compress_data(char *buf, int len);
26
+
27
+#endif
... ...
@@ -36,6 +36,7 @@ struct cfg_group_rtpengine	default_rtpengine_cfg = {
36 36
 		1000,	/* default wait timeout in milliseconds */
37 37
 		MAX_RTPP_TRIED_NODES,
38 38
         5, /* rtprengine retries */
39
+		0, /* compress size */
39 40
 	    };
40 41
 
41 42
 void	*rtpengine_cfg = &default_rtpengine_cfg;
... ...
@@ -54,5 +55,7 @@ cfg_def_t	rtpengine_cfg_def[] = {
54 55
 		"Timeout value expressed in milliseconds to wait for reply from RTPEngine"},
55 56
 	{"rtpengine_retr",	CFG_VAR_INT | CFG_ATOMIC,	0, 0, 0, 0,
56 57
 		"How many times the module should retry to send and receive after timeout was generated"},
58
+	{"compress_size",	CFG_VAR_INT | CFG_ATOMIC,	0, 0, 0, 0,
59
+		"The max send message size for compress"},
57 60
 	{0, 0, 0, 0, 0, 0}
58 61
 };
... ...
@@ -32,6 +32,7 @@ struct cfg_group_rtpengine {
32 32
 	unsigned int	rtpengine_tout_ms;
33 33
 	unsigned int    queried_nodes_limit;
34 34
 	unsigned int	rtpengine_retr;
35
+	unsigned int	compress_size;
35 36
 };
36 37
 
37 38
 extern struct cfg_group_rtpengine	default_rtpengine_cfg;
... ...
@@ -730,6 +730,26 @@ modparam("rtpengine", "media_duration", "$avp(MEDIA_DURATION)")
730 730
 	</section>
731 731
 
732 732
 
733
+	<section id="rtpengine.p.compress_size">
734
+		<title><varname>compress_size</varname> (string)</title>
735
+		<para>
736
+			Maximum compression message size. If the message length is over this 
737
+			amount of size, the Kamailio sends the message after compress.
738
+		</para>
739
+		<para>
740
+			By default, this parameter is set to 0(no compress).
741
+		</para>
742
+		<example>
743
+		<title>Set <varname>compress_size</varname> parameter</title>
744
+<programlisting format="linespecific">
745
+...
746
+modparam("rtpengine", "compress_size", 1400)
747
+...
748
+</programlisting>
749
+		</example>
750
+	</section>
751
+
752
+
733 753
 	<section id="rtpengine.p.mos_min_pv">
734 754
 		<title><varname>mos_min_pv</varname> (string)</title>
735 755
 		<para>
... ...
@@ -84,6 +84,7 @@
84 84
 #include "rtpengine_funcs.h"
85 85
 #include "rtpengine_hash.h"
86 86
 #include "bencode.h"
87
+#include "compress.h"
87 88
 #include "config.h"
88 89
 
89 90
 MODULE_VERSION
... ...
@@ -454,6 +455,7 @@ static param_export_t params[] = {
454 455
 	{"setid_default",         INT_PARAM, &setid_default          },
455 456
 	{"media_duration",        PARAM_STR, &media_duration_pvar_str},
456 457
 	{"hash_algo",             INT_PARAM, &hash_algo},
458
+	{"compress_size",         INT_PARAM, &default_rtpengine_cfg.compress_size            },
457 459
 
458 460
 	/* MOS stats output */
459 461
 	/* global averages */
... ...
@@ -2799,6 +2801,10 @@ send_rtpp_command(struct rtpp_node *node, bencode_item_t *dict, int *outlen)
2799 2801
 	int rtpengine_retr, rtpengine_tout_ms = 1000;
2800 2802
 	char *cp;
2801 2803
 	static char buf[0x10000];
2804
+	static char buf_tmp[0x10000];
2805
+	int compress_size;
2806
+	char *tmp;
2807
+	int total_len, ret;
2802 2808
 	struct pollfd fds[1];
2803 2809
 	struct iovec *v;
2804 2810
 	str cmd = STR_NULL;
... ...
@@ -2831,8 +2837,24 @@ send_rtpp_command(struct rtpp_node *node, bencode_item_t *dict, int *outlen)
2831 2837
 			goto badproxy;
2832 2838
 		}
2833 2839
 
2840
+		tmp = buf;
2841
+		total_len = 0;
2842
+		for (i = 0; i < vcnt; i++) {
2843
+			memcpy(tmp, v[i].iov_base, v[i].iov_len);
2844
+			tmp += v[i].iov_len;
2845
+			total_len += v[i].iov_len;
2846
+		}
2847
+
2848
+		compress_size = cfg_get(rtpengine,rtpengine_cfg,compress_size);
2849
+		if ((compress_size > 0) && (total_len > compress_size)) {
2850
+			ret = compress_data(buf, total_len);
2851
+			if (ret > 0) {
2852
+				total_len = ret;
2853
+			}
2854
+		}
2855
+
2834 2856
 		do {
2835
-			len = writev(fd, v + 1, vcnt);
2857
+			len = write(fd, buf, total_len);
2836 2858
 		} while (len == -1 && errno == EINTR);
2837 2859
 		if (len <= 0) {
2838 2860
 			close(fd);
... ...
@@ -2841,6 +2863,15 @@ send_rtpp_command(struct rtpp_node *node, bencode_item_t *dict, int *outlen)
2841 2863
 		}
2842 2864
 		do {
2843 2865
 			len = read(fd, buf, sizeof(buf) - 1);
2866
+
2867
+			if (len > 0) {
2868
+				// uncompress the message if it's compressed
2869
+				ret = uncompress_data(buf, len, buf_tmp, sizeof(buf_tmp));
2870
+				if (ret > 0) {
2871
+					len = ret;
2872
+				}
2873
+				LM_DBG("Uncompressed message: %s\n", buf);
2874
+			}
2844 2875
 		} while (len == -1 && errno == EINTR);
2845 2876
 		close(fd);
2846 2877
 		if (len <= 0) {
... ...
@@ -2860,10 +2891,27 @@ send_rtpp_command(struct rtpp_node *node, bencode_item_t *dict, int *outlen)
2860 2891
 		}
2861 2892
 		v[0].iov_base = gencookie();
2862 2893
 		v[0].iov_len = strlen(v[0].iov_base);
2894
+
2895
+		tmp = buf;
2896
+		total_len = 0;
2897
+		for (i = 0; i < vcnt; i++) {
2898
+			memcpy(tmp, v[i].iov_base, v[i].iov_len);
2899
+			tmp += v[i].iov_len;
2900
+			total_len += v[i].iov_len;
2901
+		}
2902
+
2903
+		compress_size = cfg_get(rtpengine,rtpengine_cfg,compress_size);
2904
+		if ((compress_size > 0) && (total_len > compress_size)) {
2905
+			ret = compress_data(buf, total_len);
2906
+			if (ret > 0) {
2907
+				total_len = ret;
2908
+			}
2909
+		}
2910
+
2863 2911
 		rtpengine_retr = cfg_get(rtpengine,rtpengine_cfg,rtpengine_retr);
2864 2912
 		for (i = 0; i < rtpengine_retr; i++) {
2865 2913
 			do {
2866
-				len = writev(rtpp_socks[node->idx], v, vcnt + 1);
2914
+				len = write(rtpp_socks[node->idx], buf, total_len);
2867 2915
 			} while (len == -1 && (errno == EINTR || errno == ENOBUFS));
2868 2916
 			if (len <= 0) {
2869 2917
 				bencode_get_str(bencode_dictionary_get(dict, "command"), &cmd);
... ...
@@ -2876,6 +2924,13 @@ send_rtpp_command(struct rtpp_node *node, bencode_item_t *dict, int *outlen)
2876 2924
 				(fds[0].revents & POLLIN) != 0) {
2877 2925
 				do {
2878 2926
 					len = recv(rtpp_socks[node->idx], buf, sizeof(buf)-1, 0);
2927
+					if (len > 0) {
2928
+						ret = uncompress_data(buf, len, buf_tmp, sizeof(buf_tmp));
2929
+						if (ret > 0) {
2930
+							len = ret;
2931
+						}
2932
+						LM_DBG("Uncompressed message: %s\n", buf);
2933
+					}
2879 2934
 				} while (len == -1 && errno == EINTR);
2880 2935
 				if (len <= 0) {
2881 2936
 					bencode_get_str(bencode_dictionary_get(dict, "command"), &cmd);