remove CreateFile with streaming response
Signed-off-by: Harshavardhana <harsha@minio.io>
This commit is contained in:
parent
1c7b1bb25c
commit
23a37d0855
|
@ -494,6 +494,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||
n, err := erasure.Encode(ctx, data, writers, buffer, writeQuorum)
|
||||
closeBitrotWriters(writers)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return pi, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
|
@ -528,6 +529,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||
partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix)
|
||||
onlineDisks, err = rename(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, false, writeQuorum, nil)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
|
||||
}
|
||||
|
||||
|
@ -544,7 +546,8 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||
// Pick one from the first valid metadata.
|
||||
fi, err = pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum)
|
||||
if err != nil {
|
||||
return pi, err
|
||||
logger.LogIf(ctx, err)
|
||||
return pi, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// Once part is successfully committed, proceed with updating erasure metadata.
|
||||
|
@ -571,6 +574,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||
|
||||
// Writes update `xl.meta` format for each disk.
|
||||
if _, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return pi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
|
||||
}
|
||||
|
||||
|
|
|
@ -801,11 +801,13 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
|||
|
||||
// Write unique `xl.meta` for each disk.
|
||||
if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, writeQuorum); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// Rename the successfully written temporary object to final location.
|
||||
if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, fi.DataDir, bucket, object, writeQuorum, nil); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
|
|
|
@ -127,6 +127,7 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod
|
|||
}
|
||||
req.Header.Set("Authorization", "Bearer "+c.newAuthToken(req.URL.RawQuery))
|
||||
req.Header.Set("X-Minio-Time", time.Now().UTC().Format(time.RFC3339))
|
||||
req.Header.Set("Expect", "100-continue") // set expect continue to honor expect continue timeouts
|
||||
if length > 0 {
|
||||
req.ContentLength = length
|
||||
}
|
||||
|
|
|
@ -221,7 +221,7 @@ func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageC
|
|||
}()
|
||||
err = newCache.deserialize(pr)
|
||||
pr.CloseWithError(err)
|
||||
return newCache, err
|
||||
return newCache, toStorageErr(err)
|
||||
}
|
||||
|
||||
func (client *storageRESTClient) GetDiskID() (string, error) {
|
||||
|
@ -337,10 +337,6 @@ func (client *storageRESTClient) CreateFile(ctx context.Context, volume, path st
|
|||
values.Set(storageRESTFilePath, path)
|
||||
values.Set(storageRESTLength, strconv.Itoa(int(size)))
|
||||
respBody, err := client.call(ctx, storageRESTMethodCreateFile, values, ioutil.NopCloser(reader), size)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = waitForHTTPResponse(respBody)
|
||||
defer http.DrainBody(respBody)
|
||||
return err
|
||||
}
|
||||
|
@ -459,7 +455,7 @@ func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path,
|
|||
defer readMsgpReaderPool.Put(dec)
|
||||
|
||||
err = fi.DecodeMsg(dec)
|
||||
return fi, err
|
||||
return fi, toStorageErr(err)
|
||||
}
|
||||
|
||||
// ReadAll - reads all contents of a file.
|
||||
|
@ -482,11 +478,7 @@ func (client *storageRESTClient) ReadFileStream(ctx context.Context, volume, pat
|
|||
values.Set(storageRESTFilePath, path)
|
||||
values.Set(storageRESTOffset, strconv.Itoa(int(offset)))
|
||||
values.Set(storageRESTLength, strconv.Itoa(int(length)))
|
||||
respBody, err := client.call(ctx, storageRESTMethodReadFileStream, values, nil, -1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return respBody, nil
|
||||
return client.call(ctx, storageRESTMethodReadFileStream, values, nil, -1)
|
||||
}
|
||||
|
||||
// ReadFile - reads section of a file.
|
||||
|
@ -509,7 +501,7 @@ func (client *storageRESTClient) ReadFile(ctx context.Context, volume string, pa
|
|||
}
|
||||
defer http.DrainBody(respBody)
|
||||
n, err := io.ReadFull(respBody, buf)
|
||||
return int64(n), err
|
||||
return int64(n), toStorageErr(err)
|
||||
}
|
||||
|
||||
// ListDir - lists a directory.
|
||||
|
@ -524,7 +516,7 @@ func (client *storageRESTClient) ListDir(ctx context.Context, volume, dirPath st
|
|||
}
|
||||
defer http.DrainBody(respBody)
|
||||
err = gob.NewDecoder(respBody).Decode(&entries)
|
||||
return entries, err
|
||||
return entries, toStorageErr(err)
|
||||
}
|
||||
|
||||
// DeleteFile - deletes a file.
|
||||
|
@ -570,7 +562,7 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri
|
|||
reader, err := waitForHTTPResponse(respBody)
|
||||
if err != nil {
|
||||
for i := range errs {
|
||||
errs[i] = err
|
||||
errs[i] = toStorageErr(err)
|
||||
}
|
||||
return errs
|
||||
}
|
||||
|
@ -620,12 +612,12 @@ func (client *storageRESTClient) VerifyFile(ctx context.Context, volume, path st
|
|||
|
||||
respReader, err := waitForHTTPResponse(respBody)
|
||||
if err != nil {
|
||||
return err
|
||||
return toStorageErr(err)
|
||||
}
|
||||
|
||||
verifyResp := &VerifyFileResp{}
|
||||
if err = gob.NewDecoder(respReader).Decode(verifyResp); err != nil {
|
||||
return err
|
||||
return toStorageErr(err)
|
||||
}
|
||||
|
||||
return toStorageErr(verifyResp.Err)
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
package cmd
|
||||
|
||||
const (
|
||||
storageRESTVersion = "v29" // Removed WalkVersions()
|
||||
storageRESTVersion = "v30" // CreateFile is back to non-streaming
|
||||
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
|
||||
storageRESTPrefix = minioReservedBucketPath + "/storage"
|
||||
)
|
||||
|
|
|
@ -288,8 +288,10 @@ func (s *storageRESTServer) CreateFileHandler(w http.ResponseWriter, r *http.Req
|
|||
return
|
||||
}
|
||||
|
||||
done := keepHTTPResponseAlive(w)
|
||||
done(s.storage.CreateFile(r.Context(), volume, filePath, int64(fileSize), r.Body))
|
||||
err = s.storage.CreateFile(r.Context(), volume, filePath, int64(fileSize), r.Body)
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteVersion delete updated metadata.
|
||||
|
@ -693,6 +695,7 @@ func keepHTTPResponseAlive(w http.ResponseWriter) func(error) {
|
|||
if doneCh == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Indicate we are ready to write.
|
||||
doneCh <- err
|
||||
|
||||
|
|
|
@ -455,7 +455,7 @@ func newInternodeHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration)
|
|||
WriteBufferSize: 32 << 10, // 32KiB moving up from 4KiB default
|
||||
ReadBufferSize: 32 << 10, // 32KiB moving up from 4KiB default
|
||||
IdleConnTimeout: 15 * time.Second,
|
||||
ResponseHeaderTimeout: 3 * time.Minute, // Set conservative timeouts for MinIO internode.
|
||||
ResponseHeaderTimeout: 15 * time.Minute, // Set conservative timeouts for MinIO internode.
|
||||
TLSHandshakeTimeout: 15 * time.Second,
|
||||
ExpectContinueTimeout: 15 * time.Second,
|
||||
TLSClientConfig: tlsConfig,
|
||||
|
|
Loading…
Reference in a new issue