1 /* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
17 #include "serf_bucket_util.h"
/* Node of a singly linked list of buckets: `bucket` is the bucket carried by
 * this node and `next` links to the following node. Elsewhere in this file a
 * NULL `next` is used to mark the last node (see the append scan).
 */
20 /* Should be an APR_RING? */
21 typedef struct bucket_list {
22 serf_bucket_t *bucket;
23 struct bucket_list *next;
/* Private state stored in an aggregate bucket's `data` pointer. It tracks two
 * bucket_list_t chains: buckets still to be read, and buckets that have been
 * read to completion but must stay alive until the next read call.
 */
27 bucket_list_t *list; /* active buckets */
28 bucket_list_t *done; /* we finished reading this; now pending a destroy */
29 } aggregate_context_t;
/* Drain the "done" list of an aggregate context.
 *
 * For every node on ctx->done this destroys the bucket the node holds and
 * returns the node itself to `allocator`. The next pointer is saved before
 * the node is freed, and ctx->done is NULL once the loop finishes.
 *
 * ctx:       aggregate state whose done list is emptied.
 * allocator: allocator the list nodes are released to.
 *
 * Called at the start of serf_aggregate_read and serf_aggregate_read_iovec.
 */
32 static void cleanup_aggregate(aggregate_context_t *ctx,
33 serf_bucket_alloc_t *allocator)
35 bucket_list_t *next_list;
37 /* If we finished reading a bucket during the previous read, then
38 * we can now toss that bucket.
40 while (ctx->done != NULL) {
41 next_list = ctx->done->next;
43 serf_bucket_destroy(ctx->done->bucket);
44 serf_bucket_mem_free(allocator, ctx->done);
46 ctx->done = next_list;
/* Create a new aggregate bucket.
 *
 * Allocates an aggregate_context_t from `allocator` and returns a bucket of
 * type serf_bucket_type_aggregate that carries that context as its data.
 *
 * NOTE(review): the statements that initialise the context fields are not
 * visible in this listing - confirm that ctx->list and ctx->done start out
 * NULL, since the rest of the file tests them against NULL.
 */
50 SERF_DECLARE(serf_bucket_t *) serf_bucket_aggregate_create(
51 serf_bucket_alloc_t *allocator)
53 aggregate_context_t *ctx;
55 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
59 return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx);
/* Destroy handler listed in serf_bucket_type_aggregate.
 *
 * The visible statements destroy the bucket held by the head node of the
 * active list, remember that node's successor, and free the node using the
 * bucket's allocator. Afterwards the done list is drained through
 * cleanup_aggregate and the bucket itself is handed to
 * serf_default_destroy_and_data.
 *
 * NOTE(review): the loop header around the active-list teardown and the
 * statement that advances ctx->list to next_ctx are not visible in this
 * listing - presumably this repeats until ctx->list is empty; confirm.
 */
62 static void serf_aggregate_destroy_and_data(serf_bucket_t *bucket)
64 aggregate_context_t *ctx = bucket->data;
65 bucket_list_t *next_ctx;
68 serf_bucket_destroy(ctx->list->bucket);
69 next_ctx = ctx->list->next;
70 serf_bucket_mem_free(bucket->allocator, ctx->list);
73 cleanup_aggregate(ctx, bucket->allocator);
75 serf_default_destroy_and_data(bucket);
/* Convert an existing bucket into an aggregate bucket in place.
 *
 * A fresh aggregate_context_t is allocated from the bucket's own allocator
 * and the bucket's type pointer is switched to serf_bucket_type_aggregate.
 * The allocator is left untouched.
 *
 * NOTE(review): the lines that initialise the new context and store it in
 * bucket->data are not visible in this listing - confirm them, and confirm
 * who is responsible for releasing whatever bucket->data held before.
 */
78 SERF_DECLARE(void) serf_bucket_aggregate_become(serf_bucket_t *bucket)
80 aggregate_context_t *ctx;
82 ctx = serf_bucket_mem_alloc(bucket->allocator, sizeof(*ctx));
86 bucket->type = &serf_bucket_type_aggregate;
89 /* The allocator remains the same. */
/* Insert prepend_bucket at the front of the aggregate's active list.
 *
 * A new list node is allocated from the aggregate bucket's allocator, set to
 * carry prepend_bucket, linked in front of the current head, and then made
 * the new head of ctx->list.
 *
 * aggregate_bucket: aggregate whose list is modified.
 * prepend_bucket:   bucket that becomes the first one read.
 */
93 SERF_DECLARE(void) serf_bucket_aggregate_prepend(
94 serf_bucket_t *aggregate_bucket,
95 serf_bucket_t *prepend_bucket)
97 aggregate_context_t *ctx = aggregate_bucket->data;
98 bucket_list_t *new_list;
100 new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
102 new_list->bucket = prepend_bucket;
103 new_list->next = ctx->list;
105 ctx->list = new_list;
/* Add append_bucket to the end of the aggregate's active list.
 *
 * A new list node is allocated from the aggregate bucket's allocator, set to
 * carry append_bucket, and given a NULL next pointer. If the active list is
 * empty the node becomes the head; otherwise the list is scanned from the
 * head for the node whose next pointer is NULL and the new node is attached
 * there.
 *
 * aggregate_bucket: aggregate whose list is modified.
 * append_bucket:    bucket that becomes the last one read.
 *
 * NOTE(review): no tail pointer is kept, so each append walks the list from
 * the head. The statement that advances `scan` inside the while loop is not
 * visible in this listing - presumably `scan = scan->next`; confirm.
 */
