mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2024-11-09 03:11:51 +01:00
7117c7774a
Convert the old mirror syncing queue to the more modern queue format. Fix a bug in the from the repo-archive queue PR - the assumption was made that uniqueness could be enforced with by checking equality in a map in channel unique queues - however this only works for primitive types - which was the initial intention but is an imperfect. This is fixed by marshalling the data and placing the martialled data in the unique map instead. The documentation is also updated to add information about the deprecated configuration values. Signed-off-by: Andrew Thornton <art27@cantab.net>
186 lines
5 KiB
Go
186 lines
5 KiB
Go
// Copyright 2020 The Gitea Authors. All rights reserved.
|
|
// Use of this source code is governed by a MIT-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package queue
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"code.gitea.io/gitea/modules/json"
|
|
"code.gitea.io/gitea/modules/log"
|
|
)
|
|
|
|
// ChannelUniqueQueueType is the type for channel queue
|
|
const ChannelUniqueQueueType Type = "unique-channel"
|
|
|
|
// ChannelUniqueQueueConfiguration is the configuration for a ChannelUniqueQueue
|
|
type ChannelUniqueQueueConfiguration ChannelQueueConfiguration
|
|
|
|
// ChannelUniqueQueue implements UniqueQueue
|
|
//
|
|
// It is basically a thin wrapper around a WorkerPool but keeps a store of
|
|
// what has been pushed within a table.
|
|
//
|
|
// Please note that this Queue does not guarantee that a particular
|
|
// task cannot be processed twice or more at the same time. Uniqueness is
|
|
// only guaranteed whilst the task is waiting in the queue.
|
|
type ChannelUniqueQueue struct {
|
|
*WorkerPool
|
|
lock sync.Mutex
|
|
table map[string]bool
|
|
shutdownCtx context.Context
|
|
shutdownCtxCancel context.CancelFunc
|
|
terminateCtx context.Context
|
|
terminateCtxCancel context.CancelFunc
|
|
exemplar interface{}
|
|
workers int
|
|
name string
|
|
}
|
|
|
|
// NewChannelUniqueQueue create a memory channel queue
|
|
func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
|
|
configInterface, err := toConfig(ChannelUniqueQueueConfiguration{}, cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
config := configInterface.(ChannelUniqueQueueConfiguration)
|
|
if config.BatchLength == 0 {
|
|
config.BatchLength = 1
|
|
}
|
|
|
|
terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
|
|
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
|
|
|
|
queue := &ChannelUniqueQueue{
|
|
table: map[string]bool{},
|
|
shutdownCtx: shutdownCtx,
|
|
shutdownCtxCancel: shutdownCtxCancel,
|
|
terminateCtx: terminateCtx,
|
|
terminateCtxCancel: terminateCtxCancel,
|
|
exemplar: exemplar,
|
|
workers: config.Workers,
|
|
name: config.Name,
|
|
}
|
|
queue.WorkerPool = NewWorkerPool(func(data ...Data) {
|
|
for _, datum := range data {
|
|
// No error is possible here because PushFunc ensures that this can be marshalled
|
|
bs, _ := json.Marshal(datum)
|
|
|
|
queue.lock.Lock()
|
|
delete(queue.table, string(bs))
|
|
queue.lock.Unlock()
|
|
|
|
handle(datum)
|
|
}
|
|
}, config.WorkerPoolConfiguration)
|
|
|
|
queue.qid = GetManager().Add(queue, ChannelUniqueQueueType, config, exemplar)
|
|
return queue, nil
|
|
}
|
|
|
|
// Run starts to run the queue
|
|
func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) {
|
|
atShutdown(q.Shutdown)
|
|
atTerminate(q.Terminate)
|
|
log.Debug("ChannelUniqueQueue: %s Starting", q.name)
|
|
_ = q.AddWorkers(q.workers, 0)
|
|
}
|
|
|
|
// Push will push data into the queue if the data is not already in the queue
|
|
func (q *ChannelUniqueQueue) Push(data Data) error {
|
|
return q.PushFunc(data, nil)
|
|
}
|
|
|
|
// PushFunc will push data into the queue
|
|
func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
|
|
if !assignableTo(data, q.exemplar) {
|
|
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
|
|
}
|
|
|
|
bs, err := json.Marshal(data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
q.lock.Lock()
|
|
locked := true
|
|
defer func() {
|
|
if locked {
|
|
q.lock.Unlock()
|
|
}
|
|
}()
|
|
if _, ok := q.table[string(bs)]; ok {
|
|
return ErrAlreadyInQueue
|
|
}
|
|
// FIXME: We probably need to implement some sort of limit here
|
|
// If the downstream queue blocks this table will grow without limit
|
|
q.table[string(bs)] = true
|
|
if fn != nil {
|
|
err := fn()
|
|
if err != nil {
|
|
delete(q.table, string(bs))
|
|
return err
|
|
}
|
|
}
|
|
locked = false
|
|
q.lock.Unlock()
|
|
q.WorkerPool.Push(data)
|
|
return nil
|
|
}
|
|
|
|
// Has checks if the data is in the queue
|
|
func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
|
|
bs, err := json.Marshal(data)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
q.lock.Lock()
|
|
defer q.lock.Unlock()
|
|
_, has := q.table[string(bs)]
|
|
return has, nil
|
|
}
|
|
|
|
// Shutdown processing from this queue
|
|
func (q *ChannelUniqueQueue) Shutdown() {
|
|
log.Trace("ChannelUniqueQueue: %s Shutting down", q.name)
|
|
select {
|
|
case <-q.shutdownCtx.Done():
|
|
return
|
|
default:
|
|
}
|
|
go func() {
|
|
log.Trace("ChannelUniqueQueue: %s Flushing", q.name)
|
|
if err := q.FlushWithContext(q.terminateCtx); err != nil {
|
|
log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name)
|
|
return
|
|
}
|
|
log.Debug("ChannelUniqueQueue: %s Flushed", q.name)
|
|
}()
|
|
q.shutdownCtxCancel()
|
|
log.Debug("ChannelUniqueQueue: %s Shutdown", q.name)
|
|
}
|
|
|
|
// Terminate this queue and close the queue
|
|
func (q *ChannelUniqueQueue) Terminate() {
|
|
log.Trace("ChannelUniqueQueue: %s Terminating", q.name)
|
|
q.Shutdown()
|
|
select {
|
|
case <-q.terminateCtx.Done():
|
|
return
|
|
default:
|
|
}
|
|
q.terminateCtxCancel()
|
|
log.Debug("ChannelUniqueQueue: %s Terminated", q.name)
|
|
}
|
|
|
|
// Name returns the name of this queue
|
|
func (q *ChannelUniqueQueue) Name() string {
|
|
return q.name
|
|
}
|
|
|
|
func init() {
|
|
queuesMap[ChannelUniqueQueueType] = NewChannelUniqueQueue
|
|
}
|