minio/pkg/madmin/lock-commands.go
Krishnan Parthasarathi c8f57133a4 Implement list, clear locks REST API w/ pkg/madmin support (#3491)
* Filter lock info based on bucket, prefix and time since lock was held
* Implement list and clear locks REST API
* madmin: Add list and clear locks API
* locks: Clear locks matching bucket, prefix, relTime.
* Gather lock information across nodes for both list and clear locks admin REST API.
* docs: Add lock API to management APIs
2017-01-03 23:39:22 -08:00

152 lines
4.3 KiB
Go

/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package madmin
import (
"encoding/json"
"errors"
"io"
"io/ioutil"
"net/http"
"net/url"
"time"
)
type statusType string
const (
runningStatus statusType = "Running"
blockedStatus statusType = "Blocked"
)
type lockType string
const (
debugRLockStr lockType = "RLock"
debugWLockStr lockType = "WLock"
)
// OpsLockState - represents lock specific details.
type OpsLockState struct {
OperationID string `json:"opsID"` // String containing operation ID.
LockSource string `json:"lockSource"` // Operation type (GetObject, PutObject...)
LockType lockType `json:"lockType"` // Lock type (RLock, WLock)
Status statusType `json:"status"` // Status can be Running/Ready/Blocked.
Since time.Time `json:"statusSince"` // Time when the lock was initially held.
Duration time.Duration `json:"statusDuration"` // Duration since the lock was held.
}
// VolumeLockInfo - represents summary and individual lock details of all
// locks held on a given bucket, object.
type VolumeLockInfo struct {
Bucket string `json:"bucket"`
Object string `json:"object"`
// All locks blocked + running for given <volume,path> pair.
LocksOnObject int64 `json:"locksOnObject"`
// Count of operations which has successfully acquired the lock
// but hasn't unlocked yet( operation in progress).
LocksAcquiredOnObject int64 `json:"locksAcquiredOnObject"`
// Count of operations which are blocked waiting for the lock
// to be released.
TotalBlockedLocks int64 `json:"locksBlockedOnObject"`
// State information containing state of the locks for all operations
// on given <volume,path> pair.
LockDetailsOnObject []OpsLockState `json:"lockDetailsOnObject"`
}
// getLockInfos - unmarshal []VolumeLockInfo from a reader.
func getLockInfos(body io.Reader) ([]VolumeLockInfo, error) {
respBytes, err := ioutil.ReadAll(body)
if err != nil {
return nil, err
}
var lockInfos []VolumeLockInfo
err = json.Unmarshal(respBytes, &lockInfos)
if err != nil {
return nil, err
}
return lockInfos, nil
}
// ListLocks - Calls List Locks Management API to fetch locks matching
// bucket, prefix and held before the duration supplied.
func (adm *AdminClient) ListLocks(bucket, prefix string, olderThan time.Duration) ([]VolumeLockInfo, error) {
queryVal := make(url.Values)
queryVal.Set("lock", "")
queryVal.Set("bucket", bucket)
queryVal.Set("prefix", prefix)
queryVal.Set("older-than", olderThan.String())
hdrs := make(http.Header)
hdrs.Set(minioAdminOpHeader, "list")
reqData := requestData{
queryValues: queryVal,
customHeaders: hdrs,
}
// Execute GET on /?lock to list locks.
resp, err := adm.executeMethod("GET", reqData)
defer closeResponse(resp)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, errors.New("Got HTTP Status: " + resp.Status)
}
return getLockInfos(resp.Body)
}
// ClearLocks - Calls Clear Locks Management API to clear locks held
// on bucket, matching prefix older than duration supplied.
func (adm *AdminClient) ClearLocks(bucket, prefix string, olderThan time.Duration) ([]VolumeLockInfo, error) {
queryVal := make(url.Values)
queryVal.Set("lock", "")
queryVal.Set("bucket", bucket)
queryVal.Set("prefix", prefix)
queryVal.Set("older-than", olderThan.String())
hdrs := make(http.Header)
hdrs.Set(minioAdminOpHeader, "clear")
reqData := requestData{
queryValues: queryVal,
customHeaders: hdrs,
}
// Execute POST on /?lock to clear locks.
resp, err := adm.executeMethod("POST", reqData)
defer closeResponse(resp)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, errors.New("Got HTTP Status: " + resp.Status)
}
return getLockInfos(resp.Body)
}