minio/cmd/server-main.go
Klaus Post 2294e53a0b
Don't retain context in locker (#10515)
Use the context for internal timeouts, but disconnect it from outgoing 
calls so we always receive the results and cancel it remotely.
2020-11-04 08:25:42 -08:00

533 lines
17 KiB
Go

/*
* MinIO Cloud Storage, (C) 2015-2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"crypto/tls"
"errors"
"fmt"
"math/rand"
"net"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/minio/cli"
"github.com/minio/minio/cmd/config"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/cmd/rest"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/bucket/bandwidth"
"github.com/minio/minio/pkg/certs"
"github.com/minio/minio/pkg/color"
"github.com/minio/minio/pkg/env"
)
// ServerFlags - server command specific flags
var ServerFlags = []cli.Flag{
cli.StringFlag{
Name: "address",
Value: ":" + GlobalMinioDefaultPort,
Usage: "bind to a specific ADDRESS:PORT, ADDRESS can be an IP or hostname",
},
}
var serverCmd = cli.Command{
Name: "server",
Usage: "start object storage server",
Flags: append(ServerFlags, GlobalFlags...),
Action: serverMain,
CustomHelpTemplate: `NAME:
{{.HelpName}} - {{.Usage}}
USAGE:
{{.HelpName}} {{if .VisibleFlags}}[FLAGS] {{end}}DIR1 [DIR2..]
{{.HelpName}} {{if .VisibleFlags}}[FLAGS] {{end}}DIR{1...64}
{{.HelpName}} {{if .VisibleFlags}}[FLAGS] {{end}}DIR{1...64} DIR{65...128}
DIR:
DIR points to a directory on a filesystem. When you want to combine
multiple drives into a single large system, pass one directory per
filesystem separated by space. You may also use a '...' convention
to abbreviate the directory arguments. Remote directories in a
distributed setup are encoded as HTTP(s) URIs.
{{if .VisibleFlags}}
FLAGS:
{{range .VisibleFlags}}{{.}}
{{end}}{{end}}
EXAMPLES:
1. Start minio server on "/home/shared" directory.
{{.Prompt}} {{.HelpName}} /home/shared
2. Start single node server with 64 local drives "/mnt/data1" to "/mnt/data64".
{{.Prompt}} {{.HelpName}} /mnt/data{1...64}
3. Start distributed minio server on an 32 node setup with 32 drives each, run following command on all the nodes
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_ACCESS_KEY{{.AssignmentOperator}}minio
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_SECRET_KEY{{.AssignmentOperator}}miniostorage
{{.Prompt}} {{.HelpName}} http://node{1...32}.example.com/mnt/export{1...32}
4. Start distributed minio server in an expanded setup, run the following command on all the nodes
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_ACCESS_KEY{{.AssignmentOperator}}minio
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_SECRET_KEY{{.AssignmentOperator}}miniostorage
{{.Prompt}} {{.HelpName}} http://node{1...16}.example.com/mnt/export{1...32} \
http://node{17...64}.example.com/mnt/export{1...64}
`,
}
func serverCmdArgs(ctx *cli.Context) []string {
v := env.Get(config.EnvArgs, "")
if v == "" {
// Fall back to older ENV MINIO_ENDPOINTS
v = env.Get(config.EnvEndpoints, "")
}
if v == "" {
if !ctx.Args().Present() || ctx.Args().First() == "help" {
cli.ShowCommandHelpAndExit(ctx, ctx.Command.Name, 1)
}
return ctx.Args()
}
return strings.Fields(v)
}
func serverHandleCmdArgs(ctx *cli.Context) {
// Handle common command args.
handleCommonCmdArgs(ctx)
logger.FatalIf(CheckLocalServerAddr(globalCLIContext.Addr), "Unable to validate passed arguments")
var err error
var setupType SetupType
// Check and load TLS certificates.
globalPublicCerts, globalTLSCerts, globalIsSSL, err = getTLSConfig()
logger.FatalIf(err, "Unable to load the TLS configuration")
// Check and load Root CAs.
globalRootCAs, err = certs.GetRootCAs(globalCertsCADir.Get())
logger.FatalIf(err, "Failed to read root CAs (%v)", err)
// Add the global public crts as part of global root CAs
for _, publicCrt := range globalPublicCerts {
globalRootCAs.AddCert(publicCrt)
}
// Register root CAs for remote ENVs
env.RegisterGlobalCAs(globalRootCAs)
globalMinioAddr = globalCLIContext.Addr
globalMinioHost, globalMinioPort = mustSplitHostPort(globalMinioAddr)
globalEndpoints, setupType, err = createServerEndpoints(globalCLIContext.Addr, serverCmdArgs(ctx)...)
logger.FatalIf(err, "Invalid command line arguments")
globalProxyEndpoints = GetProxyEndpoints(globalEndpoints)
globalInternodeTransport = newInternodeHTTPTransport(&tls.Config{
RootCAs: globalRootCAs,
}, rest.DefaultTimeout)()
// On macOS, if a process already listens on LOCALIPADDR:PORT, net.Listen() falls back
// to IPv6 address ie minio will start listening on IPv6 address whereas another
// (non-)minio process is listening on IPv4 of given port.
// To avoid this error situation we check for port availability.
logger.FatalIf(checkPortAvailability(globalMinioHost, globalMinioPort), "Unable to start the server")
globalIsErasure = (setupType == ErasureSetupType)
globalIsDistErasure = (setupType == DistErasureSetupType)
if globalIsDistErasure {
globalIsErasure = true
}
}
func serverHandleEnvVars() {
// Handle common environment variables.
handleCommonEnvVars()
}
func newAllSubsystems() {
// Create new notification system and initialize notification targets
globalNotificationSys = NewNotificationSys(globalEndpoints)
// Create new bucket metadata system.
globalBucketMetadataSys = NewBucketMetadataSys()
// Create the bucket bandwidth monitor
globalBucketMonitor = bandwidth.NewMonitor(GlobalServiceDoneCh)
// Create a new config system.
globalConfigSys = NewConfigSys()
// Create new IAM system.
globalIAMSys = NewIAMSys()
// Create new policy system.
globalPolicySys = NewPolicySys()
// Create new lifecycle system.
globalLifecycleSys = NewLifecycleSys()
// Create new bucket encryption subsystem
globalBucketSSEConfigSys = NewBucketSSEConfigSys()
// Create new bucket object lock subsystem
globalBucketObjectLockSys = NewBucketObjectLockSys()
// Create new bucket quota subsystem
globalBucketQuotaSys = NewBucketQuotaSys()
// Create new bucket versioning subsystem
globalBucketVersioningSys = NewBucketVersioningSys()
// Create new bucket replication subsytem
globalBucketTargetSys = NewBucketTargetSys()
}
func initServer(ctx context.Context, newObject ObjectLayer) error {
// Once the config is fully loaded, initialize the new object layer.
globalObjLayerMutex.Lock()
globalObjectAPI = newObject
globalObjLayerMutex.Unlock()
// Make sure to hold lock for entire migration to avoid
// such that only one server should migrate the entire config
// at a given time, this big transaction lock ensures this
// appropriately. This is also true for rotation of encrypted
// content.
txnLk := newObject.NewNSLock(minioMetaBucket, minioConfigPrefix+"/transaction.lock")
// **** WARNING ****
// Migrating to encrypted backend should happen before initialization of any
// sub-systems, make sure that we do not move the above codeblock elsewhere.
// Initializing sub-systems needs a retry mechanism for
// the following reasons:
// - Read quorum is lost just after the initialization
// of the object layer.
// - Write quorum not met when upgrading configuration
// version is needed, migration is needed etc.
rquorum := InsufficientReadQuorum{}
wquorum := InsufficientWriteQuorum{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
lockTimeout := newDynamicTimeout(5*time.Second, 3*time.Second)
var err error
for {
select {
case <-ctx.Done():
// Retry was canceled successfully.
return fmt.Errorf("Initializing sub-systems stopped gracefully %w", ctx.Err())
default:
}
// let one of the server acquire the lock, if not let them timeout.
// which shall be retried again by this loop.
if err = txnLk.GetLock(ctx, lockTimeout); err != nil {
logger.Info("Waiting for all MinIO sub-systems to be initialized.. trying to acquire lock")
time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))
continue
}
// These messages only meant primarily for distributed setup, so only log during distributed setup.
if globalIsDistErasure {
logger.Info("Waiting for all MinIO sub-systems to be initialized.. lock acquired")
}
// Migrate all backend configs to encrypted backend configs, optionally
// handles rotating keys for encryption, if there is any retriable failure
// that shall be retried if there is an error.
if err = handleEncryptedConfigBackend(newObject); err == nil {
// Upon success migrating the config, initialize all sub-systems
// if all sub-systems initialized successfully return right away
if err = initAllSubsystems(ctx, newObject); err == nil {
txnLk.Unlock()
// All successful return.
if globalIsDistErasure {
// These messages only meant primarily for distributed setup, so only log during distributed setup.
logger.Info("All MinIO sub-systems initialized successfully")
}
return nil
}
}
txnLk.Unlock() // Unlock the transaction lock and allow other nodes to acquire the lock if possible.
// One of these retriable errors shall be retried.
if errors.Is(err, errDiskNotFound) ||
errors.Is(err, errConfigNotFound) ||
errors.Is(err, context.DeadlineExceeded) ||
errors.As(err, &rquorum) ||
errors.As(err, &wquorum) ||
isErrBucketNotFound(err) {
logger.Info("Waiting for all MinIO sub-systems to be initialized.. possible cause (%v)", err)
time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))
continue
}
// Any other unhandled return right here.
return fmt.Errorf("Unable to initialize sub-systems: %w", err)
}
}
func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
// %w is used by all error returns here to make sure
// we wrap the underlying error, make sure when you
// are modifying this code that you do so, if and when
// you want to add extra context to your error. This
// ensures top level retry works accordingly.
// List buckets to heal, and be re-used for loading configs.
var buckets []BucketInfo
rquorum := InsufficientReadQuorum{}
wquorum := InsufficientWriteQuorum{}
if globalIsErasure {
buckets, err = newObject.ListBucketsHeal(ctx)
if err != nil {
return fmt.Errorf("Unable to list buckets to heal: %w", err)
}
for _, bucket := range buckets {
if err = newObject.MakeBucketWithLocation(ctx, bucket.Name, BucketOptions{}); err != nil {
if errors.As(err, &wquorum) || errors.As(err, &rquorum) {
// Return the error upwards for the caller to retry.
return fmt.Errorf("Unable to heal bucket: %w", err)
}
if _, ok := err.(BucketExists); !ok {
// ignore any other error and log for investigation.
logger.LogIf(ctx, err)
continue
}
// Bucket already exists, nothing that needs to be done.
}
}
} else {
buckets, err = newObject.ListBuckets(ctx)
if err != nil {
return fmt.Errorf("Unable to list buckets: %w", err)
}
}
// Initialize config system.
if err = globalConfigSys.Init(newObject); err != nil {
if errors.Is(err, errDiskNotFound) ||
errors.Is(err, errConfigNotFound) ||
errors.Is(err, context.DeadlineExceeded) ||
errors.As(err, &rquorum) ||
errors.As(err, &wquorum) ||
isErrBucketNotFound(err) {
return fmt.Errorf("Unable to initialize config system: %w", err)
}
// Any other config errors we simply print a message and proceed forward.
logger.LogIf(ctx, fmt.Errorf("Unable to initialize config, some features may be missing %w", err))
}
// Initialize IAM store
globalIAMSys.InitStore(newObject)
// Populate existing buckets to the etcd backend
if globalDNSConfig != nil {
// Background this operation.
go initFederatorBackend(buckets, newObject)
}
// Initialize bucket metadata sub-system.
globalBucketMetadataSys.Init(ctx, buckets, newObject)
// Initialize notification system.
globalNotificationSys.Init(ctx, buckets, newObject)
// Initialize bucket targets sub-system.
globalBucketTargetSys.Init(ctx, buckets, newObject)
return nil
}
// serverMain handler called for 'minio server' command.
func serverMain(ctx *cli.Context) {
defer globalDNSCache.Stop()
signal.Notify(globalOSSignalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
go handleSignals()
setDefaultProfilerRates()
// Initialize globalConsoleSys system
globalConsoleSys = NewConsoleLogger(GlobalContext)
logger.AddTarget(globalConsoleSys)
// Handle all server command args.
serverHandleCmdArgs(ctx)
// Handle all server environment vars.
serverHandleEnvVars()
// Set node name, only set for distributed setup.
globalConsoleSys.SetNodeName(globalEndpoints)
// Initialize all help
initHelp()
// Initialize all sub-systems
newAllSubsystems()
globalMinioEndpoint = func() string {
host := globalMinioHost
if host == "" {
host = sortIPs(localIP4.ToSlice())[0]
}
return fmt.Sprintf("%s://%s", getURLScheme(globalIsSSL), net.JoinHostPort(host, globalMinioPort))
}()
// Is distributed setup, error out if no certificates are found for HTTPS endpoints.
if globalIsDistErasure {
if globalEndpoints.HTTPS() && !globalIsSSL {
logger.Fatal(config.ErrNoCertsAndHTTPSEndpoints(nil), "Unable to start the server")
}
if !globalEndpoints.HTTPS() && globalIsSSL {
logger.Fatal(config.ErrCertsAndHTTPEndpoints(nil), "Unable to start the server")
}
}
if !globalCLIContext.Quiet && globalInplaceUpdateDisabled {
// Check for new updates from dl.min.io.
checkUpdate(getMinioMode())
}
if !globalActiveCred.IsValid() && globalIsDistErasure {
logger.Fatal(config.ErrEnvCredentialsMissingDistributed(nil),
"Unable to initialize the server in distributed mode")
}
// Set system resources to maximum.
setMaxResources()
if globalIsErasure {
// New global heal state
globalAllHealState = newHealState()
globalBackgroundHealState = newHealState()
globalReplicationState = newReplicationState()
}
// Configure server.
handler, err := configureServerHandler(globalEndpoints)
if err != nil {
logger.Fatal(config.ErrUnexpectedError(err), "Unable to configure one of server's RPC services")
}
var getCert certs.GetCertificateFunc
if globalTLSCerts != nil {
getCert = globalTLSCerts.GetCertificate
}
httpServer := xhttp.NewServer([]string{globalMinioAddr}, criticalErrorHandler{corsHandler(handler)}, getCert)
httpServer.BaseContext = func(listener net.Listener) context.Context {
return GlobalContext
}
go func() {
globalHTTPServerErrorCh <- httpServer.Start()
}()
globalObjLayerMutex.Lock()
globalHTTPServer = httpServer
globalObjLayerMutex.Unlock()
if globalIsDistErasure && globalEndpoints.FirstLocal() {
for {
// Additionally in distributed setup, validate the setup and configuration.
err := verifyServerSystemConfig(GlobalContext, globalEndpoints)
if err == nil || errors.Is(err, context.Canceled) {
break
}
logger.LogIf(GlobalContext, err, "Unable to initialize distributed setup, retrying.. after 5 seconds")
select {
case <-GlobalContext.Done():
return
case <-time.After(500 * time.Millisecond):
}
}
}
newObject, err := newObjectLayer(GlobalContext, globalEndpoints)
if err != nil {
logFatalErrs(err, Endpoint{}, true)
}
logger.SetDeploymentID(globalDeploymentID)
go initDataCrawler(GlobalContext, newObject)
// Enable background operations for erasure coding
if globalIsErasure {
initAutoHeal(GlobalContext, newObject)
initBackgroundReplication(GlobalContext, newObject)
}
if err = initServer(GlobalContext, newObject); err != nil {
var cerr config.Err
// For any config error, we don't need to drop into safe-mode
// instead its a user error and should be fixed by user.
if errors.As(err, &cerr) {
logger.FatalIf(err, "Unable to initialize the server")
}
// If context was canceled
if errors.Is(err, context.Canceled) {
logger.FatalIf(err, "Server startup canceled upon user request")
}
}
if globalCacheConfig.Enabled {
// initialize the new disk cache objects.
var cacheAPI CacheObjectLayer
cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig)
logger.FatalIf(err, "Unable to initialize disk caching")
globalObjLayerMutex.Lock()
globalCacheObjectAPI = cacheAPI
globalObjLayerMutex.Unlock()
}
// Initialize users credentials and policies in background right after config has initialized.
go globalIAMSys.Init(GlobalContext, newObject)
// Prints the formatted startup message, if err is not nil then it prints additional information as well.
printStartupMessage(getAPIEndpoints(), err)
if globalActiveCred.Equal(auth.DefaultCredentials) {
msg := fmt.Sprintf("Detected default credentials '%s', please change the credentials immediately using 'MINIO_ACCESS_KEY' and 'MINIO_SECRET_KEY'", globalActiveCred)
logger.StartupMessage(color.RedBold(msg))
}
<-globalOSSignalCh
}
// Initialize object layer with the supplied disks, objectLayer is nil upon any error.
func newObjectLayer(ctx context.Context, endpointServerSets EndpointServerSets) (newObject ObjectLayer, err error) {
// For FS only, directly use the disk.
if endpointServerSets.NEndpoints() == 1 {
// Initialize new FS object layer.
return NewFSObjectLayer(endpointServerSets[0].Endpoints[0].Path)
}
return newErasureServerSets(ctx, endpointServerSets)
}