...

Source file src/runtime/sema.go

Documentation: runtime

		 1  // Copyright 2009 The Go Authors. All rights reserved.
		 2  // Use of this source code is governed by a BSD-style
		 3  // license that can be found in the LICENSE file.
		 4  
		 5  // Semaphore implementation exposed to Go.
		 6  // Intended use is to provide a sleep and wakeup
		 7  // primitive that can be used in the contended case
		 8  // of other synchronization primitives.
		 9  // Thus it targets the same goal as Linux's futex,
		10  // but it has much simpler semantics.
		11  //
		12  // That is, don't think of these as semaphores.
		13  // Think of them as a way to implement sleep and wakeup
		14  // such that every sleep is paired with a single wakeup,
		15  // even if, due to races, the wakeup happens before the sleep.
		16  //
		17  // See Mullender and Cox, ``Semaphores in Plan 9,''
		18  // https://swtch.com/semaphore.pdf
		19  
		20  package runtime
		21  
		22  import (
		23  	"internal/cpu"
		24  	"runtime/internal/atomic"
		25  	"unsafe"
		26  )
		27  
		28  // Asynchronous semaphore for sync.Mutex.
		29  
		// A semaRoot holds a balanced tree of sudog with distinct addresses (s.elem).
		// Each of those sudog may in turn point (through s.waitlink) to a list
		// of other sudogs waiting on the same address.
		// The operations on the inner lists of sudogs with the same address
		// are all O(1). The scanning of the top-level tree of distinct addresses
		// (a treap, see semaRoot.queue) is O(log n) on average,
		// where n is the number of distinct addresses with goroutines blocked
		// on them that hash to the given semaRoot.
		// See golang.org/issue/17953 for a program that worked badly
		// before we introduced the second level of list, and test/locklinear.go
		// for a test that exercises this.
		type semaRoot struct {
			lock  mutex  // Guards treap; nwait is only modified while holding it.
			treap *sudog // root of balanced tree of unique waiters.
			nwait uint32 // Number of waiters on all addresses hashing to this root. Read w/o the lock.
		}
		45  
		// semTabSize is the number of semaRoot buckets. It is prime so that the
		// bucket index computed by semroot does not correlate with any user patterns.
		const semTabSize = 251

		// semtable is the fixed hash table of semaRoots that semroot maps addresses
		// into. Each entry's pad is sized so that root plus pad occupy
		// cpu.CacheLinePadSize bytes, presumably so that distinct roots do not
		// share a cache line. If semaRoot ever grows past cpu.CacheLinePadSize, the
		// pad length becomes negative and this declaration no longer compiles.
		var semtable [semTabSize]struct {
			root semaRoot
			pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
		}
		53  
		54  //go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
		55  func sync_runtime_Semacquire(addr *uint32) {
		56  	semacquire1(addr, false, semaBlockProfile, 0)
		57  }
		58  
		59  //go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
		60  func poll_runtime_Semacquire(addr *uint32) {
		61  	semacquire1(addr, false, semaBlockProfile, 0)
		62  }
		63  
		 // sync_runtime_Semrelease implements sync.runtime_Semrelease by forwarding
		 // directly to semrelease1. handoff asks semrelease1 to pass the count
		 // straight to the woken waiter; skipframes is added to the frame counts
		 // semrelease1 skips when recording stacks.
		 //go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
		 func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
		 	semrelease1(addr, handoff, skipframes)
		 }
		68  
		69  //go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
		70  func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
		71  	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
		72  }
		73  
		 // poll_runtime_Semrelease implements internal/poll.runtime_Semrelease by
		 // forwarding to semrelease, i.e. a release without direct handoff to the
		 // woken waiter.
		 //go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease
		 func poll_runtime_Semrelease(addr *uint32) {
		 	semrelease(addr)
		 }
		78  
		79  func readyWithTime(s *sudog, traceskip int) {
		80  	if s.releasetime != 0 {
		81  		s.releasetime = cputicks()
		82  	}
		83  	goready(s.g, traceskip)
		84  }
		85  
		86  type semaProfileFlags int
		87  
		88  const (
		89  	semaBlockProfile semaProfileFlags = 1 << iota
		90  	semaMutexProfile
		91  )
		92  
		93  // Called from runtime.
		94  func semacquire(addr *uint32) {
		95  	semacquire1(addr, false, 0, 0)
		96  }
		97  
		 // semacquire1 blocks until it can decrement *addr (see cansemacquire).
		 //
		 // lifo: if other goroutines are already queued on addr, place this one at
		 // the head of that address's wait list instead of the tail (see
		 // semaRoot.queue).
		 // profile: which profiles (semaBlockProfile, semaMutexProfile) to collect
		 // timing for; each one is only active when its profiling rate is > 0.
		 // skipframes: extra caller frames to skip when recording trace/profile stacks.
		 //
		 // It throws if not called on the goroutine's own stack (gp != gp.m.curg).
		 func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
		 	gp := getg()
		 	if gp != gp.m.curg {
		 		throw("semacquire not on the G stack")
		 	}

		 	// Easy case.
		 	if cansemacquire(addr) {
		 		return
		 	}

		 	// Harder case:
		 	//	increment waiter count
		 	//	try cansemacquire one more time, return if succeeded
		 	//	enqueue itself as a waiter
		 	//	sleep
		 	//	(waiter descriptor is dequeued by signaler)
		 	s := acquireSudog()
		 	root := semroot(addr)
		 	t0 := int64(0)
		 	s.releasetime = 0
		 	s.acquiretime = 0
		 	s.ticket = 0
		 	// releasetime = -1 asks readyWithTime to stamp the wakeup time, which is
		 	// turned into a block-profile event after we wake up below.
		 	if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
		 		t0 = cputicks()
		 		s.releasetime = -1
		 	}
		 	// A non-zero acquiretime lets semrelease1 record a mutex-profile event
		 	// when it dequeues this waiter.
		 	if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
		 		if t0 == 0 {
		 			t0 = cputicks()
		 		}
		 		s.acquiretime = t0
		 	}
		 	for {
		 		lockWithRank(&root.lock, lockRankRoot)
		 		// Add ourselves to nwait to disable "easy case" in semrelease.
		 		atomic.Xadd(&root.nwait, 1)
		 		// Check cansemacquire to avoid missed wakeup.
		 		if cansemacquire(addr) {
		 			atomic.Xadd(&root.nwait, -1)
		 			unlock(&root.lock)
		 			break
		 		}
		 		// Any semrelease after the cansemacquire knows we're waiting
		 		// (we set nwait above), so go to sleep.
		 		root.queue(addr, s, lifo)
		 		goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
		 		// Woken up. A non-zero ticket means the releaser already took the
		 		// count on our behalf (handoff); otherwise compete for it, and go
		 		// back to waiting if another goroutine got there first.
		 		if s.ticket != 0 || cansemacquire(addr) {
		 			break
		 		}
		 	}
		 	if s.releasetime > 0 {
		 		blockevent(s.releasetime-t0, 3+skipframes)
		 	}
		 	releaseSudog(s)
		 }
	 154  
	 155  func semrelease(addr *uint32) {
	 156  	semrelease1(addr, false, 0)
	 157  }
	 158  
	 // semrelease1 increments *addr and, if a goroutine is blocked on addr, wakes
	 // the first one queued there (see semaRoot.dequeue).
	 //
	 // handoff: if true and the count can be taken back immediately
	 // (cansemacquire), the woken waiter is given it directly by setting
	 // s.ticket = 1. When this M also holds no runtime locks, the caller then
	 // yields so the waiter can run right away.
	 // skipframes: extra caller frames to skip when recording trace/profile stacks.
	 func semrelease1(addr *uint32, handoff bool, skipframes int) {
	 	root := semroot(addr)
	 	atomic.Xadd(addr, 1)

	 	// Easy case: no waiters?
	 	// This check must happen after the xadd, to avoid a missed wakeup
	 	// (see loop in semacquire).
	 	if atomic.Load(&root.nwait) == 0 {
	 		return
	 	}

	 	// Harder case: search for a waiter and wake it.
	 	lockWithRank(&root.lock, lockRankRoot)
	 	if atomic.Load(&root.nwait) == 0 {
	 		// The count is already consumed by another goroutine,
	 		// so no need to wake up another goroutine.
	 		unlock(&root.lock)
	 		return
	 	}
	 	// nwait counts waiters on every address that hashes to this root, so
	 	// there may be no waiter for addr itself (s == nil).
	 	s, t0 := root.dequeue(addr)
	 	if s != nil {
	 		atomic.Xadd(&root.nwait, -1)
	 	}
	 	unlock(&root.lock)
	 	if s != nil { // May be slow or even yield, so unlock first
	 		acquiretime := s.acquiretime
	 		if acquiretime != 0 {
	 			mutexevent(t0-acquiretime, 3+skipframes)
	 		}
	 		// dequeue resets ticket to 0, so anything else means the sudog was
	 		// modified concurrently.
	 		if s.ticket != 0 {
	 			throw("corrupted semaphore ticket")
	 		}
	 		if handoff && cansemacquire(addr) {
	 			s.ticket = 1
	 		}
	 		readyWithTime(s, 5+skipframes)
	 		if s.ticket == 1 && getg().m.locks == 0 {
	 			// Direct G handoff
	 			// readyWithTime has added the waiter G as runnext in the
	 			// current P; we now call the scheduler so that we start running
	 			// the waiter G immediately.
	 			// Note that waiter inherits our time slice: this is desirable
	 			// to avoid having a highly contended semaphore hog the P
	 			// indefinitely. goyield is like Gosched, but it emits a
	 			// "preempted" trace event instead and, more importantly, puts
	 			// the current G on the local runq instead of the global one.
	 			// We only do this in the starving regime (handoff=true), as in
	 			// the non-starving case it is possible for a different waiter
	 			// to acquire the semaphore while we are yielding/scheduling,
	 			// and this would be wasteful. We wait instead to enter starving
	 			// regime, and then we start to do direct handoffs of ticket and
	 			// P.
	 			// See issue 33747 for discussion.
	 			goyield()
	 		}
	 	}
	 }
	 216  
	 217  func semroot(addr *uint32) *semaRoot {
	 218  	return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
	 219  }
	 220  
	 221  func cansemacquire(addr *uint32) bool {
	 222  	for {
	 223  		v := atomic.Load(addr)
	 224  		if v == 0 {
	 225  			return false
	 226  		}
	 227  		if atomic.Cas(addr, v, v-1) {
	 228  			return true
	 229  		}
	 230  	}
	 231  }
	 232  
	 233  // queue adds s to the blocked goroutines in semaRoot.
	 234  func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
	 235  	s.g = getg()
	 236  	s.elem = unsafe.Pointer(addr)
	 237  	s.next = nil
	 238  	s.prev = nil
	 239  
	 240  	var last *sudog
	 241  	pt := &root.treap
	 242  	for t := *pt; t != nil; t = *pt {
	 243  		if t.elem == unsafe.Pointer(addr) {
	 244  			// Already have addr in list.
	 245  			if lifo {
	 246  				// Substitute s in t's place in treap.
	 247  				*pt = s
	 248  				s.ticket = t.ticket
	 249  				s.acquiretime = t.acquiretime
	 250  				s.parent = t.parent
	 251  				s.prev = t.prev
	 252  				s.next = t.next
	 253  				if s.prev != nil {
	 254  					s.prev.parent = s
	 255  				}
	 256  				if s.next != nil {
	 257  					s.next.parent = s
	 258  				}
	 259  				// Add t first in s's wait list.
	 260  				s.waitlink = t
	 261  				s.waittail = t.waittail
	 262  				if s.waittail == nil {
	 263  					s.waittail = t
	 264  				}
	 265  				t.parent = nil
	 266  				t.prev = nil
	 267  				t.next = nil
	 268  				t.waittail = nil
	 269  			} else {
	 270  				// Add s to end of t's wait list.
	 271  				if t.waittail == nil {
	 272  					t.waitlink = s
	 273  				} else {
	 274  					t.waittail.waitlink = s
	 275  				}
	 276  				t.waittail = s
	 277  				s.waitlink = nil
	 278  			}
	 279  			return
	 280  		}
	 281  		last = t
	 282  		if uintptr(unsafe.Pointer(addr)) < uintptr(t.elem) {
	 283  			pt = &t.prev
	 284  		} else {
	 285  			pt = &t.next
	 286  		}
	 287  	}
	 288  
	 289  	// Add s as new leaf in tree of unique addrs.
	 290  	// The balanced tree is a treap using ticket as the random heap priority.
	 291  	// That is, it is a binary tree ordered according to the elem addresses,
	 292  	// but then among the space of possible binary trees respecting those
	 293  	// addresses, it is kept balanced on average by maintaining a heap ordering
	 294  	// on the ticket: s.ticket <= both s.prev.ticket and s.next.ticket.
	 295  	// https://en.wikipedia.org/wiki/Treap
	 296  	// https://faculty.washington.edu/aragon/pubs/rst89.pdf
	 297  	//
	 298  	// s.ticket compared with zero in couple of places, therefore set lowest bit.
	 299  	// It will not affect treap's quality noticeably.
	 300  	s.ticket = fastrand() | 1
	 301  	s.parent = last
	 302  	*pt = s
	 303  
	 304  	// Rotate up into tree according to ticket (priority).
	 305  	for s.parent != nil && s.parent.ticket > s.ticket {
	 306  		if s.parent.prev == s {
	 307  			root.rotateRight(s.parent)
	 308  		} else {
	 309  			if s.parent.next != s {
	 310  				panic("semaRoot queue")
	 311  			}
	 312  			root.rotateLeft(s.parent)
	 313  		}
	 314  	}
	 315  }
	 316  
	 // dequeue searches for and finds the first goroutine
	 // in semaRoot blocked on addr.
	 // If the sudog was being profiled, dequeue returns the time
	 // at which it was woken up as now. Otherwise now is 0.
	 // If no goroutine is blocked on addr, it returns (nil, 0).
	 // On success the returned sudog is detached from the tree: its parent,
	 // prev, next and elem fields are cleared and its ticket is reset to 0.
	 func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now int64) {
	 	// Binary-search the tree by address. ps tracks the link that points at s
	 	// so that it can be rewritten in place below.
	 	ps := &root.treap
	 	s := *ps
	 	for ; s != nil; s = *ps {
	 		if s.elem == unsafe.Pointer(addr) {
	 			goto Found
	 		}
	 		if uintptr(unsafe.Pointer(addr)) < uintptr(s.elem) {
	 			ps = &s.prev
	 		} else {
	 			ps = &s.next
	 		}
	 	}
	 	return nil, 0

	 Found:
	 	now = int64(0)
	 	if s.acquiretime != 0 {
	 		now = cputicks()
	 	}
	 	if t := s.waitlink; t != nil {
	 		// Substitute t, also waiting on addr, for s in root tree of unique addrs.
	 		*ps = t
	 		t.ticket = s.ticket
	 		t.parent = s.parent
	 		t.prev = s.prev
	 		if t.prev != nil {
	 			t.prev.parent = t
	 		}
	 		t.next = s.next
	 		if t.next != nil {
	 			t.next.parent = t
	 		}
	 		// t takes over the rest of s's wait list. Its tail is s's old tail,
	 		// or nil when t was the only other waiter.
	 		if t.waitlink != nil {
	 			t.waittail = s.waittail
	 		} else {
	 			t.waittail = nil
	 		}
	 		// The new head's mutex-profile start time becomes now (0 when s was
	 		// not being profiled).
	 		t.acquiretime = now
	 		s.waitlink = nil
	 		s.waittail = nil
	 	} else {
	 		// Rotate s down to be leaf of tree for removal, respecting priorities.
	 		for s.next != nil || s.prev != nil {
	 			// Pull up the child with the smaller ticket so the heap order
	 			// (parent ticket <= child tickets) is preserved.
	 			if s.next == nil || s.prev != nil && s.prev.ticket < s.next.ticket {
	 				root.rotateRight(s)
	 			} else {
	 				root.rotateLeft(s)
	 			}
	 		}
	 		// Remove s, now a leaf.
	 		if s.parent != nil {
	 			if s.parent.prev == s {
	 				s.parent.prev = nil
	 			} else {
	 				s.parent.next = nil
	 			}
	 		} else {
	 			root.treap = nil
	 		}
	 	}
	 	s.parent = nil
	 	s.elem = nil
	 	s.next = nil
	 	s.prev = nil
	 	s.ticket = 0
	 	return s, now
	 }
	 389  
	 // rotateLeft rotates the tree rooted at node x.
	 // turning (x a (y b c)) into (y (x a b) c).
	 // x must have a right child y (x.next). y takes x's place under x's former
	 // parent, or becomes root.treap when x was the root. It throws if x's parent
	 // does not actually link to x.
	 func (root *semaRoot) rotateLeft(x *sudog) {
	 	// p -> (x a (y b c))
	 	p := x.parent
	 	y := x.next
	 	b := y.prev

	 	// x becomes y's left child and adopts y's old left subtree b.
	 	y.prev = x
	 	x.parent = y
	 	x.next = b
	 	if b != nil {
	 		b.parent = x
	 	}

	 	// Hook y into the position x used to occupy.
	 	y.parent = p
	 	if p == nil {
	 		root.treap = y
	 	} else if p.prev == x {
	 		p.prev = y
	 	} else {
	 		if p.next != x {
	 			throw("semaRoot rotateLeft")
	 		}
	 		p.next = y
	 	}
	 }
	 417  
	 // rotateRight rotates the tree rooted at node y.
	 // turning (y (x a b) c) into (x a (y b c)).
	 // y must have a left child x (y.prev). x takes y's place under y's former
	 // parent, or becomes root.treap when y was the root. It throws if y's parent
	 // does not actually link to y.
	 func (root *semaRoot) rotateRight(y *sudog) {
	 	// p -> (y (x a b) c)
	 	p := y.parent
	 	x := y.prev
	 	b := x.next

	 	// y becomes x's right child and adopts x's old right subtree b.
	 	x.next = y
	 	y.parent = x
	 	y.prev = b
	 	if b != nil {
	 		b.parent = y
	 	}

	 	// Hook x into the position y used to occupy.
	 	x.parent = p
	 	if p == nil {
	 		root.treap = x
	 	} else if p.prev == y {
	 		p.prev = x
	 	} else {
	 		if p.next != y {
	 			throw("semaRoot rotateRight")
	 		}
	 		p.next = x
	 	}
	 }
	 445  
	 // notifyList is a ticket-based notification list used to implement sync.Cond.
	 //
	 // It must be kept in sync with the sync package. notifyListCheck compares
	 // its size against the size passed in from sync.
	 type notifyList struct {
	 	// wait is the ticket number of the next waiter. It is atomically
	 	// incremented outside the lock.
	 	wait uint32

	 	// notify is the ticket number of the next waiter to be notified. It can
	 	// be read outside the lock, but is only written to with lock held.
	 	//
	 	// Both wait & notify can wrap around, and such cases will be correctly
	 	// handled as long as their "unwrapped" difference is bounded by 2^31.
	 	// For this not to be the case, we'd need to have 2^31+ goroutines
	 	// blocked on the same condvar, which is currently not possible.
	 	notify uint32

	 	// List of parked waiters: a singly linked list of sudogs chained through
	 	// sudog.next, from head to tail, guarded by lock.
	 	lock mutex
	 	head *sudog
	 	tail *sudog
	 }
	 468  
	 // less reports whether a comes before b, treating both as running counters
	 // that may wrap around the 32-bit range. It assumes their true ("unwrapped")
	 // difference is less than 2^31, so the signed wrap-around difference has the
	 // correct sign.
	 func less(a, b uint32) bool {
	 	delta := int32(a - b)
	 	return delta < 0
	 }
	 474  
	 475  // notifyListAdd adds the caller to a notify list such that it can receive
	 476  // notifications. The caller must eventually call notifyListWait to wait for
	 477  // such a notification, passing the returned ticket number.
	 478  //go:linkname notifyListAdd sync.runtime_notifyListAdd
	 479  func notifyListAdd(l *notifyList) uint32 {
	 480  	// This may be called concurrently, for example, when called from
	 481  	// sync.Cond.Wait while holding a RWMutex in read mode.
	 482  	return atomic.Xadd(&l.wait, 1) - 1
	 483  }
	 484  
	 // notifyListWait waits for a notification. If one has been sent since
	 // notifyListAdd was called, it returns immediately. Otherwise, it blocks.
	 // t is the ticket returned by notifyListAdd.
	 //go:linkname notifyListWait sync.runtime_notifyListWait
	 func notifyListWait(l *notifyList, t uint32) {
	 	lockWithRank(&l.lock, lockRankNotifyList)

	 	// Return right away if this ticket has already been notified.
	 	if less(t, l.notify) {
	 		unlock(&l.lock)
	 		return
	 	}

	 	// Enqueue itself. The sudog carries the ticket so that
	 	// notifyListNotifyOne can find this waiter.
	 	s := acquireSudog()
	 	s.g = getg()
	 	s.ticket = t
	 	s.releasetime = 0
	 	t0 := int64(0)
	 	// releasetime = -1 asks readyWithTime to stamp the wakeup time for the
	 	// block-profile event recorded below.
	 	if blockprofilerate > 0 {
	 		t0 = cputicks()
	 		s.releasetime = -1
	 	}
	 	// Append at the tail while still holding l.lock, so any notifier that
	 	// takes the lock afterwards sees this waiter.
	 	if l.tail == nil {
	 		l.head = s
	 	} else {
	 		l.tail.next = s
	 	}
	 	l.tail = s
	 	// Park; l.lock is released by goparkunlock, not by an explicit unlock here.
	 	goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
	 	if t0 != 0 {
	 		blockevent(s.releasetime-t0, 2)
	 	}
	 	releaseSudog(s)
	 }
	 519  
	 // notifyListNotifyAll notifies all entries in the list.
	 // Every ticket handed out so far (up to l.wait) is marked as notified, and
	 // the waiters that were parked on the list are readied after l.lock is
	 // released.
	 //go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
	 func notifyListNotifyAll(l *notifyList) {
	 	// Fast-path: if there are no new waiters since the last notification
	 	// we don't need to acquire the lock.
	 	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
	 		return
	 	}

	 	// Pull the list out into a local variable, waiters will be readied
	 	// outside the lock.
	 	lockWithRank(&l.lock, lockRankNotifyList)
	 	s := l.head
	 	l.head = nil
	 	l.tail = nil

	 	// Update the next ticket to be notified. We can set it to the current
	 	// value of wait because any previous waiters are already in the list
	 	// or will notice that they have already been notified when trying to
	 	// add themselves to the list.
	 	atomic.Store(&l.notify, atomic.Load(&l.wait))
	 	unlock(&l.lock)

	 	// Go through the local list and ready all waiters.
	 	for s != nil {
	 		// Read the successor before readying s: once its goroutine runs,
	 		// it releases s (releaseSudog in notifyListWait).
	 		next := s.next
	 		s.next = nil
	 		readyWithTime(s, 4)
	 		s = next
	 	}
	 }
	 551  
	 // notifyListNotifyOne notifies one entry in the list.
	 // It advances l.notify by one and readies the waiter holding the previous
	 // l.notify ticket, if that waiter has already enqueued itself.
	 //go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
	 func notifyListNotifyOne(l *notifyList) {
	 	// Fast-path: if there are no new waiters since the last notification
	 	// we don't need to acquire the lock at all.
	 	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
	 		return
	 	}

	 	lockWithRank(&l.lock, lockRankNotifyList)

	 	// Re-check under the lock if we need to do anything.
	 	t := l.notify
	 	if t == atomic.Load(&l.wait) {
	 		unlock(&l.lock)
	 		return
	 	}

	 	// Update the next notify ticket number.
	 	atomic.Store(&l.notify, t+1)

	 	// Try to find the g that needs to be notified.
	 	// If it hasn't made it to the list yet we won't find it,
	 	// but it won't park itself once it sees the new notify number.
	 	//
	 	// This scan looks linear but essentially always stops quickly.
	 	// Because g's queue separately from taking numbers,
	 	// there may be minor reorderings in the list, but we
	 	// expect the g we're looking for to be near the front.
	 	// The g has others in front of it on the list only to the
	 	// extent that it lost the race, so the iteration will not
	 	// be too long. This applies even when the g is missing:
	 	// it hasn't yet gotten to sleep and has lost the race to
	 	// the (few) other g's that we find on the list.
	 	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
	 		if s.ticket == t {
	 			// Unlink s. p is its predecessor (nil when s is the head), and
	 			// the tail moves back to p when s was the last entry.
	 			n := s.next
	 			if p != nil {
	 				p.next = n
	 			} else {
	 				l.head = n
	 			}
	 			if n == nil {
	 				l.tail = p
	 			}
	 			// Ready the waiter outside the lock.
	 			unlock(&l.lock)
	 			s.next = nil
	 			readyWithTime(s, 4)
	 			return
	 		}
	 	}
	 	unlock(&l.lock)
	 }
	 605  
	 606  //go:linkname notifyListCheck sync.runtime_notifyListCheck
	 607  func notifyListCheck(sz uintptr) {
	 608  	if sz != unsafe.Sizeof(notifyList{}) {
	 609  		print("runtime: bad notifyList size - sync=", sz, " runtime=", unsafe.Sizeof(notifyList{}), "\n")
	 610  		throw("bad notifyList size")
	 611  	}
	 612  }
	 613  
	 614  //go:linkname sync_nanotime sync.runtime_nanotime
	 615  func sync_nanotime() int64 {
	 616  	return nanotime()
	 617  }
	 618  

View as plain text