/* * Minio Cloud Storage, (C) 2018 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package cmd import ( "context" "fmt" "path" "sort" "time" "github.com/gorilla/mux" "github.com/minio/minio/cmd/logger" xrpc "github.com/minio/minio/cmd/rpc" "github.com/minio/minio/pkg/event" xnet "github.com/minio/minio/pkg/net" "github.com/minio/minio/pkg/policy" ) const peerServiceName = "Peer" const peerServiceSubPath = "/peer/remote" var peerServicePath = path.Join(minioReservedBucketPath, peerServiceSubPath) // peerRPCReceiver - Peer RPC receiver for peer RPC server. type peerRPCReceiver struct{} // DeleteBucketArgs - delete bucket RPC arguments. type DeleteBucketArgs struct { AuthArgs BucketName string } // DeleteBucket - handles delete bucket RPC call which removes all values of given bucket in global NotificationSys object. func (receiver *peerRPCReceiver) DeleteBucket(args *DeleteBucketArgs, reply *VoidReply) error { objAPI := newObjectLayerFn() if objAPI == nil { return errServerNotInitialized } globalNotificationSys.RemoveNotification(args.BucketName) globalPolicySys.Remove(args.BucketName) return nil } // SetBucketPolicyArgs - set bucket policy RPC arguments. type SetBucketPolicyArgs struct { AuthArgs BucketName string Policy policy.Policy } // SetBucketPolicy - handles set bucket policy RPC call which adds bucket policy to globalPolicySys. func (receiver *peerRPCReceiver) SetBucketPolicy(args *SetBucketPolicyArgs, reply *VoidReply) error { objAPI := newObjectLayerFn() if objAPI == nil { return errServerNotInitialized } globalPolicySys.Set(args.BucketName, args.Policy) return nil } // RemoveBucketPolicyArgs - delete bucket policy RPC arguments. type RemoveBucketPolicyArgs struct { AuthArgs BucketName string } // RemoveBucketPolicy - handles delete bucket policy RPC call which removes bucket policy to globalPolicySys. func (receiver *peerRPCReceiver) RemoveBucketPolicy(args *RemoveBucketPolicyArgs, reply *VoidReply) error { objAPI := newObjectLayerFn() if objAPI == nil { return errServerNotInitialized } globalPolicySys.Remove(args.BucketName) return nil } // PutBucketNotificationArgs - put bucket notification RPC arguments. type PutBucketNotificationArgs struct { AuthArgs BucketName string RulesMap event.RulesMap } // PutBucketNotification - handles put bucket notification RPC call which adds rules to given bucket to global NotificationSys object. func (receiver *peerRPCReceiver) PutBucketNotification(args *PutBucketNotificationArgs, reply *VoidReply) error { objAPI := newObjectLayerFn() if objAPI == nil { return errServerNotInitialized } globalNotificationSys.AddRulesMap(args.BucketName, args.RulesMap) return nil } // ListenBucketNotificationArgs - listen bucket notification RPC arguments. type ListenBucketNotificationArgs struct { AuthArgs `json:"-"` BucketName string `json:"-"` EventNames []event.Name `json:"eventNames"` Pattern string `json:"pattern"` TargetID event.TargetID `json:"targetId"` Addr xnet.Host `json:"addr"` } // ListenBucketNotification - handles listen bucket notification RPC call. // It creates PeerRPCClient target which pushes requested events to target in remote peer. func (receiver *peerRPCReceiver) ListenBucketNotification(args *ListenBucketNotificationArgs, reply *VoidReply) error { objAPI := newObjectLayerFn() if objAPI == nil { return errServerNotInitialized } rpcClient := globalNotificationSys.GetPeerRPCClient(args.Addr) if rpcClient == nil { return fmt.Errorf("unable to find PeerRPCClient for provided address %v. This happens only if remote and this minio run with different set of endpoints", args.Addr) } target := NewPeerRPCClientTarget(args.BucketName, args.TargetID, rpcClient) rulesMap := event.NewRulesMap(args.EventNames, args.Pattern, target.ID()) if err := globalNotificationSys.AddRemoteTarget(args.BucketName, target, rulesMap); err != nil { reqInfo := &logger.ReqInfo{BucketName: target.bucketName} reqInfo.AppendTags("target", target.id.Name) ctx := logger.SetReqInfo(context.Background(), reqInfo) logger.LogIf(ctx, err) return err } return nil } // RemoteTargetExistArgs - remote target ID exist RPC arguments. type RemoteTargetExistArgs struct { AuthArgs BucketName string TargetID event.TargetID } // RemoteTargetExist - handles target ID exist RPC call which checks whether given target ID is a HTTP client target or not. func (receiver *peerRPCReceiver) RemoteTargetExist(args *RemoteTargetExistArgs, reply *bool) error { objAPI := newObjectLayerFn() if objAPI == nil { return errServerNotInitialized } *reply = globalNotificationSys.RemoteTargetExist(args.BucketName, args.TargetID) return nil } // SendEventArgs - send event RPC arguments. type SendEventArgs struct { AuthArgs Event event.Event TargetID event.TargetID BucketName string } // SendEvent - handles send event RPC call which sends given event to target by given target ID. func (receiver *peerRPCReceiver) SendEvent(args *SendEventArgs, reply *bool) error { objAPI := newObjectLayerFn() if objAPI == nil { return errServerNotInitialized } // Set default to true to keep the target. *reply = true errs := globalNotificationSys.send(args.BucketName, args.Event, args.TargetID) for i := range errs { reqInfo := (&logger.ReqInfo{}).AppendTags("Event", args.Event.EventName.String()) reqInfo.AppendTags("targetName", args.TargetID.Name) ctx := logger.SetReqInfo(context.Background(), reqInfo) logger.LogIf(ctx, errs[i].Err) *reply = false // send failed i.e. do not keep the target. return errs[i].Err } return nil } // ReloadFormatArgs - send event RPC arguments. type ReloadFormatArgs struct { AuthArgs DryRun bool } // ReloadFormat - handles reload format RPC call, reloads latest `format.json` func (receiver *peerRPCReceiver) ReloadFormat(args *ReloadFormatArgs, reply *VoidReply) error { objAPI := newObjectLayerFn() if objAPI == nil { return errServerNotInitialized } return objAPI.ReloadFormat(context.Background(), args.DryRun) } // LoadUsers - handles load users RPC call. func (receiver *peerRPCReceiver) LoadUsers(args *AuthArgs, reply *VoidReply) error { objAPI := newObjectLayerFn() if objAPI == nil { return errServerNotInitialized } return globalIAMSys.Load(objAPI) } // LoadCredentials - handles load credentials RPC call. func (receiver *peerRPCReceiver) LoadCredentials(args *AuthArgs, reply *VoidReply) error { objAPI := newObjectLayerFn() if objAPI == nil { return errServerNotInitialized } // Construct path to config.json for the given bucket. configFile := path.Join(bucketConfigPrefix, minioConfigFile) transactionConfigFile := configFile + ".transaction" // As object layer's GetObject() and PutObject() take respective lock on minioMetaBucket // and configFile, take a transaction lock to avoid race. objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile) if err := objLock.GetRLock(globalOperationTimeout); err != nil { return err } objLock.RUnlock() return globalConfigSys.Load(newObjectLayerFn()) } // DrivePerfInfo - handles drive performance RPC call func (receiver *peerRPCReceiver) DrivePerfInfo(args *AuthArgs, reply *ServerDrivesPerfInfo) error { objAPI := newObjectLayerFn() if objAPI == nil { return errServerNotInitialized } *reply = localEndpointsDrivePerf(globalEndpoints) return nil } // CPULoadInfo - handles cpu performance RPC call func (receiver *peerRPCReceiver) CPULoadInfo(args *AuthArgs, reply *ServerCPULoadInfo) error { objAPI := newObjectLayerFn() if objAPI == nil { return errServerNotInitialized } *reply = localEndpointsCPULoad(globalEndpoints) return nil } // MemUsageInfo - handles mem utilization RPC call func (receiver *peerRPCReceiver) MemUsageInfo(args *AuthArgs, reply *ServerMemUsageInfo) error { objAPI := newObjectLayerFn() if objAPI == nil { return errServerNotInitialized } *reply = localEndpointsMemUsage(globalEndpoints) return nil } // uptimes - used to sort uptimes in chronological order. type uptimes []time.Duration func (ts uptimes) Len() int { return len(ts) } func (ts uptimes) Less(i, j int) bool { return ts[i] < ts[j] } func (ts uptimes) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] } // getPeerUptimes - returns the uptime. func getPeerUptimes(serverInfo []ServerInfo) time.Duration { // In a single node Erasure or FS backend setup the uptime of // the setup is the uptime of the single minio server // instance. if !globalIsDistXL { return UTCNow().Sub(globalBootTime) } var times []time.Duration for _, info := range serverInfo { if info.Error != "" { continue } times = append(times, info.Data.Properties.Uptime) } // Sort uptimes in chronological order. sort.Sort(uptimes(times)) // Return the latest time as the uptime. return times[0] } // StartProfilingArgs - holds the RPC argument for StartingProfiling RPC call type StartProfilingArgs struct { AuthArgs Profiler string } // StartProfiling - profiling server receiver. func (receiver *peerRPCReceiver) StartProfiling(args *StartProfilingArgs, reply *VoidReply) error { if globalProfiler != nil { globalProfiler.Stop() } var err error globalProfiler, err = startProfiler(args.Profiler, "") return err } // DownloadProfilingData - download profiling data. func (receiver *peerRPCReceiver) DownloadProfilingData(args *AuthArgs, reply *[]byte) error { var err error *reply, err = getProfileData() return err } var errUnsupportedSignal = fmt.Errorf("unsupported signal: only restart and stop signals are supported") // SignalServiceArgs - send event RPC arguments. type SignalServiceArgs struct { AuthArgs Sig serviceSignal } // SignalService - signal service receiver. func (receiver *peerRPCReceiver) SignalService(args *SignalServiceArgs, reply *VoidReply) error { switch args.Sig { case serviceRestart, serviceStop: globalServiceSignalCh <- args.Sig default: return errUnsupportedSignal } return nil } // ServerInfo - server info receiver. func (receiver *peerRPCReceiver) ServerInfo(args *AuthArgs, reply *ServerInfoData) error { if globalBootTime.IsZero() { return errServerNotInitialized } // Build storage info objLayer := newObjectLayerFn() if objLayer == nil { return errServerNotInitialized } // Server info data. *reply = ServerInfoData{ StorageInfo: objLayer.StorageInfo(context.Background()), ConnStats: globalConnStats.toServerConnStats(), HTTPStats: globalHTTPStats.toServerHTTPStats(), Properties: ServerProperties{ Uptime: UTCNow().Sub(globalBootTime), Version: Version, CommitID: CommitID, SQSARN: globalNotificationSys.GetARNList(), Region: globalServerConfig.GetRegion(), }, } return nil } // NewPeerRPCServer - returns new peer RPC server. func NewPeerRPCServer() (*xrpc.Server, error) { rpcServer := xrpc.NewServer() if err := rpcServer.RegisterName(peerServiceName, &peerRPCReceiver{}); err != nil { return nil, err } return rpcServer, nil } // registerPeerRPCRouter - creates and registers Peer RPC server and its router. func registerPeerRPCRouter(router *mux.Router) { rpcServer, err := NewPeerRPCServer() logger.FatalIf(err, "Unable to initialize peer RPC Server") subrouter := router.PathPrefix(minioReservedBucketPath).Subrouter() subrouter.Path(peerServiceSubPath).HandlerFunc(httpTraceHdrs(rpcServer.ServeHTTP)) }