[feat] Add configurable deadline for writers (#11822)

This PR adds deadlines per Write() calls, such
that slow drives are timed-out appropriately and
the overall responsiveness for Writes() is always
up to a predefined threshold providing applications
sustained latency even if one of the drives is slow
to respond.
This commit is contained in:
Harshavardhana 2021-03-18 14:09:55 -07:00 committed by GitHub
parent d46c3c07a8
commit 51a8619a79
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 191 additions and 90 deletions

View file

@ -23,8 +23,11 @@ import (
"fmt"
"hash"
"io"
"time"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/env"
xioutil "github.com/minio/minio/pkg/ioutil"
)
type errHashMismatch struct {
@ -37,7 +40,7 @@ func (err *errHashMismatch) Error() string {
// Calculates bitrot in chunks and writes the hash into the stream.
type streamingBitrotWriter struct {
iow *io.PipeWriter
iow io.WriteCloser
h hash.Hash
shardSize int64
canClose chan struct{} // Needed to avoid race explained in Close() call.
@ -70,19 +73,28 @@ func (b *streamingBitrotWriter) Close() error {
return err
}
var (
ioDeadline, _ = time.ParseDuration(env.Get("MINIO_IO_DEADLINE", ""))
)
// Returns streaming bitrot writer implementation.
func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64) io.WriteCloser {
func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64, heal bool) io.Writer {
r, w := io.Pipe()
h := algo.New()
bw := &streamingBitrotWriter{w, h, shardSize, make(chan struct{})}
var wc io.WriteCloser = w
if ioDeadline > 0 && !heal {
wc = xioutil.NewDeadlineWriter(w, ioDeadline)
}
bw := &streamingBitrotWriter{wc, h, shardSize, make(chan struct{})}
go func() {
totalFileSize := int64(-1) // For compressed objects length will be unknown (represented by length=-1)
if length != -1 {
bitrotSumsTotalSize := ceilFrac(length, shardSize) * int64(h.Size()) // Size used for storing bitrot checksums.
totalFileSize = bitrotSumsTotalSize + length
}
err := disk.CreateFile(context.TODO(), volume, filePath, totalFileSize, r)
r.CloseWithError(err)
r.CloseWithError(disk.CreateFile(context.TODO(), volume, filePath, totalFileSize, r))
close(bw.canClose)
}()
return bw

View file

@ -96,9 +96,9 @@ func BitrotAlgorithmFromString(s string) (a BitrotAlgorithm) {
return
}
func newBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64) io.Writer {
func newBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64, heal bool) io.Writer {
if algo == HighwayHash256S {
return newStreamingBitrotWriter(disk, volume, filePath, length, algo, shardSize)
return newStreamingBitrotWriter(disk, volume, filePath, length, algo, shardSize, heal)
}
return newWholeBitrotWriter(disk, volume, filePath, algo, shardSize)
}

View file

@ -41,7 +41,7 @@ func testBitrotReaderWriterAlgo(t *testing.T, bitrotAlgo BitrotAlgorithm) {
disk.MakeVol(context.Background(), volume)
writer := newBitrotWriter(disk, volume, filePath, 35, bitrotAlgo, 10)
writer := newBitrotWriter(disk, volume, filePath, 35, bitrotAlgo, 10, false)
_, err = writer.Write([]byte("aaaaaaaaaa"))
if err != nil {

View file

@ -108,7 +108,8 @@ func TestErasureDecode(t *testing.T) {
buffer := make([]byte, test.blocksize, 2*test.blocksize)
writers := make([]io.Writer, len(disks))
for i, disk := range disks {
writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(test.data), writeAlgorithm, erasure.ShardSize())
writers[i] = newBitrotWriter(disk, "testbucket", "object",
erasure.ShardFileSize(test.data), writeAlgorithm, erasure.ShardSize(), false)
}
n, err := erasure.Encode(context.Background(), bytes.NewReader(data[:]), writers, buffer, erasure.dataBlocks+1)
closeBitrotWriters(writers)
@ -234,7 +235,8 @@ func TestErasureDecodeRandomOffsetLength(t *testing.T) {
if disk == nil {
continue
}
writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(length), DefaultBitrotAlgorithm, erasure.ShardSize())
writers[i] = newBitrotWriter(disk, "testbucket", "object",
erasure.ShardFileSize(length), DefaultBitrotAlgorithm, erasure.ShardSize(), false)
}
// 10000 iterations with random offsets and lengths.
@ -304,7 +306,8 @@ func benchmarkErasureDecode(data, parity, dataDown, parityDown int, size int64,
if disk == nil {
continue
}
writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(size), DefaultBitrotAlgorithm, erasure.ShardSize())
writers[i] = newBitrotWriter(disk, "testbucket", "object",
erasure.ShardFileSize(size), DefaultBitrotAlgorithm, erasure.ShardSize(), false)
}
content := make([]byte, size)

View file

@ -108,7 +108,7 @@ func TestErasureEncode(t *testing.T) {
if disk == OfflineDisk {
continue
}
writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(int64(len(data[test.offset:]))), test.algorithm, erasure.ShardSize())
writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(int64(len(data[test.offset:]))), test.algorithm, erasure.ShardSize(), false)
}
n, err := erasure.Encode(context.Background(), bytes.NewReader(data[test.offset:]), writers, buffer, erasure.dataBlocks+1)
closeBitrotWriters(writers)
@ -132,14 +132,14 @@ func TestErasureEncode(t *testing.T) {
if disk == nil {
continue
}
writers[i] = newBitrotWriter(disk, "testbucket", "object2", erasure.ShardFileSize(int64(len(data[test.offset:]))), test.algorithm, erasure.ShardSize())
writers[i] = newBitrotWriter(disk, "testbucket", "object2", erasure.ShardFileSize(int64(len(data[test.offset:]))), test.algorithm, erasure.ShardSize(), false)
}
for j := range disks[:test.offDisks] {
switch w := writers[j].(type) {
case *wholeBitrotWriter:
w.disk = badDisk{nil}
case *streamingBitrotWriter:
w.iow.CloseWithError(errFaultyDisk)
w.iow.(*io.PipeWriter).CloseWithError(errFaultyDisk)
}
}
if test.offDisks > 0 {
@ -196,7 +196,8 @@ func benchmarkErasureEncode(data, parity, dataDown, parityDown int, size int64,
continue
}
disk.Delete(context.Background(), "testbucket", "object", false)
writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(size), DefaultBitrotAlgorithm, erasure.ShardSize())
writers[i] = newBitrotWriter(disk, "testbucket", "object",
erasure.ShardFileSize(size), DefaultBitrotAlgorithm, erasure.ShardSize(), false)
}
_, err := erasure.Encode(context.Background(), bytes.NewReader(content), writers, buffer, erasure.dataBlocks+1)
closeBitrotWriters(writers)

