mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2024-11-09 19:31:21 +01:00
ba526ceffe
* move shutdownfns, terminatefns and hammerfns out of separate goroutines Coalesce the shutdownfns etc into a list of functions that get run at shutdown rather then have them run at goroutines blocked on selects. This may help reduce the background select/poll load in certain configurations. * The LevelDB queues can actually wait on empty instead of polling Slight refactor to cause leveldb queues to wait on empty instead of polling. * Shutdown the shadow level queue once it is empty * Remove bytefifo additional goroutine for readToChan as it can just be run in run * Remove additional removeWorkers goroutine for workers * Simplify the AtShutdown and AtTerminate functions and add Channel Flusher * Add shutdown flusher to CUQ * move persistable channel shutdown stuff to Shutdown Fn * Ensure that UPCQ has the correct config * handle shutdown during the flushing * reduce risk of race between zeroBoost and addWorkers * prevent double shutdown Signed-off-by: Andrew Thornton <art27@cantab.net>
348 lines
11 KiB
Go
348 lines
11 KiB
Go
// Copyright 2019 The Gitea Authors. All rights reserved.
|
|
// Use of this source code is governed by a MIT-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package graceful
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"code.gitea.io/gitea/modules/log"
|
|
"code.gitea.io/gitea/modules/process"
|
|
"code.gitea.io/gitea/modules/setting"
|
|
)
|
|
|
|
type state uint8
|
|
|
|
const (
|
|
stateInit state = iota
|
|
stateRunning
|
|
stateShuttingDown
|
|
stateTerminate
|
|
)
|
|
|
|
// There are three places that could inherit sockets:
|
|
//
|
|
// * HTTP or HTTPS main listener
|
|
// * HTTP redirection fallback
|
|
// * SSH
|
|
//
|
|
// If you add an additional place you must increment this number
|
|
// and add a function to call manager.InformCleanup if it's not going to be used
|
|
const numberOfServersToCreate = 4
|
|
|
|
// Manager represents the graceful server manager interface
|
|
var manager *Manager
|
|
|
|
var initOnce = sync.Once{}
|
|
|
|
// GetManager returns the Manager
|
|
func GetManager() *Manager {
|
|
InitManager(context.Background())
|
|
return manager
|
|
}
|
|
|
|
// InitManager creates the graceful manager in the provided context
|
|
func InitManager(ctx context.Context) {
|
|
initOnce.Do(func() {
|
|
manager = newGracefulManager(ctx)
|
|
|
|
// Set the process default context to the HammerContext
|
|
process.DefaultContext = manager.HammerContext()
|
|
})
|
|
}
|
|
|
|
// WithCallback is a runnable to call when the caller has finished
|
|
type WithCallback func(callback func())
|
|
|
|
// RunnableWithShutdownFns is a runnable with functions to run at shutdown and terminate
|
|
// After the callback to atShutdown is called and is complete, the main function must return.
|
|
// Similarly the callback function provided to atTerminate must return once termination is complete.
|
|
// Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals
|
|
// - users must therefore be careful to only call these as necessary.
|
|
// If run is not expected to run indefinitely RunWithShutdownChan is likely to be more appropriate.
|
|
type RunnableWithShutdownFns func(atShutdown, atTerminate func(func()))
|
|
|
|
// RunWithShutdownFns takes a function that has both atShutdown and atTerminate callbacks
|
|
// After the callback to atShutdown is called and is complete, the main function must return.
|
|
// Similarly the callback function provided to atTerminate must return once termination is complete.
|
|
// Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals
|
|
// - users must therefore be careful to only call these as necessary.
|
|
// If run is not expected to run indefinitely RunWithShutdownChan is likely to be more appropriate.
|
|
func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) {
|
|
g.runningServerWaitGroup.Add(1)
|
|
defer g.runningServerWaitGroup.Done()
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2))
|
|
g.doShutdown()
|
|
}
|
|
}()
|
|
run(func(atShutdown func()) {
|
|
g.lock.Lock()
|
|
defer g.lock.Unlock()
|
|
g.toRunAtShutdown = append(g.toRunAtShutdown,
|
|
func() {
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2))
|
|
g.doShutdown()
|
|
}
|
|
}()
|
|
atShutdown()
|
|
})
|
|
}, func(atTerminate func()) {
|
|
g.RunAtTerminate(atTerminate)
|
|
})
|
|
}
|
|
|
|
// RunnableWithShutdownChan is a runnable with functions to run at shutdown and terminate.
|
|
// After the atShutdown channel is closed, the main function must return once shutdown is complete.
|
|
// (Optionally IsHammer may be waited for instead however, this should be avoided if possible.)
|
|
// The callback function provided to atTerminate must return once termination is complete.
|
|
// Please note that use of the atTerminate function will create a go-routine that will wait till terminate - users must therefore be careful to only call this as necessary.
|
|
type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate WithCallback)
|
|
|
|
// RunWithShutdownChan takes a function that has channel to watch for shutdown and atTerminate callbacks
|
|
// After the atShutdown channel is closed, the main function must return once shutdown is complete.
|
|
// (Optionally IsHammer may be waited for instead however, this should be avoided if possible.)
|
|
// The callback function provided to atTerminate must return once termination is complete.
|
|
// Please note that use of the atTerminate function will create a go-routine that will wait till terminate - users must therefore be careful to only call this as necessary.
|
|
func (g *Manager) RunWithShutdownChan(run RunnableWithShutdownChan) {
|
|
g.runningServerWaitGroup.Add(1)
|
|
defer g.runningServerWaitGroup.Done()
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
log.Critical("PANIC during RunWithShutdownChan: %v\nStacktrace: %s", err, log.Stack(2))
|
|
g.doShutdown()
|
|
}
|
|
}()
|
|
run(g.IsShutdown(), func(atTerminate func()) {
|
|
g.RunAtTerminate(atTerminate)
|
|
})
|
|
}
|
|
|
|
// RunWithShutdownContext takes a function that has a context to watch for shutdown.
|
|
// After the provided context is Done(), the main function must return once shutdown is complete.
|
|
// (Optionally the HammerContext may be obtained and waited for however, this should be avoided if possible.)
|
|
func (g *Manager) RunWithShutdownContext(run func(context.Context)) {
|
|
g.runningServerWaitGroup.Add(1)
|
|
defer g.runningServerWaitGroup.Done()
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
log.Critical("PANIC during RunWithShutdownContext: %v\nStacktrace: %s", err, log.Stack(2))
|
|
g.doShutdown()
|
|
}
|
|
}()
|
|
run(g.ShutdownContext())
|
|
}
|
|
|
|
// RunAtTerminate adds to the terminate wait group and creates a go-routine to run the provided function at termination
|
|
func (g *Manager) RunAtTerminate(terminate func()) {
|
|
g.terminateWaitGroup.Add(1)
|
|
g.lock.Lock()
|
|
defer g.lock.Unlock()
|
|
g.toRunAtTerminate = append(g.toRunAtTerminate,
|
|
func() {
|
|
defer g.terminateWaitGroup.Done()
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2))
|
|
}
|
|
}()
|
|
terminate()
|
|
})
|
|
}
|
|
|
|
// RunAtShutdown creates a go-routine to run the provided function at shutdown
|
|
func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) {
|
|
g.lock.Lock()
|
|
defer g.lock.Unlock()
|
|
g.toRunAtShutdown = append(g.toRunAtShutdown,
|
|
func() {
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2))
|
|
}
|
|
}()
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
shutdown()
|
|
}
|
|
})
|
|
}
|
|
|
|
// RunAtHammer creates a go-routine to run the provided function at shutdown
|
|
func (g *Manager) RunAtHammer(hammer func()) {
|
|
g.lock.Lock()
|
|
defer g.lock.Unlock()
|
|
g.toRunAtHammer = append(g.toRunAtHammer,
|
|
func() {
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2))
|
|
}
|
|
}()
|
|
hammer()
|
|
})
|
|
}
|
|
func (g *Manager) doShutdown() {
|
|
if !g.setStateTransition(stateRunning, stateShuttingDown) {
|
|
return
|
|
}
|
|
g.lock.Lock()
|
|
g.shutdownCtxCancel()
|
|
for _, fn := range g.toRunAtShutdown {
|
|
go fn()
|
|
}
|
|
g.lock.Unlock()
|
|
|
|
if setting.GracefulHammerTime >= 0 {
|
|
go g.doHammerTime(setting.GracefulHammerTime)
|
|
}
|
|
go func() {
|
|
g.WaitForServers()
|
|
// Mop up any remaining unclosed events.
|
|
g.doHammerTime(0)
|
|
<-time.After(1 * time.Second)
|
|
g.doTerminate()
|
|
g.WaitForTerminate()
|
|
g.lock.Lock()
|
|
g.doneCtxCancel()
|
|
g.lock.Unlock()
|
|
}()
|
|
}
|
|
|
|
func (g *Manager) doHammerTime(d time.Duration) {
|
|
time.Sleep(d)
|
|
g.lock.Lock()
|
|
select {
|
|
case <-g.hammerCtx.Done():
|
|
default:
|
|
log.Warn("Setting Hammer condition")
|
|
g.hammerCtxCancel()
|
|
for _, fn := range g.toRunAtHammer {
|
|
go fn()
|
|
}
|
|
}
|
|
g.lock.Unlock()
|
|
}
|
|
|
|
func (g *Manager) doTerminate() {
|
|
if !g.setStateTransition(stateShuttingDown, stateTerminate) {
|
|
return
|
|
}
|
|
g.lock.Lock()
|
|
select {
|
|
case <-g.terminateCtx.Done():
|
|
default:
|
|
log.Warn("Terminating")
|
|
g.terminateCtxCancel()
|
|
for _, fn := range g.toRunAtTerminate {
|
|
go fn()
|
|
}
|
|
}
|
|
g.lock.Unlock()
|
|
}
|
|
|
|
// IsChild returns if the current process is a child of previous Gitea process
|
|
func (g *Manager) IsChild() bool {
|
|
return g.isChild
|
|
}
|
|
|
|
// IsShutdown returns a channel which will be closed at shutdown.
|
|
// The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
|
|
func (g *Manager) IsShutdown() <-chan struct{} {
|
|
return g.shutdownCtx.Done()
|
|
}
|
|
|
|
// IsHammer returns a channel which will be closed at hammer
|
|
// The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
|
|
// Servers running within the running server wait group should respond to IsHammer
|
|
// if not shutdown already
|
|
func (g *Manager) IsHammer() <-chan struct{} {
|
|
return g.hammerCtx.Done()
|
|
}
|
|
|
|
// IsTerminate returns a channel which will be closed at terminate
|
|
// The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
|
|
// IsTerminate will only close once all running servers have stopped
|
|
func (g *Manager) IsTerminate() <-chan struct{} {
|
|
return g.terminateCtx.Done()
|
|
}
|
|
|
|
// ServerDone declares a running server done and subtracts one from the
|
|
// running server wait group. Users probably do not want to call this
|
|
// and should use one of the RunWithShutdown* functions
|
|
func (g *Manager) ServerDone() {
|
|
g.runningServerWaitGroup.Done()
|
|
}
|
|
|
|
// WaitForServers waits for all running servers to finish. Users should probably
|
|
// instead use AtTerminate or IsTerminate
|
|
func (g *Manager) WaitForServers() {
|
|
g.runningServerWaitGroup.Wait()
|
|
}
|
|
|
|
// WaitForTerminate waits for all terminating actions to finish.
|
|
// Only the main go-routine should use this
|
|
func (g *Manager) WaitForTerminate() {
|
|
g.terminateWaitGroup.Wait()
|
|
}
|
|
|
|
func (g *Manager) getState() state {
|
|
g.lock.RLock()
|
|
defer g.lock.RUnlock()
|
|
return g.state
|
|
}
|
|
|
|
func (g *Manager) setStateTransition(old, new state) bool {
|
|
if old != g.getState() {
|
|
return false
|
|
}
|
|
g.lock.Lock()
|
|
if g.state != old {
|
|
g.lock.Unlock()
|
|
return false
|
|
}
|
|
g.state = new
|
|
g.lock.Unlock()
|
|
return true
|
|
}
|
|
|
|
func (g *Manager) setState(st state) {
|
|
g.lock.Lock()
|
|
defer g.lock.Unlock()
|
|
|
|
g.state = st
|
|
}
|
|
|
|
// InformCleanup tells the cleanup wait group that we have either taken a listener
|
|
// or will not be taking a listener
|
|
func (g *Manager) InformCleanup() {
|
|
g.createServerWaitGroup.Done()
|
|
}
|
|
|
|
// Done allows the manager to be viewed as a context.Context, it returns a channel that is closed when the server is finished terminating
|
|
func (g *Manager) Done() <-chan struct{} {
|
|
return g.doneCtx.Done()
|
|
}
|
|
|
|
// Err allows the manager to be viewed as a context.Context done at Terminate
|
|
func (g *Manager) Err() error {
|
|
return g.doneCtx.Err()
|
|
}
|
|
|
|
// Value allows the manager to be viewed as a context.Context done at Terminate
|
|
func (g *Manager) Value(key interface{}) interface{} {
|
|
return g.doneCtx.Value(key)
|
|
}
|
|
|
|
// Deadline returns nil as there is no fixed Deadline for the manager, it allows the manager to be viewed as a context.Context
|
|
func (g *Manager) Deadline() (deadline time.Time, ok bool) {
|
|
return g.doneCtx.Deadline()
|
|
}
|