1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
5 // TODO(rsc): All the prints in this file should go to standard error.
17 // Network file descriptor.
19 // locking/lifetime of sysfd
24 // immutable until Close
43 // owned by fd wait server
47 type InvalidConnError struct{}
49 func (e *InvalidConnError) String() string { return "invalid net.Conn" }
50 func (e *InvalidConnError) Temporary() bool { return false }
51 func (e *InvalidConnError) Timeout() bool { return false }
53 // A pollServer helps FDs determine when to retry a non-blocking
54 // read or write after they get EAGAIN. When an FD needs to wait,
55 // send the fd on s.cr (for a read) or s.cw (for a write) to pass the
56 // request to the poll server. Then receive on fd.cr/fd.cw.
57 // When the pollServer finds that i/o on FD should be possible
58 // again, it will send fd on fd.cr/fd.cw to wake any waiting processes.
59 // This protocol is implemented as s.WaitRead() and s.WaitWrite().
61 // There is one subtlety: when sending on s.cr/s.cw, the
62 // poll server is probably in a system call, waiting for an fd
63 // to become ready. It's not looking at the request channels.
64 // To resolve this, the poll server waits not just on the FDs it has
65 // been given but also its own pipe. After sending on the
66 // buffered channel s.cr/s.cw, WaitRead/WaitWrite writes a
67 // byte to the pipe, causing the pollServer's poll system call to
68 // return. In response to the pipe being readable, the pollServer
69 // re-polls its request channels.
71 // Note that the ordering is "send request" and then "wake up server".
72 // If the operations were reversed, there would be a race: the poll
73 // server might wake up and look at the request channel, see that it
74 // was empty, and go back to sleep, all before the requester managed
75 // to send the request. Because the send must complete before the wakeup,
76 // the request channel must be buffered. A buffer of size 1 is sufficient
77 // for any request load. If many processes are trying to submit requests,
78 // one will succeed, the pollServer will read the request, and then the
79 // channel will be empty for the next process's request. A larger buffer
80 // might help batch requests.
82 // To avoid races in closing, all fd operations are locked and
83 // refcounted. when netFD.Close() is called, it calls syscall.Shutdown
84 // and sets a closing flag. Only when the last reference is removed
85 // will the fd be closed.
87 type pollServer struct {
88 cr, cw chan *netFD // buffered >= 1
90 pending map[int]*netFD
91 poll *pollster // low-level OS hooks
92 deadline int64 // next deadline (nsec since 1970)
95 func (s *pollServer) AddFD(fd *netFD, mode int) {
98 // fd closed underfoot
106 if err := s.poll.AddFD(intfd, mode, false); err != nil {
107 panic("pollServer AddFD " + err.String())
122 if t > 0 && (s.deadline == 0 || t < s.deadline) {
127 func (s *pollServer) LookupFD(fd int, mode int) *netFD {
132 netfd, ok := s.pending[key]
136 s.pending[key] = nil, false
140 func (s *pollServer) WakeFD(fd *netFD, mode int) {
154 func (s *pollServer) Now() int64 {
155 return time.Nanoseconds()
158 func (s *pollServer) CheckDeadlines() {
160 // TODO(rsc): This will need to be handled more efficiently,
161 // probably with a heap indexed by wakeup time.
163 var next_deadline int64
164 for key, fd := range s.pending {
179 s.pending[key] = nil, false
181 s.poll.DelFD(fd.sysfd, mode)
184 s.poll.DelFD(fd.sysfd, mode)
188 } else if next_deadline == 0 || t < next_deadline {
193 s.deadline = next_deadline
196 func (s *pollServer) Run() {
197 var scratch [100]byte
207 fd, mode, err := s.poll.WaitFD(t)
209 print("pollServer WaitFD: ", err.String(), "\n")
218 // Drain our wakeup pipe.
219 for nn, _ := s.pr.Read(scratch[0:]); nn > 0; {
220 nn, _ = s.pr.Read(scratch[0:])
222 // Read from channels
223 for fd, ok := <-s.cr; ok; fd, ok = <-s.cr {
226 for fd, ok := <-s.cw; ok; fd, ok = <-s.cw {
230 netfd := s.LookupFD(fd, mode)
232 print("pollServer: unexpected wakeup for fd=", fd, " mode=", string(mode), "\n")
235 s.WakeFD(netfd, mode)
240 var wakeupbuf [1]byte
242 func (s *pollServer) Wakeup() { s.pw.Write(wakeupbuf[0:]) }
244 func (s *pollServer) WaitRead(fd *netFD) {
250 func (s *pollServer) WaitWrite(fd *netFD) {
256 // Network FD methods.
257 // All the network FDs use a single pollServer.
259 var pollserver *pollServer
260 var onceStartServer sync.Once
263 p, err := newPollServer()
265 print("Start pollServer: ", err.String(), "\n")
270 func newFD(fd, family, proto int, net string, laddr, raddr Addr) (f *netFD, err os.Error) {
271 onceStartServer.Do(startServer)
272 if e := syscall.SetNonblock(fd, true); e != 0 {
273 return nil, &OpError{"setnonblock", net, laddr, os.Errno(e)}
290 f.sysfile = os.NewFile(fd, net+":"+ls+"->"+rs)
291 f.cr = make(chan bool, 1)
292 f.cw = make(chan bool, 1)
296 // Add a reference to this fd.
297 func (fd *netFD) incref() {
303 // Remove a reference to this FD and close if we've been asked to do so (and
304 // there are no references left.
305 func (fd *netFD) decref() {
308 if fd.closing && fd.sysref == 0 && fd.sysfd >= 0 {
309 // In case the user has set linger, switch to blocking mode so
310 // the close blocks. As long as this doesn't happen often, we
311 // can handle the extra OS processes. Otherwise we'll need to
312 // use the pollserver for Close too. Sigh.
313 syscall.SetNonblock(fd.sysfd, false)
321 func (fd *netFD) Close() os.Error {
322 if fd == nil || fd.sysfile == nil {
327 syscall.Shutdown(fd.sysfd, syscall.SHUT_RDWR)
333 func (fd *netFD) Read(p []byte) (n int, err os.Error) {
338 defer fd.rio.Unlock()
341 if fd.sysfile == nil {
344 if fd.rdeadline_delta > 0 {
345 fd.rdeadline = pollserver.Now() + fd.rdeadline_delta
352 n, errno = syscall.Read(fd.sysfile.Fd(), p)
353 if (errno == syscall.EAGAIN || errno == syscall.EINTR) && fd.rdeadline >= 0 {
354 pollserver.WaitRead(fd)
359 oserr = os.Errno(errno)
360 } else if n == 0 && errno == 0 && fd.proto != syscall.SOCK_DGRAM {
366 err = &OpError{"read", fd.net, fd.raddr, oserr}
371 func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err os.Error) {
372 if fd == nil || fd.sysfile == nil {
373 return 0, nil, os.EINVAL
376 defer fd.rio.Unlock()
379 if fd.rdeadline_delta > 0 {
380 fd.rdeadline = pollserver.Now() + fd.rdeadline_delta
387 n, sa, errno = syscall.Recvfrom(fd.sysfd, p, 0)
388 if (errno == syscall.EAGAIN || errno == syscall.EINTR) && fd.rdeadline >= 0 {
389 pollserver.WaitRead(fd)
394 oserr = os.Errno(errno)
399 err = &OpError{"read", fd.net, fd.laddr, oserr}
404 func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err os.Error) {
405 if fd == nil || fd.sysfile == nil {
406 return 0, 0, 0, nil, os.EINVAL
409 defer fd.rio.Unlock()
412 if fd.rdeadline_delta > 0 {
413 fd.rdeadline = pollserver.Now() + fd.rdeadline_delta
420 n, oobn, flags, sa, errno = syscall.Recvmsg(fd.sysfd, p, oob, 0)
421 if (errno == syscall.EAGAIN || errno == syscall.EINTR) && fd.rdeadline >= 0 {
422 pollserver.WaitRead(fd)
426 oserr = os.Errno(errno)
434 err = &OpError{"read", fd.net, fd.laddr, oserr}
440 func (fd *netFD) Write(p []byte) (n int, err os.Error) {
445 defer fd.wio.Unlock()
448 if fd.sysfile == nil {
451 if fd.wdeadline_delta > 0 {
452 fd.wdeadline = pollserver.Now() + fd.wdeadline_delta
460 n, errno := syscall.Write(fd.sysfile.Fd(), p[nn:])
467 if (errno == syscall.EAGAIN || errno == syscall.EINTR) && fd.wdeadline >= 0 {
468 pollserver.WaitWrite(fd)
473 oserr = os.Errno(errno)
477 oserr = io.ErrUnexpectedEOF
482 err = &OpError{"write", fd.net, fd.raddr, oserr}
487 func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err os.Error) {
488 if fd == nil || fd.sysfile == nil {
492 defer fd.wio.Unlock()
495 if fd.wdeadline_delta > 0 {
496 fd.wdeadline = pollserver.Now() + fd.wdeadline_delta
502 errno := syscall.Sendto(fd.sysfd, p, 0, sa)
503 if (errno == syscall.EAGAIN || errno == syscall.EINTR) && fd.wdeadline >= 0 {
504 pollserver.WaitWrite(fd)
508 oserr = os.Errno(errno)
515 err = &OpError{"write", fd.net, fd.raddr, oserr}
520 func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err os.Error) {
521 if fd == nil || fd.sysfile == nil {
522 return 0, 0, os.EINVAL
525 defer fd.wio.Unlock()
528 if fd.wdeadline_delta > 0 {
529 fd.wdeadline = pollserver.Now() + fd.wdeadline_delta
536 errno = syscall.Sendmsg(fd.sysfd, p, oob, sa, 0)
537 if (errno == syscall.EAGAIN || errno == syscall.EINTR) && fd.wdeadline >= 0 {
538 pollserver.WaitWrite(fd)
542 oserr = os.Errno(errno)
550 err = &OpError{"write", fd.net, fd.raddr, oserr}
555 func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (nfd *netFD, err os.Error) {
556 if fd == nil || fd.sysfile == nil {
557 return nil, os.EINVAL
563 // See ../syscall/exec.go for description of ForkLock.
564 // It is okay to hold the lock across syscall.Accept
565 // because we have put fd.sysfd into non-blocking mode.
566 syscall.ForkLock.RLock()
568 var sa syscall.Sockaddr
571 syscall.ForkLock.RUnlock()
572 return nil, os.EINVAL
574 s, sa, e = syscall.Accept(fd.sysfd)
575 if e != syscall.EAGAIN && e != syscall.EINTR {
578 syscall.ForkLock.RUnlock()
579 pollserver.WaitRead(fd)
580 syscall.ForkLock.RLock()
583 syscall.ForkLock.RUnlock()
584 return nil, &OpError{"accept", fd.net, fd.laddr, os.Errno(e)}
586 syscall.CloseOnExec(s)
587 syscall.ForkLock.RUnlock()
589 if nfd, err = newFD(s, fd.family, fd.proto, fd.net, fd.laddr, toAddr(sa)); err != nil {
596 func (fd *netFD) dup() (f *os.File, err os.Error) {
597 ns, e := syscall.Dup(fd.sysfd)
599 return nil, &OpError{"dup", fd.net, fd.laddr, os.Errno(e)}
602 // We want blocking mode for the new fd, hence the double negative.
603 if e = syscall.SetNonblock(ns, false); e != 0 {
604 return nil, &OpError{"setnonblock", fd.net, fd.laddr, os.Errno(e)}
607 return os.NewFile(ns, fd.sysfile.Name()), nil
610 func closesocket(s int) (errno int) {
611 return syscall.Close(s)