1 // Copyright 2010 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
16 type InvalidConnError struct{}
18 func (e *InvalidConnError) String() string { return "invalid net.Conn" }
19 func (e *InvalidConnError) Temporary() bool { return false }
20 func (e *InvalidConnError) Timeout() bool { return false }
26 e := syscall.WSAStartup(uint32(0x101), &d)
28 initErr = os.NewSyscallError("WSAStartup", e)
32 func closesocket(s syscall.Handle) (errno int) {
33 return syscall.Closesocket(s)
36 // Interface for all io operations.
37 type anOpIface interface {
43 // IO completion result parameters.
44 type ioResult struct {
49 // anOp implements functionality common to all io operations.
51 // Used by IOCP interface, it must be first field
52 // of the struct, as our code rely on it.
55 resultc chan ioResult // io completion results
56 errnoc chan int // io submit / cancel operation errors
60 func (o *anOp) Init(fd *netFD) {
62 o.resultc = make(chan ioResult, 1)
63 o.errnoc = make(chan int)
66 func (o *anOp) Op() *anOp {
70 // bufOp is used by io operations that read / write
71 // data from / to client buffer.
77 func (o *bufOp) Init(fd *netFD, buf []byte) {
79 o.buf.Len = uint32(len(buf))
83 o.buf.Buf = (*byte)(unsafe.Pointer(&buf[0]))
87 // resultSrv will retrieve all io completion results from
88 // iocp and send them to the correspondent waiting client
89 // goroutine via channel supplied in the request.
90 type resultSrv struct {
94 func (s *resultSrv) Run() {
95 var o *syscall.Overlapped
99 r.err = syscall.GetQueuedCompletionStatus(s.iocp, &(r.qty), &key, &o, syscall.INFINITE)
102 // Dequeued successfully completed io packet.
103 case r.err == syscall.WAIT_TIMEOUT && o == nil:
104 // Wait has timed out (should not happen now, but might be used in the future).
105 panic("GetQueuedCompletionStatus timed out")
107 // Failed to dequeue anything -> report the error.
108 panic("GetQueuedCompletionStatus failed " + syscall.Errstr(r.err))
110 // Dequeued failed io packet.
112 (*anOp)(unsafe.Pointer(o)).resultc <- r
116 // ioSrv executes net io requests.
118 submchan chan anOpIface // submit io requests
119 canchan chan anOpIface // cancel io requests
122 // ProcessRemoteIO will execute submit io requests on behalf
123 // of other goroutines, all on a single os thread, so it can
124 // cancel them later. Results of all operations will be sent
125 // back to their requesters via channel supplied in request.
126 func (s *ioSrv) ProcessRemoteIO() {
127 runtime.LockOSThread()
128 defer runtime.UnlockOSThread()
131 case o := <-s.submchan:
132 o.Op().errnoc <- o.Submit()
133 case o := <-s.canchan:
134 o.Op().errnoc <- syscall.CancelIo(syscall.Handle(o.Op().fd.sysfd))
139 // ExecIO executes a single io operation. It either executes it
140 // inline, or, if timeouts are employed, passes the request onto
141 // a special goroutine and waits for completion or cancels request.
142 func (s *ioSrv) ExecIO(oi anOpIface, deadline_delta int64) (n int, err os.Error) {
145 if deadline_delta > 0 {
146 // Send request to a special dedicated thread,
147 // so it can stop the io with CancelIO later.
155 // IO completed immediately, but we need to get our completion message anyway.
156 case syscall.ERROR_IO_PENDING:
157 // IO started, and we have to wait for its completion.
159 return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, os.Errno(e)}
161 // Wait for our request to complete.
163 if deadline_delta > 0 {
165 case r = <-o.resultc:
166 case <-time.After(deadline_delta):
170 if r.err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled
171 r.err = syscall.EWOULDBLOCK
178 err = &OpError{oi.Name(), o.fd.net, o.fd.laddr, os.Errno(r.err)}
180 return int(r.qty), err
183 // Start helper goroutines.
184 var resultsrv *resultSrv
186 var onceStartServer sync.Once
189 resultsrv = new(resultSrv)
191 resultsrv.iocp, errno = syscall.CreateIoCompletionPort(syscall.InvalidHandle, 0, 0, 1)
193 panic("CreateIoCompletionPort failed " + syscall.Errstr(errno))
198 iosrv.submchan = make(chan anOpIface)
199 iosrv.canchan = make(chan anOpIface)
200 go iosrv.ProcessRemoteIO()
203 // Network file descriptor.
205 // locking/lifetime of sysfd
210 // immutable until Close
219 rdeadline_delta int64
222 wdeadline_delta int64
227 func allocFD(fd syscall.Handle, family, proto int, net string) (f *netFD) {
234 runtime.SetFinalizer(f, (*netFD).Close)
238 func newFD(fd syscall.Handle, family, proto int, net string) (f *netFD, err os.Error) {
242 onceStartServer.Do(startServer)
243 // Associate our socket with resultsrv.iocp.
244 if _, e := syscall.CreateIoCompletionPort(syscall.Handle(fd), resultsrv.iocp, 0, 0); e != 0 {
245 return nil, os.Errno(e)
247 return allocFD(fd, family, proto, net), nil
250 func (fd *netFD) setAddr(laddr, raddr Addr) {
255 func (fd *netFD) connect(ra syscall.Sockaddr) (err os.Error) {
256 e := syscall.Connect(fd.sysfd, ra)
263 // Add a reference to this fd.
264 func (fd *netFD) incref() {
270 // Remove a reference to this FD and close if we've been asked to do so (and
271 // there are no references left.
272 func (fd *netFD) decref() {
275 if fd.closing && fd.sysref == 0 && fd.sysfd != syscall.InvalidHandle {
276 // In case the user has set linger, switch to blocking mode so
277 // the close blocks. As long as this doesn't happen often, we
278 // can handle the extra OS processes. Otherwise we'll need to
279 // use the resultsrv for Close too. Sigh.
280 syscall.SetNonblock(fd.sysfd, false)
281 closesocket(fd.sysfd)
282 fd.sysfd = syscall.InvalidHandle
283 // no need for a finalizer anymore
284 runtime.SetFinalizer(fd, nil)
289 func (fd *netFD) Close() os.Error {
290 if fd == nil || fd.sysfd == syscall.InvalidHandle {
295 syscall.Shutdown(fd.sysfd, syscall.SHUT_RDWR)
301 // Read from network.
307 func (o *readOp) Submit() (errno int) {
309 return syscall.WSARecv(syscall.Handle(o.fd.sysfd), &o.buf, 1, &d, &f, &o.o, nil)
312 func (o *readOp) Name() string {
316 func (fd *netFD) Read(buf []byte) (n int, err os.Error) {
321 defer fd.rio.Unlock()
324 if fd.sysfd == syscall.InvalidHandle {
329 n, err = iosrv.ExecIO(&o, fd.rdeadline_delta)
330 if err == nil && n == 0 {
336 // ReadFrom from network.
338 type readFromOp struct {
340 rsa syscall.RawSockaddrAny
344 func (o *readFromOp) Submit() (errno int) {
346 return syscall.WSARecvFrom(o.fd.sysfd, &o.buf, 1, &d, &f, &o.rsa, &o.rsan, &o.o, nil)
349 func (o *readFromOp) Name() string {
353 func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err os.Error) {
355 return 0, nil, os.EINVAL
361 defer fd.rio.Unlock()
364 if fd.sysfd == syscall.InvalidHandle {
365 return 0, nil, os.EINVAL
369 o.rsan = int32(unsafe.Sizeof(o.rsa))
370 n, err = iosrv.ExecIO(&o, fd.rdeadline_delta)
374 sa, _ = o.rsa.Sockaddr()
380 type writeOp struct {
384 func (o *writeOp) Submit() (errno int) {
386 return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &d, 0, &o.o, nil)
389 func (o *writeOp) Name() string {
393 func (fd *netFD) Write(buf []byte) (n int, err os.Error) {
398 defer fd.wio.Unlock()
401 if fd.sysfd == syscall.InvalidHandle {
406 return iosrv.ExecIO(&o, fd.wdeadline_delta)
409 // WriteTo to network.
411 type writeToOp struct {
416 func (o *writeToOp) Submit() (errno int) {
418 return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, &d, 0, o.sa, &o.o, nil)
421 func (o *writeToOp) Name() string {
425 func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (n int, err os.Error) {
433 defer fd.wio.Unlock()
436 if fd.sysfd == syscall.InvalidHandle {
442 return iosrv.ExecIO(&o, fd.wdeadline_delta)
445 // Accept new network connections.
447 type acceptOp struct {
449 newsock syscall.Handle
450 attrs [2]syscall.RawSockaddrAny // space for local and remote address only
453 func (o *acceptOp) Submit() (errno int) {
455 l := uint32(unsafe.Sizeof(o.attrs[0]))
456 return syscall.AcceptEx(o.fd.sysfd, o.newsock,
457 (*byte)(unsafe.Pointer(&o.attrs[0])), 0, l, l, &d, &o.o)
460 func (o *acceptOp) Name() string {
464 func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (nfd *netFD, err os.Error) {
465 if fd == nil || fd.sysfd == syscall.InvalidHandle {
466 return nil, os.EINVAL
472 // See ../syscall/exec.go for description of ForkLock.
473 syscall.ForkLock.RLock()
474 s, e := syscall.Socket(fd.family, fd.proto, 0)
476 syscall.ForkLock.RUnlock()
477 return nil, os.Errno(e)
479 syscall.CloseOnExec(s)
480 syscall.ForkLock.RUnlock()
482 // Associate our new socket with IOCP.
483 onceStartServer.Do(startServer)
484 if _, e = syscall.CreateIoCompletionPort(s, resultsrv.iocp, 0, 0); e != 0 {
485 return nil, &OpError{"CreateIoCompletionPort", fd.net, fd.laddr, os.Errno(e)}
488 // Submit accept request.
492 _, err = iosrv.ExecIO(&o, 0)
498 // Inherit properties of the listening socket.
499 e = syscall.Setsockopt(s, syscall.SOL_SOCKET, syscall.SO_UPDATE_ACCEPT_CONTEXT, (*byte)(unsafe.Pointer(&fd.sysfd)), int32(unsafe.Sizeof(fd.sysfd)))
505 // Get local and peer addr out of AcceptEx buffer.
506 var lrsa, rrsa *syscall.RawSockaddrAny
508 l := uint32(unsafe.Sizeof(*lrsa))
509 syscall.GetAcceptExSockaddrs((*byte)(unsafe.Pointer(&o.attrs[0])),
510 0, l, l, &lrsa, &llen, &rrsa, &rlen)
511 lsa, _ := lrsa.Sockaddr()
512 rsa, _ := rrsa.Sockaddr()
514 nfd = allocFD(s, fd.family, fd.proto, fd.net)
515 nfd.setAddr(toAddr(lsa), toAddr(rsa))
519 // Unimplemented functions.
521 func (fd *netFD) dup() (f *os.File, err os.Error) {
522 // TODO: Implement this
523 return nil, os.NewSyscallError("dup", syscall.EWINDOWS)
526 func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err os.Error) {
527 return 0, 0, 0, nil, os.EAFNOSUPPORT
530 func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err os.Error) {
531 return 0, 0, os.EAFNOSUPPORT