Improve retrying index issues (#27554)

Fix #27540
This commit is contained in:
Jason Song 2023-10-16 02:56:57 +08:00 committed by GitHub
parent cddf245c12
commit 1be49fdda6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 52 additions and 40 deletions

View file

@ -204,12 +204,13 @@ func getIssueIndexerQueueHandler(ctx context.Context) func(items ...*IndexerMeta
func populateIssueIndexer(ctx context.Context) { func populateIssueIndexer(ctx context.Context) {
ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: PopulateIssueIndexer", process.SystemProcessType, true) ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: PopulateIssueIndexer", process.SystemProcessType, true)
defer finished() defer finished()
if err := PopulateIssueIndexer(ctx, true); err != nil { ctx = contextWithKeepRetry(ctx) // keep retrying since it's a background task
if err := PopulateIssueIndexer(ctx); err != nil {
log.Error("Issue indexer population failed: %v", err) log.Error("Issue indexer population failed: %v", err)
} }
} }
func PopulateIssueIndexer(ctx context.Context, keepRetrying bool) error { func PopulateIssueIndexer(ctx context.Context) error {
for page := 1; ; page++ { for page := 1; ; page++ {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -232,21 +233,9 @@ func PopulateIssueIndexer(ctx context.Context, keepRetrying bool) error {
} }
for _, repo := range repos { for _, repo := range repos {
for {
select {
case <-ctx.Done():
return fmt.Errorf("shutdown before completion: %w", ctx.Err())
default:
}
if err := updateRepoIndexer(ctx, repo.ID); err != nil { if err := updateRepoIndexer(ctx, repo.ID); err != nil {
if keepRetrying && ctx.Err() == nil {
log.Warn("Retry to populate issue indexer for repo %d: %v", repo.ID, err)
continue
}
return fmt.Errorf("populate issue indexer for repo %d: %v", repo.ID, err) return fmt.Errorf("populate issue indexer for repo %d: %v", repo.ID, err)
} }
break
}
} }
} }
} }
@ -259,8 +248,8 @@ func UpdateRepoIndexer(ctx context.Context, repoID int64) {
} }
// UpdateIssueIndexer add/update an issue to the issue indexer // UpdateIssueIndexer add/update an issue to the issue indexer
func UpdateIssueIndexer(issueID int64) { func UpdateIssueIndexer(ctx context.Context, issueID int64) {
if err := updateIssueIndexer(issueID); err != nil { if err := updateIssueIndexer(ctx, issueID); err != nil {
log.Error("Unable to push issue %d to issue indexer: %v", issueID, err) log.Error("Unable to push issue %d to issue indexer: %v", issueID, err)
} }
} }

View file