View file

@ -87,7 +87,8 @@ func TestErasureHeal(t *testing.T) {
buffer := make([]byte, test.blocksize, 2*test.blocksize)
writers := make([]io.Writer, len(disks))
for i, disk := range disks {
writers[i] = newBitrotWriter(disk, "testbucket", "testobject", erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize())
writers[i] = newBitrotWriter(disk, "testbucket", "testobject",
erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize(), true)
}
_, err = erasure.Encode(context.Background(), bytes.NewReader(data), writers, buffer, erasure.dataBlocks+1)
closeBitrotWriters(writers)
@ -130,7 +131,8 @@ func TestErasureHeal(t *testing.T) {
continue
}
os.Remove(pathJoin(disk.String(), "testbucket", "testobject"))
staleWriters[i] = newBitrotWriter(disk, "testbucket", "testobject", erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize())
staleWriters[i] = newBitrotWriter(disk, "testbucket", "testobject",
erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize(), true)
}
// test case setup is complete - now call Heal()

View file

@ -422,7 +422,8 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
continue
}
partPath := pathJoin(tmpID, dataDir, fmt.Sprintf("part.%d", partNumber))
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, partPath, tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize())
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, partPath,
tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize(), true)
}
err = erasure.Heal(ctx, readers, writers, partSize)
closeBitrotReaders(readers)

