OSDN Git Service

Remove the types float and complex.
[pf3gnuchains/gcc-fork.git] / libgo / go / netchan / common.go
1 // Copyright 2010 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 package netchan
6
7 import (
8         "gob"
9         "net"
10         "os"
11         "reflect"
12         "sync"
13         "time"
14 )
15
16 // The direction of a connection from the client's perspective.
17 type Dir int
18
19 const (
20         Recv Dir = iota
21         Send
22 )
23
24 func (dir Dir) String() string {
25         switch dir {
26         case Recv:
27                 return "Recv"
28         case Send:
29                 return "Send"
30         }
31         return "???"
32 }
33
34 // Payload types
35 const (
36         payRequest = iota // request structure follows
37         payError          // error structure follows
38         payData           // user payload follows
39         payAck            // acknowledgement; no payload
40         payClosed         // channel is now closed
41         payAckSend        // payload has been delivered.
42 )
43
44 // A header is sent as a prefix to every transmission.  It will be followed by
45 // a request structure, an error structure, or an arbitrary user payload structure.
46 type header struct {
47         Id          int
48         PayloadType int
49         SeqNum      int64
50 }
51
52 // Sent with a header once per channel from importer to exporter to report
53 // that it wants to bind to a channel with the specified direction for count
54 // messages, with space for size buffered values. If count is -1, it means unlimited.
55 type request struct {
56         Name  string
57         Count int64
58         Size  int
59         Dir   Dir
60 }
61
62 // Sent with a header to report an error.
63 type error struct {
64         Error string
65 }
66
67 // Used to unify management of acknowledgements for import and export.
68 type unackedCounter interface {
69         unackedCount() int64
70         ack() int64
71         seq() int64
72 }
73
74 // A channel and its direction.
75 type chanDir struct {
76         ch  *reflect.ChanValue
77         dir Dir
78 }
79
80 // clientSet contains the objects and methods needed for tracking
81 // clients of an exporter and draining outstanding messages.
82 type clientSet struct {
83         mu      sync.Mutex // protects access to channel and client maps
84         names   map[string]*chanDir
85         clients map[unackedCounter]bool
86 }
87
88 // Mutex-protected encoder and decoder pair.
89 type encDec struct {
90         decLock sync.Mutex
91         dec     *gob.Decoder
92         encLock sync.Mutex
93         enc     *gob.Encoder
94 }
95
96 func newEncDec(conn net.Conn) *encDec {
97         return &encDec{
98                 dec: gob.NewDecoder(conn),
99                 enc: gob.NewEncoder(conn),
100         }
101 }
102
103 // Decode an item from the connection.
104 func (ed *encDec) decode(value reflect.Value) os.Error {
105         ed.decLock.Lock()
106         err := ed.dec.DecodeValue(value)
107         if err != nil {
108                 // TODO: tear down connection?
109         }
110         ed.decLock.Unlock()
111         return err
112 }
113
114 // Encode a header and payload onto the connection.
115 func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) os.Error {
116         ed.encLock.Lock()
117         hdr.PayloadType = payloadType
118         err := ed.enc.Encode(hdr)
119         if err == nil {
120                 if payload != nil {
121                         err = ed.enc.Encode(payload)
122                 }
123         }
124         if err != nil {
125                 // TODO: tear down connection if there is an error?
126         }
127         ed.encLock.Unlock()
128         return err
129 }
130
131 // See the comment for Exporter.Drain.
132 func (cs *clientSet) drain(timeout int64) os.Error {
133         startTime := time.Nanoseconds()
134         for {
135                 pending := false
136                 cs.mu.Lock()
137                 // Any messages waiting for a client?
138                 for _, chDir := range cs.names {
139                         if chDir.ch.Len() > 0 {
140                                 pending = true
141                         }
142                 }
143                 // Any unacknowledged messages?
144                 for client := range cs.clients {
145                         n := client.unackedCount()
146                         if n > 0 { // Check for > rather than != just to be safe.
147                                 pending = true
148                                 break
149                         }
150                 }
151                 cs.mu.Unlock()
152                 if !pending {
153                         break
154                 }
155                 if timeout > 0 && time.Nanoseconds()-startTime >= timeout {
156                         return os.ErrorString("timeout")
157                 }
158                 time.Sleep(100 * 1e6) // 100 milliseconds
159         }
160         return nil
161 }
162
163 // See the comment for Exporter.Sync.
164 func (cs *clientSet) sync(timeout int64) os.Error {
165         startTime := time.Nanoseconds()
166         // seq remembers the clients and their seqNum at point of entry.
167         seq := make(map[unackedCounter]int64)
168         for client := range cs.clients {
169                 seq[client] = client.seq()
170         }
171         for {
172                 pending := false
173                 cs.mu.Lock()
174                 // Any unacknowledged messages?  Look only at clients that existed
175                 // when we started and are still in this client set.
176                 for client := range seq {
177                         if _, ok := cs.clients[client]; ok {
178                                 if client.ack() < seq[client] {
179                                         pending = true
180                                         break
181                                 }
182                         }
183                 }
184                 cs.mu.Unlock()
185                 if !pending {
186                         break
187                 }
188                 if timeout > 0 && time.Nanoseconds()-startTime >= timeout {
189                         return os.ErrorString("timeout")
190                 }
191                 time.Sleep(100 * 1e6) // 100 milliseconds
192         }
193         return nil
194 }
195
196 // A netChan represents a channel imported or exported
197 // on a single connection. Flow is controlled by the receiving
198 // side by sending payAckSend messages when values
199 // are delivered into the local channel.
200 type netChan struct {
201         *chanDir
202         name string
203         id   int
204         size int // buffer size of channel.
205
206         // sender-specific state
207         ackCh chan bool // buffered with space for all the acks we need
208         space int       // available space.
209
210         // receiver-specific state
211         sendCh chan reflect.Value // buffered channel of values received from other end.
212         ed     *encDec            // so that we can send acks.
213         count  int64              // number of values still to receive.
214 }
215
216 // Create a new netChan with the given name (only used for
217 // messages), id, direction, buffer size, and count.
218 // The connection to the other side is represented by ed.
219 func newNetChan(name string, id int, ch *chanDir, ed *encDec, size int, count int64) *netChan {
220         c := &netChan{chanDir: ch, name: name, id: id, size: size, ed: ed, count: count}
221         if c.dir == Send {
222                 c.ackCh = make(chan bool, size)
223                 c.space = size
224         }
225         return c
226 }
227
228 // Close the channel.
229 func (nch *netChan) close() {
230         if nch.dir == Recv {
231                 if nch.sendCh != nil {
232                         // If the sender goroutine is active, close the channel to it.
233                         // It will close nch.ch when it can.
234                         close(nch.sendCh)
235                 } else {
236                         nch.ch.Close()
237                 }
238         } else {
239                 nch.ch.Close()
240                 close(nch.ackCh)
241         }
242 }
243
244 // Send message from remote side to local receiver.
245 func (nch *netChan) send(val reflect.Value) {
246         if nch.dir != Recv {
247                 panic("send on wrong direction of channel")
248         }
249         if nch.sendCh == nil {
250                 // If possible, do local send directly and ack immediately.
251                 if nch.ch.TrySend(val) {
252                         nch.sendAck()
253                         return
254                 }
255                 // Start sender goroutine to manage delayed delivery of values.
256                 nch.sendCh = make(chan reflect.Value, nch.size)
257                 go nch.sender()
258         }
259         if ok := nch.sendCh <- val; !ok {
260                 // TODO: should this be more resilient?
261                 panic("netchan: remote sender sent more values than allowed")
262         }
263 }
264
265 // sendAck sends an acknowledgment that a message has left
266 // the channel's buffer. If the messages remaining to be sent
267 // will fit in the channel's buffer, then we don't
268 // need to send an ack.
269 func (nch *netChan) sendAck() {
270         if nch.count < 0 || nch.count > int64(nch.size) {
271                 nch.ed.encode(&header{Id: nch.id}, payAckSend, nil)
272         }
273         if nch.count > 0 {
274                 nch.count--
275         }
276 }
277
278 // The sender process forwards items from the sending queue
279 // to the destination channel, acknowledging each item.
280 func (nch *netChan) sender() {
281         if nch.dir != Recv {
282                 panic("sender on wrong direction of channel")
283         }
284         // When Exporter.Hangup is called, the underlying channel is closed,
285         // and so we may get a "too many operations on closed channel" error
286         // if there are outstanding messages in sendCh.
287         // Make sure that this doesn't panic the whole program.
288         defer func() {
289                 if r := recover(); r != nil {
290                         // TODO check that r is "too many operations", otherwise re-panic.
291                 }
292         }()
293         for v := range nch.sendCh {
294                 nch.ch.Send(v)
295                 nch.sendAck()
296         }
297         nch.ch.Close()
298 }
299
300 // Receive value from local side for sending to remote side.
301 func (nch *netChan) recv() (val reflect.Value, closed bool) {
302         if nch.dir != Send {
303                 panic("recv on wrong direction of channel")
304         }
305
306         if nch.space == 0 {
307                 // Wait for buffer space.
308                 <-nch.ackCh
309                 nch.space++
310         }
311         nch.space--
312         return nch.ch.Recv(), nch.ch.Closed()
313 }
314
315 // acked is called when the remote side indicates that
316 // a value has been delivered.
317 func (nch *netChan) acked() {
318         if nch.dir != Send {
319                 panic("recv on wrong direction of channel")
320         }
321         if ok := nch.ackCh <- true; !ok {
322                 panic("netchan: remote receiver sent too many acks")
323                 // TODO: should this be more resilient?
324         }
325 }