@ -127,15 +127,15 @@ func updateRepoIndexer(ctx context.Context, repoID int64) error {
return fmt.Errorf("issue_model.GetIssueIDsByRepoID: %w", err) return fmt.Errorf("issue_model.GetIssueIDsByRepoID: %w", err)
} }
for _, id := range ids { for _, id := range ids {
if err := updateIssueIndexer(id); err != nil { if err := updateIssueIndexer(ctx, id); err != nil {
return err return err
} }
} }
return nil return nil
} }
func updateIssueIndexer(issueID int64) error { func updateIssueIndexer(ctx context.Context, issueID int64) error {
return pushIssueIndexerQueue(&IndexerMetadata{ID: issueID}) return pushIssueIndexerQueue(ctx, &IndexerMetadata{ID: issueID})
} }
func deleteRepoIssueIndexer(ctx context.Context, repoID int64) error { func deleteRepoIssueIndexer(ctx context.Context, repoID int64) error {
@ -148,13 +148,21 @@ func deleteRepoIssueIndexer(ctx context.Context, repoID int64) error {
if len(ids) == 0 { if len(ids) == 0 {
return nil return nil
} }
return pushIssueIndexerQueue(&IndexerMetadata{ return pushIssueIndexerQueue(ctx, &IndexerMetadata{
IDs: ids, IDs: ids,
IsDelete: true, IsDelete: true,
}) })
} }
func pushIssueIndexerQueue(data *IndexerMetadata) error { type keepRetryKey struct{}
// contextWithKeepRetry returns a context with a key indicating that the indexer should keep retrying.
// Please note that it's for background tasks only, and it should not be used for user requests, or it may cause blocking.
func contextWithKeepRetry(ctx context.Context) context.Context {
return context.WithValue(ctx, keepRetryKey{}, true)
}
func pushIssueIndexerQueue(ctx context.Context, data *IndexerMetadata) error {
if issueIndexerQueue == nil { if issueIndexerQueue == nil {
// Some unit tests will trigger indexing, but the queue is not initialized. // Some unit tests will trigger indexing, but the queue is not initialized.
// It's OK to ignore it, but log a warning message in case it's not a unit test. // It's OK to ignore it, but log a warning message in case it's not a unit test.
@ -162,12 +170,26 @@ func pushIssueIndexerQueue(data *IndexerMetadata) error {
return nil return nil
} }
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
err := issueIndexerQueue.Push(data) err := issueIndexerQueue.Push(data)
if errors.Is(err, queue.ErrAlreadyInQueue) { if errors.Is(err, queue.ErrAlreadyInQueue) {
return nil return nil
} }
if errors.Is(err, context.DeadlineExceeded) { if errors.Is(err, context.DeadlineExceeded) { // the queue is full
log.Warn("It seems that issue indexer is slow and the queue is full. Please check the issue indexer or increase the queue size.") log.Warn("It seems that issue indexer is slow and the queue is full. Please check the issue indexer or increase the queue size.")
if ctx.Value(keepRetryKey{}) == nil {
return err
}
// It will be better to increase the queue size instead of retrying, but users may ignore the previous warning message.
// However, even it retries, it may still cause index loss when there's a deadline in the context.
log.Debug("Retry to push %+v to issue indexer queue", data)
continue
} }
return err return err
}
} }

View file

@ -219,7 +219,7 @@ func registerRebuildIssueIndexer() {
RunAtStart: false, RunAtStart: false,
Schedule: "@annually", Schedule: "@annually",
}, func(ctx context.Context, _ *user_model.User, config Config) error { }, func(ctx context.Context, _ *user_model.User, config Config) error {
return issue_indexer.PopulateIssueIndexer(ctx, false) return issue_indexer.PopulateIssueIndexer(ctx)
}) })
} }

View file

