...

Source file src/net/textproto/pipeline.go

Documentation: net/textproto

		 1  // Copyright 2010 The Go Authors. All rights reserved.
		 2  // Use of this source code is governed by a BSD-style
		 3  // license that can be found in the LICENSE file.
		 4  
		 5  package textproto
		 6  
		 7  import (
		 8  	"sync"
		 9  )
		10  
// A Pipeline manages a pipelined in-order request/response sequence.
//
// To use a Pipeline p to manage multiple clients on a connection,
// each client should run:
//
//	id := p.Next()	// take a number
//
//	p.StartRequest(id)	// wait for turn to send request
//	«send request»
//	p.EndRequest(id)	// notify Pipeline that request is sent
//
//	p.StartResponse(id)	// wait for turn to read response
//	«read response»
//	p.EndResponse(id)	// notify Pipeline that response is read
//
// A pipelined server can use the same calls to ensure that
// responses computed in parallel are written in the correct order.
type Pipeline struct {
	mu       sync.Mutex // guards id
	id       uint       // id that the next call to Next will return
	request  sequencer  // orders the request phase (StartRequest/EndRequest)
	response sequencer  // orders the response phase (StartResponse/EndResponse)
}
		34  
		35  // Next returns the next id for a request/response pair.
		36  func (p *Pipeline) Next() uint {
		37  	p.mu.Lock()
		38  	id := p.id
		39  	p.id++
		40  	p.mu.Unlock()
		41  	return id
		42  }
		43  
// StartRequest blocks until it is time to send (or, if this is a server, receive)
// the request with the given id. It returns immediately if id is already the
// current request; otherwise it waits until EndRequest has been called for
// the preceding id.
func (p *Pipeline) StartRequest(id uint) {
	p.request.Start(id)
}
		49  
// EndRequest notifies p that the request with the given id has been sent
// (or, if this is a server, received). This lets the request with the
// following id start. EndRequest panics if id is not the request that is
// currently in progress.
func (p *Pipeline) EndRequest(id uint) {
	p.request.End(id)
}
		55  
// StartResponse blocks until it is time to receive (or, if this is a server, send)
// the response with the given id. It returns immediately if id is already the
// current response; otherwise it waits until EndResponse has been called for
// the preceding id.
func (p *Pipeline) StartResponse(id uint) {
	p.response.Start(id)
}
		61  
// EndResponse notifies p that the response with the given id has been received
// (or, if this is a server, sent). This lets the response with the following
// id start. EndResponse panics if id is not the response that is currently
// in progress.
func (p *Pipeline) EndResponse(id uint) {
	p.response.End(id)
}
		67  
		68  // A sequencer schedules a sequence of numbered events that must
		69  // happen in order, one after the other. The event numbering must start
		70  // at 0 and increment without skipping. The event number wraps around
		71  // safely as long as there are not 2^32 simultaneous events pending.
		72  type sequencer struct {
		73  	mu	 sync.Mutex
		74  	id	 uint
		75  	wait map[uint]chan struct{}
		76  }
		77  
		78  // Start waits until it is time for the event numbered id to begin.
		79  // That is, except for the first event, it waits until End(id-1) has
		80  // been called.
		81  func (s *sequencer) Start(id uint) {
		82  	s.mu.Lock()
		83  	if s.id == id {
		84  		s.mu.Unlock()
		85  		return
		86  	}
		87  	c := make(chan struct{})
		88  	if s.wait == nil {
		89  		s.wait = make(map[uint]chan struct{})
		90  	}
		91  	s.wait[id] = c
		92  	s.mu.Unlock()
		93  	<-c
		94  }
		95  
		96  // End notifies the sequencer that the event numbered id has completed,
		97  // allowing it to schedule the event numbered id+1.	It is a run-time error
		98  // to call End with an id that is not the number of the active event.
		99  func (s *sequencer) End(id uint) {
	 100  	s.mu.Lock()
	 101  	if s.id != id {
	 102  		s.mu.Unlock()
	 103  		panic("out of sync")
	 104  	}
	 105  	id++
	 106  	s.id = id
	 107  	if s.wait == nil {
	 108  		s.wait = make(map[uint]chan struct{})
	 109  	}
	 110  	c, ok := s.wait[id]
	 111  	if ok {
	 112  		delete(s.wait, id)
	 113  	}
	 114  	s.mu.Unlock()
	 115  	if ok {
	 116  		close(c)
	 117  	}
	 118  }
	 119  

View as plain text