1 // Copyright 2010 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
18 // impLog is a logging convenience function. The first argument must be a string; it is prefixed with "netchan import: ".
19 func impLog(args ...interface{}) {
20 args[0] = "netchan import: " + args[0].(string)
24 // An Importer allows a set of channels to be imported from a single
25 // remote machine/network port. A machine may have multiple
26 // importers, even from the same machine/network port.
27 type Importer struct {
29 chanLock sync.Mutex // protects access to channel map
30 names map[string]*netChan
31 chans map[int]*netChan
36 // NewImporter creates a new Importer object to import a set of channels
37 // from the given connection. The Exporter must be available and serving when
38 // the Importer is created.
39 func NewImporter(conn io.ReadWriter) *Importer {
41 imp.encDec = newEncDec(conn)
42 imp.chans = make(map[int]*netChan)
43 imp.names = make(map[string]*netChan)
44 imp.errors = make(chan os.Error, 10)
49 // Import imports a set of channels from the given network and address.
50 func Import(network, remoteaddr string) (*Importer, os.Error) {
51 conn, err := net.Dial(network, "", remoteaddr)
55 return NewImporter(conn), nil
58 // shutdown closes all channels for which we are receiving data from the remote side.
59 func (imp *Importer) shutdown() {
61 for _, ich := range imp.chans {
69 // Handle the data from a single imported data stream, which will
72 // The response identifies by id (hdr.Id) which channel is transmitting data.
73 func (imp *Importer) run() {
74 // Loop on responses; requests are sent by ImportNValues()
76 hdrValue := reflect.NewValue(hdr)
79 errValue := reflect.NewValue(err)
82 if e := imp.decode(hdrValue); e != nil {
87 switch hdr.PayloadType {
91 if e := imp.decode(errValue); e != nil {
96 impLog("response error:", err.Error)
98 case imp.errors <- os.ErrorString(err.Error):
99 continue // errors are not acknowledged
106 nch := imp.getChan(hdr.Id, false)
110 continue // closes are not acknowledged.
112 // we can receive spurious acks if the channel is
113 // hung up, so we ask getChan to ignore any errors.
114 nch := imp.getChan(hdr.Id, true)
120 impLog("unexpected payload type:", hdr.PayloadType)
123 nch := imp.getChan(hdr.Id, false)
128 impLog("cannot happen: receive from non-Recv channel")
131 // Acknowledge receipt
133 ackHdr.SeqNum = hdr.SeqNum
134 imp.encode(ackHdr, payAck, nil)
135 // Create a new value for each received item.
136 value := reflect.MakeZero(nch.ch.Type().(*reflect.ChanType).Elem())
137 if e := imp.decode(value); e != nil {
138 impLog("importer value decode:", e)
145 func (imp *Importer) getChan(id int, errOk bool) *netChan {
148 imp.chanLock.Unlock()
151 impLog("unknown id in netchan request: ", id)
158 // Errors returns a channel from which transmission and protocol errors
159 // can be read. Clients of the importer are not required to read the error
160 // channel for correct execution. However, if too many errors occur
161 // without being read from the error channel, the importer will shut down.
162 func (imp *Importer) Errors() chan os.Error {
166 // Import imports a channel of the given type, size and specified direction.
167 // It is equivalent to ImportNValues with a count of -1, meaning unbounded.
168 func (imp *Importer) Import(name string, chT interface{}, dir Dir, size int) os.Error {
169 return imp.ImportNValues(name, chT, dir, size, -1)
172 // ImportNValues imports a channel of the given type and specified
173 // direction and then receives or transmits up to n values on that
174 // channel. A value of n==-1 implies an unbounded number of values. The
175 // channel will have buffer space for size values, or 1 value if size < 1.
176 // The channel to be bound to the remote site's channel is provided
177 // in the call and may be of arbitrary channel type.
178 // Despite the literal signature, the effective signature is
179 // ImportNValues(name string, chT chan T, dir Dir, size, n int) os.Error
181 // imp, err := Import("tcp", "netchanserver.mydomain.com:1234")
182 // if err != nil { log.Fatal(err) }
183 // ch := make(chan myType)
184 // err = imp.ImportNValues("name", ch, Recv, 1, 1)
185 // if err != nil { log.Fatal(err) }
186 // fmt.Printf("%+v\n", <-ch)
187 func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, size, n int) os.Error {
188 ch, err := checkChan(chT, dir)
193 defer imp.chanLock.Unlock()
194 _, present := imp.names[name]
196 return os.ErrorString("channel name already being imported:" + name)
203 nch := newNetChan(name, id, &chanDir{ch, dir}, imp.encDec, size, int64(n))
204 imp.names[name] = nch
206 // Tell the other side about this channel.
207 hdr := &header{Id: id}
208 req := &request{Name: name, Count: int64(n), Dir: dir, Size: size}
209 if err = imp.encode(hdr, payRequest, req); err != nil {
210 impLog("request encode:", err)
215 for i := 0; n == -1 || i < n; i++ {
216 val, ok := nch.recv()
218 if err = imp.encode(hdr, payClosed, nil); err != nil {
219 impLog("error encoding client closed message:", err)
223 if err = imp.encode(hdr, payData, val.Interface()); err != nil {
224 impLog("error encoding client send:", err)
233 // Hangup disassociates the named channel from the Importer and closes
234 // the channel. Messages in flight for the channel may be dropped.
235 func (imp *Importer) Hangup(name string) os.Error {
237 defer imp.chanLock.Unlock()
238 nc := imp.names[name]
240 return os.ErrorString("netchan import: hangup: no such channel: " + name)
242 imp.names[name] = nil, false
243 imp.chans[nc.id] = nil, false