minio/cmd/notification.go
Anis Elleuch be1700f595 Avoid startup abort when a notify target is down (#6126)
The Minio server was preventing itself from starting when any
notification target was down or not running. This PR changes that
behavior by skipping the startup abort in that case, so the user
can still access the Minio server using mc admin commands after a
restart or a set-config command.
2018-07-10 07:20:31 +05:30

/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"bytes"
"context"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"net/url"
"path"
"sync"
"time"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/hash"
xnet "github.com/minio/minio/pkg/net"
"github.com/minio/minio/pkg/policy"
)
// NotificationSys - notification system.
type NotificationSys struct {
sync.RWMutex
targetList *event.TargetList
bucketRulesMap map[string]event.RulesMap
bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap
peerRPCClientMap map[xnet.Host]*PeerRPCClient
}
// GetARNList - returns available ARNs.
func (sys *NotificationSys) GetARNList() []string {
arns := []string{}
region := globalServerConfig.GetRegion()
for _, targetID := range sys.targetList.List() {
arns = append(arns, targetID.ToARN(region).String())
}
return arns
}
// GetPeerRPCClient - returns PeerRPCClient of addr.
func (sys *NotificationSys) GetPeerRPCClient(addr xnet.Host) *PeerRPCClient {
return sys.peerRPCClientMap[addr]
}
// NotificationPeerErr - associates an error returned by a remote peer with that peer's host.
type NotificationPeerErr struct {
Host xnet.Host // Remote host on which the RPC call was initiated.
Err error // Error returned by the remote peer for the RPC call.
}
// DeleteBucket - invokes the DeleteBucket RPC on all peers.
func (sys *NotificationSys) DeleteBucket(ctx context.Context, bucketName string) {
go func() {
var wg sync.WaitGroup
for addr, client := range sys.peerRPCClientMap {
wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient) {
defer wg.Done()
if err := client.DeleteBucket(bucketName); err != nil {
logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name)
logger.LogIf(ctx, err)
}
}(addr, client)
}
wg.Wait()
}()
}
// SetCredentials - invokes the SetCredentials RPC on all peers.
func (sys *NotificationSys) SetCredentials(credentials auth.Credentials) map[xnet.Host]error {
errs := make(map[xnet.Host]error)
// Guard the shared error map; the goroutines below write to it concurrently.
var errsMutex sync.Mutex
var wg sync.WaitGroup
for addr, client := range sys.peerRPCClientMap {
wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient) {
defer wg.Done()
// Try to set credentials in three attempts.
for i := 0; i < 3; i++ {
err := client.SetCredentials(credentials)
if err == nil {
break
}
errsMutex.Lock()
errs[addr] = err
errsMutex.Unlock()
// Wait for one second between attempts; no need to wait after the last attempt.
if i < 2 {
time.Sleep(1 * time.Second)
}
}
}(addr, client)
}
wg.Wait()
return errs
}
// SetBucketPolicy - invokes the SetBucketPolicy RPC on all peers.
func (sys *NotificationSys) SetBucketPolicy(ctx context.Context, bucketName string, bucketPolicy *policy.Policy) {
go func() {
var wg sync.WaitGroup
for addr, client := range sys.peerRPCClientMap {
wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient) {
defer wg.Done()
if err := client.SetBucketPolicy(bucketName, bucketPolicy); err != nil {
logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name)
logger.LogIf(ctx, err)
}
}(addr, client)
}
wg.Wait()
}()
}
// RemoveBucketPolicy - invokes the RemoveBucketPolicy RPC on all peers.
func (sys *NotificationSys) RemoveBucketPolicy(ctx context.Context, bucketName string) {
go func() {
var wg sync.WaitGroup
for addr, client := range sys.peerRPCClientMap {
wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient) {
defer wg.Done()
if err := client.RemoveBucketPolicy(bucketName); err != nil {
logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name)
logger.LogIf(ctx, err)
}
}(addr, client)
}
wg.Wait()
}()
}
// PutBucketNotification - invokes the PutBucketNotification RPC on all peers.
func (sys *NotificationSys) PutBucketNotification(ctx context.Context, bucketName string, rulesMap event.RulesMap) {
go func() {
var wg sync.WaitGroup
for addr, client := range sys.peerRPCClientMap {
wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient, rulesMap event.RulesMap) {
defer wg.Done()
if err := client.PutBucketNotification(bucketName, rulesMap); err != nil {
logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name)
logger.LogIf(ctx, err)
}
}(addr, client, rulesMap.Clone())
}
wg.Wait()
}()
}
// ListenBucketNotification - invokes the ListenBucketNotification RPC on all peers.
func (sys *NotificationSys) ListenBucketNotification(ctx context.Context, bucketName string, eventNames []event.Name, pattern string,
targetID event.TargetID, localPeer xnet.Host) {
go func() {
var wg sync.WaitGroup
for addr, client := range sys.peerRPCClientMap {
wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient) {
defer wg.Done()
if err := client.ListenBucketNotification(bucketName, eventNames, pattern, targetID, localPeer); err != nil {
logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name)
logger.LogIf(ctx, err)
}
}(addr, client)
}
wg.Wait()
}()
}
// AddRemoteTarget - registers the HTTP/PeerRPC client target and its event rules map for the given bucket.
func (sys *NotificationSys) AddRemoteTarget(bucketName string, target event.Target, rulesMap event.RulesMap) error {
if err := sys.targetList.Add(target); err != nil {
return err
}
sys.Lock()
targetMap := sys.bucketRemoteTargetRulesMap[bucketName]
if targetMap == nil {
targetMap = make(map[event.TargetID]event.RulesMap)
}
targetMap[target.ID()] = rulesMap.Clone()
sys.bucketRemoteTargetRulesMap[bucketName] = targetMap
sys.Unlock()
sys.AddRulesMap(bucketName, rulesMap)
return nil
}
// RemoteTargetExist - checks whether the given target ID belongs to an HTTP/PeerRPC client target.
func (sys *NotificationSys) RemoteTargetExist(bucketName string, targetID event.TargetID) bool {
sys.Lock()
defer sys.Unlock()
targetMap, ok := sys.bucketRemoteTargetRulesMap[bucketName]
if ok {
_, ok = targetMap[targetID]
}
return ok
}
// initListeners - initializes PeerRPC client targets for the listeners recorded in listener.json.
func (sys *NotificationSys) initListeners(ctx context.Context, objAPI ObjectLayer, bucketName string) error {
// listener.json is available/applicable only in DistXL mode.
if !globalIsDistXL {
return nil
}
// Construct path to listener.json for the given bucket.
configFile := path.Join(bucketConfigPrefix, bucketName, bucketListenerConfig)
transactionConfigFile := configFile + ".transaction"
// The object layer's GetObject() and PutObject() each take their own lock on minioMetaBucket
// and configFile, so take a transaction lock here to avoid a race between readConfig()
// and saveConfig().
objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile)
if err := objLock.GetLock(globalOperationTimeout); err != nil {
return err
}
defer objLock.Unlock()
reader, e := readConfig(ctx, objAPI, configFile)
if e != nil && !IsErrIgnored(e, errDiskNotFound, errConfigNotFound) {
return e
}
listenerList := []ListenBucketNotificationArgs{}
if reader != nil {
err := json.NewDecoder(reader).Decode(&listenerList)
if err != nil {
logger.LogIf(ctx, err)
return err
}
}
if len(listenerList) == 0 {
// Nothing to initialize for empty listener list.
return nil
}
activeListenerList := []ListenBucketNotificationArgs{}
for _, args := range listenerList {
found, err := isLocalHost(args.Addr.Name)
if err != nil {
logger.GetReqInfo(ctx).AppendTags("host", args.Addr.Name)
logger.LogIf(ctx, err)
return err
}
if found {
// As this function is called at startup, skip any HTTP listener registered for this host.
continue
}
rpcClient := sys.GetPeerRPCClient(args.Addr)
if rpcClient == nil {
return fmt.Errorf("unable to find PeerRPCClient by address %v in listener.json for bucket %v", args.Addr, bucketName)
}
exist, err := rpcClient.RemoteTargetExist(bucketName, args.TargetID)
if err != nil {
logger.GetReqInfo(ctx).AppendTags("targetID", args.TargetID.Name)
logger.LogIf(ctx, err)
return err
}
if !exist {
// Skip a previously connected HTTP listener that is no longer present on the remote peer.
continue
}
target := NewPeerRPCClientTarget(bucketName, args.TargetID, rpcClient)
rulesMap := event.NewRulesMap(args.EventNames, args.Pattern, target.ID())
if err = sys.AddRemoteTarget(bucketName, target, rulesMap); err != nil {
logger.GetReqInfo(ctx).AppendTags("targetName", target.id.Name)
logger.LogIf(ctx, err)
return err
}
activeListenerList = append(activeListenerList, args)
}
data, err := json.Marshal(activeListenerList)
if err != nil {
logger.LogIf(ctx, err)
return err
}
return saveConfig(objAPI, configFile, data)
}
// Init - initializes notification system from notification.xml and listener.json of all buckets.
func (sys *NotificationSys) Init(objAPI ObjectLayer) error {
if objAPI == nil {
return errInvalidArgument
}
buckets, err := objAPI.ListBuckets(context.Background())
if err != nil {
return err
}
for _, bucket := range buckets {
ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucket.Name})
config, err := readNotificationConfig(ctx, objAPI, bucket.Name)
if err != nil {
if !IsErrIgnored(err, errDiskNotFound, errNoSuchNotifications) {
return err
}
} else {
sys.AddRulesMap(bucket.Name, config.ToRulesMap())
}
if err = sys.initListeners(ctx, objAPI, bucket.Name); err != nil {
return err
}
}
return nil
}
// AddRulesMap - adds rules map for bucket name.
func (sys *NotificationSys) AddRulesMap(bucketName string, rulesMap event.RulesMap) {
sys.Lock()
defer sys.Unlock()
rulesMap = rulesMap.Clone()
for _, targetRulesMap := range sys.bucketRemoteTargetRulesMap[bucketName] {
rulesMap.Add(targetRulesMap)
}
rulesMap.Add(sys.bucketRulesMap[bucketName])
sys.bucketRulesMap[bucketName] = rulesMap
}
// RemoveRulesMap - removes rules map for bucket name.
func (sys *NotificationSys) RemoveRulesMap(bucketName string, rulesMap event.RulesMap) {
sys.Lock()
defer sys.Unlock()
sys.bucketRulesMap[bucketName].Remove(rulesMap)
if len(sys.bucketRulesMap[bucketName]) == 0 {
delete(sys.bucketRulesMap, bucketName)
}
}
// RemoveNotification - removes all notification configuration for bucket name.
func (sys *NotificationSys) RemoveNotification(bucketName string) {
sys.Lock()
defer sys.Unlock()
delete(sys.bucketRulesMap, bucketName)
for targetID := range sys.bucketRemoteTargetRulesMap[bucketName] {
sys.targetList.Remove(targetID)
delete(sys.bucketRemoteTargetRulesMap[bucketName], targetID)
}
delete(sys.bucketRemoteTargetRulesMap, bucketName)
}
// RemoveAllRemoteTargets - closes and removes all HTTP/PeerRPC client targets.
func (sys *NotificationSys) RemoveAllRemoteTargets() {
for _, targetMap := range sys.bucketRemoteTargetRulesMap {
for targetID := range targetMap {
sys.targetList.Remove(targetID)
}
}
}
// RemoveRemoteTarget - closes and removes target by target ID.
func (sys *NotificationSys) RemoveRemoteTarget(bucketName string, targetID event.TargetID) {
for terr := range sys.targetList.Remove(targetID) {
reqInfo := (&logger.ReqInfo{}).AppendTags("targetID", terr.ID.Name)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, terr.Err)
}
sys.Lock()
defer sys.Unlock()
if _, ok := sys.bucketRemoteTargetRulesMap[bucketName]; ok {
delete(sys.bucketRemoteTargetRulesMap[bucketName], targetID)
if len(sys.bucketRemoteTargetRulesMap[bucketName]) == 0 {
delete(sys.bucketRemoteTargetRulesMap, bucketName)
}
}
}
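// send - sends eventData to the given target IDs and removes any HTTP/PeerRPC client target that returns an error.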
func (sys *NotificationSys) send(bucketName string, eventData event.Event, targetIDs ...event.TargetID) (errs []event.TargetIDErr) {
errCh := sys.targetList.Send(eventData, targetIDs...)
for terr := range errCh {
errs = append(errs, terr)
if sys.RemoteTargetExist(bucketName, terr.ID) {
sys.RemoveRemoteTarget(bucketName, terr.ID)
}
}
return errs
}
// Send - sends event data to all matching targets.
func (sys *NotificationSys) Send(args eventArgs) []event.TargetIDErr {
sys.RLock()
targetIDSet := sys.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name)
sys.RUnlock()
if len(targetIDSet) == 0 {
return nil
}
targetIDs := targetIDSet.ToSlice()
return sys.send(args.BucketName, args.ToEvent(), targetIDs...)
}
// NewNotificationSys - creates new notification system object.
func NewNotificationSys(config *serverConfig, endpoints EndpointList) *NotificationSys {
targetList := getNotificationTargets(config)
peerRPCClientMap := makeRemoteRPCClients(endpoints)
// bucketRulesMap/bucketRemoteTargetRulesMap are initialized by NotificationSys.Init()
return &NotificationSys{
targetList: targetList,
bucketRulesMap: make(map[string]event.RulesMap),
bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap),
peerRPCClientMap: peerRPCClientMap,
}
}
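// eventArgs - holds the request details used to construct a notification event.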
type eventArgs struct {
EventName event.Name
BucketName string
Object ObjectInfo
ReqParams map[string]string
Host string
Port string
UserAgent string
}
// ToEvent - converts to notification event.
func (args eventArgs) ToEvent() event.Event {
getOriginEndpoint := func() string {
host := globalMinioHost
if host == "" {
// FIXME: Send the FQDN or hostname of this machine rather than its IP address.
host = localIP4.ToSlice()[0]
}
return fmt.Sprintf("%s://%s:%s", getURLScheme(globalIsSSL), host, globalMinioPort)
}
creds := globalServerConfig.GetCredential()
eventTime := UTCNow()
uniqueID := fmt.Sprintf("%X", eventTime.UnixNano())
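// uniqueID doubles as the x-amz-request-id response element and the S3 sequencer value below.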
newEvent := event.Event{
EventVersion: "2.0",
EventSource: "minio:s3",
AwsRegion: globalServerConfig.GetRegion(),
EventTime: eventTime.Format(event.AMZTimeFormat),
EventName: args.EventName,
UserIdentity: event.Identity{creds.AccessKey},
RequestParameters: args.ReqParams,
ResponseElements: map[string]string{
"x-amz-request-id": uniqueID,
"x-minio-origin-endpoint": getOriginEndpoint(), // Minio specific custom elements.
},
S3: event.Metadata{
SchemaVersion: "1.0",
ConfigurationID: "Config",
Bucket: event.Bucket{
Name: args.BucketName,
OwnerIdentity: event.Identity{creds.AccessKey},
ARN: policy.ResourceARNPrefix + args.BucketName,
},
Object: event.Object{
Key: url.QueryEscape(args.Object.Name),
VersionID: "1",
Sequencer: uniqueID,
},
},
Source: event.Source{
Host: args.Host,
Port: args.Port,
UserAgent: args.UserAgent,
},
}
if args.EventName != event.ObjectRemovedDelete {
newEvent.S3.Object.ETag = args.Object.ETag
newEvent.S3.Object.Size = args.Object.Size
newEvent.S3.Object.ContentType = args.Object.ContentType
newEvent.S3.Object.UserMetadata = args.Object.UserDefined
}
return newEvent
}
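// sendEvent - sends the event described by args to all matching targets and logs any per-target send errors asynchronously.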
func sendEvent(args eventArgs) {
// globalNotificationSys is not initialized in gateway mode.
if globalNotificationSys == nil {
return
}
errs := globalNotificationSys.Send(args)
go func() {
for _, terr := range errs {
reqInfo := &logger.ReqInfo{BucketName: args.BucketName, ObjectName: args.Object.Name}
reqInfo.AppendTags("EventName", args.EventName.String())
reqInfo.AppendTags("targetID", terr.ID.Name)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogOnceIf(ctx, terr.Err, terr.ID)
}
}()
}
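// saveConfig - saves config data as an object in minioMetaBucket, verified against its SHA-256 hash.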
func saveConfig(objAPI ObjectLayer, configFile string, data []byte) error {
hashReader, err := hash.NewReader(bytes.NewReader(data), int64(len(data)), "", getSHA256Hash(data))
if err != nil {
return err
}
_, err = objAPI.PutObject(context.Background(), minioMetaBucket, configFile, hashReader, nil)
return err
}
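// errConfigNotFound - indicates that the requested config object does not exist.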
var errConfigNotFound = errors.New("config file not found")
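// readConfig - reads the named config object from minioMetaBucket; returns errConfigNotFound if it is missing and errNoSuchNotifications if it is empty.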
func readConfig(ctx context.Context, objAPI ObjectLayer, configFile string) (*bytes.Buffer, error) {
var buffer bytes.Buffer
// Read entire content by setting size to -1
err := objAPI.GetObject(ctx, minioMetaBucket, configFile, 0, -1, &buffer, "")
if err != nil {
// Map ObjectNotFound or IncompleteBody to errConfigNotFound i.e. the bucket has no notification configuration.
if isErrObjectNotFound(err) || isErrIncompleteBody(err) {
return nil, errConfigNotFound
}
logger.GetReqInfo(ctx).AppendTags("configFile", configFile)
logger.LogIf(ctx, err)
return nil, err
}
// Return errNoSuchNotifications on empty content.
if buffer.Len() == 0 {
return nil, errNoSuchNotifications
}
return &buffer, nil
}
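// readNotificationConfig - reads and parses notification.xml for the given bucket; returns errNoSuchNotifications if no configuration exists.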
func readNotificationConfig(ctx context.Context, objAPI ObjectLayer, bucketName string) (*event.Config, error) {
// Construct path to notification.xml for the given bucket.
configFile := path.Join(bucketConfigPrefix, bucketName, bucketNotificationConfig)
reader, err := readConfig(ctx, objAPI, configFile)
if err != nil {
if err == errConfigNotFound {
err = errNoSuchNotifications
}
return nil, err
}
config, err := event.ParseConfig(reader, globalServerConfig.GetRegion(), globalNotificationSys.targetList)
logger.LogIf(ctx, err)
return config, err
}
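// saveNotificationConfig - marshals config to XML and saves it as notification.xml for the bucket.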
func saveNotificationConfig(objAPI ObjectLayer, bucketName string, config *event.Config) error {
data, err := xml.Marshal(config)
if err != nil {
return err
}
configFile := path.Join(bucketConfigPrefix, bucketName, bucketNotificationConfig)
return saveConfig(objAPI, configFile, data)
}
// SaveListener - saves HTTP client currently listening for events to listener.json.
func SaveListener(objAPI ObjectLayer, bucketName string, eventNames []event.Name, pattern string, targetID event.TargetID, addr xnet.Host) error {
// listener.json is available/applicable only in DistXL mode.
if !globalIsDistXL {
return nil
}
ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucketName})
// Construct path to listener.json for the given bucket.
configFile := path.Join(bucketConfigPrefix, bucketName, bucketListenerConfig)
transactionConfigFile := configFile + ".transaction"
// The object layer's GetObject() and PutObject() each take their own lock on minioMetaBucket
// and configFile, so take a transaction lock here to avoid a race between readConfig()
// and saveConfig().
objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile)
if err := objLock.GetLock(globalOperationTimeout); err != nil {
return err
}
defer objLock.Unlock()
reader, err := readConfig(ctx, objAPI, configFile)
if err != nil && !IsErrIgnored(err, errDiskNotFound, errConfigNotFound) {
return err
}
listenerList := []ListenBucketNotificationArgs{}
if reader != nil {
if err = json.NewDecoder(reader).Decode(&listenerList); err != nil {
logger.LogIf(ctx, err)
return err
}
}
listenerList = append(listenerList, ListenBucketNotificationArgs{
EventNames: eventNames,
Pattern: pattern,
TargetID: targetID,
Addr: addr,
})
data, err := json.Marshal(listenerList)
if err != nil {
logger.LogIf(ctx, err)
return err
}
return saveConfig(objAPI, configFile, data)
}
// RemoveListener - removes HTTP client currently listening for events from listener.json.
func RemoveListener(objAPI ObjectLayer, bucketName string, targetID event.TargetID, addr xnet.Host) error {
// listener.json is available/applicable only in DistXL mode.
if !globalIsDistXL {
return nil
}
ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucketName})
// Construct path to listener.json for the given bucket.
configFile := path.Join(bucketConfigPrefix, bucketName, bucketListenerConfig)
transactionConfigFile := configFile + ".transaction"
// The object layer's GetObject() and PutObject() each take their own lock on minioMetaBucket
// and configFile, so take a transaction lock here to avoid a race between readConfig()
// and saveConfig().
objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile)
if err := objLock.GetLock(globalOperationTimeout); err != nil {
return err
}
defer objLock.Unlock()
reader, err := readConfig(ctx, objAPI, configFile)
if err != nil && !IsErrIgnored(err, errDiskNotFound, errConfigNotFound) {
return err
}
listenerList := []ListenBucketNotificationArgs{}
if reader != nil {
if err = json.NewDecoder(reader).Decode(&listenerList); err != nil {
logger.LogIf(ctx, err)
return err
}
}
if len(listenerList) == 0 {
// Nothing to remove.
return nil
}
activeListenerList := []ListenBucketNotificationArgs{}
for _, args := range listenerList {
if args.TargetID == targetID && args.Addr.Equal(addr) {
// Skip if matches
continue
}
activeListenerList = append(activeListenerList, args)
}
data, err := json.Marshal(activeListenerList)
if err != nil {
logger.LogIf(ctx, err)
return err
}
return saveConfig(objAPI, configFile, data)
}