OSDN Git Service

libgo: Update to weekly.2012-01-20.
[pf3gnuchains/gcc-fork.git] / libgo / go / old / netchan / common.go
1 // Copyright 2010 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 package netchan
6
7 import (
8         "encoding/gob"
9         "errors"
10         "io"
11         "reflect"
12         "sync"
13         "time"
14 )
15
16 // The direction of a connection from the client's perspective.
17 type Dir int
18
19 const (
20         Recv Dir = iota
21         Send
22 )
23
24 func (dir Dir) String() string {
25         switch dir {
26         case Recv:
27                 return "Recv"
28         case Send:
29                 return "Send"
30         }
31         return "???"
32 }
33
34 // Payload types
35 const (
36         payRequest = iota // request structure follows
37         payError          // error structure follows
38         payData           // user payload follows
39         payAck            // acknowledgement; no payload
40         payClosed         // channel is now closed
41         payAckSend        // payload has been delivered.
42 )
43
44 // A header is sent as a prefix to every transmission.  It will be followed by
45 // a request structure, an error structure, or an arbitrary user payload structure.
46 type header struct {
47         Id          int
48         PayloadType int
49         SeqNum      int64
50 }
51
52 // Sent with a header once per channel from importer to exporter to report
53 // that it wants to bind to a channel with the specified direction for count
54 // messages, with space for size buffered values. If count is -1, it means unlimited.
55 type request struct {
56         Name  string
57         Count int64
58         Size  int
59         Dir   Dir
60 }
61
62 // Sent with a header to report an error.
63 type error_ struct {
64         Error string
65 }
66
67 // Used to unify management of acknowledgements for import and export.
68 type unackedCounter interface {
69         unackedCount() int64
70         ack() int64
71         seq() int64
72 }
73
74 // A channel and its direction.
75 type chanDir struct {
76         ch  reflect.Value
77         dir Dir
78 }
79
80 // clientSet contains the objects and methods needed for tracking
81 // clients of an exporter and draining outstanding messages.
82 type clientSet struct {
83         mu      sync.Mutex // protects access to channel and client maps
84         names   map[string]*chanDir
85         clients map[unackedCounter]bool
86 }
87
88 // Mutex-protected encoder and decoder pair.
89 type encDec struct {
90         decLock sync.Mutex
91         dec     *gob.Decoder
92         encLock sync.Mutex
93         enc     *gob.Encoder
94 }
95
96 func newEncDec(conn io.ReadWriter) *encDec {
97         return &encDec{
98                 dec: gob.NewDecoder(conn),
99                 enc: gob.NewEncoder(conn),
100         }
101 }
102
103 // Decode an item from the connection.
104 func (ed *encDec) decode(value reflect.Value) error {
105         ed.decLock.Lock()
106         err := ed.dec.DecodeValue(value)
107         if err != nil {
108                 // TODO: tear down connection?
109         }
110         ed.decLock.Unlock()
111         return err
112 }
113
114 // Encode a header and payload onto the connection.
115 func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) error {
116         ed.encLock.Lock()
117         hdr.PayloadType = payloadType
118         err := ed.enc.Encode(hdr)
119         if err == nil {
120                 if payload != nil {
121                         err = ed.enc.Encode(payload)
122                 }
123         }
124         if err != nil {
125                 // TODO: tear down connection if there is an error?
126         }
127         ed.encLock.Unlock()
128         return err
129 }
130
131 // See the comment for Exporter.Drain.
132 func (cs *clientSet) drain(timeout time.Duration) error {
133         deadline := time.Now().Add(timeout)
134         for {
135                 pending := false
136                 cs.mu.Lock()
137                 // Any messages waiting for a client?
138                 for _, chDir := range cs.names {
139                         if chDir.ch.Len() > 0 {
140                                 pending = true
141                         }
142                 }
143                 // Any unacknowledged messages?
144                 for client := range cs.clients {
145                         n := client.unackedCount()
146                         if n > 0 { // Check for > rather than != just to be safe.
147                                 pending = true
148                                 break
149                         }
150                 }
151                 cs.mu.Unlock()
152                 if !pending {
153                         break
154                 }
155                 if timeout > 0 && time.Now().After(deadline) {
156                         return errors.New("timeout")
157                 }
158                 time.Sleep(100 * time.Millisecond)
159         }
160         return nil
161 }
162
163 // See the comment for Exporter.Sync.
164 func (cs *clientSet) sync(timeout time.Duration) error {
165         deadline := time.Now().Add(timeout)
166         // seq remembers the clients and their seqNum at point of entry.
167         seq := make(map[unackedCounter]int64)
168         cs.mu.Lock()
169         for client := range cs.clients {
170                 seq[client] = client.seq()
171         }
172         cs.mu.Unlock()
173         for {
174                 pending := false
175                 cs.mu.Lock()
176                 // Any unacknowledged messages?  Look only at clients that existed
177                 // when we started and are still in this client set.
178                 for client := range seq {
179                         if _, ok := cs.clients[client]; ok {
180                                 if client.ack() < seq[client] {
181                                         pending = true
182                                         break
183                                 }
184                         }
185                 }
186                 cs.mu.Unlock()
187                 if !pending {
188                         break
189                 }
190                 if timeout > 0 && time.Now().After(deadline) {
191                         return errors.New("timeout")
192                 }
193                 time.Sleep(100 * time.Millisecond)
194         }
195         return nil
196 }
197
198 // A netChan represents a channel imported or exported
199 // on a single connection. Flow is controlled by the receiving
200 // side by sending payAckSend messages when values
201 // are delivered into the local channel.
202 type netChan struct {
203         *chanDir
204         name   string
205         id     int
206         size   int // buffer size of channel.
207         closed bool
208
209         // sender-specific state
210         ackCh chan bool // buffered with space for all the acks we need
211         space int       // available space.
212
213         // receiver-specific state
214         sendCh chan reflect.Value // buffered channel of values received from other end.
215         ed     *encDec            // so that we can send acks.
216         count  int64              // number of values still to receive.
217 }
218
219 // Create a new netChan with the given name (only used for
220 // messages), id, direction, buffer size, and count.
221 // The connection to the other side is represented by ed.
222 func newNetChan(name string, id int, ch *chanDir, ed *encDec, size int, count int64) *netChan {
223         c := &netChan{chanDir: ch, name: name, id: id, size: size, ed: ed, count: count}
224         if c.dir == Send {
225                 c.ackCh = make(chan bool, size)
226                 c.space = size
227         }
228         return c
229 }
230
231 // Close the channel.
232 func (nch *netChan) close() {
233         if nch.closed {
234                 return
235         }
236         if nch.dir == Recv {
237                 if nch.sendCh != nil {
238                         // If the sender goroutine is active, close the channel to it.
239                         // It will close nch.ch when it can.
240                         close(nch.sendCh)
241                 } else {
242                         nch.ch.Close()
243                 }
244         } else {
245                 nch.ch.Close()
246                 close(nch.ackCh)
247         }
248         nch.closed = true
249 }
250
251 // Send message from remote side to local receiver.
252 func (nch *netChan) send(val reflect.Value) {
253         if nch.dir != Recv {
254                 panic("send on wrong direction of channel")
255         }
256         if nch.sendCh == nil {
257                 // If possible, do local send directly and ack immediately.
258                 if nch.ch.TrySend(val) {
259                         nch.sendAck()
260                         return
261                 }
262                 // Start sender goroutine to manage delayed delivery of values.
263                 nch.sendCh = make(chan reflect.Value, nch.size)
264                 go nch.sender()
265         }
266         select {
267         case nch.sendCh <- val:
268                 // ok
269         default:
270                 // TODO: should this be more resilient?
271                 panic("netchan: remote sender sent more values than allowed")
272         }
273 }
274
275 // sendAck sends an acknowledgment that a message has left
276 // the channel's buffer. If the messages remaining to be sent
277 // will fit in the channel's buffer, then we don't
278 // need to send an ack.
279 func (nch *netChan) sendAck() {
280         if nch.count < 0 || nch.count > int64(nch.size) {
281                 nch.ed.encode(&header{Id: nch.id}, payAckSend, nil)
282         }
283         if nch.count > 0 {
284                 nch.count--
285         }
286 }
287
288 // The sender process forwards items from the sending queue
289 // to the destination channel, acknowledging each item.
290 func (nch *netChan) sender() {
291         if nch.dir != Recv {
292                 panic("sender on wrong direction of channel")
293         }
294         // When Exporter.Hangup is called, the underlying channel is closed,
295         // and so we may get a "too many operations on closed channel" error
296         // if there are outstanding messages in sendCh.
297         // Make sure that this doesn't panic the whole program.
298         defer func() {
299                 if r := recover(); r != nil {
300                         // TODO check that r is "too many operations", otherwise re-panic.
301                 }
302         }()
303         for v := range nch.sendCh {
304                 nch.ch.Send(v)
305                 nch.sendAck()
306         }
307         nch.ch.Close()
308 }
309
310 // Receive value from local side for sending to remote side.
311 func (nch *netChan) recv() (val reflect.Value, ok bool) {
312         if nch.dir != Send {
313                 panic("recv on wrong direction of channel")
314         }
315
316         if nch.space == 0 {
317                 // Wait for buffer space.
318                 <-nch.ackCh
319                 nch.space++
320         }
321         nch.space--
322         return nch.ch.Recv()
323 }
324
325 // acked is called when the remote side indicates that
326 // a value has been delivered.
327 func (nch *netChan) acked() {
328         if nch.dir != Send {
329                 panic("recv on wrong direction of channel")
330         }
331         select {
332         case nch.ackCh <- true:
333                 // ok
334         default:
335                 // TODO: should this be more resilient?
336                 panic("netchan: remote receiver sent too many acks")
337         }
338 }