// Copyright 2009 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. // Pipe adapter to connect code expecting an io.Reader // with code expecting an io.Writer. package io import ( "os" "runtime" "sync" ) type pipeResult struct { n int err os.Error } // Shared pipe structure. type pipe struct { // Reader sends on cr1, receives on cr2. // Writer does the same on cw1, cw2. r1, w1 chan []byte r2, w2 chan pipeResult rclose chan os.Error // read close; error to return to writers wclose chan os.Error // write close; error to return to readers done chan int // read or write half is done } func (p *pipe) run() { var ( rb []byte // pending Read wb []byte // pending Write wn int // amount written so far from wb rerr os.Error // if read end is closed, error to send to writers werr os.Error // if write end is closed, error to send to readers r1 chan []byte // p.cr1 or nil depending on whether Read is ok w1 chan []byte // p.cw1 or nil depending on whether Write is ok ndone int ) // Read and Write are enabled at the start. r1 = p.r1 w1 = p.w1 for { select { case <-p.done: if ndone++; ndone == 2 { // both reader and writer are gone // close out any existing i/o if r1 == nil { p.r2 <- pipeResult{0, os.EINVAL} } if w1 == nil { p.w2 <- pipeResult{0, os.EINVAL} } return } continue case rerr = <-p.rclose: if w1 == nil { // finish pending Write p.w2 <- pipeResult{wn, rerr} wn = 0 w1 = p.w1 // allow another Write } if r1 == nil { // Close of read side during Read. // finish pending Read with os.EINVAL. p.r2 <- pipeResult{0, os.EINVAL} r1 = p.r1 // allow another Read } continue case werr = <-p.wclose: if r1 == nil { // finish pending Read p.r2 <- pipeResult{0, werr} r1 = p.r1 // allow another Read } if w1 == nil { // Close of write side during Write. // finish pending Write with os.EINVAL. p.w2 <- pipeResult{wn, os.EINVAL} wn = 0 w1 = p.w1 // allow another Write } continue case rb = <-r1: if werr != nil { // write end is closed p.r2 <- pipeResult{0, werr} continue } if rerr != nil { // read end is closed p.r2 <- pipeResult{0, os.EINVAL} continue } r1 = nil // disable Read until this one is done case wb = <-w1: if rerr != nil { // read end is closed p.w2 <- pipeResult{0, rerr} continue } if werr != nil { // write end is closed p.w2 <- pipeResult{0, os.EINVAL} continue } w1 = nil // disable Write until this one is done } if r1 == nil && w1 == nil { // Have rb and wb. Execute. n := copy(rb, wb) wn += n wb = wb[n:] // Finish Read. p.r2 <- pipeResult{n, nil} r1 = p.r1 // allow another Read // Maybe finish Write. if len(wb) == 0 { p.w2 <- pipeResult{wn, nil} wn = 0 w1 = p.w1 // allow another Write } } } } // Read/write halves of the pipe. // They are separate structures for two reasons: // 1. If one end becomes garbage without being Closed, // its finalizer can Close so that the other end // does not hang indefinitely. // 2. Clients cannot use interface conversions on the // read end to find the Write method, and vice versa. type pipeHalf struct { c1 chan []byte c2 chan pipeResult cclose chan os.Error done chan int lock sync.Mutex closed bool io sync.Mutex ioclosed bool } func (p *pipeHalf) rw(data []byte) (n int, err os.Error) { // Run i/o operation. // Check ioclosed flag under lock to make sure we're still allowed to do i/o. p.io.Lock() if p.ioclosed { p.io.Unlock() return 0, os.EINVAL } p.io.Unlock() p.c1 <- data res := <-p.c2 return res.n, res.err } func (p *pipeHalf) close(err os.Error) os.Error { // Close pipe half. // Only first call to close does anything. p.lock.Lock() if p.closed { p.lock.Unlock() return os.EINVAL } p.closed = true p.lock.Unlock() // First, send the close notification. p.cclose <- err // Runner is now responding to rw operations // with os.EINVAL. Cut off future rw operations // by setting ioclosed flag. p.io.Lock() p.ioclosed = true p.io.Unlock() // With ioclosed set, there will be no more rw operations // working on the channels. // Tell the runner we won't be bothering it anymore. p.done <- 1 // Successfully torn down; can disable finalizer. runtime.SetFinalizer(p, nil) return nil } func (p *pipeHalf) finalizer() { p.close(os.EINVAL) } // A PipeReader is the read half of a pipe. type PipeReader struct { pipeHalf } // Read implements the standard Read interface: // it reads data from the pipe, blocking until a writer // arrives or the write end is closed. // If the write end is closed with an error, that error is // returned as err; otherwise err is nil. func (r *PipeReader) Read(data []byte) (n int, err os.Error) { return r.rw(data) } // Close closes the reader; subsequent writes to the // write half of the pipe will return the error os.EPIPE. func (r *PipeReader) Close() os.Error { return r.CloseWithError(nil) } // CloseWithError closes the reader; subsequent writes // to the write half of the pipe will return the error err. func (r *PipeReader) CloseWithError(err os.Error) os.Error { if err == nil { err = os.EPIPE } return r.close(err) } // A PipeWriter is the write half of a pipe. type PipeWriter struct { pipeHalf } // Write implements the standard Write interface: // it writes data to the pipe, blocking until readers // have consumed all the data or the read end is closed. // If the read end is closed with an error, that err is // returned as err; otherwise err is os.EPIPE. func (w *PipeWriter) Write(data []byte) (n int, err os.Error) { return w.rw(data) } // Close closes the writer; subsequent reads from the // read half of the pipe will return no bytes and os.EOF. func (w *PipeWriter) Close() os.Error { return w.CloseWithError(nil) } // CloseWithError closes the writer; subsequent reads from the // read half of the pipe will return no bytes and the error err. func (w *PipeWriter) CloseWithError(err os.Error) os.Error { if err == nil { err = os.EOF } return w.close(err) } // Pipe creates a synchronous in-memory pipe. // It can be used to connect code expecting an io.Reader // with code expecting an io.Writer. // Reads on one end are matched with writes on the other, // copying data directly between the two; there is no internal buffering. func Pipe() (*PipeReader, *PipeWriter) { p := &pipe{ r1: make(chan []byte), r2: make(chan pipeResult), w1: make(chan []byte), w2: make(chan pipeResult), rclose: make(chan os.Error), wclose: make(chan os.Error), done: make(chan int), } go p.run() // NOTE: Cannot use composite literal here: // pipeHalf{c1: p.cr1, c2: p.cr2, cclose: p.crclose, cdone: p.cdone} // because this implicitly copies the pipeHalf, which copies the inner mutex. r := new(PipeReader) r.c1 = p.r1 r.c2 = p.r2 r.cclose = p.rclose r.done = p.done runtime.SetFinalizer(r, (*PipeReader).finalizer) w := new(PipeWriter) w.c1 = p.w1 w.c2 = p.w2 w.cclose = p.wclose w.done = p.done runtime.SetFinalizer(w, (*PipeWriter).finalizer) return r, w }