Browse code

tls: added a minimum overhead shm buffer queue

Minimum overhead buffer queue in shm memory, based on
tcp_wbuffer_queue (tcp_conn.h).

Andrei Pelinescu-Onciul authored on 20/05/2010 14:22:16
Showing 1 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,292 @@
1
+/* 
2
+ * $Id$
3
+ * 
4
+ * Copyright (C) 2010 iptelorg GmbH
5
+ *
6
+ * Permission to use, copy, modify, and distribute this software for any
7
+ * purpose with or without fee is hereby granted, provided that the above
8
+ * copyright notice and this permission notice appear in all copies.
9
+ *
10
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17
+ */
18
+/** minimal overhead buffer queue in shm memory.
19
+ * @file modules/tls/sbufq.h
20
+ * @ingroup: tls
21
+ * Module: @ref tls
22
+ */
23
+/*
24
+ * History:
25
+ * --------
26
+ *  2010-03-31  initial version, based on tcp_conn.h tcp_wbuffer_queue (andrei)
27
+*/
28
+
29
+#ifndef __sbufq_h
30
+#define __sbufq_h
31
+
32
+#include "../../compiler_opt.h"
33
+#include "../../ut.h"
34
+#include "../../mem/shm_mem.h"
35
+#include "../../timer_ticks.h"
36
+#include "../../timer.h"
37
+#include "../../dprint.h"
38
+#include <string.h>
39
+
40
+
41
+struct sbuf_elem {
42
+	struct sbuf_elem* next;
43
+	unsigned int b_size; /**< buf size */
44
+	char buf[1]; /**< variable size buffer */
45
+};
46
+
47
+struct sbuffer_queue {
48
+	struct sbuf_elem* first;
49
+	struct sbuf_elem* last;
50
+	ticks_t last_chg; /**< last change (creation time or partial flush)*/
51
+	unsigned int queued; /**< total size */
52
+	unsigned int offset; /**< offset in the first buffer where unflushed data
53
+							starts */
54
+	unsigned int last_used; /**< how much of the last buffer is used */
55
+};
56
+
57
+
58
+/* sbufq_flush() output flags */
59
+#define F_BUFQ_EMPTY 1
60
+#define F_BUFQ_ERROR_FLUSH 2
61
+
62
+
63
+#define sbufq_empty(bq) ((bq)->first==0)
64
+#define sbufq_non_empty(bq) ((bq)->first!=0)
65
+
66
+
67
+
68
+/** adds/appends data to a buffer queue.
69
+ * WARNING: it does no attempt to synchronize access/lock. If needed it should
70
+ * be called under lock.
71
+ * @param q - buffer queue
72
+ * @param data
73
+ * @param size
74
+ * @param min_buf_size - min size to allocate for new buffer elements
75
+ * @return 0 on success, -1 on error (mem. allocation)
76
+ */
77
+inline static int sbufq_add(struct sbuffer_queue* q, const void* data,
78
+							unsigned int size, unsigned int min_buf_size)
79
+{
80
+	struct sbuf_elem* b;
81
+	unsigned int last_free;
82
+	unsigned int b_size;
83
+	unsigned int crt_size;
84
+	ticks_t t;
85
+	
86
+	t=get_ticks_raw();
87
+	
88
+	if (likely(q->last==0)) {
89
+		b_size=MAX_unsigned(min_buf_size, size);
90
+		b=shm_malloc(sizeof(*b)+b_size-sizeof(b->buf));
91
+		if (unlikely(b==0))
92
+			goto error;
93
+		b->b_size=b_size;
94
+		b->next=0;
95
+		q->last=b;
96
+		q->first=b;
97
+		q->last_used=0;
98
+		q->offset=0;
99
+		q->last_chg=get_ticks_raw();
100
+		last_free=b_size;
101
+		crt_size=size;
102
+		goto data_cpy;
103
+	}else{
104
+		b=q->last;
105
+	}
106
+	
107
+	while(size){
108
+		last_free=b->b_size-q->last_used;
109
+		if (last_free==0){
110
+			b_size=MAX_unsigned(min_buf_size, size);
111
+			b=shm_malloc(sizeof(*b)+b_size-sizeof(b->buf));
112
+			if (unlikely(b==0))
113
+				goto error;
114
+			b->b_size=b_size;
115
+			b->next=0;
116
+			q->last->next=b;
117
+			q->last=b;
118
+			q->last_used=0;
119
+			last_free=b->b_size;
120
+		}
121
+		crt_size=MIN_unsigned(last_free, size);
122
+data_cpy:
123
+		memcpy(b->buf+q->last_used, data, crt_size);
124
+		q->last_used+=crt_size;
125
+		size-=crt_size;
126
+		data+=crt_size;
127
+		q->queued+=crt_size;
128
+	}
129
+	return 0;
130
+error:
131
+	return -1;
132
+}
133
+
134
+
135
+
136
+/** inserts data (at the beginning) in a buffer queue.
137
+ * Note: should never be called after sbufq_run().
138
+ * WARNING: it does no attempt to synchronize access/lock. If needed it should
139
+ * be called under lock.
140
+ * @param q - buffer queue
141
+ * @param data
142
+ * @param size
143
+ * @param min_buf_size - min size to allocate for new buffer elements
144
+ * @return 0 on success, -1 on error (mem. allocation)
145
+ */
146
+inline static int sbufq_insert(struct sbuffer_queue* q, const void* data, 
147
+							unsigned int size, unsigned int min_buf_size)
148
+{
149
+	struct sbuf_elem* b;
150
+	
151
+	if (likely(q->first==0)) /* if empty, use sbufq_add */
152
+		return sbufq_add(q, data, size, min_buf_size);
153
+	
154
+	if (unlikely(q->offset)){
155
+		LOG(L_CRIT, "BUG: non-null offset %d (bad call, should"
156
+				"never be called after sbufq_run())\n", q->offset);
157
+		goto error;
158
+	}
159
+	if ((q->first==q->last) && ((q->last->b_size-q->last_used)>=size)){
160
+		/* one block with enough space in it for size bytes */
161
+		memmove(q->first->buf+size, q->first->buf, size);
162
+		memcpy(q->first->buf, data, size);
163
+		q->last_used+=size;
164
+	}else{
165
+		/* create a size bytes block directly */
166
+		b=shm_malloc(sizeof(*b)+size-sizeof(b->buf));
167
+		if (unlikely(b==0))
168
+			goto error;
169
+		b->b_size=size;
170
+		/* insert it */
171
+		b->next=q->first;
172
+		q->first=b;
173
+		memcpy(b->buf, data, size);
174
+	}
175
+	
176
+	q->queued+=size;
177
+	return 0;
178
+error:
179
+	return -1;
180
+}
181
+
182
+
183
+/** destroy a buffer queue.
184
+ * Only the content is destroyed (shm_free()'d), the queue head is
185
+ * re-intialized.
186
+ * WARNING: it does no attempt to synchronize access/lock. If needed it should
187
+ * be called under lock.
188
+ * @param q - buffer queue
189
+ * @return - number of bytes that used to be queued (>=0).
190
+ */
191
+inline static unsigned int sbufq_destroy(struct  sbuffer_queue* q)
192
+{
193
+	struct sbuf_elem* b;
194
+	struct sbuf_elem* next_b;
195
+	int unqueued;
196
+	
197
+	unqueued=0;
198
+	if (likely(q->first)){
199
+		b=q->first;
200
+		do{
201
+			next_b=b->next;
202
+			unqueued+=(b==q->last)?q->last_used:b->b_size;
203
+			if (b==q->first)
204
+				unqueued-=q->offset;
205
+			shm_free(b);
206
+			b=next_b;
207
+		}while(b);
208
+	}
209
+	memset(q, 0, sizeof(*q));
210
+	return unqueued;
211
+}
212
+
213
+
214
+
215
+/** tries to flush the queue.
216
+ * Tries to flush as much as possible from the given queue, using the 
217
+ * given callback.
218
+ * WARNING: it does no attempt to synchronize access/lock. If needed it should
219
+ * be called under lock.
220
+ * @param q - buffer queue
221
+ * @param *flags - set to:
222
+ *                   F_BUFQ_EMPTY if the queued is completely flushed
223
+ *                   F_BUFQ_ERROR_FLUSH if the flush_f callback returned error.
224
+ * @param flush_f - flush function (callback). modeled after write():
225
+ *                    flush_f(param1, param2, const void* buf, unsigned size).
226
+ *                    It should return the number of bytes "flushed" on
227
+ *                    success, or <0 on error. If the number of bytes
228
+ *                    "flushed" is smaller then the requested size, it
229
+ *                    would be assumed that no more bytes can be flushed
230
+ *                    and sbufq_flush will exit.
231
+ * @param flush_p1 - parameter for the flush function callback.
232
+ * @param flush_p2 - parameter for the flush function callback.
233
+ * @return -1 on internal error, or the number of bytes flushed on
234
+ *            success (>=0). Note that the flags param is
235
+ *            always set and it should be used to check for errors, since
236
+ *            a flush_f() failure will not result in a negative return.
237
+ */
238
+inline static int sbufq_flush(struct sbuffer_queue* q, int* flags,
239
+								int (*flush_f)(void* p1, void* p2,
240
+												const void* buf,
241
+												unsigned size),
242
+								void* flush_p1, void* flush_p2)
243
+{
244
+	struct sbuf_elem *b;
245
+	int n;
246
+	int ret;
247
+	int block_size;
248
+	char* buf;
249
+	
250
+	*flags=0;
251
+	ret=0;
252
+	while(q->first){
253
+		block_size=((q->first==q->last)?q->last_used:q->first->b_size)-
254
+						q->offset;
255
+		buf=q->first->buf+q->offset;
256
+		n=flush_f(flush_p1, flush_p2, buf, block_size);
257
+		if (likely(n>0)){
258
+			ret+=n;
259
+			if (likely(n==block_size)){
260
+				b=q->first;
261
+				q->first=q->first->next; 
262
+				shm_free(b);
263
+				q->offset=0;
264
+				q->queued-=block_size;
265
+				ret+=block_size;
266
+			}else{
267
+				q->offset+=n;
268
+				q->queued-=n;
269
+				ret+=n;
270
+				break;
271
+			}
272
+		}else{
273
+			if (unlikely(n<0))
274
+				*flags|=F_BUFQ_ERROR_FLUSH;
275
+			break;
276
+		}
277
+	}
278
+	if (likely(q->first==0)){
279
+		q->last=0;
280
+		q->last_used=0;
281
+		q->offset=0;
282
+		*flags|=F_BUFQ_EMPTY;
283
+	}
284
+	return ret;
285
+}
286
+
287
+
288
+
289
+
290
+#endif /*__sbufq_h*/
291
+
292
+/* vi: set ts=4 sw=4 tw=79:ai:cindent: */