/* * Minio Cloud Storage, (C) 2016 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package cmd import ( "fmt" "reflect" "testing" "time" ) // Tests event notify. func TestEventNotify(t *testing.T) { ExecObjectLayerTest(t, testEventNotify) } func testEventNotify(obj ObjectLayer, instanceType string, t TestErrHandler) { bucketName := getRandomBucketName() // initialize the server and obtain the credentials and root. // credentials are necessary to sign the HTTP request. rootPath, err := newTestConfig("us-east-1") if err != nil { t.Fatalf("Init Test config failed") } // remove the root folder after the test ends. defer removeAll(rootPath) if err := initEventNotifier(obj); err != nil { t.Fatal("Unexpected error:", err) } // Notify object created event. eventNotify(eventData{ Type: ObjectCreatedPost, Bucket: bucketName, ObjInfo: ObjectInfo{ Bucket: bucketName, Name: "object1", }, ReqParams: map[string]string{ "sourceIPAddress": "localhost:1337", }, }) if err := globalEventNotifier.SetBucketNotificationConfig(bucketName, nil); err != errInvalidArgument { t.Errorf("Expected error %s, got %s", errInvalidArgument, err) } if err := globalEventNotifier.SetBucketNotificationConfig(bucketName, ¬ificationConfig{}); err != nil { t.Errorf("Expected error to be nil, got %s", err) } nConfig := globalEventNotifier.GetBucketNotificationConfig(bucketName) if nConfig == nil { t.Errorf("Notification expected to be set, but notification not set.") } if !reflect.DeepEqual(nConfig, ¬ificationConfig{}) { t.Errorf("Mismatching notification configs.") } // Notify object created event. eventNotify(eventData{ Type: ObjectRemovedDelete, Bucket: bucketName, ObjInfo: ObjectInfo{ Bucket: bucketName, Name: "object1", }, ReqParams: map[string]string{ "sourceIPAddress": "localhost:1337", }, }) } // Tests various forms of inititalization of event notifier. func TestInitEventNotifier(t *testing.T) { disks, err := getRandomDisks(1) if err != nil { t.Fatal("Unable to create directories for FS backend. ", err) } defer removeRoots(disks) fs, _, err := initObjectLayer(disks, nil) if err != nil { t.Fatal("Unable to initialize FS backend.", err) } nDisks := 16 disks, err = getRandomDisks(nDisks) if err != nil { t.Fatal("Unable to create directories for XL backend. ", err) } defer removeRoots(disks) xl, _, err := initObjectLayer(disks, nil) if err != nil { t.Fatal("Unable to initialize XL backend.", err) } // Collection of test cases for inititalizing event notifier. testCases := []struct { objAPI ObjectLayer configs map[string]*notificationConfig err error }{ // Test 1 - invalid arguments. { objAPI: nil, err: errInvalidArgument, }, // Test 2 - valid FS object layer but no bucket notifications. { objAPI: fs, err: nil, }, // Test 3 - valid XL object layer but no bucket notifications. { objAPI: xl, err: nil, }, } // Validate if event notifier is properly initialized. for i, testCase := range testCases { err = initEventNotifier(testCase.objAPI) if err != testCase.err { t.Errorf("Test %d: Expected %s, but got: %s", i+1, testCase.err, err) } } } // Test InitEventNotifier with faulty disks func TestInitEventNotifierFaultyDisks(t *testing.T) { // Prepare for tests rootPath, err := newTestConfig("us-east-1") if err != nil { t.Fatalf("Init Test config failed") } // remove the root folder after the test ends. defer removeAll(rootPath) disk, err := getRandomDisks(1) if err != nil { t.Fatal("Unable to create directories for FS backend. ", err) } defer removeAll(disk[0]) obj, _, err := initObjectLayer(disk, nil) if err != nil { t.Fatal("Unable to initialize FS backend.", err) } bucketName := "bucket" if err := obj.MakeBucket(bucketName); err != nil { t.Fatal("Unexpected error:", err) } fs := obj.(fsObjects) fsstorage := fs.storage.(*posix) listenARN := "arn:minio:sns:us-east-1:1:listen" queueARN := "arn:minio:sqs:us-east-1:1:redis" // Write a notification.xml in the disk notificationXML := "" notificationXML += "s3:ObjectRemoved:*s3:ObjectRemoved:*" + listenARN + "" notificationXML += "s3:ObjectRemoved:*s3:ObjectRemoved:*" + queueARN + "" notificationXML += "" if err := fsstorage.AppendFile(minioMetaBucket, bucketConfigPrefix+"/"+bucketName+"/"+bucketNotificationConfig, []byte(notificationXML)); err != nil { t.Fatal("Unexpected error:", err) } // Test initEventNotifier() with faulty disks for i := 1; i <= 5; i++ { fs.storage = newNaughtyDisk(fsstorage, map[int]error{i: errFaultyDisk}, nil) if err := initEventNotifier(fs); errorCause(err) != errFaultyDisk { t.Fatal("Unexpected error:", err) } } } // InitEventNotifierWithAMQP - tests InitEventNotifier when AMQP is not prepared func TestInitEventNotifierWithAMQP(t *testing.T) { // initialize the server and obtain the credentials and root. // credentials are necessary to sign the HTTP request. rootPath, err := newTestConfig("us-east-1") if err != nil { t.Fatalf("Init Test config failed") } // remove the root folder after the test ends. defer removeAll(rootPath) disk, err := getRandomDisks(1) defer removeAll(disk[0]) if err != nil { t.Fatal("Unable to create directories for FS backend. ", err) } fs, _, err := initObjectLayer(disk, nil) if err != nil { t.Fatal("Unable to initialize FS backend.", err) } serverConfig.SetAMQPNotifyByID("1", amqpNotify{Enable: true}) if err := initEventNotifier(fs); err == nil { t.Fatal("AMQP config didn't fail.") } } // InitEventNotifierWithElasticSearch - test InitEventNotifier when ElasticSearch is not ready func TestInitEventNotifierWithElasticSearch(t *testing.T) { // initialize the server and obtain the credentials and root. // credentials are necessary to sign the HTTP request. rootPath, err := newTestConfig("us-east-1") if err != nil { t.Fatalf("Init Test config failed") } // remove the root folder after the test ends. defer removeAll(rootPath) disk, err := getRandomDisks(1) defer removeAll(disk[0]) if err != nil { t.Fatal("Unable to create directories for FS backend. ", err) } fs, _, err := initObjectLayer(disk, nil) if err != nil { t.Fatal("Unable to initialize FS backend.", err) } serverConfig.SetElasticSearchNotifyByID("1", elasticSearchNotify{Enable: true}) if err := initEventNotifier(fs); err == nil { t.Fatal("ElasticSearch config didn't fail.") } } // InitEventNotifierWithRedis - test InitEventNotifier when Redis is not ready func TestInitEventNotifierWithRedis(t *testing.T) { // initialize the server and obtain the credentials and root. // credentials are necessary to sign the HTTP request. rootPath, err := newTestConfig("us-east-1") if err != nil { t.Fatalf("Init Test config failed") } // remove the root folder after the test ends. defer removeAll(rootPath) disk, err := getRandomDisks(1) defer removeAll(disk[0]) if err != nil { t.Fatal("Unable to create directories for FS backend. ", err) } fs, _, err := initObjectLayer(disk, nil) if err != nil { t.Fatal("Unable to initialize FS backend.", err) } serverConfig.SetRedisNotifyByID("1", redisNotify{Enable: true}) if err := initEventNotifier(fs); err == nil { t.Fatal("Redis config didn't fail.") } } // TestListenBucketNotification - test Listen Bucket Notification process func TestListenBucketNotification(t *testing.T) { bucketName := "bucket" objectName := "object" // Prepare for tests // Create fs backend rootPath, err := newTestConfig("us-east-1") if err != nil { t.Fatalf("Init Test config failed") } // remove the root folder after the test ends. defer removeAll(rootPath) disk, err := getRandomDisks(1) defer removeAll(disk[0]) if err != nil { t.Fatal("Unable to create directories for FS backend. ", err) } obj, _, err := initObjectLayer(disk, nil) if err != nil { t.Fatal("Unable to initialize FS backend.", err) } // Create the bucket to listen on if err := obj.MakeBucket(bucketName); err != nil { t.Fatal("Unexpected error:", err) } listenARN := "arn:minio:sns:us-east-1:1:listen" queueARN := "arn:minio:sqs:us-east-1:1:redis" fs := obj.(fsObjects) storage := fs.storage.(*posix) // Create and store notification.xml with listen and queue notification configured notificationXML := "" notificationXML += "s3:ObjectRemoved:*s3:ObjectRemoved:*" + listenARN + "" notificationXML += "s3:ObjectRemoved:*s3:ObjectRemoved:*" + queueARN + "" notificationXML += "" if err := storage.AppendFile(minioMetaBucket, bucketConfigPrefix+"/"+bucketName+"/"+bucketNotificationConfig, []byte(notificationXML)); err != nil { t.Fatal("Unexpected error:", err) } // Init event notifier if err := initEventNotifier(fs); err != nil { t.Fatal("Unexpected error:", err) } // Check if the config is loaded notificationCfg := globalEventNotifier.GetBucketNotificationConfig(bucketName) if notificationCfg == nil { t.Fatal("Cannot load bucket notification config") } if len(notificationCfg.TopicConfigs) != 1 || len(notificationCfg.QueueConfigs) != 1 { t.Fatal("Notification config is not correctly loaded. Exactly one topic and one queue config are expected") } // Check if topic ARN is enabled if notificationCfg.TopicConfigs[0].TopicARN != listenARN { t.Fatal("SNS listen is not configured.") } // Create a new notification event channel. nEventCh := make(chan []NotificationEvent) // Close the listener channel. defer close(nEventCh) // Set sns target. globalEventNotifier.SetSNSTarget(listenARN, nEventCh) // Remove sns listener after the writer has closed or the client disconnected. defer globalEventNotifier.RemoveSNSTarget(listenARN, nEventCh) // Fire an event notification go eventNotify(eventData{ Type: ObjectRemovedDelete, Bucket: bucketName, ObjInfo: ObjectInfo{ Bucket: bucketName, Name: objectName, }, ReqParams: map[string]string{ "sourceIPAddress": "localhost:1337", }, }) // Wait for the event notification here, if nothing is received within 30 seconds, // test error will be fired select { case n := <-nEventCh: // Check that received event if len(n) == 0 { t.Fatal("Unexpected error occurred") } if n[0].S3.Object.Key != objectName { t.Fatalf("Received wrong object name in notification, expected %s, received %s", n[0].S3.Object.Key, objectName) } break case <-time.After(30 * time.Second): break } } func testAddTopicConfig(obj ObjectLayer, instanceType string, t TestErrHandler) { root, cErr := newTestConfig("us-east-1") if cErr != nil { t.Fatalf("[%s] Failed to initialize test config: %v", instanceType, cErr) } defer removeAll(root) if err := initEventNotifier(obj); err != nil { t.Fatalf("[%s] : Failed to initialize event notifier: %v", instanceType, err) } // Make a bucket to store topicConfigs. randBucket := getRandomBucketName() if err := obj.MakeBucket(randBucket); err != nil { t.Fatalf("[%s] : Failed to make bucket %s", instanceType, randBucket) } // Add a topicConfig to an empty notificationConfig. accountID := fmt.Sprintf("%d", time.Now().UTC().UnixNano()) accountARN := "arn:minio:sns:" + serverConfig.GetRegion() + accountID + ":listen" var filterRules []filterRule filterRules = append(filterRules, filterRule{ Name: "prefix", Value: "minio", }) filterRules = append(filterRules, filterRule{ Name: "suffix", Value: "*.jpg", }) // Make topic configuration corresponding to this ListenBucketNotification request. sampleTopicCfg := &topicConfig{ TopicARN: accountARN, serviceConfig: serviceConfig{ Filter: struct { Key keyFilter `xml:"S3Key,omitempty"` }{ Key: keyFilter{ FilterRules: filterRules, }, }, ID: "sns-" + accountID, }, } testCases := []struct { topicCfg *topicConfig expectedErr error }{ {sampleTopicCfg, nil}, {nil, errInvalidArgument}, {sampleTopicCfg, nil}, } for i, test := range testCases { err := globalEventNotifier.AddTopicConfig(randBucket, test.topicCfg) if err != test.expectedErr { t.Errorf("Test %d: %s failed with error %v, expected to fail with %v", i+1, instanceType, err, test.expectedErr) } } } func TestAddTopicConfig(t *testing.T) { ExecObjectLayerTest(t, testAddTopicConfig) }