
Source file src/runtime/profbuf.go

// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package runtime

import (
	"runtime/internal/atomic"
	"unsafe"
)

// A profBuf is a lock-free buffer for profiling events,
// safe for concurrent use by one reader and one writer.
// The writer may be a signal handler running without a user g.
// The reader is assumed to be a user g.
//
// Each logged event corresponds to a fixed size header, a list of
// uintptrs (typically a stack), and exactly one unsafe.Pointer tag.
// The header and uintptrs are stored in the circular buffer data and the
// tag is stored in a circular buffer tags, running in parallel.
// In the circular buffer data, each event takes 2+hdrsize+len(stk)
// words: the value 2+hdrsize+len(stk), then the time of the event, then
// hdrsize words giving the fixed-size header, and then len(stk) words
// for the stack.
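//
// For example, with hdrsize=2 and a three-frame stack, one event occupies
// 2+2+3 = 7 words in data: [7, now, hdr0, hdr1, pc0, pc1, pc2], plus a
// single entry in tags.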
//
// The current effective offsets into the tags and data circular buffers
// for reading and writing are stored in the high 30 and low 32 bits of r and w.
// The bottom bits of the high 32 are additional flag bits in w, unused in r.
// "Effective" offsets means the total number of reads or writes, mod 2^length.
// The offset in the buffer is the effective offset mod the length of the buffer.
// To make wraparound mod 2^length match wraparound mod length of the buffer,
// the length of the buffer must be a power of two.
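//
// For example, if len(data) = 16, effective offset 20 maps to slot 20%16 = 4;
// after the 32-bit count wraps, 20+2^32 reads back as 20 and still maps to
// slot 4, so counter wraparound never shifts positions in the buffer.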
//
// If the reader catches up to the writer, a flag passed to read controls
// whether the read blocks until more data is available. A read returns a
// pointer to the buffer data itself; the caller is assumed to be done with
// that data at the next read. The read offset rNext tracks the next offset to
// be returned by read. By definition, r ≤ rNext ≤ w (before wraparound),
// and rNext is only used by the reader, so it can be accessed without atomics.
//
// If the writer gets ahead of the reader, so that the buffer fills,
// future writes are discarded and replaced in the output stream by an
// overflow entry, which has size 2+hdrsize+1, time set to the time of
// the first discarded write, a header of all zeroed words, and a "stack"
// containing one word, the number of discarded writes.
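//
// For example, with hdrsize=2, an overflow entry summarizing 10 discarded
// writes is the five words [5, firstDiscardTime, 0, 0, 10]: an ordinary
// record whose one-word "stack" holds the count of discarded writes.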
//
// Between the time the buffer fills and the buffer becomes empty enough
// to hold more data, the overflow entry is stored as a pending overflow
// entry in the fields overflow and overflowTime. The pending overflow
// entry can be turned into a real record by either the writer or the
// reader. If the writer is called to write a new record and finds that
// the output buffer has room for both the pending overflow entry and the
// new record, the writer emits the pending overflow entry and the new
// record into the buffer. If the reader is called to read data and finds
// that the output buffer is empty but that there is a pending overflow
// entry, the reader will return a synthesized record for the pending
// overflow entry.
//
// Only the writer can create or add to a pending overflow entry, but
// either the reader or the writer can clear the pending overflow entry.
// A pending overflow entry is indicated by the low 32 bits of 'overflow'
// holding the number of discarded writes, and overflowTime holding the
// time of the first discarded write. The high 32 bits of 'overflow'
// increment each time the low 32 bits transition from zero to non-zero
// or vice versa. This sequence number avoids ABA problems in the use of
// compare-and-swap to coordinate between reader and writer.
// The overflowTime is only written when the low 32 bits of overflow are
// zero, that is, only when there is no pending overflow entry, in
// preparation for creating a new one. The reader can therefore fetch and
// clear the entry atomically using
//
//	for {
//		overflow = load(&b.overflow)
//		if uint32(overflow) == 0 {
//			// no pending entry
//			break
//		}
//		time = load(&b.overflowTime)
//		if cas(&b.overflow, overflow, ((overflow>>32)+1)<<32) {
//			// pending entry cleared
//			break
//		}
//	}
//	if uint32(overflow) > 0 {
//		emit entry for uint32(overflow), time
//	}
//
type profBuf struct {
	// accessed atomically
	r, w         profAtomic
	overflow     uint64
	overflowTime uint64
	eof          uint32

	// immutable (excluding slice content)
	hdrsize uintptr
	data    []uint64
	tags    []unsafe.Pointer

	// owned by reader
	rNext       profIndex
	overflowBuf []uint64 // for use by reader to return overflow record
	wait        note
}

// A profAtomic is the atomically-accessed word holding a profIndex.
type profAtomic uint64

// A profIndex is the packed tag and data counts and flag bits, described above.
type profIndex uint64

const (
	profReaderSleeping profIndex = 1 << 32 // reader is sleeping and must be woken up
	profWriteExtra     profIndex = 1 << 33 // overflow or eof waiting
)
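
// The resulting layout of a profIndex word, from high bits to low:
//
//	bits 63..34: tag count (30 bits)
//	bit 33:      profWriteExtra
//	bit 32:      profReaderSleeping
//	bits 31..0:  data count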

func (x *profAtomic) load() profIndex {
	return profIndex(atomic.Load64((*uint64)(x)))
}

func (x *profAtomic) store(new profIndex) {
	atomic.Store64((*uint64)(x), uint64(new))
}

func (x *profAtomic) cas(old, new profIndex) bool {
	return atomic.Cas64((*uint64)(x), uint64(old), uint64(new))
}

func (x profIndex) dataCount() uint32 {
	return uint32(x)
}

func (x profIndex) tagCount() uint32 {
	return uint32(x >> 34)
}

// countSub subtracts two counts obtained from profIndex.dataCount or profIndex.tagCount,
// assuming that they are no more than 2^29 apart (guaranteed since they are never more than
// len(data) or len(tags) apart, respectively).
// tagCount wraps at 2^30, while dataCount wraps at 2^32.
// This function works for both.
func countSub(x, y uint32) int {
	// x-y is 32-bit signed or 30-bit signed; sign-extend to 32 bits and convert to int.
	return int(int32(x-y) << 2 >> 2)
}
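
// For example, after the 30-bit tag count wraps, x can be small while y is
// still large: countSub(2, 1<<30-1) computes x-y = 0xc0000003, and the
// <<2>>2 sign extension turns that into 3, the true distance.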

// addCountsAndClearFlags returns the packed form of "x + (data, tag) - all flags".
func (x profIndex) addCountsAndClearFlags(data, tag int) profIndex {
	return profIndex((uint64(x)>>34+uint64(uint32(tag)<<2>>2))<<34 | uint64(uint32(x)+uint32(data)))
}
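
// For example, if x holds tag count 3, data count 10, and the profWriteExtra
// flag, then x.addCountsAndClearFlags(7, 1) returns tag count 4 and data
// count 17 with both flag bits zero: the >>34 discards the flags along with
// the old data count, and nothing ever shifts them back in.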

// hasOverflow reports whether b has any overflow records pending.
func (b *profBuf) hasOverflow() bool {
	return uint32(atomic.Load64(&b.overflow)) > 0
}

// takeOverflow consumes the pending overflow records, returning the overflow count
// and the time of the first overflow.
// When called by the reader, it is racing against incrementOverflow.
func (b *profBuf) takeOverflow() (count uint32, time uint64) {
	overflow := atomic.Load64(&b.overflow)
	time = atomic.Load64(&b.overflowTime)
	for {
		count = uint32(overflow)
		if count == 0 {
			time = 0
			break
		}
		// Increment generation, clear overflow count in low bits.
		if atomic.Cas64(&b.overflow, overflow, ((overflow>>32)+1)<<32) {
			break
		}
		overflow = atomic.Load64(&b.overflow)
		time = atomic.Load64(&b.overflowTime)
	}
	return uint32(overflow), time
}

// incrementOverflow records a single overflow at time now.
// It is racing against a possible takeOverflow in the reader.
func (b *profBuf) incrementOverflow(now int64) {
	for {
		overflow := atomic.Load64(&b.overflow)

		// Once we see b.overflow reach 0, it's stable: no one else is changing it underfoot.
		// We need to set overflowTime if we're incrementing b.overflow from 0.
		if uint32(overflow) == 0 {
			// Store overflowTime first so it's always available when overflow != 0.
			atomic.Store64(&b.overflowTime, uint64(now))
			atomic.Store64(&b.overflow, (((overflow>>32)+1)<<32)+1)
			break
		}
		// Otherwise we're racing to increment against the reader,
		// who wants to set b.overflow to 0.
		// Out of paranoia, treat 2³²-1 as a sticky overflow value,
		// to avoid wrapping around. Extremely unlikely.
		if int32(overflow) == -1 {
			break
		}
		if atomic.Cas64(&b.overflow, overflow, overflow+1) {
			break
		}
	}
}
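
// The generation number in the high 32 bits is what makes this race safe.
// For example, if the reader loads overflow = gen<<32 | 5, and before its
// cas the writer flushes the entry and starts a new one (overflow becomes
// (gen+2)<<32 | 1), the reader's stale cas fails rather than silently
// destroying the new pending entry.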

// newProfBuf returns a new profiling buffer with room for
// a header of hdrsize words and a buffer of at least bufwords words.
func newProfBuf(hdrsize, bufwords, tags int) *profBuf {
	if min := 2 + hdrsize + 1; bufwords < min {
		bufwords = min
	}

	// Buffer sizes must be power of two, so that we don't have to
	// worry about uint32 wraparound changing the effective position
	// within the buffers. We store 30 bits of count; limiting to 28
	// gives us some room for intermediate calculations.
	if bufwords >= 1<<28 || tags >= 1<<28 {
		throw("newProfBuf: buffer too large")
	}
	var i int
	for i = 1; i < bufwords; i <<= 1 {
	}
	bufwords = i
	for i = 1; i < tags; i <<= 1 {
	}
	tags = i

	b := new(profBuf)
	b.hdrsize = uintptr(hdrsize)
	b.data = make([]uint64, bufwords)
	b.tags = make([]unsafe.Pointer, tags)
	b.overflowBuf = make([]uint64, 2+b.hdrsize+1)
	return b
}
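
// For example, newProfBuf(3, 100, 1000) rounds the requests up to powers of
// two: a 128-word data buffer, 1024 tag slots, and a 6-word overflowBuf
// (2+3+1) reserved for synthesizing overflow records for the reader.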

// canWriteRecord reports whether the buffer has room
// for a single contiguous record with a stack of length nstk.
func (b *profBuf) canWriteRecord(nstk int) bool {
	br := b.r.load()
	bw := b.w.load()

	// room for tag?
	if countSub(br.tagCount(), bw.tagCount())+len(b.tags) < 1 {
		return false
	}

	// room for data?
	nd := countSub(br.dataCount(), bw.dataCount()) + len(b.data)
	want := 2 + int(b.hdrsize) + nstk
	i := int(bw.dataCount() % uint32(len(b.data)))
	if i+want > len(b.data) {
		// Can't fit in trailing fragment of slice.
		// Skip over that and start over at beginning of slice.
		nd -= len(b.data) - i
	}
	return nd >= want
}
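
// For example, with len(b.data) = 16, a write position of 14, and a record
// needing want = 4 words, the record cannot use slots 14 and 15; those two
// trailing words are deducted from nd, since the record would have to be
// written starting at slot 0.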

// canWriteTwoRecords reports whether the buffer has room
// for two records with stack lengths nstk1, nstk2, in that order.
// Each record must be contiguous on its own, but the two
// records need not be contiguous (one can be at the end of the buffer
// and the other can wrap around and start at the beginning of the buffer).
func (b *profBuf) canWriteTwoRecords(nstk1, nstk2 int) bool {
	br := b.r.load()
	bw := b.w.load()

	// room for tag?
	if countSub(br.tagCount(), bw.tagCount())+len(b.tags) < 2 {
		return false
	}

	// room for data?
	nd := countSub(br.dataCount(), bw.dataCount()) + len(b.data)

	// first record
	want := 2 + int(b.hdrsize) + nstk1
	i := int(bw.dataCount() % uint32(len(b.data)))
	if i+want > len(b.data) {
		// Can't fit in trailing fragment of slice.
		// Skip over that and start over at beginning of slice.
		nd -= len(b.data) - i
		i = 0
	}
	i += want
	nd -= want

	// second record
	want = 2 + int(b.hdrsize) + nstk2
	if i+want > len(b.data) {
		// Can't fit in trailing fragment of slice.
		// Skip over that and start over at beginning of slice.
		nd -= len(b.data) - i
		i = 0
	}
	return nd >= want
}

// write writes an entry to the profiling buffer b.
// The entry begins with a fixed hdr, which must have
// length b.hdrsize, followed by a variable-sized stack
// and a single tag pointer *tagPtr (or nil if tagPtr is nil).
// No write barriers allowed because this might be called from a signal handler.
func (b *profBuf) write(tagPtr *unsafe.Pointer, now int64, hdr []uint64, stk []uintptr) {
	if b == nil {
		return
	}
	if len(hdr) > int(b.hdrsize) {
		throw("misuse of profBuf.write")
	}

	if hasOverflow := b.hasOverflow(); hasOverflow && b.canWriteTwoRecords(1, len(stk)) {
		// Room for both an overflow record and the one being written.
		// Write the overflow record if the reader hasn't gotten to it yet.
		// Only racing against reader, not other writers.
		count, time := b.takeOverflow()
		if count > 0 {
			var stk [1]uintptr
			stk[0] = uintptr(count)
			b.write(nil, int64(time), nil, stk[:])
		}
	} else if hasOverflow || !b.canWriteRecord(len(stk)) {
		// Pending overflow without room to write both overflow and new records,
		// or no overflow but also no room for the new record.
		b.incrementOverflow(now)
		b.wakeupExtra()
		return
	}

	// There's room: write the record.
	br := b.r.load()
	bw := b.w.load()

	// Profiling tag
	//
	// The tag is a pointer, but we can't run a write barrier here.
	// We have interrupted the OS-level execution of gp, but the
	// runtime still sees gp as executing. In effect, we are running
	// in place of the real gp. Since gp is the only goroutine that
	// can overwrite gp.labels, the value of gp.labels is stable during
	// this signal handler: it will still be reachable from gp when
	// we finish executing. If a GC is in progress right now, it must
	// keep gp.labels alive, because gp.labels is reachable from gp.
	// If gp were to overwrite gp.labels, the deletion barrier would
	// still shade that pointer, which would preserve it for the
	// in-progress GC, so all is well. Any future GC will see the
	// value we copied when scanning b.tags (heap-allocated).
	// We arrange that the store here is always overwriting a nil,
	// so there is no need for a deletion barrier on b.tags[wt].
	wt := int(bw.tagCount() % uint32(len(b.tags)))
	if tagPtr != nil {
		*(*uintptr)(unsafe.Pointer(&b.tags[wt])) = uintptr(unsafe.Pointer(*tagPtr))
	}

	// Main record.
	// It has to fit in a contiguous section of the slice, so if it doesn't fit at the end,
	// leave a rewind marker (0) and start over at the beginning of the slice.
	wd := int(bw.dataCount() % uint32(len(b.data)))
	nd := countSub(br.dataCount(), bw.dataCount()) + len(b.data)
	skip := 0
	if wd+2+int(b.hdrsize)+len(stk) > len(b.data) {
		b.data[wd] = 0
		skip = len(b.data) - wd
		nd -= skip
		wd = 0
	}
	data := b.data[wd:]
	data[0] = uint64(2 + b.hdrsize + uintptr(len(stk))) // length
	data[1] = uint64(now)                               // time stamp
	// header, zero-padded
	i := uintptr(copy(data[2:2+b.hdrsize], hdr))
	for ; i < b.hdrsize; i++ {
		data[2+i] = 0
	}
	for i, pc := range stk {
		data[2+b.hdrsize+uintptr(i)] = uint64(pc)
	}

	for {
		// Commit write.
		// Racing with reader setting flag bits in b.w, to avoid lost wakeups.
		old := b.w.load()
		new := old.addCountsAndClearFlags(skip+2+len(stk)+int(b.hdrsize), 1)
		if !b.w.cas(old, new) {
			continue
		}
		// If there was a reader, wake it up.
		if old&profReaderSleeping != 0 {
			notewakeup(&b.wait)
		}
		break
	}
}

// close signals that there will be no more writes on the buffer.
// Once all the data has been read from the buffer, reads will return eof=true.
func (b *profBuf) close() {
	if atomic.Load(&b.eof) > 0 {
		throw("runtime: profBuf already closed")
	}
	atomic.Store(&b.eof, 1)
	b.wakeupExtra()
}

// wakeupExtra must be called after setting one of the "extra"
// atomic fields b.overflow or b.eof.
// It records the change in b.w and wakes up the reader if needed.
func (b *profBuf) wakeupExtra() {
	for {
		old := b.w.load()
		new := old | profWriteExtra
		if !b.w.cas(old, new) {
			continue
		}
		if old&profReaderSleeping != 0 {
			notewakeup(&b.wait)
		}
		break
	}
}

// profBufReadMode specifies whether to block when no data is available to read.
type profBufReadMode int

const (
	profBufBlocking profBufReadMode = iota
	profBufNonBlocking
)

var overflowTag [1]unsafe.Pointer // always nil

func (b *profBuf) read(mode profBufReadMode) (data []uint64, tags []unsafe.Pointer, eof bool) {
	if b == nil {
		return nil, nil, true
	}

	br := b.rNext

	// Commit previous read, returning that part of the ring to the writer.
	// First clear tags that have now been read, both to avoid holding
	// up the memory they point at for longer than necessary
	// and so that b.write can assume it is always overwriting
	// nil tag entries (see comment in b.write).
	rPrev := b.r.load()
	if rPrev != br {
		ntag := countSub(br.tagCount(), rPrev.tagCount())
		ti := int(rPrev.tagCount() % uint32(len(b.tags)))
		for i := 0; i < ntag; i++ {
			b.tags[ti] = nil
			if ti++; ti == len(b.tags) {
				ti = 0
			}
		}
		b.r.store(br)
	}

Read:
	bw := b.w.load()
	numData := countSub(bw.dataCount(), br.dataCount())
	if numData == 0 {
		if b.hasOverflow() {
			// No data to read, but there is overflow to report.
			// Racing with writer flushing b.overflow into a real record.
			count, time := b.takeOverflow()
			if count == 0 {
				// Lost the race, go around again.
				goto Read
			}
			// Won the race, report overflow.
			dst := b.overflowBuf
			dst[0] = uint64(2 + b.hdrsize + 1)
			dst[1] = uint64(time)
			for i := uintptr(0); i < b.hdrsize; i++ {
				dst[2+i] = 0
			}
			dst[2+b.hdrsize] = uint64(count)
			return dst[:2+b.hdrsize+1], overflowTag[:1], false
		}
		if atomic.Load(&b.eof) > 0 {
			// No data, no overflow, EOF set: done.
			return nil, nil, true
		}
		if bw&profWriteExtra != 0 {
			// Writer claims to have published extra information (overflow or eof).
			// Attempt to clear notification and then check again.
			// If we fail to clear the notification it means b.w changed,
			// so we still need to check again.
			b.w.cas(bw, bw&^profWriteExtra)
			goto Read
		}

		// Nothing to read right now.
		// Return or sleep according to mode.
		if mode == profBufNonBlocking {
			return nil, nil, false
		}
		if !b.w.cas(bw, bw|profReaderSleeping) {
			goto Read
		}
		// Committed to sleeping.
		notetsleepg(&b.wait, -1)
		noteclear(&b.wait)
		goto Read
	}
	data = b.data[br.dataCount()%uint32(len(b.data)):]
	if len(data) > numData {
		data = data[:numData]
	} else {
		numData -= len(data) // available in case of wraparound
	}
	skip := 0
	if data[0] == 0 {
		// Wraparound record. Go back to the beginning of the ring.
		skip = len(data)
		data = b.data
		if len(data) > numData {
			data = data[:numData]
		}
	}

	ntag := countSub(bw.tagCount(), br.tagCount())
	if ntag == 0 {
		throw("runtime: malformed profBuf buffer - tag and data out of sync")
	}
	tags = b.tags[br.tagCount()%uint32(len(b.tags)):]
	if len(tags) > ntag {
		tags = tags[:ntag]
	}

	// Count out whole data records until either data or tags is done.
	// They are always in sync in the buffer, but due to an end-of-slice
	// wraparound we might need to stop early and return the rest
	// in the next call.
	di := 0
	ti := 0
	for di < len(data) && data[di] != 0 && ti < len(tags) {
		if uintptr(di)+uintptr(data[di]) > uintptr(len(data)) {
			throw("runtime: malformed profBuf buffer - invalid size")
		}
		di += int(data[di])
		ti++
	}

	// Remember how much we returned, to commit read on next call.
	b.rNext = br.addCountsAndClearFlags(skip+di, ti)

	if raceenabled {
		// Match racereleasemerge in runtime_setProfLabel,
		// so that the setting of the labels in runtime_setProfLabel
		// is treated as happening before any use of the labels
		// by our caller. The synchronization on labelSync itself is a fiction
		// for the race detector. The actual synchronization is handled
		// by the fact that the signal handler only reads from the current
		// goroutine and uses atomics to write the updated queue indices,
		// and then the read-out from the signal handler buffer uses
		// atomics to read those queue indices.
		raceacquire(unsafe.Pointer(&labelSync))
	}

	return data[:di], tags[:ti], false
}
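
// A minimal sketch of how a consumer might drive read in blocking mode;
// readAll and process are illustrative names, not part of this file:
//
//	func readAll(b *profBuf, process func([]uint64, []unsafe.Pointer)) {
//		for {
//			data, tags, eof := b.read(profBufBlocking)
//			if len(data) > 0 {
//				process(data, tags) // must be done with data before the next read
//			}
//			if eof {
//				return
//			}
//		}
//	}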
