...

Source file src/io/pipe.go

Documentation: io

		 1  // Copyright 2009 The Go Authors. All rights reserved.
		 2  // Use of this source code is governed by a BSD-style
		 3  // license that can be found in the LICENSE file.
		 4  
		 5  // Pipe adapter to connect code expecting an io.Reader
		 6  // with code expecting an io.Writer.
		 7  
		 8  package io
		 9  
		10  import (
		11  	"errors"
		12  	"sync"
		13  )
		14  
		15  // onceError is an object that will only store an error once.
		16  type onceError struct {
		17  	sync.Mutex // guards following
		18  	err				error
		19  }
		20  
		21  func (a *onceError) Store(err error) {
		22  	a.Lock()
		23  	defer a.Unlock()
		24  	if a.err != nil {
		25  		return
		26  	}
		27  	a.err = err
		28  }
		29  func (a *onceError) Load() error {
		30  	a.Lock()
		31  	defer a.Unlock()
		32  	return a.err
		33  }
		34  
		35  // ErrClosedPipe is the error used for read or write operations on a closed pipe.
		36  var ErrClosedPipe = errors.New("io: read/write on closed pipe")
		37  
		38  // A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
		39  type pipe struct {
		40  	wrMu sync.Mutex // Serializes Write operations
		41  	wrCh chan []byte
		42  	rdCh chan int
		43  
		44  	once sync.Once // Protects closing done
		45  	done chan struct{}
		46  	rerr onceError
		47  	werr onceError
		48  }
		49  
		50  func (p *pipe) Read(b []byte) (n int, err error) {
		51  	select {
		52  	case <-p.done:
		53  		return 0, p.readCloseError()
		54  	default:
		55  	}
		56  
		57  	select {
		58  	case bw := <-p.wrCh:
		59  		nr := copy(b, bw)
		60  		p.rdCh <- nr
		61  		return nr, nil
		62  	case <-p.done:
		63  		return 0, p.readCloseError()
		64  	}
		65  }
		66  
		67  func (p *pipe) readCloseError() error {
		68  	rerr := p.rerr.Load()
		69  	if werr := p.werr.Load(); rerr == nil && werr != nil {
		70  		return werr
		71  	}
		72  	return ErrClosedPipe
		73  }
		74  
		75  func (p *pipe) CloseRead(err error) error {
		76  	if err == nil {
		77  		err = ErrClosedPipe
		78  	}
		79  	p.rerr.Store(err)
		80  	p.once.Do(func() { close(p.done) })
		81  	return nil
		82  }
		83  
		84  func (p *pipe) Write(b []byte) (n int, err error) {
		85  	select {
		86  	case <-p.done:
		87  		return 0, p.writeCloseError()
		88  	default:
		89  		p.wrMu.Lock()
		90  		defer p.wrMu.Unlock()
		91  	}
		92  
		93  	for once := true; once || len(b) > 0; once = false {
		94  		select {
		95  		case p.wrCh <- b:
		96  			nw := <-p.rdCh
		97  			b = b[nw:]
		98  			n += nw
		99  		case <-p.done:
	 100  			return n, p.writeCloseError()
	 101  		}
	 102  	}
	 103  	return n, nil
	 104  }
	 105  
	 106  func (p *pipe) writeCloseError() error {
	 107  	werr := p.werr.Load()
	 108  	if rerr := p.rerr.Load(); werr == nil && rerr != nil {
	 109  		return rerr
	 110  	}
	 111  	return ErrClosedPipe
	 112  }
	 113  
	 114  func (p *pipe) CloseWrite(err error) error {
	 115  	if err == nil {
	 116  		err = EOF
	 117  	}
	 118  	p.werr.Store(err)
	 119  	p.once.Do(func() { close(p.done) })
	 120  	return nil
	 121  }
	 122  
	 123  // A PipeReader is the read half of a pipe.
	 124  type PipeReader struct {
	 125  	p *pipe
	 126  }
	 127  
	 128  // Read implements the standard Read interface:
	 129  // it reads data from the pipe, blocking until a writer
	 130  // arrives or the write end is closed.
	 131  // If the write end is closed with an error, that error is
	 132  // returned as err; otherwise err is EOF.
	 133  func (r *PipeReader) Read(data []byte) (n int, err error) {
	 134  	return r.p.Read(data)
	 135  }
	 136  
	 137  // Close closes the reader; subsequent writes to the
	 138  // write half of the pipe will return the error ErrClosedPipe.
	 139  func (r *PipeReader) Close() error {
	 140  	return r.CloseWithError(nil)
	 141  }
	 142  
	 143  // CloseWithError closes the reader; subsequent writes
	 144  // to the write half of the pipe will return the error err.
	 145  //
	 146  // CloseWithError never overwrites the previous error if it exists
	 147  // and always returns nil.
	 148  func (r *PipeReader) CloseWithError(err error) error {
	 149  	return r.p.CloseRead(err)
	 150  }
	 151  
	 152  // A PipeWriter is the write half of a pipe.
	 153  type PipeWriter struct {
	 154  	p *pipe
	 155  }
	 156  
	 157  // Write implements the standard Write interface:
	 158  // it writes data to the pipe, blocking until one or more readers
	 159  // have consumed all the data or the read end is closed.
	 160  // If the read end is closed with an error, that err is
	 161  // returned as err; otherwise err is ErrClosedPipe.
	 162  func (w *PipeWriter) Write(data []byte) (n int, err error) {
	 163  	return w.p.Write(data)
	 164  }
	 165  
	 166  // Close closes the writer; subsequent reads from the
	 167  // read half of the pipe will return no bytes and EOF.
	 168  func (w *PipeWriter) Close() error {
	 169  	return w.CloseWithError(nil)
	 170  }
	 171  
	 172  // CloseWithError closes the writer; subsequent reads from the
	 173  // read half of the pipe will return no bytes and the error err,
	 174  // or EOF if err is nil.
	 175  //
	 176  // CloseWithError never overwrites the previous error if it exists
	 177  // and always returns nil.
	 178  func (w *PipeWriter) CloseWithError(err error) error {
	 179  	return w.p.CloseWrite(err)
	 180  }
	 181  
	 182  // Pipe creates a synchronous in-memory pipe.
	 183  // It can be used to connect code expecting an io.Reader
	 184  // with code expecting an io.Writer.
	 185  //
	 186  // Reads and Writes on the pipe are matched one to one
	 187  // except when multiple Reads are needed to consume a single Write.
	 188  // That is, each Write to the PipeWriter blocks until it has satisfied
	 189  // one or more Reads from the PipeReader that fully consume
	 190  // the written data.
	 191  // The data is copied directly from the Write to the corresponding
	 192  // Read (or Reads); there is no internal buffering.
	 193  //
	 194  // It is safe to call Read and Write in parallel with each other or with Close.
	 195  // Parallel calls to Read and parallel calls to Write are also safe:
	 196  // the individual calls will be gated sequentially.
	 197  func Pipe() (*PipeReader, *PipeWriter) {
	 198  	p := &pipe{
	 199  		wrCh: make(chan []byte),
	 200  		rdCh: make(chan int),
	 201  		done: make(chan struct{}),
	 202  	}
	 203  	return &PipeReader{p}, &PipeWriter{p}
	 204  }
	 205  

View as plain text