Source file
src/runtime/netpoll.go
package runtime

import (
	"runtime/internal/atomic"
	"unsafe"
)

// Integrated network poller (platform-independent part).
// A particular implementation (epoll/kqueue/port/AIX/Windows)
// must define the following functions:
//
// func netpollinit()
//	Initialize the poller. Only called once.
//
// func netpollopen(fd uintptr, pd *pollDesc) int32
//	Arm edge-triggered notifications for fd. The pd argument is to pass
//	back to netpollready when fd is ready. Return an errno value.
//
// func netpollclose(fd uintptr) int32
//	Disable notifications for fd. Return an errno value.
//
// func netpoll(delta int64) gList
//	Poll the network. If delta < 0, block indefinitely. If delta == 0,
//	poll without blocking. If delta > 0, block for at most delta nanoseconds.
//	Return a list of goroutines built by calling netpollready.
//
// func netpollBreak()
//	Wake up the network poller, assumed to be blocked in netpoll.
//
// func netpollIsPollDescriptor(fd uintptr) bool
//	Reports whether fd is a file descriptor used by the poller.

// Error codes returned by runtime_pollReset and runtime_pollWait.
// These must match the values in internal/poll/fd_poll_runtime.go.
const (
	pollNoError        = 0 // no error
	pollErrClosing     = 1 // descriptor is closed
	pollErrTimeout     = 2 // I/O timeout
	pollErrNotPollable = 3 // general error polling descriptor
)

// pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
// goroutines respectively. The semaphore can be in the following states:
//
//	pdReady - io readiness notification is pending;
//	          a goroutine consumes the notification by changing the state to nil.
//	pdWait - a goroutine prepares to park on the semaphore, but not yet parked;
//	         the goroutine commits to park by changing the state to G pointer,
//	         or, alternatively, concurrent io notification changes the state to pdReady,
//	         or, alternatively, concurrent timeout/close changes the state to nil.
//	G pointer - the goroutine is blocked on the semaphore;
//	            io notification or timeout/close changes the state to pdReady or nil respectively
//	            and unparks the goroutine.
//	nil - none of the above.
const (
	pdReady uintptr = 1
	pdWait  uintptr = 2
)

const pollBlockSize = 4 * 1024

// Network poller descriptor.
//
// No heap pointers.
//
//go:notinheap
type pollDesc struct {
	link *pollDesc // in pollcache, protected by pollcache.lock
	fd   uintptr   // constant for pollDesc usage lifetime

	// atomicInfo holds bits from closing, rd, and wd,
	// which are only ever written while holding the lock,
	// summarized for use by netpollcheckerr,
	// which cannot acquire the lock.
	// After writing these fields under lock in a way that
	// might change the summary, code must call publishInfo
	// before releasing the lock.
	// atomicInfo also holds the eventErr bit,
	// recording whether a poll event on the fd got an error;
	// atomicInfo is the only source of truth for that bit.
	atomicInfo uint32 // atomic pollInfo

	// rg and wg are accessed atomically and hold either pdReady, pdWait,
	// a pointer to the G waiting for the corresponding notification, or nil.
	rg uintptr // pdReady, pdWait, G waiting for read or nil
	wg uintptr // pdReady, pdWait, G waiting for write or nil

	lock    mutex // protects the following fields
	closing bool
	user    uint32    // user settable cookie
	rseq    uintptr   // protects from stale read timers
	rt      timer     // read deadline timer (set if rt.f != nil)
	rd      int64     // read deadline (a nanotime in the future, -1 when expired)
	wseq    uintptr   // protects from stale write timers
	wt      timer     // write deadline timer
	wd      int64     // write deadline (a nanotime in the future, -1 when expired)
	self    *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}

// pollInfo is the bits needed by netpollcheckerr, stored atomically,
// mostly duplicating state that is manipulated under lock in pollDesc.
// The one exception is the pollEventErr bit, which is managed only
// by the setEventErr and eventErr methods.
type pollInfo uint32

const (
	pollClosing = 1 << iota
	pollEventErr
	pollExpiredReadDeadline
	pollExpiredWriteDeadline
)

func (i pollInfo) closing() bool              { return i&pollClosing != 0 }
func (i pollInfo) eventErr() bool             { return i&pollEventErr != 0 }
func (i pollInfo) expiredReadDeadline() bool  { return i&pollExpiredReadDeadline != 0 }
func (i pollInfo) expiredWriteDeadline() bool { return i&pollExpiredWriteDeadline != 0 }

// info returns the pollInfo corresponding to pd.
func (pd *pollDesc) info() pollInfo {
	return pollInfo(atomic.Load(&pd.atomicInfo))
}

// publishInfo updates pd.atomicInfo (returned by pd.info)
// using the other values in pd.
// It must be called while holding pd.lock,
// and it must be called again after any change to closing,
// rd, or wd that could change the result of netpollcheckerr.
func (pd *pollDesc) publishInfo() {
	var info uint32
	if pd.closing {
		info |= pollClosing
	}
	if pd.rd < 0 {
		info |= pollExpiredReadDeadline
	}
	if pd.wd < 0 {
		info |= pollExpiredWriteDeadline
	}

	// Set all of x except the pollEventErr bit, which is owned by setEventErr.
	x := atomic.Load(&pd.atomicInfo)
	for !atomic.Cas(&pd.atomicInfo, x, (x&pollEventErr)|info) {
		x = atomic.Load(&pd.atomicInfo)
	}
}

// setEventErr sets the result of pd.info().eventErr() to b.
func (pd *pollDesc) setEventErr(b bool) {
	x := atomic.Load(&pd.atomicInfo)
	for (x&pollEventErr != 0) != b && !atomic.Cas(&pd.atomicInfo, x, x^pollEventErr) {
		x = atomic.Load(&pd.atomicInfo)
	}
}

type pollCache struct {
	lock  mutex
	first *pollDesc
	// PollDesc objects must be type-stable,
	// because we can get ready notification from epoll/kqueue
	// after the descriptor is closed/reused.
	// Stale notifications are detected using seq variable,
	// seq is incremented when deadlines are changed or descriptor is reused.
}

var (
	netpollInitLock mutex
	netpollInited   uint32

	pollcache      pollCache
	netpollWaiters uint32
)

//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
	netpollGenericInit()
}

func netpollGenericInit() {
	if atomic.Load(&netpollInited) == 0 {
		lockInit(&netpollInitLock, lockRankNetpollInit)
		lock(&netpollInitLock)
		if netpollInited == 0 {
			netpollinit()
			atomic.Store(&netpollInited, 1)
		}
		unlock(&netpollInitLock)
	}
}

func netpollinited() bool {
	return atomic.Load(&netpollInited) != 0
}

// poll_runtime_isPollServerDescriptor reports whether fd is a
// descriptor being used by netpoll.
//go:linkname poll_runtime_isPollServerDescriptor internal/poll.runtime_isPollServerDescriptor
func poll_runtime_isPollServerDescriptor(fd uintptr) bool {
	return netpollIsPollDescriptor(fd)
}

//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
	pd := pollcache.alloc()
	lock(&pd.lock)
	wg := atomic.Loaduintptr(&pd.wg)
	if wg != 0 && wg != pdReady {
		throw("runtime: blocked write on free polldesc")
	}
	rg := atomic.Loaduintptr(&pd.rg)
	if rg != 0 && rg != pdReady {
		throw("runtime: blocked read on free polldesc")
	}
	pd.fd = fd
	pd.closing = false
	pd.setEventErr(false)
	pd.rseq++
	atomic.Storeuintptr(&pd.rg, 0)
	pd.rd = 0
	pd.wseq++
	atomic.Storeuintptr(&pd.wg, 0)
	pd.wd = 0
	pd.self = pd
	pd.publishInfo()
	unlock(&pd.lock)

	errno := netpollopen(fd, pd)
	if errno != 0 {
		pollcache.free(pd)
		return nil, int(errno)
	}
	return pd, 0
}

//go:linkname poll_runtime_pollClose internal/poll.runtime_pollClose
func poll_runtime_pollClose(pd *pollDesc) {
	if !pd.closing {
		throw("runtime: close polldesc w/o unblock")
	}
	wg := atomic.Loaduintptr(&pd.wg)
	if wg != 0 && wg != pdReady {
		throw("runtime: blocked write on closing polldesc")
	}
	rg := atomic.Loaduintptr(&pd.rg)
	if rg != 0 && rg != pdReady {
		throw("runtime: blocked read on closing polldesc")
	}
	netpollclose(pd.fd)
	pollcache.free(pd)
}

func (c *pollCache) free(pd *pollDesc) {
	lock(&c.lock)
	pd.link = c.first
	c.first = pd
	unlock(&c.lock)
}

// poll_runtime_pollReset, which is internal/poll.runtime_pollReset,
// prepares a descriptor for polling in mode, which is 'r' or 'w'.
// This returns an error code; the codes are defined above.
//go:linkname poll_runtime_pollReset internal/poll.runtime_pollReset
func poll_runtime_pollReset(pd *pollDesc, mode int) int {
	errcode := netpollcheckerr(pd, int32(mode))
	if errcode != pollNoError {
		return errcode
	}
	if mode == 'r' {
		atomic.Storeuintptr(&pd.rg, 0)
	} else if mode == 'w' {
		atomic.Storeuintptr(&pd.wg, 0)
	}
	return pollNoError
}

// poll_runtime_pollWait, which is internal/poll.runtime_pollWait,
// waits for a descriptor to be ready for reading or writing,
// according to mode, which is 'r' or 'w'.
// This returns an error code; the codes are defined above.
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
	errcode := netpollcheckerr(pd, int32(mode))
	if errcode != pollNoError {
		return errcode
	}
	// As for now only Solaris, illumos, and AIX use level-triggered IO.
	if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
		netpollarm(pd, mode)
	}
	for !netpollblock(pd, int32(mode), false) {
		errcode = netpollcheckerr(pd, int32(mode))
		if errcode != pollNoError {
			return errcode
		}
		// Can happen if timeout has fired and unblocked us,
		// but before we had a chance to run, timeout has been reset.
		// Pretend it has not happened and retry.
	}
	return pollNoError
}

//go:linkname poll_runtime_pollWaitCanceled internal/poll.runtime_pollWaitCanceled
func poll_runtime_pollWaitCanceled(pd *pollDesc, mode int) {
	// This function is used only on windows after a failed attempt to cancel
	// a pending async IO operation. Wait for ioready, ignore closing or timeouts.
	for !netpollblock(pd, int32(mode), true) {
	}
}

//go:linkname poll_runtime_pollSetDeadline internal/poll.runtime_pollSetDeadline
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
	lock(&pd.lock)
	if pd.closing {
		unlock(&pd.lock)
		return
	}
	rd0, wd0 := pd.rd, pd.wd
	combo0 := rd0 > 0 && rd0 == wd0
	if d > 0 {
		d += nanotime()
		if d <= 0 {
			// If the user has a deadline in the future, but the delay calculation
			// overflows, then set the deadline to the maximum possible value.
			d = 1<<63 - 1
		}
	}
	if mode == 'r' || mode == 'r'+'w' {
		pd.rd = d
	}
	if mode == 'w' || mode == 'r'+'w' {
		pd.wd = d
	}
	pd.publishInfo()
	combo := pd.rd > 0 && pd.rd == pd.wd
	rtf := netpollReadDeadline
	if combo {
		rtf = netpollDeadline
	}
	if pd.rt.f == nil {
		if pd.rd > 0 {
			pd.rt.f = rtf
			// Copy current seq into the timer arg.
			// Timer func will check the seq against current descriptor seq,
			// if they differ the descriptor was reused or timers were reset.
			pd.rt.arg = pd.makeArg()
			pd.rt.seq = pd.rseq
			resettimer(&pd.rt, pd.rd)
		}
	} else if pd.rd != rd0 || combo != combo0 {
		pd.rseq++ // invalidate current timers
		if pd.rd > 0 {
			modtimer(&pd.rt, pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
		} else {
			deltimer(&pd.rt)
			pd.rt.f = nil
		}
	}
	if pd.wt.f == nil {
		if pd.wd > 0 && !combo {
			pd.wt.f = netpollWriteDeadline
			pd.wt.arg = pd.makeArg()
			pd.wt.seq = pd.wseq
			resettimer(&pd.wt, pd.wd)
		}
	} else if pd.wd != wd0 || combo != combo0 {
		pd.wseq++ // invalidate current timers
		if pd.wd > 0 && !combo {
			modtimer(&pd.wt, pd.wd, 0, netpollWriteDeadline, pd.makeArg(), pd.wseq)
		} else {
			deltimer(&pd.wt)
			pd.wt.f = nil
		}
	}
	// If we set the new deadline in the past, unblock currently pending IO if any.
	// Note that pd.publishInfo has already been called, above, immediately after modifying rd and wd.
	var rg, wg *g
	if pd.rd < 0 {
		rg = netpollunblock(pd, 'r', false)
	}
	if pd.wd < 0 {
		wg = netpollunblock(pd, 'w', false)
	}
	unlock(&pd.lock)
	if rg != nil {
		netpollgoready(rg, 3)
	}
	if wg != nil {
		netpollgoready(wg, 3)
	}
}

//go:linkname poll_runtime_pollUnblock internal/poll.runtime_pollUnblock
func poll_runtime_pollUnblock(pd *pollDesc) {
	lock(&pd.lock)
	if pd.closing {
		throw("runtime: unblock on closing polldesc")
	}
	pd.closing = true
	pd.rseq++
	pd.wseq++
	var rg, wg *g
	pd.publishInfo()
	rg = netpollunblock(pd, 'r', false)
	wg = netpollunblock(pd, 'w', false)
	if pd.rt.f != nil {
		deltimer(&pd.rt)
		pd.rt.f = nil
	}
	if pd.wt.f != nil {
		deltimer(&pd.wt)
		pd.wt.f = nil
	}
	unlock(&pd.lock)
	if rg != nil {
		netpollgoready(rg, 3)
	}
	if wg != nil {
		netpollgoready(wg, 3)
	}
}

// netpollready is called by the platform-specific netpoll function.
// It declares that the fd associated with pd is ready for I/O.
// The toRun argument is used to build a list of goroutines to return
// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
// whether the fd is ready for reading or writing or both.
//
// This may run while the world is stopped, so write barriers are not allowed.
//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
	var rg, wg *g
	if mode == 'r' || mode == 'r'+'w' {
		rg = netpollunblock(pd, 'r', true)
	}
	if mode == 'w' || mode == 'r'+'w' {
		wg = netpollunblock(pd, 'w', true)
	}
	if rg != nil {
		toRun.push(rg)
	}
	if wg != nil {
		toRun.push(wg)
	}
}

func netpollcheckerr(pd *pollDesc, mode int32) int {
	info := pd.info()
	if info.closing() {
		return pollErrClosing
	}
	if (mode == 'r' && info.expiredReadDeadline()) || (mode == 'w' && info.expiredWriteDeadline()) {
		return pollErrTimeout
	}
	// Report an event scanning error only on a read event.
	// An error on a write event will be captured in a subsequent
	// write call that is able to report a more specific error.
	if mode == 'r' && info.eventErr() {
		return pollErrNotPollable
	}
	return pollNoError
}

func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
	r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
	if r {
		// Bump the count of goroutines waiting for the poller.
		// The scheduler uses this to decide whether to block
		// waiting for the poller if there is nothing else to do.
		atomic.Xadd(&netpollWaiters, 1)
	}
	return r
}

func netpollgoready(gp *g, traceskip int) {
	atomic.Xadd(&netpollWaiters, -1)
	goready(gp, traceskip+1)
}

// returns true if IO is ready, or false if timed out or closed
// waitio - wait only for completed IO, ignore errors
// Concurrent calls to netpollblock in the same mode are forbidden, as pollDesc
// can hold only a single waiting goroutine for each mode.
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	// set the gpp semaphore to pdWait
	for {
		// Consume notification if already ready.
		if atomic.Casuintptr(gpp, pdReady, 0) {
			return true
		}
		if atomic.Casuintptr(gpp, 0, pdWait) {
			break
		}

		// Double check that this isn't corrupt; otherwise we'd loop
		// forever.
		if v := atomic.Loaduintptr(gpp); v != pdReady && v != 0 {
			throw("runtime: double wait")
		}
	}

	// need to recheck error states after setting gpp to pdWait
	// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
	// do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg
	if waitio || netpollcheckerr(pd, mode) == 0 {
		gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
	}
	// be careful to not lose concurrent pdReady notification
	old := atomic.Xchguintptr(gpp, 0)
	if old > pdWait {
		throw("runtime: corrupted polldesc")
	}
	return old == pdReady
}

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	for {
		old := atomic.Loaduintptr(gpp)
		if old == pdReady {
			return nil
		}
		if old == 0 && !ioready {
			// Only set pdReady for ioready. runtime_pollWait
			// will check for timeout/cancel before waiting.
			return nil
		}
		var new uintptr
		if ioready {
			new = pdReady
		}
		if atomic.Casuintptr(gpp, old, new) {
			if old == pdWait {
				old = 0
			}
			return (*g)(unsafe.Pointer(old))
		}
	}
}

func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
	lock(&pd.lock)
	// Seq arg is seq when the timer was set.
	// If it's stale, ignore the timer event.
	currentSeq := pd.rseq
	if !read {
		currentSeq = pd.wseq
	}
	if seq != currentSeq {
		// The descriptor was reused or timers were reset.
		unlock(&pd.lock)
		return
	}
	var rg *g
	if read {
		if pd.rd <= 0 || pd.rt.f == nil {
			throw("runtime: inconsistent read deadline")
		}
		pd.rd = -1
		pd.publishInfo()
		rg = netpollunblock(pd, 'r', false)
	}
	var wg *g
	if write {
		if pd.wd <= 0 || pd.wt.f == nil && !read {
			throw("runtime: inconsistent write deadline")
		}
		pd.wd = -1
		pd.publishInfo()
		wg = netpollunblock(pd, 'w', false)
	}
	unlock(&pd.lock)
	if rg != nil {
		netpollgoready(rg, 0)
	}
	if wg != nil {
		netpollgoready(wg, 0)
	}
}

func netpollDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, true, true)
}

func netpollReadDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, true, false)
}

func netpollWriteDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, false, true)
}

func (c *pollCache) alloc() *pollDesc {
	lock(&c.lock)
	if c.first == nil {
		const pdSize = unsafe.Sizeof(pollDesc{})
		n := pollBlockSize / pdSize
		if n == 0 {
			n = 1
		}
		// Must be in non-GC memory because can be referenced
		// only from epoll/kqueue internals.
		mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
		for i := uintptr(0); i < n; i++ {
			pd := (*pollDesc)(add(mem, i*pdSize))
			pd.link = c.first
			c.first = pd
		}
	}
	pd := c.first
	c.first = pd.link
	lockInit(&pd.lock, lockRankPollDesc)
	unlock(&c.lock)
	return pd
}

// makeArg converts pd to an interface{}.
// makeArg does not do any allocation. Normally, such
// a conversion requires an allocation because pointers to
// go:notinheap types (which pollDesc is) must be stored
// in interfaces indirectly.
func (pd *pollDesc) makeArg() (i interface{}) {
	x := (*eface)(unsafe.Pointer(&i))
	x._type = pdType
	x.data = unsafe.Pointer(&pd.self)
	return
}

var (
	pdEface interface{} = (*pollDesc)(nil)
	pdType  *_type      = efaceOf(&pdEface)._type
)

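Illustrative sketch (not part of netpoll.go): the rg and wg fields above behave as per-mode binary semaphores driven purely by atomic compare-and-swap, cycling through 0, pdReady, pdWait, and a G pointer. The standalone program below models just the lock-free fast path of netpollblock/netpollunblock using sync/atomic; the names semReady, semWait, post, and wait are invented for this example and do not exist in the runtime.

// netpollsem_sketch.go - a toy model of the rg/wg state machine.
// It omits parking/unparking (the G-pointer state), so post never
// finds a stored waiter; it only demonstrates the CAS transitions.
package main

import (
	"fmt"
	"sync/atomic"
)

const (
	semReady uintptr = 1 // analogous to pdReady
	semWait  uintptr = 2 // analogous to pdWait
)

// post mirrors netpollunblock with ioready == true: mark the semaphore
// ready and report whether a committed waiter token was replaced.
func post(sem *uintptr) (hadWaiter bool) {
	for {
		old := atomic.LoadUintptr(sem)
		if old == semReady {
			return false // notification already pending
		}
		if atomic.CompareAndSwapUintptr(sem, old, semReady) {
			return old != 0 && old != semWait
		}
	}
}

// wait mirrors the fast path of netpollblock: consume a pending
// readiness notification, or register the intent to park.
func wait(sem *uintptr) (ready bool) {
	for {
		if atomic.CompareAndSwapUintptr(sem, semReady, 0) {
			return true // consumed a pending notification
		}
		if atomic.CompareAndSwapUintptr(sem, 0, semWait) {
			return false // registered; the real code would now gopark
		}
	}
}

func main() {
	var sem uintptr
	fmt.Println(wait(&sem)) // false: nothing ready yet, state is now semWait
	fmt.Println(post(&sem)) // false: no committed waiter, state is now semReady
	fmt.Println(wait(&sem)) // true: consumes the pending notification
}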