XL: PutObjectPart update checksum, re-read from xl.json for the part being written. (#2191)

This commit is contained in:
Krishnan Parthasarathi 2016-07-12 18:23:40 -07:00 committed by Harshavardhana
parent 0fcfb5df3b
commit 0610527868
3 changed files with 85 additions and 16 deletions

View file

@ -444,28 +444,13 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
updatedEInfos = append(updatedEInfos, partsMetadata[index].Erasure)
}
var checksums []checkSumInfo
for index, eInfo := range newEInfos {
if eInfo.IsValid() {
// Use a map to find union of checksums of parts that
// we concurrently written and committed before this
// part. N B For a different, concurrent upload of the
// same part, the last written content remains.
checksumSet := make(map[string]checkSumInfo)
checksums = newEInfos[index].Checksum
for _, cksum := range checksums {
checksumSet[cksum.Name] = cksum
}
checksums = updatedEInfos[index].Checksum
for _, cksum := range checksums {
checksumSet[cksum.Name] = cksum
}
// Form the checksumInfo to be committed in xl.json
// from the map.
var finalChecksums []checkSumInfo
for _, cksum := range checksumSet {
finalChecksums = append(finalChecksums, cksum)
}
finalChecksums := unionChecksumInfos(newEInfos[index].Checksum, updatedEInfos[index].Checksum, partSuffix)
updatedEInfos[index] = eInfo
updatedEInfos[index].Checksum = finalChecksums
}

View file

@ -132,3 +132,41 @@ func readXLMeta(disk StorageAPI, bucket string, object string) (xlMeta xlMetaV1,
// Return structured `xl.json`.
return xlMeta, nil
}
// Uses a map to find union of checksums of parts that were concurrently written
// but committed before this part. N B For a different, concurrent upload of
// the same part, the ongoing request's data/metadata prevails.
// cur - corresponds to parts written to disk before the ongoing putObjectPart request
// updated - corresponds to parts written to disk while the ongoing putObjectPart is in progress
// curPartName - name of the part that is being written
// returns []checkSumInfo containing the set union of checksums of parts that
// have been written so far incl. the part being written.
func unionChecksumInfos(cur []checkSumInfo, updated []checkSumInfo, curPartName string) []checkSumInfo {
checksumSet := make(map[string]checkSumInfo)
var checksums []checkSumInfo
checksums = cur
for _, cksum := range checksums {
checksumSet[cksum.Name] = cksum
}
checksums = updated
for _, cksum := range checksums {
// skip updating checksum of the part that is
// written in this request because the checksum
// from cur, corresponding to this part,
// should remain.
if cksum.Name == curPartName {
continue
}
checksumSet[cksum.Name] = cksum
}
// Form the checksumInfo to be committed in xl.json
// from the map.
var finalChecksums []checkSumInfo
for _, cksum := range checksumSet {
finalChecksums = append(finalChecksums, cksum)
}
return finalChecksums
}

View file

@ -41,3 +41,49 @@ func TestReduceErrs(t *testing.T) {
}
}
}
// Test for unionChecksums
func TestUnionChecksumInfos(t *testing.T) {
cur := []checkSumInfo{
{"part.1", "dummy", "cur-hash.1"},
{"part.2", "dummy", "cur-hash.2"},
{"part.3", "dummy", "cur-hash.3"},
{"part.4", "dummy", "cur-hash.4"},
{"part.5", "dummy", "cur-hash.5"},
}
updated := []checkSumInfo{
{"part.1", "dummy", "updated-hash.1"},
{"part.2", "dummy", "updated-hash.2"},
{"part.3", "dummy", "updated-hash.3"},
}
curPartcksum := cur[0] // part.1 is the current part being written
// Verify that hash of current part being written must be from cur []checkSumInfo
finalChecksums := unionChecksumInfos(cur, updated, curPartcksum.Name)
for _, cksum := range finalChecksums {
if cksum.Name == curPartcksum.Name && cksum.Hash != curPartcksum.Hash {
t.Errorf("expected Hash = %s but received Hash = %s\n", curPartcksum.Hash, cksum.Hash)
}
}
// Verify that all part checksums are present in the union and nothing more.
// Map to store all unique part names
allPartNames := make(map[string]struct{})
// Insert part names from cur and updated []checkSumInfo
for _, cksum := range cur {
allPartNames[cksum.Name] = struct{}{}
}
for _, cksum := range updated {
allPartNames[cksum.Name] = struct{}{}
}
// All parts must have an entry in the []checkSumInfo returned from unionChecksums
for _, finalcksum := range finalChecksums {
if _, ok := allPartNames[finalcksum.Name]; !ok {
t.Errorf("expected to find %s but not present in the union, where current part is %s\n",
finalcksum.Name, curPartcksum.Name)
}
}
if len(finalChecksums) != len(allPartNames) {
t.Error("Union of Checksums doesn't have same number of elements as unique parts in total")
}
}