View file

@ -476,7 +476,8 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
if disk == nil {
continue
}
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize())
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath,
erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize(), false)
}
n, err := erasure.Encode(ctx, data, writers, buffer, writeQuorum)

View file

@ -718,7 +718,8 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
if disk == nil {
continue
}
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize())
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj,
erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize(), false)
}
n, erasureErr := erasure.Encode(ctx, data, writers, buffer, writeQuorum)

View file

@ -577,15 +577,6 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
// Hold the lock for migration only.
txnLk := objAPI.NewNSLock(minioMetaBucket, minioConfigPrefix+"/iam.lock")
// Initializing IAM sub-system needs a retry mechanism for
// the following reasons:
// - Read quorum is lost just after the initialization
// of the object layer.
// - Write quorum not met when upgrading configuration
// version is needed, migration is needed etc.
rquorum := InsufficientReadQuorum{}
wquorum := InsufficientWriteQuorum{}
// allocate dynamic timeout once before the loop
iamLockTimeout := newDynamicTimeout(5*time.Second, 3*time.Second)
@ -620,12 +611,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
// Migrate IAM configuration, if necessary.
if err := sys.doIAMConfigMigration(ctx); err != nil {
txnLk.Unlock()
if errors.Is(err, errDiskNotFound) ||
errors.Is(err, errConfigNotFound) ||
errors.Is(err, context.DeadlineExceeded) ||
errors.As(err, &rquorum) ||
errors.As(err, &wquorum) ||
isErrBucketNotFound(err) {
if configRetriableErrors(err) {
logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. possible cause (%v)", err)
continue
}
@ -641,12 +627,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
for {
if err := sys.store.loadAll(ctx, sys); err != nil {
if errors.Is(err, errDiskNotFound) ||
errors.Is(err, errConfigNotFound) ||
errors.Is(err, context.DeadlineExceeded) ||
errors.As(err, &rquorum) ||
errors.As(err, &wquorum) ||
isErrBucketNotFound(err) {
if configRetriableErrors(err) {
logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. possible cause (%v)", err)
time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))
continue

View file

@ -237,6 +237,28 @@ func newAllSubsystems() {
globalBucketTargetSys = NewBucketTargetSys()
}
func configRetriableErrors(err error) bool {
// Initializing sub-systems needs a retry mechanism for
// the following reasons:
// - Read quorum is lost just after the initialization
// of the object layer.
// - Write quorum not met when upgrading configuration
// version is needed, migration is needed etc.
rquorum := InsufficientReadQuorum{}
wquorum := InsufficientWriteQuorum{}
// One of these retriable errors shall be retried.
return errors.Is(err, errDiskNotFound) ||
errors.Is(err, errConfigNotFound) ||
errors.Is(err, context.DeadlineExceeded) ||
errors.Is(err, errErasureWriteQuorum) ||
errors.Is(err, errErasureReadQuorum) ||
errors.As(err, &rquorum) ||
errors.As(err, &wquorum) ||
isErrBucketNotFound(err) ||
errors.Is(err, os.ErrDeadlineExceeded)
}
func initServer(ctx context.Context, newObject ObjectLayer) error {
// Once the config is fully loaded, initialize the new object layer.
setObjectLayer(newObject)
@ -252,15 +274,6 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
// Migrating to encrypted backend should happen before initialization of any
// sub-systems, make sure that we do not move the above codeblock elsewhere.
// Initializing sub-systems needs a retry mechanism for
// the following reasons:
// - Read quorum is lost just after the initialization
// of the object layer.
// - Write quorum not met when upgrading configuration
// version is needed, migration is needed etc.
rquorum := InsufficientReadQuorum{}
wquorum := InsufficientWriteQuorum{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
lockTimeout := newDynamicTimeout(5*time.Second, 3*time.Second)
@ -307,15 +320,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
txnLk.Unlock() // Unlock the transaction lock and allow other nodes to acquire the lock if possible.
// One of these retriable errors shall be retried.
if errors.Is(err, errDiskNotFound) ||
errors.Is(err, errConfigNotFound) ||
errors.Is(err, context.DeadlineExceeded) ||
errors.Is(err, errErasureWriteQuorum) ||
errors.Is(err, errErasureReadQuorum) ||
errors.As(err, &rquorum) ||
errors.As(err, &wquorum) ||
isErrBucketNotFound(err) {
if configRetriableErrors(err) {
logger.Info("Waiting for all MinIO sub-systems to be initialized.. possible cause (%v)", err)
time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))
continue
@ -333,8 +338,6 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
// you want to add extra context to your error. This
// ensures top level retry works accordingly.
// List buckets to heal, and be re-used for loading configs.
rquorum := InsufficientReadQuorum{}
wquorum := InsufficientWriteQuorum{}
buckets, err := newObject.ListBuckets(ctx)
if err != nil {
@ -368,14 +371,7 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
// Initialize config system.
if err = globalConfigSys.Init(newObject); err != nil {
if errors.Is(err, errDiskNotFound) ||
errors.Is(err, errConfigNotFound) ||
errors.Is(err, context.DeadlineExceeded) ||
errors.Is(err, errErasureWriteQuorum) ||
errors.Is(err, errErasureReadQuorum) ||
errors.As(err, &rquorum) ||
errors.As(err, &wquorum) ||
isErrBucketNotFound(err) {
if configRetriableErrors(err) {
return fmt.Errorf("Unable to initialize config system: %w", err)
}
// Any other config errors we simply print a message and proceed forward.

View file

@ -62,7 +62,7 @@ const (
// Detects change in underlying disk.
type xlStorageDiskIDCheck struct {
storage *xlStorage
storage StorageAPI
diskID string
apiCalls [metricLast]uint64

View file

@ -71,6 +71,13 @@ const (
xlStorageFormatFile = "xl.meta"
)
var alignedBuf []byte
func init() {
alignedBuf = disk.AlignedBlock(4096)
_, _ = rand.Read(alignedBuf)
}
// isValidVolname verifies a volname name in accordance with object
// layer requirements.
func isValidVolname(volname string) bool {
@ -282,10 +289,17 @@ func newXLStorage(ep Endpoint) (*xlStorage, error) {
var rnd [8]byte
_, _ = rand.Read(rnd[:])
tmpFile := ".writable-check-" + hex.EncodeToString(rnd[:]) + ".tmp"
if err = p.CreateFile(GlobalContext, minioMetaTmpBucket, tmpFile, 1, strings.NewReader("0")); err != nil {
filePath := pathJoin(p.diskPath, minioMetaTmpBucket, tmpFile)
w, err := disk.OpenFileDirectIO(filePath, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0666)
if err != nil {
return p, err
}
defer os.Remove(pathJoin(p.diskPath, minioMetaTmpBucket, tmpFile))
if _, err = w.Write(alignedBuf[:]); err != nil {
w.Close()
return p, err
}
w.Close()
defer os.Remove(filePath)
volumeDir, err := p.getVolDir(minioMetaTmpBucket)
if err != nil {
@ -294,7 +308,7 @@ func newXLStorage(ep Endpoint) (*xlStorage, error) {
// Check if backend is readable, and optionally supports O_DIRECT.
if _, err = p.readAllData(volumeDir, pathJoin(volumeDir, tmpFile), true); err != nil {
if err != errUnsupportedDisk {
if !errors.Is(err, errUnsupportedDisk) {
return p, err
}
// error is unsupported disk, turn-off directIO for reads
@ -1432,7 +1446,7 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz
return errInvalidArgument
}
if fileSize <= smallFileThreshold {
if fileSize >= 0 && fileSize <= smallFileThreshold {
// For streams smaller than 128KiB we simply write them as O_DSYNC (fdatasync)
// and not O_DIRECT to avoid the complexities of aligned I/O.
w, err := s.openFile(volume, path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC)
@ -1483,9 +1497,9 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz
return err
}
if written < fileSize {
if written < fileSize && fileSize >= 0 {
return errLessData
} else if written > fileSize {
} else if written > fileSize && fileSize >= 0 {
return errMoreData
}

View file

@ -1730,7 +1730,7 @@ func TestXLStorageVerifyFile(t *testing.T) {
// 4) Streaming bitrot check on corrupted file
// create xlStorage test setup
xlStorage, path, err := newXLStorageTestSetup()
storage, path, err := newXLStorageTestSetup()
if err != nil {
t.Fatalf("Unable to create xlStorage test setup, %s", err)
}
@ -1738,7 +1738,7 @@ func TestXLStorageVerifyFile(t *testing.T) {
volName := "testvol"
fileName := "testfile"
if err := xlStorage.MakeVol(context.Background(), volName); err != nil {
if err := storage.MakeVol(context.Background(), volName); err != nil {
t.Fatal(err)
}
@ -1752,29 +1752,29 @@ func TestXLStorageVerifyFile(t *testing.T) {
h := algo.New()
h.Write(data)
hashBytes := h.Sum(nil)
if err := xlStorage.WriteAll(context.Background(), volName, fileName, data); err != nil {
if err := storage.WriteAll(context.Background(), volName, fileName, data); err != nil {
t.Fatal(err)
}
if err := xlStorage.storage.bitrotVerify(pathJoin(path, volName, fileName), size, algo, hashBytes, 0); err != nil {
if err := storage.storage.(*xlStorage).bitrotVerify(pathJoin(path, volName, fileName), size, algo, hashBytes, 0); err != nil {
t.Fatal(err)
}
// 2) Whole-file bitrot check on corrupted file
if err := xlStorage.AppendFile(context.Background(), volName, fileName, []byte("a")); err != nil {
if err := storage.AppendFile(context.Background(), volName, fileName, []byte("a")); err != nil {
t.Fatal(err)
}
// Check if VerifyFile reports the incorrect file length (the correct length is `size+1`)
if err := xlStorage.storage.bitrotVerify(pathJoin(path, volName, fileName), size, algo, hashBytes, 0); err == nil {
if err := storage.storage.(*xlStorage).bitrotVerify(pathJoin(path, volName, fileName), size, algo, hashBytes, 0); err == nil {
t.Fatal("expected to fail bitrot check")
}
// Check if bitrot fails
if err := xlStorage.storage.bitrotVerify(pathJoin(path, volName, fileName), size+1, algo, hashBytes, 0); err == nil {
if err := storage.storage.(*xlStorage).bitrotVerify(pathJoin(path, volName, fileName), size+1, algo, hashBytes, 0); err == nil {
t.Fatal("expected to fail bitrot check")
}
if err := xlStorage.Delete(context.Background(), volName, fileName, false); err != nil {
if err := storage.Delete(context.Background(), volName, fileName, false); err != nil {
t.Fatal(err)
}
@ -1782,7 +1782,7 @@ func TestXLStorageVerifyFile(t *testing.T) {
algo = HighwayHash256S
shardSize := int64(1024 * 1024)
shard := make([]byte, shardSize)
w := newStreamingBitrotWriter(xlStorage, volName, fileName, size, algo, shardSize)
w := newStreamingBitrotWriter(storage, volName, fileName, size, algo, shardSize, false)
reader := bytes.NewReader(data)
for {
// Using io.Copy instead of this loop will not work for us as io.Copy
@ -1798,13 +1798,13 @@ func TestXLStorageVerifyFile(t *testing.T) {
}
t.Fatal(err)
}
w.Close()
if err := xlStorage.storage.bitrotVerify(pathJoin(path, volName, fileName), size, algo, nil, shardSize); err != nil {
w.(io.Closer).Close()
if err := storage.storage.(*xlStorage).bitrotVerify(pathJoin(path, volName, fileName), size, algo, nil, shardSize); err != nil {
t.Fatal(err)
}
// 4) Streaming bitrot check on corrupted file
filePath := pathJoin(xlStorage.String(), volName, fileName)
filePath := pathJoin(storage.String(), volName, fileName)
f, err := os.OpenFile(filePath, os.O_WRONLY|os.O_SYNC, 0644)
if err != nil {
t.Fatal(err)
@ -1814,10 +1814,10 @@ func TestXLStorageVerifyFile(t *testing.T) {
t.Fatal(err)
}
f.Close()
if err := xlStorage.storage.bitrotVerify(pathJoin(path, volName, fileName), size, algo, nil, shardSize); err == nil {
if err := storage.storage.(*xlStorage).bitrotVerify(pathJoin(path, volName, fileName), size, algo, nil, shardSize); err == nil {
t.Fatal("expected to fail bitrot check")
}
if err := xlStorage.storage.bitrotVerify(pathJoin(path, volName, fileName), size+1, algo, nil, shardSize); err == nil {
if err := storage.storage.(*xlStorage).bitrotVerify(pathJoin(path, volName, fileName), size+1, algo, nil, shardSize); err == nil {
t.Fatal("expected to fail bitrot check")
}
}

View file

@ -20,8 +20,10 @@ package ioutil
import (
"bytes"
"context"
"io"
"os"
"time"
"github.com/minio/minio/pkg/disk"
)
@ -64,6 +66,56 @@ func WriteOnClose(w io.Writer) *WriteOnCloser {
return &WriteOnCloser{w, false}
}
type ioret struct {
n int
err error
}
// DeadlineWriter deadline writer with context
type DeadlineWriter struct {
io.WriteCloser
timeout time.Duration
err error
}
// NewDeadlineWriter wraps a writer to make it respect given deadline
// value per Write(). If there is a blocking write, the returned Writer
// will return whenever the timer hits (the return values are n=0
// and err=context.Canceled.)
func NewDeadlineWriter(w io.WriteCloser, timeout time.Duration) io.WriteCloser {
return &DeadlineWriter{WriteCloser: w, timeout: timeout}
}
func (w *DeadlineWriter) Write(buf []byte) (int, error) {
if w.err != nil {
return 0, w.err
}
c := make(chan ioret, 1)
t := time.NewTimer(w.timeout)
defer t.Stop()
go func() {
n, err := w.WriteCloser.Write(buf)
c <- ioret{n, err}
close(c)
}()
select {
case r := <-c:
w.err = r.err
return r.n, r.err
case <-t.C:
w.err = context.Canceled
return 0, context.Canceled
}
}
// Close closer interface to close the underlying closer
func (w *DeadlineWriter) Close() error {
return w.WriteCloser.Close()
}
// LimitWriter implements io.WriteCloser.
//
// This is implemented such that we want to restrict

View file

@ -18,12 +18,49 @@ package ioutil
import (
"bytes"
"context"
"io"
goioutil "io/ioutil"
"os"
"testing"
"time"
)
type sleepWriter struct {
timeout time.Duration
}
func (w *sleepWriter) Write(p []byte) (n int, err error) {
time.Sleep(w.timeout)
return len(p), nil
}
func (w *sleepWriter) Close() error {
return nil
}
func TestDeadlineWriter(t *testing.T) {
w := NewDeadlineWriter(&sleepWriter{timeout: 500 * time.Millisecond}, 450*time.Millisecond)
_, err := w.Write([]byte("1"))
w.Close()
if err != context.Canceled {
t.Error("DeadlineWriter shouldn't be successful - should return context.Canceled")
}
_, err = w.Write([]byte("1"))
if err != context.Canceled {
t.Error("DeadlineWriter shouldn't be successful - should return context.Canceled")
}
w = NewDeadlineWriter(&sleepWriter{timeout: 500 * time.Millisecond}, 600*time.Millisecond)
n, err := w.Write([]byte("abcd"))
w.Close()
if err != nil {
t.Errorf("DeadlineWriter should succeed but failed with %s", err)
}
if n != 4 {
t.Errorf("DeadlineWriter should succeed but should have only written 0 bytes, but returned %d instead", n)
}
}
func TestCloseOnWriter(t *testing.T) {
writer := WriteOnClose(goioutil.Discard)
if writer.HasWritten() {