...

Source file src/runtime/chan.go

Documentation: runtime

		 1  // Copyright 2014 The Go Authors. All rights reserved.
		 2  // Use of this source code is governed by a BSD-style
		 3  // license that can be found in the LICENSE file.
		 4  
		 5  package runtime
		 6  
		 7  // This file contains the implementation of Go channels.
		 8  
		 9  // Invariants:
		10  //	At least one of c.sendq and c.recvq is empty,
		11  //	except for the case of an unbuffered channel with a single goroutine
		12  //	blocked on it for both sending and receiving using a select statement,
		13  //	in which case the length of c.sendq and c.recvq is limited only by the
		14  //	size of the select statement.
		15  //
		16  // For buffered channels, also:
		17  //	c.qcount > 0 implies that c.recvq is empty.
		18  //	c.qcount < c.dataqsiz implies that c.sendq is empty.
		19  
		20  import (
		21  	"runtime/internal/atomic"
		22  	"runtime/internal/math"
		23  	"unsafe"
		24  )
		25  
		// Layout constants for hchan allocations, plus the debug-print switch.
		26  const (
		    	// maxAlign is the largest element alignment makechan accepts; makechan
		    	// throws if an element type needs more.
		27  	maxAlign	= 8
		    	// hchanSize is unsafe.Sizeof(hchan{}) rounded up to the next multiple of
		    	// maxAlign, so a buffer placed directly after the header is aligned.
		28  	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
		    	// debugChan gates the print calls in makechan, chansend and chanrecv.
		29  	debugChan = false
		30  )
		31  
		// hchan is the runtime's representation of a channel: a circular buffer
		// (buf, sendx, recvx, qcount), the element description, the closed flag,
		// the queues of blocked receivers and senders, and the lock guarding all
		// of them. It is created by makechan.
		32  type hchan struct {
		33  	qcount	 uint					 // total data in the queue
		34  	dataqsiz uint					 // size of the circular queue
		35  	buf			unsafe.Pointer // points to an array of dataqsiz elements
		36  	elemsize uint16 // size of one element; set from elem.size in makechan
		37  	closed	 uint32 // 0 while open; set to 1 by closechan and never reset
		38  	elemtype *_type // element type
		39  	sendx		uint	 // send index
		40  	recvx		uint	 // receive index
		41  	recvq		waitq	// list of recv waiters
		42  	sendq		waitq	// list of send waiters
		43  
		44  	// lock protects all fields in hchan, as well as several
		45  	// fields in sudogs blocked on this channel.
		46  	//
		47  	// Do not change another G's status while holding this lock
		48  	// (in particular, do not ready a G), as this can deadlock
		49  	// with stack shrinking.
		50  	lock mutex
		51  }
		52  
		// waitq is a queue of sudogs blocked on a channel, linked through their
		// next/prev fields. enqueue appends at last; dequeue removes from first.
		// Both fields are nil when the queue is empty.
		53  type waitq struct {
		54  	first *sudog
		55  	last	*sudog
		56  }
		57  
		// reflect_makechan is made available to package reflect as
		// reflect.makechan through the go:linkname directive below. It simply
		// forwards to makechan.
		58  //go:linkname reflect_makechan reflect.makechan
		59  func reflect_makechan(t *chantype, size int) *hchan {
		60  	return makechan(t, size)
		61  }
		62  
		// makechan64 is makechan for a size given as int64. It panics with
		// "makechan: size out of range" when size cannot be represented as an
		// int (the round trip through int changes the value); otherwise it
		// forwards to makechan.
		63  func makechan64(t *chantype, size int64) *hchan {
		64  	if int64(int(size)) != size {
		65  		panic(plainError("makechan: size out of range"))
		66  	}
		67  
		68  	return makechan(t, int(size))
		69  }
		70  
		// makechan allocates and initializes the hchan for a channel of type t
		// with room for size buffered elements.
		//
		// It throws if t.elem.size is 1<<16 or larger, if hchanSize is not a
		// multiple of maxAlign, or if t.elem.align exceeds maxAlign. It panics
		// with "makechan: size out of range" if size is negative or if the
		// buffer size overflows or exceeds maxAlloc-hchanSize.
		//
		// The header and buffer are laid out in one of three ways, chosen by the
		// switch below: header only (zero-sized buffer), header and buffer in a
		// single allocation (pointer-free elements), or header and buffer
		// allocated separately (elements containing pointers).
		71  func makechan(t *chantype, size int) *hchan {
		72  	elem := t.elem
		73  
		74  	// compiler checks this but be safe.
		75  	if elem.size >= 1<<16 {
		76  		throw("makechan: invalid channel element type")
		77  	}
		78  	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		79  		throw("makechan: bad alignment")
		80  	}
		81  
		    	// mem is the buffer size in bytes. The explicit size < 0 test is still
		    	// required: with elem.size == 0 the product is 0, so neither of the
		    	// first two conditions would reject a negative size.
		82  	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
		83  	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		84  		panic(plainError("makechan: size out of range"))
		85  	}
		86  
		87  	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
		88  	// buf points into the same allocation, elemtype is persistent.
		89  	// SudoG's are referenced from their owning thread so they can't be collected.
		90  	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
		91  	var c *hchan
		92  	switch {
		93  	case mem == 0:
		94  		// Queue or element size is zero.
		95  		c = (*hchan)(mallocgc(hchanSize, nil, true))
		96  		// Race detector uses this location for synchronization.
		97  		c.buf = c.raceaddr()
		98  	case elem.ptrdata == 0:
		99  		// Elements do not contain pointers.
	 100  		// Allocate hchan and buf in one call.
	 101  		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
	 102  		c.buf = add(unsafe.Pointer(c), hchanSize)
	 103  	default:
	 104  		// Elements contain pointers.
	 105  		c = new(hchan)
	 106  		c.buf = mallocgc(mem, elem, true)
	 107  	}
	 108  
	 109  	c.elemsize = uint16(elem.size) // cannot truncate: elem.size < 1<<16 was checked above
	 110  	c.elemtype = elem
	 111  	c.dataqsiz = uint(size)
	 112  	lockInit(&c.lock, lockRankHchan)
	 113  
	 114  	if debugChan {
	 115  		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	 116  	}
	 117  	return c
	 118  }
	 119  
	 120  // chanbuf returns a pointer to the i'th slot in c's buffer.
	      // The address is c.buf plus i*c.elemsize; no bounds check is performed,
	      // so the caller is responsible for passing a valid slot index.
	 121  func chanbuf(c *hchan, i uint) unsafe.Pointer {
	 122  	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
	 123  }
	 124  
	 125  // full reports whether a send on c would block (that is, the channel is full).
	 126  // It uses a single word-sized read of mutable state, so although
	 127  // the answer is instantaneously true, the correct answer may have changed
	 128  // by the time the calling function receives the return value.
	      //
	      // full does not take c.lock and, unlike empty, uses plain (non-atomic)
	      // reads. In this file it is used only by the non-blocking fast path of
	      // chansend.
	 129  func full(c *hchan) bool {
	 130  	// c.dataqsiz is immutable (never written after the channel is created)
	 131  	// so it is safe to read at any time during channel operation.
	 132  	if c.dataqsiz == 0 {
	      		// Unbuffered channel: a send can only proceed if a receiver is
	      		// already queued, so "full" means the receive queue is empty.
	 133  		// Assumes that a pointer read is relaxed-atomic.
	 134  		return c.recvq.first == nil
	 135  	}
	 136  	// Assumes that a uint read is relaxed-atomic.
	 137  	return c.qcount == c.dataqsiz
	 138  }
	 139  
	 140  // entry point for c <- x from compiled code
	      // chansend1 performs a blocking send (block == true) of the value at elem
	      // and hands its own caller's PC to chansend. The result of chansend is
	      // discarded.
	 141  //go:nosplit
	 142  func chansend1(c *hchan, elem unsafe.Pointer) {
	 143  	chansend(c, elem, true, getcallerpc())
	 144  }
	 145  
	 146  /*
	 147   * generic single channel send/recv
	 148   * If block is false,
	 149   * then the protocol will not
	 150   * sleep but return if it could
	 151   * not complete.
	 152   *
	 153   * A goroutine parked here is woken either by a
	 154   * receiver (its sudog has success == true) or by
	 155   * closechan (success == false); in the latter case
	 156   * chansend panics with "send on closed channel".
	 157   */
	      // chansend sends the value pointed to by ep on c.
	      //
	      // It returns true once the value has been handed to a receiver or stored
	      // in the buffer. It returns false only when block is false and the send
	      // cannot proceed immediately (including the case c == nil). With
	      // block == true, a send on a nil channel parks forever. Sending on a
	      // closed channel panics. callerpc is used only for race-detector
	      // reporting.
	 158  func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	 159  	if c == nil {
	 160  		if !block {
	 161  			return false
	 162  		}
	 163  		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
	 164  		throw("unreachable")
	 165  	}
	 166  
	 167  	if debugChan {
	 168  		print("chansend: chan=", c, "\n")
	 169  	}
	 170  
	 171  	if raceenabled {
	 172  		racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
	 173  	}
	 174  
	 175  	// Fast path: check for failed non-blocking operation without acquiring the lock.
	 176  	//
	 177  	// After observing that the channel is not closed, we observe that the channel is
	 178  	// not ready for sending. Each of these observations is a single word-sized read
	 179  	// (first c.closed and second full()).
	 180  	// Because a closed channel cannot transition from 'ready for sending' to
	 181  	// 'not ready for sending', even if the channel is closed between the two observations,
	 182  	// they imply a moment between the two when the channel was both not yet closed
	 183  	// and not ready for sending. We behave as if we observed the channel at that moment,
	 184  	// and report that the send cannot proceed.
	 185  	//
	 186  	// It is okay if the reads are reordered here: if we observe that the channel is not
	 187  	// ready for sending and then observe that it is not closed, that implies that the
	 188  	// channel wasn't closed during the first observation. However, nothing here
	 189  	// guarantees forward progress. We rely on the side effects of lock release in
	 190  	// chanrecv() and closechan() to update this thread's view of c.closed and full().
	 191  	if !block && c.closed == 0 && full(c) {
	 192  		return false
	 193  	}
	 194  
	      	// t0 stays 0 unless block profiling is on; a non-zero t0 is what later
	      	// arms mysg.releasetime.
	 195  	var t0 int64
	 196  	if blockprofilerate > 0 {
	 197  		t0 = cputicks()
	 198  	}
	 199  
	 200  	lock(&c.lock)
	 201  
	 202  	if c.closed != 0 {
	 203  		unlock(&c.lock)
	 204  		panic(plainError("send on closed channel"))
	 205  	}
	 206  
	 207  	if sg := c.recvq.dequeue(); sg != nil {
	 208  		// Found a waiting receiver. We pass the value we want to send
	 209  		// directly to the receiver, bypassing the channel buffer (if any).
	      		// send releases c.lock through the closure before it readies the
	      		// receiver.
	 210  		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
	 211  		return true
	 212  	}
	 213  
	 214  	if c.qcount < c.dataqsiz {
	 215  		// Space is available in the channel buffer. Enqueue the element to send.
	 216  		qp := chanbuf(c, c.sendx)
	 217  		if raceenabled {
	 218  			racenotify(c, c.sendx, nil)
	 219  		}
	 220  		typedmemmove(c.elemtype, qp, ep)
	      		// Advance the send index, wrapping around the circular buffer.
	 221  		c.sendx++
	 222  		if c.sendx == c.dataqsiz {
	 223  			c.sendx = 0
	 224  		}
	 225  		c.qcount++
	 226  		unlock(&c.lock)
	 227  		return true
	 228  	}
	 229  
	 230  	if !block {
	 231  		unlock(&c.lock)
	 232  		return false
	 233  	}
	 234  
	 235  	// Block on the channel. Some receiver will complete our operation for us.
	 236  	gp := getg()
	 237  	mysg := acquireSudog()
	 238  	mysg.releasetime = 0
	 239  	if t0 != 0 {
	      		// -1 asks whoever wakes us to record the wake-up time (send, recv
	      		// and closechan overwrite any non-zero releasetime with cputicks()).
	 240  		mysg.releasetime = -1
	 241  	}
	 242  	// No stack splits between assigning elem and enqueuing mysg
	 243  	// on gp.waiting where copystack can find it.
	 244  	mysg.elem = ep
	 245  	mysg.waitlink = nil
	 246  	mysg.g = gp
	 247  	mysg.isSelect = false
	 248  	mysg.c = c
	 249  	gp.waiting = mysg
	 250  	gp.param = nil
	 251  	c.sendq.enqueue(mysg)
	 252  	// Signal to anyone trying to shrink our stack that we're about
	 253  	// to park on a channel. The window between when this G's status
	 254  	// changes and when we set gp.activeStackChans is not safe for
	 255  	// stack shrinking.
	 256  	atomic.Store8(&gp.parkingOnChan, 1)
	      	// chanparkcommit releases c.lock as part of parking.
	 257  	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	 258  	// Ensure the value being sent is kept alive until the
	 259  	// receiver copies it out. The sudog has a pointer to the
	 260  	// stack object, but sudogs aren't considered as roots of the
	 261  	// stack tracer.
	 262  	KeepAlive(ep)
	 263  
	 264  	// someone woke us up.
	 265  	if mysg != gp.waiting {
	 266  		throw("G waiting list is corrupted")
	 267  	}
	 268  	gp.waiting = nil
	 269  	gp.activeStackChans = false
	      	// success is false only when the waker was closechan.
	 270  	closed := !mysg.success
	 271  	gp.param = nil
	 272  	if mysg.releasetime > 0 {
	 273  		blockevent(mysg.releasetime-t0, 2)
	 274  	}
	 275  	mysg.c = nil
	 276  	releaseSudog(mysg)
	 277  	if closed {
	 278  		if c.closed == 0 {
	 279  			throw("chansend: spurious wakeup")
	 280  		}
	 281  		panic(plainError("send on closed channel"))
	 282  	}
	 283  	return true
	 284  }
	 285  
	 286  // send processes a send operation on an empty channel c.
	 287  // The value ep sent by the sender is copied to the receiver sg.
	 288  // The receiver is then woken up to go on its merry way.
	 289  // Channel c must be empty and locked.	send unlocks c with unlockf.
	 290  // sg must already be dequeued from c.
	 291  // ep must be non-nil and point to the heap or the caller's stack.
	      // skip is passed on to goready as skip+1.
	 292  func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	 293  	if raceenabled {
	 294  		if c.dataqsiz == 0 {
	 295  			racesync(c, sg)
	 296  		} else {
	 297  			// Pretend we go through the buffer, even though
	 298  			// we copy directly. Note that we need to increment
	 299  			// the head/tail locations only when raceenabled.
	 300  			racenotify(c, c.recvx, nil)
	 301  			racenotify(c, c.recvx, sg)
	 302  			c.recvx++
	 303  			if c.recvx == c.dataqsiz {
	 304  				c.recvx = 0
	 305  			}
	 306  			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	 307  		}
	 308  	}
	      	// A receiver that discards the value has sg.elem == nil (chanrecv
	      	// stores its ep there, and ep may be nil); nothing is copied then.
	 309  	if sg.elem != nil {
	 310  		sendDirect(c.elemtype, sg, ep)
	 311  		sg.elem = nil
	 312  	}
	 313  	gp := sg.g
	      	// Release the channel lock before readying gp: the comment on
	      	// hchan.lock forbids readying a G while the lock is held.
	 314  	unlockf()
	 315  	gp.param = unsafe.Pointer(sg)
	 316  	sg.success = true
	 317  	if sg.releasetime != 0 {
	 318  		sg.releasetime = cputicks()
	 319  	}
	 320  	goready(gp, skip+1)
	 321  }
	 322  
	 323  // Sends and receives on unbuffered or empty-buffered channels are the
	 324  // only operations where one running goroutine writes to the stack of
	 325  // another running goroutine. The GC assumes that stack writes only
	 326  // happen when the goroutine is running and are only done by that
	 327  // goroutine. Using a write barrier is sufficient to make up for
	 328  // violating that assumption, but the write barrier has to work.
	 329  // typedmemmove will call bulkBarrierPreWrite, but the target bytes
	 330  // are not in the heap, so that will not help. We arrange to call
	 331  // memmove and typeBitsBulkBarrier instead.
	 332  
	 // sendDirect copies one value of type t from src into the slot recorded in
	 // sg.elem. It runs typeBitsBulkBarrier over the destination and source and
	 // then copies t.size bytes with memmove, rather than using typedmemmove.
	 333  func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	 334  	// src is on our stack, dst is a slot on another stack.
	 335  
	 336  	// Once we read sg.elem out of sg, it will no longer
	 337  	// be updated if the destination's stack gets copied (shrunk).
	 338  	// So make sure that no preemption points can happen between read & use.
	 339  	dst := sg.elem
	 340  	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	 341  	// No need for cgo write barrier checks because dst is always
	 342  	// Go memory.
	 343  	memmove(dst, src, t.size)
	 344  }
	 345  
	 // recvDirect copies one value of type t from the slot recorded in sg.elem
	 // into dst. Like sendDirect, it runs typeBitsBulkBarrier and then copies
	 // t.size bytes with memmove.
	 346  func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	 347  	// dst is on our stack or the heap, src is on another stack.
	 348  	// The channel is locked, so src will not move during this
	 349  	// operation.
	 350  	src := sg.elem
	 351  	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	 352  	memmove(dst, src, t.size)
	 353  }
	 354  
	 // closechan closes channel c.
	 //
	 // It panics with "close of nil channel" if c is nil and with "close of
	 // closed channel" if c is already closed. Otherwise, under c.lock, it sets
	 // c.closed and empties both wait queues: each blocked receiver has its
	 // destination zeroed, each blocked sender has its element pointer dropped,
	 // and every sudog is marked success = false. The collected goroutines are
	 // made runnable only after c.lock has been released.
	 355  func closechan(c *hchan) {
	 356  	if c == nil {
	 357  		panic(plainError("close of nil channel"))
	 358  	}
	 359  
	 360  	lock(&c.lock)
	 361  	if c.closed != 0 {
	 362  		unlock(&c.lock)
	 363  		panic(plainError("close of closed channel"))
	 364  	}
	 365  
	 366  	if raceenabled {
	 367  		callerpc := getcallerpc()
	 368  		racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
	 369  		racerelease(c.raceaddr())
	 370  	}
	 371  
	 372  	c.closed = 1
	 373  
	      	// glist accumulates the goroutines to wake; they are readied after the
	      	// unlock below.
	 374  	var glist gList
	 375  
	 376  	// release all readers
	 377  	for {
	 378  		sg := c.recvq.dequeue()
	 379  		if sg == nil {
	 380  			break
	 381  		}
	 382  		if sg.elem != nil {
	      			// The receiver gets the zero value of the element type.
	 383  			typedmemclr(c.elemtype, sg.elem)
	 384  			sg.elem = nil
	 385  		}
	 386  		if sg.releasetime != 0 {
	 387  			sg.releasetime = cputicks()
	 388  		}
	 389  		gp := sg.g
	 390  		gp.param = unsafe.Pointer(sg)
	 391  		sg.success = false
	 392  		if raceenabled {
	 393  			raceacquireg(gp, c.raceaddr())
	 394  		}
	 395  		glist.push(gp)
	 396  	}
	 397  
	 398  	// release all writers (they will panic)
	 399  	for {
	 400  		sg := c.sendq.dequeue()
	 401  		if sg == nil {
	 402  			break
	 403  		}
	 404  		sg.elem = nil
	 405  		if sg.releasetime != 0 {
	 406  			sg.releasetime = cputicks()
	 407  		}
	 408  		gp := sg.g
	 409  		gp.param = unsafe.Pointer(sg)
	 410  		sg.success = false
	 411  		if raceenabled {
	 412  			raceacquireg(gp, c.raceaddr())
	 413  		}
	 414  		glist.push(gp)
	 415  	}
	 416  	unlock(&c.lock)
	 417  
	 418  	// Ready all Gs now that we've dropped the channel lock.
	 419  	for !glist.empty() {
	 420  		gp := glist.pop()
	 421  		gp.schedlink = 0
	 422  		goready(gp, 3)
	 423  	}
	 424  }
	 425  
	 426  // empty reports whether a read from c would block (that is, the channel is
	 427  // empty). It uses a single atomic read of mutable state.
	      //
	      // empty does not take c.lock. For an unbuffered channel it reports whether
	      // the send queue has no waiter; for a buffered channel, whether qcount is
	      // zero. In this file it is used only by the non-blocking fast path of
	      // chanrecv.
	 428  func empty(c *hchan) bool {
	 429  	// c.dataqsiz is immutable.
	 430  	if c.dataqsiz == 0 {
	 431  		return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
	 432  	}
	 433  	return atomic.Loaduint(&c.qcount) == 0
	 434  }
	 435  
	 436  // entry points for <- c from compiled code
	      // chanrecv1 performs a blocking receive into elem and discards both
	      // results of chanrecv.
	 437  //go:nosplit
	 438  func chanrecv1(c *hchan, elem unsafe.Pointer) {
	 439  	chanrecv(c, elem, true)
	 440  }
	 441  
	 // chanrecv2 performs a blocking receive into elem and returns chanrecv's
	 // second result: true if a value was received, false if the zero value was
	 // delivered because c is closed.
	 442  //go:nosplit
	 443  func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	 444  	_, received = chanrecv(c, elem, true)
	 445  	return
	 446  }
	 447  
	 448  // chanrecv receives on channel c and writes the received data to ep.
	 449  // ep may be nil, in which case received data is ignored.
	 450  // If block == false and no elements are available, returns (false, false).
	 451  // Otherwise, if c is closed, zeros *ep and returns (true, false).
	 452  // Otherwise, fills in *ep with an element and returns (true, true).
	 453  // A non-nil ep must point to the heap or the caller's stack.
	      //
	      // A nil c returns (false, false) when block == false and parks forever
	      // when block == true.
	 454  func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	 455  	// raceenabled: don't need to check ep, as it is always on the stack
	 456  	// or is new memory allocated by reflect.
	 457  
	 458  	if debugChan {
	 459  		print("chanrecv: chan=", c, "\n")
	 460  	}
	 461  
	 462  	if c == nil {
	 463  		if !block {
	 464  			return
	 465  		}
	 466  		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
	 467  		throw("unreachable")
	 468  	}
	 469  
	 470  	// Fast path: check for failed non-blocking operation without acquiring the lock.
	 471  	if !block && empty(c) {
	 472  		// After observing that the channel is not ready for receiving, we observe whether the
	 473  		// channel is closed.
	 474  		//
	 475  		// Reordering of these checks could lead to incorrect behavior when racing with a close.
	 476  		// For example, if the channel was open and not empty, was closed, and then drained,
	 477  		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
	 478  		// we use atomic loads for both checks, and rely on emptying and closing to happen in
	 479  		// separate critical sections under the same lock.	This assumption fails when closing
	 480  		// an unbuffered channel with a blocked send, but that is an error condition anyway.
	 481  		if atomic.Load(&c.closed) == 0 {
	 482  			// Because a channel cannot be reopened, the later observation of the channel
	 483  			// being not closed implies that it was also not closed at the moment of the
	 484  			// first observation. We behave as if we observed the channel at that moment
	 485  			// and report that the receive cannot proceed.
	 486  			return
	 487  		}
	 488  		// The channel is irreversibly closed. Re-check whether the channel has any pending data
	 489  		// to receive, which could have arrived between the empty and closed checks above.
	 490  		// Sequential consistency is also required here, when racing with such a send.
	 491  		if empty(c) {
	 492  			// The channel is irreversibly closed and empty.
	 493  			if raceenabled {
	 494  				raceacquire(c.raceaddr())
	 495  			}
	 496  			if ep != nil {
	 497  				typedmemclr(c.elemtype, ep)
	 498  			}
	 499  			return true, false
	 500  		}
	      		// Closed but not empty: fall through to the locked path below.
	 501  	}
	 502  
	      	// t0 stays 0 unless block profiling is on; a non-zero t0 is what later
	      	// arms mysg.releasetime.
	 503  	var t0 int64
	 504  	if blockprofilerate > 0 {
	 505  		t0 = cputicks()
	 506  	}
	 507  
	 508  	lock(&c.lock)
	 509  
	      	// Closed and drained: deliver the zero value.
	 510  	if c.closed != 0 && c.qcount == 0 {
	 511  		if raceenabled {
	 512  			raceacquire(c.raceaddr())
	 513  		}
	 514  		unlock(&c.lock)
	 515  		if ep != nil {
	 516  			typedmemclr(c.elemtype, ep)
	 517  		}
	 518  		return true, false
	 519  	}
	 520  
	 521  	if sg := c.sendq.dequeue(); sg != nil {
	 522  		// Found a waiting sender. If buffer is size 0, receive value
	 523  		// directly from sender. Otherwise, receive from head of queue
	 524  		// and add sender's value to the tail of the queue (both map to
	 525  		// the same buffer slot because the queue is full).
	 526  		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
	 527  		return true, true
	 528  	}
	 529  
	 530  	if c.qcount > 0 {
	 531  		// Receive directly from queue
	 532  		qp := chanbuf(c, c.recvx)
	 533  		if raceenabled {
	 534  			racenotify(c, c.recvx, nil)
	 535  		}
	 536  		if ep != nil {
	 537  			typedmemmove(c.elemtype, ep, qp)
	 538  		}
	      		// Clear the vacated slot, then advance the receive index with
	      		// wrap-around.
	 539  		typedmemclr(c.elemtype, qp)
	 540  		c.recvx++
	 541  		if c.recvx == c.dataqsiz {
	 542  			c.recvx = 0
	 543  		}
	 544  		c.qcount--
	 545  		unlock(&c.lock)
	 546  		return true, true
	 547  	}
	 548  
	 549  	if !block {
	 550  		unlock(&c.lock)
	 551  		return false, false
	 552  	}
	 553  
	 554  	// no sender available: block on this channel.
	 555  	gp := getg()
	 556  	mysg := acquireSudog()
	 557  	mysg.releasetime = 0
	 558  	if t0 != 0 {
	 559  		mysg.releasetime = -1
	 560  	}
	 561  	// No stack splits between assigning elem and enqueuing mysg
	 562  	// on gp.waiting where copystack can find it.
	 563  	mysg.elem = ep
	 564  	mysg.waitlink = nil
	 565  	gp.waiting = mysg
	 566  	mysg.g = gp
	 567  	mysg.isSelect = false
	 568  	mysg.c = c
	 569  	gp.param = nil
	 570  	c.recvq.enqueue(mysg)
	 571  	// Signal to anyone trying to shrink our stack that we're about
	 572  	// to park on a channel. The window between when this G's status
	 573  	// changes and when we set gp.activeStackChans is not safe for
	 574  	// stack shrinking.
	 575  	atomic.Store8(&gp.parkingOnChan, 1)
	      	// chanparkcommit releases c.lock as part of parking.
	 576  	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
	 577  
	 578  	// someone woke us up
	 579  	if mysg != gp.waiting {
	 580  		throw("G waiting list is corrupted")
	 581  	}
	 582  	gp.waiting = nil
	 583  	gp.activeStackChans = false
	 584  	if mysg.releasetime > 0 {
	 585  		blockevent(mysg.releasetime-t0, 2)
	 586  	}
	      	// success is true when a sender delivered a value (send sets it) and
	      	// false when closechan woke us.
	 587  	success := mysg.success
	 588  	gp.param = nil
	 589  	mysg.c = nil
	 590  	releaseSudog(mysg)
	 591  	return true, success
	 592  }
	 593  
	 594  // recv processes a receive operation on a full channel c.
	 595  // There are 2 parts:
	 596  // 1) The value sent by the sender sg is put into the channel
	 597  //		and the sender is woken up to go on its merry way.
	 598  // 2) The value received by the receiver (the current G) is
	 599  //		written to ep.
	 600  // For synchronous channels, both values are the same.
	 601  // For asynchronous channels, the receiver gets its data from
	 602  // the channel buffer and the sender's data is put in the
	 603  // channel buffer.
	 604  // Channel c must be full and locked. recv unlocks c with unlockf.
	 605  // sg must already be dequeued from c.
	 606  // A non-nil ep must point to the heap or the caller's stack.
	      // skip is passed on to goready as skip+1.
	 607  func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	 608  	if c.dataqsiz == 0 {
	 609  		if raceenabled {
	 610  			racesync(c, sg)
	 611  		}
	 612  		if ep != nil {
	 613  			// copy data from sender
	 614  			recvDirect(c.elemtype, sg, ep)
	 615  		}
	 616  	} else {
	 617  		// Queue is full. Take the item at the
	 618  		// head of the queue. Make the sender enqueue
	 619  		// its item at the tail of the queue. Since the
	 620  		// queue is full, those are both the same slot.
	 621  		qp := chanbuf(c, c.recvx)
	 622  		if raceenabled {
	 623  			racenotify(c, c.recvx, nil)
	 624  			racenotify(c, c.recvx, sg)
	 625  		}
	 626  		// copy data from queue to receiver
	 627  		if ep != nil {
	 628  			typedmemmove(c.elemtype, ep, qp)
	 629  		}
	 630  		// copy data from sender to queue
	 631  		typedmemmove(c.elemtype, qp, sg.elem)
	      		// qcount is left unchanged: one element left the buffer and one
	      		// entered it.
	 632  		c.recvx++
	 633  		if c.recvx == c.dataqsiz {
	 634  			c.recvx = 0
	 635  		}
	 636  		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	 637  	}
	 638  	sg.elem = nil
	 639  	gp := sg.g
	      	// Release the channel lock before readying gp: the comment on
	      	// hchan.lock forbids readying a G while the lock is held.
	 640  	unlockf()
	 641  	gp.param = unsafe.Pointer(sg)
	 642  	sg.success = true
	 643  	if sg.releasetime != 0 {
	 644  		sg.releasetime = cputicks()
	 645  	}
	 646  	goready(gp, skip+1)
	 647  }
	 648  
	 // chanparkcommit is the function chansend and chanrecv hand to gopark when
	 // they block on a channel. gp is the parking goroutine and chanLock is the
	 // address of the channel's lock, which the caller holds. It sets
	 // gp.activeStackChans, clears gp.parkingOnChan, releases the lock, and
	 // always returns true.
	 649  func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
	 650  	// There are unlocked sudogs that point into gp's stack. Stack
	 651  	// copying must lock the channels of those sudogs.
	 652  	// Set activeStackChans here instead of before we try parking
	 653  	// because we could self-deadlock in stack growth on the
	 654  	// channel lock.
	 655  	gp.activeStackChans = true
	 656  	// Mark that it's safe for stack shrinking to occur now,
	 657  	// because any thread acquiring this G's stack for shrinking
	 658  	// is guaranteed to observe activeStackChans after this store.
	 659  	atomic.Store8(&gp.parkingOnChan, 0)
	 660  	// Make sure we unlock after setting activeStackChans and
	 661  	// unsetting parkingOnChan. The moment we unlock chanLock
	 662  	// we risk gp getting readied by a channel operation and
	 663  	// so gp could continue running before everything before
	 664  	// the unlock is visible (even to gp itself).
	 665  	unlock((*mutex)(chanLock))
	 666  	return true
	 667  }
	 668  
	 669  // compiler implements
	 670  //
	 671  //	select {
	 672  //	case c <- v:
	 673  //		... foo
	 674  //	default:
	 675  //		... bar
	 676  //	}
	 677  //
	 678  // as
	 679  //
	 680  //	if selectnbsend(c, v) {
	 681  //		... foo
	 682  //	} else {
	 683  //		... bar
	 684  //	}
	 685  //
	      // selectnbsend attempts a non-blocking send (chansend with block == false)
	      // of the value at elem and reports whether the send happened.
	 686  func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	 687  	return chansend(c, elem, false, getcallerpc())
	 688  }
	 689  
	 690  // compiler implements
	 691  //
	 692  //	select {
	 693  //	case v, ok = <-c:
	 694  //		... foo
	 695  //	default:
	 696  //		... bar
	 697  //	}
	 698  //
	 699  // as
	 700  //
	 701  //	if selected, ok = selectnbrecv(&v, c); selected {
	 702  //		... foo
	 703  //	} else {
	 704  //		... bar
	 705  //	}
	 706  //
	      // selectnbrecv attempts a non-blocking receive (chanrecv with
	      // block == false) into elem and returns chanrecv's two results unchanged.
	 707  func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
	 708  	return chanrecv(c, elem, false)
	 709  }
	 710  
	 // reflect_chansend is made available to package reflect as
	 // reflect.chansend through the go:linkname directive below. nb selects a
	 // non-blocking send: chansend is called with block == !nb.
	 711  //go:linkname reflect_chansend reflect.chansend
	 712  func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
	 713  	return chansend(c, elem, !nb, getcallerpc())
	 714  }
	 715  
	 // reflect_chanrecv is made available to package reflect as
	 // reflect.chanrecv through the go:linkname directive below. nb selects a
	 // non-blocking receive: chanrecv is called with block == !nb, and its two
	 // results are returned unchanged.
	 716  //go:linkname reflect_chanrecv reflect.chanrecv
	 717  func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
	 718  	return chanrecv(c, elem, !nb)
	 719  }
	 720  
	 // reflect_chanlen is made available to package reflect as reflect.chanlen
	 // through the go:linkname directive below. It returns the number of
	 // elements currently in c's buffer (c.qcount), or 0 for a nil channel.
	 // The field is read without taking c.lock.
	 721  //go:linkname reflect_chanlen reflect.chanlen
	 722  func reflect_chanlen(c *hchan) int {
	 723  	if c == nil {
	 724  		return 0
	 725  	}
	 726  	return int(c.qcount)
	 727  }
	 728  
	 // reflectlite_chanlen is made available to package internal/reflectlite as
	 // chanlen through the go:linkname directive below. Its body is the same as
	 // reflect_chanlen: c.qcount, or 0 for a nil channel, read without c.lock.
	 729  //go:linkname reflectlite_chanlen internal/reflectlite.chanlen
	 730  func reflectlite_chanlen(c *hchan) int {
	 731  	if c == nil {
	 732  		return 0
	 733  	}
	 734  	return int(c.qcount)
	 735  }
	 736  
	 // reflect_chancap is made available to package reflect as reflect.chancap
	 // through the go:linkname directive below. It returns the buffer capacity
	 // (c.dataqsiz), or 0 for a nil channel.
	 737  //go:linkname reflect_chancap reflect.chancap
	 738  func reflect_chancap(c *hchan) int {
	 739  	if c == nil {
	 740  		return 0
	 741  	}
	 742  	return int(c.dataqsiz)
	 743  }
	 744  
	 // reflect_chanclose is made available to package reflect as
	 // reflect.chanclose through the go:linkname directive below. It forwards
	 // to closechan, so it panics under the same conditions (nil or already
	 // closed channel).
	 745  //go:linkname reflect_chanclose reflect.chanclose
	 746  func reflect_chanclose(c *hchan) {
	 747  	closechan(c)
	 748  }
	 749  
	 // enqueue appends sgp to the tail of q. It overwrites sgp.next and
	 // sgp.prev, so sgp must not currently be linked into another list.
	 750  func (q *waitq) enqueue(sgp *sudog) {
	 751  	sgp.next = nil
	 752  	x := q.last
	 753  	if x == nil {
	      		// Empty queue: sgp becomes both head and tail.
	 754  		sgp.prev = nil
	 755  		q.first = sgp
	 756  		q.last = sgp
	 757  		return
	 758  	}
	 759  	sgp.prev = x
	 760  	x.next = sgp
	 761  	q.last = sgp
	 762  }
	 763  
	 // dequeue removes and returns the sudog at the head of q, or returns nil if
	 // q is empty. Sudogs that were queued by a select and whose goroutine has
	 // already been claimed by another case (the CAS on selectDone fails) are
	 // unlinked and skipped.
	 764  func (q *waitq) dequeue() *sudog {
	 765  	for {
	 766  		sgp := q.first
	 767  		if sgp == nil {
	 768  			return nil
	 769  		}
	 770  		y := sgp.next
	 771  		if y == nil {
	 772  			q.first = nil
	 773  			q.last = nil
	 774  		} else {
	 775  			y.prev = nil
	 776  			q.first = y
	 777  			sgp.next = nil // mark as removed (see dequeueSudog)
	 778  		}
	 779  
	 780  		// if a goroutine was put on this queue because of a
	 781  		// select, there is a small window between the goroutine
	 782  		// being woken up by a different case and it grabbing the
	 783  		// channel locks. Once it has the lock
	 784  		// it removes itself from the queue, so we won't see it after that.
	 785  		// We use a flag in the G struct to tell us when someone
	 786  		// else has won the race to signal this goroutine but the goroutine
	 787  		// hasn't removed itself from the queue yet.
	 788  		if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) {
	 789  			continue
	 790  		}
	 791  
	 792  		return sgp
	 793  	}
	 794  }
	 795  
	 // raceaddr returns the address the race detector is told about for
	 // operations on the channel as a whole: the address of the c.buf field.
	 796  func (c *hchan) raceaddr() unsafe.Pointer {
	 797  	// Treat read-like and write-like operations on the channel to
	 798  	// happen at this address. Avoid using the address of qcount
	 799  	// or dataqsiz, because the len() and cap() builtins read
	 800  	// those addresses, and we don't want them racing with
	 801  	// operations like close().
	 802  	return unsafe.Pointer(&c.buf)
	 803  }
	 804  
	 // racesync tells the race detector about a direct hand-off between the
	 // current goroutine and sg.g, using the address of buffer slot 0 as the
	 // synchronization object. It issues a release by the current goroutine, an
	 // acquire then a release on behalf of sg.g, and finally an acquire by the
	 // current goroutine. In this file it is called from send and recv for
	 // channels with dataqsiz == 0, under raceenabled.
	 805  func racesync(c *hchan, sg *sudog) {
	 806  	racerelease(chanbuf(c, 0))
	 807  	raceacquireg(sg.g, chanbuf(c, 0))
	 808  	racereleaseg(sg.g, chanbuf(c, 0))
	 809  	raceacquire(chanbuf(c, 0))
	 810  }
	 811  
	 812  // Notify the race detector of a send or receive involving buffer entry idx
	 813  // and a channel c or its communicating partner sg.
	 814  // This function handles the special case of c.elemsize==0.
	      // When sg is nil the event is attributed to the current goroutine;
	      // otherwise it is attributed to sg.g.
	 815  func racenotify(c *hchan, idx uint, sg *sudog) {
	 816  	// We could have passed the unsafe.Pointer corresponding to entry idx
	 817  	// instead of idx itself.	However, in a future version of this function,
	 818  	// we can use idx to better handle the case of elemsize==0.
	 819  	// A future improvement to the detector is to call TSan with c and idx:
	 820  	// this way, Go will continue to not allocate buffer entries for channels
	 821  	// of elemsize==0, yet the race detector can be made to handle multiple
	 822  	// sync objects underneath the hood (one sync object per idx)
	 823  	qp := chanbuf(c, idx)
	 824  	// When elemsize==0, we don't allocate a full buffer for the channel.
	 825  	// Instead of individual buffer entries, the race detector uses the
	 826  	// c.buf as the only buffer entry.	This simplification prevents us from
	 827  	// following the memory model's happens-before rules (rules that are
	 828  	// implemented in racereleaseacquire).	Instead, we accumulate happens-before
	 829  	// information in the synchronization object associated with c.buf.
	 830  	if c.elemsize == 0 {
	 831  		if sg == nil {
	 832  			raceacquire(qp)
	 833  			racerelease(qp)
	 834  		} else {
	 835  			raceacquireg(sg.g, qp)
	 836  			racereleaseg(sg.g, qp)
	 837  		}
	 838  	} else {
	 839  		if sg == nil {
	 840  			racereleaseacquire(qp)
	 841  		} else {
	 842  			racereleaseacquireg(sg.g, qp)
	 843  		}
	 844  	}
	 845  }
	 846  

View as plain text