1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
18 // ServerError represents an error that has been returned from
19 // the remote side of the RPC connection.
20 type ServerError string
22 func (e ServerError) Error() string {
26 var ErrShutdown = errors.New("connection is shut down")
28 // Call represents an active RPC.
30 ServiceMethod string // The name of the service and method to call.
31 Args interface{} // The argument to the function (*struct).
32 Reply interface{} // The reply from the function (*struct).
33 Error error // After completion, the error status.
34 Done chan *Call // Strobes when call is complete.
37 // Client represents an RPC Client.
38 // There may be multiple outstanding Calls associated
39 // with a single Client.
41 mutex sync.Mutex // protects pending, seq, request
46 pending map[uint64]*Call
51 // A ClientCodec implements writing of RPC requests and
52 // reading of RPC responses for the client side of an RPC session.
53 // The client calls WriteRequest to write a request to the connection
54 // and calls ReadResponseHeader and ReadResponseBody in pairs
55 // to read responses. The client calls Close when finished with the
56 // connection. ReadResponseBody may be called with a nil
57 // argument to force the body of the response to be read and then
59 type ClientCodec interface {
60 WriteRequest(*Request, interface{}) error
61 ReadResponseHeader(*Response) error
62 ReadResponseBody(interface{}) error
67 func (client *Client) send(call *Call) {
69 defer client.sending.Unlock()
71 // Register this call.
74 call.Error = ErrShutdown
81 client.pending[seq] = call
84 // Encode and send the request.
85 client.request.Seq = seq
86 client.request.ServiceMethod = call.ServiceMethod
87 err := client.codec.WriteRequest(&client.request, call.Args)
90 delete(client.pending, seq)
97 func (client *Client) input() {
101 response = Response{}
102 err = client.codec.ReadResponseHeader(&response)
104 if err == io.EOF && !client.closing {
105 err = io.ErrUnexpectedEOF
111 call := client.pending[seq]
112 delete(client.pending, seq)
113 client.mutex.Unlock()
115 if response.Error == "" {
116 err = client.codec.ReadResponseBody(call.Reply)
118 call.Error = errors.New("reading body " + err.Error())
121 // We've got an error response. Give this to the request;
122 // any subsequent requests will get the ReadResponseBody
123 // error if there is one.
124 call.Error = ServerError(response.Error)
125 err = client.codec.ReadResponseBody(nil)
127 err = errors.New("reading error body: " + err.Error())
132 // Terminate pending calls.
133 client.sending.Lock()
135 client.shutdown = true
136 closing := client.closing
137 for _, call := range client.pending {
141 client.mutex.Unlock()
142 client.sending.Unlock()
143 if err != io.EOF || !closing {
144 log.Println("rpc: client protocol error:", err)
148 func (call *Call) done() {
150 case call.Done <- call:
153 // We don't want to block here. It is the caller's responsibility to make
154 // sure the channel has enough buffer space. See comment in Go().
155 log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
159 // NewClient returns a new Client to handle requests to the
160 // set of services at the other end of the connection.
161 // It adds a buffer to the write side of the connection so
162 // the header and payload are sent as a unit.
163 func NewClient(conn io.ReadWriteCloser) *Client {
164 encBuf := bufio.NewWriter(conn)
165 client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
166 return NewClientWithCodec(client)
169 // NewClientWithCodec is like NewClient but uses the specified
170 // codec to encode requests and decode responses.
171 func NewClientWithCodec(codec ClientCodec) *Client {
174 pending: make(map[uint64]*Call),
180 type gobClientCodec struct {
181 rwc io.ReadWriteCloser
187 func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {
188 if err = c.enc.Encode(r); err != nil {
191 if err = c.enc.Encode(body); err != nil {
194 return c.encBuf.Flush()
197 func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
198 return c.dec.Decode(r)
201 func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
202 return c.dec.Decode(body)
205 func (c *gobClientCodec) Close() error {
209 // DialHTTP connects to an HTTP RPC server at the specified network address
210 // listening on the default HTTP RPC path.
211 func DialHTTP(network, address string) (*Client, error) {
212 return DialHTTPPath(network, address, DefaultRPCPath)
215 // DialHTTPPath connects to an HTTP RPC server
216 // at the specified network address and path.
217 func DialHTTPPath(network, address, path string) (*Client, error) {
219 conn, err := net.Dial(network, address)
223 io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
225 // Require successful HTTP response
226 // before switching to RPC protocol.
227 resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
228 if err == nil && resp.Status == connected {
229 return NewClient(conn), nil
232 err = errors.New("unexpected HTTP response: " + resp.Status)
235 return nil, &net.OpError{
237 Net: network + " " + address,
243 // Dial connects to an RPC server at the specified network address.
244 func Dial(network, address string) (*Client, error) {
245 conn, err := net.Dial(network, address)
249 return NewClient(conn), nil
252 func (client *Client) Close() error {
254 if client.shutdown || client.closing {
255 client.mutex.Unlock()
258 client.closing = true
259 client.mutex.Unlock()
260 return client.codec.Close()
263 // Go invokes the function asynchronously. It returns the Call structure representing
264 // the invocation. The done channel will signal when the call is complete by returning
265 // the same Call object. If done is nil, Go will allocate a new channel.
266 // If non-nil, done must be buffered or Go will deliberately crash.
267 func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
269 call.ServiceMethod = serviceMethod
273 done = make(chan *Call, 10) // buffered.
275 // If caller passes done != nil, it must arrange that
276 // done has enough buffer for the number of simultaneous
277 // RPCs that will be using that channel. If the channel
278 // is totally unbuffered, it's best not to run at all.
280 log.Panic("rpc: done channel is unbuffered")
288 // Call invokes the named function, waits for it to complete, and returns its error status.
289 func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
290 call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done