OSDN Git Service

Update to current Go library.
[pf3gnuchains/gcc-fork.git] / libgo / go / rpc / client.go
1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 package rpc
6
7 import (
8         "bufio"
9         "gob"
10         "http"
11         "io"
12         "log"
13         "net"
14         "os"
15         "sync"
16 )
17
18 // ServerError represents an error that has been returned from
19 // the remote side of the RPC connection.
20 type ServerError string
21
22 func (e ServerError) String() string {
23         return string(e)
24 }
25
26 const ErrShutdown = os.ErrorString("connection is shut down")
27
28 // Call represents an active RPC.
29 type Call struct {
30         ServiceMethod string      // The name of the service and method to call.
31         Args          interface{} // The argument to the function (*struct).
32         Reply         interface{} // The reply from the function (*struct).
33         Error         os.Error    // After completion, the error status.
34         Done          chan *Call  // Strobes when call is complete; value is the error status.
35         seq           uint64
36 }
37
38 // Client represents an RPC Client.
39 // There may be multiple outstanding Calls associated
40 // with a single Client.
41 type Client struct {
42         mutex    sync.Mutex // protects pending, seq, request
43         sending  sync.Mutex
44         request  Request
45         seq      uint64
46         codec    ClientCodec
47         pending  map[uint64]*Call
48         closing  bool
49         shutdown bool
50 }
51
52 // A ClientCodec implements writing of RPC requests and
53 // reading of RPC responses for the client side of an RPC session.
54 // The client calls WriteRequest to write a request to the connection
55 // and calls ReadResponseHeader and ReadResponseBody in pairs
56 // to read responses.  The client calls Close when finished with the
57 // connection. ReadResponseBody may be called with a nil
58 // argument to force the body of the response to be read and then
59 // discarded.
60 type ClientCodec interface {
61         WriteRequest(*Request, interface{}) os.Error
62         ReadResponseHeader(*Response) os.Error
63         ReadResponseBody(interface{}) os.Error
64
65         Close() os.Error
66 }
67
68 func (client *Client) send(c *Call) {
69         // Register this call.
70         client.mutex.Lock()
71         if client.shutdown {
72                 c.Error = ErrShutdown
73                 client.mutex.Unlock()
74                 c.done()
75                 return
76         }
77         c.seq = client.seq
78         client.seq++
79         client.pending[c.seq] = c
80         client.mutex.Unlock()
81
82         // Encode and send the request.
83         client.sending.Lock()
84         defer client.sending.Unlock()
85         client.request.Seq = c.seq
86         client.request.ServiceMethod = c.ServiceMethod
87         if err := client.codec.WriteRequest(&client.request, c.Args); err != nil {
88                 panic("rpc: client encode error: " + err.String())
89         }
90 }
91
92 func (client *Client) input() {
93         var err os.Error
94         var response Response
95         for err == nil {
96                 response = Response{}
97                 err = client.codec.ReadResponseHeader(&response)
98                 if err != nil {
99                         if err == os.EOF && !client.closing {
100                                 err = io.ErrUnexpectedEOF
101                         }
102                         break
103                 }
104                 seq := response.Seq
105                 client.mutex.Lock()
106                 c := client.pending[seq]
107                 client.pending[seq] = c, false
108                 client.mutex.Unlock()
109
110                 if response.Error == "" {
111                         err = client.codec.ReadResponseBody(c.Reply)
112                         if err != nil {
113                                 c.Error = os.ErrorString("reading body " + err.String())
114                         }
115                 } else {
116                         // We've got an error response. Give this to the request;
117                         // any subsequent requests will get the ReadResponseBody
118                         // error if there is one.
119                         c.Error = ServerError(response.Error)
120                         err = client.codec.ReadResponseBody(nil)
121                         if err != nil {
122                                 err = os.ErrorString("reading error body: " + err.String())
123                         }
124                 }
125                 c.done()
126         }
127         // Terminate pending calls.
128         client.mutex.Lock()
129         client.shutdown = true
130         for _, call := range client.pending {
131                 call.Error = err
132                 call.done()
133         }
134         client.mutex.Unlock()
135         if err != os.EOF || !client.closing {
136                 log.Println("rpc: client protocol error:", err)
137         }
138 }
139
140 func (call *Call) done() {
141         select {
142         case call.Done <- call:
143                 // ok
144         default:
145                 // We don't want to block here.  It is the caller's responsibility to make
146                 // sure the channel has enough buffer space. See comment in Go().
147         }
148 }
149
150 // NewClient returns a new Client to handle requests to the
151 // set of services at the other end of the connection.
152 // It adds a buffer to the write side of the connection so
153 // the header and payload are sent as a unit.
154 func NewClient(conn io.ReadWriteCloser) *Client {
155         encBuf := bufio.NewWriter(conn)
156         client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
157         return NewClientWithCodec(client)
158 }
159
160 // NewClientWithCodec is like NewClient but uses the specified
161 // codec to encode requests and decode responses.
162 func NewClientWithCodec(codec ClientCodec) *Client {
163         client := &Client{
164                 codec:   codec,
165                 pending: make(map[uint64]*Call),
166         }
167         go client.input()
168         return client
169 }
170
171 type gobClientCodec struct {
172         rwc    io.ReadWriteCloser
173         dec    *gob.Decoder
174         enc    *gob.Encoder
175         encBuf *bufio.Writer
176 }
177
178 func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err os.Error) {
179         if err = c.enc.Encode(r); err != nil {
180                 return
181         }
182         if err = c.enc.Encode(body); err != nil {
183                 return
184         }
185         return c.encBuf.Flush()
186 }
187
188 func (c *gobClientCodec) ReadResponseHeader(r *Response) os.Error {
189         return c.dec.Decode(r)
190 }
191
192 func (c *gobClientCodec) ReadResponseBody(body interface{}) os.Error {
193         return c.dec.Decode(body)
194 }
195
196 func (c *gobClientCodec) Close() os.Error {
197         return c.rwc.Close()
198 }
199
200
201 // DialHTTP connects to an HTTP RPC server at the specified network address
202 // listening on the default HTTP RPC path.
203 func DialHTTP(network, address string) (*Client, os.Error) {
204         return DialHTTPPath(network, address, DefaultRPCPath)
205 }
206
207 // DialHTTPPath connects to an HTTP RPC server 
208 // at the specified network address and path.
209 func DialHTTPPath(network, address, path string) (*Client, os.Error) {
210         var err os.Error
211         conn, err := net.Dial(network, address)
212         if err != nil {
213                 return nil, err
214         }
215         io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
216
217         // Require successful HTTP response
218         // before switching to RPC protocol.
219         resp, err := http.ReadResponse(bufio.NewReader(conn), "CONNECT")
220         if err == nil && resp.Status == connected {
221                 return NewClient(conn), nil
222         }
223         if err == nil {
224                 err = os.ErrorString("unexpected HTTP response: " + resp.Status)
225         }
226         conn.Close()
227         return nil, &net.OpError{"dial-http", network + " " + address, nil, err}
228 }
229
230 // Dial connects to an RPC server at the specified network address.
231 func Dial(network, address string) (*Client, os.Error) {
232         conn, err := net.Dial(network, address)
233         if err != nil {
234                 return nil, err
235         }
236         return NewClient(conn), nil
237 }
238
239 func (client *Client) Close() os.Error {
240         client.mutex.Lock()
241         if client.shutdown || client.closing {
242                 client.mutex.Unlock()
243                 return ErrShutdown
244         }
245         client.closing = true
246         client.mutex.Unlock()
247         return client.codec.Close()
248 }
249
250 // Go invokes the function asynchronously.  It returns the Call structure representing
251 // the invocation.  The done channel will signal when the call is complete by returning
252 // the same Call object.  If done is nil, Go will allocate a new channel.
253 // If non-nil, done must be buffered or Go will deliberately crash.
254 func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
255         c := new(Call)
256         c.ServiceMethod = serviceMethod
257         c.Args = args
258         c.Reply = reply
259         if done == nil {
260                 done = make(chan *Call, 10) // buffered.
261         } else {
262                 // If caller passes done != nil, it must arrange that
263                 // done has enough buffer for the number of simultaneous
264                 // RPCs that will be using that channel.  If the channel
265                 // is totally unbuffered, it's best not to run at all.
266                 if cap(done) == 0 {
267                         log.Panic("rpc: done channel is unbuffered")
268                 }
269         }
270         c.Done = done
271         if client.shutdown {
272                 c.Error = ErrShutdown
273                 c.done()
274                 return c
275         }
276         client.send(c)
277         return c
278 }
279
280 // Call invokes the named function, waits for it to complete, and returns its error status.
281 func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) os.Error {
282         if client.shutdown {
283                 return ErrShutdown
284         }
285         call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
286         return call.Error
287 }