Rewrite cache implementation to cache only on GET (#7694)

Fixes #7458
Fixes #7573 
Fixes #7938 
Fixes #6934
Fixes #6265 
Fixes #6630 

This will allow the cache to consistently work for
server and gateways. Range GET requests will
be cached in the background after the request
is served from the backend.

- All cached content is automatically bitrot protected.

- Avoid ETag verification if a cache-control header
is set and the cached content is still valid.

- This PR changes the cache backend format, and all existing
content will be migrated to the new format. Until the data is
migrated completely, all content will be served from the backend.
This commit is contained in:
poornas 2019-08-09 17:09:08 -07:00 committed by Harshavardhana
parent 1ce8d2c476
commit 3385bf3da8
15 changed files with 1423 additions and 1686 deletions

View file

@ -93,9 +93,7 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http
}
listObjectsV2 := objectAPI.ListObjectsV2
if api.CacheAPI() != nil {
listObjectsV2 = api.CacheAPI().ListObjectsV2
}
// Inititate a list objects operation based on the input params.
// On success would return back ListObjectsInfo object to be
// marshaled into S3 compatible XML header.
@ -172,9 +170,6 @@ func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http
}
listObjects := objectAPI.ListObjects
if api.CacheAPI() != nil {
listObjects = api.CacheAPI().ListObjects
}
// Inititate a list objects operation based on the input params.
// On success would return back ListObjectsInfo object to be

View file

@ -106,9 +106,7 @@ func (api objectAPIHandlers) GetBucketLocationHandler(w http.ResponseWriter, r *
}
getBucketInfo := objectAPI.GetBucketInfo
if api.CacheAPI() != nil {
getBucketInfo = api.CacheAPI().GetBucketInfo
}
if _, err := getBucketInfo(ctx, bucket); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
@ -203,9 +201,6 @@ func (api objectAPIHandlers) ListBucketsHandler(w http.ResponseWriter, r *http.R
}
listBuckets := objectAPI.ListBuckets
if api.CacheAPI() != nil {
listBuckets = api.CacheAPI().ListBuckets
}
if s3Error := checkRequestAuthType(ctx, r, policy.ListAllMyBucketsAction, "", ""); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
@ -747,9 +742,7 @@ func (api objectAPIHandlers) HeadBucketHandler(w http.ResponseWriter, r *http.Re
}
getBucketInfo := objectAPI.GetBucketInfo
if api.CacheAPI() != nil {
getBucketInfo = api.CacheAPI().GetBucketInfo
}
if _, err := getBucketInfo(ctx, bucket); err != nil {
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
return
@ -779,9 +772,7 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.
}
deleteBucket := objectAPI.DeleteBucket
if api.CacheAPI() != nil {
deleteBucket = api.CacheAPI().DeleteBucket
}
// Attempt to delete bucket.
if err := deleteBucket(ctx, bucket); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))

573
cmd/disk-cache-backend.go Normal file
View file

