1 // Copyright 2010 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
16 // The direction of a connection from the client's perspective.
24 func (dir Dir) String() string {
36 payRequest = iota // request structure follows
37 payError // error structure follows
38 payData // user payload follows
39 payAck // acknowledgement; no payload
40 payClosed // channel is now closed
41 payAckSend // payload has been delivered.
44 // A header is sent as a prefix to every transmission. It will be followed by
45 // a request structure, an error structure, or an arbitrary user payload structure.
52 // Sent with a header once per channel from importer to exporter to report
53 // that it wants to bind to a channel with the specified direction for count
54 // messages, with space for size buffered values. If count is -1, it means unlimited.
62 // Sent with a header to report an error.
67 // Used to unify management of acknowledgements for import and export.
68 type unackedCounter interface {
74 // A channel and its direction.
80 // clientSet contains the objects and methods needed for tracking
81 // clients of an exporter and draining outstanding messages.
82 type clientSet struct {
83 mu sync.Mutex // protects access to channel and client maps
84 names map[string]*chanDir
85 clients map[unackedCounter]bool
88 // Mutex-protected encoder and decoder pair.
96 func newEncDec(conn net.Conn) *encDec {
98 dec: gob.NewDecoder(conn),
99 enc: gob.NewEncoder(conn),
103 // Decode an item from the connection.
104 func (ed *encDec) decode(value reflect.Value) os.Error {
106 err := ed.dec.DecodeValue(value)
108 // TODO: tear down connection?
114 // Encode a header and payload onto the connection.
115 func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) os.Error {
117 hdr.PayloadType = payloadType
118 err := ed.enc.Encode(hdr)
121 err = ed.enc.Encode(payload)
125 // TODO: tear down connection if there is an error?
131 // See the comment for Exporter.Drain.
132 func (cs *clientSet) drain(timeout int64) os.Error {
133 startTime := time.Nanoseconds()
137 // Any messages waiting for a client?
138 for _, chDir := range cs.names {
139 if chDir.ch.Len() > 0 {
143 // Any unacknowledged messages?
144 for client := range cs.clients {
145 n := client.unackedCount()
146 if n > 0 { // Check for > rather than != just to be safe.
155 if timeout > 0 && time.Nanoseconds()-startTime >= timeout {
156 return os.ErrorString("timeout")
158 time.Sleep(100 * 1e6) // 100 milliseconds
163 // See the comment for Exporter.Sync.
164 func (cs *clientSet) sync(timeout int64) os.Error {
165 startTime := time.Nanoseconds()
166 // seq remembers the clients and their seqNum at point of entry.
167 seq := make(map[unackedCounter]int64)
168 for client := range cs.clients {
169 seq[client] = client.seq()
174 // Any unacknowledged messages? Look only at clients that existed
175 // when we started and are still in this client set.
176 for client := range seq {
177 if _, ok := cs.clients[client]; ok {
178 if client.ack() < seq[client] {
188 if timeout > 0 && time.Nanoseconds()-startTime >= timeout {
189 return os.ErrorString("timeout")
191 time.Sleep(100 * 1e6) // 100 milliseconds
196 // A netChan represents a channel imported or exported
197 // on a single connection. Flow is controlled by the receiving
198 // side by sending payAckSend messages when values
199 // are delivered into the local channel.
200 type netChan struct {
204 size int // buffer size of channel.
206 // sender-specific state
207 ackCh chan bool // buffered with space for all the acks we need
208 space int // available space.
210 // receiver-specific state
211 sendCh chan reflect.Value // buffered channel of values received from other end.
212 ed *encDec // so that we can send acks.
213 count int64 // number of values still to receive.
216 // Create a new netChan with the given name (only used for
217 // messages), id, direction, buffer size, and count.
218 // The connection to the other side is represented by ed.
219 func newNetChan(name string, id int, ch *chanDir, ed *encDec, size int, count int64) *netChan {
220 c := &netChan{chanDir: ch, name: name, id: id, size: size, ed: ed, count: count}
222 c.ackCh = make(chan bool, size)
228 // Close the channel.
229 func (nch *netChan) close() {
231 if nch.sendCh != nil {
232 // If the sender goroutine is active, close the channel to it.
233 // It will close nch.ch when it can.
244 // Send message from remote side to local receiver.
245 func (nch *netChan) send(val reflect.Value) {
247 panic("send on wrong direction of channel")
249 if nch.sendCh == nil {
250 // If possible, do local send directly and ack immediately.
251 if nch.ch.TrySend(val) {
255 // Start sender goroutine to manage delayed delivery of values.
256 nch.sendCh = make(chan reflect.Value, nch.size)
259 if ok := nch.sendCh <- val; !ok {
260 // TODO: should this be more resilient?
261 panic("netchan: remote sender sent more values than allowed")
265 // sendAck sends an acknowledgement that a message has left
266 // the channel's buffer. If the messages remaining to be sent
267 // will fit in the channel's buffer, then we don't
268 // need to send an ack.
269 func (nch *netChan) sendAck() {
270 if nch.count < 0 || nch.count > int64(nch.size) {
271 nch.ed.encode(&header{Id: nch.id}, payAckSend, nil)
278 // The sender goroutine forwards items from the sending queue
279 // to the destination channel, acknowledging each item.
280 func (nch *netChan) sender() {
282 panic("sender on wrong direction of channel")
284 // When Exporter.Hangup is called, the underlying channel is closed,
285 // and so we may get a "too many operations on closed channel" error
286 // if there are outstanding messages in sendCh.
287 // Make sure that this doesn't panic the whole program.
289 if r := recover(); r != nil {
290 // TODO check that r is "too many operations", otherwise re-panic.
293 for v := range nch.sendCh {
300 // Receive value from local side for sending to remote side.
301 func (nch *netChan) recv() (val reflect.Value, closed bool) {
303 panic("recv on wrong direction of channel")
307 // Wait for buffer space.
312 return nch.ch.Recv(), nch.ch.Closed()
315 // acked is called when the remote side indicates that
316 // a value has been delivered.
317 func (nch *netChan) acked() {
319 panic("recv on wrong direction of channel")
321 if ok := nch.ackCh <- true; !ok {
322 panic("netchan: remote receiver sent too many acks")
323 // TODO: should this be more resilient?