Use REST api for inter node communication (#7205)

This commit is contained in:
kannappanr 2019-03-14 16:27:31 -07:00 committed by GitHub
parent facbd653ba
commit eb69c4f946
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 1258 additions and 806 deletions

View file

@ -43,7 +43,7 @@ type NotificationSys struct {
targetList *event.TargetList
bucketRulesMap map[string]event.RulesMap
bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap
peerRPCClientMap map[xnet.Host]*PeerRPCClient
peerClients []*peerRESTClient
}
// GetARNList - returns available ARNs.
@ -63,11 +63,6 @@ func (sys *NotificationSys) GetARNList() []string {
return arns
}
// GetPeerRPCClient - returns PeerRPCClient of addr.
func (sys *NotificationSys) GetPeerRPCClient(addr xnet.Host) *PeerRPCClient {
return sys.peerRPCClientMap[addr]
}
// NotificationPeerErr returns error associated for a remote peer.
type NotificationPeerErr struct {
Host xnet.Host // Remote host on which the rpc call was initiated
@ -78,15 +73,15 @@ type NotificationPeerErr struct {
func (sys *NotificationSys) DeleteBucket(ctx context.Context, bucketName string) {
go func() {
var wg sync.WaitGroup
for addr, client := range sys.peerRPCClientMap {
for _, client := range sys.peerClients {
wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient) {
go func(client *peerRESTClient) {
defer wg.Done()
if err := client.DeleteBucket(bucketName); err != nil {
logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name)
logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name)
logger.LogIf(ctx, err)
}
}(addr, client)
}(client)
}
wg.Wait()
}()
@ -146,52 +141,58 @@ func (g *NotificationGroup) Go(ctx context.Context, f func() error, index int, a
}()
}
// ReloadFormat - calls ReloadFormat RPC call on all peers.
// ReloadFormat - calls ReloadFormat REST call on all peers.
func (sys *NotificationSys) ReloadFormat(dryRun bool) []NotificationPeerErr {
var idx = 0
ng := WithNPeers(len(sys.peerRPCClientMap))
for addr, client := range sys.peerRPCClientMap {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(context.Background(), func() error {
return client.ReloadFormat(dryRun)
}, idx, addr)
idx++
}, idx, *client.host)
}
return ng.Wait()
}
// LoadUsers - calls LoadUsers RPC call on all peers.
func (sys *NotificationSys) LoadUsers() []NotificationPeerErr {
var idx = 0
ng := WithNPeers(len(sys.peerRPCClientMap))
for addr, client := range sys.peerRPCClientMap {
ng.Go(context.Background(), client.LoadUsers, idx, addr)
idx++
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(context.Background(), client.LoadUsers, idx, *client.host)
}
return ng.Wait()
}
// LoadCredentials - calls LoadCredentials RPC call on all peers.
func (sys *NotificationSys) LoadCredentials() []NotificationPeerErr {
var idx = 0
ng := WithNPeers(len(sys.peerRPCClientMap))
for addr, client := range sys.peerRPCClientMap {
ng.Go(context.Background(), client.LoadCredentials, idx, addr)
idx++
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(context.Background(), client.LoadCredentials, idx, *client.host)
}
return ng.Wait()
}
// StartProfiling - start profiling on remote peers, by initiating a remote RPC.
func (sys *NotificationSys) StartProfiling(profiler string) []NotificationPeerErr {
var idx = 0
ng := WithNPeers(len(sys.peerRPCClientMap))
for addr, client := range sys.peerRPCClientMap {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(context.Background(), func() error {
return client.StartProfiling(profiler)
}, idx, addr)
idx++
}, idx, *client.host)
}
return ng.Wait()
}
@ -205,10 +206,13 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io
zipWriter := zip.NewWriter(writer)
defer zipWriter.Close()
for addr, client := range sys.peerRPCClientMap {
data, err := client.DownloadProfilingData()
for _, client := range sys.peerClients {
if client == nil {
continue
}
data, err := client.DownloadProfileData()
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String())
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String())
ctx := logger.SetReqInfo(ctx, reqInfo)
logger.LogIf(ctx, err)
continue
@ -218,7 +222,7 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io
// Send profiling data to zip as file
header, zerr := zip.FileInfoHeader(dummyFileInfo{
name: fmt.Sprintf("profiling-%s.pprof", addr),
name: fmt.Sprintf("profiling-%s.pprof", client.host.String()),
size: int64(len(data)),
mode: 0600,
modTime: UTCNow(),
@ -226,20 +230,20 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io
sys: nil,
})
if zerr != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String())
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String())
ctx := logger.SetReqInfo(ctx, reqInfo)
logger.LogIf(ctx, zerr)
continue
}
zwriter, zerr := zipWriter.CreateHeader(header)
if zerr != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String())
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String())
ctx := logger.SetReqInfo(ctx, reqInfo)
logger.LogIf(ctx, zerr)
continue
}
if _, err = io.Copy(zwriter, bytes.NewBuffer(data)); err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String())
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String())
ctx := logger.SetReqInfo(ctx, reqInfo)
logger.LogIf(ctx, err)
continue
@ -289,45 +293,48 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io
// SignalService - calls signal service RPC call on all peers.
func (sys *NotificationSys) SignalService(sig serviceSignal) []NotificationPeerErr {
var idx = 0
ng := WithNPeers(len(sys.peerRPCClientMap))
for addr, client := range sys.peerRPCClientMap {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(context.Background(), func() error {
return client.SignalService(sig)
}, idx, addr)
idx++
}, idx, *client.host)
}
return ng.Wait()
}
// ServerInfo - calls ServerInfo RPC call on all peers.
func (sys *NotificationSys) ServerInfo(ctx context.Context) []ServerInfo {
var idx = 0
serverInfo := make([]ServerInfo, len(sys.peerRPCClientMap))
serverInfo := make([]ServerInfo, len(sys.peerClients))
var wg sync.WaitGroup
for addr, client := range sys.peerRPCClientMap {
for index, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(idx int, addr xnet.Host, client *PeerRPCClient) {
go func(idx int, client *peerRESTClient) {
defer wg.Done()
// Try to fetch serverInfo remotely in three attempts.
for i := 0; i < 3; i++ {
info, err := client.ServerInfo()
if err == nil {
serverInfo[idx] = ServerInfo{
Addr: addr.String(),
Addr: client.host.String(),
Data: &info,
}
return
}
serverInfo[idx] = ServerInfo{
Addr: addr.String(),
Addr: client.host.String(),
Data: &info,
Error: err.Error(),
}
// Last iteration log the error.
if i == 2 {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String())
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String())
ctx := logger.SetReqInfo(ctx, reqInfo)
logger.LogIf(ctx, err)
}
@ -336,8 +343,7 @@ func (sys *NotificationSys) ServerInfo(ctx context.Context) []ServerInfo {
time.Sleep(1 * time.Second)
}
}
}(idx, addr, client)
idx++
}(index, client)
}
wg.Wait()
return serverInfo
@ -345,19 +351,22 @@ func (sys *NotificationSys) ServerInfo(ctx context.Context) []ServerInfo {
// GetLocks - makes GetLocks RPC call on all peers.
func (sys *NotificationSys) GetLocks(ctx context.Context) []*PeerLocks {
var idx = 0
locksResp := make([]*PeerLocks, len(sys.peerRPCClientMap))
locksResp := make([]*PeerLocks, len(sys.peerClients))
var wg sync.WaitGroup
for addr, client := range sys.peerRPCClientMap {
for index, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(idx int, addr xnet.Host, client *PeerRPCClient) {
go func(idx int, client *peerRESTClient) {
defer wg.Done()
// Try to fetch serverInfo remotely in three attempts.
for i := 0; i < 3; i++ {
serverLocksResp, err := client.GetLocks()
if err == nil {
locksResp[idx] = &PeerLocks{
Addr: addr.String(),
Addr: client.host.String(),
Locks: serverLocksResp,
}
return
@ -365,17 +374,16 @@ func (sys *NotificationSys) GetLocks(ctx context.Context) []*PeerLocks {
// Last iteration log the error.
if i == 2 {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String())
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String())
ctx := logger.SetReqInfo(ctx, reqInfo)
logger.LogOnceIf(ctx, err, addr.String())
logger.LogOnceIf(ctx, err, client.host.String())
}
// Wait for one second and no need wait after last attempt.
if i < 2 {
time.Sleep(1 * time.Second)
}
}
}(idx, addr, client)
idx++
}(index, client)
}
wg.Wait()
return locksResp
@ -385,15 +393,18 @@ func (sys *NotificationSys) GetLocks(ctx context.Context) []*PeerLocks {
func (sys *NotificationSys) SetBucketPolicy(ctx context.Context, bucketName string, bucketPolicy *policy.Policy) {
go func() {
var wg sync.WaitGroup
for addr, client := range sys.peerRPCClientMap {
for _, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient) {
go func(client *peerRESTClient) {
defer wg.Done()
if err := client.SetBucketPolicy(bucketName, bucketPolicy); err != nil {
logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name)
logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name)
logger.LogIf(ctx, err)
}
}(addr, client)
}(client)
}
wg.Wait()
}()
@ -403,15 +414,18 @@ func (sys *NotificationSys) SetBucketPolicy(ctx context.Context, bucketName stri
func (sys *NotificationSys) RemoveBucketPolicy(ctx context.Context, bucketName string) {
go func() {
var wg sync.WaitGroup
for addr, client := range sys.peerRPCClientMap {
for _, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient) {
go func(client *peerRESTClient) {
defer wg.Done()
if err := client.RemoveBucketPolicy(bucketName); err != nil {
logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name)
logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name)
logger.LogIf(ctx, err)
}
}(addr, client)
}(client)
}
wg.Wait()
}()
@ -421,15 +435,18 @@ func (sys *NotificationSys) RemoveBucketPolicy(ctx context.Context, bucketName s
func (sys *NotificationSys) PutBucketNotification(ctx context.Context, bucketName string, rulesMap event.RulesMap) {
go func() {
var wg sync.WaitGroup
for addr, client := range sys.peerRPCClientMap {
for _, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient, rulesMap event.RulesMap) {
go func(client *peerRESTClient, rulesMap event.RulesMap) {
defer wg.Done()
if err := client.PutBucketNotification(bucketName, rulesMap); err != nil {
logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name)
logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name)
logger.LogIf(ctx, err)
}
}(addr, client, rulesMap.Clone())
}(client, rulesMap.Clone())
}
wg.Wait()
}()
@ -440,15 +457,18 @@ func (sys *NotificationSys) ListenBucketNotification(ctx context.Context, bucket
targetID event.TargetID, localPeer xnet.Host) {
go func() {
var wg sync.WaitGroup
for addr, client := range sys.peerRPCClientMap {
for _, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient) {
go func(client *peerRESTClient) {
defer wg.Done()
if err := client.ListenBucketNotification(bucketName, eventNames, pattern, targetID, localPeer); err != nil {
logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name)
logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name)
logger.LogIf(ctx, err)
}
}(addr, client)
}(client)
}
wg.Wait()
}()
@ -492,7 +512,17 @@ func (sys *NotificationSys) RemoteTargetExist(bucketName string, targetID event.
return ok
}
// initListeners - initializes PeerRPC clients available in listener.json.
// ListenBucketNotificationArgs - listen bucket notification RPC arguments.
type ListenBucketNotificationArgs struct {
AuthArgs `json:"-"`
BucketName string `json:"-"`
EventNames []event.Name `json:"eventNames"`
Pattern string `json:"pattern"`
TargetID event.TargetID `json:"targetId"`
Addr xnet.Host `json:"addr"`
}
// initListeners - initializes PeerREST clients available in listener.json.
func (sys *NotificationSys) initListeners(ctx context.Context, objAPI ObjectLayer, bucketName string) error {
// listener.json is available/applicable only in DistXL mode.
if !globalIsDistXL {
@ -542,12 +572,12 @@ func (sys *NotificationSys) initListeners(ctx context.Context, objAPI ObjectLaye
continue
}
rpcClient := sys.GetPeerRPCClient(args.Addr)
if rpcClient == nil {
return fmt.Errorf("unable to find PeerRPCClient by address %v in listener.json for bucket %v", args.Addr, bucketName)
client, err := newPeerRESTClient(&args.Addr)
if err != nil {
return fmt.Errorf("unable to find PeerHost by address %v in listener.json for bucket %v", args.Addr, bucketName)
}
exist, err := rpcClient.RemoteTargetExist(bucketName, args.TargetID)
exist, err := client.RemoteTargetExist(bucketName, args.TargetID)
if err != nil {
logger.GetReqInfo(ctx).AppendTags("targetID", args.TargetID.Name)
logger.LogIf(ctx, err)
@ -558,7 +588,7 @@ func (sys *NotificationSys) initListeners(ctx context.Context, objAPI ObjectLaye
continue
}
target := NewPeerRPCClientTarget(bucketName, args.TargetID, rpcClient)
target := NewPeerRESTClientTarget(bucketName, args.TargetID, client)
rulesMap := event.NewRulesMap(args.EventNames, args.Pattern, target.ID())
if err = sys.AddRemoteTarget(bucketName, target, rulesMap); err != nil {
logger.GetReqInfo(ctx).AppendTags("targetName", target.id.Name)
@ -721,24 +751,25 @@ func (sys *NotificationSys) Send(args eventArgs) []event.TargetIDErr {
// DrivePerfInfo - Drive speed (read and write) information
func (sys *NotificationSys) DrivePerfInfo() []ServerDrivesPerfInfo {
reply := make([]ServerDrivesPerfInfo, len(sys.peerRPCClientMap))
reply := make([]ServerDrivesPerfInfo, len(sys.peerClients))
var wg sync.WaitGroup
var i int
for addr, client := range sys.peerRPCClientMap {
for i, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient, idx int) {
go func(client *peerRESTClient, idx int) {
defer wg.Done()
di, err := client.DrivePerfInfo()
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr.String())
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", client.host.String())
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
di.Addr = addr.String()
di.Addr = client.host.String()
di.Error = err.Error()
}
reply[idx] = di
}(addr, client, i)
i++
}(client, i)
}
wg.Wait()
return reply
@ -746,24 +777,25 @@ func (sys *NotificationSys) DrivePerfInfo() []ServerDrivesPerfInfo {
// MemUsageInfo - Mem utilization information
func (sys *NotificationSys) MemUsageInfo() []ServerMemUsageInfo {
reply := make([]ServerMemUsageInfo, len(sys.peerRPCClientMap))
reply := make([]ServerMemUsageInfo, len(sys.peerClients))
var wg sync.WaitGroup
var i int
for addr, client := range sys.peerRPCClientMap {
for i, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient, idx int) {
go func(client *peerRESTClient, idx int) {
defer wg.Done()
memi, err := client.MemUsageInfo()
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr.String())
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", client.host.String())
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
memi.Addr = addr.String()
memi.Addr = client.host.String()
memi.Error = err.Error()
}
reply[idx] = memi
}(addr, client, i)
i++
}(client, i)
}
wg.Wait()
return reply
@ -771,24 +803,25 @@ func (sys *NotificationSys) MemUsageInfo() []ServerMemUsageInfo {
// CPULoadInfo - CPU utilization information
func (sys *NotificationSys) CPULoadInfo() []ServerCPULoadInfo {
reply := make([]ServerCPULoadInfo, len(sys.peerRPCClientMap))
reply := make([]ServerCPULoadInfo, len(sys.peerClients))
var wg sync.WaitGroup
var i int
for addr, client := range sys.peerRPCClientMap {
for i, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient, idx int) {
go func(client *peerRESTClient, idx int) {
defer wg.Done()
cpui, err := client.CPULoadInfo()
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr.String())
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", client.host.String())
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
cpui.Addr = addr.String()
cpui.Addr = client.host.String()
cpui.Error = err.Error()
}
reply[idx] = cpui
}(addr, client, i)
i++
}(client, i)
}
wg.Wait()
return reply
@ -797,14 +830,18 @@ func (sys *NotificationSys) CPULoadInfo() []ServerCPULoadInfo {
// NewNotificationSys - creates new notification system object.
func NewNotificationSys(config *serverConfig, endpoints EndpointList) *NotificationSys {
targetList := getNotificationTargets(config)
peerRPCClientMap := makeRemoteRPCClients(endpoints)
remoteHosts := getRemoteHosts(endpoints)
remoteClients, err := getRestClients(remoteHosts)
if err != nil {
logger.FatalIf(err, "Unable to start notification sub system")
}
// bucketRulesMap/bucketRemoteTargetRulesMap are initialized by NotificationSys.Init()
return &NotificationSys{
targetList: targetList,
bucketRulesMap: make(map[string]event.RulesMap),
bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap),
peerRPCClientMap: peerRPCClientMap,
peerClients: remoteClients,
}
}

View file

@ -18,35 +18,35 @@ package cmd
import "github.com/minio/minio/pkg/event"
// PeerRPCClientTarget - RPCClient is an event.Target which sends event to target of remote peer.
type PeerRPCClientTarget struct {
// PeerRESTClientTarget - RPCClient is an event.Target which sends event to target of remote peer.
type PeerRESTClientTarget struct {
id event.TargetID
remoteTargetID event.TargetID
rpcClient *PeerRPCClient
restClient *peerRESTClient
bucketName string
}
// ID - returns target ID.
func (target *PeerRPCClientTarget) ID() event.TargetID {
func (target *PeerRESTClientTarget) ID() event.TargetID {
return target.id
}
// Send - sends event to remote peer by making RPC call.
func (target *PeerRPCClientTarget) Send(eventData event.Event) error {
return target.rpcClient.SendEvent(target.bucketName, target.id, target.remoteTargetID, eventData)
func (target *PeerRESTClientTarget) Send(eventData event.Event) error {
return target.restClient.SendEvent(target.bucketName, target.id, target.remoteTargetID, eventData)
}
// Close - does nothing and available for interface compatibility.
func (target *PeerRPCClientTarget) Close() error {
func (target *PeerRESTClientTarget) Close() error {
return nil
}
// NewPeerRPCClientTarget - creates RPCClient target with given target ID available in remote peer.
func NewPeerRPCClientTarget(bucketName string, targetID event.TargetID, rpcClient *PeerRPCClient) *PeerRPCClientTarget {
return &PeerRPCClientTarget{
// NewPeerRESTClientTarget - creates RPCClient target with given target ID available in remote peer.
func NewPeerRESTClientTarget(bucketName string, targetID event.TargetID, restClient *peerRESTClient) *PeerRESTClientTarget {
return &PeerRESTClientTarget{
id: event.TargetID{ID: targetID.ID, Name: targetID.Name + "+" + mustGetUUID()},
remoteTargetID: targetID,
bucketName: bucketName,
rpcClient: rpcClient,
restClient: restClient,
}
}

425
cmd/peer-rest-client.go Normal file
View file

@ -0,0 +1,425 @@
/*
* Minio Cloud Storage, (C) 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"bytes"
"context"
"crypto/tls"
"encoding/gob"
"io"
"net/url"
"github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/cmd/rest"
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
"github.com/minio/minio/pkg/policy"
)
// client to talk to peer Nodes.
type peerRESTClient struct {
host *xnet.Host
restClient *rest.Client
connected bool
}
// Reconnect to a peer rest server.
func (client *peerRESTClient) reConnect() error {
// correct (intelligent) retry logic will be
// implemented in subsequent PRs.
client.connected = true
return nil
}
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
// after verifying format.json
func (client *peerRESTClient) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
if !client.connected {
err := client.reConnect()
logger.LogIf(context.Background(), err)
if err != nil {
return nil, err
}
}
if values == nil {
values = make(url.Values)
}
respBody, err = client.restClient.Call(method, values, body, length)
if err == nil {
return respBody, nil
}
if isNetworkError(err) {
client.connected = false
}
return nil, err
}
// Stringer provides a canonicalized representation of node.
func (client *peerRESTClient) String() string {
return client.host.String()
}
// IsOnline - returns whether RPC client failed to connect or not.
func (client *peerRESTClient) IsOnline() bool {
return client.connected
}
// Close - marks the client as closed.
func (client *peerRESTClient) Close() error {
client.connected = false
client.restClient.Close()
return nil
}
// GetLocksResp stores various info from the client for each lock that is requested.
type GetLocksResp map[string][]lockRequesterInfo
// GetLocks - fetch older locks for a remote node.
func (client *peerRESTClient) GetLocks() (locks GetLocksResp, err error) {
respBody, err := client.call(peerRESTMethodGetLocks, nil, nil, -1)
if err != nil {
return
}
defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&locks)
return locks, err
}
// ServerInfo - fetch server information for a remote node.
func (client *peerRESTClient) ServerInfo() (info ServerInfoData, err error) {
respBody, err := client.call(peerRESTMethodServerInfo, nil, nil, -1)
if err != nil {
return
}
defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&info)
return info, err
}
// CPULoadInfo - fetch CPU information for a remote node.
func (client *peerRESTClient) CPULoadInfo() (info ServerCPULoadInfo, err error) {
respBody, err := client.call(peerRESTMethodCPULoadInfo, nil, nil, -1)
if err != nil {
return
}
defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&info)
return info, err
}
// DrivePerfInfo - fetch Drive performance information for a remote node.
func (client *peerRESTClient) DrivePerfInfo() (info ServerDrivesPerfInfo, err error) {
respBody, err := client.call(peerRESTMethodDrivePerfInfo, nil, nil, -1)
if err != nil {
return
}
defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&info)
return info, err
}
// MemUsageInfo - fetch memory usage information for a remote node.
func (client *peerRESTClient) MemUsageInfo() (info ServerMemUsageInfo, err error) {
respBody, err := client.call(peerRESTMethodMemUsageInfo, nil, nil, -1)
if err != nil {
return
}
defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&info)
return info, err
}
// StartProfiling - Issues profiling command on the peer node.
func (client *peerRESTClient) StartProfiling(profiler string) error {
values := make(url.Values)
values.Set(peerRESTProfiler, profiler)
respBody, err := client.call(peerRESTMethodStartProfiling, values, nil, -1)
if err != nil {
return err
}
defer http.DrainBody(respBody)
return nil
}
// DownloadProfileData - download profiled data from a remote node.
func (client *peerRESTClient) DownloadProfileData() (data []byte, err error) {
respBody, err := client.call(peerRESTMethodDownloadProfilingData, nil, nil, -1)
if err != nil {
return
}
defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&data)
return data, err
}
// DeleteBucket - Delete notification and policies related to the bucket.
func (client *peerRESTClient) DeleteBucket(bucket string) error {
values := make(url.Values)
values.Set(peerRESTBucket, bucket)
respBody, err := client.call(peerRESTMethodDeleteBucket, values, nil, -1)
if err != nil {
return err
}
defer http.DrainBody(respBody)
return nil
}
// ReloadFormat - reload format on the peer node.
func (client *peerRESTClient) ReloadFormat(dryRun bool) error {
values := make(url.Values)
if dryRun {
values.Set(peerRESTDryRun, "true")
} else {
values.Set(peerRESTDryRun, "false")
}
respBody, err := client.call(peerRESTMethodReloadFormat, values, nil, -1)
if err != nil {
return err
}
defer http.DrainBody(respBody)
return nil
}
// ListenBucketNotification - send listent bucket notification to peer nodes.
func (client *peerRESTClient) ListenBucketNotification(bucket string, eventNames []event.Name,
pattern string, targetID event.TargetID, addr xnet.Host) error {
args := listenBucketNotificationReq{
EventNames: eventNames,
Pattern: pattern,
TargetID: targetID,
Addr: addr,
}
values := make(url.Values)
values.Set(peerRESTBucket, bucket)
var reader bytes.Buffer
err := gob.NewEncoder(&reader).Encode(args)
if err != nil {
return err
}
respBody, err := client.call(peerRESTMethodBucketNotificationListen, values, &reader, -1)
if err != nil {
return err
}
defer http.DrainBody(respBody)
return nil
}
// SendEvent - calls send event RPC.
func (client *peerRESTClient) SendEvent(bucket string, targetID, remoteTargetID event.TargetID, eventData event.Event) error {
args := sendEventRequest{
TargetID: remoteTargetID,
Event: eventData,
}
values := make(url.Values)
values.Set(peerRESTBucket, bucket)
var reader bytes.Buffer
err := gob.NewEncoder(&reader).Encode(args)
if err != nil {
return err
}
respBody, err := client.call(peerRESTMethodSendEvent, values, &reader, -1)
if err != nil {
return err
}
var eventResp sendEventResp
defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&eventResp)
if err != nil || !eventResp.Success {
reqInfo := &logger.ReqInfo{BucketName: bucket}
reqInfo.AppendTags("targetID", targetID.Name)
reqInfo.AppendTags("event", eventData.EventName.String())
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
globalNotificationSys.RemoveRemoteTarget(bucket, targetID)
}
return err
}
// RemoteTargetExist - calls remote target ID exist REST API.
func (client *peerRESTClient) RemoteTargetExist(bucket string, targetID event.TargetID) (bool, error) {
values := make(url.Values)
values.Set(peerRESTBucket, bucket)
var reader bytes.Buffer
err := gob.NewEncoder(&reader).Encode(targetID)
if err != nil {
return false, err
}
respBody, err := client.call(peerRESTMethodTargetExists, values, &reader, -1)
if err != nil {
return false, err
}
defer http.DrainBody(respBody)
var targetExists remoteTargetExistsResp
err = gob.NewDecoder(respBody).Decode(&targetExists)
return targetExists.Exists, err
}
// RemoveBucketPolicy - Remove bucket policy on the peer node.
func (client *peerRESTClient) RemoveBucketPolicy(bucket string) error {
values := make(url.Values)
values.Set(peerRESTBucket, bucket)
respBody, err := client.call(peerRESTMethodBucketPolicyRemove, values, nil, -1)
if err != nil {
return err
}
defer http.DrainBody(respBody)
return nil
}
// SetBucketPolicy - Set bucket policy on the peer node.
func (client *peerRESTClient) SetBucketPolicy(bucket string, bucketPolicy *policy.Policy) error {
values := make(url.Values)
values.Set(peerRESTBucket, bucket)
var reader bytes.Buffer
err := gob.NewEncoder(&reader).Encode(bucketPolicy)
if err != nil {
return err
}
respBody, err := client.call(peerRESTMethodBucketPolicySet, values, &reader, -1)
if err != nil {
return err
}
defer http.DrainBody(respBody)
return nil
}
// PutBucketNotification - Put bucket notification on the peer node.
func (client *peerRESTClient) PutBucketNotification(bucket string, rulesMap event.RulesMap) error {
values := make(url.Values)
values.Set(peerRESTBucket, bucket)
var reader bytes.Buffer
err := gob.NewEncoder(&reader).Encode(&rulesMap)
if err != nil {
return err
}
respBody, err := client.call(peerRESTMethodBucketNotificationPut, values, &reader, -1)
if err != nil {
return err
}
defer http.DrainBody(respBody)
return nil
}
// LoadUsers - send load users command to peer nodes.
func (client *peerRESTClient) LoadUsers() (err error) {
respBody, err := client.call(peerRESTMethodLoadUsers, nil, nil, -1)
if err != nil {
return
}
defer http.DrainBody(respBody)
return nil
}
// LoadCredentials - send load credentials command to peer nodes.
func (client *peerRESTClient) LoadCredentials() (err error) {
respBody, err := client.call(peerRESTMethodLoadCredentials, nil, nil, -1)
if err != nil {
return
}
defer http.DrainBody(respBody)
return nil
}
// SignalService - sends signal to peer nodes.
func (client *peerRESTClient) SignalService(sig serviceSignal) error {
values := make(url.Values)
values.Set(peerRESTSignal, string(sig))
respBody, err := client.call(peerRESTMethodSignalService, values, nil, -1)
if err != nil {
return err
}
defer http.DrainBody(respBody)
return nil
}
func getRemoteHosts(endpoints EndpointList) []*xnet.Host {
var remoteHosts []*xnet.Host
for _, hostStr := range GetRemotePeers(endpoints) {
host, err := xnet.ParseHost(hostStr)
logger.FatalIf(err, "Unable to parse peer Host")
remoteHosts = append(remoteHosts, host)
}
return remoteHosts
}
func getRestClients(peerHosts []*xnet.Host) ([]*peerRESTClient, error) {
restClients := make([]*peerRESTClient, len(peerHosts))
for i, host := range peerHosts {
client, err := newPeerRESTClient(host)
if err != nil {
logger.LogIf(context.Background(), err)
}
restClients[i] = client
}
return restClients, nil
}
// Returns a peer rest client.
func newPeerRESTClient(peer *xnet.Host) (*peerRESTClient, error) {
scheme := "http"
if globalIsSSL {
scheme = "https"
}
serverURL := &url.URL{
Scheme: scheme,
Host: peer.String(),
Path: peerRESTPath,
}
var tlsConfig *tls.Config
if globalIsSSL {
tlsConfig = &tls.Config{
ServerName: peer.String(),
RootCAs: globalRootCAs,
}
}
restClient, err := rest.NewClient(serverURL, tlsConfig, rest.DefaultRESTTimeout, newAuthToken)
if err != nil {
return &peerRESTClient{host: peer, restClient: restClient, connected: false}, err
}
return &peerRESTClient{host: peer, restClient: restClient, connected: true}, nil
}

48
cmd/peer-rest-common.go Normal file
View file

@ -0,0 +1,48 @@
/*
* Minio Cloud Storage, (C) 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
const peerRESTVersion = "v1"
const peerRESTPath = minioReservedBucketPath + "/peer/" + peerRESTVersion
const (
peerRESTMethodServerInfo = "serverinfo"
peerRESTMethodCPULoadInfo = "cpuloadinfo"
peerRESTMethodMemUsageInfo = "memusageinfo"
peerRESTMethodDrivePerfInfo = "driveperfinfo"
peerRESTMethodDeleteBucket = "deletebucket"
peerRESTMethodSignalService = "signalservice"
peerRESTMethodGetLocks = "getlocks"
peerRESTMethodBucketPolicyRemove = "removebucketpolicy"
peerRESTMethodLoadUsers = "loadusers"
peerRESTMethodStartProfiling = "startprofiling"
peerRESTMethodDownloadProfilingData = "downloadprofilingdata"
peerRESTMethodLoadCredentials = "loadcredentials"
peerRESTMethodBucketPolicySet = "setbucketpolicy"
peerRESTMethodBucketNotificationPut = "putbucketnotification"
peerRESTMethodBucketNotificationListen = "listenbucketnotification"
peerRESTMethodReloadFormat = "reloadformat"
peerRESTMethodTargetExists = "targetexists"
peerRESTMethodSendEvent = "sendevent"
)
const (
peerRESTBucket = "bucket"
peerRESTSignal = "signal"
peerRESTProfiler = "profiler"
peerRESTDryRun = "dry-run"
)

621
cmd/peer-rest-server.go Normal file
View file

@ -0,0 +1,621 @@
/*
* Minio Cloud Storage, (C) 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"encoding/gob"
"errors"
"fmt"
"net/http"
"path"
"sort"
"strings"
"time"
"github.com/gorilla/mux"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
"github.com/minio/minio/pkg/policy"
)
// To abstract a node over network.
type peerRESTServer struct {
}
func getServerInfo() (*ServerInfoData, error) {
if globalBootTime.IsZero() {
return nil, errServerNotInitialized
}
objLayer := newObjectLayerFn()
if objLayer == nil {
return nil, errServerNotInitialized
}
// Server info data.
return &ServerInfoData{
StorageInfo: objLayer.StorageInfo(context.Background()),
ConnStats: globalConnStats.toServerConnStats(),
HTTPStats: globalHTTPStats.toServerHTTPStats(),
Properties: ServerProperties{
Uptime: UTCNow().Sub(globalBootTime),
Version: Version,
CommitID: CommitID,
SQSARN: globalNotificationSys.GetARNList(),
Region: globalServerConfig.GetRegion(),
},
}, nil
}
// uptimes - used to sort uptimes in chronological order.
type uptimes []time.Duration
func (ts uptimes) Len() int {
return len(ts)
}
func (ts uptimes) Less(i, j int) bool {
return ts[i] < ts[j]
}
func (ts uptimes) Swap(i, j int) {
ts[i], ts[j] = ts[j], ts[i]
}
// getPeerUptimes - returns the uptime.
func getPeerUptimes(serverInfo []ServerInfo) time.Duration {
// In a single node Erasure or FS backend setup the uptime of
// the setup is the uptime of the single minio server
// instance.
if !globalIsDistXL {
return UTCNow().Sub(globalBootTime)
}
var times []time.Duration
for _, info := range serverInfo {
if info.Error != "" {
continue
}
times = append(times, info.Data.Properties.Uptime)
}
// Sort uptimes in chronological order.
sort.Sort(uptimes(times))
// Return the latest time as the uptime.
return times[0]
}
// GetLocksHandler - returns list of older lock from the server.
func (s *peerRESTServer) GetLocksHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx := newContext(r, w, "GetLocks")
locks := globalLockServer.ll.DupLockMap()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(locks))
w.(http.Flusher).Flush()
}
// LoadUsersHandler - returns server info.
func (s *peerRESTServer) LoadUsersHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
objAPI := newObjectLayerFn()
if objAPI == nil {
s.writeErrorResponse(w, errServerNotInitialized)
return
}
err := globalIAMSys.Load(objAPI)
if err != nil {
s.writeErrorResponse(w, err)
return
}
w.(http.Flusher).Flush()
}
// StartProfilingHandler - Issues the start profiling command.
func (s *peerRESTServer) StartProfilingHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
vars := mux.Vars(r)
profiler := vars[peerRESTProfiler]
if profiler == "" {
s.writeErrorResponse(w, errors.New("profiler name is missing"))
return
}
if globalProfiler != nil {
globalProfiler.Stop()
}
var err error
globalProfiler, err = startProfiler(profiler, "")
if err != nil {
s.writeErrorResponse(w, err)
return
}
w.(http.Flusher).Flush()
}
// ServerInfoHandler - returns server info.
func (s *peerRESTServer) ServerInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx := newContext(r, w, "ServerInfo")
info, err := getServerInfo()
if err != nil {
s.writeErrorResponse(w, err)
return
}
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
// DownloadProflingDataHandler - returns proflied data.
func (s *peerRESTServer) DownloadProflingDataHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx := newContext(r, w, "DownloadProfiling")
profileData, err := getProfileData()
if err != nil {
s.writeErrorResponse(w, err)
return
}
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(profileData))
}
// CPULoadInfoHandler - returns CPU Load info.
func (s *peerRESTServer) CPULoadInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx := newContext(r, w, "CPULoadInfo")
info := localEndpointsCPULoad(globalEndpoints)
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
// DrivePerfInfoHandler - returns Drive Performance info.
func (s *peerRESTServer) DrivePerfInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx := newContext(r, w, "DrivePerfInfo")
info := localEndpointsDrivePerf(globalEndpoints)
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
// MemUsageInfoHandler - returns Memory Usage info.
func (s *peerRESTServer) MemUsageInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx := newContext(r, w, "MemUsageInfo")
info := localEndpointsMemUsage(globalEndpoints)
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
// LoadCredentialsHandler - loads credentials.
func (s *peerRESTServer) LoadCredentialsHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
// Construct path to config.json for the given bucket.
configFile := path.Join(bucketConfigPrefix, minioConfigFile)
transactionConfigFile := configFile + ".transaction"
// As object layer's GetObject() and PutObject() take respective lock on minioMetaBucket
// and configFile, take a transaction lock to avoid race.
objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile)
if err := objLock.GetRLock(globalOperationTimeout); err != nil {
s.writeErrorResponse(w, err)
return
}
objLock.RUnlock()
if err := globalConfigSys.Load(newObjectLayerFn()); err != nil {
s.writeErrorResponse(w, err)
return
}
w.(http.Flusher).Flush()
}
// DeleteBucketHandler - Delete notification and policies related to the bucket.
func (s *peerRESTServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
vars := mux.Vars(r)
bucketName := vars[peerRESTBucket]
if bucketName == "" {
s.writeErrorResponse(w, errors.New("Bucket name is missing"))
return
}
globalNotificationSys.RemoveNotification(bucketName)
globalPolicySys.Remove(bucketName)
w.(http.Flusher).Flush()
}
// ReloadFormatHandler - Reload Format.
func (s *peerRESTServer) ReloadFormatHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
vars := mux.Vars(r)
dryRunString := vars[peerRESTDryRun]
if dryRunString == "" {
s.writeErrorResponse(w, errors.New("dry run parameter is missing"))
return
}
var dryRun bool
switch strings.ToLower(dryRunString) {
case "true":
dryRun = true
case "false":
dryRun = false
default:
s.writeErrorResponse(w, errInvalidArgument)
return
}
objAPI := newObjectLayerFn()
if objAPI == nil {
s.writeErrorResponse(w, errServerNotInitialized)
return
}
err := objAPI.ReloadFormat(context.Background(), dryRun)
if err != nil {
s.writeErrorResponse(w, err)
return
}
w.(http.Flusher).Flush()
}
// RemoveBucketPolicyHandler - Remove bucket policy.
func (s *peerRESTServer) RemoveBucketPolicyHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
vars := mux.Vars(r)
bucketName := vars[peerRESTBucket]
if bucketName == "" {
s.writeErrorResponse(w, errors.New("Bucket name is missing"))
return
}
globalPolicySys.Remove(bucketName)
w.(http.Flusher).Flush()
}
// SetBucketPolicyHandler - Set bucket policy.
func (s *peerRESTServer) SetBucketPolicyHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
vars := mux.Vars(r)
bucketName := vars[peerRESTBucket]
if bucketName == "" {
s.writeErrorResponse(w, errors.New("Bucket name is missing"))
return
}
var policyData policy.Policy
if r.ContentLength < 0 {
s.writeErrorResponse(w, errInvalidArgument)
return
}
err := gob.NewDecoder(r.Body).Decode(&policyData)
if err != nil {
s.writeErrorResponse(w, err)
return
}
globalPolicySys.Set(bucketName, policyData)
w.(http.Flusher).Flush()
}
type remoteTargetExistsResp struct {
Exists bool
}
// TargetExistsHandler - Check if Target exists.
func (s *peerRESTServer) TargetExistsHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "TargetExists")
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
vars := mux.Vars(r)
bucketName := vars[peerRESTBucket]
if bucketName == "" {
s.writeErrorResponse(w, errors.New("Bucket name is missing"))
return
}
var targetID event.TargetID
if r.ContentLength <= 0 {
s.writeErrorResponse(w, errInvalidArgument)
return
}
err := gob.NewDecoder(r.Body).Decode(&targetID)
if err != nil {
s.writeErrorResponse(w, err)
return
}
var targetExists remoteTargetExistsResp
targetExists.Exists = globalNotificationSys.RemoteTargetExist(bucketName, targetID)
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(&targetExists))
}
type sendEventRequest struct {
Event event.Event
TargetID event.TargetID
}
type sendEventResp struct {
Success bool
}
// SendEventHandler - Send Event.
func (s *peerRESTServer) SendEventHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx := newContext(r, w, "SendEvent")
vars := mux.Vars(r)
bucketName := vars[peerRESTBucket]
if bucketName == "" {
s.writeErrorResponse(w, errors.New("Bucket name is missing"))
return
}
var eventReq sendEventRequest
if r.ContentLength <= 0 {
s.writeErrorResponse(w, errInvalidArgument)
return
}
err := gob.NewDecoder(r.Body).Decode(&eventReq)
if err != nil {
s.writeErrorResponse(w, err)
return
}
var eventResp sendEventResp
eventResp.Success = true
errs := globalNotificationSys.send(bucketName, eventReq.Event, eventReq.TargetID)
for i := range errs {
reqInfo := (&logger.ReqInfo{}).AppendTags("Event", eventReq.Event.EventName.String())
reqInfo.AppendTags("targetName", eventReq.TargetID.Name)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, errs[i].Err)
eventResp.Success = false
s.writeErrorResponse(w, errs[i].Err)
return
}
logger.LogIf(ctx, gob.NewEncoder(w).Encode(&eventResp))
w.(http.Flusher).Flush()
}
// PutBucketNotificationHandler - Set bucket policy.
func (s *peerRESTServer) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
vars := mux.Vars(r)
bucketName := vars[peerRESTBucket]
if bucketName == "" {
s.writeErrorResponse(w, errors.New("Bucket name is missing"))
return
}
var rulesMap event.RulesMap
if r.ContentLength < 0 {
s.writeErrorResponse(w, errInvalidArgument)
return
}
err := gob.NewDecoder(r.Body).Decode(&rulesMap)
if err != nil {
s.writeErrorResponse(w, err)
return
}
globalNotificationSys.AddRulesMap(bucketName, rulesMap)
w.(http.Flusher).Flush()
}
type listenBucketNotificationReq struct {
EventNames []event.Name `json:"eventNames"`
Pattern string `json:"pattern"`
TargetID event.TargetID `json:"targetId"`
Addr xnet.Host `json:"addr"`
}
// ListenBucketNotificationHandler - Listen bucket notification handler.
func (s *peerRESTServer) ListenBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
vars := mux.Vars(r)
bucketName := vars[peerRESTBucket]
if bucketName == "" {
s.writeErrorResponse(w, errors.New("Bucket name is missing"))
return
}
var args listenBucketNotificationReq
if r.ContentLength <= 0 {
s.writeErrorResponse(w, errInvalidArgument)
return
}
err := gob.NewDecoder(r.Body).Decode(&args)
if err != nil {
s.writeErrorResponse(w, err)
return
}
restClient, err := newPeerRESTClient(&args.Addr)
if err != nil {
s.writeErrorResponse(w, fmt.Errorf("unable to find PeerRESTClient for provided address %v. This happens only if remote and this minio run with different set of endpoints", args.Addr))
return
}
target := NewPeerRESTClientTarget(bucketName, args.TargetID, restClient)
rulesMap := event.NewRulesMap(args.EventNames, args.Pattern, target.ID())
if err := globalNotificationSys.AddRemoteTarget(bucketName, target, rulesMap); err != nil {
reqInfo := &logger.ReqInfo{BucketName: target.bucketName}
reqInfo.AppendTags("target", target.id.Name)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
s.writeErrorResponse(w, err)
return
}
w.(http.Flusher).Flush()
}
var errUnsupportedSignal = fmt.Errorf("unsupported signal: only restart and stop signals are supported")
// SignalServiceHandler - signal service handler.
func (s *peerRESTServer) SignalServiceHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
vars := mux.Vars(r)
signalString := vars[peerRESTSignal]
if signalString == "" {
s.writeErrorResponse(w, errors.New("signal name is missing"))
return
}
signal := serviceSignal(signalString)
defer w.(http.Flusher).Flush()
switch signal {
case serviceRestart, serviceStop:
globalServiceSignalCh <- signal
default:
s.writeErrorResponse(w, errUnsupportedSignal)
return
}
}
func (s *peerRESTServer) writeErrorResponse(w http.ResponseWriter, err error) {
w.WriteHeader(http.StatusForbidden)
w.Write([]byte(err.Error()))
}
// IsValid - To authenticate and verify the time difference.
func (s *peerRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool {
if err := storageServerRequestValidate(r); err != nil {
s.writeErrorResponse(w, err)
return false
}
return true
}
// registerPeerRESTHandlers - register peer rest router.
func registerPeerRESTHandlers(router *mux.Router) {
server := &peerRESTServer{}
subrouter := router.PathPrefix(peerRESTPath).Subrouter()
subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodGetLocks).HandlerFunc(httpTraceHdrs(server.GetLocksHandler))
subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodServerInfo).HandlerFunc(httpTraceHdrs(server.ServerInfoHandler))
subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodCPULoadInfo).HandlerFunc(httpTraceHdrs(server.CPULoadInfoHandler))
subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodMemUsageInfo).HandlerFunc(httpTraceHdrs(server.MemUsageInfoHandler))
subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodDrivePerfInfo).HandlerFunc(httpTraceHdrs(server.DrivePerfInfoHandler))
subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodDeleteBucket).HandlerFunc(httpTraceHdrs(server.DeleteBucketHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodSignalService).HandlerFunc(httpTraceHdrs(server.SignalServiceHandler)).Queries(restQueries(peerRESTSignal)...)
subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodBucketPolicyRemove).HandlerFunc(httpTraceAll(server.RemoveBucketPolicyHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodBucketPolicySet).HandlerFunc(httpTraceHdrs(server.SetBucketPolicyHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodLoadUsers).HandlerFunc(httpTraceAll(server.LoadUsersHandler))
subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodStartProfiling).HandlerFunc(httpTraceAll(server.StartProfilingHandler)).Queries(restQueries(peerRESTProfiler)...)
subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodDownloadProfilingData).HandlerFunc(httpTraceHdrs(server.DownloadProflingDataHandler))
subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodLoadCredentials).HandlerFunc(httpTraceHdrs(server.LoadCredentialsHandler))
subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodTargetExists).HandlerFunc(httpTraceHdrs(server.TargetExistsHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodSendEvent).HandlerFunc(httpTraceHdrs(server.SendEventHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodBucketNotificationPut).HandlerFunc(httpTraceHdrs(server.PutBucketNotificationHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodBucketNotificationListen).HandlerFunc(httpTraceHdrs(server.ListenBucketNotificationHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodReloadFormat).HandlerFunc(httpTraceHdrs(server.ReloadFormatHandler)).Queries(restQueries(peerRESTDryRun)...)
router.NotFoundHandler = http.HandlerFunc(httpTraceAll(notFoundHandler))
}

View file

@ -1,261 +0,0 @@
/*
* Minio Cloud Storage, (C) 2018, 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"crypto/tls"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
"github.com/minio/minio/pkg/policy"
)
// PeerRPCClient - peer RPC client talks to peer RPC server.
type PeerRPCClient struct {
*RPCClient
}
// DeleteBucket - calls delete bucket RPC.
func (rpcClient *PeerRPCClient) DeleteBucket(bucketName string) error {
args := DeleteBucketArgs{BucketName: bucketName}
reply := VoidReply{}
return rpcClient.Call(peerServiceName+".DeleteBucket", &args, &reply)
}
// SetBucketPolicy - calls set bucket policy RPC.
func (rpcClient *PeerRPCClient) SetBucketPolicy(bucketName string, bucketPolicy *policy.Policy) error {
args := SetBucketPolicyArgs{
BucketName: bucketName,
Policy: *bucketPolicy,
}
reply := VoidReply{}
return rpcClient.Call(peerServiceName+".SetBucketPolicy", &args, &reply)
}
// RemoveBucketPolicy - calls remove bucket policy RPC.
func (rpcClient *PeerRPCClient) RemoveBucketPolicy(bucketName string) error {
args := RemoveBucketPolicyArgs{
BucketName: bucketName,
}
reply := VoidReply{}
return rpcClient.Call(peerServiceName+".RemoveBucketPolicy", &args, &reply)
}
// PutBucketNotification - calls put bukcet notification RPC.
func (rpcClient *PeerRPCClient) PutBucketNotification(bucketName string, rulesMap event.RulesMap) error {
args := PutBucketNotificationArgs{
BucketName: bucketName,
RulesMap: rulesMap,
}
reply := VoidReply{}
return rpcClient.Call(peerServiceName+".PutBucketNotification", &args, &reply)
}
// ListenBucketNotification - calls listen bucket notification RPC.
func (rpcClient *PeerRPCClient) ListenBucketNotification(bucketName string, eventNames []event.Name,
pattern string, targetID event.TargetID, addr xnet.Host) error {
args := ListenBucketNotificationArgs{
BucketName: bucketName,
EventNames: eventNames,
Pattern: pattern,
TargetID: targetID,
Addr: addr,
}
reply := VoidReply{}
return rpcClient.Call(peerServiceName+".ListenBucketNotification", &args, &reply)
}
// RemoteTargetExist - calls remote target ID exist RPC.
func (rpcClient *PeerRPCClient) RemoteTargetExist(bucketName string, targetID event.TargetID) (bool, error) {
args := RemoteTargetExistArgs{
BucketName: bucketName,
TargetID: targetID,
}
var reply bool
err := rpcClient.Call(peerServiceName+".RemoteTargetExist", &args, &reply)
return reply, err
}
// SendEvent - calls send event RPC.
func (rpcClient *PeerRPCClient) SendEvent(bucketName string, targetID, remoteTargetID event.TargetID, eventData event.Event) error {
args := SendEventArgs{
BucketName: bucketName,
TargetID: remoteTargetID,
Event: eventData,
}
var reply bool
err := rpcClient.Call(peerServiceName+".SendEvent", &args, &reply)
if err != nil && !reply {
reqInfo := &logger.ReqInfo{BucketName: bucketName}
reqInfo.AppendTags("targetID", targetID.Name)
reqInfo.AppendTags("event", eventData.EventName.String())
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
globalNotificationSys.RemoveRemoteTarget(bucketName, targetID)
}
return err
}
// ReloadFormat - calls reload format RPC.
func (rpcClient *PeerRPCClient) ReloadFormat(dryRun bool) error {
args := ReloadFormatArgs{
DryRun: dryRun,
}
reply := VoidReply{}
return rpcClient.Call(peerServiceName+".ReloadFormat", &args, &reply)
}
// LoadUsers - calls load users RPC.
func (rpcClient *PeerRPCClient) LoadUsers() error {
args := AuthArgs{}
reply := VoidReply{}
return rpcClient.Call(peerServiceName+".LoadUsers", &args, &reply)
}
// LoadCredentials - calls load credentials RPC.
func (rpcClient *PeerRPCClient) LoadCredentials() error {
args := AuthArgs{}
reply := VoidReply{}
return rpcClient.Call(peerServiceName+".LoadCredentials", &args, &reply)
}
// DrivePerfInfo - returns drive performance info for remote server.
func (rpcClient *PeerRPCClient) DrivePerfInfo() (ServerDrivesPerfInfo, error) {
args := AuthArgs{}
var reply ServerDrivesPerfInfo
err := rpcClient.Call(peerServiceName+".DrivePerfInfo", &args, &reply)
return reply, err
}
// MemUsageInfo - returns mem utilization info for remote server
func (rpcClient *PeerRPCClient) MemUsageInfo() (ServerMemUsageInfo, error) {
args := AuthArgs{}
var reply ServerMemUsageInfo
err := rpcClient.Call(peerServiceName+".MemUsageInfo", &args, &reply)
return reply, err
}
// CPULoadInfo - returns cpu performance info for remote server
func (rpcClient *PeerRPCClient) CPULoadInfo() (ServerCPULoadInfo, error) {
args := AuthArgs{}
var reply ServerCPULoadInfo
err := rpcClient.Call(peerServiceName+".CPULoadInfo", &args, &reply)
return reply, err
}
// StartProfiling - starts profiling on the remote server.
func (rpcClient *PeerRPCClient) StartProfiling(profiler string) error {
args := StartProfilingArgs{Profiler: profiler}
reply := VoidReply{}
return rpcClient.Call(peerServiceName+".StartProfiling", &args, &reply)
}
// DownloadProfilingData - download already running profiling on the remote server.
func (rpcClient *PeerRPCClient) DownloadProfilingData() ([]byte, error) {
args := AuthArgs{}
var reply []byte
err := rpcClient.Call(peerServiceName+".DownloadProfilingData", &args, &reply)
return reply, err
}
// SignalService - calls load server info RPC.
func (rpcClient *PeerRPCClient) SignalService(sig serviceSignal) error {
args := SignalServiceArgs{Sig: sig}
reply := VoidReply{}
return rpcClient.Call(peerServiceName+".SignalService", &args, &reply)
}
// ServerInfo - calls load server info RPC.
func (rpcClient *PeerRPCClient) ServerInfo() (ServerInfoData, error) {
args := AuthArgs{}
reply := ServerInfoData{}
err := rpcClient.Call(peerServiceName+".ServerInfo", &args, &reply)
return reply, err
}
// GetLocksResp stores various info from the client for each lock that is requested.
type GetLocksResp map[string][]lockRequesterInfo
// GetLocks - returns the lock information on the server to which the RPC call is made.
func (rpcClient *PeerRPCClient) GetLocks() (resp GetLocksResp, err error) {
err = rpcClient.Call(peerServiceName+".GetLocks", &AuthArgs{}, &resp)
return resp, err
}
// NewPeerRPCClient - returns new peer RPC client.
func NewPeerRPCClient(host *xnet.Host) (*PeerRPCClient, error) {
scheme := "http"
if globalIsSSL {
scheme = "https"
}
serviceURL := &xnet.URL{
Scheme: scheme,
Host: host.String(),
Path: peerServicePath,
}
var tlsConfig *tls.Config
if globalIsSSL {
tlsConfig = &tls.Config{
ServerName: host.Name,
RootCAs: globalRootCAs,
}
}
rpcClient, err := NewRPCClient(
RPCClientArgs{
NewAuthTokenFunc: newAuthToken,
RPCVersion: globalRPCAPIVersion,
ServiceName: peerServiceName,
ServiceURL: serviceURL,
TLSConfig: tlsConfig,
},
)
if err != nil {
return nil, err
}
return &PeerRPCClient{rpcClient}, nil
}
// makeRemoteRPCClients - creates Peer RPCClients for given endpoint list.
func makeRemoteRPCClients(endpoints EndpointList) map[xnet.Host]*PeerRPCClient {
peerRPCClientMap := make(map[xnet.Host]*PeerRPCClient)
for _, hostStr := range GetRemotePeers(endpoints) {
host, err := xnet.ParseHost(hostStr)
logger.FatalIf(err, "Unable to parse peer RPC Host")
rpcClient, err := NewPeerRPCClient(host)
logger.FatalIf(err, "Unable to parse peer RPC Client")
peerRPCClientMap[*host] = rpcClient
}
return peerRPCClientMap
}

View file

@ -1,419 +0,0 @@
/*
* Minio Cloud Storage, (C) 2018, 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"fmt"
"path"
"sort"
"time"
"github.com/gorilla/mux"
"github.com/minio/minio/cmd/logger"
xrpc "github.com/minio/minio/cmd/rpc"
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
"github.com/minio/minio/pkg/policy"
)
const peerServiceName = "Peer"
const peerServiceSubPath = "/peer/remote"
var peerServicePath = path.Join(minioReservedBucketPath, peerServiceSubPath)
// peerRPCReceiver - Peer RPC receiver for peer RPC server.
type peerRPCReceiver struct{}
// DeleteBucketArgs - delete bucket RPC arguments.
type DeleteBucketArgs struct {
AuthArgs
BucketName string
}
// DeleteBucket - handles delete bucket RPC call which removes all values of given bucket in global NotificationSys object.
func (receiver *peerRPCReceiver) DeleteBucket(args *DeleteBucketArgs, reply *VoidReply) error {
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
globalNotificationSys.RemoveNotification(args.BucketName)
globalPolicySys.Remove(args.BucketName)
return nil
}
// SetBucketPolicyArgs - set bucket policy RPC arguments.
type SetBucketPolicyArgs struct {
AuthArgs
BucketName string
Policy policy.Policy
}
// SetBucketPolicy - handles set bucket policy RPC call which adds bucket policy to globalPolicySys.
func (receiver *peerRPCReceiver) SetBucketPolicy(args *SetBucketPolicyArgs, reply *VoidReply) error {
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
globalPolicySys.Set(args.BucketName, args.Policy)
return nil
}
// RemoveBucketPolicyArgs - delete bucket policy RPC arguments.
type RemoveBucketPolicyArgs struct {
AuthArgs
BucketName string
}
// RemoveBucketPolicy - handles delete bucket policy RPC call which removes bucket policy to globalPolicySys.
func (receiver *peerRPCReceiver) RemoveBucketPolicy(args *RemoveBucketPolicyArgs, reply *VoidReply) error {
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
globalPolicySys.Remove(args.BucketName)
return nil
}
// PutBucketNotificationArgs - put bucket notification RPC arguments.
type PutBucketNotificationArgs struct {
AuthArgs
BucketName string
RulesMap event.RulesMap
}
// PutBucketNotification - handles put bucket notification RPC call which adds rules to given bucket to global NotificationSys object.
func (receiver *peerRPCReceiver) PutBucketNotification(args *PutBucketNotificationArgs, reply *VoidReply) error {
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
globalNotificationSys.AddRulesMap(args.BucketName, args.RulesMap)
return nil
}
// ListenBucketNotificationArgs - listen bucket notification RPC arguments.
type ListenBucketNotificationArgs struct {
AuthArgs `json:"-"`
BucketName string `json:"-"`
EventNames []event.Name `json:"eventNames"`
Pattern string `json:"pattern"`
TargetID event.TargetID `json:"targetId"`
Addr xnet.Host `json:"addr"`
}
// ListenBucketNotification - handles listen bucket notification RPC call.
// It creates PeerRPCClient target which pushes requested events to target in remote peer.
func (receiver *peerRPCReceiver) ListenBucketNotification(args *ListenBucketNotificationArgs, reply *VoidReply) error {
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
rpcClient := globalNotificationSys.GetPeerRPCClient(args.Addr)
if rpcClient == nil {
return fmt.Errorf("unable to find PeerRPCClient for provided address %v. This happens only if remote and this minio run with different set of endpoints", args.Addr)
}
target := NewPeerRPCClientTarget(args.BucketName, args.TargetID, rpcClient)
rulesMap := event.NewRulesMap(args.EventNames, args.Pattern, target.ID())
if err := globalNotificationSys.AddRemoteTarget(args.BucketName, target, rulesMap); err != nil {
reqInfo := &logger.ReqInfo{BucketName: target.bucketName}
reqInfo.AppendTags("target", target.id.Name)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
return err
}
return nil
}
// RemoteTargetExistArgs - remote target ID exist RPC arguments.
type RemoteTargetExistArgs struct {
AuthArgs
BucketName string
TargetID event.TargetID
}
// RemoteTargetExist - handles target ID exist RPC call which checks whether given target ID is a HTTP client target or not.
func (receiver *peerRPCReceiver) RemoteTargetExist(args *RemoteTargetExistArgs, reply *bool) error {
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
*reply = globalNotificationSys.RemoteTargetExist(args.BucketName, args.TargetID)
return nil
}
// SendEventArgs - send event RPC arguments.
type SendEventArgs struct {
AuthArgs
Event event.Event
TargetID event.TargetID
BucketName string
}
// SendEvent - handles send event RPC call which sends given event to target by given target ID.
func (receiver *peerRPCReceiver) SendEvent(args *SendEventArgs, reply *bool) error {
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
// Set default to true to keep the target.
*reply = true
errs := globalNotificationSys.send(args.BucketName, args.Event, args.TargetID)
for i := range errs {
reqInfo := (&logger.ReqInfo{}).AppendTags("Event", args.Event.EventName.String())
reqInfo.AppendTags("targetName", args.TargetID.Name)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, errs[i].Err)
*reply = false // send failed i.e. do not keep the target.
return errs[i].Err
}
return nil
}
// ReloadFormatArgs - send event RPC arguments.
type ReloadFormatArgs struct {
AuthArgs
DryRun bool
}
// ReloadFormat - handles reload format RPC call, reloads latest `format.json`
func (receiver *peerRPCReceiver) ReloadFormat(args *ReloadFormatArgs, reply *VoidReply) error {
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
return objAPI.ReloadFormat(context.Background(), args.DryRun)
}
// LoadUsers - handles load users RPC call.
func (receiver *peerRPCReceiver) LoadUsers(args *AuthArgs, reply *VoidReply) error {
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
return globalIAMSys.Load(objAPI)
}
// LoadCredentials - handles load credentials RPC call.
func (receiver *peerRPCReceiver) LoadCredentials(args *AuthArgs, reply *VoidReply) error {
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
// Construct path to config.json for the given bucket.
configFile := path.Join(bucketConfigPrefix, minioConfigFile)
transactionConfigFile := configFile + ".transaction"
// As object layer's GetObject() and PutObject() take respective lock on minioMetaBucket
// and configFile, take a transaction lock to avoid race.
objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile)
if err := objLock.GetRLock(globalOperationTimeout); err != nil {
return err
}
objLock.RUnlock()
return globalConfigSys.Load(newObjectLayerFn())
}
// DrivePerfInfo - handles drive performance RPC call
func (receiver *peerRPCReceiver) DrivePerfInfo(args *AuthArgs, reply *ServerDrivesPerfInfo) error {
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
*reply = localEndpointsDrivePerf(globalEndpoints)
return nil
}
// CPULoadInfo - handles cpu performance RPC call
func (receiver *peerRPCReceiver) CPULoadInfo(args *AuthArgs, reply *ServerCPULoadInfo) error {
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
*reply = localEndpointsCPULoad(globalEndpoints)
return nil
}
// MemUsageInfo - handles mem utilization RPC call
func (receiver *peerRPCReceiver) MemUsageInfo(args *AuthArgs, reply *ServerMemUsageInfo) error {
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
*reply = localEndpointsMemUsage(globalEndpoints)
return nil
}
// uptimes - used to sort uptimes in chronological order.
type uptimes []time.Duration
func (ts uptimes) Len() int {
return len(ts)
}
func (ts uptimes) Less(i, j int) bool {
return ts[i] < ts[j]
}
func (ts uptimes) Swap(i, j int) {
ts[i], ts[j] = ts[j], ts[i]
}
// getPeerUptimes - returns the uptime.
func getPeerUptimes(serverInfo []ServerInfo) time.Duration {
// In a single node Erasure or FS backend setup the uptime of
// the setup is the uptime of the single minio server
// instance.
if !globalIsDistXL {
return UTCNow().Sub(globalBootTime)
}
var times []time.Duration
for _, info := range serverInfo {
if info.Error != "" {
continue
}
times = append(times, info.Data.Properties.Uptime)
}
// Sort uptimes in chronological order.
sort.Sort(uptimes(times))
// Return the latest time as the uptime.
return times[0]
}
// StartProfilingArgs - holds the RPC argument for StartingProfiling RPC call
type StartProfilingArgs struct {
AuthArgs
Profiler string
}
// StartProfiling - profiling server receiver.
func (receiver *peerRPCReceiver) StartProfiling(args *StartProfilingArgs, reply *VoidReply) error {
if globalProfiler != nil {
globalProfiler.Stop()
}
var err error
globalProfiler, err = startProfiler(args.Profiler, "")
return err
}
// DownloadProfilingData - download profiling data.
func (receiver *peerRPCReceiver) DownloadProfilingData(args *AuthArgs, reply *[]byte) error {
var err error
*reply, err = getProfileData()
return err
}
var errUnsupportedSignal = fmt.Errorf("unsupported signal: only restart and stop signals are supported")
// SignalServiceArgs - send event RPC arguments.
type SignalServiceArgs struct {
AuthArgs
Sig serviceSignal
}
// SignalService - signal service receiver.
func (receiver *peerRPCReceiver) SignalService(args *SignalServiceArgs, reply *VoidReply) error {
switch args.Sig {
case serviceRestart, serviceStop:
globalServiceSignalCh <- args.Sig
default:
return errUnsupportedSignal
}
return nil
}
// ServerInfo - server info receiver.
func (receiver *peerRPCReceiver) ServerInfo(args *AuthArgs, reply *ServerInfoData) error {
if globalBootTime.IsZero() {
return errServerNotInitialized
}
// Build storage info
objLayer := newObjectLayerFn()
if objLayer == nil {
return errServerNotInitialized
}
// Server info data.
*reply = ServerInfoData{
StorageInfo: objLayer.StorageInfo(context.Background()),
ConnStats: globalConnStats.toServerConnStats(),
HTTPStats: globalHTTPStats.toServerHTTPStats(),
Properties: ServerProperties{
Uptime: UTCNow().Sub(globalBootTime),
Version: Version,
CommitID: CommitID,
SQSARN: globalNotificationSys.GetARNList(),
Region: globalServerConfig.GetRegion(),
},
}
return nil
}
// GetLocks - Get Locks receiver.
func (receiver *peerRPCReceiver) GetLocks(args *AuthArgs, reply *GetLocksResp) error {
if globalBootTime.IsZero() {
return errServerNotInitialized
}
// Build storage info
objLayer := newObjectLayerFn()
if objLayer == nil {
return errServerNotInitialized
}
// Locks data.
*reply = globalLockServer.ll.DupLockMap()
return nil
}
// NewPeerRPCServer - returns new peer RPC server.
func NewPeerRPCServer() (*xrpc.Server, error) {
rpcServer := xrpc.NewServer()
if err := rpcServer.RegisterName(peerServiceName, &peerRPCReceiver{}); err != nil {
return nil, err
}
return rpcServer, nil
}
// registerPeerRPCRouter - creates and registers Peer RPC server and its router.
func registerPeerRPCRouter(router *mux.Router) {
rpcServer, err := NewPeerRPCServer()
logger.FatalIf(err, "Unable to initialize peer RPC Server")
subrouter := router.PathPrefix(minioReservedBucketPath).Subrouter()
subrouter.Path(peerServiceSubPath).HandlerFunc(httpTraceHdrs(rpcServer.ServeHTTP))
}

View file

@ -38,11 +38,12 @@ func registerDistXLRouters(router *mux.Router, endpoints EndpointList) {
// Register storage rpc router only if its a distributed setup.
registerStorageRESTHandlers(router, endpoints)
// Register peer REST router only if its a distributed setup.
registerPeerRESTHandlers(router)
// Register distributed namespace lock.
registerDistNSLockRouter(router)
// Register peer communication router.
registerPeerRPCRouter(router)
}
// List of some generic handlers which are applied for all incoming requests.

View file

@ -22,12 +22,12 @@ import (
)
// Type of service signals currently supported.
type serviceSignal int
type serviceSignal string
const (
serviceStatus = iota // Gets status about the service.
serviceRestart // Restarts the service.
serviceStop // Stops the server.
serviceStatus serviceSignal = "serviceStatus" // Gets status about the service.
serviceRestart = "serviceRestart" // Restarts the service.
serviceStop = "serviceStop" // Stops the server.
// Add new service requests here.
)