@ -0,0 +1,573 @@
/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"path"
"reflect"
"sync"
"time"
"github.com/djherbis/atime"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/disk"
"github.com/ncw/directio"
)
const (
// cache.json object metadata for cached objects.
cacheMetaJSONFile = "cache.json"
cacheDataFile = "part.1"
cacheMetaVersion = "1.0.0"
cacheEnvDelimiter = ";"
)
// CacheChecksumInfoV1 - carries checksums of individual blocks on disk.
type CacheChecksumInfoV1 struct {
Algorithm string `json:"algorithm"`
Blocksize int64 `json:"blocksize"`
}
// Represents the cache metadata struct
type cacheMeta struct {
Version string `json:"version"`
Stat statInfo `json:"stat"` // Stat of the current object `cache.json`.
// checksums of blocks on disk.
Checksum CacheChecksumInfoV1 `json:"checksum,omitempty"`
// Metadata map for current object.
Meta map[string]string `json:"meta,omitempty"`
}
func (m *cacheMeta) ToObjectInfo(bucket, object string) (o ObjectInfo) {
if len(m.Meta) == 0 {
m.Meta = make(map[string]string)
m.Stat.ModTime = timeSentinel
}
o = ObjectInfo{
Bucket: bucket,
Name: object,
}
// We set file info only if its valid.
o.ModTime = m.Stat.ModTime
o.Size = m.Stat.Size
o.ETag = extractETag(m.Meta)
o.ContentType = m.Meta["content-type"]
o.ContentEncoding = m.Meta["content-encoding"]
if storageClass, ok := m.Meta[amzStorageClass]; ok {
o.StorageClass = storageClass
} else {
o.StorageClass = globalMinioDefaultStorageClass
}
var (
t time.Time
e error
)
if exp, ok := m.Meta["expires"]; ok {
if t, e = time.Parse(http.TimeFormat, exp); e == nil {
o.Expires = t.UTC()
}
}
// etag/md5Sum has already been extracted. We need to
// remove to avoid it from appearing as part of user-defined metadata
o.UserDefined = cleanMetadata(m.Meta)
return o
}
// represents disk cache struct
type diskCache struct {
dir string // caching directory
maxDiskUsagePct int // max usage in %
expiry int // cache expiry in days
// mark false if drive is offline
online bool
// mutex to protect updates to online variable
onlineMutex *sync.RWMutex
// purge() listens on this channel to start the cache-purge process
purgeChan chan struct{}
pool sync.Pool
}
// Inits the disk cache dir if it is not initialized already.
func newdiskCache(dir string, expiry int, maxDiskUsagePct int) (*diskCache, error) {
if err := os.MkdirAll(dir, 0777); err != nil {
return nil, fmt.Errorf("Unable to initialize '%s' dir, %s", dir, err)
}
if expiry == 0 {
expiry = globalCacheExpiry
}
cache := diskCache{
dir: dir,
expiry: expiry,
maxDiskUsagePct: maxDiskUsagePct,
purgeChan: make(chan struct{}),
online: true,
onlineMutex: &sync.RWMutex{},
pool: sync.Pool{
New: func() interface{} {
b := directio.AlignedBlock(int(cacheBlkSize))
return &b
},
},
}
return &cache, nil
}
// Returns if the disk usage is low.
// Disk usage is low if usage is < 80% of cacheMaxDiskUsagePct
// Ex. for a 100GB disk, if maxUsage is configured as 70% then cacheMaxDiskUsagePct is 70G
// hence disk usage is low if the disk usage is less than 56G (because 80% of 70G is 56G)
func (c *diskCache) diskUsageLow() bool {
minUsage := c.maxDiskUsagePct * 80 / 100
di, err := disk.GetInfo(c.dir)
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", c.dir)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
return false
}
usedPercent := (di.Total - di.Free) * 100 / di.Total
return int(usedPercent) < minUsage
}
// Return if the disk usage is high.
// Disk usage is high if disk used is > cacheMaxDiskUsagePct
func (c *diskCache) diskUsageHigh() bool {
di, err := disk.GetInfo(c.dir)
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", c.dir)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
return true
}
usedPercent := (di.Total - di.Free) * 100 / di.Total
return int(usedPercent) > c.maxDiskUsagePct
}
// Returns if size space can be allocated without exceeding
// max disk usable for caching
func (c *diskCache) diskAvailable(size int64) bool {
di, err := disk.GetInfo(c.dir)
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", c.dir)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
return false
}
usedPercent := (di.Total - (di.Free - uint64(size))) * 100 / di.Total
return int(usedPercent) < c.maxDiskUsagePct
}
// Purge cache entries that were not accessed.
func (c *diskCache) purge() {
ctx := context.Background()
for {
olderThan := c.expiry
for !c.diskUsageLow() {
// delete unaccessed objects older than expiry duration
expiry := UTCNow().AddDate(0, 0, -1*olderThan)
olderThan /= 2
if olderThan < 1 {
break
}
deletedCount := 0
objDirs, err := ioutil.ReadDir(c.dir)
if err != nil {
log.Fatal(err)
}
for _, obj := range objDirs {
if obj.Name() == minioMetaBucket {
continue
}
// stat entry to get atime
var fi os.FileInfo
fi, err := os.Stat(pathJoin(c.dir, obj.Name(), cacheDataFile))
if err != nil {
continue
}
objInfo, err := c.statCache(ctx, pathJoin(c.dir, obj.Name()))
if err != nil {
// delete any partially filled cache entry left behind.
removeAll(pathJoin(c.dir, obj.Name()))
continue
}
cc := cacheControlOpts(objInfo)
if atime.Get(fi).Before(expiry) ||
cc.isStale(objInfo.ModTime) {
if err = removeAll(pathJoin(c.dir, obj.Name())); err != nil {
logger.LogIf(ctx, err)
}
deletedCount++
// break early if sufficient disk space reclaimed.
if !c.diskUsageLow() {
break
}
}
}
if deletedCount == 0 {
break
}
}
lastRunTime := time.Now()
for {
<-c.purgeChan
timeElapsed := time.Since(lastRunTime)
if timeElapsed > time.Hour {
break
}
}
}
}
// sets cache drive status
func (c *diskCache) setOnline(status bool) {
c.onlineMutex.Lock()
c.online = status
c.onlineMutex.Unlock()
}
// returns true if cache drive is online
func (c *diskCache) IsOnline() bool {
c.onlineMutex.RLock()
defer c.onlineMutex.RUnlock()
return c.online
}
// Stat returns ObjectInfo from disk cache
func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectInfo, err error) {
cacheObjPath := getCacheSHADir(c.dir, bucket, object)
oi, err = c.statCache(ctx, cacheObjPath)
if err != nil {
return
}
oi.Bucket = bucket
oi.Name = object
return
}
// statCache is a convenience function for purge() to get ObjectInfo for cached object
func (c *diskCache) statCache(ctx context.Context, cacheObjPath string) (oi ObjectInfo, e error) {
// Stat the file to get file size.
metaPath := path.Join(cacheObjPath, cacheMetaJSONFile)
f, err := os.Open(metaPath)
if err != nil {
return oi, err
}
defer f.Close()
meta := &cacheMeta{Version: cacheMetaVersion}
if err := jsonLoad(f, meta); err != nil {
return oi, err
}
fi, err := os.Stat(pathJoin(cacheObjPath, cacheDataFile))
if err != nil {
return oi, err
}
meta.Stat.ModTime = atime.Get(fi)
return meta.ToObjectInfo("", ""), nil
}
// saves object metadata to disk cache
func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64) error {
fileName := getCacheSHADir(c.dir, bucket, object)
metaPath := pathJoin(fileName, cacheMetaJSONFile)
f, err := os.Create(metaPath)
if err != nil {
return err
}
defer f.Close()
m := cacheMeta{Meta: meta, Version: cacheMetaVersion}
m.Stat.Size = actualSize
m.Stat.ModTime = UTCNow()
m.Checksum = CacheChecksumInfoV1{Algorithm: HighwayHash256S.String(), Blocksize: cacheBlkSize}
jsonData, err := json.Marshal(m)
if err != nil {
return err
}
_, err = f.Write(jsonData)
return err
}
// Backend metadata could have changed through server side copy - reset cache metadata if that is the case
func (c *diskCache) updateMetadataIfChanged(ctx context.Context, bucket, object string, bkObjectInfo, cacheObjInfo ObjectInfo) error {
if !reflect.DeepEqual(bkObjectInfo.UserDefined, cacheObjInfo.UserDefined) ||
bkObjectInfo.ETag != cacheObjInfo.ETag ||
bkObjectInfo.ContentType != cacheObjInfo.ContentType ||
bkObjectInfo.Expires != cacheObjInfo.Expires {
return c.saveMetadata(ctx, bucket, object, getMetadata(bkObjectInfo), bkObjectInfo.Size)
}
return nil
}
func getCacheSHADir(dir, bucket, object string) string {
return path.Join(dir, getSHA256Hash([]byte(path.Join(bucket, object))))
}
// Cache data to disk with bitrot checksum added for each block of 1MB
func (c *diskCache) bitrotWriteToCache(ctx context.Context, cachePath string, reader io.Reader, size int64) (int64, error) {
if err := os.MkdirAll(cachePath, 0777); err != nil {
return 0, err
}
bufSize := int64(readSizeV1)
if size > 0 && bufSize > size {
bufSize = size
}
filePath := path.Join(cachePath, cacheDataFile)
if filePath == "" || reader == nil {
return 0, errInvalidArgument
}
if err := checkPathLength(filePath); err != nil {
return 0, err
}
f, err := os.Create(filePath)
if err != nil {
return 0, osErrToFSFileErr(err)
}
defer f.Close()
var bytesWritten int64
h := HighwayHash256S.New()
bufp := c.pool.Get().(*[]byte)
defer c.pool.Put(bufp)
for {
n, err := io.ReadFull(reader, *bufp)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF && err != io.ErrClosedPipe {
return 0, err
}
eof := err == io.EOF || err == io.ErrUnexpectedEOF || err == io.ErrClosedPipe
if n == 0 && size != 0 {
// Reached EOF, nothing more to be done.
break
}
h.Reset()
if _, err := h.Write((*bufp)[:n]); err != nil {
return 0, err
}
hashBytes := h.Sum(nil)
if _, err = f.Write(hashBytes); err != nil {
return 0, err
}
if _, err = f.Write((*bufp)[:n]); err != nil {
return 0, err
}
bytesWritten += int64(n)
if eof {
break
}
}
return bytesWritten, nil
}
// Caches the object to disk
func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, opts ObjectOptions) error {
if c.diskUsageHigh() {
select {
case c.purgeChan <- struct{}{}:
default:
}
return errDiskFull
}
if !c.diskAvailable(size) {
return errDiskFull
}
cachePath := getCacheSHADir(c.dir, bucket, object)
if err := os.MkdirAll(cachePath, 0777); err != nil {
return err
}
bufSize := int64(readSizeV1)
if size > 0 && bufSize > size {
bufSize = size
}
n, err := c.bitrotWriteToCache(ctx, cachePath, data, size)
if IsErr(err, baseErrs...) {
c.setOnline(false)
}
if err != nil {
return err
}
return c.saveMetadata(ctx, bucket, object, opts.UserDefined, n)
}
// checks streaming bitrot checksum of cached object before returning data
func (c *diskCache) bitrotReadFromCache(ctx context.Context, filePath string, offset, length int64, writer io.Writer) error {
h := HighwayHash256S.New()
checksumHash := make([]byte, h.Size())
startBlock := offset / cacheBlkSize
endBlock := (offset + length) / cacheBlkSize
// get block start offset
var blockStartOffset int64
if startBlock > 0 {
blockStartOffset = (cacheBlkSize + int64(h.Size())) * startBlock
}
tillLength := (cacheBlkSize + int64(h.Size())) * (endBlock - startBlock + 1)
// Start offset cannot be negative.
if offset < 0 {
logger.LogIf(ctx, errUnexpected)
return errUnexpected
}
// Writer cannot be nil.
if writer == nil {
logger.LogIf(ctx, errUnexpected)
return errUnexpected
}
var blockOffset, blockLength int64
rc, err := readCacheFileStream(filePath, blockStartOffset, tillLength)
if err != nil {
return err
}
bufp := c.pool.Get().(*[]byte)
defer c.pool.Put(bufp)
for block := startBlock; block <= endBlock; block++ {
switch {
case startBlock == endBlock:
blockOffset = offset % cacheBlkSize
blockLength = length
case block == startBlock:
blockOffset = offset % cacheBlkSize
blockLength = cacheBlkSize - blockOffset
case block == endBlock:
blockOffset = 0
blockLength = (offset + length) % cacheBlkSize
default:
blockOffset = 0
blockLength = cacheBlkSize
}
if blockLength == 0 {
break
}
if _, err := io.ReadFull(rc, checksumHash); err != nil {
return err
}
h.Reset()
n, err := io.ReadFull(rc, *bufp)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
logger.LogIf(ctx, err)
return err
}
eof := err == io.EOF || err == io.ErrUnexpectedEOF
if n == 0 && length != 0 {
// Reached EOF, nothing more to be done.
break
}
if _, e := h.Write((*bufp)[:n]); e != nil {
return e
}
hashBytes := h.Sum(nil)
if !bytes.Equal(hashBytes, checksumHash) {
err = HashMismatchError{hex.EncodeToString(checksumHash), hex.EncodeToString(hashBytes)}
logger.LogIf(context.Background(), err)
return err
}
if _, err := io.Copy(writer, bytes.NewReader((*bufp)[blockOffset:blockOffset+blockLength])); err != nil {
if err != io.ErrClosedPipe {
logger.LogIf(ctx, err)
}
return err
}
if eof {
break
}
}
return nil
}
// Get returns ObjectInfo and reader for object from disk cache
func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) {
var objInfo ObjectInfo
cacheObjPath := getCacheSHADir(c.dir, bucket, object)
if objInfo, err = c.statCache(ctx, cacheObjPath); err != nil {
return nil, toObjectErr(err, bucket, object)
}
var nsUnlocker = func() {}
// For a directory, we need to send an reader that returns no bytes.
if hasSuffix(object, SlashSeparator) {
// The lock taken above is released when
// objReader.Close() is called by the caller.
return NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts.CheckCopyPrecondFn, nsUnlocker)
}
fn, off, length, nErr := NewGetObjectReader(rs, objInfo, opts.CheckCopyPrecondFn, nsUnlocker)
if nErr != nil {
return nil, nErr
}
filePath := path.Join(cacheObjPath, cacheDataFile)
pr, pw := io.Pipe()
go func() {
pw.CloseWithError(c.bitrotReadFromCache(ctx, filePath, off, length, pw))
}()
// Cleanup function to cause the go routine above to exit, in
// case of incomplete read.
pipeCloser := func() { pr.Close() }
return fn(pr, h, opts.CheckCopyPrecondFn, pipeCloser)
}
// Deletes the cached object
func (c *diskCache) Delete(ctx context.Context, bucket, object string) (err error) {
cachePath := getCacheSHADir(c.dir, bucket, object)
return removeAll(cachePath)
}
// convenience function to check if object is cached on this diskCache
func (c *diskCache) Exists(ctx context.Context, bucket, object string) bool {
if _, err := os.Stat(getCacheSHADir(c.dir, bucket, object)); err != nil {
return false
}
return true
}

View file

