minio/cmd/peer-rest-server.go
Harshavardhana 0570c21671 fix: replaced drive properly by healing the entire drive
Bonus fixes, we do not need reload format anymore
as the replaced drive is healed locally we only need
to ensure that drive heal reloads the drive properly.

We preserve the UUID of the original order, this means
that the replacement in `format.json` doesn't mean that
the drive needs to be reloaded into memory anymore.

fixes #10791
2020-10-31 00:30:14 -07:00

1018 lines
29 KiB
Go

/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"encoding/gob"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"
"github.com/gorilla/mux"
"github.com/minio/minio/cmd/logger"
b "github.com/minio/minio/pkg/bucket/bandwidth"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/madmin"
trace "github.com/minio/minio/pkg/trace"
)
// To abstract a node over network.
type peerRESTServer struct{}
// GetLocksHandler - returns list of older lock from the server.
func (s *peerRESTServer) GetLocksHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx := newContext(r, w, "GetLocks")
llockers := make(GetLocksResp, 0, len(globalLockServers))
for _, llocker := range globalLockServers {
llockers = append(llockers, llocker.DupLockMap())
}
logger.LogIf(ctx, gob.NewEncoder(w).Encode(llockers))
w.(http.Flusher).Flush()
}
// DeletePolicyHandler - deletes a policy on the server.
func (s *peerRESTServer) DeletePolicyHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
objAPI := newObjectLayerFn()
if objAPI == nil {
s.writeErrorResponse(w, errServerNotInitialized)
return
}
vars := mux.Vars(r)
policyName := vars[peerRESTPolicy]
if policyName == "" {
s.writeErrorResponse(w, errors.New("policyName is missing"))
return
}
if err := globalIAMSys.DeletePolicy(policyName); err != nil {
s.writeErrorResponse(w, err)
return
}
w.(http.Flusher).Flush()
}
// LoadPolicyHandler - reloads a policy on the server.
func (s *peerRESTServer) LoadPolicyHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
objAPI := newObjectLayerFn()
if objAPI == nil {
s.writeErrorResponse(w, errServerNotInitialized)
return
}
vars := mux.Vars(r)
policyName := vars[peerRESTPolicy]
if policyName == "" {
s.writeErrorResponse(w, errors.New("policyName is missing"))
return
}
if err := globalIAMSys.LoadPolicy(objAPI, policyName); err != nil {
s.writeErrorResponse(w, err)
return
}
w.(http.Flusher).Flush()
}
// LoadPolicyMappingHandler - reloads a policy mapping on the server.
func (s *peerRESTServer) LoadPolicyMappingHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
objAPI := newObjectLayerFn()
if objAPI == nil {
s.writeErrorResponse(w, errServerNotInitialized)
return
}
vars := mux.Vars(r)
userOrGroup := vars[peerRESTUserOrGroup]
if userOrGroup == "" {
s.writeErrorResponse(w, errors.New("user-or-group is missing"))
return
}
_, isGroup := vars[peerRESTIsGroup]
if err := globalIAMSys.LoadPolicyMapping(objAPI, userOrGroup, isGroup); err != nil {
s.writeErrorResponse(w, err)
return
}
w.(http.Flusher).Flush()
}
// DeleteServiceAccountHandler - deletes a service account on the server.
func (s *peerRESTServer) DeleteServiceAccountHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
objAPI := newObjectLayerFn()
if objAPI == nil {
s.writeErrorResponse(w, errServerNotInitialized)
return
}
vars := mux.Vars(r)
accessKey := vars[peerRESTUser]
if accessKey == "" {
s.writeErrorResponse(w, errors.New("service account name is missing"))
return
}
if err := globalIAMSys.DeleteServiceAccount(r.Context(), accessKey); err != nil {
s.writeErrorResponse(w, err)
return
}
w.(http.Flusher).Flush()
}
// LoadServiceAccountHandler - reloads a service account on the server.
func (s *peerRESTServer) LoadServiceAccountHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
objAPI := newObjectLayerFn()
if objAPI == nil {
s.writeErrorResponse(w, errServerNotInitialized)
return
}
vars := mux.Vars(r)
accessKey := vars[peerRESTUser]
if accessKey == "" {
s.writeErrorResponse(w, errors.New("service account parameter is missing"))
return
}
if err := globalIAMSys.LoadServiceAccount(accessKey); err != nil {
s.writeErrorResponse(w, err)
return
}
w.(http.Flusher).Flush()
}
// DeleteUserHandler - deletes a user on the server.
func (s *peerRESTServer) DeleteUserHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
objAPI := newObjectLayerFn()
if objAPI == nil {
s.writeErrorResponse(w, errServerNotInitialized)
return
}
vars := mux.Vars(r)
accessKey := vars[peerRESTUser]
if accessKey == "" {
s.writeErrorResponse(w, errors.New("username is missing"))
return
}
if err := globalIAMSys.DeleteUser(accessKey); err != nil {
s.writeErrorResponse(w, err)
return
}
w.(http.Flusher).Flush()
}
// LoadUserHandler - reloads a user on the server.
func (s *peerRESTServer) LoadUserHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
objAPI := newObjectLayerFn()
if objAPI == nil {
s.writeErrorResponse(w, errServerNotInitialized)
return
}
vars := mux.Vars(r)
accessKey := vars[peerRESTUser]
if accessKey == "" {
s.writeErrorResponse(w, errors.New("username is missing"))
return
}
temp, err := strconv.ParseBool(vars[peerRESTUserTemp])
if err != nil {
s.writeErrorResponse(w, err)
return
}
var userType = regularUser
if temp {
userType = stsUser
}
if err = globalIAMSys.LoadUser(objAPI, accessKey, userType); err != nil {
s.writeErrorResponse(w, err)
return
}
w.(http.Flusher).Flush()
}
// LoadGroupHandler - reloads group along with members list.
func (s *peerRESTServer) LoadGroupHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
objAPI := newObjectLayerFn()
if objAPI == nil {
s.writeErrorResponse(w, errServerNotInitialized)
return
}
vars := mux.Vars(r)
group := vars[peerRESTGroup]
err := globalIAMSys.LoadGroup(objAPI, group)
if err != nil {
s.writeErrorResponse(w, err)
return
}
w.(http.Flusher).Flush()
}
// StartProfilingHandler - Issues the start profiling command.
func (s *peerRESTServer) StartProfilingHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
vars := mux.Vars(r)
profiles := strings.Split(vars[peerRESTProfiler], ",")
if len(profiles) == 0 {
s.writeErrorResponse(w, errors.New("profiler name is missing"))
return
}
globalProfilerMu.Lock()
defer globalProfilerMu.Unlock()
if globalProfiler == nil {
globalProfiler = make(map[string]minioProfiler, 10)
}
// Stop profiler of all types if already running
for k, v := range globalProfiler {
for _, p := range profiles {
if p == k {
v.Stop()
delete(globalProfiler, k)
}
}
}
for _, profiler := range profiles {
prof, err := startProfiler(profiler)
if err != nil {
s.writeErrorResponse(w, err)
return
}
globalProfiler[profiler] = prof
}
w.(http.Flusher).Flush()
}
// DownloadProfilingDataHandler - returns profiled data.
func (s *peerRESTServer) DownloadProfilingDataHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx := newContext(r, w, "DownloadProfiling")
profileData, err := getProfileData()
if err != nil {
s.writeErrorResponse(w, err)
return
}
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(profileData))
}
// ServerInfoHandler - returns Server Info
func (s *peerRESTServer) ServerInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx := newContext(r, w, "ServerInfo")
info := getLocalServerProperty(globalEndpoints, r)
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
func (s *peerRESTServer) NetOBDInfoHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "NetOBDInfo")
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
// Use this trailer to send additional headers after sending body
w.Header().Set("Trailer", "FinalStatus")
w.Header().Set("Content-Type", "application/octet-stream")
w.WriteHeader(http.StatusOK)
n, err := io.Copy(ioutil.Discard, r.Body)
if err == io.ErrUnexpectedEOF {
w.Header().Set("FinalStatus", err.Error())
return
}
if err != nil && err != io.EOF {
logger.LogIf(ctx, err)
w.Header().Set("FinalStatus", err.Error())
return
}
if n != r.ContentLength {
err := fmt.Errorf("OBD: short read: expected %d found %d", r.ContentLength, n)
logger.LogIf(ctx, err)
w.Header().Set("FinalStatus", err.Error())
return
}
w.Header().Set("FinalStatus", "Success")
w.(http.Flusher).Flush()
}
func (s *peerRESTServer) DispatchNetOBDInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
done := keepHTTPResponseAlive(w)
ctx := newContext(r, w, "DispatchNetOBDInfo")
info := globalNotificationSys.NetOBDInfo(ctx)
done(nil)
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
w.(http.Flusher).Flush()
}
// DriveOBDInfoHandler - returns Drive OBD info.
func (s *peerRESTServer) DriveOBDInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx, cancel := context.WithCancel(newContext(r, w, "DriveOBDInfo"))
defer cancel()
infoSerial := getLocalDrivesOBD(ctx, false, globalEndpoints, r)
infoParallel := getLocalDrivesOBD(ctx, true, globalEndpoints, r)
errStr := ""
if infoSerial.Error != "" {
errStr = "serial: " + infoSerial.Error
}
if infoParallel.Error != "" {
errStr = errStr + " parallel: " + infoParallel.Error
}
info := madmin.ServerDrivesOBDInfo{
Addr: infoSerial.Addr,
Serial: infoSerial.Serial,
Parallel: infoParallel.Parallel,
Error: errStr,
}
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
// CPUOBDInfoHandler - returns CPU OBD info.
func (s *peerRESTServer) CPUOBDInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
info := getLocalCPUOBDInfo(ctx, r)
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
// DiskHwOBDInfoHandler - returns Disk HW OBD info.
func (s *peerRESTServer) DiskHwOBDInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
info := getLocalDiskHwOBD(ctx, r)
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
// OsOBDInfoHandler - returns Os OBD info.
func (s *peerRESTServer) OsOBDInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
info := getLocalOsInfoOBD(ctx, r)
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
// ProcOBDInfoHandler - returns Proc OBD info.
func (s *peerRESTServer) ProcOBDInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
info := getLocalProcOBD(ctx, r)
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
// MemOBDInfoHandler - returns Mem OBD info.
func (s *peerRESTServer) MemOBDInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
info := getLocalMemOBD(ctx, r)
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
// DeleteBucketMetadataHandler - Delete in memory bucket metadata
func (s *peerRESTServer) DeleteBucketMetadataHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
vars := mux.Vars(r)
bucketName := vars[peerRESTBucket]
if bucketName == "" {
s.writeErrorResponse(w, errors.New("Bucket name is missing"))
return
}
globalBucketMetadataSys.Remove(bucketName)
w.(http.Flusher).Flush()
}
// LoadBucketMetadataHandler - reloads in memory bucket metadata
func (s *peerRESTServer) LoadBucketMetadataHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
vars := mux.Vars(r)
bucketName := vars[peerRESTBucket]
if bucketName == "" {
s.writeErrorResponse(w, errors.New("Bucket name is missing"))
return
}
objAPI := newObjectLayerFn()
if objAPI == nil {
s.writeErrorResponse(w, errServerNotInitialized)
return
}
meta, err := loadBucketMetadata(r.Context(), objAPI, bucketName)
if err != nil {
s.writeErrorResponse(w, err)
return
}
globalBucketMetadataSys.Set(bucketName, meta)
if meta.notificationConfig != nil {
globalNotificationSys.AddRulesMap(bucketName, meta.notificationConfig.ToRulesMap())
}
if meta.bucketTargetConfig != nil {
globalBucketTargetSys.UpdateAllTargets(bucketName, meta.bucketTargetConfig)
}
}
// CycleServerBloomFilterHandler cycles bloom filter on server.
func (s *peerRESTServer) CycleServerBloomFilterHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx := newContext(r, w, "CycleServerBloomFilter")
var req bloomFilterRequest
err := gob.NewDecoder(r.Body).Decode(&req)
if err != nil {
s.writeErrorResponse(w, err)
return
}
bf, err := intDataUpdateTracker.cycleFilter(ctx, req.Oldest, req.Current)
if err != nil {
s.writeErrorResponse(w, err)
return
}
logger.LogIf(ctx, gob.NewEncoder(w).Encode(bf))
}
// PutBucketNotificationHandler - Set bucket policy.
func (s *peerRESTServer) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
vars := mux.Vars(r)
bucketName := vars[peerRESTBucket]
if bucketName == "" {
s.writeErrorResponse(w, errors.New("Bucket name is missing"))
return
}
var rulesMap event.RulesMap
if r.ContentLength < 0 {
s.writeErrorResponse(w, errInvalidArgument)
return
}
err := gob.NewDecoder(r.Body).Decode(&rulesMap)
if err != nil {
s.writeErrorResponse(w, err)
return
}
globalNotificationSys.AddRulesMap(bucketName, rulesMap)
w.(http.Flusher).Flush()
}
// Return disk IDs of all the local disks.
func getLocalDiskIDs(z *erasureServerSets) []string {
var ids []string
for zoneIdx := range z.serverSets {
for _, set := range z.serverSets[zoneIdx].sets {
disks := set.getDisks()
for _, disk := range disks {
if disk == nil {
continue
}
if disk.IsLocal() {
id, err := disk.GetDiskID()
if err != nil {
continue
}
if id == "" {
continue
}
ids = append(ids, id)
}
}
}
}
return ids
}
// HealthHandler - returns true of health
func (s *peerRESTServer) HealthHandler(w http.ResponseWriter, r *http.Request) {
s.IsValid(w, r)
}
// GetLocalDiskIDs - Return disk IDs of all the local disks.
func (s *peerRESTServer) GetLocalDiskIDs(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx := newContext(r, w, "GetLocalDiskIDs")
objLayer := newObjectLayerFn()
// Service not initialized yet
if objLayer == nil {
s.writeErrorResponse(w, errServerNotInitialized)
return
}
z, ok := objLayer.(*erasureServerSets)
if !ok {
s.writeErrorResponse(w, errServerNotInitialized)
return
}
ids := getLocalDiskIDs(z)
logger.LogIf(ctx, gob.NewEncoder(w).Encode(ids))
w.(http.Flusher).Flush()
}
// ServerUpdateHandler - updates the current server.
func (s *peerRESTServer) ServerUpdateHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
if r.ContentLength < 0 {
s.writeErrorResponse(w, errInvalidArgument)
return
}
var info serverUpdateInfo
err := gob.NewDecoder(r.Body).Decode(&info)
if err != nil {
s.writeErrorResponse(w, err)
return
}
if _, err = updateServer(info.URL, info.Sha256Sum, info.Time, getMinioMode()); err != nil {
s.writeErrorResponse(w, err)
return
}
}
var errUnsupportedSignal = fmt.Errorf("unsupported signal: only restart and stop signals are supported")
// SignalServiceHandler - signal service handler.
func (s *peerRESTServer) SignalServiceHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
vars := mux.Vars(r)
signalString := vars[peerRESTSignal]
if signalString == "" {
s.writeErrorResponse(w, errors.New("signal name is missing"))
return
}
si, err := strconv.Atoi(signalString)
if err != nil {
s.writeErrorResponse(w, err)
return
}
signal := serviceSignal(si)
defer w.(http.Flusher).Flush()
switch signal {
case serviceRestart:
globalServiceSignalCh <- signal
case serviceStop:
globalServiceSignalCh <- signal
default:
s.writeErrorResponse(w, errUnsupportedSignal)
return
}
}
// ListenHandler sends http trace messages back to peer rest client
func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
values := r.URL.Query()
var prefix string
if len(values[peerRESTListenPrefix]) > 1 {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
if len(values[peerRESTListenPrefix]) == 1 {
if err := event.ValidateFilterRuleValue(values[peerRESTListenPrefix][0]); err != nil {
s.writeErrorResponse(w, err)
return
}
prefix = values[peerRESTListenPrefix][0]
}
var suffix string
if len(values[peerRESTListenSuffix]) > 1 {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
if len(values[peerRESTListenSuffix]) == 1 {
if err := event.ValidateFilterRuleValue(values[peerRESTListenSuffix][0]); err != nil {
s.writeErrorResponse(w, err)
return
}
suffix = values[peerRESTListenSuffix][0]
}
pattern := event.NewPattern(prefix, suffix)
var eventNames []event.Name
for _, ev := range values[peerRESTListenEvents] {
eventName, err := event.ParseName(ev)
if err != nil {
s.writeErrorResponse(w, err)
return
}
eventNames = append(eventNames, eventName)
}
rulesMap := event.NewRulesMap(eventNames, pattern, event.TargetID{ID: mustGetUUID()})
w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()
doneCh := make(chan struct{})
defer close(doneCh)
// Listen Publisher uses nonblocking publish and hence does not wait for slow subscribers.
// Use buffered channel to take care of burst sends or slow w.Write()
ch := make(chan interface{}, 2000)
globalHTTPListen.Subscribe(ch, doneCh, func(evI interface{}) bool {
ev, ok := evI.(event.Event)
if !ok {
return false
}
if ev.S3.Bucket.Name != "" && values.Get(peerRESTListenBucket) != "" {
if ev.S3.Bucket.Name != values.Get(peerRESTListenBucket) {
return false
}
}
return rulesMap.MatchSimple(ev.EventName, ev.S3.Object.Key)
})
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
defer keepAliveTicker.Stop()
enc := gob.NewEncoder(w)
for {
select {
case ev := <-ch:
if err := enc.Encode(ev); err != nil {
return
}
w.(http.Flusher).Flush()
case <-keepAliveTicker.C:
if err := enc.Encode(&event.Event{}); err != nil {
return
}
w.(http.Flusher).Flush()
}
}
}
// TraceHandler sends http trace messages back to peer rest client
func (s *peerRESTServer) TraceHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
trcAll := r.URL.Query().Get(peerRESTTraceAll) == "true"
trcErr := r.URL.Query().Get(peerRESTTraceErr) == "true"
w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()
doneCh := make(chan struct{})
defer close(doneCh)
// Trace Publisher uses nonblocking publish and hence does not wait for slow subscribers.
// Use buffered channel to take care of burst sends or slow w.Write()
ch := make(chan interface{}, 2000)
globalHTTPTrace.Subscribe(ch, doneCh, func(entry interface{}) bool {
return mustTrace(entry, trcAll, trcErr)
})
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
defer keepAliveTicker.Stop()
enc := gob.NewEncoder(w)
for {
select {
case entry := <-ch:
if err := enc.Encode(entry); err != nil {
return
}
w.(http.Flusher).Flush()
case <-keepAliveTicker.C:
if err := enc.Encode(&trace.Info{}); err != nil {
return
}
w.(http.Flusher).Flush()
}
}
}
func (s *peerRESTServer) BackgroundHealStatusHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("invalid request"))
return
}
ctx := newContext(r, w, "BackgroundHealStatus")
state, ok := getLocalBackgroundHealStatus()
if !ok {
s.writeErrorResponse(w, errServerNotInitialized)
return
}
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(state))
}
// ConsoleLogHandler sends console logs of this node back to peer rest client
func (s *peerRESTServer) ConsoleLogHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
w.Header().Set("Connection", "close")
w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()
doneCh := make(chan struct{})
defer close(doneCh)
ch := make(chan interface{}, 2000)
globalConsoleSys.Subscribe(ch, doneCh, "", 0, string(logger.All), nil)
enc := gob.NewEncoder(w)
for {
select {
case entry := <-ch:
if err := enc.Encode(entry); err != nil {
return
}
w.(http.Flusher).Flush()
case <-r.Context().Done():
return
}
}
}
func (s *peerRESTServer) writeErrorResponse(w http.ResponseWriter, err error) {
w.WriteHeader(http.StatusForbidden)
w.Write([]byte(err.Error()))
}
// IsValid - To authenticate and verify the time difference.
func (s *peerRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool {
if err := storageServerRequestValidate(r); err != nil {
s.writeErrorResponse(w, err)
return false
}
return true
}
// GetBandwidth gets the bandwidth for the buckets requested.
func (s *peerRESTServer) GetBandwidth(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
bucketsString := r.URL.Query().Get("buckets")
w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()
doneCh := make(chan struct{})
defer close(doneCh)
selectBuckets := b.SelectBuckets(strings.Split(bucketsString, ",")...)
report := globalBucketMonitor.GetReport(selectBuckets)
enc := gob.NewEncoder(w)
if err := enc.Encode(report); err != nil {
s.writeErrorResponse(w, errors.New("Encoding report failed: "+err.Error()))
return
}
w.(http.Flusher).Flush()
}
// registerPeerRESTHandlers - register peer rest router.
func registerPeerRESTHandlers(router *mux.Router) {
server := &peerRESTServer{}
subrouter := router.PathPrefix(peerRESTPrefix).Subrouter()
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodHealth).HandlerFunc(httpTraceHdrs(server.HealthHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetLocks).HandlerFunc(httpTraceHdrs(server.GetLocksHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodServerInfo).HandlerFunc(httpTraceHdrs(server.ServerInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodProcOBDInfo).HandlerFunc(httpTraceHdrs(server.ProcOBDInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodMemOBDInfo).HandlerFunc(httpTraceHdrs(server.MemOBDInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodOsInfoOBDInfo).HandlerFunc(httpTraceHdrs(server.OsOBDInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDiskHwOBDInfo).HandlerFunc(httpTraceHdrs(server.DiskHwOBDInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCPUOBDInfo).HandlerFunc(httpTraceHdrs(server.CPUOBDInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDriveOBDInfo).HandlerFunc(httpTraceHdrs(server.DriveOBDInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodNetOBDInfo).HandlerFunc(httpTraceHdrs(server.NetOBDInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDispatchNetOBDInfo).HandlerFunc(httpTraceHdrs(server.DispatchNetOBDInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCycleBloom).HandlerFunc(httpTraceHdrs(server.CycleServerBloomFilterHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeleteBucketMetadata).HandlerFunc(httpTraceHdrs(server.DeleteBucketMetadataHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadBucketMetadata).HandlerFunc(httpTraceHdrs(server.LoadBucketMetadataHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSignalService).HandlerFunc(httpTraceHdrs(server.SignalServiceHandler)).Queries(restQueries(peerRESTSignal)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodServerUpdate).HandlerFunc(httpTraceHdrs(server.ServerUpdateHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeletePolicy).HandlerFunc(httpTraceAll(server.DeletePolicyHandler)).Queries(restQueries(peerRESTPolicy)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadPolicy).HandlerFunc(httpTraceAll(server.LoadPolicyHandler)).Queries(restQueries(peerRESTPolicy)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadPolicyMapping).HandlerFunc(httpTraceAll(server.LoadPolicyMappingHandler)).Queries(restQueries(peerRESTUserOrGroup)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeleteUser).HandlerFunc(httpTraceAll(server.DeleteUserHandler)).Queries(restQueries(peerRESTUser)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeleteServiceAccount).HandlerFunc(httpTraceAll(server.DeleteServiceAccountHandler)).Queries(restQueries(peerRESTUser)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadUser).HandlerFunc(httpTraceAll(server.LoadUserHandler)).Queries(restQueries(peerRESTUser, peerRESTUserTemp)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadServiceAccount).HandlerFunc(httpTraceAll(server.LoadServiceAccountHandler)).Queries(restQueries(peerRESTUser)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadGroup).HandlerFunc(httpTraceAll(server.LoadGroupHandler)).Queries(restQueries(peerRESTGroup)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodStartProfiling).HandlerFunc(httpTraceAll(server.StartProfilingHandler)).Queries(restQueries(peerRESTProfiler)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDownloadProfilingData).HandlerFunc(httpTraceHdrs(server.DownloadProfilingDataHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodTrace).HandlerFunc(server.TraceHandler)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodListen).HandlerFunc(httpTraceHdrs(server.ListenHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBackgroundHealStatus).HandlerFunc(server.BackgroundHealStatusHandler)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLog).HandlerFunc(server.ConsoleLogHandler)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetLocalDiskIDs).HandlerFunc(httpTraceHdrs(server.GetLocalDiskIDs))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetBandwidth).HandlerFunc(httpTraceHdrs(server.GetBandwidth))
}