1 // Copyright 2010 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
6 Package netchan implements type-safe networked channels:
7 it allows the two ends of a channel to appear on different
8 computers connected by a network. It does this by transporting
9 data sent to a channel on one machine so it can be recovered
10 by a receive of a channel of the same type on the other.
12 An exporter publishes a set of channels by name. An importer
13 connects to the exporting machine and imports the channels
14 by name. After importing the channels, the two machines can
15 use the channels in the usual way.
17 Networked channels are not synchronized; they always behave
18 as if they are buffered channels of at least one element.
22 // BUG: can't use range clause to receive when using ImportNValues to limit the count.
36 // expLog is a logging convenience function. The first argument must be a string;
// it is rewritten in place with the prefix "netchan export: ".
// NOTE(review): the rest of the body is not visible in this view; presumably the
// prefixed args are then handed to a logger — confirm.
37 func expLog(args ...interface{}) {
// The unchecked type assertion panics if args[0] is not a string, and the
// index expression panics if expLog is called with no arguments.
38 args[0] = "netchan export: " + args[0].(string)
42 // An Exporter allows a set of channels to be published on a single
43 // network port. A single machine may have multiple Exporters
44 // but they must use different ports.
// Values are created with NewExporter, which initialises a clientSet member.
// NOTE(review): the struct's field list is not visible in this view.
45 type Exporter struct {
// expClient is the exporter's per-connection state for one remote client:
// the channels that client has opened, plus the sequence and acknowledgement
// counters for messages sent to it.
// NOTE(review): some leading fields are on lines not visible in this view.
49 type expClient struct {
52 chans map[int]*netChan // channels in use by client, keyed by the Id of the request header
53 mu sync.Mutex // protects remaining fields
54 errored bool // client has been sent an error
55 seqNum int64 // sequences messages sent to client; has value of highest sent
56 ackNum int64 // highest sequence number acknowledged
57 seqLock sync.Mutex // guarantees messages are in sequence, only locked under mu
// newClient allocates the expClient for a new connection. It attaches an
// encoder/decoder built over conn by newEncDec and starts the client off
// with an empty channel table.
// NOTE(review): the lines that use exp and return the client are not visible
// in this view.
60 func newClient(exp *Exporter, conn io.ReadWriter) *expClient {
61 client := new(expClient)
63 client.encDec = newEncDec(conn)
66 client.chans = make(map[int]*netChan)
// sendError logs an error and makes a best-effort attempt to deliver it to
// the client as a payError message under the given header.
// NOTE(review): the local named error is declared on a line not visible in
// this view; presumably it wraps the err string — confirm. Any bookkeeping
// done after the encode (e.g. the errored flag) is likewise not visible.
70 func (client *expClient) sendError(hdr *header, err string) {
72 expLog("sending error to client:", error.Error)
73 client.encode(hdr, payError, error) // ignore any encode error, hope client gets it
// newChan handles a client request to open the exported channel called name.
// It looks the name up in the exporter's table, reports an unknown name or a
// direction mismatch to the client with sendError, and otherwise builds a
// netChan and records it in client.chans under hdr.Id.
// NOTE(review): the failure conditions and the return statements are on
// lines not visible in this view; run checks the result against nil.
79 func (client *expClient) newChan(hdr *header, dir Dir, name string, size int, count int64) *netChan {
82 ech, ok := exp.names[name]
85 client.sendError(hdr, "no such channel: "+name)
89 client.sendError(hdr, "wrong direction for channel: "+name)
// Remember the channel so later messages carrying the same Id can find it
// through getChan.
92 nch := newNetChan(name, hdr.Id, ech, client.encDec, size, count)
93 client.chans[hdr.Id] = nch
// getChan looks up the netChan registered for this client under hdr.Id.
// A direction mismatch is reported to the client with sendError.
// NOTE(review): the conditions and return statements are on lines not
// visible in this view; callers check the result against nil.
97 func (client *expClient) getChan(hdr *header, dir Dir) *netChan {
98 nch := client.chans[hdr.Id]
103 client.sendError(hdr, "wrong direction for channel: "+nch.name)
108 // The function run manages sends and receives for a single client. For each
109 // (client Recv) request, this will launch a serveRecv goroutine to deliver
110 // the data for that channel, while (client Send) requests are handled as
111 // data arrives from the client.
// After a header is decoded, hdr.PayloadType selects how the rest of the
// message is handled.
// NOTE(review): the loop, the case labels and several guards are on lines
// not visible in this view; the comments below describe only what is shown.
112 func (client *expClient) run() {
// hdr and req are decoded into through these reflect.Values.
114 hdrValue := reflect.ValueOf(hdr)
116 reqValue := reflect.ValueOf(req)
120 if err := client.decode(hdrValue); err != nil {
122 expLog("error decoding client header:", err)
126 switch hdr.PayloadType {
129 if err := client.decode(reqValue); err != nil {
130 expLog("error decoding client request:", err)
// NOTE(review): req.Size has just been decoded from the remote peer, so this
// panic appears reachable from network input (its guard is not visible here).
// Consider replying with sendError instead of panicking.
134 panic("netchan: remote requested " + strconv.Itoa(req.Size) + " values")
138 // look up channel before calling serveRecv to
139 // avoid a lock around client.chans.
140 if nch := client.newChan(hdr, Send, req.Name, req.Size, req.Count); nch != nil {
// The header is copied (*hdr) because serveRecv takes it by value.
141 go client.serveRecv(nch, *hdr, req.Count)
144 client.newChan(hdr, Recv, req.Name, req.Size, req.Count)
145 // The actual sends will have payload type payData.
146 // TODO: manage the count?
// Unsupported direction: log it and report it to the client.
148 error.Error = "request: can't handle channel direction"
149 expLog(error.Error, req.Dir)
150 client.encode(hdr, payError, error)
153 client.serveSend(*hdr)
155 client.serveClosed(*hdr)
158 if client.ackNum != hdr.SeqNum-1 {
159 // Since the sequence number is incremented and the message is sent
160 // in a single instance of locking client.mu, the messages are guaranteed
161 // to be sent in order. Therefore receipt of acknowledgement N means
162 // all messages <=N have been seen by the recipient. We check anyway.
163 expLog("sequence out of order:", client.ackNum, hdr.SeqNum)
165 if client.ackNum < hdr.SeqNum { // If there has been an error, don't back up the count.
166 client.ackNum = hdr.SeqNum
170 if nch := client.getChan(hdr, Send); nch != nil {
// NOTE(review): log.Fatal exits the whole process, and hdr.PayloadType comes
// from the decoded header; consider logging and dropping the client instead.
174 log.Fatal("netchan export: unknown payload type", hdr.PayloadType)
// Remove this client from the exporter's client set.
177 client.exp.delClient(client)
180 // Send all the data on a single channel to a client asking for a Recv.
181 // The header is passed by value to avoid issues of overwriting.
// count limits how many values are delivered; see the note on negative
// counts near the end of the body.
// NOTE(review): the loop and several guards are on lines not visible in
// this view.
182 func (client *expClient) serveRecv(nch *netChan, hdr header, count int64) {
184 val, ok := nch.recv()
// Send a payClosed message to the client.
186 if err := client.encode(&hdr, payClosed, nil); err != nil {
187 expLog("error encoding server closed message:", err)
191 // We hold the lock during transmission to guarantee messages are
192 // sent in sequence number order. Also, we increment first so the
193 // value of client.seqNum is the value of the highest used sequence
194 // number, not one beyond.
197 hdr.SeqNum = client.seqNum
198 client.seqLock.Lock() // guarantee ordering of messages
200 err := client.encode(&hdr, payData, val.Interface())
201 client.seqLock.Unlock()
// An encode failure is logged and also reported to the client.
203 expLog("error encoding client response:", err)
204 client.sendError(&hdr, err.Error())
207 // Negative count means run forever.
// Decrement the remaining count and test whether it is exhausted.
209 if count--; count <= 0 {
216 // Receive and deliver locally one item from a client asking for a Send.
217 // The header is passed by value to avoid issues of overwriting.
// NOTE(review): the nil check on nch and the delivery of the decoded value
// are on lines not visible in this view.
218 func (client *expClient) serveSend(hdr header) {
// Look up the channel for this header, expecting direction Recv.
219 nch := client.getChan(&hdr, Recv)
223 // Create a new value for each received item.
// reflect.New(T).Elem() yields an addressable zero value of T; here T is the
// element type of nch.ch (presumably the reflect.Value of the exported channel).
224 val := reflect.New(nch.ch.Type().Elem()).Elem()
225 if err := client.decode(val); err != nil {
226 expLog("value decode:", err, "; type ", nch.ch.Type())
232 // Report that client has closed the channel that is sending to us.
233 // The header is passed by value to avoid issues of overwriting.
// NOTE(review): what is done with the looked-up channel is on lines not
// visible in this view.
234 func (client *expClient) serveClosed(hdr header) {
// Look up the channel for this header, expecting direction Recv.
235 nch := client.getChan(&hdr, Recv)
// unackedCount computes how many messages have been sent to the client but
// not yet acknowledged, as the difference between the highest sequence
// number sent and the highest acknowledged.
// NOTE(review): locking and the return statement are on lines not visible
// in this view; presumably n is returned.
242 func (client *expClient) unackedCount() int64 {
244 n := client.seqNum - client.ackNum
// seq reports a sequence value for this client as an int64.
// NOTE(review): the body is not visible in this view; presumably it returns
// client.seqNum, the highest sequence number sent — confirm.
249 func (client *expClient) seq() int64 {
// ack reports an acknowledgement value for this client as an int64.
// NOTE(review): the body is not visible in this view; presumably it returns
// client.ackNum, the highest sequence number acknowledged — confirm.
256 func (client *expClient) ack() int64 {
263 // Serve waits for incoming connections on the listener
264 // and serves the Exporter's channels on each.
265 // It blocks until the listener is closed.
// Accept errors are logged with expLog.
// NOTE(review): the loop and the handling after a failed Accept are on
// lines not visible in this view.
266 func (exp *Exporter) Serve(listener net.Listener) {
268 conn, err := listener.Accept()
270 expLog("listen:", err)
// Each accepted connection is served on its own goroutine.
273 go exp.ServeConn(conn)
277 // ServeConn exports the Exporter's channels on conn.
278 // It blocks until the connection is terminated.
// The connection is registered as a client with addClient, and that client's
// run method is then executed on the calling goroutine.
279 func (exp *Exporter) ServeConn(conn io.ReadWriter) {
280 exp.addClient(conn).run()
283 // NewExporter creates a new Exporter that exports a set of channels.
// The Exporter starts with empty tables of exported names and of clients.
// NOTE(review): the construction and return of the Exporter value are on
// lines not visible in this view.
284 func NewExporter() *Exporter {
286 clientSet: &clientSet{
287 names: make(map[string]*chanDir),
288 clients: make(map[unackedCounter]bool),
294 // ListenAndServe exports the exporter's channels through the
295 // given network and local address defined as in net.Listen.
// Serving happens on a separate goroutine started here, so the accept loop
// does not run on the caller's goroutine.
// NOTE(review): the handling of the net.Listen error and the return
// statements are on lines not visible in this view.
296 func (exp *Exporter) ListenAndServe(network, localaddr string) error {
297 listener, err := net.Listen(network, localaddr)
301 go exp.Serve(listener)
305 // addClient creates a new expClient for conn and records its existence
// in the exporter's client set.
// NOTE(review): locking around the map update and the return statement are
// on lines not visible in this view.
306 func (exp *Exporter) addClient(conn io.ReadWriter) *expClient {
307 client := newClient(exp, conn)
309 exp.clients[client] = true
314 // delClient forgets the client existed by removing it from the
// exporter's client set.
// NOTE(review): any locking around the map update is on lines not visible
// in this view.
315 func (exp *Exporter) delClient(client *expClient) {
317 delete(exp.clients, client)
321 // Drain waits until all messages sent from this exporter/importer, including
322 // those not yet sent to any client and possibly including those sent while
323 // Drain was executing, have been received by the importer. In short, it
324 // waits until all the exporter's messages have been received by a client.
325 // If the timeout (measured in nanoseconds) is positive and Drain takes
326 // longer than that to complete, an error is returned.
327 func (exp *Exporter) Drain(timeout int64) error {
328 // This wrapper function is here so the method's comment will appear in godoc.
// All of the work is delegated to the clientSet's drain method.
329 return exp.clientSet.drain(timeout)
332 // Sync waits until all clients of the exporter have received the messages
333 // that were sent at the time Sync was invoked. Unlike Drain, it does not
334 // wait for messages sent while it is running or messages that have not been
335 // dispatched to any client. If the timeout (measured in nanoseconds) is
336 // positive and Sync takes longer than that to complete, an error is
// returned.
338 func (exp *Exporter) Sync(timeout int64) error {
339 // This wrapper function is here so the method's comment will appear in godoc.
// All of the work is delegated to the clientSet's sync method.
340 return exp.clientSet.sync(timeout)
// checkChan validates the arguments to an import or export. chT must be a
// channel value and dir must be Send or Recv; a directional channel is also
// checked against dir. On success chT is returned as a reflect.Value; on
// failure the zero reflect.Value and a descriptive error are returned.
// NOTE(review): the conditions inside the SendDir and RecvDir cases are on
// lines not visible in this view.
// NOTE(review): reflect.TypeOf returns nil for a nil interface value, so a
// nil chT makes the Kind call below panic rather than return an error.
343 func checkChan(chT interface{}, dir Dir) (reflect.Value, error) {
344 chanType := reflect.TypeOf(chT)
345 if chanType.Kind() != reflect.Chan {
346 return reflect.Value{}, errors.New("not a channel")
348 if dir != Send && dir != Recv {
349 return reflect.Value{}, errors.New("unknown channel direction")
351 switch chanType.ChanDir() {
// A bidirectional channel is acceptable for either direction.
352 case reflect.BothDir:
// chT is a send-only channel (chan<-).
353 case reflect.SendDir:
355 return reflect.Value{}, errors.New("to import/export with Send, must provide <-chan")
// chT is a receive-only channel (<-chan).
357 case reflect.RecvDir:
359 return reflect.Value{}, errors.New("to import/export with Recv, must provide chan<-")
362 return reflect.ValueOf(chT), nil
365 // Export exports a channel of a given type and specified direction. The
366 // channel to be exported is provided in the call and may be of arbitrary
// channel type.
368 // Despite the literal signature, the effective signature is
369 // Export(name string, chT chan T, dir Dir)
// The channel and direction are validated with checkChan, and an error is
// returned if name is already being exported.
// NOTE(review): the checkChan error handling, the lock acquisition and the
// final return are on lines not visible in this view.
370 func (exp *Exporter) Export(name string, chT interface{}, dir Dir) error {
371 ch, err := checkChan(chT, dir)
376 defer exp.mu.Unlock()
377 _, present := exp.names[name]
// NOTE(review): there is no space after the colon, so the name runs directly
// into the message text.
379 return errors.New("channel name already being exported:" + name)
// Record the validated channel together with its direction.
381 exp.names[name] = &chanDir{ch, dir}
385 // Hangup disassociates the named channel from the Exporter and closes
386 // the channel. Messages in flight for the channel may be dropped.
// An error is returned when no channel of that name is exported.
// NOTE(review): locking, the branch on ok and the close of the channel are
// on lines not visible in this view.
387 func (exp *Exporter) Hangup(name string) error {
389 chDir, ok := exp.names[name]
// Remove the name from the exporter's table of exported channels.
391 delete(exp.names, name)
393 // TODO drop all instances of channel from client sets
396 return errors.New("netchan export: hangup: no such channel: " + name)