Source file
src/runtime/chan.go
Documentation: runtime
1
2
3
4
5 package runtime
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import (
21 "runtime/internal/atomic"
22 "runtime/internal/math"
23 "unsafe"
24 )
25
26 const (
27 maxAlign = 8
28 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
29 debugChan = false
30 )
31
32 type hchan struct {
33 qcount uint
34 dataqsiz uint
35 buf unsafe.Pointer
36 elemsize uint16
37 closed uint32
38 elemtype *_type
39 sendx uint
40 recvx uint
41 recvq waitq
42 sendq waitq
43
44
45
46
47
48
49
50 lock mutex
51 }
52
53 type waitq struct {
54 first *sudog
55 last *sudog
56 }
57
58
59 func reflect_makechan(t *chantype, size int) *hchan {
60 return makechan(t, size)
61 }
62
63 func makechan64(t *chantype, size int64) *hchan {
64 if int64(int(size)) != size {
65 panic(plainError("makechan: size out of range"))
66 }
67
68 return makechan(t, int(size))
69 }
70
71 func makechan(t *chantype, size int) *hchan {
72 elem := t.elem
73
74
75 if elem.size >= 1<<16 {
76 throw("makechan: invalid channel element type")
77 }
78 if hchanSize%maxAlign != 0 || elem.align > maxAlign {
79 throw("makechan: bad alignment")
80 }
81
82 mem, overflow := math.MulUintptr(elem.size, uintptr(size))
83 if overflow || mem > maxAlloc-hchanSize || size < 0 {
84 panic(plainError("makechan: size out of range"))
85 }
86
87
88
89
90
91 var c *hchan
92 switch {
93 case mem == 0:
94
95 c = (*hchan)(mallocgc(hchanSize, nil, true))
96
97 c.buf = c.raceaddr()
98 case elem.ptrdata == 0:
99
100
101 c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
102 c.buf = add(unsafe.Pointer(c), hchanSize)
103 default:
104
105 c = new(hchan)
106 c.buf = mallocgc(mem, elem, true)
107 }
108
109 c.elemsize = uint16(elem.size)
110 c.elemtype = elem
111 c.dataqsiz = uint(size)
112 lockInit(&c.lock, lockRankHchan)
113
114 if debugChan {
115 print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
116 }
117 return c
118 }
119
120
121 func chanbuf(c *hchan, i uint) unsafe.Pointer {
122 return add(c.buf, uintptr(i)*uintptr(c.elemsize))
123 }
124
125
126
127
128
129 func full(c *hchan) bool {
130
131
132 if c.dataqsiz == 0 {
133
134 return c.recvq.first == nil
135 }
136
137 return c.qcount == c.dataqsiz
138 }
139
140
141
142 func chansend1(c *hchan, elem unsafe.Pointer) {
143 chansend(c, elem, true, getcallerpc())
144 }
145
146
158 func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
159 if c == nil {
160 if !block {
161 return false
162 }
163 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
164 throw("unreachable")
165 }
166
167 if debugChan {
168 print("chansend: chan=", c, "\n")
169 }
170
171 if raceenabled {
172 racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
173 }
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191 if !block && c.closed == 0 && full(c) {
192 return false
193 }
194
195 var t0 int64
196 if blockprofilerate > 0 {
197 t0 = cputicks()
198 }
199
200 lock(&c.lock)
201
202 if c.closed != 0 {
203 unlock(&c.lock)
204 panic(plainError("send on closed channel"))
205 }
206
207 if sg := c.recvq.dequeue(); sg != nil {
208
209
210 send(c, sg, ep, func() { unlock(&c.lock) }, 3)
211 return true
212 }
213
214 if c.qcount < c.dataqsiz {
215
216 qp := chanbuf(c, c.sendx)
217 if raceenabled {
218 racenotify(c, c.sendx, nil)
219 }
220 typedmemmove(c.elemtype, qp, ep)
221 c.sendx++
222 if c.sendx == c.dataqsiz {
223 c.sendx = 0
224 }
225 c.qcount++
226 unlock(&c.lock)
227 return true
228 }
229
230 if !block {
231 unlock(&c.lock)
232 return false
233 }
234
235
236 gp := getg()
237 mysg := acquireSudog()
238 mysg.releasetime = 0
239 if t0 != 0 {
240 mysg.releasetime = -1
241 }
242
243
244 mysg.elem = ep
245 mysg.waitlink = nil
246 mysg.g = gp
247 mysg.isSelect = false
248 mysg.c = c
249 gp.waiting = mysg
250 gp.param = nil
251 c.sendq.enqueue(mysg)
252
253
254
255
256 atomic.Store8(&gp.parkingOnChan, 1)
257 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
258
259
260
261
262 KeepAlive(ep)
263
264
265 if mysg != gp.waiting {
266 throw("G waiting list is corrupted")
267 }
268 gp.waiting = nil
269 gp.activeStackChans = false
270 closed := !mysg.success
271 gp.param = nil
272 if mysg.releasetime > 0 {
273 blockevent(mysg.releasetime-t0, 2)
274 }
275 mysg.c = nil
276 releaseSudog(mysg)
277 if closed {
278 if c.closed == 0 {
279 throw("chansend: spurious wakeup")
280 }
281 panic(plainError("send on closed channel"))
282 }
283 return true
284 }
285
286
287
288
289
290
291
292 func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
293 if raceenabled {
294 if c.dataqsiz == 0 {
295 racesync(c, sg)
296 } else {
297
298
299
300 racenotify(c, c.recvx, nil)
301 racenotify(c, c.recvx, sg)
302 c.recvx++
303 if c.recvx == c.dataqsiz {
304 c.recvx = 0
305 }
306 c.sendx = c.recvx
307 }
308 }
309 if sg.elem != nil {
310 sendDirect(c.elemtype, sg, ep)
311 sg.elem = nil
312 }
313 gp := sg.g
314 unlockf()
315 gp.param = unsafe.Pointer(sg)
316 sg.success = true
317 if sg.releasetime != 0 {
318 sg.releasetime = cputicks()
319 }
320 goready(gp, skip+1)
321 }
322
323
324
325
326
327
328
329
330
331
332
333 func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
334
335
336
337
338
339 dst := sg.elem
340 typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
341
342
343 memmove(dst, src, t.size)
344 }
345
346 func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
347
348
349
350 src := sg.elem
351 typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
352 memmove(dst, src, t.size)
353 }
354
355 func closechan(c *hchan) {
356 if c == nil {
357 panic(plainError("close of nil channel"))
358 }
359
360 lock(&c.lock)
361 if c.closed != 0 {
362 unlock(&c.lock)
363 panic(plainError("close of closed channel"))
364 }
365
366 if raceenabled {
367 callerpc := getcallerpc()
368 racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
369 racerelease(c.raceaddr())
370 }
371
372 c.closed = 1
373
374 var glist gList
375
376
377 for {
378 sg := c.recvq.dequeue()
379 if sg == nil {
380 break
381 }
382 if sg.elem != nil {
383 typedmemclr(c.elemtype, sg.elem)
384 sg.elem = nil
385 }
386 if sg.releasetime != 0 {
387 sg.releasetime = cputicks()
388 }
389 gp := sg.g
390 gp.param = unsafe.Pointer(sg)
391 sg.success = false
392 if raceenabled {
393 raceacquireg(gp, c.raceaddr())
394 }
395 glist.push(gp)
396 }
397
398
399 for {
400 sg := c.sendq.dequeue()
401 if sg == nil {
402 break
403 }
404 sg.elem = nil
405 if sg.releasetime != 0 {
406 sg.releasetime = cputicks()
407 }
408 gp := sg.g
409 gp.param = unsafe.Pointer(sg)
410 sg.success = false
411 if raceenabled {
412 raceacquireg(gp, c.raceaddr())
413 }
414 glist.push(gp)
415 }
416 unlock(&c.lock)
417
418
419 for !glist.empty() {
420 gp := glist.pop()
421 gp.schedlink = 0
422 goready(gp, 3)
423 }
424 }
425
426
427
428 func empty(c *hchan) bool {
429
430 if c.dataqsiz == 0 {
431 return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
432 }
433 return atomic.Loaduint(&c.qcount) == 0
434 }
435
436
437
438 func chanrecv1(c *hchan, elem unsafe.Pointer) {
439 chanrecv(c, elem, true)
440 }
441
442
443 func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
444 _, received = chanrecv(c, elem, true)
445 return
446 }
447
448
449
450
451
452
453
454 func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
455
456
457
458 if debugChan {
459 print("chanrecv: chan=", c, "\n")
460 }
461
462 if c == nil {
463 if !block {
464 return
465 }
466 gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
467 throw("unreachable")
468 }
469
470
471 if !block && empty(c) {
472
473
474
475
476
477
478
479
480
481 if atomic.Load(&c.closed) == 0 {
482
483
484
485
486 return
487 }
488
489
490
491 if empty(c) {
492
493 if raceenabled {
494 raceacquire(c.raceaddr())
495 }
496 if ep != nil {
497 typedmemclr(c.elemtype, ep)
498 }
499 return true, false
500 }
501 }
502
503 var t0 int64
504 if blockprofilerate > 0 {
505 t0 = cputicks()
506 }
507
508 lock(&c.lock)
509
510 if c.closed != 0 && c.qcount == 0 {
511 if raceenabled {
512 raceacquire(c.raceaddr())
513 }
514 unlock(&c.lock)
515 if ep != nil {
516 typedmemclr(c.elemtype, ep)
517 }
518 return true, false
519 }
520
521 if sg := c.sendq.dequeue(); sg != nil {
522
523
524
525
526 recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
527 return true, true
528 }
529
530 if c.qcount > 0 {
531
532 qp := chanbuf(c, c.recvx)
533 if raceenabled {
534 racenotify(c, c.recvx, nil)
535 }
536 if ep != nil {
537 typedmemmove(c.elemtype, ep, qp)
538 }
539 typedmemclr(c.elemtype, qp)
540 c.recvx++
541 if c.recvx == c.dataqsiz {
542 c.recvx = 0
543 }
544 c.qcount--
545 unlock(&c.lock)
546 return true, true
547 }
548
549 if !block {
550 unlock(&c.lock)
551 return false, false
552 }
553
554
555 gp := getg()
556 mysg := acquireSudog()
557 mysg.releasetime = 0
558 if t0 != 0 {
559 mysg.releasetime = -1
560 }
561
562
563 mysg.elem = ep
564 mysg.waitlink = nil
565 gp.waiting = mysg
566 mysg.g = gp
567 mysg.isSelect = false
568 mysg.c = c
569 gp.param = nil
570 c.recvq.enqueue(mysg)
571
572
573
574
575 atomic.Store8(&gp.parkingOnChan, 1)
576 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
577
578
579 if mysg != gp.waiting {
580 throw("G waiting list is corrupted")
581 }
582 gp.waiting = nil
583 gp.activeStackChans = false
584 if mysg.releasetime > 0 {
585 blockevent(mysg.releasetime-t0, 2)
586 }
587 success := mysg.success
588 gp.param = nil
589 mysg.c = nil
590 releaseSudog(mysg)
591 return true, success
592 }
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607 func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
608 if c.dataqsiz == 0 {
609 if raceenabled {
610 racesync(c, sg)
611 }
612 if ep != nil {
613
614 recvDirect(c.elemtype, sg, ep)
615 }
616 } else {
617
618
619
620
621 qp := chanbuf(c, c.recvx)
622 if raceenabled {
623 racenotify(c, c.recvx, nil)
624 racenotify(c, c.recvx, sg)
625 }
626
627 if ep != nil {
628 typedmemmove(c.elemtype, ep, qp)
629 }
630
631 typedmemmove(c.elemtype, qp, sg.elem)
632 c.recvx++
633 if c.recvx == c.dataqsiz {
634 c.recvx = 0
635 }
636 c.sendx = c.recvx
637 }
638 sg.elem = nil
639 gp := sg.g
640 unlockf()
641 gp.param = unsafe.Pointer(sg)
642 sg.success = true
643 if sg.releasetime != 0 {
644 sg.releasetime = cputicks()
645 }
646 goready(gp, skip+1)
647 }
648
649 func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
650
651
652
653
654
655 gp.activeStackChans = true
656
657
658
659 atomic.Store8(&gp.parkingOnChan, 0)
660
661
662
663
664
665 unlock((*mutex)(chanLock))
666 return true
667 }
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686 func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
687 return chansend(c, elem, false, getcallerpc())
688 }
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707 func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
708 return chanrecv(c, elem, false)
709 }
710
711
712 func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
713 return chansend(c, elem, !nb, getcallerpc())
714 }
715
716
717 func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
718 return chanrecv(c, elem, !nb)
719 }
720
721
722 func reflect_chanlen(c *hchan) int {
723 if c == nil {
724 return 0
725 }
726 return int(c.qcount)
727 }
728
729
730 func reflectlite_chanlen(c *hchan) int {
731 if c == nil {
732 return 0
733 }
734 return int(c.qcount)
735 }
736
737
738 func reflect_chancap(c *hchan) int {
739 if c == nil {
740 return 0
741 }
742 return int(c.dataqsiz)
743 }
744
745
746 func reflect_chanclose(c *hchan) {
747 closechan(c)
748 }
749
750 func (q *waitq) enqueue(sgp *sudog) {
751 sgp.next = nil
752 x := q.last
753 if x == nil {
754 sgp.prev = nil
755 q.first = sgp
756 q.last = sgp
757 return
758 }
759 sgp.prev = x
760 x.next = sgp
761 q.last = sgp
762 }
763
764 func (q *waitq) dequeue() *sudog {
765 for {
766 sgp := q.first
767 if sgp == nil {
768 return nil
769 }
770 y := sgp.next
771 if y == nil {
772 q.first = nil
773 q.last = nil
774 } else {
775 y.prev = nil
776 q.first = y
777 sgp.next = nil
778 }
779
780
781
782
783
784
785
786
787
788 if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) {
789 continue
790 }
791
792 return sgp
793 }
794 }
795
796 func (c *hchan) raceaddr() unsafe.Pointer {
797
798
799
800
801
802 return unsafe.Pointer(&c.buf)
803 }
804
805 func racesync(c *hchan, sg *sudog) {
806 racerelease(chanbuf(c, 0))
807 raceacquireg(sg.g, chanbuf(c, 0))
808 racereleaseg(sg.g, chanbuf(c, 0))
809 raceacquire(chanbuf(c, 0))
810 }
811
812
813
814
815 func racenotify(c *hchan, idx uint, sg *sudog) {
816
817
818
819
820
821
822
823 qp := chanbuf(c, idx)
824
825
826
827
828
829
830 if c.elemsize == 0 {
831 if sg == nil {
832 raceacquire(qp)
833 racerelease(qp)
834 } else {
835 raceacquireg(sg.g, qp)
836 racereleaseg(sg.g, qp)
837 }
838 } else {
839 if sg == nil {
840 racereleaseacquire(qp)
841 } else {
842 racereleaseacquireg(sg.g, qp)
843 }
844 }
845 }
846
View as plain text