Browse code

Merge pull request #2112 from kamailio/vhernando/kafka_module_branch2

kafka: module to produce and send messages to a Kafka server

vhernando authored on 04/11/2019 17:01:36
Showing 7 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,23 @@
0
+#
1
+# WARNING: do not run this directly, it should be run by the master Makefile
2
+
3
+include ../../Makefile.defs
4
+auto_gen=
5
+NAME=kafka.so
6
+
7
+ifeq ($(CROSS_COMPILE),)
8
+RDKAFKA_SUPPORTED=$(shell  \
9
+	if pkg-config --exists rdkafka; then \
10
+		echo 'librdkafka found'; \
11
+	fi)
12
+endif
13
+
14
+ifneq ($(RDKAFKA_SUPPORTED),)
15
+	DEFS+= $(shell pkg-config --cflags rdkafka)
16
+	LIBS+= $(shell pkg-config --libs rdkafka)
17
+else
18
+	DEFS+=-I$(LOCALBASE)/include
19
+	LIBS+=-L$(SYSBASE)/include/lib -L$(LOCALBASE)/lib -lrdkafka
20
+endif
21
+
22
+include ../../Makefile.modules
0 23
new file mode 100644
... ...
@@ -0,0 +1,4 @@
0
+docs = kafka.xml
1
+
2
+docbook_dir = ../../../../doc/docbook
3
+include $(docbook_dir)/Makefile.module
0 4
new file mode 100644
... ...
@@ -0,0 +1,41 @@
0
+<?xml version="1.0" encoding='ISO-8859-1'?>
1
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
2
+"http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
3
+
4
+<!-- Include general documentation entities -->
5
+<!ENTITY % docentities SYSTEM "../../../../doc/docbook/entities.xml">
6
+%docentities;
7
+
8
+]>
9
+
10
+<book xmlns:xi="http://www.w3.org/2001/XInclude">
11
+	<bookinfo>
12
+		<title>Kafka Module</title>
13
+		<productname class="trade">&kamailioname;</productname>
14
+		<authorgroup>
15
+			<author>
16
+				<firstname>Vicente</firstname>
17
+				<surname>Hernando</surname>
18
+				<email>vhernando@sonoc.io</email>
19
+			</author>
20
+			<editor>
21
+				<firstname>Vicente</firstname>
22
+				<surname>Hernando</surname>
23
+				<email>vhernando@sonoc.io</email>
24
+			</editor>
25
+			<author>
26
+			    <firstname>Javier</firstname>
27
+				<surname>Gallart</surname>
28
+				<email>jgallart@sonoc.io</email>
29
+			</author>
30
+		</authorgroup>
31
+		<copyright>
32
+			<year>2019</year>
33
+			<holder>www.sonoc.io</holder>
34
+		</copyright>
35
+	</bookinfo>
36
+	<toc></toc>
37
+
38
+	<xi:include href="kafka_admin.xml"/>
39
+
40
+</book>
0 41
new file mode 100644
... ...
@@ -0,0 +1,231 @@
0
+<?xml version="1.0" encoding='ISO-8859-1'?>
1
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
2
+"http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
3
+
4
+<!-- Include general documentation entities -->
5
+<!ENTITY % docentities SYSTEM "../../../../doc/docbook/entities.xml">
6
+%docentities;
7
+
8
+]>
9
+<!-- Module User's Guide -->
10
+
11
+<chapter>
12
+
13
+  <title>&adminguide;</title>
14
+  
15
+  <section>
16
+	<title>Overview</title>
17
+	<para>
18
+	  This module produces and sends messages to a Kafka server.
19
+	</para>
20
+  </section>
21
+  <section>
22
+	<title>Dependencies</title>
23
+	<section>
24
+	  <title>&kamailio; Modules</title>
25
+	  <para>
26
+		The following modules must be loaded before this module:
27
+		<itemizedlist>
28
+		  <listitem>
29
+			<para>
30
+			  <emphasis>none</emphasis>.
31
+			</para>
32
+		  </listitem>
33
+		</itemizedlist>
34
+	  </para>
35
+	</section>
36
+	<section>
37
+	  <title>External Libraries or Applications</title>
38
+	  <para>
39
+		The following libraries or applications must be installed before running
40
+		&kamailio; with this module loaded:
41
+		<itemizedlist>
42
+		  <listitem>
43
+			<para>
44
+			  <emphasis>librdkafka</emphasis>: the Apache Kafka C/C++ client library.
45
+			  <ulink
46
+				  url='https://github.com/edenhill/librdkafka'>
47
+			  https://github.com/edenhill/librdkafka</ulink>
48
+			</para>
49
+			<para>
50
+			  Older librdkafka versions such as 1.1.0 have been reported to work.
51
+			  Newer versions should work fine (e.g. version 1.2.2-RC1 also works OK).
52
+			  <ulink
53
+				  url='https://github.com/edenhill/librdkafka/releases'>
54
+			  https://github.com/edenhill/librdkafka/releases</ulink>
55
+			</para>
56
+		  </listitem>
57
+		</itemizedlist>
58
+	  </para>
59
+	</section>
60
+	<section>
61
+	  <title>Parameters</title>
62
+	  <section id="kafka.p.brokers">
63
+		<title><varname>brokers</varname> (string)</title>
64
+		<para>
65
+		  Specifies a list of brokers separated by commas.
66
+		</para>
67
+		<para>
68
+		  From librdkafka documentation:
69
+		</para>
70
+		<para>
71
+		  brokerlist is a ,-separated list of brokers in the format:
72
+		  &lt;broker1&gt;,&lt;broker2&gt;,
73
+		</para>
74
+		<para>
75
+		  Where each broker is in either the host or URL based format:
76
+		  <itemizedlist>
77
+			<listitem>&lt;host&gt;[:&lt;port&gt;]</listitem>
78
+			<listitem>&lt;proto&gt;://&lt;host&gt;[:port]</listitem>
79
+		  </itemizedlist>
80
+		</para>
81
+		<para>
82
+		  &lt;proto&gt; is either PLAINTEXT, SSL, SASL, SASL_PLAINTEXT
83
+		</para>
84
+		<para>
85
+		  The two formats can be mixed but ultimately the value of the
86
+		  <emphasis>security.protocol</emphasis> config property decides what brokers are allowed.
87
+		</para>
88
+		<para>
89
+		  <emphasis>
90
+			This parameter is mandatory. There is no default value.
91
+		  </emphasis>
92
+		</para>
93
+		<example>
94
+		  <title>Set <varname>brokers</varname> parameter</title>
95
+		  <programlisting format="linespecific">
96
+...
97
+modparam("kafka", "brokers", "localhost:9092")
98
+modparam("kafka", "brokers", "broker1:10000,broker2")
99
+modparam("kafka", "brokers", "SSL://broker3:9000,ssl://broker2")
100
+...
101
+		  </programlisting>
102
+		</example>
103
+	  </section>
104
+	  <section id="kafka.p.configuration">
105
+		<title><varname>configuration</varname> (string)</title>
106
+		<para>
107
+		  Specifies a set of general properties.
108
+		</para>
109
+		<para>
110
+		  Each configuration property follows: <emphasis>name = value</emphasis> pattern.
111
+		  And configuration properties are separated by <emphasis>;</emphasis>
112
+		</para>
113
+		<para>
114
+		  This parameter is optional, but if it exists it can be configured only once.
115
+		</para>
116
+		<example>
117
+		  <title>Set <varname>configuration</varname> parameter</title>
118
+		  <programlisting format="linespecific">
119
+...
120
+modparam("kafka", "configuration", "topic.metadata.refresh.interval.ms=20000;queue.buffering.max.messages=1000000;metadata.request.timeout.ms=90000")
121
+
122
+modparam("kafka", "configuration", "topic.metadata.refresh.interval.ms=20000;queue.buffering.max.messages=500000;debug=all;metadata.request.timeout.ms=900000")
123
+...
124
+		  </programlisting>
125
+		</example>
126
+	  </section>
127
+	  <section id="kafka.p.topic">
128
+		<title><varname>topic</varname> (string)</title>
129
+		<para>
130
+		  Specifies a topic name and a set of topic properties.
131
+		</para>
132
+		<para>
133
+		  The topic defined in topic parameter has to already exist in Kafka servers.
134
+		</para>
135
+		<para>
136
+		  Each topic property is a list of <emphasis>attribute = value</emphasis> separated by semicolon.
137
+		</para>
138
+		<para>
139
+		  The name attribute indicates the topic name. It is mandatory.
140
+		  Other attributes mean names of properties and are optional.
141
+		</para>
142
+		<para>
143
+		  This parameter is optional. Each topic needs a topic parameter so several topic parameters are allowed.
144
+		</para>
145
+		<example>
146
+		  <title>Set <varname>topic</varname> parameter</title>
147
+		  <programlisting format="linespecific">
148
+...
149
+modparam("kafka", "topic", "name=my_topic;request.required.acks=0;request.timeout.ms=10000")
150
+modparam("kafka", "topic", "name=second_topic;request.required.acks=0;request.timeout.ms=10000")
151
+modparam("kafka", "topic", "name=third_topic")
152
+...
153
+		  </programlisting>
154
+		</example>
155
+	  </section>
156
+	</section>
157
+	<section>
158
+	  <title>Functions</title>
159
+	  <section id="kafka.f.kafka_send">
160
+		<title>
161
+		  <function moreinfo="none">kafka_send(topic, msg)</function>
162
+		</title>
163
+		<para>
164
+		  Send a message to a specific topic via Kafka server.
165
+		</para>
166
+		<para>
167
+		  This function returns -1 on any kind of error (so execution of the script continues).
168
+		</para>
169
+		<para>
170
+		  Parameters:
171
+		  <itemizedlist>
172
+			<listitem><emphasis>topic</emphasis>: (string) name of the topic.
173
+			It is mandatory.</listitem>
174
+			<listitem><emphasis>msg</emphasis>: (string) message to send.
175
+			It is mandatory.</listitem>
176
+		  </itemizedlist>
177
+		</para>
178
+		<para>
179
+		  Available via KEMI framework as <emphasis>kafka.send</emphasis>.
180
+		</para>
181
+		<example>
182
+		  <title><function>kafka_send</function> usage</title>
183
+		  <programlisting format="linespecific">
184
+...
185
+# Send "test message" to topic "my_topic"			
186
+kafka_send("my_topic", "test message");
187
+...
188
+		  </programlisting>
189
+		</example>
190
+	  </section>
191
+	</section>
192
+	<section>
193
+	  <title><acronym>RPC</acronym> Commands</title>
194
+	  <section  id="kafka.stats">
195
+		<title><function moreinfo="none">kafka.stats</function></title>
196
+		<para>
197
+		  Show statistics about total sent messages and failed to deliver ones.
198
+		</para>
199
+		<example>
200
+		  <title><function>kafka.stats</function> usage</title>
201
+		  <programlisting format="linespecific">
202
+...
203
+&kamcmd; kafka.stats
204
+Total messages: 26  Errors: 0
205
+...
206
+		  </programlisting>
207
+		</example>
208
+	  </section>
209
+	  <section>
210
+		<title><function moreinfo="none">kafka.stats_topic</function></title>
211
+		<para>
212
+		  Show statistics about sent messages and failed to deliver ones for a specific topic.
213
+		</para>
214
+		<para>
215
+		  Parameter: <emphasis>topic</emphasis> (string) name of the topic. Required.
216
+		</para>
217
+		<example>
218
+		  <title><function>kafka.stats_topic</function> usage</title>
219
+		  <programlisting format="linespecific">
220
+...
221
+# Show statistics for my_topic.
222
+&kamcmd; kafka.stats_topic "my_topic"
223
+Topic: my_topic  Total messages: 17  Errors: 0
224
+...
225
+		  </programlisting>
226
+		</example>
227
+	  </section>
228
+	</section>
229
+  </section>
230
+</chapter>
0 231
new file mode 100644
... ...
@@ -0,0 +1,335 @@
0
+/*
1
+ * Copyright (C) 2019 Vicente Hernando (Sonoc https://www.sonoc.io)
2
+ *
3
+ * This file is part of Kamailio, a free SIP server.
4
+ *
5
+ * Kamailio is free software; you can redistribute it and/or modify
6
+ * it under the terms of the GNU General Public License as published by
7
+ * the Free Software Foundation; either version 2 of the License, or
8
+ * (at your option) any later version
9
+ *
10
+ * Kamailio is distributed in the hope that it will be useful,
11
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
+ * GNU General Public License for more details.
14
+ *
15
+ * You should have received a copy of the GNU General Public License
16
+ * along with this program; if not, write to the Free Software
17
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
18
+ *
19
+ */
20
+
21
+/**
22
+ * \file
23
+ * \brief Kafka :: Module Core
24
+ * \ingroup kfk
25
+ *
26
+ * - Module: \ref kfk
27
+ */
28
+
29
+/**
30
+ * \defgroup kfk Kafka :: Kafka module for Kamailio
31
+ *
32
+ * This module contains functions related to Apache Kafka initialization and closing,
33
+ * as well as the module interface.
34
+ * It uses librdkafka library.
35
+ * Currently it only provides producer capabilities.
36
+ */
37
+
38
+/* Headers */
39
+#include <inttypes.h>
40
+
41
+#include "../../core/sr_module.h"
42
+#include "../../core/dprint.h"
43
+#include "../../core/mod_fix.h"
44
+#include "../../core/kemi.h"
45
+#include "../../core/rpc.h"
46
+#include "../../core/rpc_lookup.h"
47
+
48
+#include "kfk.h"
49
+
50
+MODULE_VERSION
51
+
52
+/* Declaration of static variables and functions. */
53
+
54
+static rpc_export_t rpc_cmds[];
55
+static int mod_init(void);
56
+static void mod_destroy(void);
57
+static int child_init(int rank);
58
+static int fixup_kafka_send(void** param, int param_no);
59
+static int w_kafka_send(struct sip_msg* msg, char* ptopic, char *pmessage);
60
+
61
+/*
62
+ * Variables and functions to deal with module parameters.
63
+ */
64
+char *brokers_param = NULL; /**< List of brokers. */
65
+static int kafka_conf_param(modparam_t type, void *val);
66
+static int kafka_topic_param(modparam_t type, void *val);
67
+
68
+/**
69
+ * \brief Module commands
70
+ */
71
+static cmd_export_t cmds[] = {
72
+	{"kafka_send", (cmd_function)w_kafka_send, 2, fixup_kafka_send,
73
+	 0, ANY_ROUTE},
74
+    { 0, 0, 0, 0, 0, 0}
75
+};
76
+
77
+/**
78
+ * \brief Structure for module parameters.
79
+ */
80
+static param_export_t params[]={
81
+	{"brokers", PARAM_STRING, &brokers_param},
82
+	{"configuration", PARAM_STRING|USE_FUNC_PARAM, (void*)kafka_conf_param},
83
+	{"topic", PARAM_STRING|USE_FUNC_PARAM, (void*)kafka_topic_param},
84
+    {0, 0, 0}
85
+};
86
+
87
+/**
88
+ * \brief Kafka :: Module interface
89
+ */
90
+struct module_exports exports = {
91
+	"kafka",
92
+	DEFAULT_DLFLAGS, /* dlopen flags */
93
+	cmds,
94
+	params,
95
+	0,              /* exported RPC methods */
96
+	0,         	/* exported pseudo-variables */
97
+	0,              /* response function */
98
+	mod_init,       /* module initialization function */
99
+	child_init,		/* per child init function */
100
+	mod_destroy     /* destroy function */
101
+};
102
+
103
+static int mod_init(void)
104
+{
105
+	/* Register RPC commands. */
106
+	if (rpc_register_array(rpc_cmds) != 0) {
107
+		LM_ERR("Failed to register RPC commands\n");
108
+		return -1;
109
+	}
110
+
111
+	/* Initialize statistics. */
112
+	if (kfk_stats_init()) {
113
+		LM_ERR("Failed to initialize statistics\n");
114
+		return -1;
115
+	}
116
+	
117
+	return 0;
118
+}
119
+
120
+static int child_init(int rank)
121
+{
122
+	/* skip child init for non-worker process ranks */
123
+	/* if (rank==PROC_INIT || rank==PROC_MAIN || rank==PROC_TCP_MAIN) */
124
+	/* We execute kfk_init in PROC_MAIN so it cleans messages, etc right 
125
+	   when destroying the module. */
126
+	if (rank==PROC_INIT || rank==PROC_TCP_MAIN)
127
+		return 0;
128
+
129
+	if (kfk_init(brokers_param)) {
130
+		LM_ERR("Failed to initialize Kafka\n");
131
+		return -1;
132
+	}
133
+	return 0;
134
+}
135
+
136
+static void mod_destroy(void)
137
+{
138
+	LM_DBG("cleaning up\n");
139
+
140
+	kfk_close();
141
+
142
+	kfk_stats_close();
143
+}
144
+
145
+/**
146
+ * \brief Parse configuration parameter.
147
+ */
148
+static int kafka_conf_param(modparam_t type, void *val)
149
+{
150
+	return kfk_conf_parse((char*)val);
151
+}
152
+
153
+/**
154
+ * \brief Parse topic parameter.
155
+ */
156
+static int kafka_topic_param(modparam_t type, void *val)
157
+{
158
+	return kfk_topic_parse((char*)val);
159
+}
160
+
161
+static int fixup_kafka_send(void** param, int param_no)
162
+{
163
+	return fixup_spve_null(param, 1);
164
+}
165
+
166
+/**
167
+ * \brief Send a message via Kafka
168
+ */
169
+static int w_kafka_send(struct sip_msg* msg, char* ptopic, char *pmessage)
170
+{
171
+	str s_topic;
172
+
173
+	if (ptopic == NULL) {
174
+		LM_ERR("Invalid topic parameter\n");
175
+		return -1;
176
+	}
177
+
178
+	if (get_str_fparam(&s_topic, msg, (gparam_t*)ptopic)!=0) {
179
+		LM_ERR("No topic\n");
180
+		return -1;
181
+	}
182
+	if (s_topic.s == NULL || s_topic.len == 0) {
183
+		LM_ERR("Invalid topic string\n");
184
+		return -1;
185
+	}
186
+
187
+	str s_message;
188
+
189
+	if (pmessage == NULL) {
190
+		LM_ERR("Invalid message parameter\n");
191
+		return -1;
192
+	}
193
+
194
+	if (get_str_fparam(&s_message, msg, (gparam_t*)pmessage)!=0) {
195
+		LM_ERR("No message\n");
196
+		return -1;
197
+	}
198
+	if (s_message.s == NULL || s_message.len == 0) {
199
+		LM_ERR("Invalid message string\n");
200
+		return -1;
201
+	}
202
+
203
+	if (kfk_message_send(&s_topic, &s_message)) {
204
+		LM_ERR("Cannot send kafka (topic: %.*s) message: %.*s\n",
205
+			   s_topic.len, s_topic.s,
206
+			   s_message.len, s_message.s);
207
+		return -1;
208
+	}
209
+
210
+	LM_DBG("Message sent (Topic: %.*s) : %.*s\n",
211
+		   s_topic.len, s_topic.s,
212
+		   s_message.len, s_message.s);
213
+	return 1;
214
+}
215
+
216
+/**
217
+ * \brief KEMI function to send a Kafka message.
218
+ */
219
+static int ki_kafka_send(struct sip_msg* msg, str *s_topic, str *s_message)
220
+{
221
+	if (s_topic == NULL || s_topic->s == NULL || s_topic->len == 0) {
222
+		LM_ERR("Invalid topic string\n");
223
+		return -1;
224
+	}
225
+
226
+	if (s_message == NULL || s_message->s == NULL || s_message->len == 0) {
227
+		LM_ERR("Invalid message string\n");
228
+		return -1;
229
+	}
230
+
231
+	if (kfk_message_send(s_topic, s_message)) {
232
+		LM_ERR("Cannot send kafka (topic: %.*s) message: %.*s\n",
233
+			   s_topic->len, s_topic->s,
234
+			   s_message->len, s_message->s);
235
+		return -1;
236
+	}
237
+
238
+	LM_DBG("Message sent (Topic: %.*s) : %.*s\n",
239
+		   s_topic->len, s_topic->s,
240
+		   s_message->len, s_message->s);
241
+	return 1;
242
+}
243
+
244
+/**
245
+ * \brief Kafka :: Array with KEMI functions
246
+ */
247
+static sr_kemi_t sr_kemi_kafka_exports[] = {
248
+	{ str_init("kafka"), str_init("send"),
249
+	  SR_KEMIP_INT, ki_kafka_send,
250
+	  { SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_NONE,
251
+		SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
252
+	},
253
+
254
+	{ {0, 0}, {0, 0}, 0, NULL, { 0, 0, 0, 0, 0, 0 } }
255
+};
256
+
257
+/**
258
+ * \brief Kafka :: register Kafka module
259
+ */
260
+int mod_register(char *path, int *dlflags, void *p1, void *p2)
261
+{
262
+	sr_kemi_modules_add(sr_kemi_kafka_exports);
263
+	return 0;
264
+}
265
+
266
+static void rpc_kafka_stats(rpc_t *rpc, void *ctx)
267
+{
268
+	uint64_t msg_total = 0;
269
+	uint64_t msg_error = 0;
270
+
271
+	if (kfk_stats_get(&msg_total, &msg_error)) {
272
+		LM_ERR("Failed to get total statistics\n");
273
+		rpc->fault(ctx, 500, "Failed to get total statistics");
274
+		return;
275
+	}
276
+
277
+	LM_DBG("Total messages: %" PRIu64 "  Errors: %" PRIu64 "\n",
278
+		   msg_total, msg_error);
279
+	if (rpc->rpl_printf(ctx, "Total messages: %" PRIu64 "  Errors: %" PRIu64,
280
+						msg_total, msg_error) < 0) {
281
+		rpc->fault(ctx, 500, "Internal error showing total statistics");
282
+		return;
283
+	}
284
+}
285
+
286
+static void rpc_kafka_stats_topic(rpc_t *rpc, void *ctx)
287
+{
288
+	str s_topic;
289
+
290
+	if (rpc->scan(ctx, "S", &s_topic) < 1) {
291
+		rpc->fault(ctx, 400, "required topic string");
292
+		return;
293
+	}
294
+
295
+	if (s_topic.len == 0 || s_topic.s == NULL) {
296
+		LM_ERR("Bad topic name\n");
297
+		rpc->fault(ctx, 400, "Bad topic name");
298
+		return;
299
+	}
300
+	
301
+	uint64_t msg_total = 0;
302
+	uint64_t msg_error = 0;
303
+
304
+	if (kfk_stats_topic_get(&s_topic, &msg_total, &msg_error)) {
305
+		LM_ERR("Failed to get statistics for topic: %.*s\n", s_topic.len, s_topic.s);
306
+		rpc->fault(ctx, 500, "Failed to get per topic statistics");
307
+		return;
308
+	}
309
+
310
+	LM_DBG("Topic: %.*s   messages: %" PRIu64 "  Errors: %" PRIu64 "\n",
311
+		   s_topic.len, s_topic.s, msg_total, msg_error);
312
+	if (rpc->rpl_printf(ctx, "Topic: %.*s  Total messages: %" PRIu64 "  Errors: %" PRIu64,
313
+						s_topic.len, s_topic.s, msg_total, msg_error) < 0) {
314
+		rpc->fault(ctx, 500, "Internal error showing statistics for topic: %.*s",
315
+				   s_topic.len, s_topic.s);
316
+		return;
317
+	}
318
+}
319
+
320
+static const char* rpc_kafka_stats_doc[2] = {
321
+	"Print general topic independent statistics",
322
+	0
323
+};
324
+
325
+static const char* rpc_kafka_stats_topic_doc[2] = {
326
+	"Print statistics based on topic",
327
+	0
328
+};
329
+
330
+static rpc_export_t rpc_cmds[] = {
331
+	{"kafka.stats", rpc_kafka_stats, rpc_kafka_stats_doc, 0},
332
+	{"kafka.stats_topic", rpc_kafka_stats_topic, rpc_kafka_stats_topic_doc, 0},
333
+	{0, 0, 0, 0}
334
+};
0 335
new file mode 100644
... ...
@@ -0,0 +1,1147 @@
0
+/*
1
+ * Copyright (C) 2019 Vicente Hernando (Sonoc https://www.sonoc.io)
2
+ *
3
+ * This file is part of Kamailio, a free SIP server.
4
+ *
5
+ * Kamailio is free software; you can redistribute it and/or modify
6
+ * it under the terms of the GNU General Public License as published by
7
+ * the Free Software Foundation; either version 2 of the License, or
8
+ * (at your option) any later version
9
+ *
10
+ * Kamailio is distributed in the hope that it will be useful,
11
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
+ * GNU General Public License for more details.
14
+ *
15
+ * You should have received a copy of the GNU General Public License
16
+ * along with this program; if not, write to the Free Software
17
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
18
+ *
19
+ */
20
+
21
+/**
22
+ * \file
23
+ * \brief Kafka :: Apache Kafka functions via librdkafka
24
+ * \ingroup kfk
25
+ *
26
+ * - Module: \ref kfk
27
+ */
28
+
29
+#include <syslog.h> /* For log levels. */
30
+#include <librdkafka/rdkafka.h>
31
+
32
+#include "../../core/dprint.h"
33
+#include "../../core/parser/parse_param.h"
34
+#include "../../core/mem/pkg.h"
35
+#include "../../core/mem/shm_mem.h"
36
+#include "../../core/locking.h"
37
+
38
+/**
39
+ * \brief data type for a configuration property.
40
+ */
41
+typedef struct kfk_conf_node_s {
42
+	str *sname; /**< name of property */
43
+	str *svalue; /**< value of property */
44
+	struct kfk_conf_node_s *next; /**< next property in list */
45
+} kfk_conf_node_t;
46
+
47
+/**
48
+ * \brief list of configuration properties.
49
+ */
50
+typedef struct kfk_conf_s {
51
+	param_t *attrs; /**< parsed attributes from configuration parameter. */
52
+	char *spec; /**< original string of configuration. */
53
+	kfk_conf_node_t *property; /**< list of configuration properties. */
54
+} kfk_conf_t;
55
+
56
+/**
57
+ * \brief data type for a topic.
58
+ *
59
+ * This is an element in a topic list.
60
+ */
61
+typedef struct kfk_topic_s {
62
+	str *topic_name; /**< Name of the topic. */
63
+	rd_kafka_topic_t *rd_topic; /**< rd kafka topic structure. */
64
+	param_t *attrs; /**< parsed attributes for topic configuration. */
65
+	char *spec; /**< original string for topic configuration. */
66
+	kfk_conf_node_t *property; /**< list of configuration properties for a topic. */
67
+	struct kfk_topic_s *next; /**< Next element in topic list. */
68
+} kfk_topic_t;
69
+
70
+/**
71
+ * \brief stats about a topic.
72
+ */
73
+typedef struct kfk_stats_s {
74
+	str *topic_name; /**< Name of the topic, or NULL for general statistics. */
75
+	uint64_t total; /**< Total number of messages sent. */
76
+	uint64_t error; /**< Number of messages that failed to be sent. */
77
+	struct kfk_stats_s *next; /**< Next element in stats list. */
78
+} kfk_stats_t;
79
+
80
+/* Static variables. */
81
+static rd_kafka_conf_t *rk_conf = NULL;  /* Configuration object */
82
+static rd_kafka_t *rk = NULL; /* Producer instance handle */
83
+static kfk_conf_t *kfk_conf = NULL; /* List for Kafka configuration properties. */
84
+static kfk_topic_t *kfk_topic = NULL; /* List for Kafka topics. */
85
+
86
+#define ERRSTR_LEN 512 /**< Length of internal buffer for errors. */
87
+static char errstr[ERRSTR_LEN]; /* librdkafka API error reporting buffer */
88
+gen_lock_t *stats_lock = NULL; /**< Lock to protect shared statistics data. */
89
+
90
+/**
91
+ * \brief Total statistics
92
+ *
93
+ * First node (mandatory) is the general one with NULL topic.
94
+ * Next nodes are topic-dependent ones and are optional.
96
+ * Because the general node is created in kfk_stats_init (called from mod_init),
97
+ * it is shared among all Kamailio processes.
97
+ */
98
+static kfk_stats_t *stats_general;
99
+
100
+/* Static functions. */
101
+static void kfk_conf_free(kfk_conf_t *kconf);
102
+static void kfk_topic_free(kfk_topic_t *ktopic);
103
+static int kfk_conf_configure();
104
+static int kfk_topic_list_configure();
105
+static int kfk_topic_exist(str *topic_name);
106
+static rd_kafka_topic_t* kfk_topic_get(str *topic_name);
107
+static int kfk_stats_add(const char *topic, rd_kafka_resp_err_t err);
108
+static void kfk_stats_topic_free(kfk_stats_t *st_topic);
109
+
110
+/**
111
+ * \brief Kafka logger callback
112
+ */
113
+static void kfk_logger (const rd_kafka_t *rk, int level,
114
+		    const char *fac, const char *buf) {
115
+
116
+	switch(level) {
117
+		case LOG_EMERG:
118
+			LM_NPRL("RDKAFKA fac: %s : %s : %s\n",
119
+					fac, rk ? rd_kafka_name(rk) : NULL,
120
+					buf);
121
+			break;
122
+			
123
+		case LOG_ALERT:
124
+			LM_ALERT("RDKAFKA fac: %s : %s : %s\n",
125
+					 fac, rk ? rd_kafka_name(rk) : NULL,
126
+					 buf);
127
+			break;
128
+			
129
+		case LOG_CRIT:
130
+			LM_CRIT("RDKAFKA fac: %s : %s : %s\n",
131
+					fac, rk ? rd_kafka_name(rk) : NULL,
132
+					buf);
133
+			break;
134
+
135
+		case LOG_ERR:
136
+			LM_ERR("RDKAFKA fac: %s : %s : %s\n",
137
+				   fac, rk ? rd_kafka_name(rk) : NULL,
138
+				   buf);
139
+			break;
140
+
141
+		case LOG_WARNING:
142
+			LM_WARN("RDKAFKA fac: %s : %s : %s\n",
143
+					fac, rk ? rd_kafka_name(rk) : NULL,
144
+					buf);
145
+			break;
146
+
147
+		case LOG_NOTICE:
148
+			LM_NOTICE("RDKAFKA fac: %s : %s : %s\n",
149
+					fac, rk ? rd_kafka_name(rk) : NULL,
150
+					buf);
151
+			break;
152
+			
153
+		case LOG_INFO:
154
+			LM_INFO("RDKAFKA fac: %s : %s : %s\n",
155
+					fac, rk ? rd_kafka_name(rk) : NULL,
156
+					buf);
157
+			break;
158
+
159
+		case LOG_DEBUG:
160
+			LM_DBG("RDKAFKA fac: %s : %s : %s\n",
161
+				   fac, rk ? rd_kafka_name(rk) : NULL,
162
+				   buf);
163
+			break;
164
+
165
+		default:
166
+			LM_ERR("Unsupported kafka log level: %d\n", level);
167
+			break;
168
+	}
169
+}
170
+
171
+/**
172
+ * \brief Message delivery report callback using the richer rd_kafka_message_t object.
173
+ */
174
+static void kfk_msg_delivered (rd_kafka_t *rk,
175
+							   const rd_kafka_message_t *rkmessage, void *opaque) {
176
+
177
+	LM_DBG("Message delivered callback\n");
178
+	
179
+	const char *topic_name = NULL;
180
+	topic_name = rd_kafka_topic_name(rkmessage->rkt);
181
+	if (!topic_name) {
182
+		LM_ERR("Cannot get topic name for delivered message\n");
183
+		return;
184
+	}
185
+	
186
+	kfk_stats_add(topic_name, rkmessage->err);
187
+	
188
+	if (rkmessage->err) {
189
+		LM_ERR("RDKAFKA Message delivery failed: %s\n",
190
+			   rd_kafka_err2str(rkmessage->err));
191
+	} else {
192
+		LM_DBG("RDKAFKA Message delivered (%zd bytes, offset %"PRId64", "
193
+			   "partition %"PRId32"): %.*s\n",
194
+			   rkmessage->len, rkmessage->offset,
195
+			   rkmessage->partition,
196
+			   (int)rkmessage->len, (const char *)rkmessage->payload);
197
+	}
198
+}
199
+
200
+/**
201
+ * \brief Initialize kafka functionality.
202
+ *
203
+ * \param brokers brokers to add.
204
+ * \return 0 on success.
205
+ */
206
+int kfk_init(char *brokers)
207
+{
208
+	LM_DBG("Initializing Kafka\n");
209
+
210
+	if (brokers == NULL) {
211
+		LM_ERR("brokers parameter not set\n");
212
+		return -1;
213
+	}
214
+	
215
+	/*
216
+	 * Create Kafka client configuration place-holder
217
+	 */
218
+	rk_conf = rd_kafka_conf_new();
219
+
220
+	/* Set logger */
221
+	rd_kafka_conf_set_log_cb(rk_conf, kfk_logger);
222
+
223
+	/* Set message delivery callback. */
224
+	rd_kafka_conf_set_dr_msg_cb(rk_conf, kfk_msg_delivered);
225
+
226
+	/* Configure properties: */
227
+	if (kfk_conf_configure()) {
228
+		LM_ERR("Failed to configure general properties\n");
229
+		return -1;
230
+	}
231
+
232
+	/*
233
+	 * Create producer instance.
234
+	 *
235
+	 * NOTE: rd_kafka_new() takes ownership of the conf object
236
+	 *       and the application must not reference it again after
237
+	 *       this call.
238
+	 */
239
+	rk = rd_kafka_new(RD_KAFKA_PRODUCER, rk_conf, errstr, sizeof(errstr));
240
+	if (!rk) {
241
+		LM_ERR("Failed to create new producer: %s\n", errstr);
242
+		return -1;
243
+	}
244
+	rk_conf = NULL; /* Now owned by producer. */
245
+	LM_DBG("Producer handle created\n");
246
+
247
+	LM_DBG("Adding broker: %s\n", brokers);
248
+	/* Add brokers */
249
+	if (rd_kafka_brokers_add(rk, brokers) == 0) {
250
+		LM_ERR("No valid brokers specified: %s\n", brokers);
251
+		return -1;
252
+	}
253
+	LM_DBG("Added broker: %s\n", brokers);
254
+
255
+	/* Topic creation and configuration. */
256
+	if (kfk_topic_list_configure()) {
257
+		LM_ERR("Failed to configure topics\n");
258
+		return -1;
259
+	}
260
+
261
+	return 0;
262
+}
263
+
264
+/**
265
+ * \brief Close kafka related functionality.
266
+ */
267
+void kfk_close()
268
+{
269
+	rd_kafka_resp_err_t err;
270
+	
271
+	LM_DBG("Closing Kafka\n");
272
+
273
+    /* Destroy the producer instance */
274
+	if (rk) {
275
+		/* Flushing messages. */
276
+		LM_DBG("Flushing messages\n");
277
+		err = rd_kafka_flush(rk, 0);
278
+		if (err) {
279
+			LM_ERR("Failed to flush messages: %s\n", rd_kafka_err2str(err));
280
+		}
281
+
282
+		/* Destroy producer. */
283
+		LM_DBG("Destroying instance of Kafka producer\n");
284
+		rd_kafka_destroy(rk);
285
+	}
286
+
287
+	/* Destroy configuration if not freed by rd_kafka_destroy. */
288
+	if (rk_conf) {
289
+		LM_DBG("Destroying instance of Kafka configuration\n");
290
+		rd_kafka_conf_destroy(rk_conf);
291
+	}
292
+
293
+	/* Free list of configuration properties. */
294
+	if (kfk_conf) {
295
+		kfk_conf_free(kfk_conf);
296
+	}
297
+
298
+	/* Free list of topics. */
299
+	while (kfk_topic) {
300
+		kfk_topic_t *next = kfk_topic->next;
301
+		kfk_topic_free(kfk_topic);
302
+		kfk_topic = next;
303
+	}
304
+}
305
+
306
+/**
307
+ * \brief Free a general configuration object.
308
+ */
309
+static void kfk_conf_free(kfk_conf_t *kconf)
310
+{
311
+	if (kconf == NULL) {
312
+		/* Nothing to free. */
313
+		return;
314
+	}
315
+
316
+	kfk_conf_node_t *knode = kconf->property;
317
+	while (knode) {
318
+		kfk_conf_node_t *next = knode->next;
319
+		pkg_free(knode);
320
+		knode = next;
321
+	}
322
+
323
+	free_params(kconf->attrs);
324
+	pkg_free(kconf);
325
+}
326
+
327
+/**
328
+ * \brief Parse general configuration properties for Kafka.
329
+ */
330
+int kfk_conf_parse(char *spec)
331
+{
332
+	param_t *pit = NULL;
333
+	param_hooks_t phooks;
334
+	kfk_conf_t *kconf = NULL;
335
+
336
+	if (kfk_conf != NULL) {
337
+		LM_ERR("Configuration already set\n");
338
+		goto error;
339
+	}
340
+	
341
+	str s;
342
+	s.s = spec;
343
+	s.len = strlen(spec);
344
+	if(s.s[s.len-1]==';') {
345
+		s.len--;
346
+	}
347
+	if (parse_params(&s, CLASS_ANY, &phooks, &pit) < 0) {
348
+		LM_ERR("Failed parsing params value\n");
349
+		goto error;
350
+	}
351
+
352
+	kconf = (kfk_conf_t*)pkg_malloc(sizeof(kfk_conf_t));
353
+	if (kconf == NULL) {
354
+		LM_ERR("No more pkg memory\n");
355
+		goto error;
356
+	}
357
+	memset(kconf, 0, sizeof(kfk_conf_t));
358
+	kconf->attrs = pit;
359
+	kconf->spec = spec;
360
+	for (pit = kconf->attrs; pit; pit=pit->next)
361
+	{
362
+		/* Parse a property. */
363
+		kfk_conf_node_t *knode = NULL;
364
+		knode = (kfk_conf_node_t*)pkg_malloc(sizeof(kfk_conf_node_t));
365
+		if (knode == NULL) {
366
+			LM_ERR("No more pkg memory\n");
367
+			goto error;
368
+		}
369
+		memset(knode, 0, sizeof(kfk_conf_node_t));
370
+		
371
+		knode->sname = &pit->name;
372
+		knode->svalue = &pit->body;
373
+		if (knode->sname && knode->svalue) {
374
+			LM_DBG("Parsed property: %.*s -> %.*s\n",
375
+				   knode->sname->len, knode->sname->s,
376
+				   knode->svalue->len, knode->svalue->s);
377
+		}
378
+
379
+		/* Place node at beginning of knode list. */
380
+	    knode->next = kconf->property;
381
+		kconf->property = knode;
382
+	} /* for pit */
383
+
384
+	kfk_conf = kconf;
385
+	return 0;
386
+
387
+error:
388
+	if(pit!=NULL) {
389
+		free_params(pit);
390
+	}
391
+
392
+	if(kconf != NULL) {
393
+		kfk_conf_free(kconf);
394
+	}
395
+	return -1;
396
+}
397
+
398
+/**
399
+ * \brief Configure Kafka properties.
400
+ *
401
+ * \return 0 on success. 
402
+ */
403
+static int kfk_conf_configure()
404
+{
405
+	if (kfk_conf == NULL) {
406
+		/* Nothing to configure. */
407
+		LM_DBG("No properties to configure\n");
408
+		return 0;
409
+	}
410
+
411
+	LM_DBG("Configuring properties\n");
412
+	
413
+	kfk_conf_node_t *knode = kfk_conf->property;
414
+	while (knode) {
415
+		kfk_conf_node_t *next = knode->next;
416
+		str *sname = knode->sname;
417
+		str *svalue = knode->svalue;
418
+		knode = next;
419
+		
420
+		if (sname == NULL || sname->len == 0 || sname->s == NULL) {
421
+			LM_ERR("Bad name in configuration property\n");
422
+			continue;
423
+		}
424
+
425
+		if (svalue == NULL || svalue->len == 0 || svalue->s == NULL) {
426
+			LM_ERR("Bad value in configuration property\n");
427
+			continue;
428
+		}
429
+
430
+		/* We temporarily convert to zstring. */
431
+		char cname = sname->s[sname->len];
432
+		sname->s[sname->len] = '\0';
433
+		char cvalue = svalue->s[svalue->len];
434
+		svalue->s[svalue->len] = '\0';
435
+		
436
+		LM_DBG("Setting property: %s -> %s\n", sname->s, svalue->s);
437
+		
438
+		if (rd_kafka_conf_set(rk_conf, sname->s, svalue->s,
439
+							  errstr, sizeof(errstr)) !=
440
+			RD_KAFKA_CONF_OK) {
441
+			LM_ERR("Configuration failed: %s\n", errstr);
442
+
443
+			/* We restore zstrings back to str */
444
+			sname->s[sname->len] = cname;
445
+			svalue->s[svalue->len] = cvalue;
446
+			return -1;
447
+		}
448
+
449
+		/* We restore zstrings back to str */
450
+		sname->s[sname->len] = cname;
451
+		svalue->s[svalue->len] = cvalue;
452
+
453
+	} /* while knode */
454
+	
455
+	return 0;
456
+}
457
+
458
+/**
459
+ * \brief Free a topic object.
460
+ */
461
+static void kfk_topic_free(kfk_topic_t *ktopic)
462
+{
463
+	if (ktopic == NULL) {
464
+		/* Nothing to free. */
465
+		return;
466
+	}
467
+
468
+	kfk_conf_node_t *knode = ktopic->property;
469
+	while (knode) {
470
+		kfk_conf_node_t *next = knode->next;
471
+		pkg_free(knode);
472
+		knode = next;
473
+	}
474
+
475
+	/* Destroy rd Kafka topic. */
476
+	if (ktopic->rd_topic) {
477
+		rd_kafka_topic_destroy(ktopic->rd_topic);
478
+	}
479
+	
480
+	free_params(ktopic->attrs);
481
+	pkg_free(ktopic);
482
+}
483
+
484
+/**
485
+ * \brief Parse topic properties for Kafka.
486
+ */
487
+int kfk_topic_parse(char *spec)
488
+{
489
+	param_t *pit = NULL;
490
+	param_hooks_t phooks;
491
+	kfk_topic_t *ktopic = NULL;
492
+
493
+	str s;
494
+	s.s = spec;
495
+	s.len = strlen(spec);
496
+	if(s.s[s.len-1]==';') {
497
+		s.len--;
498
+	}
499
+	if (parse_params(&s, CLASS_ANY, &phooks, &pit) < 0) {
500
+		LM_ERR("Failed parsing params value\n");
501
+		goto error;
502
+	}
503
+
504
+	ktopic = (kfk_topic_t*)pkg_malloc(sizeof(kfk_topic_t));
505
+	if (ktopic == NULL) {
506
+		LM_ERR("No more pkg memory\n");
507
+		goto error;
508
+	}
509
+	memset(ktopic, 0, sizeof(kfk_topic_t));
510
+	ktopic->attrs = pit;
511
+	ktopic->spec = spec;
512
+	for (pit = ktopic->attrs; pit; pit=pit->next)
513
+	{
514
+		/* Check for topic name. */
515
+		if (pit->name.len==4 && strncmp(pit->name.s, "name", 4)==0) {
516
+			if (ktopic->topic_name != NULL) {
517
+				LM_ERR("Topic name already set\n");
518
+				goto error;
519
+			}
520
+			ktopic->topic_name = &pit->body;
521
+			LM_DBG("Topic name: %.*s\n", pit->body.len, pit->body.s);
522
+			
523
+		} else {
524
+
525
+			/* Parse a property. */
526
+			kfk_conf_node_t *knode = NULL;
527
+			knode = (kfk_conf_node_t*)pkg_malloc(sizeof(kfk_conf_node_t));
528
+			if (knode == NULL) {
529
+				LM_ERR("No more pkg memory\n");
530
+				goto error;
531
+			}
532
+			memset(knode, 0, sizeof(kfk_conf_node_t));
533
+			
534
+			knode->sname = &pit->name;
535
+			knode->svalue = &pit->body;
536
+			if (knode->sname && knode->svalue) {
537
+				LM_DBG("Topic parsed property: %.*s -> %.*s\n",
538
+					   knode->sname->len, knode->sname->s,
539
+					   knode->svalue->len, knode->svalue->s);
540
+			}
541
+			
542
+			/* Place node at beginning of ktopic list. */
543
+			knode->next = ktopic->property;
544
+			ktopic->property = knode;
545
+		} /* if pit->name.len == 4 */
546
+	} /* for pit */
547
+
548
+	/* Topic name is mandatory. */
549
+	if(ktopic->topic_name == NULL)
550
+	{
551
+		LM_ERR("No topic name\n");
552
+		goto error;
553
+	}
554
+
555
+	/* Place topic at beginning of topic list. */
556
+	ktopic->next = kfk_topic;
557
+	kfk_topic = ktopic;
558
+	return 0;
559
+
560
+error:
561
+	if(pit!=NULL) {
562
+		free_params(pit);
563
+	}
564
+
565
+	if(ktopic != NULL) {
566
+		kfk_topic_free(ktopic);
567
+	}
568
+	return -1;
569
+}
570
+
571
+/**
572
+ * \brief Create and configure a topic.
573
+ *
574
+ * \return 0 on success.
575
+ */
576
+static int kfk_topic_configure(kfk_topic_t *ktopic)
577
+{
578
+	rd_kafka_topic_conf_t *topic_conf = NULL;
579
+	rd_kafka_topic_t *rkt = NULL;
580
+	
581
+	if (ktopic == NULL) {
582
+		LM_ERR("No topic to create\n");
583
+		goto error;
584
+	}
585
+	
586
+	/* Check topic name. */
587
+	if (!ktopic->topic_name || !ktopic->topic_name->s || ktopic->topic_name->len == 0) {
588
+		LM_ERR("Bad topic name\n");
589
+		goto error;
590
+	}
591
+
592
+	int topic_found = kfk_topic_exist(ktopic->topic_name);
593
+	if (topic_found == -1) {
594
+		LM_ERR("Failed to search for topic %.*s in cluster\n",
595
+			   ktopic->topic_name->len, ktopic->topic_name->s);
596
+		goto error;
597
+	} else if (topic_found == 0) {
598
+		LM_ERR("Topic not found %.*s in cluster\n",
599
+			   ktopic->topic_name->len, ktopic->topic_name->s);
600
+		goto error;
601
+	}
602
+	
603
+	LM_DBG("Creating topic: %.*s\n",
604
+		   ktopic->topic_name->len, ktopic->topic_name->s);
605
+
606
+	/* Topic configuration */
607
+
608
+	topic_conf = rd_kafka_topic_conf_new();
609
+
610
+	kfk_conf_node_t *knode = kfk_topic->property;
611
+	while (knode) {
612
+		kfk_conf_node_t *next = knode->next;
613
+		str *sname = knode->sname;
614
+		str *svalue = knode->svalue;
615
+		knode = next;
616
+		
617
+		if (sname == NULL || sname->len == 0 || sname->s == NULL) {
618
+			LM_ERR("Bad name in topic configuration property\n");
619
+			continue;
620
+		}
621
+
622
+		if (svalue == NULL || svalue->len == 0 || svalue->s == NULL) {
623
+			LM_ERR("Bad value in topic configuration property\n");
624
+			continue;
625
+		}
626
+
627
+		/* We temporarily convert to zstring. */
628
+		char cname = sname->s[sname->len];
629
+		sname->s[sname->len] = '\0';
630
+		char cvalue = svalue->s[svalue->len];
631
+		svalue->s[svalue->len] = '\0';
632
+		
633
+		LM_DBG("Setting topic property: %s -> %s\n", sname->s, svalue->s);
634
+
635
+		rd_kafka_conf_res_t res;
636
+		res = rd_kafka_topic_conf_set(topic_conf, sname->s, svalue->s,
637
+									  errstr, sizeof(errstr));
638
+		if (res != RD_KAFKA_CONF_OK) {
639
+			LM_ERR("Failed to set topic configuration: %s -> %s\n",
640
+				   sname->s, svalue->s);
641
+
642
+			/* We restore zstrings back to str */
643
+			sname->s[sname->len] = cname;
644
+			svalue->s[svalue->len] = cvalue;
645
+
646
+			goto error;
647
+		}
648
+
649
+		/* We restore zstrings back to str */
650
+		sname->s[sname->len] = cname;
651
+		svalue->s[svalue->len] = cvalue;
652
+
653
+	} /* while knode */
654
+
655
+	/* We temporarily convert to zstring. */
656
+	char c_topic_name = ktopic->topic_name->s[ktopic->topic_name->len];
657
+	ktopic->topic_name->s[ktopic->topic_name->len] = '\0';
658
+
659
+	rkt = rd_kafka_topic_new(rk, ktopic->topic_name->s, topic_conf);
660
+	if (!rkt) {
661
+		LM_ERR("Failed to create topic (%s): %s\n",
662
+			   ktopic->topic_name->s,
663
+			   rd_kafka_err2str(rd_kafka_last_error()));
664
+
665
+		/* We restore zstrings back to str */
666
+		ktopic->topic_name->s[ktopic->topic_name->len] = c_topic_name;
667
+
668
+		goto error;
669
+	}
670
+	topic_conf = NULL; /* Now owned by topic */
671
+	LM_DBG("Topic created: %s\n", ktopic->topic_name->s);
672
+	
673
+	/* We restore zstrings back to str */
674
+	ktopic->topic_name->s[ktopic->topic_name->len] = c_topic_name;
675
+
676
+	/* Everything went fine. */
677
+	ktopic->rd_topic = rkt;
678
+	return 0;
679
+
680
+error:
681
+
682
+	/* Destroy topic configuration. */
683
+	if (topic_conf) {
684
+		rd_kafka_topic_conf_destroy(topic_conf);
685
+	}
686
+
687
+	/* Destroy topic */
688
+	if (rkt) {
689
+		LM_DBG("Destroying topic\n");
690
+		rd_kafka_topic_destroy(rkt);
691
+	}
692
+
693
+	return -1;
694
+}
695
+	
696
+/**
697
+ * \brief Create and configure a list of topics.
698
+ *
699
+ * \return 0 on success.
700
+ */
701
+static int kfk_topic_list_configure()
702
+{
703
+	kfk_topic_t *ktopic = kfk_topic;
704
+
705
+	while (ktopic) {
706
+		kfk_topic_t *next = ktopic->next;
707
+		/* Create current topic. */
708
+		if (kfk_topic_configure(ktopic)) {
709
+			LM_ERR("Failed to create topic: %.*s\n",
710
+				   ktopic->topic_name->len, ktopic->topic_name->s);
711
+			return -1;
712
+		}
713
+		ktopic = next;
714
+	}
715
+
716
+	return 0;
717
+}
718
+
719
+/* -1 means RD_POLL_INFINITE */
720
+/* 100000 means 100 seconds */
721
+#define METADATA_TIMEOUT 100000 /**< Timeout when asking for metadata in milliseconds. */
722
+
723
+/**
724
+ * \brief check that a topic exists in cluster.
725
+ *
726
+ * \return 0 if topic does not exist.
727
+ * \return 1 if topic does exist.
728
+ * \return -1 on error.
729
+ */
730
+static int kfk_topic_exist(str *topic_name)
731
+{
732
+	/* Where to receive metadata. */
733
+	const struct rd_kafka_metadata *metadatap = NULL;
734
+
735
+	if (!topic_name || topic_name->len == 0 || topic_name->s == NULL) {
736
+		LM_ERR("Bad topic name\n");
737
+		goto error;
738
+	}
739
+
740
+	/* Get metadata for all topics. */
741
+	rd_kafka_resp_err_t res;
742
+	res = rd_kafka_metadata(rk, 1, NULL, &metadatap, METADATA_TIMEOUT);
743
+	if (res != RD_KAFKA_RESP_ERR_NO_ERROR) {
744
+		LM_ERR("Failed to get metadata: %s\n", rd_kafka_err2str(res));
745
+		goto error;
746
+	}
747
+
748
+	/* List topics */
749
+	int topic_found = 0; /* Topic not found by default. */
750
+
751
+	for (int i=0; i<metadatap->topic_cnt; i++) {
752
+		rd_kafka_metadata_topic_t *t = &metadatap->topics[i];
753
+		if (t->topic) {
754
+			LM_DBG("Metadata Topic: %s\n", t->topic);
755
+			if (strncmp(topic_name->s, t->topic, topic_name->len) == 0) {
756
+				topic_found = 1;
757
+				LM_DBG("Metadata Topic (%s) found!\n", t->topic);
758
+				break;
759
+			}
760
+		}
761
+	} // for (i=0; i<m->topic_cnt; i++)
762
+
763
+	/* Destroy metadata. */
764
+	rd_kafka_metadata_destroy(metadatap);
765
+
766
+	if (topic_found == 0) {
767
+		LM_DBG("Topic not found: %.*s\n", topic_name->len, topic_name->s);
768
+		return 0;
769
+	}