From aea735c6c33c7de237b2af634662fcf5ca352f18 Mon Sep 17 00:00:00 2001 From: Ian Lance Taylor Date: Fri, 29 Mar 2024 14:31:25 +0800 Subject: [PATCH 2/2] [1.17 backport] runtime: adjust netpollWaiters after goroutines are ready The runtime was adjusting netpollWaiters before the waiting goroutines were marked as ready. This could cause the scheduler to report a deadlock because there were no goroutines ready to run. Keeping netpollWaiters non-zero ensures that at least one goroutine will call netpoll(-1) from findrunnable. This does mean that if a program has network activity for a while and then never has it again, and also has no timers, then we can leave an M stranded in a call to netpoll from which it will never return. At least this won't be a common case. And it's not new; this has been a potential problem for some time. Note: Upstream did not backport this change to go1.17, per the MinorReleases policy; this is a downstream backport. Edited-by: wangshuo Fixes #61454 Change-Id: I17c7f891c2bb1262fda12c6929664e64686463c8 Reviewed-on: https://go-review.googlesource.com/c/go/+/511455 TryBot-Result: Gopher Robot Run-TryBot: Ian Lance Taylor Reviewed-by: Michael Knyszek Auto-Submit: Ian Lance Taylor Reviewed-by: Heschi Kreinick Signed-off-by: Wang Shuo --- src/runtime/netpoll.go | 67 +++++++++++++++++++++++++--------- src/runtime/netpoll_aix.go | 11 +++--- src/runtime/netpoll_epoll.go | 11 +++--- src/runtime/netpoll_fake.go | 4 +- src/runtime/netpoll_kqueue.go | 11 +++--- src/runtime/netpoll_solaris.go | 11 +++--- src/runtime/netpoll_stub.go | 12 ++++-- src/runtime/netpoll_windows.go | 15 ++++---- src/runtime/proc.go | 22 +++++++---- 9 files changed, 106 insertions(+), 58 deletions(-) diff --git a/src/runtime/netpoll.go b/src/runtime/netpoll.go index 7175c7f..1a96679 100644 --- a/src/runtime/netpoll.go +++ b/src/runtime/netpoll.go @@ -26,10 +26,12 @@ import ( // func netpollclose(fd uintptr) int32 // Disable notifications for fd. Return an errno value.
// -// func netpoll(delta int64) gList +// func netpoll(delta int64) (gList, int32) // Poll the network. If delta < 0, block indefinitely. If delta == 0, // poll without blocking. If delta > 0, block for up to delta nanoseconds. -// Return a list of goroutines built by calling netpollready. +// Return a list of goroutines built by calling netpollready, +// and a delta to add to netpollWaiters when all goroutines are ready. +// This will never return an empty list with a non-zero delta. // // func netpollBreak() // Wake up the network poller, assumed to be blocked in netpoll. @@ -57,8 +59,9 @@ const ( // G pointer - the goroutine is blocked on the semaphore; // io notification or timeout/close changes the state to pdReady or nil respectively // and unparks the goroutine. -// nil - none of the above. +// pdNil - none of the above. const ( + pdNil uintptr = 0 pdReady uintptr = 1 pdWait uintptr = 2 ) @@ -315,14 +318,16 @@ func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) { } } // If we set the new deadline in the past, unblock currently pending IO if any. + // Note that pd.publishInfo has already been called, above, immediately after modifying rd and wd. 
+ delta := int32(0) var rg, wg *g if pd.rd < 0 || pd.wd < 0 { atomic.StorepNoWB(noescape(unsafe.Pointer(&wg)), nil) // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock if pd.rd < 0 { - rg = netpollunblock(pd, 'r', false) + rg = netpollunblock(pd, 'r', false, &delta) } if pd.wd < 0 { - wg = netpollunblock(pd, 'w', false) + wg = netpollunblock(pd, 'w', false, &delta) } } unlock(&pd.lock) @@ -332,6 +337,7 @@ func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) { if wg != nil { netpollgoready(wg, 3) } + netpollAdjustWaiters(delta) } //go:linkname poll_runtime_pollUnblock internal/poll.runtime_pollUnblock @@ -345,8 +351,9 @@ func poll_runtime_pollUnblock(pd *pollDesc) { pd.wseq++ var rg, wg *g atomic.StorepNoWB(noescape(unsafe.Pointer(&rg)), nil) // full memory barrier between store to closing and read of rg/wg in netpollunblock - rg = netpollunblock(pd, 'r', false) - wg = netpollunblock(pd, 'w', false) + delta := int32(0) + rg = netpollunblock(pd, 'r', false, &delta) + wg = netpollunblock(pd, 'w', false, &delta) if pd.rt.f != nil { deltimer(&pd.rt) pd.rt.f = nil @@ -362,6 +369,7 @@ func poll_runtime_pollUnblock(pd *pollDesc) { if wg != nil { netpollgoready(wg, 3) } + netpollAdjustWaiters(delta) } // netpollready is called by the platform-specific netpoll function. @@ -370,15 +378,18 @@ func poll_runtime_pollUnblock(pd *pollDesc) { // from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate // whether the fd is ready for reading or writing or both. // +// This returns a delta to apply to netpollWaiters. +// // This may run while the world is stopped, so write barriers are not allowed. 
//go:nowritebarrier -func netpollready(toRun *gList, pd *pollDesc, mode int32) { +func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 { + delta := int32(0) var rg, wg *g if mode == 'r' || mode == 'r'+'w' { - rg = netpollunblock(pd, 'r', true) + rg = netpollunblock(pd, 'r', true, &delta) } if mode == 'w' || mode == 'r'+'w' { - wg = netpollunblock(pd, 'w', true) + wg = netpollunblock(pd, 'w', true, &delta) } if rg != nil { toRun.push(rg) @@ -386,6 +397,7 @@ func netpollready(toRun *gList, pd *pollDesc, mode int32) { if wg != nil { toRun.push(wg) } + return delta } func netpollcheckerr(pd *pollDesc, mode int32) int { @@ -410,7 +422,7 @@ func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool { // Bump the count of goroutines waiting for the poller. // The scheduler uses this to decide whether to block // waiting for the poller if there is nothing else to do. - atomic.Xadd(&netpollWaiters, 1) + netpollAdjustWaiters(1) } return r } @@ -461,7 +473,13 @@ func netpollblock(pd *pollDesc, mode int32, waitio bool) bool { return old == pdReady } -func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g { +// netpollunblock moves either pd.rg (if mode == 'r') or +// pd.wg (if mode == 'w') into the pdReady state. +// This returns any goroutine blocked on pd.{rg,wg}. +// It adds any adjustment to netpollWaiters to *delta; +// this adjustment should be applied after the goroutine has +// been marked ready. +func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g { gpp := &pd.rg if mode == 'w' { gpp = &pd.wg @@ -472,7 +490,7 @@ func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g { if old == pdReady { return nil } - if old == 0 && !ioready { + if old == pdNil && !ioready { // Only set pdReady for ioready. runtime_pollWait // will check for timeout/cancel before waiting. 
return nil @@ -483,9 +501,9 @@ func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g { } if atomic.Casuintptr(gpp, old, new) { if old == pdWait { - old = 0 - } else if old != 0 { - netpollWaiters.Add(-1) + old = pdNil + } else if old != pdNil { + *delta -= 1 } return (*g)(unsafe.Pointer(old)) } @@ -505,6 +523,7 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) { unlock(&pd.lock) return } + delta := int32(0) var rg *g if read { if pd.rd <= 0 || pd.rt.f == nil { @@ -512,7 +531,7 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) { } pd.rd = -1 atomic.StorepNoWB(unsafe.Pointer(&pd.rt.f), nil) // full memory barrier between store to rd and load of rg in netpollunblock - rg = netpollunblock(pd, 'r', false) + rg = netpollunblock(pd, 'r', false, &delta) } var wg *g if write { @@ -521,7 +540,7 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) { } pd.wd = -1 atomic.StorepNoWB(unsafe.Pointer(&pd.wt.f), nil) // full memory barrier between store to wd and load of wg in netpollunblock - wg = netpollunblock(pd, 'w', false) + wg = netpollunblock(pd, 'w', false, &delta) } unlock(&pd.lock) if rg != nil { @@ -530,6 +549,7 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) { if wg != nil { netpollgoready(wg, 0) } + netpollAdjustWaiters(delta) } func netpollDeadline(arg interface{}, seq uintptr) { @@ -544,6 +564,17 @@ func netpollWriteDeadline(arg interface{}, seq uintptr) { netpolldeadlineimpl(arg.(*pollDesc), seq, false, true) } +// netpollAnyWaiters reports whether any goroutines are waiting for I/O. +func netpollAnyWaiters() bool { + return atomic.Load(&netpollWaiters) > 0 +} +// netpollAdjustWaiters adds delta to netpollWaiters. 
+func netpollAdjustWaiters(delta int32) { + if delta != 0 { + atomic.Xadd(&netpollWaiters, delta) + } +} + func (c *pollCache) alloc() *pollDesc { lock(&c.lock) if c.first == nil { diff --git a/src/runtime/netpoll_aix.go b/src/runtime/netpoll_aix.go index 4590ed8..a94e098 100644 --- a/src/runtime/netpoll_aix.go +++ b/src/runtime/netpoll_aix.go @@ -147,13 +147,13 @@ func netpollBreak() { // delay == 0: does not block, just polls // delay > 0: block for up to that many nanoseconds //go:nowritebarrierrec -func netpoll(delay int64) gList { +func netpoll(delay int64) (gList, int32) { var timeout uintptr if delay < 0 { timeout = ^uintptr(0) } else if delay == 0 { // TODO: call poll with timeout == 0 - return gList{} + return gList{}, 0 } else if delay < 1e6 { timeout = 1 } else if delay < 1e15 { @@ -179,7 +179,7 @@ retry: // If a timed sleep was interrupted, just return to // recalculate how long we should sleep now. if timeout > 0 { - return gList{} + return gList{}, 0 } goto retry } @@ -199,6 +199,7 @@ retry: n-- } var toRun gList + delta := int32(0) for i := 1; i < len(pfds) && n > 0; i++ { pfd := &pfds[i] @@ -216,10 +217,10 @@ retry: if pfd.revents == _POLLERR { pds[i].everr = true } - netpollready(&toRun, pds[i], mode) + delta += netpollready(&toRun, pds[i], mode) n-- } } unlock(&mtxset) - return toRun + return toRun, delta } diff --git a/src/runtime/netpoll_epoll.go b/src/runtime/netpoll_epoll.go index 371ac59..63bcd27 100644 --- a/src/runtime/netpoll_epoll.go +++ b/src/runtime/netpoll_epoll.go @@ -104,9 +104,9 @@ func netpollBreak() { // delay < 0: blocks indefinitely // delay == 0: does not block, just polls // delay > 0: block for up to that many nanoseconds -func netpoll(delay int64) gList { +func netpoll(delay int64) (gList, int32) { if epfd == -1 { - return gList{} + return gList{}, 0 } var waitms int32 if delay < 0 { @@ -133,11 +133,12 @@ retry: // If a timed sleep was interrupted, just return to // recalculate how long we should sleep now. 
if waitms > 0 { - return gList{} + return gList{}, 0 } goto retry } var toRun gList + delta := int32(0) for i := int32(0); i < n; i++ { ev := &events[i] if ev.events == 0 { @@ -173,8 +174,8 @@ retry: if ev.events == _EPOLLERR { pd.everr = true } - netpollready(&toRun, pd, mode) + delta += netpollready(&toRun, pd, mode) } } - return toRun + return toRun, delta } diff --git a/src/runtime/netpoll_fake.go b/src/runtime/netpoll_fake.go index 8366f28..adc3c29 100644 --- a/src/runtime/netpoll_fake.go +++ b/src/runtime/netpoll_fake.go @@ -31,6 +31,6 @@ func netpollarm(pd *pollDesc, mode int) { func netpollBreak() { } -func netpoll(delay int64) gList { - return gList{} +func netpoll(delay int64) (gList, int32) { + return gList{}, 0 } diff --git a/src/runtime/netpoll_kqueue.go b/src/runtime/netpoll_kqueue.go index 80d1b0c..6178a6a 100644 --- a/src/runtime/netpoll_kqueue.go +++ b/src/runtime/netpoll_kqueue.go @@ -105,9 +105,9 @@ func netpollBreak() { // delay < 0: blocks indefinitely // delay == 0: does not block, just polls // delay > 0: block for up to that many nanoseconds -func netpoll(delay int64) gList { +func netpoll(delay int64) (gList, int32) { if kq == -1 { - return gList{} + return gList{}, 0 } var tp *timespec var ts timespec @@ -134,11 +134,12 @@ retry: // If a timed sleep was interrupted, just return to // recalculate how long we should sleep now. 
if delay > 0 { - return gList{} + return gList{}, 0 } goto retry } var toRun gList + delta := int32(0) for i := 0; i < int(n); i++ { ev := &events[i] @@ -184,8 +185,8 @@ retry: if ev.flags == _EV_ERROR { pd.everr = true } - netpollready(&toRun, pd, mode) + delta += netpollready(&toRun, pd, mode) } } - return toRun + return toRun, delta } diff --git a/src/runtime/netpoll_solaris.go b/src/runtime/netpoll_solaris.go index d217d5b..dbfa162 100644 --- a/src/runtime/netpoll_solaris.go +++ b/src/runtime/netpoll_solaris.go @@ -211,9 +211,9 @@ func netpollBreak() { // delay < 0: blocks indefinitely // delay == 0: does not block, just polls // delay > 0: block for up to that many nanoseconds -func netpoll(delay int64) gList { +func netpoll(delay int64) (gList, int32) { if portfd == -1 { - return gList{} + return gList{}, 0 } var wait *timespec @@ -251,12 +251,13 @@ retry: // If a timed sleep was interrupted and there are no events, // just return to recalculate how long we should sleep now. if delay > 0 { - return gList{} + return gList{}, 0 } goto retry } var toRun gList + delta := int32(0) for i := 0; i < int(n); i++ { ev := &events[i] @@ -311,9 +312,9 @@ retry: // about the event port on SmartOS. // // See golang.org/x/issue/30840. - netpollready(&toRun, pd, mode) + delta += netpollready(&toRun, pd, mode) } } - return toRun + return toRun, delta } diff --git a/src/runtime/netpoll_stub.go b/src/runtime/netpoll_stub.go index 33ab8eb..70b23a3 100644 --- a/src/runtime/netpoll_stub.go +++ b/src/runtime/netpoll_stub.go @@ -10,7 +10,6 @@ package runtime import "runtime/internal/atomic" var netpollInited uint32 -var netpollWaiters uint32 var netpollStubLock mutex var netpollNote note @@ -35,7 +34,7 @@ func netpollBreak() { // Polls for ready network connections. // Returns list of goroutines that become runnable. -func netpoll(delay int64) gList { +func netpoll(delay int64) (gList, int32) { // Implementation for platforms that do not support // integrated network poller. 
if delay != 0 { @@ -54,9 +53,16 @@ func netpoll(delay int64) gList { // (eg when running TestNetpollBreak). osyield() } - return gList{} + return gList{}, 0 } func netpollinited() bool { return atomic.Load(&netpollInited) != 0 } + +func netpollAnyWaiters() bool { + return false +} + +func netpollAdjustWaiters(delta int32) { +} diff --git a/src/runtime/netpoll_windows.go b/src/runtime/netpoll_windows.go index 4c1cd26..3782254 100644 --- a/src/runtime/netpoll_windows.go +++ b/src/runtime/netpoll_windows.go @@ -80,7 +80,7 @@ func netpollBreak() { // delay < 0: blocks indefinitely // delay == 0: does not block, just polls // delay > 0: block for up to that many nanoseconds -func netpoll(delay int64) gList { +func netpoll(delay int64) (gList, int32) { var entries [64]overlappedEntry var wait, qty, flags, n, i uint32 var errno int32 @@ -90,7 +90,7 @@ func netpoll(delay int64) gList { mp := getg().m if iocphandle == _INVALID_HANDLE_VALUE { - return gList{} + return gList{}, 0 } if delay < 0 { wait = _INFINITE @@ -117,12 +117,13 @@ func netpoll(delay int64) gList { mp.blocked = false errno = int32(getlasterror()) if errno == _WAIT_TIMEOUT { - return gList{} + return gList{}, 0 } println("runtime: GetQueuedCompletionStatusEx failed (errno=", errno, ")") throw("runtime: netpoll failed") } mp.blocked = false + delta := int32(0) for i = 0; i < n; i++ { op = entries[i].op if op != nil { @@ -131,7 +132,7 @@ func netpoll(delay int64) gList { if stdcall5(_WSAGetOverlappedResult, op.pd.fd, uintptr(unsafe.Pointer(op)), uintptr(unsafe.Pointer(&qty)), 0, uintptr(unsafe.Pointer(&flags))) == 0 { errno = int32(getlasterror()) } - handlecompletion(&toRun, op, errno, qty) + delta += handlecompletion(&toRun, op, errno, qty) } else { atomic.Store(&netpollWakeSig, 0) if delay == 0 { @@ -141,10 +142,10 @@ func netpoll(delay int64) gList { } } } - return toRun + return toRun, delta } -func handlecompletion(toRun *gList,
op *net_op, errno int32, qty uint32) int32 { mode := op.mode if mode != 'r' && mode != 'w' { println("runtime: GetQueuedCompletionStatusEx returned invalid mode=", mode) @@ -152,5 +153,5 @@ func handlecompletion(toRun *gList, op *net_op, errno int32, qty uint32) { } op.errno = errno op.qty = qty - netpollready(toRun, op.pd, mode) + return netpollready(toRun, op.pd, mode) } diff --git a/src/runtime/proc.go b/src/runtime/proc.go index e1fe26b..5edc8e3 100644 --- a/src/runtime/proc.go +++ b/src/runtime/proc.go @@ -1244,8 +1244,9 @@ func startTheWorldWithSema(emitTraceEvent bool) int64 { mp := acquirem() // disable preemption because it can be holding p in a local var if netpollinited() { - list := netpoll(0) // non-blocking + list, delta := netpoll(0) // non-blocking injectglist(&list) + netpollAdjustWaiters(delta) } lock(&sched.lock) @@ -2753,10 +2754,11 @@ top: // blocked thread (e.g. it has already returned from netpoll, but does // not set lastpoll yet), this thread will do blocking netpoll below // anyway. - if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 { - if list := netpoll(0); !list.empty() { // non-blocking + if netpollinited() && netpollAnyWaiters() && atomic.Load64(&sched.lastpoll) != 0 { + if list, delta := netpoll(0); !list.empty() { // non-blocking gp := list.pop() injectglist(&list) + netpollAdjustWaiters(delta) casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) @@ -2923,7 +2925,7 @@ top: } // Poll network until next timer. - if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 { + if netpollinited() && (netpollAnyWaiters() || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 { atomic.Store64(&sched.pollUntil, uint64(pollUntil)) if _g_.m.p != 0 { throw("findrunnable: netpoll with p") @@ -2945,7 +2947,7 @@ top: // When using fake time, just poll. 
delay = 0 } - list := netpoll(delay) // block until new work is available + list, delta := netpoll(delay) // block until new work is available atomic.Store64(&sched.pollUntil, 0) atomic.Store64(&sched.lastpoll, uint64(nanotime())) if faketime != 0 && list.empty() { @@ -2959,11 +2961,13 @@ top: unlock(&sched.lock) if _p_ == nil { injectglist(&list) + netpollAdjustWaiters(delta) } else { acquirep(_p_) if !list.empty() { gp := list.pop() injectglist(&list) + netpollAdjustWaiters(delta) casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) @@ -2998,9 +3002,10 @@ func pollWork() bool { if !runqempty(p) { return true } - if netpollinited() && atomic.Load(&netpollWaiters) > 0 && sched.lastpoll != 0 { - if list := netpoll(0); !list.empty() { + if netpollinited() && netpollAnyWaiters() && sched.lastpoll != 0 { + if list, delta := netpoll(0); !list.empty() { injectglist(&list) + netpollAdjustWaiters(delta) return true } } @@ -5402,7 +5407,7 @@ func sysmon() { lastpoll := int64(atomic.Load64(&sched.lastpoll)) if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now { atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now)) - list := netpoll(0) // non-blocking - returns list of goroutines + list, delta := netpoll(0) // non-blocking - returns list of goroutines if !list.empty() { // Need to decrement number of idle locked M's // (pretending that one more is running) before injectglist. @@ -5414,6 +5419,7 @@ func sysmon() { incidlelocked(-1) injectglist(&list) incidlelocked(1) + netpollAdjustWaiters(delta) } } mDoFixup() -- 2.27.0