108 SERF_DECLARE(void) serf_bucket_aggregate_append(
109 serf_bucket_t *aggregate_bucket,
110 serf_bucket_t *append_bucket)
112 aggregate_context_t *ctx = aggregate_bucket->data;
113 bucket_list_t *new_list;
115 new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
117 new_list->bucket = append_bucket;
118 new_list->next = NULL;
120 /* If we use APR_RING, this is trivial. So, wait.
121 new_list->next = ctx->list;
122 ctx->list = new_list;
124 if (ctx->list == NULL) {
125 ctx->list = new_list;
128 bucket_list_t *scan = ctx->list;
130 while (scan->next != NULL)
132 scan->next = new_list;
136 SERF_DECLARE(void) serf_bucket_aggregate_prepend_iovec(
137 serf_bucket_t *aggregate_bucket,
143 /* Add in reverse order. */
144 for (i = vecs_count - 1; i > 0; i--) {
145 serf_bucket_t *new_bucket;
147 new_bucket = serf_bucket_simple_create(vecs[i].iov_base,
150 aggregate_bucket->allocator);
152 serf_bucket_aggregate_prepend(aggregate_bucket, new_bucket);
/* Append the buffers described by `vecs` to the end of the aggregate.
 *
 * Walks the array from index 0 up to, but not including, vecs_count. For
 * each element a bucket is created with serf_bucket_simple_create from the
 * element's iov_base, using the aggregate bucket's allocator, and then added
 * with serf_bucket_aggregate_append, so the buffers keep their array order.
 */
157 SERF_DECLARE(void) serf_bucket_aggregate_append_iovec(
158 serf_bucket_t *aggregate_bucket,
164 for (i = 0; i < vecs_count; i++) {
165 serf_bucket_t *new_bucket;
167 new_bucket = serf_bucket_simple_create(vecs[i].iov_base,
170 aggregate_bucket->allocator);
172 serf_bucket_aggregate_append(aggregate_bucket, new_bucket);
/* Core read routine shared by serf_aggregate_read and
 * serf_aggregate_read_iovec.
 *
 * Reads from the bucket at the head of ctx->list with serf_bucket_read_iovec
 * and adds the number of iovecs filled to *vecs_used. What happens next
 * depends on the result:
 *
 *  - A status matching SERF_BUCKET_READ_ERROR is tested for first.
 *  - If iovecs were produced or the status is non-zero, a zero status or
 *    EAGAIN is the "return to the caller now" case described by the inline
 *    comment.
 *  - Otherwise the head node is unlinked from ctx->list and pushed onto
 *    ctx->done, so its bucket stays alive until cleanup_aggregate runs on a
 *    later read, and reading can continue with the next bucket.
 *
 * Before continuing, `requested` is reduced by the iov_len of each iovec just
 * read (unless it is SERF_READ_ALL_AVAIL), and `vecs` / `vecs_size` are
 * advanced past the entries already filled. Reading stops once `requested`
 * or `vecs_size` reaches zero.
 *
 * bucket:    the aggregate bucket; its data is the aggregate_context_t.
 * requested: byte budget, or SERF_READ_ALL_AVAIL.
 * vecs_size: number of entries available in vecs.
 * vecs:      iovec array that receives the data.
 * vecs_used: running total of entries filled.
 *
 * NOTE(review): the enclosing loop header, the handling of an empty list,
 * and every return statement are not visible in this listing, so the exact
 * status returned on each path must be confirmed against the full source.
 */
177 static apr_status_t read_aggregate(serf_bucket_t *bucket,
178 apr_size_t requested,
179 int vecs_size, struct iovec *vecs,
182 aggregate_context_t *ctx = bucket->data;
192 serf_bucket_t *head = ctx->list->bucket;
195 status = serf_bucket_read_iovec(head, requested, vecs_size, vecs,
198 if (SERF_BUCKET_READ_ERROR(status))
201 /* Add the number of vecs we read to our running total. */
202 *vecs_used += cur_vecs_used;
204 if (cur_vecs_used > 0 || status) {
205 bucket_list_t *next_list;
207 /* If we got SUCCESS (w/bytes) or EAGAIN, we want to return now
208 * as it isn't safe to read more without returning to our caller.
210 if (!status || APR_STATUS_IS_EAGAIN(status)) {
214 /* However, if we read EOF, we can stash this bucket in a
215 * to-be-freed list and move on to the next bucket. This ensures
216 * that the bucket stays alive (so as not to violate our read
217 * semantics). We'll destroy this list of buckets the next time
218 * we are asked to perform a read operation - thus ensuring the
219 * proper read lifetime.
221 next_list = ctx->list->next;
222 ctx->list->next = ctx->done;
223 ctx->done = ctx->list;
224 ctx->list = next_list;
226 /* If we have no more in our list, return EOF. */
231 /* At this point, it safe to read the next bucket - if we can. */
233 /* If the caller doesn't want ALL_AVAIL, decrement the size
234 * of the items we just read from the list.
236 if (requested != SERF_READ_ALL_AVAIL) {
239 for (i = 0; i < cur_vecs_used; i++)
240 requested -= vecs[i].iov_len;
243 /* Adjust our vecs to account for what we just read. */
244 vecs_size -= cur_vecs_used;
245 vecs += cur_vecs_used;
247 /* We reached our max. Oh well. */
248 if (!requested || !vecs_size) {
/* Single-buffer read for the aggregate bucket.
 *
 * First drains the done list with cleanup_aggregate, releasing buckets that
 * were finished during an earlier read. It then calls read_aggregate with
 * room for exactly one iovec and exposes that iovec's base pointer through
 * *data.
 *
 * NOTE(review): the assignment of *len, the handling of a read that fills no
 * iovec, and the return statement are not visible in this listing - confirm.
 */
256 static apr_status_t serf_aggregate_read(serf_bucket_t *bucket,
257 apr_size_t requested,
258 const char **data, apr_size_t *len)
260 aggregate_context_t *ctx = bucket->data;
265 cleanup_aggregate(ctx, bucket->allocator);
267 status = read_aggregate(bucket, requested, 1, &vec, &vecs_used);
273 *data = vec.iov_base;
/* Vectored read handler listed in serf_bucket_type_aggregate.
 *
 * Drains the done list with cleanup_aggregate, then forwards the request
 * unchanged to read_aggregate and returns its status.
 */
280 static apr_status_t serf_aggregate_read_iovec(serf_bucket_t *bucket,
281 apr_size_t requested,
286 aggregate_context_t *ctx = bucket->data;
288 cleanup_aggregate(ctx, bucket->allocator);
290 return read_aggregate(bucket, requested, vecs_size, vecs, vecs_used);
/* Readline handler listed in serf_bucket_type_aggregate.
 *
 * NOTE(review): the only body content visible in this listing is a
 * placeholder comment, which suggests line reading is not implemented for
 * aggregate buckets yet - confirm what status is returned and whether *found,
 * *data and *len are set.
 */
293 static apr_status_t serf_aggregate_readline(serf_bucket_t *bucket,
294 int acceptable, int *found,
295 const char **data, apr_size_t *len)
297 /* Follow pattern from serf_aggregate_read. */
/* Peek operation for the aggregate bucket.
 *
 * NOTE(review): only the first line of the signature and a placeholder
 * comment are visible in this listing, which suggests peeking is not
 * implemented for aggregate buckets yet - confirm the parameters and the
 * status returned.
 */
301 static apr_status_t serf_aggregate_peek(serf_bucket_t *bucket,
305 /* Follow pattern from serf_aggregate_read. */
/* read_bucket handler listed in serf_bucket_type_aggregate.
 *
 * If the bucket at the head of the active list has exactly the requested
 * type, that bucket is taken out of the aggregate by advancing ctx->list to
 * the next node. Otherwise the request is delegated to the head bucket
 * through serf_bucket_read_bucket.
 *
 * bucket: the aggregate bucket.
 * type:   bucket type the caller is looking for.
 *
 * NOTE(review): in the visible lines the list node that carried the consumed
 * bucket is unlinked but never handed to serf_bucket_mem_free, so the node
 * looks like it is left allocated - confirm and release it if so.
 * NOTE(review): ctx->list is dereferenced here; the guard for an empty list
 * is not visible in this listing - confirm that one exists.
 */
309 static serf_bucket_t * serf_aggregate_read_bucket(
310 serf_bucket_t *bucket,
311 const serf_bucket_type_t *type)
313 aggregate_context_t *ctx = bucket->data;
314 serf_bucket_t *found_bucket;
320 if (ctx->list->bucket->type == type) {
321 /* Got the bucket. Consume it from our list. */
322 found_bucket = ctx->list->bucket;
323 ctx->list = ctx->list->next;
327 /* Call read_bucket on first one in our list. */
328 return serf_bucket_read_bucket(ctx->list->bucket, type);
/* Function table describing the aggregate bucket type.
 *
 * The entries visible in this listing wire up the aggregate's own readline,
 * read_iovec, read_bucket and destroy_and_data handlers, and use
 * serf_default_read_for_sendfile for the sendfile read.
 *
 * NOTE(review): several initializer entries are not visible in this listing
 * (presumably the type name and the read and peek handlers) - confirm their
 * order against the serf_bucket_type_t declaration.
 */
331 SERF_DECLARE_DATA const serf_bucket_type_t serf_bucket_type_aggregate = {
334 serf_aggregate_readline,
335 serf_aggregate_read_iovec,
336 serf_default_read_for_sendfile,
337 serf_aggregate_read_bucket,
339 serf_aggregate_destroy_and_data,