OSDN Git Service

libgo: Update to weekly.2012-02-07.
[pf3gnuchains/gcc-fork.git] / libgo / go / net / rpc / client.go
1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 package rpc
6
7 import (
8         "bufio"
9         "encoding/gob"
10         "errors"
11         "io"
12         "log"
13         "net"
14         "net/http"
15         "sync"
16 )
17
18 // ServerError represents an error that has been returned from
19 // the remote side of the RPC connection.
20 type ServerError string
21
22 func (e ServerError) Error() string {
23         return string(e)
24 }
25
26 var ErrShutdown = errors.New("connection is shut down")
27
28 // Call represents an active RPC.
29 type Call struct {
30         ServiceMethod string      // The name of the service and method to call.
31         Args          interface{} // The argument to the function (*struct).
32         Reply         interface{} // The reply from the function (*struct).
33         Error         error       // After completion, the error status.
34         Done          chan *Call  // Strobes when call is complete.
35 }
36
37 // Client represents an RPC Client.
38 // There may be multiple outstanding Calls associated
39 // with a single Client.
40 type Client struct {
41         mutex    sync.Mutex // protects pending, seq, request
42         sending  sync.Mutex
43         request  Request
44         seq      uint64
45         codec    ClientCodec
46         pending  map[uint64]*Call
47         closing  bool
48         shutdown bool
49 }
50
51 // A ClientCodec implements writing of RPC requests and
52 // reading of RPC responses for the client side of an RPC session.
53 // The client calls WriteRequest to write a request to the connection
54 // and calls ReadResponseHeader and ReadResponseBody in pairs
55 // to read responses.  The client calls Close when finished with the
56 // connection. ReadResponseBody may be called with a nil
57 // argument to force the body of the response to be read and then
58 // discarded.
59 type ClientCodec interface {
60         WriteRequest(*Request, interface{}) error
61         ReadResponseHeader(*Response) error
62         ReadResponseBody(interface{}) error
63
64         Close() error
65 }
66
67 func (client *Client) send(call *Call) {
68         client.sending.Lock()
69         defer client.sending.Unlock()
70
71         // Register this call.
72         client.mutex.Lock()
73         if client.shutdown {
74                 call.Error = ErrShutdown
75                 client.mutex.Unlock()
76                 call.done()
77                 return
78         }
79         seq := client.seq
80         client.seq++
81         client.pending[seq] = call
82         client.mutex.Unlock()
83
84         // Encode and send the request.
85         client.request.Seq = seq
86         client.request.ServiceMethod = call.ServiceMethod
87         err := client.codec.WriteRequest(&client.request, call.Args)
88         if err != nil {
89                 client.mutex.Lock()
90                 delete(client.pending, seq)
91                 client.mutex.Unlock()
92                 call.Error = err
93                 call.done()
94         }
95 }
96
97 func (client *Client) input() {
98         var err error
99         var response Response
100         for err == nil {
101                 response = Response{}
102                 err = client.codec.ReadResponseHeader(&response)
103                 if err != nil {
104                         if err == io.EOF && !client.closing {
105                                 err = io.ErrUnexpectedEOF
106                         }
107                         break
108                 }
109                 seq := response.Seq
110                 client.mutex.Lock()
111                 call := client.pending[seq]
112                 delete(client.pending, seq)
113                 client.mutex.Unlock()
114
115                 if response.Error == "" {
116                         err = client.codec.ReadResponseBody(call.Reply)
117                         if err != nil {
118                                 call.Error = errors.New("reading body " + err.Error())
119                         }
120                 } else {
121                         // We've got an error response. Give this to the request;
122                         // any subsequent requests will get the ReadResponseBody
123                         // error if there is one.
124                         call.Error = ServerError(response.Error)
125                         err = client.codec.ReadResponseBody(nil)
126                         if err != nil {
127                                 err = errors.New("reading error body: " + err.Error())
128                         }
129                 }
130                 call.done()
131         }
132         // Terminate pending calls.
133         client.sending.Lock()
134         client.mutex.Lock()
135         client.shutdown = true
136         closing := client.closing
137         for _, call := range client.pending {
138                 call.Error = err
139                 call.done()
140         }
141         client.mutex.Unlock()
142         client.sending.Unlock()
143         if err != io.EOF || !closing {
144                 log.Println("rpc: client protocol error:", err)
145         }
146 }
147
148 func (call *Call) done() {
149         select {
150         case call.Done <- call:
151                 // ok
152         default:
153                 // We don't want to block here.  It is the caller's responsibility to make
154                 // sure the channel has enough buffer space. See comment in Go().
155                 log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
156         }
157 }
158
159 // NewClient returns a new Client to handle requests to the
160 // set of services at the other end of the connection.
161 // It adds a buffer to the write side of the connection so
162 // the header and payload are sent as a unit.
163 func NewClient(conn io.ReadWriteCloser) *Client {
164         encBuf := bufio.NewWriter(conn)
165         client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
166         return NewClientWithCodec(client)
167 }
168
169 // NewClientWithCodec is like NewClient but uses the specified
170 // codec to encode requests and decode responses.
171 func NewClientWithCodec(codec ClientCodec) *Client {
172         client := &Client{
173                 codec:   codec,
174                 pending: make(map[uint64]*Call),
175         }
176         go client.input()
177         return client
178 }
179
180 type gobClientCodec struct {
181         rwc    io.ReadWriteCloser
182         dec    *gob.Decoder
183         enc    *gob.Encoder
184         encBuf *bufio.Writer
185 }
186
187 func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {
188         if err = c.enc.Encode(r); err != nil {
189                 return
190         }
191         if err = c.enc.Encode(body); err != nil {
192                 return
193         }
194         return c.encBuf.Flush()
195 }
196
197 func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
198         return c.dec.Decode(r)
199 }
200
201 func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
202         return c.dec.Decode(body)
203 }
204
205 func (c *gobClientCodec) Close() error {
206         return c.rwc.Close()
207 }
208
209 // DialHTTP connects to an HTTP RPC server at the specified network address
210 // listening on the default HTTP RPC path.
211 func DialHTTP(network, address string) (*Client, error) {
212         return DialHTTPPath(network, address, DefaultRPCPath)
213 }
214
215 // DialHTTPPath connects to an HTTP RPC server 
216 // at the specified network address and path.
217 func DialHTTPPath(network, address, path string) (*Client, error) {
218         var err error
219         conn, err := net.Dial(network, address)
220         if err != nil {
221                 return nil, err
222         }
223         io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
224
225         // Require successful HTTP response
226         // before switching to RPC protocol.
227         resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
228         if err == nil && resp.Status == connected {
229                 return NewClient(conn), nil
230         }
231         if err == nil {
232                 err = errors.New("unexpected HTTP response: " + resp.Status)
233         }
234         conn.Close()
235         return nil, &net.OpError{
236                 Op:   "dial-http",
237                 Net:  network + " " + address,
238                 Addr: nil,
239                 Err:  err,
240         }
241 }
242
243 // Dial connects to an RPC server at the specified network address.
244 func Dial(network, address string) (*Client, error) {
245         conn, err := net.Dial(network, address)
246         if err != nil {
247                 return nil, err
248         }
249         return NewClient(conn), nil
250 }
251
252 func (client *Client) Close() error {
253         client.mutex.Lock()
254         if client.shutdown || client.closing {
255                 client.mutex.Unlock()
256                 return ErrShutdown
257         }
258         client.closing = true
259         client.mutex.Unlock()
260         return client.codec.Close()
261 }
262
263 // Go invokes the function asynchronously.  It returns the Call structure representing
264 // the invocation.  The done channel will signal when the call is complete by returning
265 // the same Call object.  If done is nil, Go will allocate a new channel.
266 // If non-nil, done must be buffered or Go will deliberately crash.
267 func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
268         call := new(Call)
269         call.ServiceMethod = serviceMethod
270         call.Args = args
271         call.Reply = reply
272         if done == nil {
273                 done = make(chan *Call, 10) // buffered.
274         } else {
275                 // If caller passes done != nil, it must arrange that
276                 // done has enough buffer for the number of simultaneous
277                 // RPCs that will be using that channel.  If the channel
278                 // is totally unbuffered, it's best not to run at all.
279                 if cap(done) == 0 {
280                         log.Panic("rpc: done channel is unbuffered")
281                 }
282         }
283         call.Done = done
284         client.send(call)
285         return call
286 }
287
288 // Call invokes the named function, waits for it to complete, and returns its error status.
289 func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
290         call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
291         return call.Error
292 }