minio/vendor/gopkg.in/Shopify/sarama.v1/produce_request.go

157 lines
3.6 KiB
Go

package sarama
// RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements
// it must see before responding. Any of the constants defined here are valid. On broker versions
// prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many
// acknowledgements) but in 0.8.2.0 and later this will raise an exception (it has been replaced
// by setting the `min.isr` value in the brokers configuration).
type RequiredAcks int16
const (
// NoResponse doesn't send any response, the TCP ACK is all you get.
NoResponse RequiredAcks = 0
// WaitForLocal waits for only the local commit to succeed before responding.
WaitForLocal RequiredAcks = 1
// WaitForAll waits for all replicas to commit before responding.
WaitForAll RequiredAcks = -1
)
type ProduceRequest struct {
RequiredAcks RequiredAcks
Timeout int32
Version int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10
msgSets map[string]map[int32]*MessageSet
}
func (r *ProduceRequest) encode(pe packetEncoder) error {
pe.putInt16(int16(r.RequiredAcks))
pe.putInt32(r.Timeout)
err := pe.putArrayLength(len(r.msgSets))
if err != nil {
return err
}
for topic, partitions := range r.msgSets {
err = pe.putString(topic)
if err != nil {
return err
}
err = pe.putArrayLength(len(partitions))
if err != nil {
return err
}
for id, msgSet := range partitions {
pe.putInt32(id)
pe.push(&lengthField{})
err = msgSet.encode(pe)
if err != nil {
return err
}
err = pe.pop()
if err != nil {
return err
}
}
}
return nil
}
func (r *ProduceRequest) decode(pd packetDecoder, version int16) error {
requiredAcks, err := pd.getInt16()
if err != nil {
return err
}
r.RequiredAcks = RequiredAcks(requiredAcks)
if r.Timeout, err = pd.getInt32(); err != nil {
return err
}
topicCount, err := pd.getArrayLength()
if err != nil {
return err
}
if topicCount == 0 {
return nil
}
r.msgSets = make(map[string]map[int32]*MessageSet)
for i := 0; i < topicCount; i++ {
topic, err := pd.getString()
if err != nil {
return err
}
partitionCount, err := pd.getArrayLength()
if err != nil {
return err
}
r.msgSets[topic] = make(map[int32]*MessageSet)
for j := 0; j < partitionCount; j++ {
partition, err := pd.getInt32()
if err != nil {
return err
}
messageSetSize, err := pd.getInt32()
if err != nil {
return err
}
msgSetDecoder, err := pd.getSubset(int(messageSetSize))
if err != nil {
return err
}
msgSet := &MessageSet{}
err = msgSet.decode(msgSetDecoder)
if err != nil {
return err
}
r.msgSets[topic][partition] = msgSet
}
}
return nil
}
func (r *ProduceRequest) key() int16 {
return 0
}
func (r *ProduceRequest) version() int16 {
return r.Version
}
func (r *ProduceRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 1:
return V0_9_0_0
case 2:
return V0_10_0_0
default:
return minVersion
}
}
func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
if r.msgSets == nil {
r.msgSets = make(map[string]map[int32]*MessageSet)
}
if r.msgSets[topic] == nil {
r.msgSets[topic] = make(map[int32]*MessageSet)
}
set := r.msgSets[topic][partition]
if set == nil {
set = new(MessageSet)
r.msgSets[topic][partition] = set
}
set.addMessage(msg)
}
func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) {
if r.msgSets == nil {
r.msgSets = make(map[string]map[int32]*MessageSet)
}
if r.msgSets[topic] == nil {
r.msgSets[topic] = make(map[int32]*MessageSet)
}
r.msgSets[topic][partition] = set
}