OSDN Git Service

* Added serf library.
[modchxj/mod_chxj.git] / src / serf / buckets / aggregate_buckets.c
1 /* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include "serf.h"
17 #include "serf_bucket_util.h"
18
19
20 /* Should be an APR_RING? */
21 typedef struct bucket_list {
22     serf_bucket_t *bucket;
23     struct bucket_list *next;
24 } bucket_list_t;
25
26 typedef struct {
27     bucket_list_t *list; /* active buckets */
28     bucket_list_t *done; /* we finished reading this; now pending a destroy */
29 } aggregate_context_t;
30
31
32 static void cleanup_aggregate(aggregate_context_t *ctx,
33                               serf_bucket_alloc_t *allocator)
34 {
35     bucket_list_t *next_list;
36
37     /* If we finished reading a bucket during the previous read, then
38      * we can now toss that bucket.
39      */
40     while (ctx->done != NULL) {
41         next_list = ctx->done->next;
42
43         serf_bucket_destroy(ctx->done->bucket);
44         serf_bucket_mem_free(allocator, ctx->done);
45
46         ctx->done = next_list;
47     }
48 }
49
50 SERF_DECLARE(serf_bucket_t *) serf_bucket_aggregate_create(
51     serf_bucket_alloc_t *allocator)
52 {
53     aggregate_context_t *ctx;
54
55     ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
56     ctx->list = NULL;
57     ctx->done = NULL;
58
59     return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx);
60 }
61
62 static void serf_aggregate_destroy_and_data(serf_bucket_t *bucket)
63 {
64     aggregate_context_t *ctx = bucket->data;
65     bucket_list_t *next_ctx;
66
67     while (ctx->list) {
68         serf_bucket_destroy(ctx->list->bucket);
69         next_ctx = ctx->list->next;
70         serf_bucket_mem_free(bucket->allocator, ctx->list);
71         ctx->list = next_ctx;
72     }
73     cleanup_aggregate(ctx, bucket->allocator);
74
75     serf_default_destroy_and_data(bucket);
76 }
77
78 SERF_DECLARE(void) serf_bucket_aggregate_become(serf_bucket_t *bucket)
79 {
80     aggregate_context_t *ctx;
81
82     ctx = serf_bucket_mem_alloc(bucket->allocator, sizeof(*ctx));
83     ctx->list = NULL;
84     ctx->done = NULL;
85
86     bucket->type = &serf_bucket_type_aggregate;
87     bucket->data = ctx;
88
89     /* The allocator remains the same. */
90 }
91
92
93 SERF_DECLARE(void) serf_bucket_aggregate_prepend(
94     serf_bucket_t *aggregate_bucket,
95     serf_bucket_t *prepend_bucket)
96 {
97     aggregate_context_t *ctx = aggregate_bucket->data;
98     bucket_list_t *new_list;
99
100     new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
101                                      sizeof(*new_list));
102     new_list->bucket = prepend_bucket;
103     new_list->next = ctx->list;
104
105     ctx->list = new_list;
106 }
107
108 SERF_DECLARE(void) serf_bucket_aggregate_append(
109     serf_bucket_t *aggregate_bucket,
110     serf_bucket_t *append_bucket)
111 {
112     aggregate_context_t *ctx = aggregate_bucket->data;
113     bucket_list_t *new_list;
114
115     new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
116                                      sizeof(*new_list));
117     new_list->bucket = append_bucket;
118     new_list->next = NULL;
119
120     /* If we use APR_RING, this is trivial.  So, wait. 
121     new_list->next = ctx->list;
122     ctx->list = new_list;
123     */
124     if (ctx->list == NULL) {
125         ctx->list = new_list;
126     }
127     else {
128         bucket_list_t *scan = ctx->list;
129
130         while (scan->next != NULL)
131             scan = scan->next;
132         scan->next = new_list;
133     }
134 }
135
136 SERF_DECLARE(void) serf_bucket_aggregate_prepend_iovec(
137     serf_bucket_t *aggregate_bucket,
138     struct iovec *vecs,
139     int vecs_count)
140 {
141     int i;
142
143     /* Add in reverse order. */
144     for (i = vecs_count - 1; i > 0; i--) {
145         serf_bucket_t *new_bucket;
146
147         new_bucket = serf_bucket_simple_create(vecs[i].iov_base,
148                                                vecs[i].iov_len,
149                                                NULL, NULL,
150                                                aggregate_bucket->allocator);
151
152         serf_bucket_aggregate_prepend(aggregate_bucket, new_bucket);
153
154     }
155 }
156
157 SERF_DECLARE(void) serf_bucket_aggregate_append_iovec(
158     serf_bucket_t *aggregate_bucket,
159     struct iovec *vecs,
160     int vecs_count)
161 {
162     int i;
163
164     for (i = 0; i < vecs_count; i++) {
165         serf_bucket_t *new_bucket;
166
167         new_bucket = serf_bucket_simple_create(vecs[i].iov_base,
168                                                vecs[i].iov_len,
169                                                NULL, NULL,
170                                                aggregate_bucket->allocator);
171
172         serf_bucket_aggregate_append(aggregate_bucket, new_bucket);
173
174     }
175 }
176
177 static apr_status_t read_aggregate(serf_bucket_t *bucket,
178                                    apr_size_t requested,
179                                    int vecs_size, struct iovec *vecs,
180                                    int *vecs_used)
181 {
182     aggregate_context_t *ctx = bucket->data;
183     int cur_vecs_used;
184
185     *vecs_used = 0;
186
187     if (!ctx->list) {
188         return APR_EOF;
189     }
190
191     while (1) {
192         serf_bucket_t *head = ctx->list->bucket;
193         apr_status_t status;
194
195         status = serf_bucket_read_iovec(head, requested, vecs_size, vecs,
196                                         &cur_vecs_used);
197
198         if (SERF_BUCKET_READ_ERROR(status))
199             return status;
200
201         /* Add the number of vecs we read to our running total. */
202         *vecs_used += cur_vecs_used;
203
204         if (cur_vecs_used > 0 || status) {
205             bucket_list_t *next_list;
206
207             /* If we got SUCCESS (w/bytes) or EAGAIN, we want to return now
208              * as it isn't safe to read more without returning to our caller.
209              */
210             if (!status || APR_STATUS_IS_EAGAIN(status)) {
211                 return status;
212             }
213
214             /* However, if we read EOF, we can stash this bucket in a
215              * to-be-freed list and move on to the next bucket.  This ensures
216              * that the bucket stays alive (so as not to violate our read
217              * semantics).  We'll destroy this list of buckets the next time
218              * we are asked to perform a read operation - thus ensuring the
219              * proper read lifetime.
220              */
221             next_list = ctx->list->next;
222             ctx->list->next = ctx->done;
223             ctx->done = ctx->list;
224             ctx->list = next_list;
225
226             /* If we have no more in our list, return EOF. */
227             if (!ctx->list) {
228                 return status;
229             }
230
231             /* At this point, it safe to read the next bucket - if we can. */
232
233             /* If the caller doesn't want ALL_AVAIL, decrement the size
234              * of the items we just read from the list.
235              */
236             if (requested != SERF_READ_ALL_AVAIL) {
237                 int i;
238
239                 for (i = 0; i < cur_vecs_used; i++)
240                     requested -= vecs[i].iov_len;
241             }
242
243             /* Adjust our vecs to account for what we just read. */
244             vecs_size -= cur_vecs_used;
245             vecs += cur_vecs_used;
246
247             /* We reached our max.  Oh well. */
248             if (!requested || !vecs_size) {
249                 return APR_SUCCESS;
250             }
251         }
252     }
253     /* NOTREACHED */
254 }
255
256 static apr_status_t serf_aggregate_read(serf_bucket_t *bucket,
257                                         apr_size_t requested,
258                                         const char **data, apr_size_t *len)
259 {
260     aggregate_context_t *ctx = bucket->data;
261     struct iovec vec;
262     int vecs_used;
263     apr_status_t status;
264
265     cleanup_aggregate(ctx, bucket->allocator);
266
267     status = read_aggregate(bucket, requested, 1, &vec, &vecs_used);
268
269     if (!vecs_used) {
270         *len = 0;
271     }
272     else {
273         *data = vec.iov_base;
274         *len = vec.iov_len;
275     }
276
277     return status;
278 }
279
280 static apr_status_t serf_aggregate_read_iovec(serf_bucket_t *bucket,
281                                               apr_size_t requested,
282                                               int vecs_size,
283                                               struct iovec *vecs,
284                                               int *vecs_used)
285 {
286     aggregate_context_t *ctx = bucket->data;
287
288     cleanup_aggregate(ctx, bucket->allocator);
289
290     return read_aggregate(bucket, requested, vecs_size, vecs, vecs_used);
291 }
292
293 static apr_status_t serf_aggregate_readline(serf_bucket_t *bucket,
294                                             int acceptable, int *found,
295                                             const char **data, apr_size_t *len)
296 {
297     /* Follow pattern from serf_aggregate_read. */
298     return APR_ENOTIMPL;
299 }
300
301 static apr_status_t serf_aggregate_peek(serf_bucket_t *bucket,
302                                         const char **data,
303                                         apr_size_t *len)
304 {
305     /* Follow pattern from serf_aggregate_read. */
306     return APR_ENOTIMPL;
307 }
308
309 static serf_bucket_t * serf_aggregate_read_bucket(
310     serf_bucket_t *bucket,
311     const serf_bucket_type_t *type)
312 {
313     aggregate_context_t *ctx = bucket->data;
314     serf_bucket_t *found_bucket;
315
316     if (!ctx->list) {
317         return NULL;
318     }
319
320     if (ctx->list->bucket->type == type) {
321         /* Got the bucket. Consume it from our list. */
322         found_bucket = ctx->list->bucket;
323         ctx->list = ctx->list->next;
324         return found_bucket;
325     }
326
327     /* Call read_bucket on first one in our list. */
328     return serf_bucket_read_bucket(ctx->list->bucket, type);
329 }
330
331 SERF_DECLARE_DATA const serf_bucket_type_t serf_bucket_type_aggregate = {
332     "AGGREGATE",
333     serf_aggregate_read,
334     serf_aggregate_readline,
335     serf_aggregate_read_iovec,
336     serf_default_read_for_sendfile,
337     serf_aggregate_read_bucket,
338     serf_aggregate_peek,
339     serf_aggregate_destroy_and_data,
340 };