OSDN Git Service

libgo: Update to weekly.2012-01-27.
[pf3gnuchains/gcc-fork.git] / libgo / go / net / rpc / client.go
1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 package rpc
6
7 import (
8         "bufio"
9         "encoding/gob"
10         "errors"
11         "io"
12         "log"
13         "net"
14         "net/http"
15         "sync"
16 )
17
18 // ServerError represents an error that has been returned from
19 // the remote side of the RPC connection.
20 type ServerError string
21
22 func (e ServerError) Error() string {
23         return string(e)
24 }
25
26 var ErrShutdown = errors.New("connection is shut down")
27
28 // Call represents an active RPC.
29 type Call struct {
30         ServiceMethod string      // The name of the service and method to call.
31         Args          interface{} // The argument to the function (*struct).
32         Reply         interface{} // The reply from the function (*struct).
33         Error         error       // After completion, the error status.
34         Done          chan *Call  // Strobes when call is complete; value is the error status.
35         seq           uint64
36 }
37
38 // Client represents an RPC Client.
39 // There may be multiple outstanding Calls associated
40 // with a single Client.
41 type Client struct {
42         mutex    sync.Mutex // protects pending, seq, request
43         sending  sync.Mutex
44         request  Request
45         seq      uint64
46         codec    ClientCodec
47         pending  map[uint64]*Call
48         closing  bool
49         shutdown bool
50 }
51
52 // A ClientCodec implements writing of RPC requests and
53 // reading of RPC responses for the client side of an RPC session.
54 // The client calls WriteRequest to write a request to the connection
55 // and calls ReadResponseHeader and ReadResponseBody in pairs
56 // to read responses.  The client calls Close when finished with the
57 // connection. ReadResponseBody may be called with a nil
58 // argument to force the body of the response to be read and then
59 // discarded.
60 type ClientCodec interface {
61         WriteRequest(*Request, interface{}) error
62         ReadResponseHeader(*Response) error
63         ReadResponseBody(interface{}) error
64
65         Close() error
66 }
67
68 func (client *Client) send(c *Call) {
69         // Register this call.
70         client.mutex.Lock()
71         if client.shutdown {
72                 c.Error = ErrShutdown
73                 client.mutex.Unlock()
74                 c.done()
75                 return
76         }
77         c.seq = client.seq
78         client.seq++
79         client.pending[c.seq] = c
80         client.mutex.Unlock()
81
82         // Encode and send the request.
83         client.sending.Lock()
84         defer client.sending.Unlock()
85         client.request.Seq = c.seq
86         client.request.ServiceMethod = c.ServiceMethod
87         if err := client.codec.WriteRequest(&client.request, c.Args); err != nil {
88                 c.Error = err
89                 c.done()
90         }
91 }
92
93 func (client *Client) input() {
94         var err error
95         var response Response
96         for err == nil {
97                 response = Response{}
98                 err = client.codec.ReadResponseHeader(&response)
99                 if err != nil {
100                         if err == io.EOF && !client.closing {
101                                 err = io.ErrUnexpectedEOF
102                         }
103                         break
104                 }
105                 seq := response.Seq
106                 client.mutex.Lock()
107                 c := client.pending[seq]
108                 delete(client.pending, seq)
109                 client.mutex.Unlock()
110
111                 if response.Error == "" {
112                         err = client.codec.ReadResponseBody(c.Reply)
113                         if err != nil {
114                                 c.Error = errors.New("reading body " + err.Error())
115                         }
116                 } else {
117                         // We've got an error response. Give this to the request;
118                         // any subsequent requests will get the ReadResponseBody
119                         // error if there is one.
120                         c.Error = ServerError(response.Error)
121                         err = client.codec.ReadResponseBody(nil)
122                         if err != nil {
123                                 err = errors.New("reading error body: " + err.Error())
124                         }
125                 }
126                 c.done()
127         }
128         // Terminate pending calls.
129         client.mutex.Lock()
130         client.shutdown = true
131         for _, call := range client.pending {
132                 call.Error = err
133                 call.done()
134         }
135         client.mutex.Unlock()
136         if err != io.EOF || !client.closing {
137                 log.Println("rpc: client protocol error:", err)
138         }
139 }
140
141 func (call *Call) done() {
142         select {
143         case call.Done <- call:
144                 // ok
145         default:
146                 // We don't want to block here.  It is the caller's responsibility to make
147                 // sure the channel has enough buffer space. See comment in Go().
148                 log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
149         }
150 }
151
152 // NewClient returns a new Client to handle requests to the
153 // set of services at the other end of the connection.
154 // It adds a buffer to the write side of the connection so
155 // the header and payload are sent as a unit.
156 func NewClient(conn io.ReadWriteCloser) *Client {
157         encBuf := bufio.NewWriter(conn)
158         client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
159         return NewClientWithCodec(client)
160 }
161
162 // NewClientWithCodec is like NewClient but uses the specified
163 // codec to encode requests and decode responses.
164 func NewClientWithCodec(codec ClientCodec) *Client {
165         client := &Client{
166                 codec:   codec,
167                 pending: make(map[uint64]*Call),
168         }
169         go client.input()
170         return client
171 }
172
173 type gobClientCodec struct {
174         rwc    io.ReadWriteCloser
175         dec    *gob.Decoder
176         enc    *gob.Encoder
177         encBuf *bufio.Writer
178 }
179
180 func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {
181         if err = c.enc.Encode(r); err != nil {
182                 return
183         }
184         if err = c.enc.Encode(body); err != nil {
185                 return
186         }
187         return c.encBuf.Flush()
188 }
189
190 func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
191         return c.dec.Decode(r)
192 }
193
194 func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
195         return c.dec.Decode(body)
196 }
197
198 func (c *gobClientCodec) Close() error {
199         return c.rwc.Close()
200 }
201
202 // DialHTTP connects to an HTTP RPC server at the specified network address
203 // listening on the default HTTP RPC path.
204 func DialHTTP(network, address string) (*Client, error) {
205         return DialHTTPPath(network, address, DefaultRPCPath)
206 }
207
208 // DialHTTPPath connects to an HTTP RPC server 
209 // at the specified network address and path.
210 func DialHTTPPath(network, address, path string) (*Client, error) {
211         var err error
212         conn, err := net.Dial(network, address)
213         if err != nil {
214                 return nil, err
215         }
216         io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
217
218         // Require successful HTTP response
219         // before switching to RPC protocol.
220         resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
221         if err == nil && resp.Status == connected {
222                 return NewClient(conn), nil
223         }
224         if err == nil {
225                 err = errors.New("unexpected HTTP response: " + resp.Status)
226         }
227         conn.Close()
228         return nil, &net.OpError{"dial-http", network + " " + address, nil, err}
229 }
230
231 // Dial connects to an RPC server at the specified network address.
232 func Dial(network, address string) (*Client, error) {
233         conn, err := net.Dial(network, address)
234         if err != nil {
235                 return nil, err
236         }
237         return NewClient(conn), nil
238 }
239
240 func (client *Client) Close() error {
241         client.mutex.Lock()
242         if client.shutdown || client.closing {
243                 client.mutex.Unlock()
244                 return ErrShutdown
245         }
246         client.closing = true
247         client.mutex.Unlock()
248         return client.codec.Close()
249 }
250
251 // Go invokes the function asynchronously.  It returns the Call structure representing
252 // the invocation.  The done channel will signal when the call is complete by returning
253 // the same Call object.  If done is nil, Go will allocate a new channel.
254 // If non-nil, done must be buffered or Go will deliberately crash.
255 func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
256         call := new(Call)
257         call.ServiceMethod = serviceMethod
258         call.Args = args
259         call.Reply = reply
260         if done == nil {
261                 done = make(chan *Call, 10) // buffered.
262         } else {
263                 // If caller passes done != nil, it must arrange that
264                 // done has enough buffer for the number of simultaneous
265                 // RPCs that will be using that channel.  If the channel
266                 // is totally unbuffered, it's best not to run at all.
267                 if cap(done) == 0 {
268                         log.Panic("rpc: done channel is unbuffered")
269                 }
270         }
271         call.Done = done
272         if client.shutdown {
273                 call.Error = ErrShutdown
274                 call.done()
275                 return call
276         }
277         client.send(call)
278         return call
279 }
280
281 // Call invokes the named function, waits for it to complete, and returns its error status.
282 func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
283         if client.shutdown {
284                 return ErrShutdown
285         }
286         call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
287         return call.Error
288 }