OSDN Git Service

5db679a3ed6d251003ec76941e6dd38a0fd65e9d
[pf3gnuchains/gcc-fork.git] / libgo / go / netchan / import.go
1 // Copyright 2010 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 package netchan
6
7 import (
8         "io"
9         "log"
10         "net"
11         "os"
12         "reflect"
13         "sync"
14 )
15
16 // Import
17
18 // impLog is a logging convenience function.  The first argument must be a string.
19 func impLog(args ...interface{}) {
20         args[0] = "netchan import: " + args[0].(string)
21         log.Print(args...)
22 }
23
24 // An Importer allows a set of channels to be imported from a single
25 // remote machine/network port.  A machine may have multiple
26 // importers, even from the same machine/network port.
27 type Importer struct {
28         *encDec
29         chanLock sync.Mutex // protects access to channel map
30         names    map[string]*netChan
31         chans    map[int]*netChan
32         errors   chan os.Error
33         maxId    int
34 }
35
36 // NewImporter creates a new Importer object to import a set of channels
37 // from the given connection. The Exporter must be available and serving when
38 // the Importer is created.
39 func NewImporter(conn io.ReadWriter) *Importer {
40         imp := new(Importer)
41         imp.encDec = newEncDec(conn)
42         imp.chans = make(map[int]*netChan)
43         imp.names = make(map[string]*netChan)
44         imp.errors = make(chan os.Error, 10)
45         go imp.run()
46         return imp
47 }
48
49 // Import imports a set of channels from the given network and address.
50 func Import(network, remoteaddr string) (*Importer, os.Error) {
51         conn, err := net.Dial(network, "", remoteaddr)
52         if err != nil {
53                 return nil, err
54         }
55         return NewImporter(conn), nil
56 }
57
58 // shutdown closes all channels for which we are receiving data from the remote side.
59 func (imp *Importer) shutdown() {
60         imp.chanLock.Lock()
61         for _, ich := range imp.chans {
62                 if ich.dir == Recv {
63                         ich.close()
64                 }
65         }
66         imp.chanLock.Unlock()
67 }
68
69 // Handle the data from a single imported data stream, which will
70 // have the form
71 //      (response, data)*
72 // The response identifies by name which channel is transmitting data.
73 func (imp *Importer) run() {
74         // Loop on responses; requests are sent by ImportNValues()
75         hdr := new(header)
76         hdrValue := reflect.NewValue(hdr)
77         ackHdr := new(header)
78         err := new(error)
79         errValue := reflect.NewValue(err)
80         for {
81                 *hdr = header{}
82                 if e := imp.decode(hdrValue); e != nil {
83                         impLog("header:", e)
84                         imp.shutdown()
85                         return
86                 }
87                 switch hdr.PayloadType {
88                 case payData:
89                         // done lower in loop
90                 case payError:
91                         if e := imp.decode(errValue); e != nil {
92                                 impLog("error:", e)
93                                 return
94                         }
95                         if err.Error != "" {
96                                 impLog("response error:", err.Error)
97                                 select {
98                                 case imp.errors <- os.ErrorString(err.Error):
99                                         continue // errors are not acknowledged
100                                 default:
101                                         imp.shutdown()
102                                         return
103                                 }
104                         }
105                 case payClosed:
106                         nch := imp.getChan(hdr.Id, false)
107                         if nch != nil {
108                                 nch.close()
109                         }
110                         continue // closes are not acknowledged.
111                 case payAckSend:
112                         // we can receive spurious acks if the channel is
113                         // hung up, so we ask getChan to ignore any errors.
114                         nch := imp.getChan(hdr.Id, true)
115                         if nch != nil {
116                                 nch.acked()
117                         }
118                         continue
119                 default:
120                         impLog("unexpected payload type:", hdr.PayloadType)
121                         return
122                 }
123                 nch := imp.getChan(hdr.Id, false)
124                 if nch == nil {
125                         continue
126                 }
127                 if nch.dir != Recv {
128                         impLog("cannot happen: receive from non-Recv channel")
129                         return
130                 }
131                 // Acknowledge receipt
132                 ackHdr.Id = hdr.Id
133                 ackHdr.SeqNum = hdr.SeqNum
134                 imp.encode(ackHdr, payAck, nil)
135                 // Create a new value for each received item.
136                 value := reflect.MakeZero(nch.ch.Type().(*reflect.ChanType).Elem())
137                 if e := imp.decode(value); e != nil {
138                         impLog("importer value decode:", e)
139                         return
140                 }
141                 nch.send(value)
142         }
143 }
144
145 func (imp *Importer) getChan(id int, errOk bool) *netChan {
146         imp.chanLock.Lock()
147         ich := imp.chans[id]
148         imp.chanLock.Unlock()
149         if ich == nil {
150                 if !errOk {
151                         impLog("unknown id in netchan request: ", id)
152                 }
153                 return nil
154         }
155         return ich
156 }
157
158 // Errors returns a channel from which transmission and protocol errors
159 // can be read. Clients of the importer are not required to read the error
160 // channel for correct execution. However, if too many errors occur
161 // without being read from the error channel, the importer will shut down.
162 func (imp *Importer) Errors() chan os.Error {
163         return imp.errors
164 }
165
166 // Import imports a channel of the given type, size and specified direction.
167 // It is equivalent to ImportNValues with a count of -1, meaning unbounded.
168 func (imp *Importer) Import(name string, chT interface{}, dir Dir, size int) os.Error {
169         return imp.ImportNValues(name, chT, dir, size, -1)
170 }
171
172 // ImportNValues imports a channel of the given type and specified
173 // direction and then receives or transmits up to n values on that
174 // channel.  A value of n==-1 implies an unbounded number of values.  The
175 // channel will have buffer space for size values, or 1 value if size < 1.
176 // The channel to be bound to the remote site's channel is provided
177 // in the call and may be of arbitrary channel type.
178 // Despite the literal signature, the effective signature is
179 //      ImportNValues(name string, chT chan T, dir Dir, size, n int) os.Error
180 // Example usage:
181 //      imp, err := NewImporter("tcp", "netchanserver.mydomain.com:1234")
182 //      if err != nil { log.Fatal(err) }
183 //      ch := make(chan myType)
184 //      err = imp.ImportNValues("name", ch, Recv, 1, 1)
185 //      if err != nil { log.Fatal(err) }
186 //      fmt.Printf("%+v\n", <-ch)
187 func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, size, n int) os.Error {
188         ch, err := checkChan(chT, dir)
189         if err != nil {
190                 return err
191         }
192         imp.chanLock.Lock()
193         defer imp.chanLock.Unlock()
194         _, present := imp.names[name]
195         if present {
196                 return os.ErrorString("channel name already being imported:" + name)
197         }
198         if size < 1 {
199                 size = 1
200         }
201         id := imp.maxId
202         imp.maxId++
203         nch := newNetChan(name, id, &chanDir{ch, dir}, imp.encDec, size, int64(n))
204         imp.names[name] = nch
205         imp.chans[id] = nch
206         // Tell the other side about this channel.
207         hdr := &header{Id: id}
208         req := &request{Name: name, Count: int64(n), Dir: dir, Size: size}
209         if err = imp.encode(hdr, payRequest, req); err != nil {
210                 impLog("request encode:", err)
211                 return err
212         }
213         if dir == Send {
214                 go func() {
215                         for i := 0; n == -1 || i < n; i++ {
216                                 val, ok := nch.recv()
217                                 if !ok {
218                                         if err = imp.encode(hdr, payClosed, nil); err != nil {
219                                                 impLog("error encoding client closed message:", err)
220                                         }
221                                         return
222                                 }
223                                 if err = imp.encode(hdr, payData, val.Interface()); err != nil {
224                                         impLog("error encoding client send:", err)
225                                         return
226                                 }
227                         }
228                 }()
229         }
230         return nil
231 }
232
233 // Hangup disassociates the named channel from the Importer and closes
234 // the channel.  Messages in flight for the channel may be dropped.
235 func (imp *Importer) Hangup(name string) os.Error {
236         imp.chanLock.Lock()
237         defer imp.chanLock.Unlock()
238         nc := imp.names[name]
239         if nc == nil {
240                 return os.ErrorString("netchan import: hangup: no such channel: " + name)
241         }
242         imp.names[name] = nil, false
243         imp.chans[nc.id] = nil, false
244         nc.close()
245         return nil
246 }