From 523efa433b61e00e7a14bd31cac315e43842d729 Mon Sep 17 00:00:00 2001 From: zeripath Date: Thu, 4 Mar 2021 02:57:01 +0000 Subject: [PATCH] Move Bleve and Elastic code indexers to use a common cat-file --batch (#14781) * Extract out the common cat-file batch calls Signed-off-by: Andrew Thornton * Move bleve and elastic indexers to use a common cat-file --batch when indexing Signed-off-by: Andrew Thornton * move catfilebatch to batch_reader and rename to batch_reader.go Signed-off-by: Andrew Thornton Co-authored-by: 6543 <6543@obermui.de> Co-authored-by: Lauris BH --- ...atch_reader_nogogit.go => batch_reader.go} | 35 ++++++++++++++++- modules/git/commit_info_nogogit.go | 25 +----------- modules/git/pipeline/lfs_nogogit.go | 23 +---------- modules/git/repo_language_stats_nogogit.go | 27 +------------ modules/indexer/code/bleve.go | 30 ++++++++++++--- modules/indexer/code/elastic_search.go | 38 ++++++++++++++----- 6 files changed, 91 insertions(+), 87 deletions(-) rename modules/git/{batch_reader_nogogit.go => batch_reader.go} (83%) diff --git a/modules/git/batch_reader_nogogit.go b/modules/git/batch_reader.go similarity index 83% rename from modules/git/batch_reader_nogogit.go rename to modules/git/batch_reader.go index 6a236e5002..6014508b93 100644 --- a/modules/git/batch_reader_nogogit.go +++ b/modules/git/batch_reader.go @@ -2,17 +2,48 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -// +build !gogit - package git import ( "bufio" "bytes" + "io" "math" "strconv" + "strings" ) +// CatFileBatch opens git cat-file --batch in the provided repo and returns a stdin pipe, a stdout reader and cancel function +func CatFileBatch(repoPath string) (*io.PipeWriter, *bufio.Reader, func()) { + // Next feed the commits in order into cat-file --batch, followed by their trees and sub trees as necessary. + // so let's create a batch stdin and stdout + batchStdinReader, batchStdinWriter := io.Pipe() + batchStdoutReader, batchStdoutWriter := io.Pipe() + cancel := func() { + _ = batchStdinReader.Close() + _ = batchStdinWriter.Close() + _ = batchStdoutReader.Close() + _ = batchStdoutWriter.Close() + } + + go func() { + stderr := strings.Builder{} + err := NewCommand("cat-file", "--batch").RunInDirFullPipeline(repoPath, batchStdoutWriter, &stderr, batchStdinReader) + if err != nil { + _ = batchStdoutWriter.CloseWithError(ConcatenateError(err, (&stderr).String())) + _ = batchStdinReader.CloseWithError(ConcatenateError(err, (&stderr).String())) + } else { + _ = batchStdoutWriter.Close() + _ = batchStdinReader.Close() + } + }() + + // For simplicities sake we'll us a buffered reader to read from the cat-file --batch + batchReader := bufio.NewReader(batchStdoutReader) + + return batchStdinWriter, batchReader, cancel +} + // ReadBatchLine reads the header line from cat-file --batch // We expect: // SP SP LF diff --git a/modules/git/commit_info_nogogit.go b/modules/git/commit_info_nogogit.go index ac0c7cff5d..b844468c8c 100644 --- a/modules/git/commit_info_nogogit.go +++ b/modules/git/commit_info_nogogit.go @@ -141,29 +141,8 @@ func GetLastCommitForPaths(commit *Commit, treePath string, paths []string) ([]* } }() - // We feed the commits in order into cat-file --batch, followed by their trees and sub trees as necessary. - // so let's create a batch stdin and stdout - batchStdinReader, batchStdinWriter := io.Pipe() - batchStdoutReader, batchStdoutWriter := io.Pipe() - defer func() { - _ = batchStdinReader.Close() - _ = batchStdinWriter.Close() - _ = batchStdoutReader.Close() - _ = batchStdoutWriter.Close() - }() - - go func() { - stderr := strings.Builder{} - err := NewCommand("cat-file", "--batch").RunInDirFullPipeline(commit.repo.Path, batchStdoutWriter, &stderr, batchStdinReader) - if err != nil { - _ = revListWriter.CloseWithError(ConcatenateError(err, (&stderr).String())) - } else { - _ = revListWriter.Close() - } - }() - - // For simplicities sake we'll us a buffered reader - batchReader := bufio.NewReader(batchStdoutReader) + batchStdinWriter, batchReader, cancel := CatFileBatch(commit.repo.Path) + defer cancel() mapsize := 4096 if len(paths) > mapsize { diff --git a/modules/git/pipeline/lfs_nogogit.go b/modules/git/pipeline/lfs_nogogit.go index 30d33e27e0..f6faa3a48a 100644 --- a/modules/git/pipeline/lfs_nogogit.go +++ b/modules/git/pipeline/lfs_nogogit.go @@ -64,27 +64,8 @@ func FindLFSFile(repo *git.Repository, hash git.SHA1) ([]*LFSResult, error) { // Next feed the commits in order into cat-file --batch, followed by their trees and sub trees as necessary. // so let's create a batch stdin and stdout - batchStdinReader, batchStdinWriter := io.Pipe() - batchStdoutReader, batchStdoutWriter := io.Pipe() - defer func() { - _ = batchStdinReader.Close() - _ = batchStdinWriter.Close() - _ = batchStdoutReader.Close() - _ = batchStdoutWriter.Close() - }() - - go func() { - stderr := strings.Builder{} - err := git.NewCommand("cat-file", "--batch").RunInDirFullPipeline(repo.Path, batchStdoutWriter, &stderr, batchStdinReader) - if err != nil { - _ = revListWriter.CloseWithError(git.ConcatenateError(err, (&stderr).String())) - } else { - _ = revListWriter.Close() - } - }() - - // For simplicities sake we'll us a buffered reader to read from the cat-file --batch - batchReader := bufio.NewReader(batchStdoutReader) + batchStdinWriter, batchReader, cancel := git.CatFileBatch(repo.Path) + defer cancel() // We'll use a scanner for the revList because it's simpler than a bufio.Reader scan := bufio.NewScanner(revListReader) diff --git a/modules/git/repo_language_stats_nogogit.go b/modules/git/repo_language_stats_nogogit.go index 4c6f07f0fb..a929d7953b 100644 --- a/modules/git/repo_language_stats_nogogit.go +++ b/modules/git/repo_language_stats_nogogit.go @@ -11,7 +11,6 @@ import ( "bytes" "io" "math" - "strings" "code.gitea.io/gitea/modules/analyze" @@ -22,30 +21,8 @@ import ( func (repo *Repository) GetLanguageStats(commitID string) (map[string]int64, error) { // We will feed the commit IDs in order into cat-file --batch, followed by blobs as necessary. // so let's create a batch stdin and stdout - - batchStdinReader, batchStdinWriter := io.Pipe() - batchStdoutReader, batchStdoutWriter := io.Pipe() - defer func() { - _ = batchStdinReader.Close() - _ = batchStdinWriter.Close() - _ = batchStdoutReader.Close() - _ = batchStdoutWriter.Close() - }() - - go func() { - stderr := strings.Builder{} - err := NewCommand("cat-file", "--batch").RunInDirFullPipeline(repo.Path, batchStdoutWriter, &stderr, batchStdinReader) - if err != nil { - _ = batchStdoutWriter.CloseWithError(ConcatenateError(err, (&stderr).String())) - _ = batchStdinReader.CloseWithError(ConcatenateError(err, (&stderr).String())) - } else { - _ = batchStdoutWriter.Close() - _ = batchStdinReader.Close() - } - }() - - // For simplicities sake we'll us a buffered reader - batchReader := bufio.NewReader(batchStdoutReader) + batchStdinWriter, batchReader, cancel := CatFileBatch(repo.Path) + defer cancel() writeID := func(id string) error { _, err := batchStdinWriter.Write([]byte(id)) diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index 1ebc74c43a..573ea8b88c 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -5,7 +5,10 @@ package code import ( + "bufio" "fmt" + "io" + "io/ioutil" "os" "strconv" "strings" @@ -173,7 +176,7 @@ func NewBleveIndexer(indexDir string) (*BleveIndexer, bool, error) { return indexer, created, err } -func (b *BleveIndexer) addUpdate(commitSha string, update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error { +func (b *BleveIndexer) addUpdate(batchWriter *io.PipeWriter, batchReader *bufio.Reader, commitSha string, update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error { // Ignore vendored files in code search if setting.Indexer.ExcludeVendored && enry.IsVendor(update.Filename) { return nil @@ -196,8 +199,16 @@ func (b *BleveIndexer) addUpdate(commitSha string, update fileUpdate, repo *mode return b.addDelete(update.Filename, repo, batch) } - fileContents, err := git.NewCommand("cat-file", "blob", update.BlobSha). - RunInDirBytes(repo.RepoPath()) + if _, err := batchWriter.Write([]byte(update.BlobSha + "\n")); err != nil { + return err + } + + _, _, size, err := git.ReadBatchLine(batchReader) + if err != nil { + return err + } + + fileContents, err := ioutil.ReadAll(io.LimitReader(batchReader, size)) if err != nil { return err } else if !base.IsTextFile(fileContents) { @@ -254,10 +265,17 @@ func (b *BleveIndexer) Close() { // Index indexes the data func (b *BleveIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error { batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) - for _, update := range changes.Updates { - if err := b.addUpdate(sha, update, repo, batch); err != nil { - return err + if len(changes.Updates) > 0 { + + batchWriter, batchReader, cancel := git.CatFileBatch(repo.RepoPath()) + defer cancel() + + for _, update := range changes.Updates { + if err := b.addUpdate(batchWriter, batchReader, sha, update, repo, batch); err != nil { + return err + } } + cancel() } for _, filename := range changes.RemovedFilenames { if err := b.addDelete(filename, repo, batch); err != nil { diff --git a/modules/indexer/code/elastic_search.go b/modules/indexer/code/elastic_search.go index 130cd1430a..5327eb1e51 100644 --- a/modules/indexer/code/elastic_search.go +++ b/modules/indexer/code/elastic_search.go @@ -5,8 +5,11 @@ package code import ( + "bufio" "context" "fmt" + "io" + "io/ioutil" "strconv" "strings" "time" @@ -172,7 +175,7 @@ func (b *ElasticSearchIndexer) init() (bool, error) { return exists, nil } -func (b *ElasticSearchIndexer) addUpdate(sha string, update fileUpdate, repo *models.Repository) ([]elastic.BulkableRequest, error) { +func (b *ElasticSearchIndexer) addUpdate(batchWriter *io.PipeWriter, batchReader *bufio.Reader, sha string, update fileUpdate, repo *models.Repository) ([]elastic.BulkableRequest, error) { // Ignore vendored files in code search if setting.Indexer.ExcludeVendored && enry.IsVendor(update.Filename) { return nil, nil @@ -195,8 +198,16 @@ func (b *ElasticSearchIndexer) addUpdate(sha string, update fileUpdate, repo *mo return []elastic.BulkableRequest{b.addDelete(update.Filename, repo)}, nil } - fileContents, err := git.NewCommand("cat-file", "blob", update.BlobSha). - RunInDirBytes(repo.RepoPath()) + if _, err := batchWriter.Write([]byte(update.BlobSha + "\n")); err != nil { + return nil, err + } + + _, _, size, err := git.ReadBatchLine(batchReader) + if err != nil { + return nil, err + } + + fileContents, err := ioutil.ReadAll(io.LimitReader(batchReader, size)) if err != nil { return nil, err } else if !base.IsTextFile(fileContents) { @@ -230,14 +241,21 @@ func (b *ElasticSearchIndexer) addDelete(filename string, repo *models.Repositor // Index will save the index data func (b *ElasticSearchIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error { reqs := make([]elastic.BulkableRequest, 0) - for _, update := range changes.Updates { - updateReqs, err := b.addUpdate(sha, update, repo) - if err != nil { - return err - } - if len(updateReqs) > 0 { - reqs = append(reqs, updateReqs...) + if len(changes.Updates) > 0 { + + batchWriter, batchReader, cancel := git.CatFileBatch(repo.RepoPath()) + defer cancel() + + for _, update := range changes.Updates { + updateReqs, err := b.addUpdate(batchWriter, batchReader, sha, update, repo) + if err != nil { + return err + } + if len(updateReqs) > 0 { + reqs = append(reqs, updateReqs...) + } } + cancel() } for _, filename := range changes.RemovedFilenames {