update ChunkedFile to a seekable reader, so io.* can be used to read the data

tnextday
2015-12-01 20:23:50 +08:00
parent f825d23789
commit 6b0894d806
5 changed files with 313 additions and 65 deletions
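
The point of the change is in the title: once ChunkedFileReader satisfies io.ReadSeeker, the volume server can lean on the standard io helpers instead of hand-rolled chunk iteration. A minimal sketch of that pattern, using only the standard library (serveRange is illustrative, not a function from this commit):

package example

import "io"

// serveRange shows why a seekable reader matters: serving a byte range
// becomes Seek followed by io.CopyN. Illustrative only, not commit code.
func serveRange(w io.Writer, rs io.ReadSeeker, start, length int64) error {
	// io.SeekStart is the same whence value (0) the handler below passes to Seek.
	if _, err := rs.Seek(start, io.SeekStart); err != nil {
		return err
	}
	_, err := io.CopyN(w, rs, length)
	return err
}

The range-serving code added below follows exactly this shape, with ChunkedFileReader as the io.ReadSeeker.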

View File

@@ -86,7 +86,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
	}
	debug("parsing upload file...")
-	fname, data, mimeType, isGzipped, lastModified, _, pe := storage.ParseUpload(r)
+	fname, data, mimeType, isGzipped, lastModified, _, _, pe := storage.ParseUpload(r)
	if pe != nil {
		writeJsonError(w, r, http.StatusBadRequest, pe)
		return

View File

@@ -81,6 +81,11 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
		return
	}
	w.Header().Set("Etag", etag)
	if vs.tryHandleChunkedFile(n, filename, w, r) {
		return
	}
	if n.NameSize > 0 && filename == "" {
		filename = string(n.Name)
		dotIndex := strings.LastIndex(filename, ".")
@@ -215,3 +220,148 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
	io.CopyN(w, sendContent, sendSize)
}

func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) {
	if !n.IsChunkedFile() {
		return false
	}
	processed = true
	raw, _ := strconv.ParseBool(r.FormValue("raw"))
	if raw {
		w.Header().Set("Content-Type", "application/json")
		w.Header().Set("Content-Length", strconv.Itoa(len(n.Data)))
		if _, e := w.Write(n.Data); e != nil {
			glog.V(0).Infoln("response write error:", e)
		}
		return true
	}
	chunkManifest, e := operation.LoadChunkedManifest(n.Data)
	if e != nil {
		return false
	}
	ext := ""
	if fileName == "" && chunkManifest.Name != "" {
		fileName = chunkManifest.Name
		dotIndex := strings.LastIndex(fileName, ".")
		if dotIndex > 0 {
			ext = fileName[dotIndex:]
		}
	}
	mtype := ""
	if ext != "" {
		mtype = mime.TypeByExtension(ext)
	}
	if chunkManifest.Mime != "" {
		mt := chunkManifest.Mime
		if !strings.HasPrefix(mt, "application/octet-stream") {
			mtype = mt
		}
	}
	if mtype != "" {
		w.Header().Set("Content-Type", mtype)
	}
	if fileName != "" {
		w.Header().Set("Content-Disposition", `filename="`+fileNameEscaper.Replace(fileName)+`"`)
	}
	w.Header().Set("X-File-Store", "chunked")
	w.Header().Set("Accept-Ranges", "bytes")
	if r.Method == "HEAD" {
		w.Header().Set("Content-Length", strconv.FormatInt(chunkManifest.Size, 10))
		return true
	}

	chunkedFileReader := operation.ChunkedFileReader{
		Manifest: chunkManifest,
		Master:   vs.GetMasterNode(),
	}
	defer chunkedFileReader.Close()
	rangeReq := r.Header.Get("Range")
	if rangeReq == "" {
		w.Header().Set("Content-Length", strconv.FormatInt(chunkManifest.Size, 10))
		if _, e = io.Copy(w, chunkedFileReader); e != nil {
			glog.V(0).Infoln("response write error:", e)
		}
		return true
	}

	//the rest is dealing with partial content request
	//mostly copy from src/pkg/net/http/fs.go
	size := chunkManifest.Size
	ranges, err := parseRange(rangeReq, size)
	if err != nil {
		http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
		return
	}
	if sumRangesSize(ranges) > size {
		// The total number of bytes in all the ranges
		// is larger than the size of the file by
		// itself, so this is probably an attack, or a
		// dumb client. Ignore the range request.
		ranges = nil
		return
	}
	if len(ranges) == 0 {
		return
	}
	if len(ranges) == 1 {
		// RFC 2616, Section 14.16:
		// "When an HTTP message includes the content of a single
		// range (for example, a response to a request for a
		// single range, or to a request for a set of ranges
		// that overlap without any holes), this content is
		// transmitted with a Content-Range header, and a
		// Content-Length header showing the number of bytes
		// actually transferred.
		// ...
		// A response to a request for a single range MUST NOT
		// be sent using the multipart/byteranges media type."
		ra := ranges[0]
		w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
		w.Header().Set("Content-Range", ra.contentRange(size))
		w.WriteHeader(http.StatusPartialContent)
		if _, e = chunkedFileReader.Seek(ra.start, 0); e != nil {
			glog.V(0).Infoln("response write error:", e)
		}
		if _, e = io.CopyN(w, chunkedFileReader, ra.length); e != nil {
			glog.V(0).Infoln("response write error:", e)
		}
		return
	}

	// process multiple ranges
	for _, ra := range ranges {
		if ra.start > size {
			http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
			return
		}
	}
	sendSize := rangesMIMESize(ranges, mtype, size)
	pr, pw := io.Pipe()
	mw := multipart.NewWriter(pw)
	w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
	sendContent := pr
	defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
	go func() {
		for _, ra := range ranges {
			part, err := mw.CreatePart(ra.mimeHeader(mtype, size))
			if err != nil {
				pw.CloseWithError(err)
				return
			}
			if _, e = chunkedFileReader.Seek(ra.start, 0); e != nil {
				glog.V(0).Infoln("response write error:", e)
			}
			if _, err = io.CopyN(part, chunkedFileReader, ra.length); err != nil {
				pw.CloseWithError(err)
				return
			}
		}
		mw.Close()
		pw.Close()
	}()
	if w.Header().Get("Content-Encoding") == "" {
		w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10))
	}
	w.WriteHeader(http.StatusPartialContent)
	io.CopyN(w, sendContent, sendSize)
	return
}
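
Taken together, the handler answers three kinds of requests for a chunked file: ?raw=true returns the JSON chunk manifest, a plain GET streams the assembled file with io.Copy, and a Range request seeks the ChunkedFileReader before io.CopyN (multiple ranges come back as a multipart/byteranges body, as in net/http's file server). A small client-side sketch of the first and last cases; the host, port, and file id are placeholders, not values taken from this commit:

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// 1) Fetch the chunk manifest itself; the handler serves it as application/json.
	resp, err := http.Get("http://localhost:8080/3,01637037d6?raw=true")
	if err == nil {
		manifest, _ := io.ReadAll(resp.Body)
		resp.Body.Close()
		fmt.Printf("manifest: %s\n", manifest)
	}

	// 2) Ask for the first KiB of the assembled file; the handler seeks the
	// ChunkedFileReader and replies with 206 Partial Content plus Content-Range.
	req, _ := http.NewRequest("GET", "http://localhost:8080/3,01637037d6", nil)
	req.Header.Set("Range", "bytes=0-1023")
	if resp, err = http.DefaultClient.Do(req); err == nil {
		part, _ := io.ReadAll(resp.Body)
		resp.Body.Close()
		fmt.Println(resp.Status, resp.Header.Get("Content-Range"), len(part), "bytes")
	}
}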