minio/cmd/event-notifier_test.go
Harshavardhana 716316f711 Reduce number of envs and options from command line. (#3230)
Ref #3229

After review with @abperiasamy we decided to remove all the unnecessary options

- MINIO_BROWSER (Implemented as a security feature but now deemed obsolete
  since even if blocking access to MINIO_BROWSER, s3 API port is open)
- MINIO_CACHE_EXPIRY (Defaults to 72h)
- MINIO_MAXCONN (No one used this option and we don't test this)
- MINIO_ENABLE_FSMETA (Enable FSMETA all the time)

Remove the --ignore-disks option. It was introduced when the XL layer
 would initialize the backend disks and heal them automatically, in order
 to prevent XL from accidentally using the root partition itself.

This behavior has changed: XL no longer automatically initializes
`format.json`, and HEAL is a controlled activity, so ignore-disks is not
useful anymore. This change also addresses the problems of our documentation
going forward and keeps things simple. This patch brings in reduction of
options and defaulting them to a valid known inputs.  This patch also
serves as a guideline of limiting many ways to do the same thing.
2016-11-11 16:40:55 -08:00

514 lines
14 KiB
Go

/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"fmt"
"net"
"reflect"
"testing"
"time"
)
// TestInitEventNotifierFaultyDisks checks that initEventNotifier reports
// errFaultyDisk when the FS backend's storage is wrapped in a disk that
// injects that error.
func TestInitEventNotifierFaultyDisks(t *testing.T) {
	const (
		bucket   = "bucket"
		topicARN = "arn:minio:sns:us-east-1:1:listen"
		sqsARN   = "arn:minio:sqs:us-east-1:1:redis"
	)

	// Create a test configuration; its root directory is removed on exit.
	rootDir, cfgErr := newTestConfig("us-east-1")
	if cfgErr != nil {
		t.Fatalf("Init Test config failed")
	}
	defer removeAll(rootDir)

	// Request a single directory for the backend.
	fsDirs, dirErr := getRandomDisks(1)
	if dirErr != nil {
		t.Fatal("Unable to create directories for FS backend. ", dirErr)
	}
	defer removeAll(fsDirs[0])

	eps, epErr := parseStorageEndpoints(fsDirs)
	if epErr != nil {
		t.Fatal(epErr)
	}
	objLayer, _, layerErr := initObjectLayer(eps)
	if layerErr != nil {
		t.Fatal("Unable to initialize FS backend.", layerErr)
	}
	if mkErr := objLayer.MakeBucket(bucket); mkErr != nil {
		t.Fatal("Unexpected error:", mkErr)
	}

	// Reach into the FS object layer to get at its underlying posix storage.
	fs := objLayer.(fsObjects)
	posixDisk := fs.storage.(*posix)

	// Write a notification.xml for the bucket directly through the storage layer.
	topicXML := "<TopicConfiguration><Event>s3:ObjectRemoved:*</Event><Event>s3:ObjectRemoved:*</Event><Topic>" + topicARN + "</Topic></TopicConfiguration>"
	queueXML := "<QueueConfiguration><Event>s3:ObjectRemoved:*</Event><Event>s3:ObjectRemoved:*</Event><Queue>" + sqsARN + "</Queue></QueueConfiguration>"
	configXML := "<NotificationConfiguration>" + topicXML + queueXML + "</NotificationConfiguration>"
	configPath := bucketConfigPrefix + "/" + bucket + "/" + bucketNotificationConfig
	if appendErr := posixDisk.AppendFile(minioMetaBucket, configPath, []byte(configXML)); appendErr != nil {
		t.Fatal("Unexpected error:", appendErr)
	}

	// Run initEventNotifier() against five differently configured faulty disks.
	// NOTE(review): the map key presumably selects which storage call fails —
	// confirm against newNaughtyDisk.
	for _, failAt := range []int{1, 2, 3, 4, 5} {
		faults := map[int]error{failAt: errFaultyDisk}
		fs.storage = newNaughtyDisk(posixDisk, faults, nil)
		initErr := initEventNotifier(fs)
		if errorCause(initErr) != errFaultyDisk {
			t.Fatal("Unexpected error:", initErr)
		}
	}
}
// TestInitEventNotifierWithAMQP tests initEventNotifier when AMQP is enabled
// in the server config but not prepared; initialization is expected to fail.
func TestInitEventNotifierWithAMQP(t *testing.T) {
	// Create a test configuration for the duration of the test.
	rootPath, err := newTestConfig("us-east-1")
	if err != nil {
		t.Fatalf("Init Test config failed")
	}
	// remove the root directory after the test ends.
	defer removeAll(rootPath)

	disks, err := getRandomDisks(1)
	if err != nil {
		t.Fatal("Unable to create directories for FS backend. ", err)
	}
	// Register the cleanup only after the error check: indexing disks before
	// knowing the call succeeded could panic instead of reporting the failure.
	defer removeAll(disks[0])

	endpoints, err := parseStorageEndpoints(disks)
	if err != nil {
		t.Fatal(err)
	}
	fs, _, err := initObjectLayer(endpoints)
	if err != nil {
		t.Fatal("Unable to initialize FS backend.", err)
	}

	// Enable AMQP target "1" with every other field left at its zero value.
	serverConfig.SetAMQPNotifyByID("1", amqpNotify{Enable: true})
	if err := initEventNotifier(fs); err == nil {
		t.Fatal("AMQP config didn't fail.")
	}
}
// TestInitEventNotifierWithElasticSearch tests initEventNotifier when
// ElasticSearch is enabled in the server config but not ready;
// initialization is expected to fail.
func TestInitEventNotifierWithElasticSearch(t *testing.T) {
	// Create a test configuration for the duration of the test.
	rootPath, err := newTestConfig("us-east-1")
	if err != nil {
		t.Fatalf("Init Test config failed")
	}
	// remove the root directory after the test ends.
	defer removeAll(rootPath)

	disks, err := getRandomDisks(1)
	if err != nil {
		t.Fatal("Unable to create directories for FS backend. ", err)
	}
	// Register the cleanup only after the error check: indexing disks before
	// knowing the call succeeded could panic instead of reporting the failure.
	defer removeAll(disks[0])

	endpoints, err := parseStorageEndpoints(disks)
	if err != nil {
		t.Fatal(err)
	}
	fs, _, err := initObjectLayer(endpoints)
	if err != nil {
		t.Fatal("Unable to initialize FS backend.", err)
	}

	// Enable ElasticSearch target "1" with every other field left at its zero value.
	serverConfig.SetElasticSearchNotifyByID("1", elasticSearchNotify{Enable: true})
	if err := initEventNotifier(fs); err == nil {
		t.Fatal("ElasticSearch config didn't fail.")
	}
}
// TestInitEventNotifierWithRedis tests initEventNotifier when Redis is
// enabled in the server config but not ready; initialization is expected to
// fail.
func TestInitEventNotifierWithRedis(t *testing.T) {
	// Create a test configuration for the duration of the test.
	rootPath, err := newTestConfig("us-east-1")
	if err != nil {
		t.Fatalf("Init Test config failed")
	}
	// remove the root directory after the test ends.
	defer removeAll(rootPath)

	disks, err := getRandomDisks(1)
	if err != nil {
		t.Fatal("Unable to create directories for FS backend. ", err)
	}
	// Register the cleanup only after the error check: indexing disks before
	// knowing the call succeeded could panic instead of reporting the failure.
	defer removeAll(disks[0])

	endpoints, err := parseStorageEndpoints(disks)
	if err != nil {
		t.Fatal(err)
	}
	fs, _, err := initObjectLayer(endpoints)
	if err != nil {
		t.Fatal("Unable to initialize FS backend.", err)
	}

	// Enable Redis target "1" with every other field left at its zero value.
	serverConfig.SetRedisNotifyByID("1", redisNotify{Enable: true})
	if err := initEventNotifier(fs); err == nil {
		t.Fatal("Redis config didn't fail.")
	}
}
// TestPeerRPCServerData holds the state shared by the peer RPC based tests
// in this file: which kind of server to start and the started test server.
type TestPeerRPCServerData struct {
// serverType is handed to StartTestPeersRPCServer by Setup; the tests in
// this file use "XL".
serverType string
// testServer is the server started by Setup and stopped by TearDown.
testServer TestServer
}
// Setup starts a peer RPC test server of the configured serverType, points
// the global host, port and address at it, and initializes the S3 peers.
// It aborts the test via t.Fatalf if the listener address cannot be split.
func (s *TestPeerRPCServerData) Setup(t *testing.T) {
	s.testServer = StartTestPeersRPCServer(t, s.serverType)

	// Derive the global host/port from the address the test server listens on.
	listenAddr := s.testServer.Server.Listener.Addr().String()
	srvHost, srvPort, splitErr := net.SplitHostPort(listenAddr)
	if splitErr != nil {
		t.Fatalf("Initialisation error: %v", splitErr)
	}
	globalMinioHost, globalMinioPort = srvHost, srvPort
	globalMinioAddr = getLocalAddress(s.testServer.SrvCmdCfg)

	// initialize the peer client(s)
	initGlobalS3Peers(s.testServer.Disks)
}
// TearDown stops the test server, then removes its root directory and the
// path of each of its disks. Removal errors are discarded.
func (s *TestPeerRPCServerData) TearDown() {
	s.testServer.Stop()

	// Best-effort cleanup: the results of removeAll are intentionally ignored.
	_ = removeAll(s.testServer.Root)
	disks := s.testServer.Disks
	for i := range disks {
		_ = removeAll(disks[i].Path)
	}
}
// TestSetNGetBucketNotification stores an empty notification config for a
// random bucket on the global event notifier and checks that reading it back
// yields an equal, non-nil config.
func TestSetNGetBucketNotification(t *testing.T) {
	srvData := TestPeerRPCServerData{serverType: "XL"}
	srvData.Setup(t)
	defer srvData.TearDown()

	bucket := getRandomBucketName()

	initErr := initEventNotifier(srvData.testServer.Obj)
	if initErr != nil {
		t.Fatal("Unexpected error:", initErr)
	}

	// Store an empty config, then read it back.
	globalEventNotifier.SetBucketNotificationConfig(bucket, &notificationConfig{})
	got := globalEventNotifier.GetBucketNotificationConfig(bucket)
	if got == nil {
		t.Errorf("Notification expected to be set, but notification not set.")
	}

	// Compare against a separately allocated empty config.
	want := &notificationConfig{}
	if !reflect.DeepEqual(got, want) {
		t.Errorf("Mismatching notification configs.")
	}
}
// TestInitEventNotifier persists a notification config and a listener config
// for a fresh bucket and verifies that initEventNotifier loads both of them
// into the global event notifier. It also checks that a nil object layer is
// rejected with errInvalidArgument.
func TestInitEventNotifier(t *testing.T) {
	s := TestPeerRPCServerData{serverType: "XL"}
	// setup and teardown
	s.Setup(t)
	defer s.TearDown()

	// test if empty object layer arg. returns expected error.
	if err := initEventNotifier(nil); err != errInvalidArgument {
		t.Fatalf("initEventNotifier returned unexpected error value - %v", err)
	}

	obj := s.testServer.Obj
	bucketName := getRandomBucketName()

	// declare sample configs
	filterRules := []filterRule{
		{
			Name:  "prefix",
			Value: "minio",
		},
		{
			Name:  "suffix",
			Value: "*.jpg",
		},
	}
	sampleSvcCfg := ServiceConfig{
		[]string{"s3:ObjectRemoved:*", "s3:ObjectCreated:*"},
		filterStruct{
			keyFilter{filterRules},
		},
		"1",
	}
	sampleNotifCfg := notificationConfig{
		QueueConfigs: []queueConfig{
			{
				ServiceConfig: sampleSvcCfg,
				QueueARN:      "testqARN",
			},
		},
	}
	sampleListenCfg := []listenerConfig{
		{
			TopicConfig: topicConfig{ServiceConfig: sampleSvcCfg,
				TopicARN: "testlARN"},
			TargetServer: globalMinioAddr,
		},
	}

	// create bucket
	if err := obj.MakeBucket(bucketName); err != nil {
		t.Fatal("Unexpected error:", err)
	}

	// bucket is created, now writing should not give errors.
	if err := persistNotificationConfig(bucketName, &sampleNotifCfg, obj); err != nil {
		t.Fatal("Unexpected error:", err)
	}
	if err := persistListenerConfig(bucketName, sampleListenCfg, obj); err != nil {
		t.Fatal("Unexpected error:", err)
	}

	// needed to load listener config from disk for testing (in
	// single peer mode, the listener config is ignored, but here
	// we want to test the loading from disk too.)
	// NOTE(review): this global is left set to true after the test returns.
	globalIsDistXL = true

	// test event notifier init
	if err := initEventNotifier(obj); err != nil {
		t.Fatal("Unexpected error:", err)
	}

	// fetch bucket configs and verify
	ncfg := globalEventNotifier.GetBucketNotificationConfig(bucketName)
	if ncfg == nil {
		// Fatal rather than Error: the checks below dereference ncfg and
		// would panic on a nil config.
		t.Fatal("Bucket notification was not present for ", bucketName)
	}
	if len(ncfg.QueueConfigs) != 1 || ncfg.QueueConfigs[0].QueueARN != "testqARN" {
		t.Error("Unexpected bucket notification found - ", *ncfg)
	}
	if globalEventNotifier.GetExternalTarget("testqARN") != nil {
		t.Error("A logger was not expected to be found as it was not enabled in the config.")
	}

	lcfg := globalEventNotifier.GetBucketListenerConfig(bucketName)
	if lcfg == nil {
		t.Error("Bucket listener was not present for ", bucketName)
	}
	if len(lcfg) != 1 || lcfg[0].TargetServer != globalMinioAddr || lcfg[0].TopicConfig.TopicARN != "testlARN" {
		// Report the whole slice: indexing lcfg[0] here would panic when
		// the slice is empty.
		t.Error("Unexpected listener config found - ", lcfg)
	}
	if globalEventNotifier.GetInternalTarget("testlARN") == nil {
		t.Error("A listen logger was not found.")
	}
}
// TestListenBucketNotification persists a listener config for a bucket,
// initializes the event notifier, registers a listener channel for the
// listen ARN, fires an ObjectRemovedDelete event and checks the object key of
// the notification received on that channel.
func TestListenBucketNotification(t *testing.T) {
	s := TestPeerRPCServerData{serverType: "XL"}
	// setup and teardown
	s.Setup(t)
	defer s.TearDown()

	// test initialisation
	obj := s.testServer.Obj
	bucketName := "bucket"
	objectName := "object"

	// Create the bucket to listen on
	if err := obj.MakeBucket(bucketName); err != nil {
		t.Fatal("Unexpected error:", err)
	}

	listenARN := fmt.Sprintf("%s:%s:1:%s-%s",
		minioTopic,
		serverConfig.GetRegion(),
		snsTypeMinio,
		s.testServer.Server.Listener.Addr(),
	)
	lcfg := listenerConfig{
		TopicConfig: topicConfig{
			ServiceConfig{
				[]string{"s3:ObjectRemoved:*", "s3:ObjectCreated:*"},
				filterStruct{},
				"0",
			},
			listenARN,
		},
		TargetServer: globalMinioAddr,
	}

	// write listener config to storage layer
	lcfgs := []listenerConfig{lcfg}
	if err := persistListenerConfig(bucketName, lcfgs, obj); err != nil {
		t.Fatalf("Test Setup error: %v", err)
	}

	// needed to load listener config from disk for testing (in
	// single peer mode, the listener config is ignored, but here
	// we want to test the loading from disk too.)
	globalIsDistXL = true

	// Init event notifier
	if err := initEventNotifier(obj); err != nil {
		t.Fatal("Unexpected error:", err)
	}

	// Check if the config is loaded
	listenerCfg := globalEventNotifier.GetBucketListenerConfig(bucketName)
	if listenerCfg == nil {
		t.Fatal("Cannot load bucket listener config")
	}
	if len(listenerCfg) != 1 {
		t.Fatal("Listener config is not correctly loaded. Exactly one listener config is expected")
	}

	// Check if topic ARN is correct
	if listenerCfg[0].TopicConfig.TopicARN != listenARN {
		t.Fatal("Configured topic ARN is incorrect.")
	}

	// Create a new notification event channel.
	nEventCh := make(chan []NotificationEvent)
	// Close the listener channel.
	defer close(nEventCh)

	// Add events channel for listener.
	if err := globalEventNotifier.AddListenerChan(listenARN, nEventCh); err != nil {
		t.Fatalf("Test Setup error: %v", err)
	}
	// Remove listen channel after the writer has closed or the
	// client disconnected. Deferred calls run last-in-first-out, so this
	// runs before the channel is closed.
	defer globalEventNotifier.RemoveListenerChan(listenARN)

	// Fire an event notification
	go eventNotify(eventData{
		Type:   ObjectRemovedDelete,
		Bucket: bucketName,
		ObjInfo: ObjectInfo{
			Bucket: bucketName,
			Name:   objectName,
		},
		ReqParams: map[string]string{
			"sourceIPAddress": "localhost:1337",
		},
	})

	// Wait up to 3 seconds for the event notification.
	// NOTE(review): a timeout ends the test without failing it — confirm
	// whether this leniency is intended.
	select {
	case n := <-nEventCh:
		// Check the received event
		if len(n) == 0 {
			t.Fatal("Unexpected error occurred")
		}
		if n[0].S3.Object.Key != objectName {
			t.Fatalf("Received wrong object name in notification, expected %s, received %s", objectName, n[0].S3.Object.Key)
		}
	case <-time.After(3 * time.Second):
	}
}
// TestAddRemoveBucketListenerConfig adds a listener config to a fresh bucket,
// checks that a nil listener config is rejected with errInvalidArgument, and
// then verifies that removing the listener leaves the bucket without any
// listener configs.
func TestAddRemoveBucketListenerConfig(t *testing.T) {
	srvData := TestPeerRPCServerData{serverType: "XL"}
	srvData.Setup(t)
	defer srvData.TearDown()

	objLayer := srvData.testServer.Obj
	if initErr := initEventNotifier(objLayer); initErr != nil {
		t.Fatalf("Failed to initialize event notifier: %v", initErr)
	}

	// Make a bucket to store topicConfigs.
	bucket := getRandomBucketName()
	if mkErr := objLayer.MakeBucket(bucket); mkErr != nil {
		t.Fatalf("Failed to make bucket %s", bucket)
	}

	// Build an ARN from the current UTC time in nanoseconds.
	accountID := fmt.Sprintf("%d", time.Now().UTC().UnixNano())
	accountARN := fmt.Sprintf("arn:minio:sqs:%s:%s:listen-%s", serverConfig.GetRegion(), accountID, globalMinioAddr)

	// Make topic configuration
	keyRules := []filterRule{
		{Name: "prefix", Value: "minio"},
		{Name: "suffix", Value: "*.jpg"},
	}
	svcCfg := ServiceConfig{
		[]string{"s3:ObjectRemoved:*", "s3:ObjectCreated:*"},
		filterStruct{
			keyFilter{keyRules},
		},
		"sns-" + accountID,
	}
	listenerCfg := &listenerConfig{
		TopicConfig: topicConfig{
			TopicARN:      accountARN,
			ServiceConfig: svcCfg,
		},
		TargetServer: globalMinioAddr,
	}

	// A valid config must be accepted, a nil one rejected.
	cases := []struct {
		lCfg        *listenerConfig
		expectedErr error
	}{
		{lCfg: listenerCfg, expectedErr: nil},
		{lCfg: nil, expectedErr: errInvalidArgument},
	}
	for idx := range cases {
		tc := cases[idx]
		if got := AddBucketListenerConfig(bucket, tc.lCfg, objLayer); got != tc.expectedErr {
			t.Errorf("Test %d: Failed with error %v, expected to fail with %v", idx+1, got, tc.expectedErr)
		}
	}

	// test remove listener actually removes a listener
	RemoveBucketListenerConfig(bucket, listenerCfg, objLayer)

	// The result of the removal is not inspected here, so verify by
	// fetching what is left for the bucket.
	if remaining := globalEventNotifier.GetBucketListenerConfig(bucket); len(remaining) != 0 {
		t.Errorf("Remove Listener Config Test: did not remove listener config - %v", remaining)
	}
}