...

Source file src/net/pipe.go

Documentation: net

		 1  // Copyright 2010 The Go Authors. All rights reserved.
		 2  // Use of this source code is governed by a BSD-style
		 3  // license that can be found in the LICENSE file.
		 4  
		 5  package net
		 6  
		 7  import (
		 8  	"io"
		 9  	"os"
		10  	"sync"
		11  	"time"
		12  )
		13  
		// pipeDeadline tracks a single deadline and reports its expiry by closing
		// a channel, so blocking operations can wait on it inside a select.
		type pipeDeadline struct {
			mu sync.Mutex // guards timer and cancel

			// timer is the pending AfterFunc timer, or nil when none is armed.
			timer *time.Timer

			// cancel is closed once the deadline has passed. It must never be
			// nil; set swaps in a fresh channel when an expired deadline is
			// cleared or moved into the future.
			cancel chan struct{}
		}

		// makePipeDeadline returns a pipeDeadline with no deadline set and an
		// open cancel channel.
		func makePipeDeadline() pipeDeadline {
			open := make(chan struct{})
			return pipeDeadline{cancel: open}
		}

		// set moves the deadline to t.
		//
		// Expiry is signaled by closing the channel returned by wait:
		//   - a zero t means there is no deadline;
		//   - a t in the future arms a timer that closes the channel when it fires;
		//   - any other t closes the channel immediately.
		//
		// A deadline that has already expired can be refreshed by passing a zero
		// or future t, after which wait returns a new, open channel.
		func (d *pipeDeadline) set(t time.Time) {
			d.mu.Lock()
			defer d.mu.Unlock()

			// Retire the previous timer. When Stop reports false the timer has
			// already fired, so wait for its callback to finish closing cancel
			// before inspecting the channel below.
			if prev := d.timer; prev != nil {
				if stopped := prev.Stop(); !stopped {
					<-d.cancel
				}
			}
			d.timer = nil

			expired := isClosedChan(d.cancel)

			var remaining time.Duration
			hasDeadline := !t.IsZero()
			if hasDeadline {
				remaining = time.Until(t)
			}

			// Deadline already reached: make sure cancel is closed, but only once.
			if hasDeadline && remaining <= 0 {
				if !expired {
					close(d.cancel)
				}
				return
			}

			// The deadline is either absent or still ahead, so waiters need an
			// open channel.
			if expired {
				d.cancel = make(chan struct{})
			}
			if hasDeadline {
				d.timer = time.AfterFunc(remaining, func() {
					close(d.cancel)
				})
			}
		}

		// wait returns the channel that is closed when the deadline is exceeded.
		// set may replace the channel, so the result reflects only the deadline
		// in force at the time of the call.
		func (d *pipeDeadline) wait() chan struct{} {
			d.mu.Lock()
			ch := d.cancel
			d.mu.Unlock()
			return ch
		}

		// isClosedChan reports, without blocking, whether a receive on c would
		// succeed right now. For the signal-only channels used in this file that
		// means c has been closed.
		func isClosedChan(c <-chan struct{}) bool {
			select {
			case <-c:
				return true
			default:
			}
			return false
		}
		81  
		// pipeAddr is the address type reported by both ends of an in-memory
		// pipe. It carries no data; every value renders as "pipe".
		type pipeAddr struct{}

		// Network returns the network name, which is always "pipe".
		func (pipeAddr) Network() string {
			return "pipe"
		}

		// String returns the textual form of the address, which is always "pipe".
		func (pipeAddr) String() string {
			return "pipe"
		}
		86  
		87  type pipe struct {
		88  	wrMu sync.Mutex // Serialize Write operations
		89  
		90  	// Used by local Read to interact with remote Write.
		91  	// Successful receive on rdRx is always followed by send on rdTx.
		92  	rdRx <-chan []byte
		93  	rdTx chan<- int
		94  
		95  	// Used by local Write to interact with remote Read.
		96  	// Successful send on wrTx is always followed by receive on wrRx.
		97  	wrTx chan<- []byte
		98  	wrRx <-chan int
		99  
	 100  	once			 sync.Once // Protects closing localDone
	 101  	localDone	chan struct{}
	 102  	remoteDone <-chan struct{}
	 103  
	 104  	readDeadline	pipeDeadline
	 105  	writeDeadline pipeDeadline
	 106  }
	 107  
	 108  // Pipe creates a synchronous, in-memory, full duplex
	 109  // network connection; both ends implement the Conn interface.
	 110  // Reads on one end are matched with writes on the other,
	 111  // copying data directly between the two; there is no internal
	 112  // buffering.
	 113  func Pipe() (Conn, Conn) {
	 114  	cb1 := make(chan []byte)
	 115  	cb2 := make(chan []byte)
	 116  	cn1 := make(chan int)
	 117  	cn2 := make(chan int)
	 118  	done1 := make(chan struct{})
	 119  	done2 := make(chan struct{})
	 120  
	 121  	p1 := &pipe{
	 122  		rdRx: cb1, rdTx: cn1,
	 123  		wrTx: cb2, wrRx: cn2,
	 124  		localDone: done1, remoteDone: done2,
	 125  		readDeadline:	makePipeDeadline(),
	 126  		writeDeadline: makePipeDeadline(),
	 127  	}
	 128  	p2 := &pipe{
	 129  		rdRx: cb2, rdTx: cn2,
	 130  		wrTx: cb1, wrRx: cn1,
	 131  		localDone: done2, remoteDone: done1,
	 132  		readDeadline:	makePipeDeadline(),
	 133  		writeDeadline: makePipeDeadline(),
	 134  	}
	 135  	return p1, p2
	 136  }
	 137  
	 138  func (*pipe) LocalAddr() Addr	{ return pipeAddr{} }
	 139  func (*pipe) RemoteAddr() Addr { return pipeAddr{} }
	 140  
	 141  func (p *pipe) Read(b []byte) (int, error) {
	 142  	n, err := p.read(b)
	 143  	if err != nil && err != io.EOF && err != io.ErrClosedPipe {
	 144  		err = &OpError{Op: "read", Net: "pipe", Err: err}
	 145  	}
	 146  	return n, err
	 147  }
	 148  
	 149  func (p *pipe) read(b []byte) (n int, err error) {
	 150  	switch {
	 151  	case isClosedChan(p.localDone):
	 152  		return 0, io.ErrClosedPipe
	 153  	case isClosedChan(p.remoteDone):
	 154  		return 0, io.EOF
	 155  	case isClosedChan(p.readDeadline.wait()):
	 156  		return 0, os.ErrDeadlineExceeded
	 157  	}
	 158  
	 159  	select {
	 160  	case bw := <-p.rdRx:
	 161  		nr := copy(b, bw)
	 162  		p.rdTx <- nr
	 163  		return nr, nil
	 164  	case <-p.localDone:
	 165  		return 0, io.ErrClosedPipe
	 166  	case <-p.remoteDone:
	 167  		return 0, io.EOF
	 168  	case <-p.readDeadline.wait():
	 169  		return 0, os.ErrDeadlineExceeded
	 170  	}
	 171  }
	 172  
	 173  func (p *pipe) Write(b []byte) (int, error) {
	 174  	n, err := p.write(b)
	 175  	if err != nil && err != io.ErrClosedPipe {
	 176  		err = &OpError{Op: "write", Net: "pipe", Err: err}
	 177  	}
	 178  	return n, err
	 179  }
	 180  
	 181  func (p *pipe) write(b []byte) (n int, err error) {
	 182  	switch {
	 183  	case isClosedChan(p.localDone):
	 184  		return 0, io.ErrClosedPipe
	 185  	case isClosedChan(p.remoteDone):
	 186  		return 0, io.ErrClosedPipe
	 187  	case isClosedChan(p.writeDeadline.wait()):
	 188  		return 0, os.ErrDeadlineExceeded
	 189  	}
	 190  
	 191  	p.wrMu.Lock() // Ensure entirety of b is written together
	 192  	defer p.wrMu.Unlock()
	 193  	for once := true; once || len(b) > 0; once = false {
	 194  		select {
	 195  		case p.wrTx <- b:
	 196  			nw := <-p.wrRx
	 197  			b = b[nw:]
	 198  			n += nw
	 199  		case <-p.localDone:
	 200  			return n, io.ErrClosedPipe
	 201  		case <-p.remoteDone:
	 202  			return n, io.ErrClosedPipe
	 203  		case <-p.writeDeadline.wait():
	 204  			return n, os.ErrDeadlineExceeded
	 205  		}
	 206  	}
	 207  	return n, nil
	 208  }
	 209  
	 210  func (p *pipe) SetDeadline(t time.Time) error {
	 211  	if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
	 212  		return io.ErrClosedPipe
	 213  	}
	 214  	p.readDeadline.set(t)
	 215  	p.writeDeadline.set(t)
	 216  	return nil
	 217  }
	 218  
	 219  func (p *pipe) SetReadDeadline(t time.Time) error {
	 220  	if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
	 221  		return io.ErrClosedPipe
	 222  	}
	 223  	p.readDeadline.set(t)
	 224  	return nil
	 225  }
	 226  
	 227  func (p *pipe) SetWriteDeadline(t time.Time) error {
	 228  	if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
	 229  		return io.ErrClosedPipe
	 230  	}
	 231  	p.writeDeadline.set(t)
	 232  	return nil
	 233  }
	 234  
	 235  func (p *pipe) Close() error {
	 236  	p.once.Do(func() { close(p.localDone) })
	 237  	return nil
	 238  }
	 239  

View as plain text