...

Source file src/sync/pool_test.go

Documentation: sync

		 1  // Copyright 2013 The Go Authors. All rights reserved.
		 2  // Use of this source code is governed by a BSD-style
		 3  // license that can be found in the LICENSE file.
		 4  
		 5  // Pool is no-op under race detector, so all these tests do not work.
		 6  //go:build !race
		 7  // +build !race
		 8  
		 9  package sync_test
		10  
		11  import (
		12  	"runtime"
		13  	"runtime/debug"
		14  	"sort"
		15  	. "sync"
		16  	"sync/atomic"
		17  	"testing"
		18  	"time"
		19  )
		20  
		21  func TestPool(t *testing.T) {
		22  	// disable GC so we can control when it happens.
		23  	defer debug.SetGCPercent(debug.SetGCPercent(-1))
		24  	var p Pool
		25  	if p.Get() != nil {
		26  		t.Fatal("expected empty")
		27  	}
		28  
		29  	// Make sure that the goroutine doesn't migrate to another P
		30  	// between Put and Get calls.
		31  	Runtime_procPin()
		32  	p.Put("a")
		33  	p.Put("b")
		34  	if g := p.Get(); g != "a" {
		35  		t.Fatalf("got %#v; want a", g)
		36  	}
		37  	if g := p.Get(); g != "b" {
		38  		t.Fatalf("got %#v; want b", g)
		39  	}
		40  	if g := p.Get(); g != nil {
		41  		t.Fatalf("got %#v; want nil", g)
		42  	}
		43  	Runtime_procUnpin()
		44  
		45  	// Put in a large number of objects so they spill into
		46  	// stealable space.
		47  	for i := 0; i < 100; i++ {
		48  		p.Put("c")
		49  	}
		50  	// After one GC, the victim cache should keep them alive.
		51  	runtime.GC()
		52  	if g := p.Get(); g != "c" {
		53  		t.Fatalf("got %#v; want c after GC", g)
		54  	}
		55  	// A second GC should drop the victim cache.
		56  	runtime.GC()
		57  	if g := p.Get(); g != nil {
		58  		t.Fatalf("got %#v; want nil after second GC", g)
		59  	}
		60  }
		61  
		62  func TestPoolNew(t *testing.T) {
		63  	// disable GC so we can control when it happens.
		64  	defer debug.SetGCPercent(debug.SetGCPercent(-1))
		65  
		66  	i := 0
		67  	p := Pool{
		68  		New: func() interface{} {
		69  			i++
		70  			return i
		71  		},
		72  	}
		73  	if v := p.Get(); v != 1 {
		74  		t.Fatalf("got %v; want 1", v)
		75  	}
		76  	if v := p.Get(); v != 2 {
		77  		t.Fatalf("got %v; want 2", v)
		78  	}
		79  
		80  	// Make sure that the goroutine doesn't migrate to another P
		81  	// between Put and Get calls.
		82  	Runtime_procPin()
		83  	p.Put(42)
		84  	if v := p.Get(); v != 42 {
		85  		t.Fatalf("got %v; want 42", v)
		86  	}
		87  	Runtime_procUnpin()
		88  
		89  	if v := p.Get(); v != 3 {
		90  		t.Fatalf("got %v; want 3", v)
		91  	}
		92  }
		93  
		94  // Test that Pool does not hold pointers to previously cached resources.
		95  func TestPoolGC(t *testing.T) {
		96  	testPool(t, true)
		97  }
		98  
		99  // Test that Pool releases resources on GC.
	 100  func TestPoolRelease(t *testing.T) {
	 101  	testPool(t, false)
	 102  }
	 103  
	 // testPool puts N finalizer-tracked objects into a Pool, optionally takes
	 // them back out with Get (drain), and then requires that at least N-1 of
	 // the finalizers run within five GC cycles. The whole sequence is repeated
	 // up to three times; in short mode only the first attempt runs.
	 func testPool(t *testing.T, drain bool) {
	 	const N = 100
	 	var p Pool
	 retry:
	 	for try := 0; try < 3; try++ {
	 		if try == 1 && testing.Short() {
	 			break
	 		}
	 		// fin counts finalizers that have run; fin1 is the latest snapshot.
	 		var fin, fin1 uint32
	 		for k := 0; k < N; k++ {
	 			v := new(string)
	 			runtime.SetFinalizer(v, func(*string) {
	 				atomic.AddUint32(&fin, 1)
	 			})
	 			p.Put(v)
	 		}
	 		for k := 0; drain && k < N; k++ {
	 			p.Get()
	 		}
	 		for round := 0; round < 5; round++ {
	 			runtime.GC()
	 			// Back off a little longer each round to let finalizers run.
	 			time.Sleep(time.Duration(round*100+10) * time.Millisecond)
	 			fin1 = atomic.LoadUint32(&fin)
	 			// 1 pointer can remain on stack or elsewhere
	 			if fin1 >= N-1 {
	 				continue retry
	 			}
	 		}
	 		t.Fatalf("only %v out of %v resources are finalized on try %v", fin1, N, try)
	 	}
	 }
	 136  
	 // TestPoolStress hammers a single Pool from several goroutines at once.
	 // Each goroutine repeatedly puts the int 0 and gets a value back, and
	 // reports an error if a non-nil value other than 0 ever comes out.
	 func TestPoolStress(t *testing.T) {
	 	const workers = 10
	 	iterations := int(1e6)
	 	if testing.Short() {
	 		iterations /= 100
	 	}
	 	var (
	 		p  Pool
	 		wg WaitGroup
	 	)
	 	wg.Add(workers)
	 	for w := 0; w < workers; w++ {
	 		go func() {
	 			defer wg.Done()
	 			var v interface{} = 0
	 			for j := 0; j < iterations; j++ {
	 				// Get may return nil; always put a real value back.
	 				if v == nil {
	 					v = 0
	 				}
	 				p.Put(v)
	 				v = p.Get()
	 				if v != nil && v.(int) != 0 {
	 					t.Errorf("expect 0, got %v", v)
	 					return
	 				}
	 			}
	 		}()
	 	}
	 	wg.Wait()
	 }
	 166  
	 167  func TestPoolDequeue(t *testing.T) {
	 168  	testPoolDequeue(t, NewPoolDequeue(16))
	 169  }
	 170  
	 171  func TestPoolChain(t *testing.T) {
	 172  	testPoolDequeue(t, NewPoolChain())
	 173  }
	 174  
	 175  func testPoolDequeue(t *testing.T, d PoolDequeue) {
	 176  	const P = 10
	 177  	var N int = 2e6
	 178  	if testing.Short() {
	 179  		N = 1e3
	 180  	}
	 181  	have := make([]int32, N)
	 182  	var stop int32
	 183  	var wg WaitGroup
	 184  	record := func(val int) {
	 185  		atomic.AddInt32(&have[val], 1)
	 186  		if val == N-1 {
	 187  			atomic.StoreInt32(&stop, 1)
	 188  		}
	 189  	}
	 190  
	 191  	// Start P-1 consumers.
	 192  	for i := 1; i < P; i++ {
	 193  		wg.Add(1)
	 194  		go func() {
	 195  			fail := 0
	 196  			for atomic.LoadInt32(&stop) == 0 {
	 197  				val, ok := d.PopTail()
	 198  				if ok {
	 199  					fail = 0
	 200  					record(val.(int))
	 201  				} else {
	 202  					// Speed up the test by
	 203  					// allowing the pusher to run.
	 204  					if fail++; fail%100 == 0 {
	 205  						runtime.Gosched()
	 206  					}
	 207  				}
	 208  			}
	 209  			wg.Done()
	 210  		}()
	 211  	}
	 212  
	 213  	// Start 1 producer.
	 214  	nPopHead := 0
	 215  	wg.Add(1)
	 216  	go func() {
	 217  		for j := 0; j < N; j++ {
	 218  			for !d.PushHead(j) {
	 219  				// Allow a popper to run.
	 220  				runtime.Gosched()
	 221  			}
	 222  			if j%10 == 0 {
	 223  				val, ok := d.PopHead()
	 224  				if ok {
	 225  					nPopHead++
	 226  					record(val.(int))
	 227  				}
	 228  			}
	 229  		}
	 230  		wg.Done()
	 231  	}()
	 232  	wg.Wait()
	 233  
	 234  	// Check results.
	 235  	for i, count := range have {
	 236  		if count != 1 {
	 237  			t.Errorf("expected have[%d] = 1, got %d", i, count)
	 238  		}
	 239  	}
	 240  	// Check that at least some PopHeads succeeded. We skip this
	 241  	// check in short mode because it's common enough that the
	 242  	// queue will stay nearly empty all the time and a PopTail
	 243  	// will happen during the window between every PushHead and
	 244  	// PopHead.
	 245  	if !testing.Short() && nPopHead == 0 {
	 246  		t.Errorf("popHead never succeeded")
	 247  	}
	 248  }
	 249  
	 // BenchmarkPool measures one Put immediately followed by one Get on a
	 // shared Pool, executed in parallel via b.RunParallel.
	 func BenchmarkPool(b *testing.B) {
	 	var p Pool
	 	putGet := func(pb *testing.PB) {
	 		for pb.Next() {
	 			p.Put(1)
	 			p.Get()
	 		}
	 	}
	 	b.RunParallel(putGet)
	 }
	 259  
	 // BenchmarkPoolOverflow measures bursts on a shared Pool: each parallel
	 // iteration performs 100 Puts in a row followed by 100 Gets.
	 func BenchmarkPoolOverflow(b *testing.B) {
	 	const burst = 100
	 	var p Pool
	 	b.RunParallel(func(pb *testing.PB) {
	 		for pb.Next() {
	 			for i := 0; i < burst; i++ {
	 				p.Put(1)
	 			}
	 			for i := 0; i < burst; i++ {
	 				p.Get()
	 			}
	 		}
	 	})
	 }
	 273  
	      // globalSink is a package-level holder for values that must stay
	      // reachable. BenchmarkPoolExpensiveNew stores its heap ballast here
	      // while it runs and resets it to nil when it returns.
	 274  var globalSink interface{}
	 275  
	 // BenchmarkPoolSTW fills a Pool with a large number of items, forces a
	 // GC, and records the pause time of that GC as read from
	 // runtime.MemStats. It reports the mean pause as ns/op plus the 95th and
	 // 50th percentile pauses.
	 func BenchmarkPoolSTW(b *testing.B) {
	 	// Take control of GC.
	 	defer debug.SetGCPercent(debug.SetGCPercent(-1))

	 	const itemsPerRound = 100000
	 	var (
	 		p      Pool
	 		stats  runtime.MemStats
	 		pauses []uint64
	 	)
	 	for round := 0; round < b.N; round++ {
	 		// Put a large number of items into a pool.
	 		var item interface{} = 42
	 		for n := 0; n < itemsPerRound; n++ {
	 			p.Put(item)
	 		}
	 		// Do a GC.
	 		runtime.GC()
	 		// Record pause time: PauseNs is indexed modulo 256 and the
	 		// most recent entry sits at (NumGC+255)%256.
	 		runtime.ReadMemStats(&stats)
	 		latest := (stats.NumGC + 255) % 256
	 		pauses = append(pauses, stats.PauseNs[latest])
	 	}

	 	// Get pause time stats.
	 	sort.Slice(pauses, func(i, j int) bool { return pauses[i] < pauses[j] })
	 	var sum uint64
	 	for k := range pauses {
	 		sum += pauses[k]
	 	}
	 	// percentile picks the sorted pause at the given percent position.
	 	percentile := func(pct int) float64 {
	 		return float64(pauses[len(pauses)*pct/100])
	 	}
	 	// ns/op for this benchmark is average STW time.
	 	b.ReportMetric(float64(sum)/float64(b.N), "ns/op")
	 	b.ReportMetric(percentile(95), "p95-ns/STW")
	 	b.ReportMetric(percentile(50), "p50-ns/STW")
	 }
	 309  
	 310  func BenchmarkPoolExpensiveNew(b *testing.B) {
	 311  	// Populate a pool with items that are expensive to construct
	 312  	// to stress pool cleanup and subsequent reconstruction.
	 313  
	 314  	// Create a ballast so the GC has a non-zero heap size and
	 315  	// runs at reasonable times.
	 316  	globalSink = make([]byte, 8<<20)
	 317  	defer func() { globalSink = nil }()
	 318  
	 319  	// Create a pool that's "expensive" to fill.
	 320  	var p Pool
	 321  	var nNew uint64
	 322  	p.New = func() interface{} {
	 323  		atomic.AddUint64(&nNew, 1)
	 324  		time.Sleep(time.Millisecond)
	 325  		return 42
	 326  	}
	 327  	var mstats1, mstats2 runtime.MemStats
	 328  	runtime.ReadMemStats(&mstats1)
	 329  	b.RunParallel(func(pb *testing.PB) {
	 330  		// Simulate 100X the number of goroutines having items
	 331  		// checked out from the Pool simultaneously.
	 332  		items := make([]interface{}, 100)
	 333  		var sink []byte
	 334  		for pb.Next() {
	 335  			// Stress the pool.
	 336  			for i := range items {
	 337  				items[i] = p.Get()
	 338  				// Simulate doing some work with this
	 339  				// item checked out.
	 340  				sink = make([]byte, 32<<10)
	 341  			}
	 342  			for i, v := range items {
	 343  				p.Put(v)
	 344  				items[i] = nil
	 345  			}
	 346  		}
	 347  		_ = sink
	 348  	})
	 349  	runtime.ReadMemStats(&mstats2)
	 350  
	 351  	b.ReportMetric(float64(mstats2.NumGC-mstats1.NumGC)/float64(b.N), "GCs/op")
	 352  	b.ReportMetric(float64(nNew)/float64(b.N), "New/op")
	 353  }
	 354  

View as plain text