@ -1,537 +0,0 @@
/*
* MinIO Cloud Storage, (C) 2018 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"sync"
"time"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/disk"
"github.com/minio/minio/pkg/lock"
)
const (
// cache.json object metadata for cached objects.
cacheMetaJSONFile = "cache.json"
cacheEnvDelimiter = ";"
)
// cacheFSObjects implements the cache backend operations.
type cacheFSObjects struct {
*FSObjects
// caching drive path (from cache "drives" in config.json)
dir string
// expiry in days specified in config.json
expiry int
// max disk usage pct
maxDiskUsagePct int
// purge() listens on this channel to start the cache-purge process
purgeChan chan struct{}
// mark false if drive is offline
online bool
// mutex to protect updates to online variable
onlineMutex *sync.RWMutex
}
// Inits the cache directory if it is not init'ed already.
// Initializing implies creation of new FS Object layer.
func newCacheFSObjects(dir string, expiry int, maxDiskUsagePct int) (*cacheFSObjects, error) {
// Assign a new UUID for FS minio mode. Each server instance
// gets its own UUID for temporary file transaction.
fsUUID := mustGetUUID()
// Initialize meta volume, if volume already exists ignores it.
if err := initMetaVolumeFS(dir, fsUUID); err != nil {
return nil, fmt.Errorf("Unable to initialize '.minio.sys' meta volume, %s", err)
}
trashPath := pathJoin(dir, minioMetaBucket, cacheTrashDir)
if err := os.MkdirAll(trashPath, 0777); err != nil {
return nil, err
}
if expiry == 0 {
expiry = globalCacheExpiry
}
// Initialize fs objects.
fsObjects := &FSObjects{
fsPath: dir,
metaJSONFile: cacheMetaJSONFile,
fsUUID: fsUUID,
rwPool: &fsIOPool{
readersMap: make(map[string]*lock.RLockedFile),
},
nsMutex: newNSLock(false),
listPool: NewTreeWalkPool(globalLookupTimeout),
appendFileMap: make(map[string]*fsAppendFile),
}
go fsObjects.cleanupStaleMultipartUploads(context.Background(), GlobalMultipartCleanupInterval, GlobalMultipartExpiry, GlobalServiceDoneCh)
cacheFS := cacheFSObjects{
FSObjects: fsObjects,
dir: dir,
expiry: expiry,
maxDiskUsagePct: maxDiskUsagePct,
purgeChan: make(chan struct{}),
online: true,
onlineMutex: &sync.RWMutex{},
}
return &cacheFS, nil
}
// Returns if the disk usage is low.
// Disk usage is low if usage is < 80% of cacheMaxDiskUsagePct
// Ex. for a 100GB disk, if maxUsage is configured as 70% then cacheMaxDiskUsagePct is 70G
// hence disk usage is low if the disk usage is less than 56G (because 80% of 70G is 56G)
func (cfs *cacheFSObjects) diskUsageLow() bool {
minUsage := cfs.maxDiskUsagePct * 80 / 100
di, err := disk.GetInfo(cfs.dir)
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", cfs.dir)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
return false
}
usedPercent := (di.Total - di.Free) * 100 / di.Total
return int(usedPercent) < minUsage
}
// Return if the disk usage is high.
// Disk usage is high if disk used is > cacheMaxDiskUsagePct
func (cfs *cacheFSObjects) diskUsageHigh() bool {
di, err := disk.GetInfo(cfs.dir)
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", cfs.dir)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
return true
}
usedPercent := (di.Total - di.Free) * 100 / di.Total
return int(usedPercent) > cfs.maxDiskUsagePct
}
// Returns if size space can be allocated without exceeding
// max disk usable for caching
func (cfs *cacheFSObjects) diskAvailable(size int64) bool {
di, err := disk.GetInfo(cfs.dir)
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", cfs.dir)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
return false
}
usedPercent := (di.Total - (di.Free - uint64(size))) * 100 / di.Total
return int(usedPercent) < cfs.maxDiskUsagePct
}
// purges all content marked trash from the cache.
func (cfs *cacheFSObjects) purgeTrash() {
ticker := time.NewTicker(time.Minute * cacheCleanupInterval)
defer ticker.Stop()
for {
select {
case <-GlobalServiceDoneCh:
return
case <-ticker.C:
trashPath := path.Join(cfs.fsPath, minioMetaBucket, cacheTrashDir)
entries, err := readDir(trashPath)
if err != nil {
return
}
for _, entry := range entries {
ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{})
fi, err := fsStatVolume(ctx, pathJoin(trashPath, entry))
if err != nil {
continue
}
dir := path.Join(trashPath, fi.Name())
// Delete all expired cache content.
fsRemoveAll(ctx, dir)
}
}
}
}
// Purge cache entries that were not accessed.
func (cfs *cacheFSObjects) purge() {
delimiter := SlashSeparator
maxKeys := 1000
ctx := context.Background()
for {
olderThan := cfs.expiry
for !cfs.diskUsageLow() {
// delete unaccessed objects older than expiry duration
expiry := UTCNow().AddDate(0, 0, -1*olderThan)
olderThan /= 2
if olderThan < 1 {
break
}
deletedCount := 0
buckets, err := cfs.ListBuckets(ctx)
if err != nil {
logger.LogIf(ctx, err)
}
// Reset cache online status if drive was offline earlier.
if !cfs.IsOnline() {
cfs.setOnline(true)
}
for _, bucket := range buckets {
var continuationToken string
var marker string
for {
objects, err := cfs.ListObjects(ctx, bucket.Name, marker, continuationToken, delimiter, maxKeys)
if err != nil {
break
}
if !objects.IsTruncated {
break
}
marker = objects.NextMarker
for _, object := range objects.Objects {
// purge objects that qualify because of cache-control directives or
// past cache expiry duration.
if !filterFromCache(object.UserDefined) ||
!isStaleCache(object) ||
object.AccTime.After(expiry) {
continue
}
if err = cfs.DeleteObject(ctx, bucket.Name, object.Name); err != nil {
logger.LogIf(ctx, err)
continue
}
deletedCount++
}
}
}
if deletedCount == 0 {
// to avoid a busy loop
time.Sleep(time.Minute * 30)
}
}
<-cfs.purgeChan
}
}
// sets cache drive status
func (cfs *cacheFSObjects) setOnline(status bool) {
cfs.onlineMutex.Lock()
cfs.online = status
cfs.onlineMutex.Unlock()
}
// returns true if cache drive is online
func (cfs *cacheFSObjects) IsOnline() bool {
cfs.onlineMutex.RLock()
defer cfs.onlineMutex.RUnlock()
return cfs.online
}
// Caches the object to disk
func (cfs *cacheFSObjects) Put(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) error {
if cfs.diskUsageHigh() {
select {
case cfs.purgeChan <- struct{}{}:
default:
}
return errDiskFull
}
if !cfs.diskAvailable(data.Size()) {
return errDiskFull
}
if _, err := cfs.GetBucketInfo(ctx, bucket); err != nil {
pErr := cfs.MakeBucketWithLocation(ctx, bucket, "")
if pErr != nil {
return pErr
}
}
_, err := cfs.PutObject(ctx, bucket, object, data, opts)
// if err is due to disk being offline , mark cache drive as offline
if IsErr(err, baseErrs...) {
cfs.setOnline(false)
}
return err
}
// Returns the handle for the cached object
func (cfs *cacheFSObjects) Get(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error) {
return cfs.GetObject(ctx, bucket, object, startOffset, length, writer, etag, opts)
}
// Deletes the cached object
func (cfs *cacheFSObjects) Delete(ctx context.Context, bucket, object string) (err error) {
return cfs.DeleteObject(ctx, bucket, object)
}
// convenience function to check if object is cached on this cacheFSObjects
func (cfs *cacheFSObjects) Exists(ctx context.Context, bucket, object string) bool {
_, err := cfs.GetObjectInfo(ctx, bucket, object, ObjectOptions{})
return err == nil
}
// Identical to fs PutObject operation except that it uses ETag in metadata
// headers.
func (cfs *cacheFSObjects) PutObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, retErr error) {
data := r.Reader
fs := cfs.FSObjects
// Lock the object.
objectLock := fs.nsMutex.NewNSLock(ctx, bucket, object)
if err := objectLock.GetLock(globalObjectTimeout); err != nil {
return objInfo, err
}
defer objectLock.Unlock()
// No metadata is set, allocate a new one.
meta := make(map[string]string)
for k, v := range opts.UserDefined {
meta[k] = v
}
var err error
// Validate if bucket name is valid and exists.
if _, err = fs.statBucketDir(ctx, bucket); err != nil {
return ObjectInfo{}, toObjectErr(err, bucket)
}
fsMeta := newFSMetaV1()
fsMeta.Meta = meta
// This is a special case with size as '0' and object ends
// with a slash separator, we treat it like a valid operation
// and return success.
if isObjectDir(object, data.Size()) {
// Check if an object is present as one of the parent dir.
if fs.parentDirIsObject(ctx, bucket, path.Dir(object)) {
return ObjectInfo{}, toObjectErr(errFileParentIsFile, bucket, object)
}
if err = mkdirAll(pathJoin(fs.fsPath, bucket, object), 0777); err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
var fi os.FileInfo
if fi, err = fsStatDir(ctx, pathJoin(fs.fsPath, bucket, object)); err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
return fsMeta.ToObjectInfo(bucket, object, fi), nil
}
if err = checkPutObjectArgs(ctx, bucket, object, fs, data.Size()); err != nil {
return ObjectInfo{}, err
}
// Check if an object is present as one of the parent dir.
if fs.parentDirIsObject(ctx, bucket, path.Dir(object)) {
return ObjectInfo{}, toObjectErr(errFileParentIsFile, bucket, object)
}
// Validate input data size and it can never be less than zero.
if data.Size() < -1 {
logger.LogIf(ctx, errInvalidArgument)
return ObjectInfo{}, errInvalidArgument
}
var wlk *lock.LockedFile
if bucket != minioMetaBucket {
bucketMetaDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix)
fsMetaPath := pathJoin(bucketMetaDir, bucket, object, fs.metaJSONFile)
wlk, err = fs.rwPool.Create(fsMetaPath)
if err != nil {
logger.LogIf(ctx, err)
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
// This close will allow for locks to be synchronized on `fs.json`.
defer wlk.Close()
defer func() {
// Remove meta file when PutObject encounters any error
if retErr != nil {
tmpDir := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID)
fsRemoveMeta(ctx, bucketMetaDir, fsMetaPath, tmpDir)
}
}()
}
// Uploaded object will first be written to the temporary location which will eventually
// be renamed to the actual location. It is first written to the temporary location
// so that cleaning it up will be easy if the server goes down.
tempObj := mustGetUUID()
// Allocate a buffer to Read() from request body
bufSize := int64(readSizeV1)
if size := data.Size(); size > 0 && bufSize > size {
bufSize = size
}
buf := make([]byte, int(bufSize))
fsTmpObjPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, tempObj)
bytesWritten, err := fsCreateFile(ctx, fsTmpObjPath, data, buf, data.Size())
if err != nil {
fsRemoveFile(ctx, fsTmpObjPath)
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
if fsMeta.Meta["etag"] == "" {
fsMeta.Meta["etag"] = r.MD5CurrentHexString()
}
// Should return IncompleteBody{} error when reader has fewer
// bytes than specified in request header.
if bytesWritten < data.Size() {
fsRemoveFile(ctx, fsTmpObjPath)
return ObjectInfo{}, IncompleteBody{}
}
// Delete the temporary object in the case of a
// failure. If PutObject succeeds, then there would be
// nothing to delete.
defer fsRemoveFile(ctx, fsTmpObjPath)
// Entire object was written to the temp location, now it's safe to rename it to the actual location.
fsNSObjPath := pathJoin(fs.fsPath, bucket, object)
if err = fsRenameFile(ctx, fsTmpObjPath, fsNSObjPath); err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
if bucket != minioMetaBucket {
// Write FS metadata after a successful namespace operation.
if _, err = fsMeta.WriteTo(wlk); err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
}
// Stat the file to fetch timestamp, size.
fi, err := fsStatFile(ctx, pathJoin(fs.fsPath, bucket, object))
if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
// Success.
return fsMeta.ToObjectInfo(bucket, object, fi), nil
}
// Implements S3 compatible initiate multipart API. Operation here is identical
// to fs backend implementation - with the exception that cache FS uses the uploadID
// generated on the backend
func (cfs *cacheFSObjects) NewMultipartUpload(ctx context.Context, bucket, object string, uploadID string, opts ObjectOptions) (string, error) {
if cfs.diskUsageHigh() {
select {
case cfs.purgeChan <- struct{}{}:
default:
}
return "", errDiskFull
}
if _, err := cfs.GetBucketInfo(ctx, bucket); err != nil {
pErr := cfs.MakeBucketWithLocation(ctx, bucket, "")
if pErr != nil {
return "", pErr
}
}
fs := cfs.FSObjects
if err := checkNewMultipartArgs(ctx, bucket, object, fs); err != nil {
return "", toObjectErr(err, bucket)
}
if _, err := fs.statBucketDir(ctx, bucket); err != nil {
return "", toObjectErr(err, bucket)
}
uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)
err := mkdirAll(uploadIDDir, 0755)
if err != nil {
logger.LogIf(ctx, err)
return "", err
}
// Initialize fs.json values.
fsMeta := newFSMetaV1()
fsMeta.Meta = opts.UserDefined
fsMetaBytes, err := json.Marshal(fsMeta)
if err != nil {
logger.LogIf(ctx, err)
return "", err
}
if err = ioutil.WriteFile(pathJoin(uploadIDDir, fs.metaJSONFile), fsMetaBytes, 0644); err != nil {
logger.LogIf(ctx, err)
return "", err
}
return uploadID, nil
}
// moveBucketToTrash clears cacheFSObjects of bucket contents and moves it to trash folder.
func (cfs *cacheFSObjects) moveBucketToTrash(ctx context.Context, bucket string) (err error) {
fs := cfs.FSObjects
bucketLock := fs.nsMutex.NewNSLock(ctx, bucket, "")
if err = bucketLock.GetLock(globalObjectTimeout); err != nil {
return err
}
defer bucketLock.Unlock()
bucketDir, err := fs.getBucketDir(ctx, bucket)
if err != nil {
return toObjectErr(err, bucket)
}
trashPath := pathJoin(cfs.fsPath, minioMetaBucket, cacheTrashDir)
expiredDir := path.Join(trashPath, bucket)
// Attempt to move regular bucket to expired directory.
if err = fsRenameDir(bucketDir, expiredDir); err != nil {
logger.LogIf(ctx, err)
return toObjectErr(err, bucket)
}
// Cleanup all the bucket metadata.
ominioMetadataBucketDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket)
nminioMetadataBucketDir := pathJoin(trashPath, MustGetUUID())
logger.LogIf(ctx, fsRenameDir(ominioMetadataBucketDir, nminioMetadataBucketDir))
return nil
}
// Removes a directory only if its empty, handles long
// paths for windows automatically.
func fsRenameDir(dirPath, newPath string) (err error) {
if dirPath == "" || newPath == "" {
return errInvalidArgument
}
if err = checkPathLength(dirPath); err != nil {
return err
}
if err = checkPathLength(newPath); err != nil {
return err
}
if err = os.Rename(dirPath, newPath); err != nil {
if os.IsNotExist(err) {
return errVolumeNotFound
} else if isSysErrNotEmpty(err) {
return errVolumeNotEmpty
}
return err
}
return nil
}

170
cmd/disk-cache-utils.go Normal file
View file

@ -0,0 +1,170 @@
/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"io"
"os"
"strconv"
"strings"
"time"
"github.com/minio/minio/cmd/crypto"
)
type cacheControl struct {
expiry time.Time
maxAge int
sMaxAge int
minFresh int
maxStale int
}
func (c cacheControl) isEmpty() bool {
return c == cacheControl{}
}
func (c cacheControl) isStale(modTime time.Time) bool {
if c.isEmpty() {
return false
}
now := time.Now()
if c.sMaxAge > 0 && c.sMaxAge < int(now.Sub(modTime).Seconds()) {
return true
}
if c.maxAge > 0 && c.maxAge < int(now.Sub(modTime).Seconds()) {
return true
}
if !c.expiry.Equal(time.Time{}) && c.expiry.Before(time.Now().Add(time.Duration(c.maxStale))) {
return true
}
if c.minFresh > 0 && c.minFresh <= int(now.Sub(modTime).Seconds()) {
return true
}
return false
}
// returns struct with cache-control settings from user metadata.
func cacheControlOpts(o ObjectInfo) (c cacheControl) {
m := o.UserDefined
if o.Expires != timeSentinel {
c.expiry = o.Expires
}
var headerVal string
for k, v := range m {
if strings.ToLower(k) == "cache-control" {
headerVal = v
}
}
if headerVal == "" {
return
}
headerVal = strings.ToLower(headerVal)
headerVal = strings.TrimSpace(headerVal)
vals := strings.Split(headerVal, ",")
for _, val := range vals {
val = strings.TrimSpace(val)
p := strings.Split(val, "=")
if len(p) != 2 {
continue
}
if p[0] == "max-age" ||
p[0] == "s-maxage" ||
p[0] == "min-fresh" ||
p[0] == "max-stale" {
i, err := strconv.Atoi(p[1])
if err != nil {
return cacheControl{}
}
if p[0] == "max-age" {
c.maxAge = i
}
if p[0] == "s-maxage" {
c.sMaxAge = i
}
if p[0] == "min-fresh" {
c.minFresh = i
}
if p[0] == "max-stale" {
c.maxStale = i
}
}
}
return c
}
// backendDownError returns true if err is due to backend failure or faulty disk if in server mode
func backendDownError(err error) bool {
_, backendDown := err.(BackendDown)
return backendDown || IsErr(err, baseErrs...)
}
// IsCacheable returns if the object should be saved in the cache.
func (o ObjectInfo) IsCacheable() bool {
return !crypto.IsEncrypted(o.UserDefined)
}
// reads file cached on disk from offset upto length
func readCacheFileStream(filePath string, offset, length int64) (io.ReadCloser, error) {
if filePath == "" || offset < 0 {
return nil, errInvalidArgument
}
if err := checkPathLength(filePath); err != nil {
return nil, err
}
fr, err := os.Open(filePath)
if err != nil {
return nil, osErrToFSFileErr(err)
}
// Stat to get the size of the file at path.
st, err := fr.Stat()
if err != nil {
err = osErrToFSFileErr(err)
return nil, err
}
// Verify if its not a regular file, since subsequent Seek is undefined.
if !st.Mode().IsRegular() {
return nil, errIsNotRegular
}
if err = os.Chtimes(filePath, time.Now(), st.ModTime()); err != nil {
return nil, err
}
// Seek to the requested offset.
if offset > 0 {
_, err = fr.Seek(offset, io.SeekStart)
if err != nil {
return nil, err
}
}
return struct {
io.Reader
io.Closer
}{Reader: io.LimitReader(fr, length), Closer: fr}, nil
}

View file

@ -0,0 +1,60 @@
/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"net/http"
"reflect"
"testing"
"time"
)
func TestGetCacheControlOpts(t *testing.T) {
expiry, _ := time.Parse(http.TimeFormat, "Wed, 21 Oct 2015 07:28:00 GMT")
testCases := []struct {
cacheControlHeaderVal string
expiryHeaderVal time.Time
expectedCacheControl cacheControl
expectedErr bool
}{
{"", timeSentinel, cacheControl{}, false},
{"max-age=2592000, public", timeSentinel, cacheControl{maxAge: 2592000, sMaxAge: 0, minFresh: 0, expiry: time.Time{}}, false},
{"max-age=2592000, no-store", timeSentinel, cacheControl{maxAge: 2592000, sMaxAge: 0, minFresh: 0, expiry: time.Time{}}, false},
{"must-revalidate, max-age=600", timeSentinel, cacheControl{maxAge: 600, sMaxAge: 0, minFresh: 0, expiry: time.Time{}}, false},
{"s-maxAge=2500, max-age=600", timeSentinel, cacheControl{maxAge: 600, sMaxAge: 2500, minFresh: 0, expiry: time.Time{}}, false},
{"s-maxAge=2500, max-age=600", expiry, cacheControl{maxAge: 600, sMaxAge: 2500, minFresh: 0, expiry: time.Date(2015, time.October, 21, 07, 28, 00, 00, time.UTC)}, false},
{"s-maxAge=2500, max-age=600s", timeSentinel, cacheControl{maxAge: 600, sMaxAge: 2500, minFresh: 0, expiry: time.Time{}}, true},
}
var m map[string]string
for i, testCase := range testCases {
m = make(map[string]string)
m["cache-control"] = testCase.cacheControlHeaderVal
if testCase.expiryHeaderVal != timeSentinel {
m["expires"] = testCase.expiryHeaderVal.String()
}
c := cacheControlOpts(ObjectInfo{UserDefined: m, Expires: testCase.expiryHeaderVal})
if testCase.expectedErr && (c != cacheControl{}) {
t.Errorf("expected err for case %d", i)
}
if !testCase.expectedErr && !reflect.DeepEqual(c, testCase.expectedCacheControl) {
t.Errorf("expected %v got %v for case %d", testCase.expectedCacheControl, c, i)
}
}
}

File diff suppressed because it is too large Load diff

View file

@ -1,5 +1,5 @@
/*
* MinIO Cloud Storage, (C) 2018 MinIO, Inc.
* MinIO Cloud Storage, (C) 2018,2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,34 +19,52 @@ package cmd
import (
"bytes"
"context"
"reflect"
"io"
"net/http"
"testing"
"time"
"github.com/minio/minio/pkg/hash"
)
// Initialize cache FS objects.
func initCacheFSObjects(disk string, cacheMaxUse int) (*cacheFSObjects, error) {
return newCacheFSObjects(disk, globalCacheExpiry, cacheMaxUse)
// Initialize cache objects.
func initCacheObjects(disk string, cacheMaxUse int) (*diskCache, error) {
return newdiskCache(disk, globalCacheExpiry, cacheMaxUse)
}
// inits diskCache struct for nDisks
func initDiskCaches(drives []string, cacheMaxUse int, t *testing.T) (*diskCache, error) {
var cfs []*cacheFSObjects
func initDiskCaches(drives []string, cacheMaxUse int, t *testing.T) ([]*diskCache, error) {
var cb []*diskCache
for _, d := range drives {
obj, err := initCacheFSObjects(d, cacheMaxUse)
obj, err := initCacheObjects(d, cacheMaxUse)
if err != nil {
return nil, err
}
cfs = append(cfs, obj)
cb = append(cb, obj)
}
return cb, nil
}
// Tests ToObjectInfo function.
func TestCacheMetadataObjInfo(t *testing.T) {
m := cacheMeta{Meta: nil}
objInfo := m.ToObjectInfo("testbucket", "testobject")
if objInfo.Size != 0 {
t.Fatal("Unexpected object info value for Size", objInfo.Size)
}
if objInfo.ModTime != timeSentinel {
t.Fatal("Unexpected object info value for ModTime ", objInfo.ModTime)
}
if objInfo.IsDir {
t.Fatal("Unexpected object info value for IsDir", objInfo.IsDir)
}
if !objInfo.Expires.IsZero() {
t.Fatal("Unexpected object info value for Expires ", objInfo.Expires)
}
return &diskCache{cfs: cfs}, nil
}
// test whether a drive being offline causes
// getCacheFS to fetch next online drive
func TestGetCacheFS(t *testing.T) {
// getCachedLoc to fetch next online drive
func TestGetCachedLoc(t *testing.T) {
for n := 1; n < 10; n++ {
fsDirs, err := getRandomDisks(n)
if err != nil {
@ -56,14 +74,15 @@ func TestGetCacheFS(t *testing.T) {
if err != nil {
t.Fatal(err)
}
c := cacheObjects{cache: d}
bucketName := "testbucket"
objectName := "testobject"
ctx := context.Background()
// find cache drive where object would be hashed
index := d.hashIndex(bucketName, objectName)
index := c.hashIndex(bucketName, objectName)
// turn off drive by setting online status to false
d.cfs[index].online = false
cfs, err := d.getCacheFS(ctx, bucketName, objectName)
c.cache[index].online = false
cfs, err := c.getCacheLoc(ctx, bucketName, objectName)
if n == 1 && err == errDiskNotFound {
continue
}
@ -71,7 +90,7 @@ func TestGetCacheFS(t *testing.T) {
t.Fatal(err)
}
i := -1
for j, f := range d.cfs {
for j, f := range c.cache {
if f == cfs {
i = j
break
@ -84,8 +103,8 @@ func TestGetCacheFS(t *testing.T) {
}
// test whether a drive being offline causes
// getCacheFS to fetch next online drive
func TestGetCacheFSMaxUse(t *testing.T) {
// getCachedLoc to fetch next online drive
func TestGetCacheMaxUse(t *testing.T) {
for n := 1; n < 10; n++ {
fsDirs, err := getRandomDisks(n)
if err != nil {
@ -95,14 +114,16 @@ func TestGetCacheFSMaxUse(t *testing.T) {
if err != nil {
t.Fatal(err)
}
c := cacheObjects{cache: d}
bucketName := "testbucket"
objectName := "testobject"
ctx := context.Background()
// find cache drive where object would be hashed
index := d.hashIndex(bucketName, objectName)
index := c.hashIndex(bucketName, objectName)
// turn off drive by setting online status to false
d.cfs[index].online = false
cfs, err := d.getCacheFS(ctx, bucketName, objectName)
c.cache[index].online = false
cb, err := c.getCacheLoc(ctx, bucketName, objectName)
if n == 1 && err == errDiskNotFound {
continue
}
@ -110,8 +131,8 @@ func TestGetCacheFSMaxUse(t *testing.T) {
t.Fatal(err)
}
i := -1
for j, f := range d.cfs {
if f == cfs {
for j, f := range d {
if f == cb {
i = j
break
}
@ -165,7 +186,9 @@ func TestDiskCache(t *testing.T) {
if err != nil {
t.Fatal(err)
}
cache := d.cfs[0]
c := cacheObjects{cache: d}
cache := c.cache[0]
ctx := context.Background()
bucketName := "testbucket"
objectName := "testobject"
@ -191,14 +214,17 @@ func TestDiskCache(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = cache.Put(ctx, bucketName, objectName, NewPutObjReader(hashReader, nil, nil), ObjectOptions{UserDefined: httpMeta})
err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), ObjectOptions{UserDefined: httpMeta})
if err != nil {
t.Fatal(err)
}
cachedObjInfo, err := cache.GetObjectInfo(ctx, bucketName, objectName, opts)
cReader, err := cache.Get(ctx, bucketName, objectName, nil, http.Header{
"Content-Type": []string{"application/json"},
}, opts)
if err != nil {
t.Fatal(err)
}
cachedObjInfo := cReader.ObjInfo
if !cache.Exists(ctx, bucketName, objectName) {
t.Fatal("Expected object to exist on cache")
}
@ -212,17 +238,16 @@ func TestDiskCache(t *testing.T) {
t.Fatal("Cached content-type does not match")
}
writer := bytes.NewBuffer(nil)
err = cache.Get(ctx, bucketName, objectName, 0, int64(size), writer, "", opts)
_, err = io.Copy(writer, cReader)
if err != nil {
t.Fatal(err)
}
if ccontent := writer.Bytes(); !bytes.Equal([]byte(content), ccontent) {
t.Errorf("wrong cached file content")
}
err = cache.Delete(ctx, bucketName, objectName)
if err != nil {
t.Errorf("object missing from cache")
}
cReader.Close()
cache.Delete(ctx, bucketName, objectName)
online := cache.IsOnline()
if !online {
t.Errorf("expected cache drive to be online")
@ -239,7 +264,7 @@ func TestDiskCacheMaxUse(t *testing.T) {
if err != nil {
t.Fatal(err)
}
cache := d.cfs[0]
cache := d[0]
ctx := context.Background()
bucketName := "testbucket"
objectName := "testobject"
@ -267,19 +292,20 @@ func TestDiskCacheMaxUse(t *testing.T) {
t.Fatal(err)
}
if !cache.diskAvailable(int64(size)) {
err = cache.Put(ctx, bucketName, objectName, NewPutObjReader(hashReader, nil, nil), ObjectOptions{UserDefined: httpMeta})
err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), ObjectOptions{UserDefined: httpMeta})
if err != errDiskFull {
t.Fatal("Cache max-use limit violated.")
}
} else {
err = cache.Put(ctx, bucketName, objectName, NewPutObjReader(hashReader, nil, nil), ObjectOptions{UserDefined: httpMeta})
err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), ObjectOptions{UserDefined: httpMeta})
if err != nil {
t.Fatal(err)
}
cachedObjInfo, err := cache.GetObjectInfo(ctx, bucketName, objectName, opts)
cReader, err := cache.Get(ctx, bucketName, objectName, nil, nil, opts)
if err != nil {
t.Fatal(err)
}
cachedObjInfo := cReader.ObjInfo
if !cache.Exists(ctx, bucketName, objectName) {
t.Fatal("Expected object to exist on cache")
}
@ -293,92 +319,19 @@ func TestDiskCacheMaxUse(t *testing.T) {
t.Fatal("Cached content-type does not match")
}
writer := bytes.NewBuffer(nil)
err = cache.Get(ctx, bucketName, objectName, 0, int64(size), writer, "", opts)
_, err = io.Copy(writer, cReader)
if err != nil {
t.Fatal(err)
}
if ccontent := writer.Bytes(); !bytes.Equal([]byte(content), ccontent) {
t.Errorf("wrong cached file content")
}
err = cache.Delete(ctx, bucketName, objectName)
if err != nil {
t.Errorf("object missing from cache")
}
cReader.Close()
cache.Delete(ctx, bucketName, objectName)
online := cache.IsOnline()
if !online {
t.Errorf("expected cache drive to be online")
}
}
}
func TestIsCacheExcludeDirective(t *testing.T) {
testCases := []struct {
cacheControlOpt string
expectedResult bool
}{
{"no-cache", true},
{"no-store", true},
{"must-revalidate", true},
{"no-transform", false},
{"max-age", false},
}
for i, testCase := range testCases {
if isCacheExcludeDirective(testCase.cacheControlOpt) != testCase.expectedResult {
t.Errorf("Cache exclude directive test failed for case %d", i)
}
}
}
func TestGetCacheControlOpts(t *testing.T) {
testCases := []struct {
cacheControlHeaderVal string
expiryHeaderVal string
expectedCacheControl cacheControl
expectedErr bool
}{
{"", "", cacheControl{}, false},
{"max-age=2592000, public", "", cacheControl{maxAge: 2592000, sMaxAge: 0, minFresh: 0, expiry: time.Time{}, exclude: false}, false},
{"max-age=2592000, no-store", "", cacheControl{maxAge: 2592000, sMaxAge: 0, minFresh: 0, expiry: time.Time{}, exclude: true}, false},
{"must-revalidate, max-age=600", "", cacheControl{maxAge: 600, sMaxAge: 0, minFresh: 0, expiry: time.Time{}, exclude: true}, false},
{"s-maxAge=2500, max-age=600", "", cacheControl{maxAge: 600, sMaxAge: 2500, minFresh: 0, expiry: time.Time{}, exclude: false}, false},
{"s-maxAge=2500, max-age=600", "Wed, 21 Oct 2015 07:28:00 GMT", cacheControl{maxAge: 600, sMaxAge: 2500, minFresh: 0, expiry: time.Date(2015, time.October, 21, 07, 28, 00, 00, time.UTC), exclude: false}, false},
{"s-maxAge=2500, max-age=600s", "", cacheControl{maxAge: 600, sMaxAge: 2500, minFresh: 0, expiry: time.Time{}, exclude: false}, true},
}
var m map[string]string
for i, testCase := range testCases {
m = make(map[string]string)
m["cache-control"] = testCase.cacheControlHeaderVal
if testCase.expiryHeaderVal != "" {
m["expires"] = testCase.expiryHeaderVal
}
c, err := getCacheControlOpts(m)
if testCase.expectedErr && err == nil {
t.Errorf("expected err for case %d", i)
}
if !testCase.expectedErr && !reflect.DeepEqual(c, testCase.expectedCacheControl) {
t.Errorf("expected %v got %v for case %d", testCase.expectedCacheControl, c, i)
}
}
}
func TestFilterFromCache(t *testing.T) {
testCases := []struct {
metadata map[string]string
expectedResult bool
}{
{map[string]string{"content-type": "application/json"}, false},
{map[string]string{"cache-control": "private,no-store"}, true},
{map[string]string{"cache-control": "no-cache,must-revalidate"}, true},
{map[string]string{"cache-control": "no-transform"}, false},
{map[string]string{"cache-control": "max-age=3600"}, false},
}
for i, testCase := range testCases {
if filterFromCache(testCase.metadata) != testCase.expectedResult {
t.Errorf("Cache exclude directive test failed for case %d", i)
}
}
}

View file

@ -18,11 +18,16 @@ package cmd
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"strings"
"github.com/minio/minio/cmd/logger"
)
@ -33,6 +38,7 @@ const (
// formatCacheV1.Cache.Version
formatCacheVersionV1 = "1"
formatCacheVersionV2 = "2"
formatMetaVersion1 = "1"
@ -56,6 +62,9 @@ type formatCacheV1 struct {
} `json:"cache"` // Cache field holds cache format.
}
// formatCacheV2 is same as formatCacheV1
type formatCacheV2 = formatCacheV1
// Used to detect the version of "cache" format.
type formatCacheVersionDetect struct {
Cache struct {
@ -64,17 +73,17 @@ type formatCacheVersionDetect struct {
}
// Return a slice of format, to be used to format uninitialized disks.
func newFormatCacheV1(drives []string) []*formatCacheV1 {
func newFormatCacheV2(drives []string) []*formatCacheV2 {
diskCount := len(drives)
var disks = make([]string, diskCount)
var formats = make([]*formatCacheV1, diskCount)
var formats = make([]*formatCacheV2, diskCount)
for i := 0; i < diskCount; i++ {
format := &formatCacheV1{}
format := &formatCacheV2{}
format.Version = formatMetaVersion1
format.Format = formatCache
format.Cache.Version = formatCacheVersionV1
format.Cache.Version = formatCacheVersionV2
format.Cache.DistributionAlgo = formatCacheV1DistributionAlgo
format.Cache.This = mustGetUUID()
formats[i] = format
@ -87,6 +96,15 @@ func newFormatCacheV1(drives []string) []*formatCacheV1 {
return formats
}
// Returns formatCache.Cache.Version
func formatCacheGetVersion(r io.ReadSeeker) (string, error) {
format := &formatCacheVersionDetect{}
if err := jsonLoad(r, format); err != nil {
return "", err
}
return format.Cache.Version, nil
}
// Creates a new cache format.json if unformatted.
func createFormatCache(fsFormatPath string, format *formatCacheV1) error {
// open file using READ & WRITE permission
@ -110,8 +128,8 @@ func createFormatCache(fsFormatPath string, format *formatCacheV1) error {
// This function creates a cache format file on disk and returns a slice
// of format cache config
func initFormatCache(ctx context.Context, drives []string) (formats []*formatCacheV1, err error) {
nformats := newFormatCacheV1(drives)
func initFormatCache(ctx context.Context, drives []string) (formats []*formatCacheV2, err error) {
nformats := newFormatCacheV2(drives)
for _, drive := range drives {
_, err = os.Stat(drive)
if err == nil {
@ -147,26 +165,33 @@ func initFormatCache(ctx context.Context, drives []string) (formats []*formatCac
return nformats, nil
}
func loadFormatCache(ctx context.Context, drives []string) ([]*formatCacheV1, error) {
formats := make([]*formatCacheV1, len(drives))
func loadFormatCache(ctx context.Context, drives []string) ([]*formatCacheV2, bool, error) {
formats := make([]*formatCacheV2, len(drives))
var formatV2 *formatCacheV2
migrating := false
for i, drive := range drives {
cacheFormatPath := pathJoin(drive, minioMetaBucket, formatConfigFile)
f, err := os.Open(cacheFormatPath)
f, err := os.OpenFile(cacheFormatPath, os.O_RDWR, 0)
if err != nil {
if os.IsNotExist(err) {
continue
}
logger.LogIf(ctx, err)
return nil, err
return nil, migrating, err
}
defer f.Close()
format, err := formatMetaCacheV1(f)
if err != nil {
continue
}
formats[i] = format
formatV2 = format
if format.Cache.Version != formatCacheVersionV2 {
migrating = true
}
formats[i] = formatV2
}
return formats, nil
return formats, migrating, nil
}
// unmarshalls the cache format.json into formatCacheV1
@ -178,26 +203,38 @@ func formatMetaCacheV1(r io.ReadSeeker) (*formatCacheV1, error) {
return format, nil
}
func checkFormatCacheValue(format *formatCacheV1) error {
func checkFormatCacheValue(format *formatCacheV2, migrating bool) error {
if format.Format != formatCache {
return fmt.Errorf("Unsupported cache format [%s] found", format.Format)
}
// during migration one or more cache drive(s) formats can be out of sync
if migrating {
// Validate format version and format type.
if format.Version != formatMetaVersion1 {
return fmt.Errorf("Unsupported version of cache format [%s] found", format.Version)
}
if format.Cache.Version != formatCacheVersionV2 && format.Cache.Version != formatCacheVersionV1 {
return fmt.Errorf("Unsupported Cache backend format found [%s]", format.Cache.Version)
}
return nil
}
// Validate format version and format type.
if format.Version != formatMetaVersion1 {
return fmt.Errorf("Unsupported version of cache format [%s] found", format.Version)
}
if format.Format != formatCache {
return fmt.Errorf("Unsupported cache format [%s] found", format.Format)
}
if format.Cache.Version != formatCacheVersionV1 {
if format.Cache.Version != formatCacheVersionV2 {
return fmt.Errorf("Unsupported Cache backend format found [%s]", format.Cache.Version)
}
return nil
}
func checkFormatCacheValues(formats []*formatCacheV1) (int, error) {
func checkFormatCacheValues(migrating bool, formats []*formatCacheV2) (int, error) {
for i, formatCache := range formats {
if formatCache == nil {
continue
}
if err := checkFormatCacheValue(formatCache); err != nil {
if err := checkFormatCacheValue(formatCache, migrating); err != nil {
return i, err
}
if len(formats) != len(formatCache.Cache.Disks) {
@ -210,7 +247,7 @@ func checkFormatCacheValues(formats []*formatCacheV1) (int, error) {
// checkCacheDisksConsistency - checks if "This" disk uuid on each disk is consistent with all "Disks" slices
// across disks.
func checkCacheDiskConsistency(formats []*formatCacheV1) error {
func checkCacheDiskConsistency(formats []*formatCacheV2) error {
var disks = make([]string, len(formats))
// Collect currently available disk uuids.
for index, format := range formats {
@ -236,7 +273,7 @@ func checkCacheDiskConsistency(formats []*formatCacheV1) error {
}
// checkCacheDisksSliceConsistency - validate cache Disks order if they are consistent.
func checkCacheDisksSliceConsistency(formats []*formatCacheV1) error {
func checkCacheDisksSliceConsistency(formats []*formatCacheV2) error {
var sentinelDisks []string
// Extract first valid Disks slice.
for _, format := range formats {
@ -269,7 +306,7 @@ func findCacheDiskIndex(disk string, disks []string) int {
}
// validate whether cache drives order has changed
func validateCacheFormats(ctx context.Context, formats []*formatCacheV1) error {
func validateCacheFormats(ctx context.Context, migrating bool, formats []*formatCacheV2) error {
count := 0
for _, format := range formats {
if format == nil {
@ -279,7 +316,7 @@ func validateCacheFormats(ctx context.Context, formats []*formatCacheV1) error {
if count == len(formats) {
return errors.New("Cache format files missing on all drives")
}
if _, err := checkFormatCacheValues(formats); err != nil {
if _, err := checkFormatCacheValues(migrating, formats); err != nil {
logger.LogIf(ctx, err)
return err
}
@ -308,17 +345,161 @@ func cacheDrivesUnformatted(drives []string) bool {
// create format.json for each cache drive if fresh disk or load format from disk
// Then validate the format for all drives in the cache to ensure order
// of cache drives has not changed.
func loadAndValidateCacheFormat(ctx context.Context, drives []string) (formats []*formatCacheV1, err error) {
func loadAndValidateCacheFormat(ctx context.Context, drives []string) (formats []*formatCacheV2, migrating bool, err error) {
if cacheDrivesUnformatted(drives) {
formats, err = initFormatCache(ctx, drives)
} else {
formats, err = loadFormatCache(ctx, drives)
formats, migrating, err = loadFormatCache(ctx, drives)
}
if err != nil {
return nil, err
return nil, false, err
}
if err = validateCacheFormats(ctx, formats); err != nil {
return nil, err
if err = validateCacheFormats(ctx, migrating, formats); err != nil {
return nil, false, err
}
return formats, nil
return formats, migrating, nil
}
// reads cached object on disk and writes it back after adding bitrot
// hashsum per block as per the new disk cache format.
func migrateData(ctx context.Context, c *diskCache, oldfile, destDir string) error {
st, err := os.Stat(oldfile)
if err != nil {
err = osErrToFSFileErr(err)
return err
}
readCloser, err := readCacheFileStream(oldfile, 0, st.Size())
if err != nil {
return err
}
_, err = c.bitrotWriteToCache(ctx, destDir, readCloser, st.Size())
return err
}
// migrate cache contents from old cacheFS format to new backend format
// new format is flat
// sha(bucket,object)/ <== dir name
// - part.1 <== data
// - cache.json <== metadata
func migrateOldCache(ctx context.Context, c *diskCache) error {
oldCacheBucketsPath := path.Join(c.dir, minioMetaBucket, "buckets")
cacheFormatPath := pathJoin(c.dir, minioMetaBucket, formatConfigFile)
if _, err := os.Stat(oldCacheBucketsPath); err != nil {
// remove .minio.sys sub directories
removeAll(path.Join(c.dir, minioMetaBucket, "multipart"))
removeAll(path.Join(c.dir, minioMetaBucket, "tmp"))
removeAll(path.Join(c.dir, minioMetaBucket, "trash"))
removeAll(path.Join(c.dir, minioMetaBucket, "buckets"))
// just migrate cache format
return migrateCacheFormatJSON(cacheFormatPath)
}
buckets, err := readDir(oldCacheBucketsPath)
if err != nil {
return err
}
for _, bucket := range buckets {
var objMetaPaths []string
root := path.Join(oldCacheBucketsPath, bucket)
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if strings.HasSuffix(path, cacheMetaJSONFile) {
objMetaPaths = append(objMetaPaths, path)
}
return nil
})
if err != nil {
return err
}
for _, oMeta := range objMetaPaths {
objSlice := strings.SplitN(oMeta, cacheMetaJSONFile, 2)
object := strings.TrimPrefix(objSlice[0], path.Join(oldCacheBucketsPath, bucket))
object = strings.TrimSuffix(object, "/")
destdir := getCacheSHADir(c.dir, bucket, object)
if err := os.MkdirAll(destdir, 0777); err != nil {
return err
}
prevCachedPath := path.Join(c.dir, bucket, object)
// move cached object to new cache directory path
// migrate cache data and add bit-rot protection hash sum
// at the start of each block
if err := migrateData(ctx, c, prevCachedPath, destdir); err != nil {
continue
}
stat, err := os.Stat(prevCachedPath)
if err != nil {
if err == errFileNotFound {
continue
}
logger.LogIf(ctx, err)
return err
}
// old cached file can now be removed
if err := os.Remove(prevCachedPath); err != nil {
return err
}
// move cached metadata after changing cache metadata version
oldMetaPath := pathJoin(oldCacheBucketsPath, bucket, object, cacheMetaJSONFile)
metaPath := pathJoin(destdir, cacheMetaJSONFile)
metaBytes, err := ioutil.ReadFile(oldMetaPath)
if err != nil {
return err
}
// marshal cache metadata after adding version and stat info
meta := &cacheMeta{}
if err = json.Unmarshal(metaBytes, &meta); err != nil {
return err
}
meta.Checksum = CacheChecksumInfoV1{Algorithm: HighwayHash256S.String(), Blocksize: cacheBlkSize}
meta.Version = cacheMetaVersion
meta.Stat.Size = stat.Size()
meta.Stat.ModTime = stat.ModTime()
jsonData, err := json.Marshal(meta)
if err != nil {
return err
}
if err = ioutil.WriteFile(metaPath, jsonData, 0644); err != nil {
return err
}
}
// delete old bucket from cache, now that all contents are cleared
removeAll(path.Join(c.dir, bucket))
}
// remove .minio.sys sub directories
removeAll(path.Join(c.dir, minioMetaBucket, "multipart"))
removeAll(path.Join(c.dir, minioMetaBucket, "tmp"))
removeAll(path.Join(c.dir, minioMetaBucket, "trash"))
removeAll(path.Join(c.dir, minioMetaBucket, "buckets"))
return migrateCacheFormatJSON(cacheFormatPath)
}
func migrateCacheFormatJSON(cacheFormatPath string) error {
// now migrate format.json
f, err := os.OpenFile(cacheFormatPath, os.O_RDWR, 0)
if err != nil {
return err
}
defer f.Close()
formatV1 := formatCacheV1{}
if err := jsonLoad(f, &formatV1); err != nil {
return err
}
formatV2 := &formatCacheV2{}
formatV2.formatMetaV1 = formatV1.formatMetaV1
formatV2.Version = formatMetaVersion1
formatV2.Cache = formatV1.Cache
formatV2.Cache.Version = formatCacheVersionV2
if err := jsonSave(f, formatV2); err != nil {
return err
}
return nil
}

View file

@ -18,20 +18,10 @@ package cmd
import (
"context"
"io"
"os"
"testing"
)
// Returns format.Cache.Version
func formatCacheGetVersion(r io.ReadSeeker) (string, error) {
format := &formatCacheVersionDetect{}
if err := jsonLoad(r, format); err != nil {
return "", err
}
return format.Cache.Version, nil
}
// TestDiskCacheFormat - tests initFormatCache, formatMetaGetFormatBackendCache, formatCacheGetVersion.
func TestDiskCacheFormat(t *testing.T) {
ctx := context.Background()
@ -55,8 +45,8 @@ func TestDiskCacheFormat(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if version != formatCacheVersionV1 {
t.Fatalf(`expected: %s, got: %s`, formatCacheVersionV1, version)
if version != formatCacheVersionV2 {
t.Fatalf(`expected: %s, got: %s`, formatCacheVersionV2, version)
}
// Corrupt the format.json file and test the functions.
@ -68,7 +58,7 @@ func TestDiskCacheFormat(t *testing.T) {
t.Fatal(err)
}
if _, err = loadAndValidateCacheFormat(context.Background(), fsDirs); err == nil {
if _, _, err = loadAndValidateCacheFormat(context.Background(), fsDirs); err == nil {
t.Fatal("expected to fail")
}
@ -81,15 +71,15 @@ func TestDiskCacheFormat(t *testing.T) {
t.Fatal(err)
}
if _, err = loadAndValidateCacheFormat(context.Background(), fsDirs); err == nil {
if _, _, err = loadAndValidateCacheFormat(context.Background(), fsDirs); err == nil {
t.Fatal("expected to fail")
}
}
// generates a valid format.json for Cache backend.
func genFormatCacheValid() []*formatCacheV1 {
func genFormatCacheValid() []*formatCacheV2 {
disks := make([]string, 8)
formatConfigs := make([]*formatCacheV1, 8)
formatConfigs := make([]*formatCacheV2, 8)
for index := range disks {
disks[index] = mustGetUUID()
}
@ -97,7 +87,7 @@ func genFormatCacheValid() []*formatCacheV1 {
format := &formatCacheV1{}
format.Version = formatMetaVersion1
format.Format = formatCache
format.Cache.Version = formatCacheVersionV1
format.Cache.Version = formatCacheVersionV2
format.Cache.This = disks[index]
format.Cache.Disks = disks
formatConfigs[index] = format
@ -106,9 +96,9 @@ func genFormatCacheValid() []*formatCacheV1 {
}
// generates a invalid format.json version for Cache backend.
func genFormatCacheInvalidVersion() []*formatCacheV1 {
func genFormatCacheInvalidVersion() []*formatCacheV2 {
disks := make([]string, 8)
formatConfigs := make([]*formatCacheV1, 8)
formatConfigs := make([]*formatCacheV2, 8)
for index := range disks {
disks[index] = mustGetUUID()
}
@ -128,14 +118,14 @@ func genFormatCacheInvalidVersion() []*formatCacheV1 {
}
// generates a invalid format.json version for Cache backend.
func genFormatCacheInvalidFormat() []*formatCacheV1 {
func genFormatCacheInvalidFormat() []*formatCacheV2 {
disks := make([]string, 8)
formatConfigs := make([]*formatCacheV1, 8)
formatConfigs := make([]*formatCacheV2, 8)
for index := range disks {
disks[index] = mustGetUUID()
}
for index := range disks {
format := &formatCacheV1{}
format := &formatCacheV2{}
format.Version = formatMetaVersion1
format.Format = formatCache
format.Cache.Version = formatCacheVersionV1
@ -150,14 +140,14 @@ func genFormatCacheInvalidFormat() []*formatCacheV1 {
}
// generates a invalid format.json version for Cache backend.
func genFormatCacheInvalidCacheVersion() []*formatCacheV1 {
func genFormatCacheInvalidCacheVersion() []*formatCacheV2 {
disks := make([]string, 8)
formatConfigs := make([]*formatCacheV1, 8)
formatConfigs := make([]*formatCacheV2, 8)
for index := range disks {
disks[index] = mustGetUUID()
}
for index := range disks {
format := &formatCacheV1{}
format := &formatCacheV2{}
format.Version = formatMetaVersion1
format.Format = formatCache
format.Cache.Version = formatCacheVersionV1
@ -172,17 +162,17 @@ func genFormatCacheInvalidCacheVersion() []*formatCacheV1 {
}
// generates a invalid format.json version for Cache backend.
func genFormatCacheInvalidDisksCount() []*formatCacheV1 {
func genFormatCacheInvalidDisksCount() []*formatCacheV2 {
disks := make([]string, 7)
formatConfigs := make([]*formatCacheV1, 8)
formatConfigs := make([]*formatCacheV2, 8)
for index := range disks {
disks[index] = mustGetUUID()
}
for index := range disks {
format := &formatCacheV1{}
format := &formatCacheV2{}
format.Version = formatMetaVersion1
format.Format = formatCache
format.Cache.Version = formatCacheVersionV1
format.Cache.Version = formatCacheVersionV2
format.Cache.This = disks[index]
format.Cache.Disks = disks
formatConfigs[index] = format
@ -191,9 +181,9 @@ func genFormatCacheInvalidDisksCount() []*formatCacheV1 {
}
// generates a invalid format.json Disks for Cache backend.
func genFormatCacheInvalidDisks() []*formatCacheV1 {
func genFormatCacheInvalidDisks() []*formatCacheV2 {
disks := make([]string, 8)
formatConfigs := make([]*formatCacheV1, 8)
formatConfigs := make([]*formatCacheV2, 8)
for index := range disks {
disks[index] = mustGetUUID()
}
@ -201,7 +191,7 @@ func genFormatCacheInvalidDisks() []*formatCacheV1 {
format := &formatCacheV1{}
format.Version = formatMetaVersion1
format.Format = formatCache
format.Cache.Version = formatCacheVersionV1
format.Cache.Version = formatCacheVersionV2
format.Cache.This = disks[index]
format.Cache.Disks = disks
formatConfigs[index] = format
@ -226,7 +216,7 @@ func genFormatCacheInvalidThis() []*formatCacheV1 {
format := &formatCacheV1{}
format.Version = formatMetaVersion1
format.Format = formatCache
format.Cache.Version = formatCacheVersionV1
format.Cache.Version = formatCacheVersionV2
format.Cache.This = disks[index]
format.Cache.Disks = disks
formatConfigs[index] = format
@ -238,9 +228,9 @@ func genFormatCacheInvalidThis() []*formatCacheV1 {
}
// generates a invalid format.json Disk UUID in wrong order for Cache backend.
func genFormatCacheInvalidDisksOrder() []*formatCacheV1 {
func genFormatCacheInvalidDisksOrder() []*formatCacheV2 {
disks := make([]string, 8)
formatConfigs := make([]*formatCacheV1, 8)
formatConfigs := make([]*formatCacheV2, 8)
for index := range disks {
disks[index] = mustGetUUID()
}
@ -248,7 +238,7 @@ func genFormatCacheInvalidDisksOrder() []*formatCacheV1 {
format := &formatCacheV1{}
format.Version = formatMetaVersion1
format.Format = formatCache
format.Cache.Version = formatCacheVersionV1
format.Cache.Version = formatCacheVersionV2
format.Cache.This = disks[index]
format.Cache.Disks = disks
formatConfigs[index] = format
@ -319,7 +309,7 @@ func TestFormatCache(t *testing.T) {
}
for i, testCase := range testCases {
err := validateCacheFormats(context.Background(), testCase.formatConfigs)
err := validateCacheFormats(context.Background(), false, testCase.formatConfigs)
if err != nil && testCase.shouldPass {
t.Errorf("Test %d: Expected to pass but failed with %s", i+1, err)
}

View file

@ -253,7 +253,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
if len(cacheConfig.Drives) > 0 {
var err error
// initialize the new disk cache objects.
globalCacheObjectAPI, err = newServerCacheObjects(cacheConfig)
globalCacheObjectAPI, err = newServerCacheObjects(context.Background(), cacheConfig)
logger.FatalIf(err, "Unable to initialize disk caching")
}

View file

@ -337,7 +337,6 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
return
}
defer gr.Close()
objInfo := gr.ObjInfo
if objectAPI.IsEncryptionSupported() {
@ -379,7 +378,6 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
statusCodeWritten = true
w.WriteHeader(http.StatusPartialContent)
}
// Write object content to response body
if _, err = io.Copy(httpWriter, gr); err != nil {
if !httpWriter.HasWritten() && !statusCodeWritten { // write error response only if no data or headers has been written to client yet
@ -1253,10 +1251,6 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
// Ensure that metadata does not contain sensitive information
crypto.RemoveSensitiveEntries(metadata)
if api.CacheAPI() != nil && !hasServerSideEncryptionHeader(r.Header) {
putObject = api.CacheAPI().PutObject
}
// Create the object..
objInfo, err := putObject(ctx, bucket, object, pReader, opts)
if err != nil {
@ -1408,9 +1402,7 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
return
}
newMultipartUpload := objectAPI.NewMultipartUpload
if api.CacheAPI() != nil && !hasServerSideEncryptionHeader(r.Header) {
newMultipartUpload = api.CacheAPI().NewMultipartUpload
}
uploadID, err := newMultipartUpload(ctx, bucket, object, opts)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
@ -1939,9 +1931,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
}
putObjectPart := objectAPI.PutObjectPart
if api.CacheAPI() != nil && !isEncrypted {
putObjectPart = api.CacheAPI().PutObjectPart
}
partInfo, err := putObjectPart(ctx, bucket, object, uploadID, partID, pReader, opts)
if err != nil {
// Verify if the underlying error is signature mismatch.
@ -1974,9 +1964,6 @@ func (api objectAPIHandlers) AbortMultipartUploadHandler(w http.ResponseWriter,
return
}
abortMultipartUpload := objectAPI.AbortMultipartUpload
if api.CacheAPI() != nil {
abortMultipartUpload = api.CacheAPI().AbortMultipartUpload
}
if s3Error := checkRequestAuthType(ctx, r, policy.AbortMultipartUploadAction, bucket, object); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
@ -2257,9 +2244,6 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
}
completeMultiPartUpload := objectAPI.CompleteMultipartUpload
if api.CacheAPI() != nil {
completeMultiPartUpload = api.CacheAPI().CompleteMultipartUpload
}
// This code is specifically to handle the requirements for slow
// complete multipart upload operations on FS mode.

View file

@ -346,7 +346,7 @@ func serverMain(ctx *cli.Context) {
var cacheConfig = globalServerConfig.GetCacheConfig()
if len(cacheConfig.Drives) > 0 {
// initialize the new disk cache objects.
globalCacheObjectAPI, err = newServerCacheObjects(cacheConfig)
globalCacheObjectAPI, err = newServerCacheObjects(context.Background(), cacheConfig)
logger.FatalIf(err, "Unable to initialize disk caching")
}

View file

@ -257,9 +257,6 @@ func (web *webAPIHandlers) DeleteBucket(r *http.Request, args *RemoveBucketArgs,
}
deleteBucket := objectAPI.DeleteBucket
if web.CacheAPI() != nil {
deleteBucket = web.CacheAPI().DeleteBucket
}
if err := deleteBucket(ctx, args.BucketName); err != nil {
return toJSONError(ctx, err, args.BucketName)
@ -302,9 +299,6 @@ func (web *webAPIHandlers) ListBuckets(r *http.Request, args *WebGenericArgs, re
return toJSONError(ctx, errServerNotInitialized)
}
listBuckets := objectAPI.ListBuckets
if web.CacheAPI() != nil {
listBuckets = web.CacheAPI().ListBuckets
}
claims, owner, authErr := webRequestAuthenticate(r)
if authErr != nil {
@ -407,9 +401,6 @@ func (web *webAPIHandlers) ListObjects(r *http.Request, args *ListObjectsArgs, r
}
listObjects := objectAPI.ListObjects
if web.CacheAPI() != nil {
listObjects = web.CacheAPI().ListObjects
}
if isRemoteCallRequired(ctx, args.BucketName, objectAPI) {
sr, err := globalDNSConfig.Get(args.BucketName)
@ -600,9 +591,6 @@ func (web *webAPIHandlers) RemoveObject(r *http.Request, args *RemoveObjectArgs,
return toJSONError(ctx, errServerNotInitialized)
}
listObjects := objectAPI.ListObjects
if web.CacheAPI() != nil {
listObjects = web.CacheAPI().ListObjects
}
claims, owner, authErr := webRequestAuthenticate(r)
if authErr != nil {
@ -1065,10 +1053,8 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
}
putObject := objectAPI.PutObject
if !hasServerSideEncryptionHeader(r.Header) && web.CacheAPI() != nil {
putObject = web.CacheAPI().PutObject
}
objInfo, err := putObject(ctx, bucket, object, pReader, opts)
objInfo, err := putObject(context.Background(), bucket, object, pReader, opts)
if err != nil {
writeWebErrorResponse(w, err)
return
@ -1307,32 +1293,29 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
writeWebErrorResponse(w, errInvalidBucketName)
return
}
getObjectNInfo := objectAPI.GetObjectNInfo
if web.CacheAPI() != nil {
getObjectNInfo = web.CacheAPI().GetObjectNInfo
}
getObject := objectAPI.GetObject
if web.CacheAPI() != nil {
getObject = web.CacheAPI().GetObject
}
listObjects := objectAPI.ListObjects
if web.CacheAPI() != nil {
listObjects = web.CacheAPI().ListObjects
}
archive := zip.NewWriter(w)
defer archive.Close()
getObjectInfo := objectAPI.GetObjectInfo
if web.CacheAPI() != nil {
getObjectInfo = web.CacheAPI().GetObjectInfo
}
opts := ObjectOptions{}
var length int64
for _, object := range args.Objects {
// Writes compressed object file to the response.
zipit := func(objectName string) error {
info, err := getObjectInfo(ctx, args.BucketName, objectName, opts)
var opts ObjectOptions
gr, err := getObjectNInfo(ctx, args.BucketName, objectName, nil, r.Header, readLock, opts)
if err != nil {
return err
}
defer gr.Close()
info := gr.ObjInfo
length = info.Size
if objectAPI.IsEncryptionSupported() {
if _, err = DecryptObjectInfo(&info, r.Header); err != nil {
@ -1395,7 +1378,7 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
// Response writer should be limited early on for decryption upto required length,
// additionally also skipping mod(offset)64KiB boundaries.
writer = ioutil.LimitedWriter(writer, startOffset%(64*1024), length)
writer, startOffset, length, err = DecryptBlocksRequest(writer, r,
writer, _, length, err = DecryptBlocksRequest(writer, r,
args.BucketName, objectName, startOffset, length, info, false)
if err != nil {
writeWebErrorResponse(w, err)
@ -1403,14 +1386,20 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
}
}
httpWriter := ioutil.WriteOnClose(writer)
if err = getObject(ctx, args.BucketName, objectName, startOffset, length, httpWriter, "", opts); err != nil {
// Write object content to response body
if _, err = io.Copy(httpWriter, gr); err != nil {
httpWriter.Close()
if info.IsCompressed() {
// Wait for decompression go-routine to retire.
wg.Wait()
}
if !httpWriter.HasWritten() { // write error response only if no data or headers has been written to client yet
writeWebErrorResponse(w, err)
}
return err
}
if err = httpWriter.Close(); err != nil {
if !httpWriter.HasWritten() { // write error response only if no data has been written to client yet
writeWebErrorResponse(w, err)

View file

@ -34,13 +34,13 @@ minio server -h
- An object is only cached when drive has sufficient disk space.
## Behavior
Disk caching caches objects for both **uploaded** and **downloaded** objects i.e
Disk caching caches objects for **downloaded** objects i.e
- Caches new objects for entries not found in cache while downloading. Otherwise serves from the cache.
- Caches all successfully uploaded objects. Replaces existing cached entry of the same object if needed.
- Bitrot protection is added to cached content and verified when object is served from cache.
- When an object is deleted, corresponding entry in cache if any is deleted as well.
- Cache continues to work for read-only operations such as GET, HEAD when backend is offline.
- Cache disallows write operations when backend is offline.
- Cache-Control and Expires headers can be used to control how long objects stay in the cache
> NOTE: Expiration happens automatically based on the configured interval as explained above, frequently accessed objects stay alive in cache for a significantly longer time.
@ -51,3 +51,5 @@ Upon restart of minio server after a running minio process is killed or crashes,
- Bucket policies are not cached, so anonymous operations are not supported when backend is offline.
- Objects are distributed using deterministic hashing among the list of configured cache drives. If one or more drives go offline, or cache drive configuration is altered in any way, performance may degrade to a linear lookup time depending on the number of disks in cache.
## TODO
- Encrypt cached objects automatically with a cache encryption master key