@ -36,11 +36,11 @@ func (r *indexerNotifier) AdoptRepository(ctx context.Context, doer, u *user_mod
func (r *indexerNotifier) CreateIssueComment(ctx context.Context, doer *user_model.User, repo *repo_model.Repository, func (r *indexerNotifier) CreateIssueComment(ctx context.Context, doer *user_model.User, repo *repo_model.Repository,
issue *issues_model.Issue, comment *issues_model.Comment, mentions []*user_model.User, issue *issues_model.Issue, comment *issues_model.Comment, mentions []*user_model.User,
) { ) {
issue_indexer.UpdateIssueIndexer(issue.ID) issue_indexer.UpdateIssueIndexer(ctx, issue.ID)
} }
func (r *indexerNotifier) NewIssue(ctx context.Context, issue *issues_model.Issue, mentions []*user_model.User) { func (r *indexerNotifier) NewIssue(ctx context.Context, issue *issues_model.Issue, mentions []*user_model.User) {
issue_indexer.UpdateIssueIndexer(issue.ID) issue_indexer.UpdateIssueIndexer(ctx, issue.ID)
} }
func (r *indexerNotifier) NewPullRequest(ctx context.Context, pr *issues_model.PullRequest, mentions []*user_model.User) { func (r *indexerNotifier) NewPullRequest(ctx context.Context, pr *issues_model.PullRequest, mentions []*user_model.User) {
@ -48,7 +48,7 @@ func (r *indexerNotifier) NewPullRequest(ctx context.Context, pr *issues_model.P
log.Error("LoadIssue: %v", err) log.Error("LoadIssue: %v", err)
return return
} }
issue_indexer.UpdateIssueIndexer(pr.Issue.ID) issue_indexer.UpdateIssueIndexer(ctx, pr.Issue.ID)
} }
func (r *indexerNotifier) UpdateComment(ctx context.Context, doer *user_model.User, c *issues_model.Comment, oldContent string) { func (r *indexerNotifier) UpdateComment(ctx context.Context, doer *user_model.User, c *issues_model.Comment, oldContent string) {
@ -56,7 +56,7 @@ func (r *indexerNotifier) UpdateComment(ctx context.Context, doer *user_model.Us
log.Error("LoadIssue: %v", err) log.Error("LoadIssue: %v", err)
return return
} }
issue_indexer.UpdateIssueIndexer(c.Issue.ID) issue_indexer.UpdateIssueIndexer(ctx, c.Issue.ID)
} }
func (r *indexerNotifier) DeleteComment(ctx context.Context, doer *user_model.User, comment *issues_model.Comment) { func (r *indexerNotifier) DeleteComment(ctx context.Context, doer *user_model.User, comment *issues_model.Comment) {
@ -64,7 +64,7 @@ func (r *indexerNotifier) DeleteComment(ctx context.Context, doer *user_model.Us
log.Error("LoadIssue: %v", err) log.Error("LoadIssue: %v", err)
return return
} }
issue_indexer.UpdateIssueIndexer(comment.Issue.ID) issue_indexer.UpdateIssueIndexer(ctx, comment.Issue.ID)
} }
func (r *indexerNotifier) DeleteRepository(ctx context.Context, doer *user_model.User, repo *repo_model.Repository) { func (r *indexerNotifier) DeleteRepository(ctx context.Context, doer *user_model.User, repo *repo_model.Repository) {
@ -120,13 +120,13 @@ func (r *indexerNotifier) ChangeDefaultBranch(ctx context.Context, repo *repo_mo
} }
func (r *indexerNotifier) IssueChangeContent(ctx context.Context, doer *user_model.User, issue *issues_model.Issue, oldContent string) { func (r *indexerNotifier) IssueChangeContent(ctx context.Context, doer *user_model.User, issue *issues_model.Issue, oldContent string) {
issue_indexer.UpdateIssueIndexer(issue.ID) issue_indexer.UpdateIssueIndexer(ctx, issue.ID)
} }
func (r *indexerNotifier) IssueChangeTitle(ctx context.Context, doer *user_model.User, issue *issues_model.Issue, oldTitle string) { func (r *indexerNotifier) IssueChangeTitle(ctx context.Context, doer *user_model.User, issue *issues_model.Issue, oldTitle string) {
issue_indexer.UpdateIssueIndexer(issue.ID) issue_indexer.UpdateIssueIndexer(ctx, issue.ID)
} }
func (r *indexerNotifier) IssueChangeRef(ctx context.Context, doer *user_model.User, issue *issues_model.Issue, oldRef string) { func (r *indexerNotifier) IssueChangeRef(ctx context.Context, doer *user_model.User, issue *issues_model.Issue, oldRef string) {
issue_indexer.UpdateIssueIndexer(issue.ID) issue_indexer.UpdateIssueIndexer(ctx, issue.ID)
} }

View file

@ -4,6 +4,7 @@
package integration package integration
import ( import (
"context"
"fmt" "fmt"
"net/http" "net/http"
"net/url" "net/url"
@ -99,7 +100,7 @@ func TestViewIssuesKeyword(t *testing.T) {
RepoID: repo.ID, RepoID: repo.ID,
Index: 1, Index: 1,
}) })
issues.UpdateIssueIndexer(issue.ID) issues.UpdateIssueIndexer(context.Background(), issue.ID)
time.Sleep(time.Second * 1) time.Sleep(time.Second * 1)
const keyword = "first" const keyword = "first"
req := NewRequestf(t, "GET", "%s/issues?q=%s", repo.Link(), keyword) req := NewRequestf(t, "GET", "%s/issues?q=%s", repo.Link(), keyword)