heal: Add MRF metrics to background heal API response (#12398)

This commit gathers MRF metrics from all nodes in a cluster and returns them
to the caller, showing the number of objects in the MRF queues that are
waiting to be healed.
Anis Elleuch 2021-07-16 06:32:06 +01:00 committed by GitHub
parent ead8778305
commit b0b4696a64
12 changed files with 458 additions and 209 deletions
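
For context (not part of this diff), a minimal sketch of how a client might read the new per-node MRF metrics once this change and a matching madmin-go release are in place. The endpoint and credentials are placeholders; the MRFStatus fields mirror those populated by getCurrentMRFStatus further below.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/minio/madmin-go"
)

func main() {
	// Placeholder endpoint and credentials.
	adm, err := madmin.New("localhost:9000", "minioadmin", "minioadmin", false)
	if err != nil {
		log.Fatalln(err)
	}

	st, err := adm.BackgroundHealStatus(context.Background())
	if err != nil {
		log.Fatalln(err)
	}

	// MRF is keyed by node name; TotalItems/TotalBytes cover healed plus pending entries.
	for node, mrf := range st.MRF {
		fmt.Printf("%s: %d/%d items healed, %d/%d bytes, started %s\n",
			node, mrf.ItemsHealed, mrf.TotalItems,
			mrf.BytesHealed, mrf.TotalBytes, mrf.Started)
	}
}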


@ -48,18 +48,23 @@ const (
type healingTracker struct {
disk StorageAPI `msg:"-"`
ID string
PoolIndex int
SetIndex int
DiskIndex int
Path string
Endpoint string
Started time.Time
LastUpdate time.Time
ObjectsHealed uint64
ObjectsFailed uint64
BytesDone uint64
BytesFailed uint64
ID string
PoolIndex int
SetIndex int
DiskIndex int
Path string
Endpoint string
Started time.Time
LastUpdate time.Time
ObjectsTotalCount uint64
ObjectsTotalSize uint64
ItemsHealed uint64
ItemsFailed uint64
BytesDone uint64
BytesFailed uint64
// Last object scanned.
Bucket string `json:"-"`
@ -67,10 +72,10 @@ type healingTracker struct {
// Numbers when current bucket started healing,
// for resuming with correct numbers.
ResumeObjectsHealed uint64 `json:"-"`
ResumeObjectsFailed uint64 `json:"-"`
ResumeBytesDone uint64 `json:"-"`
ResumeBytesFailed uint64 `json:"-"`
ResumeItemsHealed uint64 `json:"-"`
ResumeItemsFailed uint64 `json:"-"`
ResumeBytesDone uint64 `json:"-"`
ResumeBytesFailed uint64 `json:"-"`
// Filled on startup/restarts.
QueuedBuckets []string
@ -175,8 +180,8 @@ func (h *healingTracker) isHealed(bucket string) bool {
// resume will reset progress to the numbers at the start of the bucket.
func (h *healingTracker) resume() {
h.ObjectsHealed = h.ResumeObjectsHealed
h.ObjectsFailed = h.ResumeObjectsFailed
h.ItemsHealed = h.ResumeItemsHealed
h.ItemsFailed = h.ResumeItemsFailed
h.BytesDone = h.ResumeBytesDone
h.BytesFailed = h.ResumeBytesFailed
}
@ -184,8 +189,8 @@ func (h *healingTracker) resume() {
// bucketDone should be called when a bucket is done healing.
// Adds the bucket to the list of healed buckets and updates resume numbers.
func (h *healingTracker) bucketDone(bucket string) {
h.ResumeObjectsHealed = h.ObjectsHealed
h.ResumeObjectsFailed = h.ObjectsFailed
h.ResumeItemsHealed = h.ItemsHealed
h.ResumeItemsFailed = h.ItemsFailed
h.ResumeBytesDone = h.BytesDone
h.ResumeBytesFailed = h.BytesFailed
h.HealedBuckets = append(h.HealedBuckets, bucket)
@ -220,22 +225,28 @@ func (h *healingTracker) printTo(writer io.Writer) {
// toHealingDisk converts the information to madmin.HealingDisk
func (h *healingTracker) toHealingDisk() madmin.HealingDisk {
return madmin.HealingDisk{
ID: h.ID,
Endpoint: h.Endpoint,
PoolIndex: h.PoolIndex,
SetIndex: h.SetIndex,
DiskIndex: h.DiskIndex,
Path: h.Path,
Started: h.Started.UTC(),
LastUpdate: h.LastUpdate.UTC(),
ObjectsHealed: h.ObjectsHealed,
ObjectsFailed: h.ObjectsFailed,
BytesDone: h.BytesDone,
BytesFailed: h.BytesFailed,
Bucket: h.Bucket,
Object: h.Object,
QueuedBuckets: h.QueuedBuckets,
HealedBuckets: h.HealedBuckets,
ID: h.ID,
Endpoint: h.Endpoint,
PoolIndex: h.PoolIndex,
SetIndex: h.SetIndex,
DiskIndex: h.DiskIndex,
Path: h.Path,
Started: h.Started.UTC(),
LastUpdate: h.LastUpdate.UTC(),
ObjectsTotalCount: h.ObjectsTotalCount,
ObjectsTotalSize: h.ObjectsTotalSize,
ItemsHealed: h.ItemsHealed,
ItemsFailed: h.ItemsFailed,
BytesDone: h.BytesDone,
BytesFailed: h.BytesFailed,
Bucket: h.Bucket,
Object: h.Object,
QueuedBuckets: h.QueuedBuckets,
HealedBuckets: h.HealedBuckets,
ObjectsHealed: h.ItemsHealed, // Deprecated July 2021
ObjectsFailed: h.ItemsFailed, // Deprecated July 2021
}
}
@ -413,6 +424,14 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerPools, bgSeq
tracker = newHealingTracker(disk)
}
// Load bucket totals
cache := dataUsageCache{}
if err := cache.load(ctx, z.serverPools[i].sets[setIndex], dataUsageCacheName); err == nil {
dataUsageInfo := cache.dui(dataUsageRoot, nil)
tracker.ObjectsTotalCount = dataUsageInfo.ObjectsTotalCount
tracker.ObjectsTotalSize = dataUsageInfo.ObjectsTotalSize
}
tracker.PoolIndex, tracker.SetIndex, tracker.DiskIndex = disk.GetDiskLoc()
tracker.setQueuedBuckets(buckets)
if err := tracker.save(ctx); err != nil {

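A side note on the bucket totals loaded above: they let callers relate ItemsHealed/ItemsFailed to the overall object count on the disk. A hypothetical helper, assuming the matching madmin.HealingDisk fields introduced by this change, could derive a rough progress figure:

package main

import (
	"fmt"

	"github.com/minio/madmin-go"
)

// healingProgress returns a rough completion percentage for one healing disk,
// derived from the totals and counters added in this change.
func healingProgress(d madmin.HealingDisk) float64 {
	if d.ObjectsTotalCount == 0 {
		return 0
	}
	done := d.ItemsHealed + d.ItemsFailed
	return 100 * float64(done) / float64(d.ObjectsTotalCount)
}

func main() {
	// Illustrative values only.
	d := madmin.HealingDisk{ObjectsTotalCount: 1000, ItemsHealed: 240, ItemsFailed: 10}
	fmt.Printf("%.1f%% of objects processed\n", healingProgress(d))
}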

@ -72,16 +72,28 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "LastUpdate")
return
}
case "ObjectsHealed":
z.ObjectsHealed, err = dc.ReadUint64()
case "ObjectsTotalCount":
z.ObjectsTotalCount, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "ObjectsHealed")
err = msgp.WrapError(err, "ObjectsTotalCount")
return
}
case "ObjectsFailed":
z.ObjectsFailed, err = dc.ReadUint64()
case "ObjectsTotalSize":
z.ObjectsTotalSize, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "ObjectsFailed")
err = msgp.WrapError(err, "ObjectsTotalSize")
return
}
case "ItemsHealed":
z.ItemsHealed, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "ItemsHealed")
return
}
case "ItemsFailed":
z.ItemsFailed, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "ItemsFailed")
return
}
case "BytesDone":
@ -108,16 +120,16 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "Object")
return
}
case "ResumeObjectsHealed":
z.ResumeObjectsHealed, err = dc.ReadUint64()
case "ResumeItemsHealed":
z.ResumeItemsHealed, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "ResumeObjectsHealed")
err = msgp.WrapError(err, "ResumeItemsHealed")
return
}
case "ResumeObjectsFailed":
z.ResumeObjectsFailed, err = dc.ReadUint64()
case "ResumeItemsFailed":
z.ResumeItemsFailed, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "ResumeObjectsFailed")
err = msgp.WrapError(err, "ResumeItemsFailed")
return
}
case "ResumeBytesDone":
@ -183,9 +195,9 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 20
// map header, size 22
// write "ID"
err = en.Append(0xde, 0x0, 0x14, 0xa2, 0x49, 0x44)
err = en.Append(0xde, 0x0, 0x16, 0xa2, 0x49, 0x44)
if err != nil {
return
}
@ -264,24 +276,44 @@ func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "LastUpdate")
return
}
// write "ObjectsHealed"
err = en.Append(0xad, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64)
// write "ObjectsTotalCount"
err = en.Append(0xb1, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x43, 0x6f, 0x75, 0x6e, 0x74)
if err != nil {
return
}
err = en.WriteUint64(z.ObjectsHealed)
err = en.WriteUint64(z.ObjectsTotalCount)
if err != nil {
err = msgp.WrapError(err, "ObjectsHealed")
err = msgp.WrapError(err, "ObjectsTotalCount")
return
}
// write "ObjectsFailed"
err = en.Append(0xad, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64)
// write "ObjectsTotalSize"
err = en.Append(0xb0, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x53, 0x69, 0x7a, 0x65)
if err != nil {
return
}
err = en.WriteUint64(z.ObjectsFailed)
err = en.WriteUint64(z.ObjectsTotalSize)
if err != nil {
err = msgp.WrapError(err, "ObjectsFailed")
err = msgp.WrapError(err, "ObjectsTotalSize")
return
}
// write "ItemsHealed"
err = en.Append(0xab, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64)
if err != nil {
return
}
err = en.WriteUint64(z.ItemsHealed)
if err != nil {
err = msgp.WrapError(err, "ItemsHealed")
return
}
// write "ItemsFailed"
err = en.Append(0xab, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64)
if err != nil {
return
}
err = en.WriteUint64(z.ItemsFailed)
if err != nil {
err = msgp.WrapError(err, "ItemsFailed")
return
}
// write "BytesDone"
@ -324,24 +356,24 @@ func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "Object")
return
}
// write "ResumeObjectsHealed"
err = en.Append(0xb3, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64)
// write "ResumeItemsHealed"
err = en.Append(0xb1, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64)
if err != nil {
return
}
err = en.WriteUint64(z.ResumeObjectsHealed)
err = en.WriteUint64(z.ResumeItemsHealed)
if err != nil {
err = msgp.WrapError(err, "ResumeObjectsHealed")
err = msgp.WrapError(err, "ResumeItemsHealed")
return
}
// write "ResumeObjectsFailed"
err = en.Append(0xb3, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64)
// write "ResumeItemsFailed"
err = en.Append(0xb1, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64)
if err != nil {
return
}
err = en.WriteUint64(z.ResumeObjectsFailed)
err = en.WriteUint64(z.ResumeItemsFailed)
if err != nil {
err = msgp.WrapError(err, "ResumeObjectsFailed")
err = msgp.WrapError(err, "ResumeItemsFailed")
return
}
// write "ResumeBytesDone"
@ -404,9 +436,9 @@ func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) {
// MarshalMsg implements msgp.Marshaler
func (z *healingTracker) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 20
// map header, size 22
// string "ID"
o = append(o, 0xde, 0x0, 0x14, 0xa2, 0x49, 0x44)
o = append(o, 0xde, 0x0, 0x16, 0xa2, 0x49, 0x44)
o = msgp.AppendString(o, z.ID)
// string "PoolIndex"
o = append(o, 0xa9, 0x50, 0x6f, 0x6f, 0x6c, 0x49, 0x6e, 0x64, 0x65, 0x78)
@ -429,12 +461,18 @@ func (z *healingTracker) MarshalMsg(b []byte) (o []byte, err error) {
// string "LastUpdate"
o = append(o, 0xaa, 0x4c, 0x61, 0x73, 0x74, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65)
o = msgp.AppendTime(o, z.LastUpdate)
// string "ObjectsHealed"
o = append(o, 0xad, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64)
o = msgp.AppendUint64(o, z.ObjectsHealed)
// string "ObjectsFailed"
o = append(o, 0xad, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64)
o = msgp.AppendUint64(o, z.ObjectsFailed)
// string "ObjectsTotalCount"
o = append(o, 0xb1, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x43, 0x6f, 0x75, 0x6e, 0x74)
o = msgp.AppendUint64(o, z.ObjectsTotalCount)
// string "ObjectsTotalSize"
o = append(o, 0xb0, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x53, 0x69, 0x7a, 0x65)
o = msgp.AppendUint64(o, z.ObjectsTotalSize)
// string "ItemsHealed"
o = append(o, 0xab, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64)
o = msgp.AppendUint64(o, z.ItemsHealed)
// string "ItemsFailed"
o = append(o, 0xab, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64)
o = msgp.AppendUint64(o, z.ItemsFailed)
// string "BytesDone"
o = append(o, 0xa9, 0x42, 0x79, 0x74, 0x65, 0x73, 0x44, 0x6f, 0x6e, 0x65)
o = msgp.AppendUint64(o, z.BytesDone)
@ -447,12 +485,12 @@ func (z *healingTracker) MarshalMsg(b []byte) (o []byte, err error) {
// string "Object"
o = append(o, 0xa6, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74)
o = msgp.AppendString(o, z.Object)
// string "ResumeObjectsHealed"
o = append(o, 0xb3, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64)
o = msgp.AppendUint64(o, z.ResumeObjectsHealed)
// string "ResumeObjectsFailed"
o = append(o, 0xb3, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64)
o = msgp.AppendUint64(o, z.ResumeObjectsFailed)
// string "ResumeItemsHealed"
o = append(o, 0xb1, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64)
o = msgp.AppendUint64(o, z.ResumeItemsHealed)
// string "ResumeItemsFailed"
o = append(o, 0xb1, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64)
o = msgp.AppendUint64(o, z.ResumeItemsFailed)
// string "ResumeBytesDone"
o = append(o, 0xaf, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x44, 0x6f, 0x6e, 0x65)
o = msgp.AppendUint64(o, z.ResumeBytesDone)
@ -540,16 +578,28 @@ func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "LastUpdate")
return
}
case "ObjectsHealed":
z.ObjectsHealed, bts, err = msgp.ReadUint64Bytes(bts)
case "ObjectsTotalCount":
z.ObjectsTotalCount, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ObjectsHealed")
err = msgp.WrapError(err, "ObjectsTotalCount")
return
}
case "ObjectsFailed":
z.ObjectsFailed, bts, err = msgp.ReadUint64Bytes(bts)
case "ObjectsTotalSize":
z.ObjectsTotalSize, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ObjectsFailed")
err = msgp.WrapError(err, "ObjectsTotalSize")
return
}
case "ItemsHealed":
z.ItemsHealed, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ItemsHealed")
return
}
case "ItemsFailed":
z.ItemsFailed, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ItemsFailed")
return
}
case "BytesDone":
@ -576,16 +626,16 @@ func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "Object")
return
}
case "ResumeObjectsHealed":
z.ResumeObjectsHealed, bts, err = msgp.ReadUint64Bytes(bts)
case "ResumeItemsHealed":
z.ResumeItemsHealed, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ResumeObjectsHealed")
err = msgp.WrapError(err, "ResumeItemsHealed")
return
}
case "ResumeObjectsFailed":
z.ResumeObjectsFailed, bts, err = msgp.ReadUint64Bytes(bts)
case "ResumeItemsFailed":
z.ResumeItemsFailed, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ResumeObjectsFailed")
err = msgp.WrapError(err, "ResumeItemsFailed")
return
}
case "ResumeBytesDone":
@ -652,7 +702,7 @@ func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *healingTracker) Msgsize() (s int) {
s = 3 + 3 + msgp.StringPrefixSize + len(z.ID) + 10 + msgp.IntSize + 9 + msgp.IntSize + 10 + msgp.IntSize + 5 + msgp.StringPrefixSize + len(z.Path) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 8 + msgp.TimeSize + 11 + msgp.TimeSize + 14 + msgp.Uint64Size + 14 + msgp.Uint64Size + 10 + msgp.Uint64Size + 12 + msgp.Uint64Size + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Object) + 20 + msgp.Uint64Size + 20 + msgp.Uint64Size + 16 + msgp.Uint64Size + 18 + msgp.Uint64Size + 14 + msgp.ArrayHeaderSize
s = 3 + 3 + msgp.StringPrefixSize + len(z.ID) + 10 + msgp.IntSize + 9 + msgp.IntSize + 10 + msgp.IntSize + 5 + msgp.StringPrefixSize + len(z.Path) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 8 + msgp.TimeSize + 11 + msgp.TimeSize + 18 + msgp.Uint64Size + 17 + msgp.Uint64Size + 12 + msgp.Uint64Size + 12 + msgp.Uint64Size + 10 + msgp.Uint64Size + 12 + msgp.Uint64Size + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Object) + 18 + msgp.Uint64Size + 18 + msgp.Uint64Size + 16 + msgp.Uint64Size + 18 + msgp.Uint64Size + 14 + msgp.ArrayHeaderSize
for za0001 := range z.QueuedBuckets {
s += msgp.StringPrefixSize + len(z.QueuedBuckets[za0001])
}


@ -945,7 +945,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
if disk != nil && disk.IsOnline() {
continue
}
er.addPartial(bucket, object, fi.VersionID)
er.addPartial(bucket, object, fi.VersionID, fi.Size)
break
}


@ -826,16 +826,6 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
// Whether a disk was initially or becomes offline
// during this upload, send it to the MRF list.
for i := 0; i < len(onlineDisks); i++ {
if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
continue
}
er.addPartial(bucket, object, fi.VersionID)
break
}
for i := 0; i < len(onlineDisks); i++ {
if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
// Object info is the same in all disks, so we can pick
@ -844,6 +834,17 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
break
}
}
// Whether a disk was initially or becomes offline
// during this upload, send it to the MRF list.
for i := 0; i < len(onlineDisks); i++ {
if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
continue
}
er.addPartial(bucket, object, fi.VersionID, fi.Size)
break
}
online = countOnlineDisks(onlineDisks)
return fi.ToObjectInfo(bucket, object), nil
@ -1029,7 +1030,7 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
// all other direct versionId references we should
// ensure no dangling file is left over.
er.addPartial(bucket, version.Name, version.VersionID)
er.addPartial(bucket, version.Name, version.VersionID, -1)
break
}
}
@ -1177,7 +1178,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
if disk != nil && disk.IsOnline() {
continue
}
er.addPartial(bucket, object, opts.VersionID)
er.addPartial(bucket, object, opts.VersionID, -1)
break
}
@ -1192,11 +1193,15 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
// Send the successful but partial upload/delete, however ignore
// if the channel is blocked by other items.
func (er erasureObjects) addPartial(bucket, object, versionID string) {
select {
case er.mrfOpCh <- partialOperation{bucket: bucket, object: object, versionID: versionID}:
default:
}
func (er erasureObjects) addPartial(bucket, object, versionID string, size int64) {
globalMRFState.addPartialOp(partialOperation{
bucket: bucket,
object: object,
versionID: versionID,
size: size,
setIndex: er.setIndex,
poolIndex: er.poolIndex,
})
}
func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
@ -1421,7 +1426,7 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st
if disk != nil && disk.IsOnline() {
continue
}
er.addPartial(bucket, object, opts.VersionID)
er.addPartial(bucket, object, opts.VersionID, -1)
break
}
// Notify object deleted event.

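The addPartial change above forwards each partial write/delete (now carrying a size, or -1 when unknown) to the global MRF state, and the enqueue stays non-blocking so a full queue drops the entry instead of stalling the request path. A standalone sketch of that drop-on-full pattern, with a hypothetical op type:

package main

import "fmt"

type op struct {
	bucket, object string
	size           int64
}

func main() {
	queue := make(chan op, 2) // small buffer to show the drop behaviour

	enqueue := func(o op) {
		select {
		case queue <- o:
		default:
			// Queue full: drop rather than block the caller.
			fmt.Println("dropped", o.object)
		}
	}

	for i := 0; i < 4; i++ {
		enqueue(op{bucket: "b", object: fmt.Sprintf("obj-%d", i), size: 1})
	}
	fmt.Println("queued:", len(queue))
}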

@ -96,8 +96,6 @@ type erasureSets struct {
disksStorageInfoCache timedValue
mrfMU sync.Mutex
mrfOperations map[healSource]int
lastConnectDisksOpTime time.Time
}
@ -268,20 +266,11 @@ func (s *erasureSets) connectDisks() {
wg.Wait()
go func() {
idler := time.NewTimer(100 * time.Millisecond)
defer idler.Stop()
for setIndex, justConnected := range setsJustConnected {
if !justConnected {
continue
}
// Send a new set connect event with a timeout
idler.Reset(100 * time.Millisecond)
select {
case s.setReconnectEvent <- setIndex:
case <-idler.C:
}
globalMRFState.newSetReconnected(setIndex, s.poolIndex)
}
}()
}
@ -375,7 +364,6 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
setReconnectEvent: make(chan int),
distributionAlgo: format.Erasure.DistributionAlgo,
deploymentID: uuid.MustParse(format.ID),
mrfOperations: make(map[healSource]int),
poolIndex: poolIdx,
}
@ -446,7 +434,6 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
nsMutex: mutex,
bp: bp,
bpOld: bpOld,
mrfOpCh: make(chan partialOperation, 10000),
}
}
@ -466,8 +453,6 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
// Start the disk monitoring and connect routine.
go s.monitorAndConnectEndpoints(ctx, defaultMonitorConnectEndpointInterval)
go s.maintainMRFList()
go s.healMRFRoutine()
return s, nil
}
@ -1388,71 +1373,6 @@ func (s *erasureSets) GetObjectTags(ctx context.Context, bucket, object string,
return er.GetObjectTags(ctx, bucket, object, opts)
}
// maintainMRFList gathers the list of successful partial uploads
// from all underlying er.sets and puts them in a global map which
// should not have more than 10000 entries.
func (s *erasureSets) maintainMRFList() {
var agg = make(chan partialOperation, 10000)
for i, er := range s.sets {
go func(c <-chan partialOperation, setIndex int) {
for msg := range c {
msg.failedSet = setIndex
select {
case agg <- msg:
default:
}
}
}(er.mrfOpCh, i)
}
for fOp := range agg {
s.mrfMU.Lock()
if len(s.mrfOperations) > 10000 {
s.mrfMU.Unlock()
continue
}
s.mrfOperations[healSource{
bucket: fOp.bucket,
object: fOp.object,
versionID: fOp.versionID,
opts: &madmin.HealOpts{Remove: true},
}] = fOp.failedSet
s.mrfMU.Unlock()
}
}
// healMRFRoutine monitors new disks connection, sweep the MRF list
// to find objects related to the new disk that needs to be healed.
func (s *erasureSets) healMRFRoutine() {
// Wait until background heal state is initialized
bgSeq := mustGetHealSequence(GlobalContext)
for setIndex := range s.setReconnectEvent {
// Get the list of objects related the er.set
// to which the connected disk belongs.
var mrfOperations []healSource
s.mrfMU.Lock()
for k, v := range s.mrfOperations {
if v == setIndex {
mrfOperations = append(mrfOperations, k)
}
}
s.mrfMU.Unlock()
// Heal objects
for _, u := range mrfOperations {
waitForLowHTTPReq(globalHealConfig.IOCount, globalHealConfig.Sleep)
// Send an object to background heal
bgSeq.sourceCh <- u
s.mrfMU.Lock()
delete(s.mrfOperations, u)
s.mrfMU.Unlock()
}
}
}
// TransitionObject - transition object content to target tier.
func (s *erasureSets) TransitionObject(ctx context.Context, bucket, object string, opts ObjectOptions) error {
return s.getHashedSet(object).TransitionObject(ctx, bucket, object, opts)


@ -39,15 +39,6 @@ import (
// OfflineDisk represents an unavailable disk.
var OfflineDisk StorageAPI // zero value is nil
// partialOperation is a successful upload/delete of an object
// but not written in all disks (having quorum)
type partialOperation struct {
bucket string
object string
versionID string
failedSet int
}
// erasureObjects - Implements ER object layer.
type erasureObjects struct {
GatewayUnsupported
@ -78,8 +69,6 @@ type erasureObjects struct {
// legacy objects.
bpOld *bpool.BytePoolCap
mrfOpCh chan partialOperation
deletedCleanupSleeper *dynamicSleeper
}


@ -68,6 +68,17 @@ func newBgHealSequence() *healSequence {
}
}
func getCurrentMRFStatus() madmin.MRFStatus {
mrfInfo := globalMRFState.getCurrentMRFRoundInfo()
return madmin.MRFStatus{
BytesHealed: mrfInfo.bytesHealed,
ItemsHealed: mrfInfo.itemsHealed,
TotalItems: mrfInfo.itemsHealed + mrfInfo.pendingItems,
TotalBytes: mrfInfo.bytesHealed + mrfInfo.pendingBytes,
Started: mrfInfo.triggeredAt,
}
}
// getBackgroundHealStatus will return the
func getBackgroundHealStatus(ctx context.Context, o ObjectLayer) (madmin.BgHealState, bool) {
if globalBackgroundHealState == nil {
@ -79,13 +90,20 @@ func getBackgroundHealStatus(ctx context.Context, o ObjectLayer) (madmin.BgHealS
return madmin.BgHealState{}, false
}
status := madmin.BgHealState{
ScannedItemsCount: bgSeq.getScannedItemsCount(),
}
if globalMRFState != nil {
status.MRF = map[string]madmin.MRFStatus{
globalLocalNodeName: getCurrentMRFStatus(),
}
}
var healDisksMap = map[string]struct{}{}
for _, ep := range getLocalDisksToHeal() {
healDisksMap[ep.String()] = struct{}{}
}
status := madmin.BgHealState{
ScannedItemsCount: bgSeq.getScannedItemsCount(),
}
if o == nil {
healing := globalBackgroundHealState.getLocalHealingDisks()
@ -225,12 +243,12 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn
ScanMode: madmin.HealNormalScan, Remove: healDeleteDangling}); err != nil {
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
// If not deleted, assume they failed.
tracker.ObjectsFailed++
tracker.ItemsFailed++
tracker.BytesFailed += uint64(version.Size)
logger.LogIf(ctx, err)
}
} else {
tracker.ObjectsHealed++
tracker.ItemsHealed++
tracker.BytesDone += uint64(version.Size)
}
bgSeq.logHeal(madmin.HealItemObject)


@ -293,6 +293,8 @@ var (
globalBackgroundHealRoutine *healRoutine
globalBackgroundHealState *allHealState
globalMRFState *mrfState
// If writes to FS backend should be O_SYNC.
globalFSOSync bool

cmd/mrf.go (new file)

@ -0,0 +1,242 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"sync"
"time"
"github.com/minio/madmin-go"
"github.com/minio/minio/internal/logger"
)
var mrfHealingOpts = madmin.HealOpts{ScanMode: madmin.HealNormalScan, Remove: healDeleteDangling}
const (
mrfInfoResetInterval = 10 * time.Second
mrfOpsQueueSize = 10000
)
// partialOperation is a successful upload/delete of an object
// but not written in all disks (having quorum)
type partialOperation struct {
bucket string
object string
versionID string
size int64
setIndex int
poolIndex int
}
type setInfo struct {
index, pool int
}
type mrfStats struct {
triggeredAt time.Time
itemsHealed uint64
bytesHealed uint64
pendingItems uint64
pendingBytes uint64
}
// mrfState encapsulates all the information
// related to the global background MRF.
type mrfState struct {
ctx context.Context
objectAPI ObjectLayer
mu sync.Mutex
opCh chan partialOperation
pendingOps map[partialOperation]setInfo
setReconnectEvent chan setInfo
itemsHealed uint64
bytesHealed uint64
pendingItems uint64
pendingBytes uint64
triggeredAt time.Time
}
// Add a partial S3 operation (put/delete) when one or more disks are offline.
func (m *mrfState) addPartialOp(op partialOperation) {
if m == nil {
return
}
select {
case m.opCh <- op:
default:
}
}
// Receive the new set (disk) reconnection event
func (m *mrfState) newSetReconnected(pool, set int) {
if m == nil {
return
}
idler := time.NewTimer(100 * time.Millisecond)
defer idler.Stop()
select {
case m.setReconnectEvent <- setInfo{index: set, pool: pool}:
case <-idler.C:
}
}
// Get current MRF stats
func (m *mrfState) getCurrentMRFRoundInfo() mrfStats {
m.mu.Lock()
triggeredAt := m.triggeredAt
itemsHealed := m.itemsHealed
bytesHealed := m.bytesHealed
pendingItems := m.pendingItems
pendingBytes := m.pendingBytes
m.mu.Unlock()
if pendingItems == 0 {
return mrfStats{}
}
return mrfStats{
triggeredAt: triggeredAt,
itemsHealed: itemsHealed,
bytesHealed: bytesHealed,
pendingItems: pendingItems,
pendingBytes: pendingBytes,
}
}
// maintainMRFList gathers the list of successful partial uploads
// from all underlying er.sets and puts them in a global map which
// should not have more than 10000 entries.
func (m *mrfState) maintainMRFList() {
for fOp := range m.opCh {
m.mu.Lock()
if len(m.pendingOps) > mrfOpsQueueSize {
m.mu.Unlock()
continue
}
m.pendingOps[fOp] = setInfo{index: fOp.setIndex, pool: fOp.poolIndex}
m.pendingItems++
if fOp.size > 0 {
m.pendingBytes += uint64(fOp.size)
}
m.mu.Unlock()
}
}
// Reset current MRF stats
func (m *mrfState) resetMRFInfoIfNoPendingOps() {
m.mu.Lock()
defer m.mu.Unlock()
if m.pendingItems > 0 {
return
}
m.itemsHealed = 0
m.bytesHealed = 0
m.pendingItems = 0
m.pendingBytes = 0
m.triggeredAt = time.Time{}
}
// healRoutine listens to new disks reconnection events and
// issues healing requests for queued objects belonging to the
// corresponding erasure set
func (m *mrfState) healRoutine() {
idler := time.NewTimer(mrfInfoResetInterval)
defer idler.Stop()
for {
idler.Reset(mrfInfoResetInterval)
select {
case <-m.ctx.Done():
return
case <-idler.C:
m.resetMRFInfoIfNoPendingOps()
case setInfo := <-m.setReconnectEvent:
// Get the list of objects related to the er.set
// to which the connected disk belongs.
var mrfOperations []partialOperation
m.mu.Lock()
for k, v := range m.pendingOps {
if v == setInfo {
mrfOperations = append(mrfOperations, k)
}
}
m.mu.Unlock()
if len(mrfOperations) == 0 {
continue
}
m.mu.Lock()
m.triggeredAt = time.Now().UTC()
m.mu.Unlock()
// Heal objects
for _, u := range mrfOperations {
waitForLowHTTPReq(globalHealConfig.IOCount, globalHealConfig.Sleep)
if _, err := m.objectAPI.HealObject(m.ctx, u.bucket, u.object, u.versionID, mrfHealingOpts); err != nil {
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
// If not deleted, assume they failed.
logger.LogIf(m.ctx, err)
} else {
m.mu.Lock()
m.itemsHealed++
m.pendingItems--
m.mu.Unlock()
}
} else {
m.mu.Lock()
m.itemsHealed++
m.pendingItems--
m.bytesHealed += uint64(u.size)
m.pendingBytes -= uint64(u.size)
m.mu.Unlock()
}
m.mu.Lock()
delete(m.pendingOps, u)
m.mu.Unlock()
}
}
}
}
// Initialize healing MRF
func initHealMRF(ctx context.Context, obj ObjectLayer) {
globalMRFState = &mrfState{
ctx: ctx,
objectAPI: obj,
opCh: make(chan partialOperation, mrfOpsQueueSize),
pendingOps: make(map[partialOperation]setInfo),
setReconnectEvent: make(chan setInfo),
}
go globalMRFState.maintainMRFList()
go globalMRFState.healRoutine()
}

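To summarize the flow in cmd/mrf.go above: addPartialOp queues operations, maintainMRFList records them per erasure set, and healRoutine replays the entries for a set once one of its disks reconnects. A hypothetical, much-simplified standalone sketch of that replay-on-reconnect idea (not MinIO code):

package main

import (
	"fmt"
	"time"
)

type setInfo struct{ pool, set int }

type pendingOp struct {
	object string
	where  setInfo
}

func main() {
	// Operations remembered because some disks were offline when they ran.
	pending := map[pendingOp]struct{}{
		{object: "a.txt", where: setInfo{0, 1}}: {},
		{object: "b.txt", where: setInfo{0, 2}}: {},
	}
	reconnect := make(chan setInfo, 1)

	go func() {
		// Simulate a disk of pool 0 / set 1 coming back online.
		time.Sleep(10 * time.Millisecond)
		reconnect <- setInfo{0, 1}
	}()

	// Replay only the entries belonging to the reconnected set.
	si := <-reconnect
	for op := range pending {
		if op.where == si {
			fmt.Println("heal", op.object) // in MinIO this would call HealObject
			delete(pending, op)
		}
	}
}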

@ -537,6 +537,7 @@ func serverMain(ctx *cli.Context) {
// Enable background operations for erasure coding
if globalIsErasure {
initAutoHeal(GlobalContext, newObject)
initHealMRF(GlobalContext, newObject)
initBackgroundTransition(GlobalContext, newObject)
}

go.mod

@ -75,6 +75,7 @@ require (
github.com/secure-io/sio-go v0.3.1
github.com/streadway/amqp v1.0.0
github.com/tinylib/msgp v1.1.6-0.20210521143832-0becd170c402
github.com/ulikunitz/xz v0.5.10 // indirect
github.com/valyala/bytebufferpool v1.0.0
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c

go.sum

@ -1410,6 +1410,8 @@ github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGr
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/ulikunitz/xz v0.5.6/go.mod h1:2bypXElzHzzJZwzH67Y6wb67pO62Rzfn7BSiF4ABRW8=
github.com/ulikunitz/xz v0.5.7/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
github.com/ulikunitz/xz v0.5.10 h1:t92gobL9l3HE202wg3rlk19F6X+JOxl9BBrCCMYEYd8=
github.com/ulikunitz/xz v0.5.10/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
github.com/ultraware/funlen v0.0.2/go.mod h1:Dp4UiAus7Wdb9KUZsYWZEWiRzGuM2kXM1lPbfaF6xhA=
github.com/ultraware/whitespace v0.0.4/go.mod h1:aVMh/gQve5Maj9hQ/hg+F75lr/X5A89uZnzAmWSineA=
github.com/unrolled/secure v1.0.7 h1:BcQHp3iKZyZCKj5gRqwQG+5urnGBF00wGgoPPwtheVQ=