From d134c3f3cee164cf641cb1b372afdb20496dbcf0 Mon Sep 17 00:00:00 2001 From: Kyle Evans Date: Tue, 5 May 2020 22:36:07 -0500 Subject: [PATCH] archiver: setup infrastructure for notifying consumers of completion This API will *not* allow consumers to subscribe to specific requests being completed, just *any* request being completed. The caller is responsible for determining if their request is satisfied and waiting again if needed. --- routers/init.go | 2 + services/archiver/archiver.go | 70 +++++++++++++++++++++++++++-------- 2 files changed, 56 insertions(+), 16 deletions(-) diff --git a/routers/init.go b/routers/init.go index 724bf84c10..9e97b5dade 100644 --- a/routers/init.go +++ b/routers/init.go @@ -28,6 +28,7 @@ import ( "code.gitea.io/gitea/modules/ssh" "code.gitea.io/gitea/modules/task" "code.gitea.io/gitea/modules/webhook" + "code.gitea.io/gitea/services/archiver" "code.gitea.io/gitea/services/mailer" mirror_service "code.gitea.io/gitea/services/mirror" pull_service "code.gitea.io/gitea/services/pull" @@ -50,6 +51,7 @@ func checkRunMode() { // NewServices init new services func NewServices() { setting.NewServices() + archiver.NewContext() mailer.NewContext() _ = cache.NewContext() notification.NewContext() diff --git a/services/archiver/archiver.go b/services/archiver/archiver.go index ebc060e943..cbf70a8d30 100644 --- a/services/archiver/archiver.go +++ b/services/archiver/archiver.go @@ -40,6 +40,7 @@ type ArchiveRequest struct { var archiveInProgress []*ArchiveRequest var archiveMutex sync.Mutex +var archiveCond *sync.Cond // These facilitate testing, by allowing the unit tests to control (to some extent) // the goroutine used for processing the queue. @@ -198,30 +199,33 @@ func doArchive(r *ArchiveRequest) { return } + // Block any attempt to finalize creating a new request if we're marking r.archiveComplete = true } // ArchiveRepository satisfies the ArchiveRequest being passed in. Processing // will occur in a separate goroutine, as this phase may take a while to // complete. If the archive already exists, ArchiveRepository will not do -// anything. -func ArchiveRepository(request *ArchiveRequest) { - if request.archiveComplete { - return - } - go func() { - // We'll take some liberties here, in that the caller may not assume that the - // specific request they submitted is the one getting enqueued. We'll just drop - // it if it turns out we've already enqueued an identical request, as they'll keep - // checking back for the status anyways. - archiveMutex.Lock() - if rExisting := getArchiveRequest(request.repo, request.commit, request.archiveType); rExisting != nil { - archiveMutex.Unlock() - return - } - archiveInProgress = append(archiveInProgress, request) +// anything. In all cases, the caller should be examining the *ArchiveRequest +// being returned for completion, as it may be different than the one they passed +// in. +func ArchiveRepository(request *ArchiveRequest) *ArchiveRequest { + // We'll return the request that's already been enqueued if it has been + // enqueued, or we'll immediately enqueue it if it has not been enqueued + // and it is not marked complete. + archiveMutex.Lock() + if rExisting := getArchiveRequest(request.repo, request.commit, request.archiveType); rExisting != nil { archiveMutex.Unlock() + return rExisting + } + if request.archiveComplete { + archiveMutex.Unlock() + return request + } + archiveInProgress = append(archiveInProgress, request) + archiveMutex.Unlock() + go func() { // Wait to start, if we have the Cond for it. This is currently only // useful for testing, so that the start and release of queued entries // can be controlled to examine the queue. @@ -251,6 +255,11 @@ func ArchiveRepository(request *ArchiveRequest) { // correctness. archiveMutex.Lock() defer archiveMutex.Unlock() + // Wake up all other goroutines that may be waiting on a request to + // complete. They should all wake up, see if that particular request + // is complete, then return to waiting if it is not. + archiveCond.Broadcast() + idx := -1 for _idx, req := range archiveInProgress { if req == request { @@ -268,4 +277,33 @@ func ArchiveRepository(request *ArchiveRequest) { } archiveInProgress = archiveInProgress[:lastidx] }() + + return request +} + +// LockQueue will obtain the archiveMutex for the caller. This allows the +// underlying locking mechanism to remain opaque. +func LockQueue() { + archiveMutex.Lock() +} + +// UnlockQueue will release the archiveMutex for the caller, again allowing the +// underlying locking mechanism to remain opaque. +func UnlockQueue() { + archiveMutex.Unlock() +} + +// WaitForCompletion should be called with the queue locked (LockQueue), and will +// return with the queue lock held when a single archive request has finished. +// There is currently no API for getting notified of a particular request being +// completed. +func WaitForCompletion() { + archiveCond.Wait() +} + +// NewContext will initialize local state, e.g. primitives needed to be able to +// synchronize with the lock queue and allow callers to wait for an archive to +// finish. +func NewContext() { + archiveCond = sync.NewCond(&archiveMutex) }