diff --git a/cmd/bootstrap-peer-server.go b/cmd/bootstrap-peer-server.go new file mode 100644 index 000000000..289ef65ff --- /dev/null +++ b/cmd/bootstrap-peer-server.go @@ -0,0 +1,257 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "context" + "crypto/tls" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "runtime" + "sync/atomic" + "time" + + "github.com/gorilla/mux" + "github.com/minio/minio-go/pkg/set" + xhttp "github.com/minio/minio/cmd/http" + "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/cmd/rest" +) + +const ( + bootstrapRESTVersion = "v1" + bootstrapRESTVersionPrefix = SlashSeparator + bootstrapRESTVersion + bootstrapRESTPrefix = minioReservedBucketPath + "/bootstrap" + bootstrapRESTPath = bootstrapRESTPrefix + bootstrapRESTVersionPrefix +) + +const ( + bootstrapRESTMethodVerify = "/verify" +) + +// To abstract a node over network. +type bootstrapRESTServer struct{} + +// ServerSystemConfig - captures information about server configuration. +type ServerSystemConfig struct { + MinioPlatform string + MinioRuntime string + MinioEndpoints EndpointZones +} + +// Diff - returns error on first difference found in two configs. +func (s1 ServerSystemConfig) Diff(s2 ServerSystemConfig) error { + if s1.MinioPlatform != s2.MinioPlatform { + return fmt.Errorf("Expected platform '%s', found to be running '%s'", + s1.MinioPlatform, s2.MinioPlatform) + } + if s1.MinioEndpoints.Nodes() != s2.MinioEndpoints.Nodes() { + return fmt.Errorf("Expected number of endpoints %d, seen %d", s1.MinioEndpoints.Nodes(), + s2.MinioEndpoints.Nodes()) + } + + for i, ep := range s1.MinioEndpoints { + if ep.SetCount != s2.MinioEndpoints[i].SetCount { + return fmt.Errorf("Expected set count %d, seen %d", ep.SetCount, + s2.MinioEndpoints[i].SetCount) + } + if ep.DrivesPerSet != s2.MinioEndpoints[i].DrivesPerSet { + return fmt.Errorf("Expected drives pet set %d, seen %d", ep.DrivesPerSet, + s2.MinioEndpoints[i].DrivesPerSet) + } + for j, endpoint := range ep.Endpoints { + if endpoint.String() != s2.MinioEndpoints[i].Endpoints[j].String() { + return fmt.Errorf("Expected endpoint %s, seen %s", endpoint, + s2.MinioEndpoints[i].Endpoints[j]) + } + } + + } + return nil +} + +func getServerSystemCfg() ServerSystemConfig { + return ServerSystemConfig{ + MinioPlatform: fmt.Sprintf("OS: %s | Arch: %s", runtime.GOOS, runtime.GOARCH), + MinioEndpoints: globalEndpoints, + } +} + +func (b *bootstrapRESTServer) VerifyHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "VerifyHandler") + cfg := getServerSystemCfg() + logger.LogIf(ctx, json.NewEncoder(w).Encode(&cfg)) + w.(http.Flusher).Flush() +} + +// registerBootstrapRESTHandlers - register bootstrap rest router. +func registerBootstrapRESTHandlers(router *mux.Router) { + server := &bootstrapRESTServer{} + subrouter := router.PathPrefix(bootstrapRESTPrefix).Subrouter() + + subrouter.Methods(http.MethodPost).Path(bootstrapRESTVersionPrefix + bootstrapRESTMethodVerify).HandlerFunc( + httpTraceHdrs(server.VerifyHandler)) +} + +// client to talk to bootstrap Nodes. +type bootstrapRESTClient struct { + endpoint Endpoint + restClient *rest.Client + connected int32 +} + +// Reconnect to a bootstrap rest server.k +func (client *bootstrapRESTClient) reConnect() { + atomic.StoreInt32(&client.connected, 1) +} + +// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected +// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints() +// after verifying format.json +func (client *bootstrapRESTClient) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) { + return client.callWithContext(context.Background(), method, values, body, length) +} + +// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected +// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints() +// after verifying format.json +func (client *bootstrapRESTClient) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) { + if !client.IsOnline() { + client.reConnect() + } + + if values == nil { + values = make(url.Values) + } + + respBody, err = client.restClient.CallWithContext(ctx, method, values, body, length) + if err == nil { + return respBody, nil + } + + if isNetworkError(err) { + atomic.StoreInt32(&client.connected, 0) + } + + return nil, err +} + +// Stringer provides a canonicalized representation of node. +func (client *bootstrapRESTClient) String() string { + return client.endpoint.String() +} + +// IsOnline - returns whether RPC client failed to connect or not. +func (client *bootstrapRESTClient) IsOnline() bool { + return atomic.LoadInt32(&client.connected) == 1 +} + +// Close - marks the client as closed. +func (client *bootstrapRESTClient) Close() error { + atomic.StoreInt32(&client.connected, 0) + client.restClient.Close() + return nil +} + +// Verify - fetches system server config. +func (client *bootstrapRESTClient) Verify(srcCfg ServerSystemConfig) (err error) { + if newObjectLayerFn() != nil { + return nil + } + respBody, err := client.call(bootstrapRESTMethodVerify, nil, nil, -1) + if err != nil { + return + } + defer xhttp.DrainBody(respBody) + recvCfg := ServerSystemConfig{} + if err = json.NewDecoder(respBody).Decode(&recvCfg); err != nil { + return err + } + return srcCfg.Diff(recvCfg) +} + +func verifyServerSystemConfig(endpointZones EndpointZones) error { + srcCfg := getServerSystemCfg() + clnts := newBootstrapRESTClients(endpointZones) + var onlineServers int + for onlineServers < len(clnts)/2 { + for _, clnt := range clnts { + if err := clnt.Verify(srcCfg); err != nil { + if isNetworkError(err) { + continue + } + return fmt.Errorf("%s as has incorrect configuration: %w", clnt.String(), err) + } + onlineServers++ + } + // Sleep for a while - so that we don't go into + // 100% CPU when half the endpoints are offline. + time.Sleep(500 * time.Millisecond) + } + return nil +} + +func newBootstrapRESTClients(endpointZones EndpointZones) []*bootstrapRESTClient { + seenHosts := set.NewStringSet() + var clnts []*bootstrapRESTClient + for _, ep := range endpointZones { + for _, endpoint := range ep.Endpoints { + if seenHosts.Contains(endpoint.Host) { + continue + } + seenHosts.Add(endpoint.Host) + + // Only proceed for remote endpoints. + if !endpoint.IsLocal { + clnt, err := newBootstrapRESTClient(endpoint) + if err != nil { + continue + } + clnts = append(clnts, clnt) + } + } + } + return clnts +} + +// Returns a new bootstrap client. +func newBootstrapRESTClient(endpoint Endpoint) (*bootstrapRESTClient, error) { + serverURL := &url.URL{ + Scheme: endpoint.Scheme, + Host: endpoint.Host, + Path: bootstrapRESTPath, + } + + var tlsConfig *tls.Config + if globalIsSSL { + tlsConfig = &tls.Config{ + ServerName: endpoint.Hostname(), + RootCAs: globalRootCAs, + } + } + + trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout, rest.DefaultRESTTimeout) + restClient, err := rest.NewClient(serverURL, trFn, newAuthToken) + if err != nil { + return nil, err + } + + return &bootstrapRESTClient{endpoint: endpoint, restClient: restClient, connected: 1}, nil +} diff --git a/cmd/endpoint.go b/cmd/endpoint.go index 9258281e2..999d7d577 100644 --- a/cmd/endpoint.go +++ b/cmd/endpoint.go @@ -196,8 +196,8 @@ type ZoneEndpoints struct { // EndpointZones - list of list of endpoints type EndpointZones []ZoneEndpoints -// First returns true if the first endpoint is local. -func (l EndpointZones) First() bool { +// FirstLocal returns true if the first endpoint is local. +func (l EndpointZones) FirstLocal() bool { return l[0].Endpoints[0].IsLocal } diff --git a/cmd/lock-rest-server.go b/cmd/lock-rest-server.go index 50a61c5e5..af340867d 100644 --- a/cmd/lock-rest-server.go +++ b/cmd/lock-rest-server.go @@ -149,8 +149,4 @@ func registerLockRESTHandlers(router *mux.Router, endpointZones EndpointZones) { globalLockServers[endpoint] = lockServer.ll } } - - // If none of the routes match add default error handler routes - router.NotFoundHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler)) - router.MethodNotAllowedHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler)) } diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 0e417c977..06dcf1223 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -1086,11 +1086,6 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodTrace).HandlerFunc(server.TraceHandler) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBackgroundHealStatus).HandlerFunc(server.BackgroundHealStatusHandler) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLog).HandlerFunc(server.ConsoleLogHandler) - subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodPutBucketObjectLockConfig).HandlerFunc(httpTraceHdrs(server.PutBucketObjectLockConfigHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketObjectLockConfigRemove).HandlerFunc(httpTraceHdrs(server.RemoveBucketObjectLockConfigHandler)).Queries(restQueries(peerRESTBucket)...) - - // If none of the routes match add default error handler routes - router.NotFoundHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler)) - router.MethodNotAllowedHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler)) } diff --git a/cmd/routers.go b/cmd/routers.go index 491912ee3..0f004a759 100644 --- a/cmd/routers.go +++ b/cmd/routers.go @@ -24,15 +24,17 @@ import ( // Composed function registering routers for only distributed XL setup. func registerDistXLRouters(router *mux.Router, endpointZones EndpointZones) { - // Register storage rpc router only if its a distributed setup. + // Register storage REST router only if its a distributed setup. registerStorageRESTHandlers(router, endpointZones) // Register peer REST router only if its a distributed setup. registerPeerRESTHandlers(router) - // Register distributed namespace lock. - registerLockRESTHandlers(router, endpointZones) + // Register bootstrap REST router for distributed setups. + registerBootstrapRESTHandlers(router) + // Register distributed namespace lock routers. + registerLockRESTHandlers(router, endpointZones) } // List of some generic handlers which are applied for all incoming requests. @@ -112,6 +114,10 @@ func configureServerHandler(endpointZones EndpointZones) (http.Handler, error) { // but don't allow SSE-KMS. registerAPIRouter(router, true, false) + // If none of the routes match add default error handler routes + router.NotFoundHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler)) + router.MethodNotAllowedHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler)) + // Register rest of the handlers. return registerHandlers(router, globalHandlers...), nil } diff --git a/cmd/server-main.go b/cmd/server-main.go index 9b239821f..03775cb15 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -365,6 +365,13 @@ func serverMain(ctx *cli.Context) { globalHTTPServerErrorCh <- globalHTTPServer.Start() }() + if globalIsDistXL && globalEndpoints.FirstLocal() { + // Additionally in distributed setup validate + if err := verifyServerSystemConfig(globalEndpoints); err != nil { + logger.Fatal(err, "Unable to initialize distributed setup") + } + } + newObject, err := newObjectLayer(globalEndpoints) logger.SetDeploymentID(globalDeploymentID) if err != nil { diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 96eca378c..80a4a5881 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -612,8 +612,4 @@ func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTBitrotAlgo, storageRESTBitrotHash, storageRESTLength, storageRESTShardSize)...) } } - - // If none of the routes match add default error handler routes - router.NotFoundHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler)) - router.MethodNotAllowedHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler)) } diff --git a/cmd/xl-zones.go b/cmd/xl-zones.go index a0e92e23e..e117e1aca 100644 --- a/cmd/xl-zones.go +++ b/cmd/xl-zones.go @@ -60,7 +60,7 @@ func newXLZones(endpointZones EndpointZones) (ObjectLayer, error) { z = &xlZones{zones: make([]*xlSets, len(endpointZones))} ) for i, ep := range endpointZones { - formats[i], err = waitForFormatXL(endpointZones.First(), ep.Endpoints, + formats[i], err = waitForFormatXL(endpointZones.FirstLocal(), ep.Endpoints, ep.SetCount, ep.DrivesPerSet, deploymentID) if err != nil { return nil, err