1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
18 // ServerError represents an error that has been returned from
19 // the remote side of the RPC connection.
// Its underlying type is string, so a server-reported error message
// converts to it directly, as in ServerError(response.Error).
20 type ServerError string
// Error gives ServerError the Error() string method, so a ServerError
// value can be stored wherever an error is expected (e.g. Call.Error).
22 func (e ServerError) Error() string {
// ErrShutdown is the error value reporting that the connection is shut down.
// Elsewhere in this file it is stored in a Call's Error field.
26 var ErrShutdown = errors.New("connection is shut down")
28 // Call represents an active RPC.
30 ServiceMethod string // The name of the service and method to call.
31 Args interface{} // The argument to the function (*struct).
32 Reply interface{} // The reply from the function (*struct).
33 Error error // After completion, the error status.
34 Done chan *Call // Strobes when call is complete; the value sent is this Call itself, whose Error field holds the error status.
38 // Client represents an RPC Client.
39 // There may be multiple outstanding Calls associated
40 // with a single Client.
// NOTE(review): send writes client.request right after deferring
// client.sending.Unlock, which suggests request is guarded by sending rather
// than by mutex. Close and input also set closing/shutdown just before
// calling mutex.Unlock, so mutex appears to cover those flags as well.
// Confirm and update the field comment below accordingly.
42 mutex sync.Mutex // protects pending, seq, request
// pending maps a call's sequence number to the Call awaiting its response;
// send inserts entries and input looks them up and deletes them.
47 pending map[uint64]*Call
52 // A ClientCodec implements writing of RPC requests and
53 // reading of RPC responses for the client side of an RPC session.
54 // The client calls WriteRequest to write a request to the connection
55 // and calls ReadResponseHeader and ReadResponseBody in pairs
56 // to read responses. The client calls Close when finished with the
57 // connection. ReadResponseBody may be called with a nil
58 // argument to force the body of the response to be read and then
// discarded.
60 type ClientCodec interface {
// WriteRequest writes the request header followed by the request body
// (the client passes the call's Args as the body).
61 WriteRequest(*Request, interface{}) error
// ReadResponseHeader fills in the header of the next response.
62 ReadResponseHeader(*Response) error
// ReadResponseBody reads the body that follows the most recent header into
// the given value (the client passes the call's Reply, or nil on an error
// response).
63 ReadResponseBody(interface{}) error
// send registers c in client.pending under c.seq, then copies c.seq and
// c.ServiceMethod into the client's shared request header and hands that
// header plus c.Args to the codec's WriteRequest.
68 func (client *Client) send(c *Call) {
69 // Register this call.
79 client.pending[c.seq] = c
82 // Encode and send the request.
// The deferred Unlock releases client.sending when send returns (presumably
// paired with a Lock just above, not visible here), so the writes to
// client.request and the WriteRequest call below happen under that lock.
84 defer client.sending.Unlock()
85 client.request.Seq = c.seq
86 client.request.ServiceMethod = c.ServiceMethod
87 if err := client.codec.WriteRequest(&client.request, c.Args); err != nil {
// input is the client's response-reading routine. It reads a response header
// through the codec, removes the matching Call from client.pending, and then
// reads the body: into c.Reply when the response carries no error string, or
// with a nil argument (discarding it) when it does, in which case c.Error
// becomes a ServerError. The tail of the function sets client.shutdown,
// ranges over the calls still pending, and logs the terminating error unless
// it is io.EOF on a client that is closing.
93 func (client *Client) input() {
98 err = client.codec.ReadResponseHeader(&response)
// An EOF the client did not ask for (it is not closing) is reported as
// io.ErrUnexpectedEOF instead.
100 if err == io.EOF && !client.closing {
101 err = io.ErrUnexpectedEOF
// NOTE(review): c is nil when no pending call is registered under seq; the
// c.Reply / c.Error accesses below would then panic — confirm a guard exists.
107 c := client.pending[seq]
108 delete(client.pending, seq)
109 client.mutex.Unlock()
// An empty Error string in the response header means the call succeeded and
// the body is the reply value.
111 if response.Error == "" {
112 err = client.codec.ReadResponseBody(c.Reply)
// NOTE(review): this prefix has no colon, unlike "reading error body: "
// further down — probably unintentional.
114 c.Error = errors.New("reading body " + err.Error())
117 // We've got an error response. Give this to the request;
118 // any subsequent requests will get the ReadResponseBody
119 // error if there is one.
120 c.Error = ServerError(response.Error)
121 err = client.codec.ReadResponseBody(nil)
123 err = errors.New("reading error body: " + err.Error())
128 // Terminate pending calls.
130 client.shutdown = true
131 for _, call := range client.pending {
135 client.mutex.Unlock()
// Stay quiet only for the expected case: io.EOF after the client asked to
// close. Every other combination is logged as a protocol error.
136 if err != io.EOF || !client.closing {
137 log.Println("rpc: client protocol error:", err)
// done delivers call on its own Done channel to signal completion. Per the
// comment below, the send must not block; a reply that cannot be delivered is
// reported through the log as discarded.
141 func (call *Call) done() {
143 case call.Done <- call:
146 // We don't want to block here. It is the caller's responsibility to make
147 // sure the channel has enough buffer space. See comment in Go().
148 log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
152 // NewClient returns a new Client to handle requests to the
153 // set of services at the other end of the connection.
154 // It adds a buffer to the write side of the connection so
155 // the header and payload are sent as a unit.
156 func NewClient(conn io.ReadWriteCloser) *Client {
// Only writes are buffered; the gob decoder below reads from conn directly.
157 encBuf := bufio.NewWriter(conn)
// The codec keeps both the encoder and the bufio.Writer it writes into, so
// WriteRequest can flush the buffer after encoding.
158 client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
159 return NewClientWithCodec(client)
162 // NewClientWithCodec is like NewClient but uses the specified
163 // codec to encode requests and decode responses.
164 func NewClientWithCodec(codec ClientCodec) *Client {
// The client starts with an empty table of pending calls, keyed by sequence
// number.
167 pending: make(map[uint64]*Call),
// gobClientCodec is the gob-based ClientCodec that NewClient constructs.
// rwc is its first field and receives the underlying connection.
173 type gobClientCodec struct {
174 rwc io.ReadWriteCloser
// WriteRequest gob-encodes the request header r, then the body, and finally
// flushes the buffered writer, returning the flush result.
180 func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {
181 if err = c.enc.Encode(r); err != nil {
184 if err = c.enc.Encode(body); err != nil {
187 return c.encBuf.Flush()
// ReadResponseHeader decodes the next gob value from the connection into r
// and returns the decoder's error.
190 func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
191 return c.dec.Decode(r)
// ReadResponseBody decodes the next gob value from the connection into body
// and returns the decoder's error. The client passes nil for body when it
// only needs the value consumed.
194 func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
195 return c.dec.Decode(body)
// Close is the codec's shutdown hook; Client.Close invokes it through
// client.codec.Close().
198 func (c *gobClientCodec) Close() error {
202 // DialHTTP connects to an HTTP RPC server at the specified network address
203 // listening on the default HTTP RPC path.
// It delegates to DialHTTPPath, passing DefaultRPCPath as the path.
204 func DialHTTP(network, address string) (*Client, error) {
205 return DialHTTPPath(network, address, DefaultRPCPath)
208 // DialHTTPPath connects to an HTTP RPC server
209 // at the specified network address and path.
// It dials the address, sends an HTTP CONNECT request for path, and wraps the
// connection in a Client only when the response status equals connected;
// otherwise the failure is returned inside a *net.OpError.
210 func DialHTTPPath(network, address, path string) (*Client, error) {
212 conn, err := net.Dial(network, address)
// NOTE(review): the error from this write is not checked on this line;
// presumably a failed write surfaces as an error from http.ReadResponse
// below — confirm.
216 io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
218 // Require successful HTTP response
219 // before switching to RPC protocol.
220 resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
221 if err == nil && resp.Status == connected {
222 return NewClient(conn), nil
225 err = errors.New("unexpected HTTP response: " + resp.Status)
// NOTE(review): the net.OpError literal uses positional fields; keyed fields
// would be robust against changes in the struct's layout — confirm the field
// order matches the intended op / network+address / err values.
228 return nil, &net.OpError{"dial-http", network + " " + address, nil, err}
231 // Dial connects to an RPC server at the specified network address.
// On success the raw connection from net.Dial is wrapped with NewClient.
232 func Dial(network, address string) (*Client, error) {
233 conn, err := net.Dial(network, address)
237 return NewClient(conn), nil
// Close marks the client as closing and closes the underlying codec,
// returning the codec's Close error. If the client is already shut down or
// closing, it releases the mutex and takes an early exit instead (the value
// returned on that path is not visible in this view).
240 func (client *Client) Close() error {
242 if client.shutdown || client.closing {
243 client.mutex.Unlock()
// Set the flag before releasing the mutex so that a second Close sees it.
246 client.closing = true
247 client.mutex.Unlock()
248 return client.codec.Close()
251 // Go invokes the function asynchronously. It returns the Call structure representing
252 // the invocation. The done channel will signal when the call is complete by returning
253 // the same Call object. If done is nil, Go will allocate a new channel.
254 // If non-nil, done must be buffered or Go will deliberately crash.
255 func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
257 call.ServiceMethod = serviceMethod
// Default channel for callers that passed no done channel: capacity 10.
261 done = make(chan *Call, 10) // buffered.
263 // If caller passes done != nil, it must arrange that
264 // done has enough buffer for the number of simultaneous
265 // RPCs that will be using that channel. If the channel
266 // is totally unbuffered, it's best not to run at all.
268 log.Panic("rpc: done channel is unbuffered")
// Shutdown path: the call carries ErrShutdown as its error (the guarding
// condition is not visible in this view).
273 call.Error = ErrShutdown
281 // Call invokes the named function, waits for it to complete, and returns its error status.
282 func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
// A private channel of capacity 1 satisfies Go's requirement that done be
// buffered; receiving from the returned Call's Done blocks until completion.
286 call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done