Assume local endpoints appropriately in k8s deployments (#8375)

On Kubernetes/Docker setups DNS sometimes resolves
inconsistently, so the same host with multiple disks can come
online with some of its endpoints reported as local and others
as not. This can never legitimately happen and is only possible
in orchestrated deployments with dynamic DNS. The following
code ensures that if any endpoint reports itself as local for
a given host, all endpoints for that host are treated as local.
This keeps the assumption consistent and safe in all scenarios.
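
A minimal standalone sketch of the per-host normalization described above; the endpoint type and field set here are simplified for illustration and are not MinIO's actual Endpoint struct:

package main

import "fmt"

// endpoint is a simplified stand-in for MinIO's Endpoint type,
// carrying only the fields relevant to this fix.
type endpoint struct {
	Host    string
	Path    string
	IsLocal bool
}

// normalizeLocal marks every endpoint of a host as local if any
// endpoint of that host resolved as local.
func normalizeLocal(endpoints []endpoint) {
	localByHost := make(map[string]bool)
	for _, ep := range endpoints {
		if ep.IsLocal {
			localByHost[ep.Host] = true
		}
	}
	for i := range endpoints {
		endpoints[i].IsLocal = localByHost[endpoints[i].Host]
	}
}

func main() {
	eps := []endpoint{
		{Host: "minio-0:9000", Path: "/data1", IsLocal: true},
		{Host: "minio-0:9000", Path: "/data2", IsLocal: false}, // stale DNS answer
		{Host: "minio-1:9000", Path: "/data1", IsLocal: false},
	}
	normalizeLocal(eps)
	for _, ep := range eps {
		fmt.Printf("%s%s local=%v\n", ep.Host, ep.Path, ep.IsLocal)
	}
}

Running this marks /data2 on minio-0 as local because /data1 on the same host already resolved as local, while minio-1 stays remote.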

This PR also adds validation so that we do not crash the
server if there are bugs in the endpoints list during dsync
initialization.
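
To sketch the validation idea in isolation: the lock-client setup reports which entry is local and returns an error when none is, so the caller can fail with a clear message rather than crash later. The names below (pickLocalNode, errNoLocalEndpoint) are illustrative, not MinIO's actual identifiers:

package main

import (
	"errors"
	"fmt"
)

var errNoLocalEndpoint = errors.New("no endpoint pointing to the local machine is found")

// pickLocalNode mirrors the shape of the fixed newDsyncNodes: it
// reports which host is local and returns an error instead of
// handing the caller an invalid index.
func pickLocalNode(hosts []string, isLocal func(string) bool) (myNode int, err error) {
	myNode = -1
	for i, h := range hosts {
		if isLocal(h) {
			myNode = i
		}
	}
	if myNode == -1 {
		return myNode, errNoLocalEndpoint
	}
	return myNode, nil
}

func main() {
	hosts := []string{"minio-1:9000", "minio-2:9000"}
	if _, err := pickLocalNode(hosts, func(string) bool { return false }); err != nil {
		// The server can now exit with a clear message instead of
		// crashing later during dsync initialization.
		fmt.Println("unable to initialize distributed locking:", err)
	}
}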

Thanks to Daniel Valdivia <hola@danielvaldivia.com> for
reproducing this issue; the fix is needed as part of the
https://github.com/minio/m3 project.
Harshavardhana 2019-10-09 21:44:17 -07:00 committed by kannappanr
parent 42531db37e
commit 36e12a6038
4 changed files with 62 additions and 37 deletions


@@ -227,7 +227,7 @@ func (endpoints EndpointList) UpdateIsLocal() error {
keepAliveTicker := time.NewTicker(retryInterval * time.Second)
defer keepAliveTicker.Stop()
for {
// Break if the local endpoint is found already. Or all the endpoints are resolved.
// Break if the local endpoint is found already Or all the endpoints are resolved.
if foundLocal || (epsResolved == len(endpoints)) {
break
}
@@ -240,13 +240,13 @@ func (endpoints EndpointList) UpdateIsLocal() error {
default:
for i, resolved := range resolvedList {
if resolved {
// Continue if host is already resolved.
continue
}
// return err if not Docker or Kubernetes
// We use IsDocker() method to check for Docker Swarm environment
// as there is no reliable way to clearly identify Swarm from
// Docker environment.
// We use IsDocker() to check for Docker environment
// We use IsKubernetes() to check for Kubernetes environment
isLocal, err := isLocalHost(endpoints[i].HostName)
if err != nil {
if !IsDocker() && !IsKubernetes() {
@@ -256,8 +256,7 @@ func (endpoints EndpointList) UpdateIsLocal() error {
timeElapsed := time.Since(startTime)
// log error only if more than 1s elapsed
if timeElapsed > time.Second {
// log the message to console about the host not being
// resolveable.
// Log the message to console about the host not being resolveable.
reqInfo := (&logger.ReqInfo{}).AppendTags("host", endpoints[i].HostName)
reqInfo.AppendTags("elapsedTime", humanize.RelTime(startTime, startTime.Add(timeElapsed), "elapsed", ""))
ctx := logger.SetReqInfo(context.Background(), reqInfo)
@@ -274,15 +273,33 @@ func (endpoints EndpointList) UpdateIsLocal() error {
}
// Wait for the tick, if the there exist a local endpoint in discovery.
// Non docker/kubernetes environment does not need to wait.
if !foundLocal && (IsDocker() && IsKubernetes()) {
// Non docker/kubernetes environment we do not need to wait.
if !foundLocal && (IsDocker() || IsKubernetes()) {
<-keepAliveTicker.C
}
}
}
// On Kubernetes/Docker setups DNS sometimes resolves
// inconsistently, so the same host with multiple disks can come
// online with some of its endpoints reported as local and others
// as not. This can never legitimately happen and is only possible
// in orchestrated deployments with dynamic DNS. The following code
// ensures that if any endpoint reports itself as local for a given
// host, all endpoints for that host are treated as local, keeping
// the assumption consistent and safe in all scenarios.
endpointLocalMap := make(map[string]bool)
for _, ep := range endpoints {
if ep.IsLocal {
endpointLocalMap[ep.Host] = ep.IsLocal
}
}
for i := range endpoints {
endpoints[i].IsLocal = endpointLocalMap[endpoints[i].Host]
}
return nil
}
// NewEndpointList - returns new endpoint list based on input args.


@@ -350,27 +350,29 @@ func TestCreateEndpoints(t *testing.T) {
}
for i, testCase := range testCases {
serverAddr, endpoints, setupType, err := CreateEndpoints(testCase.serverAddr, testCase.args...)
if err == nil {
if testCase.expectedErr != nil {
t.Fatalf("Test (%d) error: expected = %v, got = <nil>", i+1, testCase.expectedErr)
} else {
if serverAddr != testCase.expectedServerAddr {
t.Fatalf("Test (%d) serverAddr: expected = %v, got = %v", i+1, testCase.expectedServerAddr, serverAddr)
}
if !reflect.DeepEqual(endpoints, testCase.expectedEndpoints) {
t.Fatalf("Test (%d) endpoints: expected = %v, got = %v", i+1, testCase.expectedEndpoints, endpoints)
}
if setupType != testCase.expectedSetupType {
t.Fatalf("Test (%d) setupType: expected = %v, got = %v", i+1, testCase.expectedSetupType, setupType)
testCase := testCase
t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) {
serverAddr, endpoints, setupType, err := CreateEndpoints(testCase.serverAddr, testCase.args...)
if err == nil {
if testCase.expectedErr != nil {
t.Fatalf("error: expected = %v, got = <nil>", testCase.expectedErr)
} else {
if serverAddr != testCase.expectedServerAddr {
t.Fatalf("serverAddr: expected = %v, got = %v", testCase.expectedServerAddr, serverAddr)
}
if !reflect.DeepEqual(endpoints, testCase.expectedEndpoints) {
t.Fatalf("endpoints: expected = %v, got = %v", testCase.expectedEndpoints, endpoints)
}
if setupType != testCase.expectedSetupType {
t.Fatalf("setupType: expected = %v, got = %v", testCase.expectedSetupType, setupType)
}
}
} else if testCase.expectedErr == nil {
t.Fatalf("error: expected = <nil>, got = %v", err)
} else if err.Error() != testCase.expectedErr.Error() {
t.Fatalf("error: expected = %v, got = %v", testCase.expectedErr, err)
}
} else if testCase.expectedErr == nil {
t.Fatalf("Test (%d) error: expected = <nil>, got = %v", i+1, err)
} else if err.Error() != testCase.expectedErr.Error() {
t.Fatalf("Test (%d) error: expected = %v, got = %v", i+1, testCase.expectedErr, err)
}
})
}
}
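
The refactor above converts the loop body into named subtests via t.Run, rebinding testCase before the closure so every subtest sees its own copy of the case. A minimal sketch of the same pattern with illustrative names:

package example

import (
	"fmt"
	"testing"
)

// TestPattern shows the table-driven subtest pattern used above:
// the loop variable is rebound before t.Run so the closure captures
// this iteration's value rather than the shared loop variable.
func TestPattern(t *testing.T) {
	cases := []struct {
		in   int
		want int
	}{
		{in: 1, want: 1},
		{in: 2, want: 2},
	}
	for i, tc := range cases {
		tc := tc // rebind for the closure (pre-Go 1.22 loop semantics)
		t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) {
			if got := tc.in; got != tc.want {
				t.Fatalf("expected = %v, got = %v", tc.want, got)
			}
		})
	}
}

Because each subtest carries its own name, the failure messages no longer need the manual Test (%d) index prefix.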


@@ -53,8 +53,9 @@ type RWLocker interface {
// Initialize distributed locking only in case of distributed setup.
// Returns lock clients and the node index for the current server.
func newDsyncNodes(endpoints EndpointList) (clnts []dsync.NetLocker, myNode int) {
func newDsyncNodes(endpoints EndpointList) (clnts []dsync.NetLocker, myNode int, err error) {
myNode = -1
seenHosts := set.NewStringSet()
for _, endpoint := range endpoints {
if seenHosts.Contains(endpoint.Host) {
@@ -66,26 +67,27 @@ func newDsyncNodes(endpoints EndpointList) (clnts []dsync.NetLocker, myNode int)
if endpoint.IsLocal {
myNode = len(clnts)
receiver := &lockRESTServer{
globalLockServer = &lockRESTServer{
ll: &localLocker{
serverAddr: endpoint.Host,
serviceEndpoint: lockServicePath,
lockMap: make(map[string][]lockRequesterInfo),
},
}
globalLockServer = receiver
locker = receiver.ll
locker = globalLockServer.ll
} else {
host, err := xnet.ParseHost(endpoint.Host)
logger.FatalIf(err, "Unable to parse Lock RPC Host")
var host *xnet.Host
host, err = xnet.ParseHost(endpoint.Host)
locker = newlockRESTClient(host)
}
clnts = append(clnts, locker)
}
return clnts, myNode
if myNode == -1 {
return clnts, myNode, errors.New("no endpoint pointing to the local machine is found")
}
return clnts, myNode, err
}
// newNSLock - return a new name space lock map.


@@ -276,7 +276,11 @@ func serverMain(ctx *cli.Context) {
// Set nodes for dsync for distributed setup.
if globalIsDistXL {
globalDsync, err = dsync.New(newDsyncNodes(globalEndpoints))
clnts, myNode, err := newDsyncNodes(globalEndpoints)
if err != nil {
logger.Fatal(err, "Unable to initialize distributed locking on %s", globalEndpoints)
}
globalDsync, err = dsync.New(clnts, myNode)
if err != nil {
logger.Fatal(err, "Unable to initialize distributed locking on %s", globalEndpoints)
}