mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-09-18 19:29:23 +08:00
filer: remember content is gzipped or not
This commit is contained in:
@@ -1,8 +1,6 @@
|
||||
package operation
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"google.golang.org/grpc"
|
||||
"io"
|
||||
"mime"
|
||||
"net/url"
|
||||
@@ -11,6 +9,8 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/security"
|
||||
)
|
||||
@@ -52,7 +52,7 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart
|
||||
}
|
||||
ret, err := Assign(master, grpcDialOption, ar)
|
||||
if err != nil {
|
||||
for index, _ := range files {
|
||||
for index := range files {
|
||||
results[index].Error = err.Error()
|
||||
}
|
||||
return results, err
|
||||
@@ -214,12 +214,11 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s
|
||||
if e != nil {
|
||||
return e
|
||||
}
|
||||
bufReader := bytes.NewReader(buf)
|
||||
glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...")
|
||||
u, _ := url.Parse(fileUrl)
|
||||
q := u.Query()
|
||||
q.Set("cm", "true")
|
||||
u.RawQuery = q.Encode()
|
||||
_, e = Upload(u.String(), manifest.Name, false, bufReader, false, "application/json", nil, jwt)
|
||||
_, e = UploadData(u.String(), manifest.Name, false, buf, false, "application/json", nil, jwt)
|
||||
return e
|
||||
}
|
||||
|
@@ -3,7 +3,7 @@ package operation
|
||||
import (
|
||||
"bytes"
|
||||
"compress/flate"
|
||||
"compress/gzip"
|
||||
"crypto/md5"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -28,6 +28,8 @@ type UploadResult struct {
|
||||
ETag string `json:"eTag,omitempty"`
|
||||
CipherKey []byte `json:"cipherKey,omitempty"`
|
||||
Mime string `json:"mime,omitempty"`
|
||||
Gzip uint32 `json:"gzip,omitempty"`
|
||||
Md5 string `json:"md5,omitempty"`
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -43,22 +45,28 @@ func init() {
|
||||
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
|
||||
|
||||
// Upload sends a POST request to a volume server to upload the content with adjustable compression level
|
||||
func UploadWithLocalCompressionLevel(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt, compressionLevel int) (*UploadResult, error) {
|
||||
if compressionLevel < 1 {
|
||||
compressionLevel = 1
|
||||
func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
|
||||
hash := md5.New()
|
||||
hash.Write(data)
|
||||
uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputGzipped, mtype, pairMap, jwt)
|
||||
if uploadResult != nil {
|
||||
uploadResult.Md5 = fmt.Sprintf("%x", hash.Sum(nil))
|
||||
}
|
||||
if compressionLevel > 9 {
|
||||
compressionLevel = 9
|
||||
}
|
||||
return doUpload(uploadUrl, filename, cipher, reader, isInputGzipped, mtype, pairMap, compressionLevel, jwt)
|
||||
return
|
||||
}
|
||||
|
||||
// Upload sends a POST request to a volume server to upload the content with fast compression
|
||||
func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
|
||||
return doUpload(uploadUrl, filename, cipher, reader, isInputGzipped, mtype, pairMap, flate.BestSpeed, jwt)
|
||||
func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
|
||||
hash := md5.New()
|
||||
reader = io.TeeReader(reader, hash)
|
||||
uploadResult, err = doUpload(uploadUrl, filename, cipher, reader, isInputGzipped, mtype, pairMap, flate.BestSpeed, jwt)
|
||||
if uploadResult != nil {
|
||||
uploadResult.Md5 = fmt.Sprintf("%x", hash.Sum(nil))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, compression int, jwt security.EncodedJwt) (*UploadResult, error) {
|
||||
func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
|
||||
contentIsGzipped := isInputGzipped
|
||||
shouldGzipNow := false
|
||||
if !isInputGzipped {
|
||||
@@ -67,33 +75,131 @@ func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader,
|
||||
contentIsGzipped = true
|
||||
}
|
||||
}
|
||||
// encrypt data
|
||||
var cipherKey util.CipherKey
|
||||
|
||||
var clearDataLen int
|
||||
var err error
|
||||
if cipher {
|
||||
cipherKey, reader, clearDataLen, _, err = util.EncryptReader(reader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
// gzip if possible
|
||||
// this could be double copying
|
||||
clearDataLen = len(data)
|
||||
if shouldGzipNow {
|
||||
data, err = util.GzipData(data)
|
||||
} else if isInputGzipped {
|
||||
// just to get the clear data length
|
||||
clearData, err := util.UnGzipData(data)
|
||||
if err == nil {
|
||||
clearDataLen = len(clearData)
|
||||
}
|
||||
}
|
||||
|
||||
// upload data
|
||||
uploadResult, err := upload_content(uploadUrl, func(w io.Writer) (err error) {
|
||||
if shouldGzipNow {
|
||||
gzWriter, _ := gzip.NewWriterLevel(w, compression)
|
||||
_, err = io.Copy(gzWriter, reader)
|
||||
gzWriter.Close()
|
||||
} else {
|
||||
_, err = io.Copy(w, reader)
|
||||
}
|
||||
return
|
||||
}, filename, contentIsGzipped, mtype, pairMap, jwt)
|
||||
if cipher {
|
||||
// encrypt(gzip(data))
|
||||
|
||||
// remember cipher key
|
||||
if uploadResult != nil && cipherKey != nil {
|
||||
uploadResult.CipherKey = cipherKey
|
||||
uploadResult.Size = uint32(clearDataLen)
|
||||
// encrypt
|
||||
cipherKey := util.GenCipherKey()
|
||||
encryptedData, encryptionErr := util.Encrypt(data, cipherKey)
|
||||
if encryptionErr != nil {
|
||||
err = fmt.Errorf("encrypt input: %v", encryptionErr)
|
||||
return
|
||||
}
|
||||
|
||||
// upload data
|
||||
uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
|
||||
_, err = w.Write(encryptedData)
|
||||
return
|
||||
}, "", false, "", nil, jwt)
|
||||
if uploadResult != nil {
|
||||
uploadResult.Name = filename
|
||||
uploadResult.Mime = mtype
|
||||
uploadResult.CipherKey = cipherKey
|
||||
}
|
||||
} else {
|
||||
// upload data
|
||||
uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
|
||||
_, err = w.Write(data)
|
||||
return
|
||||
}, filename, contentIsGzipped, mtype, pairMap, jwt)
|
||||
}
|
||||
|
||||
uploadResult.Size = uint32(clearDataLen)
|
||||
if contentIsGzipped {
|
||||
uploadResult.Gzip = 1
|
||||
}
|
||||
|
||||
return uploadResult, err
|
||||
}
|
||||
|
||||
func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, compression int, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
|
||||
contentIsGzipped := isInputGzipped
|
||||
shouldGzipNow := false
|
||||
if !isInputGzipped {
|
||||
if shouldBeZipped, iAmSure := util.IsGzippableFileType(filepath.Base(filename), mtype); mtype == "" || iAmSure && shouldBeZipped {
|
||||
shouldGzipNow = true
|
||||
contentIsGzipped = true
|
||||
}
|
||||
}
|
||||
|
||||
var clearDataLen int
|
||||
|
||||
// gzip if possible
|
||||
// this could be double copying
|
||||
data, readErr := ioutil.ReadAll(reader)
|
||||
if readErr != nil {
|
||||
err = fmt.Errorf("read input: %v", readErr)
|
||||
return
|
||||
}
|
||||
clearDataLen = len(data)
|
||||
if shouldGzipNow {
|
||||
data, err = util.GzipData(data)
|
||||
} else if isInputGzipped {
|
||||
// just to get the clear data length
|
||||
clearData, err := util.UnGzipData(data)
|
||||
if err == nil {
|
||||
clearDataLen = len(clearData)
|
||||
}
|
||||
}
|
||||
|
||||
println("data size", clearDataLen)
|
||||
|
||||
if cipher {
|
||||
// encrypt(gzip(data))
|
||||
|
||||
// encrypt
|
||||
cipherKey := util.GenCipherKey()
|
||||
encryptedData, encryptionErr := util.Encrypt(data, cipherKey)
|
||||
if encryptionErr != nil {
|
||||
err = fmt.Errorf("encrypt input: %v", encryptionErr)
|
||||
return
|
||||
}
|
||||
println("encrypted data size", len(encryptedData))
|
||||
|
||||
// upload data
|
||||
uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
|
||||
n, err := w.Write(encryptedData)
|
||||
println("writtern data size", n)
|
||||
return
|
||||
}, "", false, "", nil, jwt)
|
||||
if uploadResult != nil {
|
||||
uploadResult.Name = filename
|
||||
uploadResult.Mime = mtype
|
||||
uploadResult.CipherKey = cipherKey
|
||||
uploadResult.Size = uint32(clearDataLen)
|
||||
}
|
||||
} else {
|
||||
// upload data
|
||||
uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
|
||||
n, err := w.Write(data)
|
||||
println("written data size", n)
|
||||
return
|
||||
}, filename, contentIsGzipped, mtype, pairMap, jwt)
|
||||
}
|
||||
|
||||
if uploadResult == nil {
|
||||
return
|
||||
}
|
||||
|
||||
uploadResult.Size = uint32(clearDataLen)
|
||||
if contentIsGzipped {
|
||||
uploadResult.Gzip = 1
|
||||
}
|
||||
|
||||
return uploadResult, err
|
||||
|
Reference in New Issue
Block a user