add format header and version

This commit is contained in:
Harshavardhana 2021-11-09 13:42:09 -08:00
parent bf19e9b52b
commit c81a2e2d53
2 changed files with 60 additions and 59 deletions

View file

@ -20,8 +20,10 @@ package cmd
import ( import (
"bytes" "bytes"
"context" "context"
"encoding/binary"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"net/http" "net/http"
"time" "time"
@ -29,18 +31,16 @@ import (
"github.com/minio/minio/internal/config/storageclass" "github.com/minio/minio/internal/config/storageclass"
"github.com/minio/minio/internal/hash" "github.com/minio/minio/internal/hash"
"github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger"
"github.com/tinylib/msgp/msgp"
) )
// PoolDecommissionInfo currently decomissioning information // PoolDecommissionInfo currently decomissioning information
type PoolDecommissionInfo struct { type PoolDecommissionInfo struct {
StartTime time.Time `json:"startTime" msg:"st"` StartTime time.Time `json:"startTime" msg:"st"`
StartSize int64 `json:"startSize" msg:"ss"` StartSize int64 `json:"startSize" msg:"ss"`
TotalSize int64 `json:"totalSize" msg:"ts"` TotalSize int64 `json:"totalSize" msg:"ts"`
Duration time.Duration `json:"duration" msg:"du"` CurrentSize int64 `json:"currentSize" msg:"cs"`
CurrentSize int64 `json:"currentSize" msg:"cs"` Complete bool `json:"complete" msg:"cmp"`
Complete bool `json:"complete" msg:"cmp"` Failed bool `json:"failed" msg:"fl"`
Failed bool `json:"failed" msg:"fl"`
} }
// PoolStatus captures current pool status // PoolStatus captures current pool status
@ -53,7 +53,7 @@ type PoolStatus struct {
//go:generate msgp -file $GOFILE -unexported //go:generate msgp -file $GOFILE -unexported
type poolMeta struct { type poolMeta struct {
Version string `msg:"v"` Version int `msg:"v"`
Pools []PoolStatus `msg:"pls"` Pools []PoolStatus `msg:"pls"`
} }
@ -103,22 +103,44 @@ func (p poolMeta) IsSuspended(idx int) bool {
func (p *poolMeta) load(ctx context.Context, set *erasureSets, sets []*erasureSets) (bool, error) { func (p *poolMeta) load(ctx context.Context, set *erasureSets, sets []*erasureSets) (bool, error) {
gr, err := set.GetObjectNInfo(ctx, minioMetaBucket, poolMetaName, gr, err := set.GetObjectNInfo(ctx, minioMetaBucket, poolMetaName,
nil, http.Header{}, readLock, ObjectOptions{}) nil, http.Header{}, readLock, ObjectOptions{})
if err != nil && !isErrObjectNotFound(err) { if err != nil {
if isErrObjectNotFound(err) {
return true, nil
}
return false, err return false, err
} }
if isErrObjectNotFound(err) { defer gr.Close()
data, err := ioutil.ReadAll(gr)
if err != nil {
return false, err
}
if len(data) == 0 {
return true, nil return true, nil
} }
defer gr.Close() if len(data) <= 4 {
return false, fmt.Errorf("poolMeta: no data")
}
// Read header
switch binary.LittleEndian.Uint16(data[0:2]) {
case poolMetaFormat:
default:
return false, fmt.Errorf("poolMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
}
switch binary.LittleEndian.Uint16(data[2:4]) {
case poolMetaVersion:
default:
return false, fmt.Errorf("poolMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
}
if err = p.DecodeMsg(msgp.NewReader(gr)); err != nil { // OK, parse data.
if _, err = p.UnmarshalMsg(data[4:]); err != nil {
return false, err return false, err
} }
switch p.Version { switch p.Version {
case poolMetaV1: case poolMetaVersionV1:
default: default:
return false, fmt.Errorf("unexpected pool meta version: %s", p.Version) return false, fmt.Errorf("unexpected pool meta version: %d", p.Version)
} }
// Total pools cannot reduce upon restart, but allow for // Total pools cannot reduce upon restart, but allow for
@ -164,10 +186,17 @@ func (p poolMeta) Clone() poolMeta {
} }
func (p poolMeta) save(ctx context.Context, sets []*erasureSets) error { func (p poolMeta) save(ctx context.Context, sets []*erasureSets) error {
buf, err := p.MarshalMsg(nil) data := make([]byte, 4, p.Msgsize()+4)
// Initialize the header.
binary.LittleEndian.PutUint16(data[0:2], poolMetaFormat)
binary.LittleEndian.PutUint16(data[2:4], poolMetaVersion)
buf, err := p.MarshalMsg(data)
if err != nil { if err != nil {
return err return err
} }
br := bytes.NewReader(buf) br := bytes.NewReader(buf)
for _, set := range sets { for _, set := range sets {
r, err := hash.NewReader(br, br.Size(), "", "", br.Size()) r, err := hash.NewReader(br, br.Size(), "", "", br.Size())
@ -184,8 +213,10 @@ func (p poolMeta) save(ctx context.Context, sets []*erasureSets) error {
} }
const ( const (
poolMetaName = "pool.meta" poolMetaName = "pool.meta"
poolMetaV1 = "1" poolMetaFormat = 1
poolMetaVersionV1 = 1
poolMetaVersion = poolMetaVersionV1
) )
// Init() initializes pools and saves additional information about them // Init() initializes pools and saves additional information about them
@ -210,7 +241,7 @@ func (z *erasureServerPools) Init(ctx context.Context) error {
// looks like new pool was added we need to update, // looks like new pool was added we need to update,
// or this is a fresh installation (or an existing // or this is a fresh installation (or an existing
// installation with pool removed) // installation with pool removed)
meta.Version = "1" meta.Version = poolMetaVersion
for idx, pool := range z.serverPools { for idx, pool := range z.serverPools {
meta.Pools = append(meta.Pools, PoolStatus{ meta.Pools = append(meta.Pools, PoolStatus{
CmdLine: pool.endpoints.CmdLine, CmdLine: pool.endpoints.CmdLine,
@ -481,7 +512,6 @@ func (z *erasureServerPools) Status(ctx context.Context, idx int) (PoolStatus, e
poolInfo := z.poolMeta.Pools[idx] poolInfo := z.poolMeta.Pools[idx]
if poolInfo.Decommission != nil { if poolInfo.Decommission != nil {
poolInfo.Decommission.Duration = time.Since(poolInfo.Decommission.StartTime)
poolInfo.Decommission.CurrentSize = currentSize poolInfo.Decommission.CurrentSize = currentSize
} else { } else {
poolInfo.Decommission = &PoolDecommissionInfo{ poolInfo.Decommission = &PoolDecommissionInfo{

View file

@ -42,12 +42,6 @@ func (z *PoolDecommissionInfo) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "TotalSize") err = msgp.WrapError(err, "TotalSize")
return return
} }
case "du":
err = z.Duration.DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, "Duration")
return
}
case "cs": case "cs":
z.CurrentSize, err = dc.ReadInt64() z.CurrentSize, err = dc.ReadInt64()
if err != nil { if err != nil {
@ -79,9 +73,9 @@ func (z *PoolDecommissionInfo) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable // EncodeMsg implements msgp.Encodable
func (z *PoolDecommissionInfo) EncodeMsg(en *msgp.Writer) (err error) { func (z *PoolDecommissionInfo) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 7 // map header, size 6
// write "st" // write "st"
err = en.Append(0x87, 0xa2, 0x73, 0x74) err = en.Append(0x86, 0xa2, 0x73, 0x74)
if err != nil { if err != nil {
return return
} }
@ -110,16 +104,6 @@ func (z *PoolDecommissionInfo) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "TotalSize") err = msgp.WrapError(err, "TotalSize")
return return
} }
// write "du"
err = en.Append(0xa2, 0x64, 0x75)
if err != nil {
return
}
err = z.Duration.EncodeMsg(en)
if err != nil {
err = msgp.WrapError(err, "Duration")
return
}
// write "cs" // write "cs"
err = en.Append(0xa2, 0x63, 0x73) err = en.Append(0xa2, 0x63, 0x73)
if err != nil { if err != nil {
@ -156,9 +140,9 @@ func (z *PoolDecommissionInfo) EncodeMsg(en *msgp.Writer) (err error) {
// MarshalMsg implements msgp.Marshaler // MarshalMsg implements msgp.Marshaler
func (z *PoolDecommissionInfo) MarshalMsg(b []byte) (o []byte, err error) { func (z *PoolDecommissionInfo) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize()) o = msgp.Require(b, z.Msgsize())
// map header, size 7 // map header, size 6
// string "st" // string "st"
o = append(o, 0x87, 0xa2, 0x73, 0x74) o = append(o, 0x86, 0xa2, 0x73, 0x74)
o = msgp.AppendTime(o, z.StartTime) o = msgp.AppendTime(o, z.StartTime)
// string "ss" // string "ss"
o = append(o, 0xa2, 0x73, 0x73) o = append(o, 0xa2, 0x73, 0x73)
@ -166,13 +150,6 @@ func (z *PoolDecommissionInfo) MarshalMsg(b []byte) (o []byte, err error) {
// string "ts" // string "ts"
o = append(o, 0xa2, 0x74, 0x73) o = append(o, 0xa2, 0x74, 0x73)
o = msgp.AppendInt64(o, z.TotalSize) o = msgp.AppendInt64(o, z.TotalSize)
// string "du"
o = append(o, 0xa2, 0x64, 0x75)
o, err = z.Duration.MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, "Duration")
return
}
// string "cs" // string "cs"
o = append(o, 0xa2, 0x63, 0x73) o = append(o, 0xa2, 0x63, 0x73)
o = msgp.AppendInt64(o, z.CurrentSize) o = msgp.AppendInt64(o, z.CurrentSize)
@ -221,12 +198,6 @@ func (z *PoolDecommissionInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "TotalSize") err = msgp.WrapError(err, "TotalSize")
return return
} }
case "du":
bts, err = z.Duration.UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "Duration")
return
}
case "cs": case "cs":
z.CurrentSize, bts, err = msgp.ReadInt64Bytes(bts) z.CurrentSize, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil { if err != nil {
@ -259,7 +230,7 @@ func (z *PoolDecommissionInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *PoolDecommissionInfo) Msgsize() (s int) { func (z *PoolDecommissionInfo) Msgsize() (s int) {
s = 1 + 3 + msgp.TimeSize + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 3 + z.Duration.Msgsize() + 3 + msgp.Int64Size + 4 + msgp.BoolSize + 3 + msgp.BoolSize s = 1 + 3 + msgp.TimeSize + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 4 + msgp.BoolSize + 3 + msgp.BoolSize
return return
} }
@ -503,7 +474,7 @@ func (z *poolMeta) DecodeMsg(dc *msgp.Reader) (err error) {
} }
switch msgp.UnsafeString(field) { switch msgp.UnsafeString(field) {
case "v": case "v":
z.Version, err = dc.ReadString() z.Version, err = dc.ReadInt()
if err != nil { if err != nil {
err = msgp.WrapError(err, "Version") err = msgp.WrapError(err, "Version")
return return
@ -546,7 +517,7 @@ func (z *poolMeta) EncodeMsg(en *msgp.Writer) (err error) {
if err != nil { if err != nil {
return return
} }
err = en.WriteString(z.Version) err = en.WriteInt(z.Version)
if err != nil { if err != nil {
err = msgp.WrapError(err, "Version") err = msgp.WrapError(err, "Version")
return return
@ -577,7 +548,7 @@ func (z *poolMeta) MarshalMsg(b []byte) (o []byte, err error) {
// map header, size 2 // map header, size 2
// string "v" // string "v"
o = append(o, 0x82, 0xa1, 0x76) o = append(o, 0x82, 0xa1, 0x76)
o = msgp.AppendString(o, z.Version) o = msgp.AppendInt(o, z.Version)
// string "pls" // string "pls"
o = append(o, 0xa3, 0x70, 0x6c, 0x73) o = append(o, 0xa3, 0x70, 0x6c, 0x73)
o = msgp.AppendArrayHeader(o, uint32(len(z.Pools))) o = msgp.AppendArrayHeader(o, uint32(len(z.Pools)))
@ -610,7 +581,7 @@ func (z *poolMeta) UnmarshalMsg(bts []byte) (o []byte, err error) {
} }
switch msgp.UnsafeString(field) { switch msgp.UnsafeString(field) {
case "v": case "v":
z.Version, bts, err = msgp.ReadStringBytes(bts) z.Version, bts, err = msgp.ReadIntBytes(bts)
if err != nil { if err != nil {
err = msgp.WrapError(err, "Version") err = msgp.WrapError(err, "Version")
return return
@ -648,7 +619,7 @@ func (z *poolMeta) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *poolMeta) Msgsize() (s int) { func (z *poolMeta) Msgsize() (s int) {
s = 1 + 2 + msgp.StringPrefixSize + len(z.Version) + 4 + msgp.ArrayHeaderSize s = 1 + 2 + msgp.IntSize + 4 + msgp.ArrayHeaderSize
for za0001 := range z.Pools { for za0001 := range z.Pools {
s += z.Pools[za0001].Msgsize() s += z.Pools[za0001].Msgsize()
} }