Fix configuration handling bugs: (#5473)

* Update the GetConfig admin API to use the latest version of
  configuration, along with fixes to the corresponding RPCs.
* Remove mutex inside the configuration struct, and inside
  notification struct.
* Use global config mutex where needed.
* Add `serverConfig.ConfigDiff()` that provides a more granular diff
  of what is different between two configurations.
This commit is contained in:
Aditya Manthramurthy 2018-01-31 08:15:54 -08:00 committed by kannappanr
parent e608e05cda
commit 018813b98f
10 changed files with 166 additions and 102 deletions

View file

@ -795,7 +795,7 @@ func (a adminAPIHandlers) UpdateCredentialsHandler(w http.ResponseWriter,
}
// Take a lock on minio/config.json. Prevents concurrent
// config file/credentials updates.
// config file updates.
configLock := globalNSMutex.NewNSLock(minioReservedBucket, minioConfigFile)
if configLock.GetLock(globalObjectTimeout) != nil {
writeErrorResponseJSON(w, ErrOperationTimedOut, r.URL)
@ -803,6 +803,10 @@ func (a adminAPIHandlers) UpdateCredentialsHandler(w http.ResponseWriter,
}
defer configLock.Unlock()
// Acquire lock before updating global configuration.
globalServerConfigMu.Lock()
defer globalServerConfigMu.Unlock()
// Notify all other Minio peers to update credentials
updateErrs := updateCredsOnPeers(creds)
for peer, err := range updateErrs {

View file

@ -23,7 +23,6 @@ import (
"os"
"path"
"path/filepath"
"reflect"
"sort"
"sync"
"time"
@ -478,7 +477,7 @@ func getPeerConfig(peers adminPeers) ([]byte, error) {
// Find the maximally occurring config among peers in a
// distributed setup.
serverConfigs := make([]serverConfigV13, len(peers))
serverConfigs := make([]serverConfig, len(peers))
for i, configBytes := range configs {
if errs[i] != nil {
continue
@ -505,7 +504,7 @@ func getPeerConfig(peers adminPeers) ([]byte, error) {
// getValidServerConfig - finds the server config that is present in
// quorum or more number of servers.
func getValidServerConfig(serverConfigs []serverConfigV13, errs []error) (scv serverConfigV13, e error) {
func getValidServerConfig(serverConfigs []serverConfig, errs []error) (scv serverConfig, e error) {
// majority-based quorum
quorum := len(serverConfigs)/2 + 1
@ -548,7 +547,7 @@ func getValidServerConfig(serverConfigs []serverConfigV13, errs []error) (scv se
// seen. See example above for
// clarity.
continue
} else if j < i && reflect.DeepEqual(serverConfigs[i], serverConfigs[j]) {
} else if j < i && serverConfigs[i].ConfigDiff(&serverConfigs[j]) == "" {
// serverConfigs[i] is equal to
// serverConfigs[j], update
// serverConfigs[j]'s counter since it
@ -567,7 +566,7 @@ func getValidServerConfig(serverConfigs []serverConfigV13, errs []error) (scv se
// We find the maximally occurring server config and check if
// there is quorum.
var configJSON serverConfigV13
var configJSON serverConfig
maxOccurrence := 0
for i, count := range configCounter {
if maxOccurrence < count {

View file

@ -220,7 +220,7 @@ var (
// TestGetValidServerConfig - test for getValidServerConfig.
func TestGetValidServerConfig(t *testing.T) {
var c1, c2 serverConfigV13
var c1, c2 serverConfig
err := json.Unmarshal(config1, &c1)
if err != nil {
t.Fatalf("json unmarshal of %s failed: %v", string(config1), err)
@ -233,7 +233,7 @@ func TestGetValidServerConfig(t *testing.T) {
// Valid config.
noErrs := []error{nil, nil, nil, nil}
serverConfigs := []serverConfigV13{c1, c2, c1, c1}
serverConfigs := []serverConfig{c1, c2, c1, c1}
validConfig, err := getValidServerConfig(serverConfigs, noErrs)
if err != nil {
t.Errorf("Expected a valid config but received %v instead", err)
@ -244,7 +244,7 @@ func TestGetValidServerConfig(t *testing.T) {
}
// Invalid config - no quorum.
serverConfigs = []serverConfigV13{c1, c2, c2, c1}
serverConfigs = []serverConfig{c1, c2, c2, c1}
_, err = getValidServerConfig(serverConfigs, noErrs)
if err != errXLWriteQuorum {
t.Errorf("Expected to fail due to lack of quorum but received %v", err)
@ -252,7 +252,7 @@ func TestGetValidServerConfig(t *testing.T) {
// All errors
allErrs := []error{errDiskNotFound, errDiskNotFound, errDiskNotFound, errDiskNotFound}
serverConfigs = []serverConfigV13{{}, {}, {}, {}}
serverConfigs = []serverConfig{{}, {}, {}, {}}
_, err = getValidServerConfig(serverConfigs, allErrs)
if err != errXLWriteQuorum {
t.Errorf("Expected to fail due to lack of quorum but received %v", err)

View file

@ -50,6 +50,10 @@ func (br *browserPeerAPIHandlers) SetAuthPeer(args SetAuthPeerArgs, reply *AuthR
return fmt.Errorf("Invalid credential passed")
}
// Acquire lock before updating global configuration.
globalServerConfigMu.Lock()
defer globalServerConfigMu.Unlock()
// Update credentials in memory
prevCred := globalServerConfig.SetCredential(args.Creds)

View file

@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"reflect"
"sync"
"github.com/minio/minio/pkg/auth"
@ -48,34 +49,22 @@ var (
// GetVersion get current config version.
func (s *serverConfig) GetVersion() string {
s.RLock()
defer s.RUnlock()
return s.Version
}
// SetRegion set a new region.
func (s *serverConfig) SetRegion(region string) {
s.Lock()
defer s.Unlock()
// Save new region.
s.Region = region
}
// GetRegion get current region.
func (s *serverConfig) GetRegion() string {
s.RLock()
defer s.RUnlock()
return s.Region
}
// SetCredential sets new credential and returns the previous credential.
func (s *serverConfig) SetCredential(creds auth.Credentials) (prevCred auth.Credentials) {
s.Lock()
defer s.Unlock()
// Save previous credential.
prevCred = s.Credential
@ -88,25 +77,16 @@ func (s *serverConfig) SetCredential(creds auth.Credentials) (prevCred auth.Cred
// GetCredentials get current credentials.
func (s *serverConfig) GetCredential() auth.Credentials {
s.RLock()
defer s.RUnlock()
return s.Credential
}
// SetBrowser set if browser is enabled.
func (s *serverConfig) SetBrowser(b bool) {
s.Lock()
defer s.Unlock()
// Set the new value.
s.Browser = BrowserFlag(b)
}
func (s *serverConfig) SetStorageClass(standardClass, rrsClass storageClass) {
s.Lock()
defer s.Unlock()
s.StorageClass.Standard = standardClass
s.StorageClass.RRS = rrsClass
}
@ -114,9 +94,6 @@ func (s *serverConfig) SetStorageClass(standardClass, rrsClass storageClass) {
// GetStorageClass reads storage class fields from current config, parses and validates it.
// It returns the standard and reduced redundancy storage class struct
func (s *serverConfig) GetStorageClass() (storageClass, storageClass) {
s.RLock()
defer s.RUnlock()
var err error
// Storage Class from config.json is already parsed and stored in s.StorageClass
// Now validate the storage class fields
@ -140,21 +117,59 @@ func (s *serverConfig) GetStorageClass() (storageClass, storageClass) {
// GetCredentials get current credentials.
func (s *serverConfig) GetBrowser() bool {
s.RLock()
defer s.RUnlock()
return bool(s.Browser)
}
// Save config.
func (s *serverConfig) Save() error {
s.RLock()
defer s.RUnlock()
// Save config file.
return quick.Save(getConfigFile(), s)
}
// Returns the string describing a difference with the given
// configuration object. If the given configuration object is
// identical, an empty string is returned.
func (s *serverConfig) ConfigDiff(t *serverConfig) string {
switch {
case t == nil:
return "Given configuration is empty"
case s.Credential != t.Credential:
return "Credential configuration differs"
case s.Region != t.Region:
return "Region configuration differs"
case s.Browser != t.Browser:
return "Browser configuration differs"
case s.Domain != t.Domain:
return "Domain configuration differs"
case s.StorageClass != t.StorageClass:
return "StorageClass configuration differs"
case !reflect.DeepEqual(s.Notify.AMQP, t.Notify.AMQP):
return "AMQP Notification configuration differs"
case !reflect.DeepEqual(s.Notify.NATS, t.Notify.NATS):
return "NATS Notification configuration differs"
case !reflect.DeepEqual(s.Notify.ElasticSearch, t.Notify.ElasticSearch):
return "ElasticSearch Notification configuration differs"
case !reflect.DeepEqual(s.Notify.Redis, t.Notify.Redis):
return "Redis Notification configuration differs"
case !reflect.DeepEqual(s.Notify.PostgreSQL, t.Notify.PostgreSQL):
return "PostgreSQL Notification configuration differs"
case !reflect.DeepEqual(s.Notify.Kafka, t.Notify.Kafka):
return "Kafka Notification configuration differs"
case !reflect.DeepEqual(s.Notify.Webhook, t.Notify.Webhook):
return "Webhook Notification configuration differs"
case !reflect.DeepEqual(s.Notify.MySQL, t.Notify.MySQL):
return "MySQL Notification configuration differs"
case !reflect.DeepEqual(s.Notify.MQTT, t.Notify.MQTT):
return "MQTT Notification configuration differs"
case reflect.DeepEqual(s, t):
return ""
default:
// This case will not happen unless this comparison
// function has become stale.
return "Configuration differs"
}
}
func newServerConfig() *serverConfig {
srvCfg := &serverConfig{
Version: serverConfigVersion,
@ -162,7 +177,7 @@ func newServerConfig() *serverConfig {
Region: globalMinioDefaultRegion,
Browser: true,
StorageClass: storageClassConfig{},
Notify: &notifier{},
Notify: notifier{},
}
// Make sure to initialize notification configs.

View file

@ -23,6 +23,7 @@ import (
"reflect"
"testing"
"github.com/minio/minio/pkg/auth"
"github.com/tidwall/gjson"
)
@ -316,3 +317,92 @@ func TestValidateConfig(t *testing.T) {
}
}
func TestConfigDiff(t *testing.T) {
testCases := []struct {
s, t *serverConfig
diff string
}{
// 1
{&serverConfig{}, nil, "Given configuration is empty"},
// 2
{
&serverConfig{Credential: auth.Credentials{"u1", "p1"}},
&serverConfig{Credential: auth.Credentials{"u1", "p2"}},
"Credential configuration differs",
},
// 3
{&serverConfig{Region: "us-east-1"}, &serverConfig{Region: "us-west-1"}, "Region configuration differs"},
// 4
{&serverConfig{Browser: false}, &serverConfig{Browser: true}, "Browser configuration differs"},
// 5
{&serverConfig{Domain: "domain1"}, &serverConfig{Domain: "domain2"}, "Domain configuration differs"},
// 6
{
&serverConfig{StorageClass: storageClassConfig{storageClass{"1", 8}, storageClass{"2", 6}}},
&serverConfig{StorageClass: storageClassConfig{storageClass{"1", 8}, storageClass{"2", 4}}},
"StorageClass configuration differs",
},
// 7
{
&serverConfig{Notify: notifier{AMQP: map[string]amqpNotify{"1": {Enable: true}}}},
&serverConfig{Notify: notifier{AMQP: map[string]amqpNotify{"1": {Enable: false}}}},
"AMQP Notification configuration differs",
},
// 8
{
&serverConfig{Notify: notifier{NATS: map[string]natsNotify{"1": {Enable: true}}}},
&serverConfig{Notify: notifier{NATS: map[string]natsNotify{"1": {Enable: false}}}},
"NATS Notification configuration differs",
},
// 9
{
&serverConfig{Notify: notifier{ElasticSearch: map[string]elasticSearchNotify{"1": {Enable: true}}}},
&serverConfig{Notify: notifier{ElasticSearch: map[string]elasticSearchNotify{"1": {Enable: false}}}},
"ElasticSearch Notification configuration differs",
},
// 10
{
&serverConfig{Notify: notifier{Redis: map[string]redisNotify{"1": {Enable: true}}}},
&serverConfig{Notify: notifier{Redis: map[string]redisNotify{"1": {Enable: false}}}},
"Redis Notification configuration differs",
},
// 11
{
&serverConfig{Notify: notifier{PostgreSQL: map[string]postgreSQLNotify{"1": {Enable: true}}}},
&serverConfig{Notify: notifier{PostgreSQL: map[string]postgreSQLNotify{"1": {Enable: false}}}},
"PostgreSQL Notification configuration differs",
},
// 12
{
&serverConfig{Notify: notifier{Kafka: map[string]kafkaNotify{"1": {Enable: true}}}},
&serverConfig{Notify: notifier{Kafka: map[string]kafkaNotify{"1": {Enable: false}}}},
"Kafka Notification configuration differs",
},
// 13
{
&serverConfig{Notify: notifier{Webhook: map[string]webhookNotify{"1": {Enable: true}}}},
&serverConfig{Notify: notifier{Webhook: map[string]webhookNotify{"1": {Enable: false}}}},
"Webhook Notification configuration differs",
},
// 14
{
&serverConfig{Notify: notifier{MySQL: map[string]mySQLNotify{"1": {Enable: true}}}},
&serverConfig{Notify: notifier{MySQL: map[string]mySQLNotify{"1": {Enable: false}}}},
"MySQL Notification configuration differs",
},
// 15
{
&serverConfig{Notify: notifier{MQTT: map[string]mqttNotify{"1": {Enable: true}}}},
&serverConfig{Notify: notifier{MQTT: map[string]mqttNotify{"1": {Enable: false}}}},
"MQTT Notification configuration differs",
},
}
for i, testCase := range testCases {
got := testCase.s.ConfigDiff(testCase.t)
if got != testCase.diff {
t.Errorf("Test %d: got %s expected %s", i+1, got, testCase.diff)
}
}
}

View file

@ -1717,7 +1717,7 @@ func migrateV21ToV22() error {
// Copy over fields from V21 into V22 config struct
srvConfig := &serverConfigV22{
Notify: &notifier{},
Notify: notifier{},
}
srvConfig.Version = serverConfigVersion
srvConfig.Credential = cv21.Credential

View file

@ -547,9 +547,11 @@ type serverConfigV21 struct {
}
// serverConfigV22 is just like version '21' with added support
// for StorageClass
// for StorageClass.
//
// IMPORTANT NOTE: When updating this struct make sure that
// serverConfig.ConfigDiff() is updated as necessary.
type serverConfigV22 struct {
sync.RWMutex
Version string `json:"version"`
// S3 API configuration.
@ -562,5 +564,5 @@ type serverConfigV22 struct {
StorageClass storageClassConfig `json:"storageclass"`
// Notification queue configuration.
Notify *notifier `json:"notify"`
Notify notifier `json:"notify"`
}

View file

@ -18,12 +18,10 @@ package cmd
import (
"fmt"
"sync"
)
// Notifier represents collection of supported notification queues.
type notifier struct {
sync.RWMutex
AMQP amqpConfigs `json:"amqp"`
NATS natsConfigs `json:"nats"`
ElasticSearch elasticSearchConfigs `json:"elasticsearch"`
@ -33,7 +31,9 @@ type notifier struct {
Webhook webhookConfigs `json:"webhook"`
MySQL mySQLConfigs `json:"mysql"`
MQTT mqttConfigs `json:"mqtt"`
// Add new notification queues.
// Add new notification queues. IMPORTANT: When new queues are
// added, update `serverConfig.ConfigDiff()` to reflect the
// change.
}
type amqpConfigs map[string]amqpNotify
@ -239,163 +239,109 @@ func (n *notifier) Validate() error {
}
func (n *notifier) SetAMQPByID(accountID string, amqpn amqpNotify) {
n.Lock()
defer n.Unlock()
n.AMQP[accountID] = amqpn
}
func (n *notifier) GetAMQP() map[string]amqpNotify {
n.RLock()
defer n.RUnlock()
return n.AMQP.Clone()
}
func (n *notifier) GetAMQPByID(accountID string) amqpNotify {
n.RLock()
defer n.RUnlock()
return n.AMQP[accountID]
}
func (n *notifier) SetMQTTByID(accountID string, mqttn mqttNotify) {
n.Lock()
defer n.Unlock()
n.MQTT[accountID] = mqttn
}
func (n *notifier) GetMQTT() map[string]mqttNotify {
n.RLock()
defer n.RUnlock()
return n.MQTT.Clone()
}
func (n *notifier) GetMQTTByID(accountID string) mqttNotify {
n.RLock()
defer n.RUnlock()
return n.MQTT[accountID]
}
func (n *notifier) SetNATSByID(accountID string, natsn natsNotify) {
n.Lock()
defer n.Unlock()
n.NATS[accountID] = natsn
}
func (n *notifier) GetNATS() map[string]natsNotify {
n.RLock()
defer n.RUnlock()
return n.NATS.Clone()
}
func (n *notifier) GetNATSByID(accountID string) natsNotify {
n.RLock()
defer n.RUnlock()
return n.NATS[accountID]
}
func (n *notifier) SetElasticSearchByID(accountID string, es elasticSearchNotify) {
n.Lock()
defer n.Unlock()
n.ElasticSearch[accountID] = es
}
func (n *notifier) GetElasticSearchByID(accountID string) elasticSearchNotify {
n.RLock()
defer n.RUnlock()
return n.ElasticSearch[accountID]
}
func (n *notifier) GetElasticSearch() map[string]elasticSearchNotify {
n.RLock()
defer n.RUnlock()
return n.ElasticSearch.Clone()
}
func (n *notifier) SetRedisByID(accountID string, r redisNotify) {
n.Lock()
defer n.Unlock()
n.Redis[accountID] = r
}
func (n *notifier) GetRedis() map[string]redisNotify {
n.RLock()
defer n.RUnlock()
return n.Redis.Clone()
}
func (n *notifier) GetRedisByID(accountID string) redisNotify {
n.RLock()
defer n.RUnlock()
return n.Redis[accountID]
}
func (n *notifier) GetWebhook() map[string]webhookNotify {
n.RLock()
defer n.RUnlock()
return n.Webhook.Clone()
}
func (n *notifier) GetWebhookByID(accountID string) webhookNotify {
n.RLock()
defer n.RUnlock()
return n.Webhook[accountID]
}
func (n *notifier) SetWebhookByID(accountID string, pgn webhookNotify) {
n.Lock()
defer n.Unlock()
n.Webhook[accountID] = pgn
}
func (n *notifier) SetPostgreSQLByID(accountID string, pgn postgreSQLNotify) {
n.Lock()
defer n.Unlock()
n.PostgreSQL[accountID] = pgn
}
func (n *notifier) GetPostgreSQL() map[string]postgreSQLNotify {
n.RLock()
defer n.RUnlock()
return n.PostgreSQL.Clone()
}
func (n *notifier) GetPostgreSQLByID(accountID string) postgreSQLNotify {
n.RLock()
defer n.RUnlock()
return n.PostgreSQL[accountID]
}
func (n *notifier) SetMySQLByID(accountID string, pgn mySQLNotify) {
n.Lock()
defer n.Unlock()
n.MySQL[accountID] = pgn
}
func (n *notifier) GetMySQL() map[string]mySQLNotify {
n.RLock()
defer n.RUnlock()
return n.MySQL.Clone()
}
func (n *notifier) GetMySQLByID(accountID string) mySQLNotify {
n.RLock()
defer n.RUnlock()
return n.MySQL[accountID]
}
func (n *notifier) SetKafkaByID(accountID string, kn kafkaNotify) {
n.Lock()
defer n.Unlock()
n.Kafka[accountID] = kn
}
func (n *notifier) GetKafka() map[string]kafkaNotify {
n.RLock()
defer n.RUnlock()
return n.Kafka.Clone()
}
func (n *notifier) GetKafkaByID(accountID string) kafkaNotify {
n.RLock()
defer n.RUnlock()
return n.Kafka[accountID]
}

View file

@ -423,6 +423,10 @@ func (web *webAPIHandlers) SetAuth(r *http.Request, args *SetAuthArgs, reply *Se
return toJSONError(err)
}
// Acquire lock before updating global configuration.
globalServerConfigMu.Lock()
defer globalServerConfigMu.Unlock()
// Notify all other Minio peers to update credentials
errsMap := updateCredsOnPeers(creds)