Avoid pointer based copy, instead use Clone() (#8547)

This PR adds functional test to test expanded
cluster syntax.
This commit is contained in:
Harshavardhana 2019-11-21 04:24:51 -08:00 committed by Nitish Tiwari
parent 9565641b9b
commit 4e9de58675
18 changed files with 198 additions and 165 deletions

View file

@ -129,6 +129,33 @@ function start_minio_dist_erasure_sets()
echo "${minio_pids[@]}"
}
function start_minio_zone_erasure_sets()
{
declare -a minio_pids
export MINIO_ACCESS_KEY=$ACCESS_KEY
export MINIO_SECRET_KEY=$SECRET_KEY
"${MINIO[@]}" server --address=:9000 "http://127.0.0.1:9000${WORK_DIR}/zone-disk-sets{1...4}" >/dev/null 2>&1 &
current_pid=$!
sleep 10
kill -9 "${current_pid}"
"${MINIO[@]}" server --address=:9001 "http://127.0.0.1:9001${WORK_DIR}/zone-disk-sets{5...8}" >/dev/null 2>&1 &
current_pid=$!
sleep 10
kill -9 "${current_pid}"
"${MINIO[@]}" server --address=:9000 "http://127.0.0.1:9000${WORK_DIR}/zone-disk-sets{1...4}" "http://127.0.0.1:9001${WORK_DIR}/zone-disk-sets{5...8}" >"$WORK_DIR/zone-minio-9000.log" 2>&1 &
minio_pids[0]=$!
"${MINIO[@]}" server --address=:9001 "http://127.0.0.1:9000${WORK_DIR}/zone-disk-sets{1...4}" "http://127.0.0.1:9001${WORK_DIR}/zone-disk-sets{5...8}" >"$WORK_DIR/zone-minio-9001.log" 2>&1 &
minio_pids[1]=$!
sleep 10
echo "${minio_pids[@]}"
}
function start_minio_dist_erasure()
{
declare -a minio_pids
@ -236,6 +263,32 @@ function run_test_dist_erasure_sets()
return "$rv"
}
function run_test_zone_erasure_sets()
{
minio_pids=( $(start_minio_zone_erasure_sets) )
(cd "$WORK_DIR" && "$FUNCTIONAL_TESTS")
rv=$?
for pid in "${minio_pids[@]}"; do
kill "$pid"
done
sleep 3
if [ "$rv" -ne 0 ]; then
for i in $(seq 0 1); do
echo "server$i log:"
cat "$WORK_DIR/zone-minio-900$i.log"
done
fi
for i in $(seq 0 1); do
rm -f "$WORK_DIR/zone-minio-900$i.log"
done
return "$rv"
}
function run_test_erasure()
{
minio_pid="$(start_minio_erasure)"
@ -358,6 +411,13 @@ function main()
exit 1
fi
echo "Testing in Distributed Eraure expanded setup"
if ! run_test_zone_erasure_sets; then
echo "FAILED"
purge "$WORK_DIR"
exit 1
fi
echo "Testing in Distributed Erasure setup as sets with ipv6"
if ! run_test_dist_erasure_sets_ipv6; then
echo "FAILED"

View file

@ -85,7 +85,6 @@ func (a adminAPIHandlers) DelConfigKVHandler(w http.ResponseWriter, r *http.Requ
return
}
oldCfg := cfg.Clone()
scanner := bufio.NewScanner(bytes.NewReader(kvBytes))
for scanner.Scan() {
// Skip any empty lines
@ -102,7 +101,7 @@ func (a adminAPIHandlers) DelConfigKVHandler(w http.ResponseWriter, r *http.Requ
return
}
if err = saveServerConfig(ctx, objectAPI, cfg, oldCfg); err != nil {
if err = saveServerConfig(ctx, objectAPI, cfg); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
@ -149,7 +148,6 @@ func (a adminAPIHandlers) SetConfigKVHandler(w http.ResponseWriter, r *http.Requ
}
}
oldCfg := cfg.Clone()
scanner := bufio.NewScanner(bytes.NewReader(kvBytes))
for scanner.Scan() {
// Skip any empty lines, or comment like characters
@ -172,7 +170,7 @@ func (a adminAPIHandlers) SetConfigKVHandler(w http.ResponseWriter, r *http.Requ
}
// Update the actual server config on disk.
if err = saveServerConfig(ctx, objectAPI, cfg, oldCfg); err != nil {
if err = saveServerConfig(ctx, objectAPI, cfg); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
@ -308,7 +306,6 @@ func (a adminAPIHandlers) RestoreConfigHistoryKVHandler(w http.ResponseWriter, r
}
}
oldCfg := cfg.Clone()
scanner := bufio.NewScanner(bytes.NewReader(kvBytes))
for scanner.Scan() {
// Skip any empty lines, or comment like characters
@ -331,7 +328,7 @@ func (a adminAPIHandlers) RestoreConfigHistoryKVHandler(w http.ResponseWriter, r
return
}
if err = saveServerConfig(ctx, objectAPI, cfg, oldCfg); err != nil {
if err = saveServerConfig(ctx, objectAPI, cfg); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
@ -444,7 +441,7 @@ func (a adminAPIHandlers) SetConfigHandler(w http.ResponseWriter, r *http.Reques
return
}
if err = saveServerConfig(ctx, objectAPI, cfg, nil); err != nil {
if err = saveServerConfig(ctx, objectAPI, cfg); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}

View file

@ -485,7 +485,7 @@ func newSrvConfig(objAPI ObjectLayer) error {
globalServerConfigMu.Unlock()
// Save config into file.
return saveServerConfig(context.Background(), objAPI, globalServerConfig, nil)
return saveServerConfig(context.Background(), objAPI, globalServerConfig)
}
func getValidConfig(objAPI ObjectLayer) (config.Config, error) {

View file

@ -49,7 +49,7 @@ func TestServerConfig(t *testing.T) {
t.Errorf("Expecting region `us-west-1` found %s", globalServerRegion)
}
if err := saveServerConfig(context.Background(), objLayer, globalServerConfig, nil); err != nil {
if err := saveServerConfig(context.Background(), objLayer, globalServerConfig); err != nil {
t.Fatalf("Unable to save updated config file %s", err)
}

View file

@ -2466,7 +2466,7 @@ func migrateConfigToMinioSys(objAPI ObjectLayer) (err error) {
// Initialize the server config, if no config exists.
return newSrvConfig(objAPI)
}
return saveServerConfig(context.Background(), objAPI, config, nil)
return saveServerConfig(context.Background(), objAPI, config)
}
// Migrates '.minio.sys/config.json' to v33.
@ -2548,7 +2548,7 @@ func migrateV27ToV28MinioSys(objAPI ObjectLayer) error {
cfg.Version = "28"
cfg.KMS = crypto.KMSConfig{}
if err = saveServerConfig(context.Background(), objAPI, cfg, nil); err != nil {
if err = saveServerConfig(context.Background(), objAPI, cfg); err != nil {
return fmt.Errorf("Failed to migrate config from 27 to 28. %v", err)
}
@ -2575,7 +2575,7 @@ func migrateV28ToV29MinioSys(objAPI ObjectLayer) error {
}
cfg.Version = "29"
if err = saveServerConfig(context.Background(), objAPI, cfg, nil); err != nil {
if err = saveServerConfig(context.Background(), objAPI, cfg); err != nil {
return fmt.Errorf("Failed to migrate config from 28 to 29. %v", err)
}
@ -2607,7 +2607,7 @@ func migrateV29ToV30MinioSys(objAPI ObjectLayer) error {
cfg.Compression.Extensions = strings.Split(compress.DefaultExtensions, config.ValueSeparator)
cfg.Compression.MimeTypes = strings.Split(compress.DefaultMimeTypes, config.ValueSeparator)
if err = saveServerConfig(context.Background(), objAPI, cfg, nil); err != nil {
if err = saveServerConfig(context.Background(), objAPI, cfg); err != nil {
return fmt.Errorf("Failed to migrate config from 29 to 30. %v", err)
}
@ -2642,7 +2642,7 @@ func migrateV30ToV31MinioSys(objAPI ObjectLayer) error {
AuthToken: "",
}
if err = saveServerConfig(context.Background(), objAPI, cfg, nil); err != nil {
if err = saveServerConfig(context.Background(), objAPI, cfg); err != nil {
return fmt.Errorf("Failed to migrate config from 30 to 31. %v", err)
}
@ -2672,7 +2672,7 @@ func migrateV31ToV32MinioSys(objAPI ObjectLayer) error {
cfg.Notify.NSQ = make(map[string]target.NSQArgs)
cfg.Notify.NSQ["1"] = target.NSQArgs{}
if err = saveServerConfig(context.Background(), objAPI, cfg, nil); err != nil {
if err = saveServerConfig(context.Background(), objAPI, cfg); err != nil {
return fmt.Errorf("Failed to migrate config from 31 to 32. %v", err)
}
@ -2700,8 +2700,8 @@ func migrateV32ToV33MinioSys(objAPI ObjectLayer) error {
cfg.Version = "33"
if err = saveServerConfig(context.Background(), objAPI, cfg, nil); err != nil {
return fmt.Errorf("Failed to migrate config from 32 to 33 . %v", err)
if err = saveServerConfig(context.Background(), objAPI, cfg); err != nil {
return fmt.Errorf("Failed to migrate config from '32' to '33' . %v", err)
}
logger.Info(configMigrateMSGTemplate, configFile, "32", "33")
@ -2777,7 +2777,7 @@ func migrateMinioSysConfigToKV(objAPI ObjectLayer) error {
notify.SetNotifyWebhook(newCfg, k, args)
}
if err = saveServerConfig(context.Background(), objAPI, newCfg, cfg); err != nil {
if err = saveServerConfig(context.Background(), objAPI, newCfg); err != nil {
return err
}

View file

@ -129,49 +129,12 @@ func saveServerConfigHistory(ctx context.Context, objAPI ObjectLayer, kv []byte)
return saveConfig(ctx, objAPI, historyFile, kv)
}
func saveServerConfig(ctx context.Context, objAPI ObjectLayer, config interface{}, oldConfig interface{}) error {
func saveServerConfig(ctx context.Context, objAPI ObjectLayer, config interface{}) error {
data, err := json.Marshal(config)
if err != nil {
return err
}
configFile := path.Join(minioConfigPrefix, minioConfigFile)
// Create a backup of the current config
backupConfigFile := path.Join(minioConfigPrefix, minioConfigBackupFile)
var oldData []byte
var freshConfig bool
if oldConfig == nil {
oldData, err = readConfig(ctx, objAPI, configFile)
if err != nil && err != errConfigNotFound {
return err
}
if err == errConfigNotFound {
// Current config not found, so nothing to backup.
freshConfig = true
}
// Do not need to decrypt oldData since we are going to
// save it anyway if freshConfig is false.
} else {
oldData, err = json.Marshal(oldConfig)
if err != nil {
return err
}
if globalConfigEncrypted {
oldData, err = madmin.EncryptData(globalActiveCred.String(), oldData)
if err != nil {
return err
}
}
}
// No need to take backups for fresh setups.
if !freshConfig {
if err = saveConfig(ctx, objAPI, backupConfigFile, oldData); err != nil {
return err
}
}
if globalConfigEncrypted {
data, err = madmin.EncryptData(globalActiveCred.String(), data)
if err != nil {
@ -179,6 +142,7 @@ func saveServerConfig(ctx context.Context, objAPI ObjectLayer, config interface{
}
}
configFile := path.Join(minioConfigPrefix, minioConfigFile)
// Save the new config in the std config path
return saveConfig(ctx, objAPI, configFile, data)
}

View file

@ -257,61 +257,60 @@ func GetAllSets(args ...string) ([][]string, error) {
// CreateServerEndpoints - validates and creates new endpoints from input args, supports
// both ellipses and without ellipses transparently.
func createServerEndpoints(serverAddr string, args ...string) (EndpointZones, SetupType, error) {
func createServerEndpoints(serverAddr string, args ...string) (EndpointZones, int, SetupType, error) {
if len(args) == 0 {
return nil, -1, errInvalidArgument
return nil, -1, -1, errInvalidArgument
}
var endpointZones EndpointZones
var setupType SetupType
var drivesPerSet int
if !ellipses.HasEllipses(args...) {
setArgs, err := GetAllSets(args...)
if err != nil {
return nil, -1, err
return nil, -1, -1, err
}
endpointList, newSetupType, err := CreateEndpoints(serverAddr, setArgs...)
if err != nil {
return nil, -1, err
return nil, -1, -1, err
}
endpointZones = append(endpointZones, ZoneEndpoints{
SetCount: len(setArgs),
DrivesPerSet: len(setArgs[0]),
Endpoints: endpointList,
})
globalXLSetDriveCount = len(setArgs[0])
setupType = newSetupType
return endpointZones, setupType, nil
return endpointZones, len(setArgs[0]), setupType, nil
}
// Look for duplicate args.
if _, err := GetAllSets(args...); err != nil {
return nil, -1, err
return nil, -1, -1, err
}
for _, arg := range args {
setArgs, err := GetAllSets(arg)
if err != nil {
return nil, -1, err
return nil, -1, -1, err
}
endpointList, newSetupType, err := CreateEndpoints(serverAddr, setArgs...)
if err != nil {
return nil, -1, err
return nil, -1, -1, err
}
if setupType != 0 && setupType != newSetupType {
return nil, -1, fmt.Errorf("Mixed modes of operation %s and %s are not allowed",
return nil, -1, -1, fmt.Errorf("Mixed modes of operation %s and %s are not allowed",
setupType, newSetupType)
}
if globalXLSetDriveCount != 0 && globalXLSetDriveCount != len(setArgs[0]) {
return nil, -1, fmt.Errorf("All zones should have same drive per set ratio - expected %d, got %d",
globalXLSetDriveCount, len(setArgs[0]))
if drivesPerSet != 0 && drivesPerSet != len(setArgs[0]) {
return nil, -1, -1, fmt.Errorf("All zones should have same drive per set ratio - expected %d, got %d", drivesPerSet, len(setArgs[0]))
}
endpointZones = append(endpointZones, ZoneEndpoints{
SetCount: len(setArgs),
DrivesPerSet: len(setArgs[0]),
Endpoints: endpointList,
})
globalXLSetDriveCount = len(setArgs[0])
drivesPerSet = len(setArgs[0])
setupType = newSetupType
}
return endpointZones, setupType, nil
return endpointZones, drivesPerSet, setupType, nil
}

View file

@ -53,14 +53,17 @@ func TestCreateServerEndpoints(t *testing.T) {
{":9001", []string{"http://localhost:9001/export{01...64}"}, true},
}
for i, testCase := range testCases {
_, _, err := createServerEndpoints(testCase.serverAddr, testCase.args...)
if err != nil && testCase.success {
t.Errorf("Test %d: Expected success but failed instead %s", i+1, err)
}
if err == nil && !testCase.success {
t.Errorf("Test %d: Expected failure but passed instead", i+1)
}
for _, testCase := range testCases {
testCase := testCase
t.Run("", func(t *testing.T) {
_, _, _, err := createServerEndpoints(testCase.serverAddr, testCase.args...)
if err != nil && testCase.success {
t.Errorf("Expected success but failed instead %s", err)
}
if err == nil && !testCase.success {
t.Errorf("Expected failure but passed instead")
}
})
}
}

View file

@ -196,6 +196,11 @@ type ZoneEndpoints struct {
// EndpointZones - list of list of endpoints
type EndpointZones []ZoneEndpoints
// First returns true if the first endpoint is local.
func (l EndpointZones) First() bool {
return l[0].Endpoints[0].IsLocal
}
// HTTPS - returns true if secure for URLEndpointType.
func (l EndpointZones) HTTPS() bool {
return l[0].Endpoints.HTTPS()

View file

@ -124,6 +124,18 @@ type formatXLV3 struct {
} `json:"xl"`
}
func (f *formatXLV3) Clone() *formatXLV3 {
b, err := json.Marshal(f)
if err != nil {
panic(err)
}
var dst formatXLV3
if err = json.Unmarshal(b, &dst); err != nil {
panic(err)
}
return &dst
}
// Returns formatXL.XL.Version
func newFormatXLV3(numSets int, setLen int) *formatXLV3 {
format := &formatXLV3{}
@ -581,9 +593,9 @@ func getFormatXLInQuorum(formats []*formatXLV3) (*formatXLV3, error) {
for i, hash := range formatHashes {
if hash == maxHash {
format := *formats[i]
format := formats[i].Clone()
format.XL.This = ""
return &format, nil
return format, nil
}
}
@ -591,7 +603,7 @@ func getFormatXLInQuorum(formats []*formatXLV3) (*formatXLV3, error) {
}
func formatXLV3Check(reference *formatXLV3, format *formatXLV3) error {
tmpFormat := *format
tmpFormat := format.Clone()
this := tmpFormat.XL.This
tmpFormat.XL.This = ""
if len(reference.XL.Sets) != len(format.XL.Sets) {
@ -718,32 +730,32 @@ func fixFormatXLV3(storageDisks []StorageAPI, endpoints Endpoints, formats []*fo
}
// initFormatXL - save XL format configuration on all disks.
func initFormatXL(ctx context.Context, storageDisks []StorageAPI, setCount, disksPerSet int, deploymentID string) (format *formatXLV3, err error) {
format = newFormatXLV3(setCount, disksPerSet)
func initFormatXL(ctx context.Context, storageDisks []StorageAPI, setCount, drivesPerSet int, deploymentID string) (*formatXLV3, error) {
format := newFormatXLV3(setCount, drivesPerSet)
formats := make([]*formatXLV3, len(storageDisks))
for i := 0; i < setCount; i++ {
for j := 0; j < disksPerSet; j++ {
newFormat := *format
for j := 0; j < drivesPerSet; j++ {
newFormat := format.Clone()
newFormat.XL.This = format.XL.Sets[i][j]
if deploymentID != "" {
newFormat.ID = deploymentID
}
formats[i*disksPerSet+j] = &newFormat
formats[i*drivesPerSet+j] = newFormat
}
}
// Initialize meta volume, if volume already exists ignores it.
if err = initFormatXLMetaVolume(storageDisks, formats); err != nil {
if err := initFormatXLMetaVolume(storageDisks, formats); err != nil {
return format, fmt.Errorf("Unable to initialize '.minio.sys' meta volume, %s", err)
}
// Save formats `format.json` across all disks.
if err = saveFormatXLAll(ctx, storageDisks, formats); err != nil {
if err := saveFormatXLAll(ctx, storageDisks, formats); err != nil {
return nil, err
}
return format, nil
return getFormatXLInQuorum(formats)
}
// Make XL backend meta volumes.
@ -857,14 +869,14 @@ func markUUIDsOffline(refFormat *formatXLV3, formats []*formatXLV3) {
}
// Initialize a new set of set formats which will be written to all disks.
func newHealFormatSets(refFormat *formatXLV3, setCount, disksPerSet int, formats []*formatXLV3, errs []error) [][]*formatXLV3 {
func newHealFormatSets(refFormat *formatXLV3, setCount, drivesPerSet int, formats []*formatXLV3, errs []error) [][]*formatXLV3 {
newFormats := make([][]*formatXLV3, setCount)
for i := range refFormat.XL.Sets {
newFormats[i] = make([]*formatXLV3, disksPerSet)
newFormats[i] = make([]*formatXLV3, drivesPerSet)
}
for i := range refFormat.XL.Sets {
for j := range refFormat.XL.Sets[i] {
if errs[i*disksPerSet+j] == errUnformattedDisk || errs[i*disksPerSet+j] == nil {
if errs[i*drivesPerSet+j] == errUnformattedDisk || errs[i*drivesPerSet+j] == nil {
newFormats[i][j] = &formatXLV3{}
newFormats[i][j].Version = refFormat.Version
newFormats[i][j].ID = refFormat.ID
@ -872,13 +884,13 @@ func newHealFormatSets(refFormat *formatXLV3, setCount, disksPerSet int, formats
newFormats[i][j].XL.Version = refFormat.XL.Version
newFormats[i][j].XL.DistributionAlgo = refFormat.XL.DistributionAlgo
}
if errs[i*disksPerSet+j] == errUnformattedDisk {
if errs[i*drivesPerSet+j] == errUnformattedDisk {
newFormats[i][j].XL.This = ""
newFormats[i][j].XL.Sets = nil
continue
}
if errs[i*disksPerSet+j] == nil {
newFormats[i][j].XL.This = formats[i*disksPerSet+j].XL.This
if errs[i*drivesPerSet+j] == nil {
newFormats[i][j].XL.This = formats[i*drivesPerSet+j].XL.This
newFormats[i][j].XL.Sets = nil
}
}

View file

@ -96,9 +96,9 @@ func TestFixFormatV3(t *testing.T) {
formats := make([]*formatXLV3, 8)
for j := 0; j < 8; j++ {
newFormat := *format
newFormat := format.Clone()
newFormat.XL.This = format.XL.Sets[0][j]
formats[j] = &newFormat
formats[j] = newFormat
}
if err = initFormatXLMetaVolume(storageDisks, formats); err != nil {
@ -130,9 +130,9 @@ func TestFormatXLEmpty(t *testing.T) {
formats := make([]*formatXLV3, 16)
for j := 0; j < 16; j++ {
newFormat := *format
newFormat := format.Clone()
newFormat.XL.This = format.XL.Sets[0][j]
formats[j] = &newFormat
formats[j] = newFormat
}
// empty format to indicate disk not found, but this
@ -411,16 +411,16 @@ func TestCheckFormatXLValue(t *testing.T) {
// Tests getFormatXLInQuorum()
func TestGetFormatXLInQuorumCheck(t *testing.T) {
setCount := 2
disksPerSet := 16
drivesPerSet := 16
format := newFormatXLV3(setCount, disksPerSet)
format := newFormatXLV3(setCount, drivesPerSet)
formats := make([]*formatXLV3, 32)
for i := 0; i < setCount; i++ {
for j := 0; j < disksPerSet; j++ {
newFormat := *format
for j := 0; j < drivesPerSet; j++ {
newFormat := format.Clone()
newFormat.XL.This = format.XL.Sets[i][j]
formats[i*disksPerSet+j] = &newFormat
formats[i*drivesPerSet+j] = newFormat
}
}
@ -477,16 +477,16 @@ func TestGetFormatXLInQuorumCheck(t *testing.T) {
// Tests formatXLGetDeploymentID()
func TestGetXLID(t *testing.T) {
setCount := 2
disksPerSet := 8
drivesPerSet := 8
format := newFormatXLV3(setCount, disksPerSet)
format := newFormatXLV3(setCount, drivesPerSet)
formats := make([]*formatXLV3, 16)
for i := 0; i < setCount; i++ {
for j := 0; j < disksPerSet; j++ {
newFormat := *format
for j := 0; j < drivesPerSet; j++ {
newFormat := format.Clone()
newFormat.XL.This = format.XL.Sets[i][j]
formats[i*disksPerSet+j] = &newFormat
formats[i*drivesPerSet+j] = newFormat
}
}
@ -532,17 +532,17 @@ func TestGetXLID(t *testing.T) {
// Initialize new format sets.
func TestNewFormatSets(t *testing.T) {
setCount := 2
disksPerSet := 16
drivesPerSet := 16
format := newFormatXLV3(setCount, disksPerSet)
format := newFormatXLV3(setCount, drivesPerSet)
formats := make([]*formatXLV3, 32)
errs := make([]error, 32)
for i := 0; i < setCount; i++ {
for j := 0; j < disksPerSet; j++ {
newFormat := *format
for j := 0; j < drivesPerSet; j++ {
newFormat := format.Clone()
newFormat.XL.This = format.XL.Sets[i][j]
formats[i*disksPerSet+j] = &newFormat
formats[i*drivesPerSet+j] = newFormat
}
}
@ -554,7 +554,7 @@ func TestNewFormatSets(t *testing.T) {
// 16th disk is unformatted.
errs[15] = errUnformattedDisk
newFormats := newHealFormatSets(quorumFormat, setCount, disksPerSet, formats, errs)
newFormats := newHealFormatSets(quorumFormat, setCount, drivesPerSet, formats, errs)
if newFormats == nil {
t.Fatal("Unexpected failure")
}

View file

@ -286,8 +286,8 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints,
}
// Format disks before initialization of object layer.
func waitForFormatXL(firstDisk bool, endpoints Endpoints, setCount, disksPerSet int, deploymentID string) (format *formatXLV3, err error) {
if len(endpoints) == 0 || setCount == 0 || disksPerSet == 0 {
func waitForFormatXL(firstDisk bool, endpoints Endpoints, setCount, drivesPerSet int, deploymentID string) (format *formatXLV3, err error) {
if len(endpoints) == 0 || setCount == 0 || drivesPerSet == 0 {
return nil, errInvalidArgument
}
@ -318,7 +318,7 @@ func waitForFormatXL(firstDisk bool, endpoints Endpoints, setCount, disksPerSet
for {
select {
case retryCount := <-retryTimerCh:
format, err := connectLoadInitFormats(retryCount, firstDisk, endpoints, setCount, disksPerSet, deploymentID)
format, err := connectLoadInitFormats(retryCount, firstDisk, endpoints, setCount, drivesPerSet, deploymentID)
if err != nil {
switch err {
case errNotFirstDisk:

View file

@ -146,9 +146,9 @@ func serverHandleCmdArgs(ctx *cli.Context) {
endpoints := strings.Fields(env.Get(config.EnvEndpoints, ""))
if len(endpoints) > 0 {
globalEndpoints, setupType, err = createServerEndpoints(globalCLIContext.Addr, endpoints...)
globalEndpoints, globalXLSetDriveCount, setupType, err = createServerEndpoints(globalCLIContext.Addr, endpoints...)
} else {
globalEndpoints, setupType, err = createServerEndpoints(globalCLIContext.Addr, ctx.Args()...)
globalEndpoints, globalXLSetDriveCount, setupType, err = createServerEndpoints(globalCLIContext.Addr, ctx.Args()...)
}
logger.FatalIf(err, "Invalid command line arguments")
@ -437,17 +437,5 @@ func newObjectLayer(endpointZones EndpointZones) (newObject ObjectLayer, err err
return NewFSObjectLayer(endpointZones[0].Endpoints[0].Path)
}
var formats = make([]*formatXLV3, len(endpointZones))
var deploymentID string
for i, ep := range endpointZones {
formats[i], err = waitForFormatXL(ep.Endpoints[0].IsLocal, ep.Endpoints,
ep.SetCount, ep.DrivesPerSet, deploymentID)
if err != nil {
return nil, err
}
if deploymentID == "" {
deploymentID = formats[i].ID
}
}
return newXLZones(endpointZones, formats)
return newXLZones(endpointZones)
}

View file

@ -515,7 +515,7 @@ func newTestConfig(bucketLocation string, obj ObjectLayer) (err error) {
config.SetRegion(globalServerConfig, bucketLocation)
// Save config.
return saveServerConfig(context.Background(), obj, globalServerConfig, nil)
return saveServerConfig(context.Background(), obj, globalServerConfig)
}
// Deleting the temporary backend and stopping the server.
@ -1585,20 +1585,7 @@ func newTestObjectLayer(endpointZones EndpointZones) (newObject ObjectLayer, err
return NewFSObjectLayer(endpointZones[0].Endpoints[0].Path)
}
var formats = make([]*formatXLV3, len(endpointZones))
var deploymentID string
for i, ep := range endpointZones {
formats[i], err = waitForFormatXL(ep.Endpoints[0].IsLocal, ep.Endpoints,
ep.SetCount, ep.DrivesPerSet, deploymentID)
if err != nil {
return nil, err
}
if deploymentID == "" {
deploymentID = formats[i].ID
}
}
zones, err := newXLZones(endpointZones, formats)
z, err := newXLZones(endpointZones)
if err != nil {
return nil, err
}
@ -1606,15 +1593,15 @@ func newTestObjectLayer(endpointZones EndpointZones) (newObject ObjectLayer, err
globalConfigSys = NewConfigSys()
globalIAMSys = NewIAMSys()
globalIAMSys.Init(zones)
globalIAMSys.Init(z)
globalPolicySys = NewPolicySys()
globalPolicySys.Init(nil, zones)
globalPolicySys.Init(nil, z)
globalNotificationSys = NewNotificationSys(endpointZones)
globalNotificationSys.Init(nil, zones)
globalNotificationSys.Init(nil, z)
return zones, nil
return z, nil
}
// initObjectLayer - Instantiates object layer and returns it.

View file

@ -646,7 +646,8 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
}
// Rename the successfully written temporary object to final location.
if _, err = rename(ctx, onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, true, writeQuorum, nil); err != nil {
_, err = rename(ctx, onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, true, writeQuorum, nil)
if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}

View file

@ -50,15 +50,30 @@ func (z *xlZones) quickHealBuckets(ctx context.Context) {
}
}
// Initialize new zone of erasure codes.
func newXLZones(endpointZones EndpointZones, formats []*formatXLV3) (ObjectLayer, error) {
z := &xlZones{}
// Initialize new zone of erasure sets.
func newXLZones(endpointZones EndpointZones) (ObjectLayer, error) {
var (
deploymentID string
err error
formats = make([]*formatXLV3, len(endpointZones))
z = &xlZones{zones: make([]*xlSets, len(endpointZones))}
)
for i, ep := range endpointZones {
sets, err := newXLSets(ep.Endpoints, formats[i], ep.SetCount, ep.DrivesPerSet)
formats[i], err = waitForFormatXL(endpointZones.First(), ep.Endpoints,
ep.SetCount, ep.DrivesPerSet, deploymentID)
if err != nil {
return nil, err
}
if deploymentID == "" {
deploymentID = formats[i].ID
}
}
for i, ep := range endpointZones {
z.zones[i], err = newXLSets(ep.Endpoints, formats[i], ep.SetCount, ep.DrivesPerSet)
if err != nil {
return nil, err
}
z.zones = append(z.zones, sets)
}
z.quickHealBuckets(context.Background())
return z, nil

View file

@ -28,8 +28,8 @@ func TestNameExpand(t *testing.T) {
name Name
expectedResult []Name
}{
{ObjectAccessedAll, []Name{ObjectAccessedGet, ObjectAccessedHead}},
{ObjectCreatedAll, []Name{ObjectCreatedCompleteMultipartUpload, ObjectCreatedCopy, ObjectCreatedPost, ObjectCreatedPut}},
{ObjectAccessedAll, []Name{ObjectAccessedGet, ObjectAccessedHead, ObjectAccessedGetRetention}},
{ObjectCreatedAll, []Name{ObjectCreatedCompleteMultipartUpload, ObjectCreatedCopy, ObjectCreatedPost, ObjectCreatedPut, ObjectCreatedPutRetention}},
{ObjectRemovedAll, []Name{ObjectRemovedDelete}},
{ObjectAccessedHead, []Name{ObjectAccessedHead}},
}
@ -38,7 +38,7 @@ func TestNameExpand(t *testing.T) {
result := testCase.name.Expand()
if !reflect.DeepEqual(result, testCase.expectedResult) {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, result)
t.Errorf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}

View file

@ -153,10 +153,12 @@ func TestRulesMapMatch(t *testing.T) {
func TestNewRulesMap(t *testing.T) {
rulesMapCase1 := make(RulesMap)
rulesMapCase1.add([]Name{ObjectAccessedGet, ObjectAccessedHead}, "*", TargetID{"1", "webhook"})
rulesMapCase1.add([]Name{ObjectAccessedGet, ObjectAccessedHead, ObjectAccessedGetRetention},
"*", TargetID{"1", "webhook"})
rulesMapCase2 := make(RulesMap)
rulesMapCase2.add([]Name{ObjectAccessedGet, ObjectAccessedHead, ObjectCreatedPut}, "*", TargetID{"1", "webhook"})
rulesMapCase2.add([]Name{ObjectAccessedGet, ObjectAccessedHead,
ObjectCreatedPut, ObjectAccessedGetRetention}, "*", TargetID{"1", "webhook"})
rulesMapCase3 := make(RulesMap)
rulesMapCase3.add([]Name{ObjectRemovedDelete}, "2010*.jpg", TargetID{"1", "webhook"})
@ -176,7 +178,7 @@ func TestNewRulesMap(t *testing.T) {
result := NewRulesMap(testCase.eventNames, testCase.pattern, testCase.targetID)
if !reflect.DeepEqual(result, testCase.expectedResult) {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, result)
t.Errorf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}