1 // Copyright 2010 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
16 // The direction of a connection from the client's perspective.
24 func (dir Dir) String() string {
36 payRequest = iota // request structure follows
37 payError // error structure follows
38 payData // user payload follows
39 payAck // acknowledgement; no payload
40 payClosed // channel is now closed
41 payAckSend // payload has been delivered.
44 // A header is sent as a prefix to every transmission. It will be followed by
45 // a request structure, an error structure, or an arbitrary user payload structure.
52 // Sent with a header once per channel from importer to exporter to report
53 // that it wants to bind to a channel with the specified direction for count
54 // messages, with space for size buffered values. If count is -1, it means unlimited.
62 // Sent with a header to report an error.
67 // Used to unify management of acknowledgements for import and export.
68 type unackedCounter interface {
74 // A channel and its direction.
80 // clientSet contains the objects and methods needed for tracking
81 // clients of an exporter and draining outstanding messages.
82 type clientSet struct {
83 mu sync.Mutex // protects access to channel and client maps
84 names map[string]*chanDir
85 clients map[unackedCounter]bool
88 // Mutex-protected encoder and decoder pair.
96 func newEncDec(conn io.ReadWriter) *encDec {
98 dec: gob.NewDecoder(conn),
99 enc: gob.NewEncoder(conn),
103 // Decode an item from the connection.
104 func (ed *encDec) decode(value reflect.Value) error {
106 err := ed.dec.DecodeValue(value)
108 // TODO: tear down connection?
114 // Encode a header and payload onto the connection.
115 func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) error {
117 hdr.PayloadType = payloadType
118 err := ed.enc.Encode(hdr)
121 err = ed.enc.Encode(payload)
125 // TODO: tear down connection if there is an error?
131 // See the comment for Exporter.Drain.
132 func (cs *clientSet) drain(timeout time.Duration) error {
133 deadline := time.Now().Add(timeout)
137 // Any messages waiting for a client?
138 for _, chDir := range cs.names {
139 if chDir.ch.Len() > 0 {
143 // Any unacknowledged messages?
144 for client := range cs.clients {
145 n := client.unackedCount()
146 if n > 0 { // Check for > rather than != just to be safe.
155 if timeout > 0 && time.Now().After(deadline) {
156 return errors.New("timeout")
158 time.Sleep(100 * time.Millisecond)
163 // See the comment for Exporter.Sync.
164 func (cs *clientSet) sync(timeout time.Duration) error {
165 deadline := time.Now().Add(timeout)
166 // seq remembers the clients and their seqNum at point of entry.
167 seq := make(map[unackedCounter]int64)
169 for client := range cs.clients {
170 seq[client] = client.seq()
176 // Any unacknowledged messages? Look only at clients that existed
177 // when we started and are still in this client set.
178 for client := range seq {
179 if _, ok := cs.clients[client]; ok {
180 if client.ack() < seq[client] {
190 if timeout > 0 && time.Now().After(deadline) {
191 return errors.New("timeout")
193 time.Sleep(100 * time.Millisecond)
198 // A netChan represents a channel imported or exported
199 // on a single connection. Flow is controlled by the receiving
200 // side by sending payAckSend messages when values
201 // are delivered into the local channel.
202 type netChan struct {
206 size int // buffer size of channel.
209 // sender-specific state
210 ackCh chan bool // buffered with space for all the acks we need
211 space int // available space.
213 // receiver-specific state
214 sendCh chan reflect.Value // buffered channel of values received from other end.
215 ed *encDec // so that we can send acks.
216 count int64 // number of values still to receive.
219 // Create a new netChan with the given name (only used for
220 // messages), id, direction, buffer size, and count.
221 // The connection to the other side is represented by ed.
222 func newNetChan(name string, id int, ch *chanDir, ed *encDec, size int, count int64) *netChan {
223 c := &netChan{chanDir: ch, name: name, id: id, size: size, ed: ed, count: count}
225 c.ackCh = make(chan bool, size)
231 // Close the channel.
232 func (nch *netChan) close() {
237 if nch.sendCh != nil {
238 // If the sender goroutine is active, close the channel to it.
239 // It will close nch.ch when it can.
251 // Send message from remote side to local receiver.
252 func (nch *netChan) send(val reflect.Value) {
254 panic("send on wrong direction of channel")
256 if nch.sendCh == nil {
257 // If possible, do local send directly and ack immediately.
258 if nch.ch.TrySend(val) {
262 // Start sender goroutine to manage delayed delivery of values.
263 nch.sendCh = make(chan reflect.Value, nch.size)
267 case nch.sendCh <- val:
270 // TODO: should this be more resilient?
271 panic("netchan: remote sender sent more values than allowed")
275 // sendAck sends an acknowledgment that a message has left
276 // the channel's buffer. If the messages remaining to be sent
277 // will fit in the channel's buffer, then we don't
278 // need to send an ack.
279 func (nch *netChan) sendAck() {
280 if nch.count < 0 || nch.count > int64(nch.size) {
281 nch.ed.encode(&header{Id: nch.id}, payAckSend, nil)
288 // The sender process forwards items from the sending queue
289 // to the destination channel, acknowledging each item.
290 func (nch *netChan) sender() {
292 panic("sender on wrong direction of channel")
294 // When Exporter.Hangup is called, the underlying channel is closed,
295 // and so we may get a "too many operations on closed channel" error
296 // if there are outstanding messages in sendCh.
297 // Make sure that this doesn't panic the whole program.
299 if r := recover(); r != nil {
300 // TODO check that r is "too many operations", otherwise re-panic.
303 for v := range nch.sendCh {
310 // Receive value from local side for sending to remote side.
311 func (nch *netChan) recv() (val reflect.Value, ok bool) {
313 panic("recv on wrong direction of channel")
317 // Wait for buffer space.
325 // acked is called when the remote side indicates that
326 // a value has been delivered.
327 func (nch *netChan) acked() {
329 panic("recv on wrong direction of channel")
332 case nch.ackCh <- true:
335 // TODO: should this be more resilient?
336 panic("netchan: remote receiver sent too many acks")