complete submit chunked file

This commit is contained in:
tnextday
2015-12-02 15:00:46 +08:00
parent de5e07ce3e
commit 2c0a7fe75e
4 changed files with 40 additions and 14 deletions

View File

@@ -85,10 +85,9 @@ func (cm *ChunkManifest) DeleteChunks(master string) error {
return nil return nil
} }
func (cm *ChunkManifest) StoredHelper() error { //func (cm *ChunkManifest) StoredHelper() error {
//TODO // return nil
return nil //}
}
func httpRangeDownload(fileUrl string, w io.Writer, offset int64) (written int64, e error) { func httpRangeDownload(fileUrl string, w io.Writer, offset int64) (written int64, e error) {
req, err := http.NewRequest("GET", fileUrl, nil) req, err := http.NewRequest("GET", fileUrl, nil)

View File

@@ -9,6 +9,8 @@ import (
"strconv" "strconv"
"strings" "strings"
"net/url"
"github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/security" "github.com/chrislusf/seaweedfs/go/security"
) )
@@ -117,7 +119,13 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) { if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) {
chunkSize := int64(maxMB * 1024 * 1024) chunkSize := int64(maxMB * 1024 * 1024)
chunks := fi.FileSize/chunkSize + 1 chunks := fi.FileSize/chunkSize + 1
var fids []string cm := ChunkManifest{
Name: fi.FileName,
Size: fi.FileSize,
Mime: fi.MimeType,
Chunks: make([]*ChunkInfo, 0, chunks),
}
for i := int64(0); i < chunks; i++ { for i := int64(0); i < chunks; i++ {
id, count, e := upload_one_chunk( id, count, e := upload_one_chunk(
fi.FileName+"-"+strconv.FormatInt(i+1, 10), fi.FileName+"-"+strconv.FormatInt(i+1, 10),
@@ -125,12 +133,24 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
master, fi.Replication, fi.Collection, fi.Ttl, master, fi.Replication, fi.Collection, fi.Ttl,
jwt) jwt)
if e != nil { if e != nil {
// delete all uploaded chunks
cm.DeleteChunks(master)
return 0, e return 0, e
} }
fids = append(fids, id) cm.Chunks = append(cm.Chunks,
&ChunkInfo{
Offset: i * chunkSize,
Size: int64(count),
Fid: id,
},
)
retSize += count retSize += count
} }
err = upload_file_id_list(fileUrl, fi.FileName+"-list", fids, jwt) err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
if err != nil {
// delete all uploaded chunks
cm.DeleteChunks(master)
}
} else { } else {
ret, e := Upload(fileUrl, fi.FileName, fi.Reader, fi.IsGzipped, fi.MimeType, jwt) ret, e := Upload(fileUrl, fi.FileName, fi.Reader, fi.IsGzipped, fi.MimeType, jwt)
if e != nil { if e != nil {
@@ -158,10 +178,17 @@ func upload_one_chunk(filename string, reader io.Reader, master,
return fid, uploadResult.Size, nil return fid, uploadResult.Size, nil
} }
func upload_file_id_list(fileUrl, filename string, fids []string, jwt security.EncodedJwt) error { func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error {
var buf bytes.Buffer buf, e := manifest.GetData()
buf.WriteString(strings.Join(fids, "\n")) if e != nil {
glog.V(4).Info("Uploading final list ", filename, " to ", fileUrl, "...") return e
_, e := Upload(fileUrl, filename, &buf, false, "text/plain", jwt) }
bufReader := bytes.NewReader(buf)
glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...")
u, _ := url.Parse(fileUrl)
q := u.Query()
q.Set("cm", "1")
u.RawQuery = q.Encode()
_, e = Upload(u.String(), manifest.Name, bufReader, false, "text/plain", jwt)
return e return e
} }

View File

@@ -132,7 +132,7 @@ func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string
} }
modifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64) modifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64)
ttl, _ = ReadTTL(r.FormValue("ttl")) ttl, _ = ReadTTL(r.FormValue("ttl"))
isChunkedFile, _ = strconv.ParseBool(r.FormValue("cf")) isChunkedFile, _ = strconv.ParseBool(r.FormValue("cm"))
return return
} }
func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) { func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {

View File

@@ -270,7 +270,7 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string,
return true return true
} }
chunkedFileReader := operation.ChunkedFileReader{ chunkedFileReader := &operation.ChunkedFileReader{
Manifest: chunkManifest, Manifest: chunkManifest,
Master: vs.GetMasterNode(), Master: vs.GetMasterNode(),
} }