...

Source file src/runtime/netpoll_kqueue.go

Documentation: runtime

		 1  // Copyright 2013 The Go Authors. All rights reserved.
		 2  // Use of this source code is governed by a BSD-style
		 3  // license that can be found in the LICENSE file.
		 4  
		 5  //go:build darwin || dragonfly || freebsd || netbsd || openbsd
		 6  // +build darwin dragonfly freebsd netbsd openbsd
		 7  
		 8  package runtime
		 9  
		10  // Integrated network poller (kqueue-based implementation).
		11  
		12  import (
		13  	"runtime/internal/atomic"
		14  	"unsafe"
		15  )
		16  
		17  var (
		18  	kq int32 = -1
		19  
		20  	netpollBreakRd, netpollBreakWr uintptr // for netpollBreak
		21  
		22  	netpollWakeSig uint32 // used to avoid duplicate calls of netpollBreak
		23  )
		24  
		25  func netpollinit() {
		26  	kq = kqueue()
		27  	if kq < 0 {
		28  		println("runtime: kqueue failed with", -kq)
		29  		throw("runtime: netpollinit failed")
		30  	}
		31  	closeonexec(kq)
		32  	r, w, errno := nonblockingPipe()
		33  	if errno != 0 {
		34  		println("runtime: pipe failed with", -errno)
		35  		throw("runtime: pipe failed")
		36  	}
		37  	ev := keventt{
		38  		filter: _EVFILT_READ,
		39  		flags:	_EV_ADD,
		40  	}
		41  	*(*uintptr)(unsafe.Pointer(&ev.ident)) = uintptr(r)
		42  	n := kevent(kq, &ev, 1, nil, 0, nil)
		43  	if n < 0 {
		44  		println("runtime: kevent failed with", -n)
		45  		throw("runtime: kevent failed")
		46  	}
		47  	netpollBreakRd = uintptr(r)
		48  	netpollBreakWr = uintptr(w)
		49  }
		50  
		51  func netpollIsPollDescriptor(fd uintptr) bool {
		52  	return fd == uintptr(kq) || fd == netpollBreakRd || fd == netpollBreakWr
		53  }
		54  
		55  func netpollopen(fd uintptr, pd *pollDesc) int32 {
		56  	// Arm both EVFILT_READ and EVFILT_WRITE in edge-triggered mode (EV_CLEAR)
		57  	// for the whole fd lifetime. The notifications are automatically unregistered
		58  	// when fd is closed.
		59  	var ev [2]keventt
		60  	*(*uintptr)(unsafe.Pointer(&ev[0].ident)) = fd
		61  	ev[0].filter = _EVFILT_READ
		62  	ev[0].flags = _EV_ADD | _EV_CLEAR
		63  	ev[0].fflags = 0
		64  	ev[0].data = 0
		65  	ev[0].udata = (*byte)(unsafe.Pointer(pd))
		66  	ev[1] = ev[0]
		67  	ev[1].filter = _EVFILT_WRITE
		68  	n := kevent(kq, &ev[0], 2, nil, 0, nil)
		69  	if n < 0 {
		70  		return -n
		71  	}
		72  	return 0
		73  }
		74  
		75  func netpollclose(fd uintptr) int32 {
		76  	// Don't need to unregister because calling close()
		77  	// on fd will remove any kevents that reference the descriptor.
		78  	return 0
		79  }
		80  
		81  func netpollarm(pd *pollDesc, mode int) {
		82  	throw("runtime: unused")
		83  }
		84  
		85  // netpollBreak interrupts a kevent.
		86  func netpollBreak() {
		87  	if atomic.Cas(&netpollWakeSig, 0, 1) {
		88  		for {
		89  			var b byte
		90  			n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
		91  			if n == 1 || n == -_EAGAIN {
		92  				break
		93  			}
		94  			if n == -_EINTR {
		95  				continue
		96  			}
		97  			println("runtime: netpollBreak write failed with", -n)
		98  			throw("runtime: netpollBreak write failed")
		99  		}
	 100  	}
	 101  }
	 102  
	 103  // netpoll checks for ready network connections.
	 104  // Returns list of goroutines that become runnable.
	 105  // delay < 0: blocks indefinitely
	 106  // delay == 0: does not block, just polls
	 107  // delay > 0: block for up to that many nanoseconds
	 108  func netpoll(delay int64) gList {
	 109  	if kq == -1 {
	 110  		return gList{}
	 111  	}
	 112  	var tp *timespec
	 113  	var ts timespec
	 114  	if delay < 0 {
	 115  		tp = nil
	 116  	} else if delay == 0 {
	 117  		tp = &ts
	 118  	} else {
	 119  		ts.setNsec(delay)
	 120  		if ts.tv_sec > 1e6 {
	 121  			// Darwin returns EINVAL if the sleep time is too long.
	 122  			ts.tv_sec = 1e6
	 123  		}
	 124  		tp = &ts
	 125  	}
	 126  	var events [64]keventt
	 127  retry:
	 128  	n := kevent(kq, nil, 0, &events[0], int32(len(events)), tp)
	 129  	if n < 0 {
	 130  		if n != -_EINTR {
	 131  			println("runtime: kevent on fd", kq, "failed with", -n)
	 132  			throw("runtime: netpoll failed")
	 133  		}
	 134  		// If a timed sleep was interrupted, just return to
	 135  		// recalculate how long we should sleep now.
	 136  		if delay > 0 {
	 137  			return gList{}
	 138  		}
	 139  		goto retry
	 140  	}
	 141  	var toRun gList
	 142  	for i := 0; i < int(n); i++ {
	 143  		ev := &events[i]
	 144  
	 145  		if uintptr(ev.ident) == netpollBreakRd {
	 146  			if ev.filter != _EVFILT_READ {
	 147  				println("runtime: netpoll: break fd ready for", ev.filter)
	 148  				throw("runtime: netpoll: break fd ready for something unexpected")
	 149  			}
	 150  			if delay != 0 {
	 151  				// netpollBreak could be picked up by a
	 152  				// nonblocking poll. Only read the byte
	 153  				// if blocking.
	 154  				var tmp [16]byte
	 155  				read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
	 156  				atomic.Store(&netpollWakeSig, 0)
	 157  			}
	 158  			continue
	 159  		}
	 160  
	 161  		var mode int32
	 162  		switch ev.filter {
	 163  		case _EVFILT_READ:
	 164  			mode += 'r'
	 165  
	 166  			// On some systems when the read end of a pipe
	 167  			// is closed the write end will not get a
	 168  			// _EVFILT_WRITE event, but will get a
	 169  			// _EVFILT_READ event with EV_EOF set.
	 170  			// Note that setting 'w' here just means that we
	 171  			// will wake up a goroutine waiting to write;
	 172  			// that goroutine will try the write again,
	 173  			// and the appropriate thing will happen based
	 174  			// on what that write returns (success, EPIPE, EAGAIN).
	 175  			if ev.flags&_EV_EOF != 0 {
	 176  				mode += 'w'
	 177  			}
	 178  		case _EVFILT_WRITE:
	 179  			mode += 'w'
	 180  		}
	 181  		if mode != 0 {
	 182  			pd := (*pollDesc)(unsafe.Pointer(ev.udata))
	 183  			pd.setEventErr(ev.flags == _EV_ERROR)
	 184  			netpollready(&toRun, pd, mode)
	 185  		}
	 186  	}
	 187  	return toRun
	 188  }
	 189  

View as plain text