OSDN Git Service

libgo: Update to weekly 2011-11-09.
[pf3gnuchains/gcc-fork.git] / libgo / go / old / netchan / export.go
1 // Copyright 2010 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 /*
6         Package netchan implements type-safe networked channels:
7         it allows the two ends of a channel to appear on different
8         computers connected by a network.  It does this by transporting
9         data sent to a channel on one machine so it can be recovered
10         by a receive of a channel of the same type on the other.
11
12         An exporter publishes a set of channels by name.  An importer
13         connects to the exporting machine and imports the channels
14         by name. After importing the channels, the two machines can
15         use the channels in the usual way.
16
17         Networked channels are not synchronized; they always behave
18         as if they are buffered channels of at least one element.
19 */
20 package netchan
21
22 // BUG: can't use range clause to receive when using ImportNValues to limit the count.
23
24 import (
25         "errors"
26         "io"
27         "log"
28         "net"
29         "reflect"
30         "strconv"
31         "sync"
32 )
33
34 // Export
35
36 // expLog is a logging convenience function.  The first argument must be a string.
37 func expLog(args ...interface{}) {
38         args[0] = "netchan export: " + args[0].(string)
39         log.Print(args...)
40 }
41
42 // An Exporter allows a set of channels to be published on a single
43 // network port.  A single machine may have multiple Exporters
44 // but they must use different ports.
45 type Exporter struct {
46         *clientSet
47 }
48
49 type expClient struct {
50         *encDec
51         exp     *Exporter
52         chans   map[int]*netChan // channels in use by client
53         mu      sync.Mutex       // protects remaining fields
54         errored bool             // client has been sent an error
55         seqNum  int64            // sequences messages sent to client; has value of highest sent
56         ackNum  int64            // highest sequence number acknowledged
57         seqLock sync.Mutex       // guarantees messages are in sequence, only locked under mu
58 }
59
60 func newClient(exp *Exporter, conn io.ReadWriter) *expClient {
61         client := new(expClient)
62         client.exp = exp
63         client.encDec = newEncDec(conn)
64         client.seqNum = 0
65         client.ackNum = 0
66         client.chans = make(map[int]*netChan)
67         return client
68 }
69
70 func (client *expClient) sendError(hdr *header, err string) {
71         error := &error_{err}
72         expLog("sending error to client:", error.Error)
73         client.encode(hdr, payError, error) // ignore any encode error, hope client gets it
74         client.mu.Lock()
75         client.errored = true
76         client.mu.Unlock()
77 }
78
79 func (client *expClient) newChan(hdr *header, dir Dir, name string, size int, count int64) *netChan {
80         exp := client.exp
81         exp.mu.Lock()
82         ech, ok := exp.names[name]
83         exp.mu.Unlock()
84         if !ok {
85                 client.sendError(hdr, "no such channel: "+name)
86                 return nil
87         }
88         if ech.dir != dir {
89                 client.sendError(hdr, "wrong direction for channel: "+name)
90                 return nil
91         }
92         nch := newNetChan(name, hdr.Id, ech, client.encDec, size, count)
93         client.chans[hdr.Id] = nch
94         return nch
95 }
96
97 func (client *expClient) getChan(hdr *header, dir Dir) *netChan {
98         nch := client.chans[hdr.Id]
99         if nch == nil {
100                 return nil
101         }
102         if nch.dir != dir {
103                 client.sendError(hdr, "wrong direction for channel: "+nch.name)
104         }
105         return nch
106 }
107
108 // The function run manages sends and receives for a single client.  For each
109 // (client Recv) request, this will launch a serveRecv goroutine to deliver
110 // the data for that channel, while (client Send) requests are handled as
111 // data arrives from the client.
112 func (client *expClient) run() {
113         hdr := new(header)
114         hdrValue := reflect.ValueOf(hdr)
115         req := new(request)
116         reqValue := reflect.ValueOf(req)
117         error := new(error_)
118         for {
119                 *hdr = header{}
120                 if err := client.decode(hdrValue); err != nil {
121                         if err != io.EOF {
122                                 expLog("error decoding client header:", err)
123                         }
124                         break
125                 }
126                 switch hdr.PayloadType {
127                 case payRequest:
128                         *req = request{}
129                         if err := client.decode(reqValue); err != nil {
130                                 expLog("error decoding client request:", err)
131                                 break
132                         }
133                         if req.Size < 1 {
134                                 panic("netchan: remote requested " + strconv.Itoa(req.Size) + " values")
135                         }
136                         switch req.Dir {
137                         case Recv:
138                                 // look up channel before calling serveRecv to
139                                 // avoid a lock around client.chans.
140                                 if nch := client.newChan(hdr, Send, req.Name, req.Size, req.Count); nch != nil {
141                                         go client.serveRecv(nch, *hdr, req.Count)
142                                 }
143                         case Send:
144                                 client.newChan(hdr, Recv, req.Name, req.Size, req.Count)
145                                 // The actual sends will have payload type payData.
146                                 // TODO: manage the count?
147                         default:
148                                 error.Error = "request: can't handle channel direction"
149                                 expLog(error.Error, req.Dir)
150                                 client.encode(hdr, payError, error)
151                         }
152                 case payData:
153                         client.serveSend(*hdr)
154                 case payClosed:
155                         client.serveClosed(*hdr)
156                 case payAck:
157                         client.mu.Lock()
158                         if client.ackNum != hdr.SeqNum-1 {
159                                 // Since the sequence number is incremented and the message is sent
160                                 // in a single instance of locking client.mu, the messages are guaranteed
161                                 // to be sent in order.  Therefore receipt of acknowledgement N means
162                                 // all messages <=N have been seen by the recipient.  We check anyway.
163                                 expLog("sequence out of order:", client.ackNum, hdr.SeqNum)
164                         }
165                         if client.ackNum < hdr.SeqNum { // If there has been an error, don't back up the count. 
166                                 client.ackNum = hdr.SeqNum
167                         }
168                         client.mu.Unlock()
169                 case payAckSend:
170                         if nch := client.getChan(hdr, Send); nch != nil {
171                                 nch.acked()
172                         }
173                 default:
174                         log.Fatal("netchan export: unknown payload type", hdr.PayloadType)
175                 }
176         }
177         client.exp.delClient(client)
178 }
179
180 // Send all the data on a single channel to a client asking for a Recv.
181 // The header is passed by value to avoid issues of overwriting.
182 func (client *expClient) serveRecv(nch *netChan, hdr header, count int64) {
183         for {
184                 val, ok := nch.recv()
185                 if !ok {
186                         if err := client.encode(&hdr, payClosed, nil); err != nil {
187                                 expLog("error encoding server closed message:", err)
188                         }
189                         break
190                 }
191                 // We hold the lock during transmission to guarantee messages are
192                 // sent in sequence number order.  Also, we increment first so the
193                 // value of client.SeqNum is the value of the highest used sequence
194                 // number, not one beyond.
195                 client.mu.Lock()
196                 client.seqNum++
197                 hdr.SeqNum = client.seqNum
198                 client.seqLock.Lock() // guarantee ordering of messages
199                 client.mu.Unlock()
200                 err := client.encode(&hdr, payData, val.Interface())
201                 client.seqLock.Unlock()
202                 if err != nil {
203                         expLog("error encoding client response:", err)
204                         client.sendError(&hdr, err.Error())
205                         break
206                 }
207                 // Negative count means run forever.
208                 if count >= 0 {
209                         if count--; count <= 0 {
210                                 break
211                         }
212                 }
213         }
214 }
215
216 // Receive and deliver locally one item from a client asking for a Send
217 // The header is passed by value to avoid issues of overwriting.
218 func (client *expClient) serveSend(hdr header) {
219         nch := client.getChan(&hdr, Recv)
220         if nch == nil {
221                 return
222         }
223         // Create a new value for each received item.
224         val := reflect.New(nch.ch.Type().Elem()).Elem()
225         if err := client.decode(val); err != nil {
226                 expLog("value decode:", err, "; type ", nch.ch.Type())
227                 return
228         }
229         nch.send(val)
230 }
231
232 // Report that client has closed the channel that is sending to us.
233 // The header is passed by value to avoid issues of overwriting.
234 func (client *expClient) serveClosed(hdr header) {
235         nch := client.getChan(&hdr, Recv)
236         if nch == nil {
237                 return
238         }
239         nch.close()
240 }
241
242 func (client *expClient) unackedCount() int64 {
243         client.mu.Lock()
244         n := client.seqNum - client.ackNum
245         client.mu.Unlock()
246         return n
247 }
248
249 func (client *expClient) seq() int64 {
250         client.mu.Lock()
251         n := client.seqNum
252         client.mu.Unlock()
253         return n
254 }
255
256 func (client *expClient) ack() int64 {
257         client.mu.Lock()
258         n := client.seqNum
259         client.mu.Unlock()
260         return n
261 }
262
263 // Serve waits for incoming connections on the listener
264 // and serves the Exporter's channels on each.
265 // It blocks until the listener is closed.
266 func (exp *Exporter) Serve(listener net.Listener) {
267         for {
268                 conn, err := listener.Accept()
269                 if err != nil {
270                         expLog("listen:", err)
271                         break
272                 }
273                 go exp.ServeConn(conn)
274         }
275 }
276
277 // ServeConn exports the Exporter's channels on conn.
278 // It blocks until the connection is terminated.
279 func (exp *Exporter) ServeConn(conn io.ReadWriter) {
280         exp.addClient(conn).run()
281 }
282
283 // NewExporter creates a new Exporter that exports a set of channels.
284 func NewExporter() *Exporter {
285         e := &Exporter{
286                 clientSet: &clientSet{
287                         names:   make(map[string]*chanDir),
288                         clients: make(map[unackedCounter]bool),
289                 },
290         }
291         return e
292 }
293
294 // ListenAndServe exports the exporter's channels through the
295 // given network and local address defined as in net.Listen.
296 func (exp *Exporter) ListenAndServe(network, localaddr string) error {
297         listener, err := net.Listen(network, localaddr)
298         if err != nil {
299                 return err
300         }
301         go exp.Serve(listener)
302         return nil
303 }
304
305 // addClient creates a new expClient and records its existence
306 func (exp *Exporter) addClient(conn io.ReadWriter) *expClient {
307         client := newClient(exp, conn)
308         exp.mu.Lock()
309         exp.clients[client] = true
310         exp.mu.Unlock()
311         return client
312 }
313
314 // delClient forgets the client existed
315 func (exp *Exporter) delClient(client *expClient) {
316         exp.mu.Lock()
317         delete(exp.clients, client)
318         exp.mu.Unlock()
319 }
320
321 // Drain waits until all messages sent from this exporter/importer, including
322 // those not yet sent to any client and possibly including those sent while
323 // Drain was executing, have been received by the importer.  In short, it
324 // waits until all the exporter's messages have been received by a client.
325 // If the timeout (measured in nanoseconds) is positive and Drain takes
326 // longer than that to complete, an error is returned.
327 func (exp *Exporter) Drain(timeout int64) error {
328         // This wrapper function is here so the method's comment will appear in godoc.
329         return exp.clientSet.drain(timeout)
330 }
331
332 // Sync waits until all clients of the exporter have received the messages
333 // that were sent at the time Sync was invoked.  Unlike Drain, it does not
334 // wait for messages sent while it is running or messages that have not been
335 // dispatched to any client.  If the timeout (measured in nanoseconds) is
336 // positive and Sync takes longer than that to complete, an error is
337 // returned.
338 func (exp *Exporter) Sync(timeout int64) error {
339         // This wrapper function is here so the method's comment will appear in godoc.
340         return exp.clientSet.sync(timeout)
341 }
342
343 func checkChan(chT interface{}, dir Dir) (reflect.Value, error) {
344         chanType := reflect.TypeOf(chT)
345         if chanType.Kind() != reflect.Chan {
346                 return reflect.Value{}, errors.New("not a channel")
347         }
348         if dir != Send && dir != Recv {
349                 return reflect.Value{}, errors.New("unknown channel direction")
350         }
351         switch chanType.ChanDir() {
352         case reflect.BothDir:
353         case reflect.SendDir:
354                 if dir != Recv {
355                         return reflect.Value{}, errors.New("to import/export with Send, must provide <-chan")
356                 }
357         case reflect.RecvDir:
358                 if dir != Send {
359                         return reflect.Value{}, errors.New("to import/export with Recv, must provide chan<-")
360                 }
361         }
362         return reflect.ValueOf(chT), nil
363 }
364
365 // Export exports a channel of a given type and specified direction.  The
366 // channel to be exported is provided in the call and may be of arbitrary
367 // channel type.
368 // Despite the literal signature, the effective signature is
369 //      Export(name string, chT chan T, dir Dir)
370 func (exp *Exporter) Export(name string, chT interface{}, dir Dir) error {
371         ch, err := checkChan(chT, dir)
372         if err != nil {
373                 return err
374         }
375         exp.mu.Lock()
376         defer exp.mu.Unlock()
377         _, present := exp.names[name]
378         if present {
379                 return errors.New("channel name already being exported:" + name)
380         }
381         exp.names[name] = &chanDir{ch, dir}
382         return nil
383 }
384
385 // Hangup disassociates the named channel from the Exporter and closes
386 // the channel.  Messages in flight for the channel may be dropped.
387 func (exp *Exporter) Hangup(name string) error {
388         exp.mu.Lock()
389         chDir, ok := exp.names[name]
390         if ok {
391                 delete(exp.names, name)
392         }
393         // TODO drop all instances of channel from client sets
394         exp.mu.Unlock()
395         if !ok {
396                 return errors.New("netchan export: hangup: no such channel: " + name)
397         }
398         chDir.ch.Close()
399         return nil
400 }