OSDN Git Service

Merge branch 'master' into trunk
[modchxj/mod_chxj.git] / src / chxj_serf.c
1 /*
2  * Copyright (C) 2005-2008 Atsushi Konno All rights reserved.
3  * Copyright (C) 2005 QSDN,Inc. All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 #include "chxj_serf.h"
18 #include "mod_chxj.h"
19 #include "apr_pools.h"
20
21 typedef struct __app_ctx_t     app_ctx_t;
22 typedef struct __handler_ctx_t handler_ctx_t;
23
24 struct __app_ctx_t {
25   int                 ssl_flag;
26   serf_ssl_context_t  *ssl_ctx;
27   serf_bucket_alloc_t *bkt_alloc;
28 };
29
30 struct __handler_ctx_t {
31 #if APR_MAJOR_VERSION > 0
32   apr_uint32_t requests_outstanding;
33 #else
34   apr_atomic_t requests_outstanding;
35 #endif
36
37   serf_response_acceptor_t acceptor;
38   app_ctx_t                *acceptor_ctx;
39
40   serf_response_handler_t  handler;
41
42   const char *host;
43   const char *method;
44   const char *path;
45   const char *user_agent;
46
47   apr_status_t rv;
48   const char *reason;
49
50   char *response;
51   apr_size_t response_len;
52   char *post_data;
53   apr_size_t post_data_len;
54   apr_table_t *headers_out;
55   apr_pool_t *pool;
56   request_rec *r;
57 };
58
59 char *default_chxj_serf_get(request_rec *r, apr_pool_t *ppool, const char *url_path, int set_headers_flag, apr_size_t *response_len);
60 char *(*chxj_serf_get)(request_rec *r, apr_pool_t *ppool, const char *url_path, int set_headers_flag, apr_size_t *response_len) = default_chxj_serf_get;
61 char *default_chxj_serf_post(request_rec *r, apr_pool_t *ppool, const char *url_path, char *post_data, apr_size_t post_data_len, int set_headers_flag, apr_size_t *response_len);
62 char *(*chxj_serf_post)(request_rec *r, apr_pool_t *ppool, const char *url_path, char *post_data, apr_size_t post_data_len, int set_headers_flag, apr_size_t *response_len) = default_chxj_serf_post;
63
64
65 void
66 s_init(apr_pool_t *ppool, apr_pool_t **pool)
67 {
68   apr_pool_create(pool, ppool);
69   apr_atomic_init(*pool);
70 }
71
72
73
74 static serf_bucket_t *
75 s_connection_setup(apr_socket_t *skt, void *setup_ctx, apr_pool_t *UNUSED(pool))
76 {
77   serf_bucket_t  *c;
78   app_ctx_t      *ctx = (app_ctx_t *)setup_ctx;
79
80   c = serf_bucket_socket_create(skt, ctx->bkt_alloc);
81   if (ctx->ssl_flag) {
82     c = serf_bucket_ssl_decrypt_create(c, ctx->ssl_ctx, ctx->bkt_alloc);
83     if (!ctx->ssl_ctx) {
84       ctx->ssl_ctx = serf_bucket_ssl_decrypt_context_get(c);
85       serf_ssl_use_default_certificates(ctx->ssl_ctx);
86       serf_ssl_server_cert_callback_set(ctx->ssl_ctx, NULL, NULL);
87     }
88     return c;
89   }
90   return c;
91 }
92
93
94 static void 
95 s_connection_closed(serf_connection_t *UNUSED(conn), void *UNUSED(closed_baton), apr_status_t UNUSED(why), apr_pool_t *UNUSED(pool))
96 {
97   /* nothing */
98 }
99
100
101 static serf_bucket_t *
102 s_accept_response(serf_request_t *request, serf_bucket_t *stream, void *UNUSED(acceptor_baton), apr_pool_t *UNUSED(pool))
103 {
104     serf_bucket_alloc_t *bkt_alloc;
105     serf_bucket_t       *c;
106
107     bkt_alloc = serf_request_get_alloc(request);
108     c = serf_bucket_barrier_create(stream, bkt_alloc);
109     return serf_bucket_response_create(c, bkt_alloc);
110 }
111
112
113 static apr_status_t 
114 s_handle_response(serf_request_t *UNUSED(request), serf_bucket_t *response, void *handler_ctx, apr_pool_t *UNUSED(pool))
115 {
116   const char      *data;
117   apr_size_t      len;
118   serf_status_line sl;
119   apr_status_t     rv;
120   handler_ctx_t  *ctx = handler_ctx;
121
122   rv = serf_bucket_response_status(response, &sl);
123   if (rv != APR_SUCCESS) {
124     if (APR_STATUS_IS_EAGAIN(rv)) {
125       return rv;
126     }
127     ctx->rv = rv;
128     apr_atomic_dec32(&ctx->requests_outstanding); 
129     return rv;
130   }
131   ctx->reason = sl.reason;
132
133   while (1) {
134     rv = serf_bucket_read(response, 2048, &data, &len);
135     if (SERF_BUCKET_READ_ERROR(rv)) {
136       ctx->rv = rv;
137       apr_atomic_dec32(&ctx->requests_outstanding);
138       DBG(ctx->r, "REQ[%X] end of s_handle_response() (ERROR)", (unsigned int)(apr_size_t)ctx->r);
139       return rv;
140     }
141     if (APR_STATUS_IS_EAGAIN(rv)) {
142       /* 0 byte return if EAGAIN returned. */
143       DBG(ctx->r, "REQ[%X] end of s_handle_response() (EAGAIN) len:[%d]", (unsigned int)(apr_size_t)ctx->r, (int)len);
144       return rv;
145     }
146
147     if (len > 0) {
148       if (! ctx->response) {
149         ctx->response = apr_palloc(ctx->pool, len);
150         ctx->response[0] = 0;
151         ctx->response_len = 0;
152       }
153       else {
154         char *tmp = apr_palloc(ctx->pool, ctx->response_len);
155         memcpy(tmp, ctx->response, ctx->response_len);
156         ctx->response = apr_palloc(ctx->pool, ctx->response_len + len);
157         memcpy(ctx->response, tmp, ctx->response_len);
158       }
159       memcpy(&ctx->response[ctx->response_len], data, len);
160       ctx->response_len += len;
161       ctx->response[ctx->response_len] = 0;
162     }
163     
164     if (APR_STATUS_IS_EOF(rv)) {
165       serf_bucket_t *hdrs;
166       char *tmp_headers = "";
167       hdrs = serf_bucket_response_get_headers(response);
168       while (1) {
169         rv = serf_bucket_read(hdrs, 2048, &data, &len);
170         if (SERF_BUCKET_READ_ERROR(rv))
171           return rv;
172         tmp_headers = apr_pstrcat(ctx->pool, tmp_headers, apr_psprintf(ctx->pool , "%.*s", (unsigned int)len, data), NULL);
173         if (APR_STATUS_IS_EOF(rv)) {
174           break;
175         }
176       }
177       ctx->headers_out = apr_table_make(ctx->pool, 0);
178
179       char *pstat;
180       char *pair = NULL;
181       for (;;) {
182         pair = apr_strtok(tmp_headers, "\n", &pstat);
183         if (!pair) break;
184         tmp_headers = NULL;
185         char *key;
186         char *val;
187
188         char *tpair = apr_pstrdup(ctx->pool, pair);
189         key = tpair;
190         val = strchr(tpair, ':');
191         if (val) {
192           *val = 0;
193           val++;
194           key = qs_trim_string(ctx->pool, key);
195           val = qs_trim_string(ctx->pool, val);
196           DBG(ctx->r, "key:[%s], val:[%s]", key, val);
197           apr_table_add(ctx->headers_out, key, val);
198         }
199       }
200       ctx->rv = APR_SUCCESS;
201       apr_atomic_dec32(&ctx->requests_outstanding);
202       DBG(ctx->r, "REQ[%X] end of s_handle_response()(NORMAL)", (unsigned int)(apr_size_t)ctx->r);
203       return APR_EOF;
204     }
205
206     if (APR_STATUS_IS_EAGAIN(rv)) {
207       DBG(ctx->r, "REQ[%X] end of s_handle_response() (EAGAIN)", (unsigned int)(apr_size_t)ctx->r);
208       return rv;
209     }
210   }
211 }
212
213 static apr_status_t 
214 s_setup_request(serf_request_t           *request,
215                 void                     *setup_ctx,
216                 serf_bucket_t            **req_bkt,
217                 serf_response_acceptor_t *acceptor,
218                 void                     **acceptor_ctx,
219                 serf_response_handler_t  *handler,
220                 void                     **handler_ctx,
221                 apr_pool_t               *UNUSED(pool))
222 {
223   handler_ctx_t *ctx = setup_ctx;
224   serf_bucket_t *hdrs_bkt;
225   serf_bucket_t *body_bkt = NULL;
226   request_rec *r = ctx->r;
227   int ii;
228
229   if (ctx->post_data) {
230     body_bkt = serf_bucket_simple_create(ctx->post_data, ctx->post_data_len, NULL, NULL, serf_request_get_alloc(request));
231   }
232
233   *req_bkt = serf_bucket_request_create(ctx->method, ctx->path, body_bkt, serf_request_get_alloc(request));
234   hdrs_bkt = serf_bucket_request_get_headers(*req_bkt);
235
236
237   apr_array_header_t *headers = (apr_array_header_t*)apr_table_elts(r->headers_in);
238   apr_table_entry_t  *hentryp = (apr_table_entry_t*)headers->elts;
239   for (ii=headers->nelts-1; ii>=0; ii--) {
240     serf_bucket_headers_setc(hdrs_bkt, hentryp[ii].key, hentryp[ii].val);
241     DBG(ctx->r, "REQ[%X] REQUEST key:[%s], val:[%s]", (unsigned int)(apr_size_t)ctx->r, hentryp[ii].key, hentryp[ii].val);
242   }
243   if (ctx->post_data) {
244     serf_bucket_headers_setc(hdrs_bkt, "X-Chxj-Forward", "Done");
245     serf_bucket_headers_setc(hdrs_bkt, "X-Chxj-Content-Length", apr_psprintf(r->pool, "%" APR_SIZE_T_FMT , ctx->post_data_len));
246     DBG(ctx->r, "REQ[%X] REQUEST key:[%s], val:[%s]", (unsigned int)(apr_size_t)ctx->r, "X-Chxj-Forward", "Done");
247     DBG(ctx->r, "REQ[%X] REQUEST key:[%s], val:[%s]", (unsigned int)(apr_size_t)ctx->r, "X-Chxj-Content-Length", apr_psprintf(r->pool, "%" APR_SIZE_T_FMT, ctx->post_data_len));
248
249   }
250   DBG(ctx->r, "REQ[%X] REQUEST Content-Length:[%s]", (unsigned int)(apr_size_t)r, serf_bucket_headers_get(hdrs_bkt, "Content-Length"));
251
252   apr_atomic_inc32(&(ctx->requests_outstanding));
253   if (ctx->acceptor_ctx->ssl_flag) {
254     serf_bucket_alloc_t *req_alloc;
255     app_ctx_t *app_ctx = ctx->acceptor_ctx;
256
257     req_alloc = serf_request_get_alloc(request);
258
259     if (app_ctx->ssl_ctx == NULL) {
260       *req_bkt = serf_bucket_ssl_encrypt_create(*req_bkt, NULL, app_ctx->bkt_alloc);
261       app_ctx->ssl_ctx = serf_bucket_ssl_encrypt_context_get(*req_bkt);
262     }
263     else {
264       *req_bkt = serf_bucket_ssl_encrypt_create(*req_bkt, app_ctx->ssl_ctx, app_ctx->bkt_alloc);
265     }
266   }
267   *acceptor       = ctx->acceptor;
268   *acceptor_ctx   = ctx->acceptor_ctx;
269   *handler        = ctx->handler;
270   *handler_ctx    = ctx;
271
272   return APR_SUCCESS;
273 }
274
275 char *
276 default_chxj_serf_get(request_rec *r, apr_pool_t *ppool, const char *url_path, int set_headers_flag, apr_size_t *response_len)
277 {
278   apr_pool_t *pool;
279   apr_uri_t url;
280   apr_status_t rv;
281   apr_sockaddr_t *address = NULL;
282
283   serf_context_t *context;
284   serf_connection_t *connection;
285
286   app_ctx_t app_ctx;
287   handler_ctx_t handler_ctx;
288   char *ret;
289
290
291   s_init(ppool, &pool);
292
293   apr_uri_parse(pool, url_path, &url);
294   if (!url.port) {
295     url.port = apr_uri_port_of_scheme(url.scheme);
296   }
297   if (!url.port) {
298     url.port = 80;
299   }
300   if (!url.path) {
301     url.path = "/";
302   }
303   if (!url.hostname) {
304     url.hostname = "localhost";
305   }
306   if (url.query) {
307     url.path = apr_psprintf(pool, "%s?%s", url.path, url.query);
308   }
309
310   rv = apr_sockaddr_info_get(&address, url.hostname, APR_UNSPEC, url.port, 0, pool);
311   if (rv != APR_SUCCESS) {
312     char buf[256];
313     ERR(r, "apr_sockaddr_info_get() failed: rv:[%d|%s]", rv, apr_strerror(rv, buf, 256));
314     return NULL;
315   }
316   memset(&app_ctx, 0, sizeof(app_ctx_t));
317
318   app_ctx.bkt_alloc = serf_bucket_allocator_create(pool, NULL, NULL);
319   if (strcasecmp(url.scheme, "https") == 0) {
320     app_ctx.ssl_flag = 1;
321   }
322
323   context = serf_context_create(pool);
324   connection = serf_connection_create(context, address, s_connection_setup, &app_ctx, s_connection_closed, &app_ctx, pool);
325
326   memset(&handler_ctx, 0, sizeof(handler_ctx_t));
327   handler_ctx.requests_outstanding = 0;
328   handler_ctx.host = url.hostinfo;
329   handler_ctx.method = "GET";
330   handler_ctx.path = url.path;
331   handler_ctx.user_agent = (char *)apr_table_get(r->headers_in, "User-Agent");
332   handler_ctx.post_data = NULL;
333   handler_ctx.post_data_len = 0;
334
335   handler_ctx.acceptor     = s_accept_response;
336   handler_ctx.acceptor_ctx = &app_ctx;
337   handler_ctx.handler      = s_handle_response;
338   handler_ctx.pool         = pool;
339   handler_ctx.r            = r;
340   handler_ctx.response_len = 0;
341   handler_ctx.response     = NULL;
342
343   serf_connection_request_create(connection, s_setup_request, &handler_ctx);
344
345   while (1) {
346     rv = serf_context_run(context, SERF_DURATION_FOREVER, pool);
347     if (APR_STATUS_IS_TIMEUP(rv))
348       continue;
349     if (rv) {
350       char buf[200];
351       ERR(r, "Error running context: (%d) %s\n", rv, apr_strerror(rv, buf, sizeof(buf)));
352       break;
353     }
354     if (!apr_atomic_read32(&handler_ctx.requests_outstanding)) {
355       if (handler_ctx.rv != APR_SUCCESS) {
356         char buf[200];
357         ERR(r, "Error running context: (%d) %s\n", handler_ctx.rv, apr_strerror(handler_ctx.rv, buf, sizeof(buf)));
358       }
359       break;
360     }
361   }
362
363   serf_connection_close(connection);
364   ret = apr_pstrdup(ppool, handler_ctx.response);
365   if (set_headers_flag) {
366     r->headers_out = apr_table_copy(pool, handler_ctx.headers_out);
367     *response_len = handler_ctx.response_len;
368     char *contentType = (char *)apr_table_get(handler_ctx.headers_out, "Content-Type");
369     if (contentType) {
370       chxj_set_content_type(r, contentType);
371     }
372   }
373   return ret;
374 }
375
376
377 char *
378 default_chxj_serf_post(request_rec *r, apr_pool_t *ppool, const char *url_path, char *post_data, apr_size_t post_data_len, int set_headers_flag, apr_size_t *response_len)
379 {
380   apr_pool_t *pool;
381   apr_uri_t url;
382   apr_status_t rv;
383   apr_sockaddr_t *address = NULL;
384
385   serf_context_t *context;
386   serf_connection_t *connection;
387
388   app_ctx_t app_ctx;
389   handler_ctx_t handler_ctx;
390   char *ret;
391
392   DBG(r, "REQ:[%X] start chxj_serf_post()", (unsigned int)(apr_size_t)r);
393
394
395   s_init(ppool, &pool);
396
397   apr_uri_parse(pool, url_path, &url);
398   if (!url.port) {
399     url.port = apr_uri_port_of_scheme(url.scheme);
400   }
401   if (!url.port) {
402     url.port = 80;
403   }
404   if (!url.path) {
405     url.path = "/";
406   }
407   if (!url.hostname) {
408     url.hostname = "localhost";
409   }
410   if (url.query) {
411     url.path = apr_psprintf(pool, "%s?%s", url.path, url.query);
412   }
413
414   rv = apr_sockaddr_info_get(&address, url.hostname, APR_UNSPEC, url.port, 0, pool);
415   if (rv != APR_SUCCESS) {
416     char buf[256];
417     ERR(r, "apr_sockaddr_info_get() failed: rv:[%d|%s]", rv, apr_strerror(rv, buf, 256));
418     return NULL;
419   }
420   memset(&app_ctx, 0, sizeof(app_ctx_t));
421
422   app_ctx.bkt_alloc = serf_bucket_allocator_create(pool, NULL, NULL);
423   if (strcasecmp(url.scheme, "https") == 0) {
424     app_ctx.ssl_flag = 1;
425   }
426
427   context = serf_context_create(pool);
428   connection = serf_connection_create(context, address, s_connection_setup, &app_ctx, s_connection_closed, &app_ctx, pool);
429
430   memset(&handler_ctx, 0, sizeof(handler_ctx_t));
431   handler_ctx.requests_outstanding = 0;
432   handler_ctx.host = url.hostinfo;
433   handler_ctx.method = "POST";
434   handler_ctx.path = url.path;
435   handler_ctx.user_agent = (char *)apr_table_get(r->headers_in, "User-Agent");
436   handler_ctx.post_data = post_data;
437   handler_ctx.post_data_len = post_data_len;
438
439   handler_ctx.acceptor     = s_accept_response;
440   handler_ctx.acceptor_ctx = &app_ctx;
441   handler_ctx.handler      = s_handle_response;
442   handler_ctx.pool         = pool;
443   handler_ctx.r            = r;
444   handler_ctx.response_len = 0;
445   handler_ctx.response     = NULL;
446
447   serf_connection_request_create(connection, s_setup_request, &handler_ctx);
448
449   while (1) {
450     rv = serf_context_run(context, SERF_DURATION_FOREVER, pool);
451     if (APR_STATUS_IS_TIMEUP(rv))
452       continue;
453     if (rv) {
454       char buf[200];
455       ERR(r, "Error running context: (%d) %s\n", rv, apr_strerror(rv, buf, sizeof(buf)));
456       break;
457     }
458     if (!apr_atomic_read32(&handler_ctx.requests_outstanding)) {
459       if (handler_ctx.rv != APR_SUCCESS) {
460         char buf[200];
461         ERR(r, "Error running context: (%d) %s\n", handler_ctx.rv, apr_strerror(handler_ctx.rv, buf, sizeof(buf)));
462       }
463       break;
464     }
465   }
466
467   DBG(r, "end of serf request");
468   DBG(r, "response:[%s][%" APR_SIZE_T_FMT "]", handler_ctx.response, handler_ctx.response_len);
469   serf_connection_close(connection);
470   ret = apr_pstrdup(ppool, handler_ctx.response);
471   if (set_headers_flag) {
472     r->headers_out = apr_table_copy(pool, handler_ctx.headers_out);
473     *response_len = handler_ctx.response_len;
474     char *contentType = (char *)apr_table_get(handler_ctx.headers_out, "Content-Type");
475     if (contentType) {
476       DBG(r, "response content type[%s]", contentType);
477       chxj_set_content_type(r, apr_pstrdup(r->pool, contentType));
478     }
479   }
480   DBG(r, "REQ:[%X] end chxj_serf_post()", (unsigned int)(apr_size_t)r);
481   return ret;
482 }
483 /*
484  * vim:ts=2 et
485  */