...

Source file src/runtime/select.go

Documentation: runtime

		 1  // Copyright 2009 The Go Authors. All rights reserved.
		 2  // Use of this source code is governed by a BSD-style
		 3  // license that can be found in the LICENSE file.
		 4  
		 5  package runtime
		 6  
		 7  // This file contains the implementation of Go select statements.
		 8  
		 9  import (
		10  	"runtime/internal/atomic"
		11  	"unsafe"
		12  )
		13  
		14  const debugSelect = false
		15  
		16  // Select case descriptor.
		17  // Known to compiler.
		18  // Changes here must also be made in src/cmd/compile/internal/walk/select.go's scasetype.
		19  type scase struct {
		20  	c		*hchan				 // chan
		21  	elem unsafe.Pointer // data element
		22  }
		23  
		24  var (
		25  	chansendpc = funcPC(chansend)
		26  	chanrecvpc = funcPC(chanrecv)
		27  )
		28  
		29  func selectsetpc(pc *uintptr) {
		30  	*pc = getcallerpc()
		31  }
		32  
		33  func sellock(scases []scase, lockorder []uint16) {
		34  	var c *hchan
		35  	for _, o := range lockorder {
		36  		c0 := scases[o].c
		37  		if c0 != c {
		38  			c = c0
		39  			lock(&c.lock)
		40  		}
		41  	}
		42  }
		43  
		44  func selunlock(scases []scase, lockorder []uint16) {
		45  	// We must be very careful here to not touch sel after we have unlocked
		46  	// the last lock, because sel can be freed right after the last unlock.
		47  	// Consider the following situation.
		48  	// First M calls runtime·park() in runtime·selectgo() passing the sel.
		49  	// Once runtime·park() has unlocked the last lock, another M makes
		50  	// the G that calls select runnable again and schedules it for execution.
		51  	// When the G runs on another M, it locks all the locks and frees sel.
		52  	// Now if the first M touches sel, it will access freed memory.
		53  	for i := len(lockorder) - 1; i >= 0; i-- {
		54  		c := scases[lockorder[i]].c
		55  		if i > 0 && c == scases[lockorder[i-1]].c {
		56  			continue // will unlock it on the next iteration
		57  		}
		58  		unlock(&c.lock)
		59  	}
		60  }
		61  
		62  func selparkcommit(gp *g, _ unsafe.Pointer) bool {
		63  	// There are unlocked sudogs that point into gp's stack. Stack
		64  	// copying must lock the channels of those sudogs.
		65  	// Set activeStackChans here instead of before we try parking
		66  	// because we could self-deadlock in stack growth on a
		67  	// channel lock.
		68  	gp.activeStackChans = true
		69  	// Mark that it's safe for stack shrinking to occur now,
		70  	// because any thread acquiring this G's stack for shrinking
		71  	// is guaranteed to observe activeStackChans after this store.
		72  	atomic.Store8(&gp.parkingOnChan, 0)
		73  	// Make sure we unlock after setting activeStackChans and
		74  	// unsetting parkingOnChan. The moment we unlock any of the
		75  	// channel locks we risk gp getting readied by a channel operation
		76  	// and so gp could continue running before everything before the
		77  	// unlock is visible (even to gp itself).
		78  
		79  	// This must not access gp's stack (see gopark). In
		80  	// particular, it must not access the *hselect. That's okay,
		81  	// because by the time this is called, gp.waiting has all
		82  	// channels in lock order.
		83  	var lastc *hchan
		84  	for sg := gp.waiting; sg != nil; sg = sg.waitlink {
		85  		if sg.c != lastc && lastc != nil {
		86  			// As soon as we unlock the channel, fields in
		87  			// any sudog with that channel may change,
		88  			// including c and waitlink. Since multiple
		89  			// sudogs may have the same channel, we unlock
		90  			// only after we've passed the last instance
		91  			// of a channel.
		92  			unlock(&lastc.lock)
		93  		}
		94  		lastc = sg.c
		95  	}
		96  	if lastc != nil {
		97  		unlock(&lastc.lock)
		98  	}
		99  	return true
	 100  }
	 101  
	 102  func block() {
	 103  	gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1) // forever
	 104  }
	 105  
	 106  // selectgo implements the select statement.
	 107  //
	 108  // cas0 points to an array of type [ncases]scase, and order0 points to
	 109  // an array of type [2*ncases]uint16 where ncases must be <= 65536.
	 110  // Both reside on the goroutine's stack (regardless of any escaping in
	 111  // selectgo).
	 112  //
	 113  // For race detector builds, pc0 points to an array of type
	 114  // [ncases]uintptr (also on the stack); for other builds, it's set to
	 115  // nil.
	 116  //
	 117  // selectgo returns the index of the chosen scase, which matches the
	 118  // ordinal position of its respective select{recv,send,default} call.
	 119  // Also, if the chosen scase was a receive operation, it reports whether
	 120  // a value was received.
	 121  func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
	 122  	if debugSelect {
	 123  		print("select: cas0=", cas0, "\n")
	 124  	}
	 125  
	 126  	// NOTE: In order to maintain a lean stack size, the number of scases
	 127  	// is capped at 65536.
	 128  	cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
	 129  	order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
	 130  
	 131  	ncases := nsends + nrecvs
	 132  	scases := cas1[:ncases:ncases]
	 133  	pollorder := order1[:ncases:ncases]
	 134  	lockorder := order1[ncases:][:ncases:ncases]
	 135  	// NOTE: pollorder/lockorder's underlying array was not zero-initialized by compiler.
	 136  
	 137  	// Even when raceenabled is true, there might be select
	 138  	// statements in packages compiled without -race (e.g.,
	 139  	// ensureSigM in runtime/signal_unix.go).
	 140  	var pcs []uintptr
	 141  	if raceenabled && pc0 != nil {
	 142  		pc1 := (*[1 << 16]uintptr)(unsafe.Pointer(pc0))
	 143  		pcs = pc1[:ncases:ncases]
	 144  	}
	 145  	casePC := func(casi int) uintptr {
	 146  		if pcs == nil {
	 147  			return 0
	 148  		}
	 149  		return pcs[casi]
	 150  	}
	 151  
	 152  	var t0 int64
	 153  	if blockprofilerate > 0 {
	 154  		t0 = cputicks()
	 155  	}
	 156  
	 157  	// The compiler rewrites selects that statically have
	 158  	// only 0 or 1 cases plus default into simpler constructs.
	 159  	// The only way we can end up with such small sel.ncase
	 160  	// values here is for a larger select in which most channels
	 161  	// have been nilled out. The general code handles those
	 162  	// cases correctly, and they are rare enough not to bother
	 163  	// optimizing (and needing to test).
	 164  
	 165  	// generate permuted order
	 166  	norder := 0
	 167  	for i := range scases {
	 168  		cas := &scases[i]
	 169  
	 170  		// Omit cases without channels from the poll and lock orders.
	 171  		if cas.c == nil {
	 172  			cas.elem = nil // allow GC
	 173  			continue
	 174  		}
	 175  
	 176  		j := fastrandn(uint32(norder + 1))
	 177  		pollorder[norder] = pollorder[j]
	 178  		pollorder[j] = uint16(i)
	 179  		norder++
	 180  	}
	 181  	pollorder = pollorder[:norder]
	 182  	lockorder = lockorder[:norder]
	 183  
	 184  	// sort the cases by Hchan address to get the locking order.
	 185  	// simple heap sort, to guarantee n log n time and constant stack footprint.
	 186  	for i := range lockorder {
	 187  		j := i
	 188  		// Start with the pollorder to permute cases on the same channel.
	 189  		c := scases[pollorder[i]].c
	 190  		for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
	 191  			k := (j - 1) / 2
	 192  			lockorder[j] = lockorder[k]
	 193  			j = k
	 194  		}
	 195  		lockorder[j] = pollorder[i]
	 196  	}
	 197  	for i := len(lockorder) - 1; i >= 0; i-- {
	 198  		o := lockorder[i]
	 199  		c := scases[o].c
	 200  		lockorder[i] = lockorder[0]
	 201  		j := 0
	 202  		for {
	 203  			k := j*2 + 1
	 204  			if k >= i {
	 205  				break
	 206  			}
	 207  			if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
	 208  				k++
	 209  			}
	 210  			if c.sortkey() < scases[lockorder[k]].c.sortkey() {
	 211  				lockorder[j] = lockorder[k]
	 212  				j = k
	 213  				continue
	 214  			}
	 215  			break
	 216  		}
	 217  		lockorder[j] = o
	 218  	}
	 219  
	 220  	if debugSelect {
	 221  		for i := 0; i+1 < len(lockorder); i++ {
	 222  			if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
	 223  				print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
	 224  				throw("select: broken sort")
	 225  			}
	 226  		}
	 227  	}
	 228  
	 229  	// lock all the channels involved in the select
	 230  	sellock(scases, lockorder)
	 231  
	 232  	var (
	 233  		gp		 *g
	 234  		sg		 *sudog
	 235  		c			*hchan
	 236  		k			*scase
	 237  		sglist *sudog
	 238  		sgnext *sudog
	 239  		qp		 unsafe.Pointer
	 240  		nextp	**sudog
	 241  	)
	 242  
	 243  	// pass 1 - look for something already waiting
	 244  	var casi int
	 245  	var cas *scase
	 246  	var caseSuccess bool
	 247  	var caseReleaseTime int64 = -1
	 248  	var recvOK bool
	 249  	for _, casei := range pollorder {
	 250  		casi = int(casei)
	 251  		cas = &scases[casi]
	 252  		c = cas.c
	 253  
	 254  		if casi >= nsends {
	 255  			sg = c.sendq.dequeue()
	 256  			if sg != nil {
	 257  				goto recv
	 258  			}
	 259  			if c.qcount > 0 {
	 260  				goto bufrecv
	 261  			}
	 262  			if c.closed != 0 {
	 263  				goto rclose
	 264  			}
	 265  		} else {
	 266  			if raceenabled {
	 267  				racereadpc(c.raceaddr(), casePC(casi), chansendpc)
	 268  			}
	 269  			if c.closed != 0 {
	 270  				goto sclose
	 271  			}
	 272  			sg = c.recvq.dequeue()
	 273  			if sg != nil {
	 274  				goto send
	 275  			}
	 276  			if c.qcount < c.dataqsiz {
	 277  				goto bufsend
	 278  			}
	 279  		}
	 280  	}
	 281  
	 282  	if !block {
	 283  		selunlock(scases, lockorder)
	 284  		casi = -1
	 285  		goto retc
	 286  	}
	 287  
	 288  	// pass 2 - enqueue on all chans
	 289  	gp = getg()
	 290  	if gp.waiting != nil {
	 291  		throw("gp.waiting != nil")
	 292  	}
	 293  	nextp = &gp.waiting
	 294  	for _, casei := range lockorder {
	 295  		casi = int(casei)
	 296  		cas = &scases[casi]
	 297  		c = cas.c
	 298  		sg := acquireSudog()
	 299  		sg.g = gp
	 300  		sg.isSelect = true
	 301  		// No stack splits between assigning elem and enqueuing
	 302  		// sg on gp.waiting where copystack can find it.
	 303  		sg.elem = cas.elem
	 304  		sg.releasetime = 0
	 305  		if t0 != 0 {
	 306  			sg.releasetime = -1
	 307  		}
	 308  		sg.c = c
	 309  		// Construct waiting list in lock order.
	 310  		*nextp = sg
	 311  		nextp = &sg.waitlink
	 312  
	 313  		if casi < nsends {
	 314  			c.sendq.enqueue(sg)
	 315  		} else {
	 316  			c.recvq.enqueue(sg)
	 317  		}
	 318  	}
	 319  
	 320  	// wait for someone to wake us up
	 321  	gp.param = nil
	 322  	// Signal to anyone trying to shrink our stack that we're about
	 323  	// to park on a channel. The window between when this G's status
	 324  	// changes and when we set gp.activeStackChans is not safe for
	 325  	// stack shrinking.
	 326  	atomic.Store8(&gp.parkingOnChan, 1)
	 327  	gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
	 328  	gp.activeStackChans = false
	 329  
	 330  	sellock(scases, lockorder)
	 331  
	 332  	gp.selectDone = 0
	 333  	sg = (*sudog)(gp.param)
	 334  	gp.param = nil
	 335  
	 336  	// pass 3 - dequeue from unsuccessful chans
	 337  	// otherwise they stack up on quiet channels
	 338  	// record the successful case, if any.
	 339  	// We singly-linked up the SudoGs in lock order.
	 340  	casi = -1
	 341  	cas = nil
	 342  	caseSuccess = false
	 343  	sglist = gp.waiting
	 344  	// Clear all elem before unlinking from gp.waiting.
	 345  	for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
	 346  		sg1.isSelect = false
	 347  		sg1.elem = nil
	 348  		sg1.c = nil
	 349  	}
	 350  	gp.waiting = nil
	 351  
	 352  	for _, casei := range lockorder {
	 353  		k = &scases[casei]
	 354  		if sg == sglist {
	 355  			// sg has already been dequeued by the G that woke us up.
	 356  			casi = int(casei)
	 357  			cas = k
	 358  			caseSuccess = sglist.success
	 359  			if sglist.releasetime > 0 {
	 360  				caseReleaseTime = sglist.releasetime
	 361  			}
	 362  		} else {
	 363  			c = k.c
	 364  			if int(casei) < nsends {
	 365  				c.sendq.dequeueSudoG(sglist)
	 366  			} else {
	 367  				c.recvq.dequeueSudoG(sglist)
	 368  			}
	 369  		}
	 370  		sgnext = sglist.waitlink
	 371  		sglist.waitlink = nil
	 372  		releaseSudog(sglist)
	 373  		sglist = sgnext
	 374  	}
	 375  
	 376  	if cas == nil {
	 377  		throw("selectgo: bad wakeup")
	 378  	}
	 379  
	 380  	c = cas.c
	 381  
	 382  	if debugSelect {
	 383  		print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " send=", casi < nsends, "\n")
	 384  	}
	 385  
	 386  	if casi < nsends {
	 387  		if !caseSuccess {
	 388  			goto sclose
	 389  		}
	 390  	} else {
	 391  		recvOK = caseSuccess
	 392  	}
	 393  
	 394  	if raceenabled {
	 395  		if casi < nsends {
	 396  			raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
	 397  		} else if cas.elem != nil {
	 398  			raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
	 399  		}
	 400  	}
	 401  	if msanenabled {
	 402  		if casi < nsends {
	 403  			msanread(cas.elem, c.elemtype.size)
	 404  		} else if cas.elem != nil {
	 405  			msanwrite(cas.elem, c.elemtype.size)
	 406  		}
	 407  	}
	 408  
	 409  	selunlock(scases, lockorder)
	 410  	goto retc
	 411  
	 412  bufrecv:
	 413  	// can receive from buffer
	 414  	if raceenabled {
	 415  		if cas.elem != nil {
	 416  			raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
	 417  		}
	 418  		racenotify(c, c.recvx, nil)
	 419  	}
	 420  	if msanenabled && cas.elem != nil {
	 421  		msanwrite(cas.elem, c.elemtype.size)
	 422  	}
	 423  	recvOK = true
	 424  	qp = chanbuf(c, c.recvx)
	 425  	if cas.elem != nil {
	 426  		typedmemmove(c.elemtype, cas.elem, qp)
	 427  	}
	 428  	typedmemclr(c.elemtype, qp)
	 429  	c.recvx++
	 430  	if c.recvx == c.dataqsiz {
	 431  		c.recvx = 0
	 432  	}
	 433  	c.qcount--
	 434  	selunlock(scases, lockorder)
	 435  	goto retc
	 436  
	 437  bufsend:
	 438  	// can send to buffer
	 439  	if raceenabled {
	 440  		racenotify(c, c.sendx, nil)
	 441  		raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
	 442  	}
	 443  	if msanenabled {
	 444  		msanread(cas.elem, c.elemtype.size)
	 445  	}
	 446  	typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
	 447  	c.sendx++
	 448  	if c.sendx == c.dataqsiz {
	 449  		c.sendx = 0
	 450  	}
	 451  	c.qcount++
	 452  	selunlock(scases, lockorder)
	 453  	goto retc
	 454  
	 455  recv:
	 456  	// can receive from sleeping sender (sg)
	 457  	recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	 458  	if debugSelect {
	 459  		print("syncrecv: cas0=", cas0, " c=", c, "\n")
	 460  	}
	 461  	recvOK = true
	 462  	goto retc
	 463  
	 464  rclose:
	 465  	// read at end of closed channel
	 466  	selunlock(scases, lockorder)
	 467  	recvOK = false
	 468  	if cas.elem != nil {
	 469  		typedmemclr(c.elemtype, cas.elem)
	 470  	}
	 471  	if raceenabled {
	 472  		raceacquire(c.raceaddr())
	 473  	}
	 474  	goto retc
	 475  
	 476  send:
	 477  	// can send to a sleeping receiver (sg)
	 478  	if raceenabled {
	 479  		raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
	 480  	}
	 481  	if msanenabled {
	 482  		msanread(cas.elem, c.elemtype.size)
	 483  	}
	 484  	send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	 485  	if debugSelect {
	 486  		print("syncsend: cas0=", cas0, " c=", c, "\n")
	 487  	}
	 488  	goto retc
	 489  
	 490  retc:
	 491  	if caseReleaseTime > 0 {
	 492  		blockevent(caseReleaseTime-t0, 1)
	 493  	}
	 494  	return casi, recvOK
	 495  
	 496  sclose:
	 497  	// send on closed channel
	 498  	selunlock(scases, lockorder)
	 499  	panic(plainError("send on closed channel"))
	 500  }
	 501  
	 502  func (c *hchan) sortkey() uintptr {
	 503  	return uintptr(unsafe.Pointer(c))
	 504  }
	 505  
	 506  // A runtimeSelect is a single case passed to rselect.
	 507  // This must match ../reflect/value.go:/runtimeSelect
	 508  type runtimeSelect struct {
	 509  	dir selectDir
	 510  	typ unsafe.Pointer // channel type (not used here)
	 511  	ch	*hchan				 // channel
	 512  	val unsafe.Pointer // ptr to data (SendDir) or ptr to receive buffer (RecvDir)
	 513  }
	 514  
	 515  // These values must match ../reflect/value.go:/SelectDir.
	 516  type selectDir int
	 517  
	 518  const (
	 519  	_						 selectDir = iota
	 520  	selectSend							// case Chan <- Send
	 521  	selectRecv							// case <-Chan:
	 522  	selectDefault					 // default
	 523  )
	 524  
	 525  //go:linkname reflect_rselect reflect.rselect
	 526  func reflect_rselect(cases []runtimeSelect) (int, bool) {
	 527  	if len(cases) == 0 {
	 528  		block()
	 529  	}
	 530  	sel := make([]scase, len(cases))
	 531  	orig := make([]int, len(cases))
	 532  	nsends, nrecvs := 0, 0
	 533  	dflt := -1
	 534  	for i, rc := range cases {
	 535  		var j int
	 536  		switch rc.dir {
	 537  		case selectDefault:
	 538  			dflt = i
	 539  			continue
	 540  		case selectSend:
	 541  			j = nsends
	 542  			nsends++
	 543  		case selectRecv:
	 544  			nrecvs++
	 545  			j = len(cases) - nrecvs
	 546  		}
	 547  
	 548  		sel[j] = scase{c: rc.ch, elem: rc.val}
	 549  		orig[j] = i
	 550  	}
	 551  
	 552  	// Only a default case.
	 553  	if nsends+nrecvs == 0 {
	 554  		return dflt, false
	 555  	}
	 556  
	 557  	// Compact sel and orig if necessary.
	 558  	if nsends+nrecvs < len(cases) {
	 559  		copy(sel[nsends:], sel[len(cases)-nrecvs:])
	 560  		copy(orig[nsends:], orig[len(cases)-nrecvs:])
	 561  	}
	 562  
	 563  	order := make([]uint16, 2*(nsends+nrecvs))
	 564  	var pc0 *uintptr
	 565  	if raceenabled {
	 566  		pcs := make([]uintptr, nsends+nrecvs)
	 567  		for i := range pcs {
	 568  			selectsetpc(&pcs[i])
	 569  		}
	 570  		pc0 = &pcs[0]
	 571  	}
	 572  
	 573  	chosen, recvOK := selectgo(&sel[0], &order[0], pc0, nsends, nrecvs, dflt == -1)
	 574  
	 575  	// Translate chosen back to caller's ordering.
	 576  	if chosen < 0 {
	 577  		chosen = dflt
	 578  	} else {
	 579  		chosen = orig[chosen]
	 580  	}
	 581  	return chosen, recvOK
	 582  }
	 583  
	 584  func (q *waitq) dequeueSudoG(sgp *sudog) {
	 585  	x := sgp.prev
	 586  	y := sgp.next
	 587  	if x != nil {
	 588  		if y != nil {
	 589  			// middle of queue
	 590  			x.next = y
	 591  			y.prev = x
	 592  			sgp.next = nil
	 593  			sgp.prev = nil
	 594  			return
	 595  		}
	 596  		// end of queue
	 597  		x.next = nil
	 598  		q.last = x
	 599  		sgp.prev = nil
	 600  		return
	 601  	}
	 602  	if y != nil {
	 603  		// start of queue
	 604  		y.prev = nil
	 605  		q.first = y
	 606  		sgp.next = nil
	 607  		return
	 608  	}
	 609  
	 610  	// x==y==nil. Either sgp is the only element in the queue,
	 611  	// or it has already been removed. Use q.first to disambiguate.
	 612  	if q.first == sgp {
	 613  		q.first = nil
	 614  		q.last = nil
	 615  	}
	 616  }
	 617  

View as plain text