...

Source file src/sync/poolqueue.go

Documentation: sync

		 1  // Copyright 2019 The Go Authors. All rights reserved.
		 2  // Use of this source code is governed by a BSD-style
		 3  // license that can be found in the LICENSE file.
		 4  
		 5  package sync
		 6  
		 7  import (
		 8  	"sync/atomic"
		 9  	"unsafe"
		10  )
		11  
// poolDequeue is a lock-free fixed-size single-producer,
// multi-consumer queue. The single producer can both push and pop
// from the head, and consumers can pop from the tail.
//
// It has the added feature that it nils out unused slots to avoid
// unnecessary retention of objects. This is important for sync.Pool,
// but not typically a property considered in the literature.
type poolDequeue struct {
	// headTail packs together a 32-bit head index and a 32-bit
	// tail index. Both are indexes into vals modulo len(vals);
	// the reduction is done by masking with len(vals)-1.
	//
	// tail = index of oldest data in queue
	// head = index of next slot to fill
	//
	// Slots in the range [tail, head) are owned by consumers.
	// A consumer continues to own a slot outside this range until
	// it nils the slot, at which point ownership passes to the
	// producer.
	//
	// The head index is stored in the most-significant bits so
	// that we can atomically add to it and the overflow is
	// harmless.
	headTail uint64

	// vals is a ring buffer of interface{} values stored in this
	// dequeue. The size of this must be a power of 2, because
	// slot indexes are computed by masking with len(vals)-1.
	//
	// vals[i].typ is nil if the slot is empty and non-nil
	// otherwise. A slot is still in use until *both* the tail
	// index has moved beyond it and typ has been set to nil. This
	// is set to nil atomically by the consumer and read
	// atomically by the producer.
	vals []eface
}
		46  
// eface is the two-word representation this file uses for a stored
// interface{} value: slots are read and written by converting an
// *eface to an *interface{}. The typ word doubles as the "slot in
// use" flag, so it is the word that is accessed atomically.
type eface struct {
	typ, val unsafe.Pointer
}
		50  
// dequeueBits is the width, in bits, of each of the head and tail
// indexes packed into poolDequeue.headTail.
const dequeueBits = 32

// dequeueLimit is the maximum size of a poolDequeue.
//
// This must be at most (1<<dequeueBits)/2 because detecting fullness
// depends on wrapping around the ring buffer without wrapping around
// the index. We divide by 4 so this fits in an int on 32-bit.
const dequeueLimit = (1 << dequeueBits) / 4
		59  
// dequeueNil is used in poolDequeue to represent interface{}(nil).
// Since we use nil to represent empty slots, we need a sentinel value
// to represent nil.
//
// pushHead stores dequeueNil(nil) in place of a nil value, and
// popHead and popTail translate it back to nil before returning.
type dequeueNil *struct{}
		64  
		65  func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
		66  	const mask = 1<<dequeueBits - 1
		67  	head = uint32((ptrs >> dequeueBits) & mask)
		68  	tail = uint32(ptrs & mask)
		69  	return
		70  }
		71  
		72  func (d *poolDequeue) pack(head, tail uint32) uint64 {
		73  	const mask = 1<<dequeueBits - 1
		74  	return (uint64(head) << dequeueBits) |
		75  		uint64(tail&mask)
		76  }
		77  
// pushHead adds val at the head of the queue. It returns false if the
// queue is full. It must only be called by a single producer.
//
// A nil val is stored as dequeueNil(nil), because a nil typ word is
// what marks a slot as empty.
func (d *poolDequeue) pushHead(val interface{}) bool {
	ptrs := atomic.LoadUint64(&d.headTail)
	head, tail := d.unpack(ptrs)
	// The queue is full when head is exactly len(d.vals) ahead of
	// tail, computed modulo 1<<dequeueBits.
	if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
		// Queue is full.
		return false
	}
	// Reduce head to a ring-buffer position by masking with
	// len(d.vals)-1.
	slot := &d.vals[head&uint32(len(d.vals)-1)]

	// Check if the head slot has been released by popTail.
	typ := atomic.LoadPointer(&slot.typ)
	if typ != nil {
		// Another goroutine is still cleaning up the tail, so
		// the queue is actually still full.
		return false
	}

	// The head slot is free, so we own it.
	if val == nil {
		val = dequeueNil(nil)
	}
	// Write val into the slot by viewing the eface as an
	// interface{}.
	*(*interface{})(unsafe.Pointer(slot)) = val

	// Increment head. This passes ownership of slot to popTail
	// and acts as a store barrier for writing the slot.
	atomic.AddUint64(&d.headTail, 1<<dequeueBits)
	return true
}
	 108  
// popHead removes and returns the element at the head of the queue.
// It returns false if the queue is empty. It must only be called by a
// single producer.
//
// A stored dequeueNil(nil) sentinel is returned as nil.
func (d *poolDequeue) popHead() (interface{}, bool) {
	var slot *eface
	// Retry until either the queue is observed empty or the
	// compare-and-swap below claims the head slot.
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}

		// Confirm tail and decrement head. We do this before
		// reading the value to take back ownership of this
		// slot.
		head--
		ptrs2 := d.pack(head, tail)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			// We successfully took back slot.
			slot = &d.vals[head&uint32(len(d.vals)-1)]
			break
		}
		// headTail changed between the load and the swap; reload
		// and try again.
	}

	// Read the value back out by viewing the eface as an
	// interface{}.
	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}
	// Zero the slot. Unlike popTail, this isn't racing with
	// pushHead, so we don't need to be careful here.
	*slot = eface{}
	return val, true
}
	 143  
// popTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. It may be called by any
// number of consumers.
//
// A stored dequeueNil(nil) sentinel is returned as nil.
func (d *poolDequeue) popTail() (interface{}, bool) {
	var slot *eface
	// Retry until either the queue is observed empty or the
	// compare-and-swap below claims the tail slot.
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}

		// Confirm head and tail (for our speculative check
		// above) and increment tail. If this succeeds, then
		// we own the slot at tail.
		ptrs2 := d.pack(head, tail+1)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			// Success.
			slot = &d.vals[tail&uint32(len(d.vals)-1)]
			break
		}
		// headTail changed between the load and the swap; reload
		// and try again.
	}

	// We now own slot.
	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}

	// Tell pushHead that we're done with this slot. Zeroing the
	// slot is also important so we don't leave behind references
	// that could keep this object live longer than necessary.
	//
	// We write to val first and then publish that we're done with
	// this slot by atomically writing to typ.
	slot.val = nil
	atomic.StorePointer(&slot.typ, nil)
	// At this point pushHead owns the slot.

	return val, true
}
	 186  
// poolChain is a dynamically-sized version of poolDequeue.
//
// This is implemented as a doubly-linked list queue of poolDequeues
// where each dequeue is double the size of the previous one. Once a
// dequeue fills up, this allocates a new one and only ever pushes to
// the latest dequeue. Pops happen from the other end of the list and
// once a dequeue is exhausted, it gets removed from the list.
//
// The zero value is an empty chain; pushHead allocates the first
// dequeue on first use.
type poolChain struct {
	// head is the poolDequeue to push to. This is only accessed
	// by the producer, so doesn't need to be synchronized.
	head *poolChainElt

	// tail is the poolDequeue to popTail from. This is accessed
	// by consumers, so reads and writes must be atomic.
	tail *poolChainElt
}
	 203  
// poolChainElt is one link of a poolChain: a poolDequeue plus the
// pointers that connect it to its neighbors in the chain.
type poolChainElt struct {
	poolDequeue

	// next and prev link to the adjacent poolChainElts in this
	// poolChain.
	//
	// next is written atomically by the producer and read
	// atomically by the consumer. It only transitions from nil to
	// non-nil.
	//
	// prev is written atomically by the consumer and read
	// atomically by the producer. It only transitions from
	// non-nil to nil.
	next, prev *poolChainElt
}
	 219  
	 220  func storePoolChainElt(pp **poolChainElt, v *poolChainElt) {
	 221  	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v))
	 222  }
	 223  
	 224  func loadPoolChainElt(pp **poolChainElt) *poolChainElt {
	 225  	return (*poolChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp))))
	 226  }
	 227  
	 228  func (c *poolChain) pushHead(val interface{}) {
	 229  	d := c.head
	 230  	if d == nil {
	 231  		// Initialize the chain.
	 232  		const initSize = 8 // Must be a power of 2
	 233  		d = new(poolChainElt)
	 234  		d.vals = make([]eface, initSize)
	 235  		c.head = d
	 236  		storePoolChainElt(&c.tail, d)
	 237  	}
	 238  
	 239  	if d.pushHead(val) {
	 240  		return
	 241  	}
	 242  
	 243  	// The current dequeue is full. Allocate a new one of twice
	 244  	// the size.
	 245  	newSize := len(d.vals) * 2
	 246  	if newSize >= dequeueLimit {
	 247  		// Can't make it any bigger.
	 248  		newSize = dequeueLimit
	 249  	}
	 250  
	 251  	d2 := &poolChainElt{prev: d}
	 252  	d2.vals = make([]eface, newSize)
	 253  	c.head = d2
	 254  	storePoolChainElt(&d.next, d2)
	 255  	d2.pushHead(val)
	 256  }
	 257  
	 258  func (c *poolChain) popHead() (interface{}, bool) {
	 259  	d := c.head
	 260  	for d != nil {
	 261  		if val, ok := d.popHead(); ok {
	 262  			return val, ok
	 263  		}
	 264  		// There may still be unconsumed elements in the
	 265  		// previous dequeue, so try backing up.
	 266  		d = loadPoolChainElt(&d.prev)
	 267  	}
	 268  	return nil, false
	 269  }
	 270  
// popTail removes and returns the element at the tail of the chain,
// starting from the tail dequeue and advancing through next links
// past dequeues that are permanently empty. It returns false if the
// chain is uninitialized or if the last dequeue it reaches is empty.
//
// c.tail and the next/prev links are only accessed through atomic
// operations here.
func (c *poolChain) popTail() (interface{}, bool) {
	d := loadPoolChainElt(&c.tail)
	if d == nil {
		// The chain has never been pushed to.
		return nil, false
	}

	for {
		// It's important that we load the next pointer
		// *before* popping the tail. In general, d may be
		// transiently empty, but if next is non-nil before
		// the pop and the pop fails, then d is permanently
		// empty, which is the only condition under which it's
		// safe to drop d from the chain.
		d2 := loadPoolChainElt(&d.next)

		if val, ok := d.popTail(); ok {
			return val, ok
		}

		if d2 == nil {
			// This is the only dequeue. It's empty right
			// now, but could be pushed to in the future.
			return nil, false
		}

		// The tail of the chain has been drained, so move on
		// to the next dequeue. Try to drop it from the chain
		// so the next pop doesn't have to look at the empty
		// dequeue again.
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
			// We won the race. Clear the prev pointer so
			// the garbage collector can collect the empty
			// dequeue and so popHead doesn't back up
			// further than necessary.
			storePoolChainElt(&d2.prev, nil)
		}
		// Whether or not this goroutine won the swap, continue
		// from the next dequeue.
		d = d2
	}
}
	 310  

View as plain text