*encDec
conn net.Conn
chanLock sync.Mutex // protects access to channel map
- chans map[string]*chanDir
+ names map[string]*netChan
+ chans map[int]*netChan
errors chan os.Error
+ maxId int
}
// NewImporter creates a new Importer object to import channels
imp := new(Importer)
imp.encDec = newEncDec(conn)
imp.conn = conn
- imp.chans = make(map[string]*chanDir)
+ imp.chans = make(map[int]*netChan)
+ imp.names = make(map[string]*netChan)
imp.errors = make(chan os.Error, 10)
go imp.run()
return imp, nil
imp.chanLock.Lock()
for _, ich := range imp.chans {
if ich.dir == Recv {
- ich.ch.Close()
+ ich.close()
}
}
imp.chanLock.Unlock()
imp.shutdown()
return
}
- switch hdr.payloadType {
+ switch hdr.PayloadType {
case payData:
// done lower in loop
case payError:
impLog("error:", e)
return
}
- if err.error != "" {
- impLog("response error:", err.error)
- if sent := imp.errors <- os.ErrorString(err.error); !sent {
+ if err.Error != "" {
+ impLog("response error:", err.Error)
+ if sent := imp.errors <- os.ErrorString(err.Error); !sent {
imp.shutdown()
return
}
continue // errors are not acknowledged.
}
case payClosed:
- ich := imp.getChan(hdr.name)
- if ich != nil {
- ich.ch.Close()
+ nch := imp.getChan(hdr.Id, false)
+ if nch != nil {
+ nch.close()
}
continue // closes are not acknowledged.
+ case payAckSend:
+ // we can receive spurious acks if the channel is
+ // hung up, so we ask getChan to ignore any errors.
+ nch := imp.getChan(hdr.Id, true)
+ if nch != nil {
+ nch.acked()
+ }
+ continue
default:
- impLog("unexpected payload type:", hdr.payloadType)
+ impLog("unexpected payload type:", hdr.PayloadType)
return
}
- ich := imp.getChan(hdr.name)
- if ich == nil {
+ nch := imp.getChan(hdr.Id, false)
+ if nch == nil {
continue
}
- if ich.dir != Recv {
+ if nch.dir != Recv {
impLog("cannot happen: receive from non-Recv channel")
return
}
// Acknowledge receipt
- ackHdr.name = hdr.name
- ackHdr.seqNum = hdr.seqNum
+ ackHdr.Id = hdr.Id
+ ackHdr.SeqNum = hdr.SeqNum
imp.encode(ackHdr, payAck, nil)
// Create a new value for each received item.
- value := reflect.MakeZero(ich.ch.Type().(*reflect.ChanType).Elem())
+ value := reflect.MakeZero(nch.ch.Type().(*reflect.ChanType).Elem())
if e := imp.decode(value); e != nil {
impLog("importer value decode:", e)
return
}
- ich.ch.Send(value)
+ nch.send(value)
}
}
-func (imp *Importer) getChan(name string) *chanDir {
+func (imp *Importer) getChan(id int, errOk bool) *netChan {
imp.chanLock.Lock()
- ich := imp.chans[name]
+ ich := imp.chans[id]
imp.chanLock.Unlock()
if ich == nil {
- impLog("unknown name in netchan request:", name)
+ if !errOk {
+ impLog("unknown id in netchan request: ", id)
+ }
return nil
}
return ich
return imp.errors
}
-// Import imports a channel of the given type and specified direction.
+// Import imports a channel of the given type, size and specified direction.
// It is equivalent to ImportNValues with a count of -1, meaning unbounded.
-func (imp *Importer) Import(name string, chT interface{}, dir Dir) os.Error {
- return imp.ImportNValues(name, chT, dir, -1)
+func (imp *Importer) Import(name string, chT interface{}, dir Dir, size int) os.Error {
+ return imp.ImportNValues(name, chT, dir, size, -1)
}
-// ImportNValues imports a channel of the given type and specified direction
-// and then receives or transmits up to n values on that channel. A value of
-// n==-1 implies an unbounded number of values. The channel to be bound to
-// the remote site's channel is provided in the call and may be of arbitrary
-// channel type.
+// ImportNValues imports a channel of the given type and specified
+// direction and then receives or transmits up to n values on that
+// channel. A value of n==-1 implies an unbounded number of values. The
+// channel will have buffer space for size values, or 1 value if size < 1.
+// The channel to be bound to the remote site's channel is provided
+// in the call and may be of arbitrary channel type.
// Despite the literal signature, the effective signature is
// ImportNValues(name string, chT chan T, dir Dir, n int) os.Error
// Example usage:
// err = imp.ImportNValues("name", ch, Recv, 1)
// if err != nil { log.Exit(err) }
// fmt.Printf("%+v\n", <-ch)
-func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, n int) os.Error {
+func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, size, n int) os.Error {
ch, err := checkChan(chT, dir)
if err != nil {
return err
}
imp.chanLock.Lock()
defer imp.chanLock.Unlock()
- _, present := imp.chans[name]
+ _, present := imp.names[name]
if present {
return os.ErrorString("channel name already being imported:" + name)
}
- imp.chans[name] = &chanDir{ch, dir}
+ if size < 1 {
+ size = 1
+ }
+ id := imp.maxId
+ imp.maxId++
+ nch := newNetChan(name, id, &chanDir{ch, dir}, imp.encDec, size, int64(n))
+ imp.names[name] = nch
+ imp.chans[id] = nch
// Tell the other side about this channel.
- hdr := &header{name: name}
- req := &request{count: int64(n), dir: dir}
+ hdr := &header{Id: id}
+ req := &request{Name: name, Count: int64(n), Dir: dir, Size: size}
if err = imp.encode(hdr, payRequest, req); err != nil {
impLog("request encode:", err)
return err
if dir == Send {
go func() {
for i := 0; n == -1 || i < n; i++ {
- val := ch.Recv()
- if ch.Closed() {
+ val, closed := nch.recv()
+ if closed {
if err = imp.encode(hdr, payClosed, nil); err != nil {
impLog("error encoding client closed message:", err)
}
// the channel. Messages in flight for the channel may be dropped.
func (imp *Importer) Hangup(name string) os.Error {
imp.chanLock.Lock()
- chDir, ok := imp.chans[name]
+ nc, ok := imp.names[name]
if ok {
- imp.chans[name] = nil, false
+ imp.names[name] = nil, false
+ imp.chans[nc.id] = nil, false
}
imp.chanLock.Unlock()
if !ok {
return os.ErrorString("netchan import: hangup: no such channel: " + name)
}
- chDir.ch.Close()
+ nc.close()
return nil
}