fix: remove unnecessary limit for queueStore (#12491)

There is no good reason to limit ourselves
to max_open_fd for queue_store

Bonus: Support for publisher confirms
This commit is contained in:
Harshavardhana 2021-06-14 13:28:44 -07:00 committed by GitHub
parent 0d1fb10940
commit ed6cc66cf4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 94 additions and 55 deletions

View file

@ -506,7 +506,6 @@ func (c Config) Merge() Config {
rnSubSys, ok := renamedSubsys[subSys]
if !ok {
// A config subsystem was removed or server was downgraded.
Logger.Info("config: ignoring unknown subsystem config %q\n", subSys)
continue
}
// Copy over settings from previous sub-system

View file

@ -140,6 +140,12 @@ var (
Optional: true,
Type: "number",
},
config.HelpKV{
Key: target.AmqpPublisherConfirms,
Description: "enable consumer acknowlegement and publisher confirms, use this along with queue_dir for guaranteed delivery of all events",
Optional: true,
Type: "on|off",
},
config.HelpKV{
Key: target.AmqpQueueDir,
Description: queueDirComment,

View file

@ -1701,6 +1701,10 @@ var (
Key: target.AmqpDeliveryMode,
Value: "0",
},
config.KV{
Key: target.AmqpPublisherConfirms,
Value: config.EnableOff,
},
config.KV{
Key: target.AmqpQueueLimit,
Value: "0",
@ -1779,6 +1783,10 @@ func GetNotifyAMQP(amqpKVS map[string]config.KVS) (map[string]target.AMQPArgs, e
if k != config.Default {
autoDeletedEnv = autoDeletedEnv + config.Default + k
}
publisherConfirmsEnv := target.EnvAMQPPublisherConfirms
if k != config.Default {
publisherConfirmsEnv = publisherConfirmsEnv + config.Default + k
}
queueDirEnv := target.EnvAMQPQueueDir
if k != config.Default {
queueDirEnv = queueDirEnv + config.Default + k
@ -1792,20 +1800,21 @@ func GetNotifyAMQP(amqpKVS map[string]config.KVS) (map[string]target.AMQPArgs, e
return nil, err
}
amqpArgs := target.AMQPArgs{
Enable: enabled,
URL: *url,
Exchange: env.Get(exchangeEnv, kv.Get(target.AmqpExchange)),
RoutingKey: env.Get(routingKeyEnv, kv.Get(target.AmqpRoutingKey)),
ExchangeType: env.Get(exchangeTypeEnv, kv.Get(target.AmqpExchangeType)),
DeliveryMode: uint8(deliveryMode),
Mandatory: env.Get(mandatoryEnv, kv.Get(target.AmqpMandatory)) == config.EnableOn,
Immediate: env.Get(immediateEnv, kv.Get(target.AmqpImmediate)) == config.EnableOn,
Durable: env.Get(durableEnv, kv.Get(target.AmqpDurable)) == config.EnableOn,
Internal: env.Get(internalEnv, kv.Get(target.AmqpInternal)) == config.EnableOn,
NoWait: env.Get(noWaitEnv, kv.Get(target.AmqpNoWait)) == config.EnableOn,
AutoDeleted: env.Get(autoDeletedEnv, kv.Get(target.AmqpAutoDeleted)) == config.EnableOn,
QueueDir: env.Get(queueDirEnv, kv.Get(target.AmqpQueueDir)),
QueueLimit: queueLimit,
Enable: enabled,
URL: *url,
Exchange: env.Get(exchangeEnv, kv.Get(target.AmqpExchange)),
RoutingKey: env.Get(routingKeyEnv, kv.Get(target.AmqpRoutingKey)),
ExchangeType: env.Get(exchangeTypeEnv, kv.Get(target.AmqpExchangeType)),
DeliveryMode: uint8(deliveryMode),
Mandatory: env.Get(mandatoryEnv, kv.Get(target.AmqpMandatory)) == config.EnableOn,
Immediate: env.Get(immediateEnv, kv.Get(target.AmqpImmediate)) == config.EnableOn,
Durable: env.Get(durableEnv, kv.Get(target.AmqpDurable)) == config.EnableOn,
Internal: env.Get(internalEnv, kv.Get(target.AmqpInternal)) == config.EnableOn,
NoWait: env.Get(noWaitEnv, kv.Get(target.AmqpNoWait)) == config.EnableOn,
AutoDeleted: env.Get(autoDeletedEnv, kv.Get(target.AmqpAutoDeleted)) == config.EnableOn,
PublisherConfirms: env.Get(publisherConfirmsEnv, kv.Get(target.AmqpPublisherConfirms)) == config.EnableOn,
QueueDir: env.Get(queueDirEnv, kv.Get(target.AmqpQueueDir)),
QueueLimit: queueLimit,
}
if err = amqpArgs.Validate(); err != nil {
return nil, err

View file

@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"net/url"
"os"
@ -34,20 +35,21 @@ import (
// AMQPArgs - AMQP target arguments.
type AMQPArgs struct {
Enable bool `json:"enable"`
URL xnet.URL `json:"url"`
Exchange string `json:"exchange"`
RoutingKey string `json:"routingKey"`
ExchangeType string `json:"exchangeType"`
DeliveryMode uint8 `json:"deliveryMode"`
Mandatory bool `json:"mandatory"`
Immediate bool `json:"immediate"`
Durable bool `json:"durable"`
Internal bool `json:"internal"`
NoWait bool `json:"noWait"`
AutoDeleted bool `json:"autoDeleted"`
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
Enable bool `json:"enable"`
URL xnet.URL `json:"url"`
Exchange string `json:"exchange"`
RoutingKey string `json:"routingKey"`
ExchangeType string `json:"exchangeType"`
DeliveryMode uint8 `json:"deliveryMode"`
Mandatory bool `json:"mandatory"`
Immediate bool `json:"immediate"`
Durable bool `json:"durable"`
Internal bool `json:"internal"`
NoWait bool `json:"noWait"`
AutoDeleted bool `json:"autoDeleted"`
PublisherConfirms bool `json:"publisherConfirms"`
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
}
//lint:file-ignore ST1003 We cannot change these exported names.
@ -69,7 +71,7 @@ const (
AmqpNoWait = "no_wait"
AmqpAutoDeleted = "auto_deleted"
AmqpArguments = "arguments"
AmqpPublishingHeaders = "publishing_headers"
AmqpPublisherConfirms = "publisher_confirms"
EnvAMQPEnable = "MINIO_NOTIFY_AMQP_ENABLE"
EnvAMQPURL = "MINIO_NOTIFY_AMQP_URL"
@ -84,7 +86,7 @@ const (
EnvAMQPNoWait = "MINIO_NOTIFY_AMQP_NO_WAIT"
EnvAMQPAutoDeleted = "MINIO_NOTIFY_AMQP_AUTO_DELETED"
EnvAMQPArguments = "MINIO_NOTIFY_AMQP_ARGUMENTS"
EnvAMQPPublishingHeaders = "MINIO_NOTIFY_AMQP_PUBLISHING_HEADERS"
EnvAMQPPublisherConfirms = "MINIO_NOTIFY_AMQP_PUBLISHING_CONFIRMS"
EnvAMQPQueueDir = "MINIO_NOTIFY_AMQP_QUEUE_DIR"
EnvAMQPQueueLimit = "MINIO_NOTIFY_AMQP_QUEUE_LIMIT"
)
@ -123,7 +125,7 @@ func (target *AMQPTarget) ID() event.TargetID {
// IsActive - Return true if target is up and active
func (target *AMQPTarget) IsActive() (bool, error) {
ch, err := target.channel()
ch, _, err := target.channel()
if err != nil {
return false, err
}
@ -138,7 +140,7 @@ func (target *AMQPTarget) HasQueueStore() bool {
return target.store != nil
}
func (target *AMQPTarget) channel() (*amqp.Channel, error) {
func (target *AMQPTarget) channel() (*amqp.Channel, chan amqp.Confirmation, error) {
var err error
var conn *amqp.Connection
var ch *amqp.Channel
@ -161,34 +163,54 @@ func (target *AMQPTarget) channel() (*amqp.Channel, error) {
if target.conn != nil {
ch, err = target.conn.Channel()
if err == nil {
return ch, nil
if target.args.PublisherConfirms {
confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1))
if err := ch.Confirm(false); err != nil {
ch.Close()
return nil, nil, err
}
return ch, confirms, nil
}
return ch, nil, nil
}
if !isAMQPClosedErr(err) {
return nil, err
return nil, nil, err
}
// close when we know this is a network error.
target.conn.Close()
}
conn, err = amqp.Dial(target.args.URL.String())
if err != nil {
if IsConnRefusedErr(err) {
return nil, errNotConnected
return nil, nil, errNotConnected
}
return nil, err
return nil, nil, err
}
ch, err = conn.Channel()
if err != nil {
return nil, err
return nil, nil, err
}
target.conn = conn
return ch, nil
if target.args.PublisherConfirms {
confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1))
if err := ch.Confirm(false); err != nil {
ch.Close()
return nil, nil, err
}
return ch, confirms, nil
}
return ch, nil, nil
}
// send - sends an event to the AMQP.
func (target *AMQPTarget) send(eventData event.Event, ch *amqp.Channel) error {
func (target *AMQPTarget) send(eventData event.Event, ch *amqp.Channel, confirms chan amqp.Confirmation) error {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
@ -205,12 +227,24 @@ func (target *AMQPTarget) send(eventData event.Event, ch *amqp.Channel) error {
return err
}
return ch.Publish(target.args.Exchange, target.args.RoutingKey, target.args.Mandatory,
if err = ch.Publish(target.args.Exchange, target.args.RoutingKey, target.args.Mandatory,
target.args.Immediate, amqp.Publishing{
ContentType: "application/json",
DeliveryMode: target.args.DeliveryMode,
Body: data,
})
}); err != nil {
return err
}
// check for publisher confirms only if its enabled
if target.args.PublisherConfirms {
confirmed := <-confirms
if !confirmed.Ack {
return fmt.Errorf("failed delivery of delivery tag: %d", confirmed.DeliveryTag)
}
}
return nil
}
// Save - saves the events to the store which will be replayed when the amqp connection is active.
@ -218,7 +252,7 @@ func (target *AMQPTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
ch, err := target.channel()
ch, confirms, err := target.channel()
if err != nil {
return err
}
@ -227,12 +261,12 @@ func (target *AMQPTarget) Save(eventData event.Event) error {
target.loggerOnce(context.Background(), cErr, target.ID())
}()
return target.send(eventData, ch)
return target.send(eventData, ch, confirms)
}
// Send - sends event to AMQP.
func (target *AMQPTarget) Send(eventKey string) error {
ch, err := target.channel()
ch, confirms, err := target.channel()
if err != nil {
return err
}
@ -251,7 +285,7 @@ func (target *AMQPTarget) Send(eventKey string) error {
return eErr
}
if err := target.send(eventData, ch); err != nil {
if err := target.send(eventData, ch, confirms); err != nil {
return err
}

View file

@ -27,7 +27,6 @@ import (
"sync"
"github.com/minio/minio/internal/event"
"github.com/minio/pkg/sys"
)
const (
@ -47,14 +46,6 @@ type QueueStore struct {
func NewQueueStore(directory string, limit uint64) Store {
if limit == 0 {
limit = defaultLimit
_, maxRLimit, err := sys.GetMaxOpenFileLimit()
if err == nil {
// Limit the maximum number of entries
// to maximum open file limit
if maxRLimit < limit {
limit = maxRLimit
}
}
}
return &QueueStore{