mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-09-20 03:59:56 +08:00
preparing to support S3 multipart uploads
This commit is contained in:
@@ -49,50 +49,15 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
|
||||
|
||||
uploadUrl := fmt.Sprintf("http://%s%s/%s/%s?collection=%s",
|
||||
s3a.option.Filer, s3a.option.BucketsPath, bucket, object, bucket)
|
||||
proxyReq, err := http.NewRequest("PUT", uploadUrl, dataReader)
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("NewRequest %s: %v", uploadUrl, err)
|
||||
writeErrorResponse(w, ErrInternalError, r.URL)
|
||||
etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader)
|
||||
|
||||
if errCode != ErrNone {
|
||||
writeErrorResponse(w, errCode, r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
proxyReq.Header.Set("Host", s3a.option.Filer)
|
||||
proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
|
||||
|
||||
for header, values := range r.Header {
|
||||
for _, value := range values {
|
||||
proxyReq.Header.Add(header, value)
|
||||
}
|
||||
}
|
||||
|
||||
resp, postErr := client.Do(proxyReq)
|
||||
|
||||
if postErr != nil {
|
||||
glog.Errorf("post to filer: %v", postErr)
|
||||
writeErrorResponse(w, ErrInternalError, r.URL)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
resp_body, ra_err := ioutil.ReadAll(resp.Body)
|
||||
if ra_err != nil {
|
||||
glog.Errorf("upload to filer response read: %v", ra_err)
|
||||
writeErrorResponse(w, ErrInternalError, r.URL)
|
||||
return
|
||||
}
|
||||
var ret UploadResult
|
||||
unmarshal_err := json.Unmarshal(resp_body, &ret)
|
||||
if unmarshal_err != nil {
|
||||
glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body))
|
||||
writeErrorResponse(w, ErrInternalError, r.URL)
|
||||
return
|
||||
}
|
||||
if ret.Error != "" {
|
||||
glog.Errorf("upload to filer error: %v", ret.Error)
|
||||
writeErrorResponse(w, ErrInternalError, r.URL)
|
||||
return
|
||||
}
|
||||
setEtag(w, etag)
|
||||
|
||||
writeSuccessResponseEmpty(w)
|
||||
}
|
||||
@@ -134,6 +99,12 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
|
||||
|
||||
}
|
||||
|
||||
// DeleteMultipleObjectsHandler - Delete multiple objects
|
||||
func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
// TODO
|
||||
writeErrorResponse(w, ErrNotImplemented, r.URL)
|
||||
}
|
||||
|
||||
func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, responseFn func(proxyResonse *http.Response, w http.ResponseWriter)) {
|
||||
|
||||
glog.V(2).Infof("s3 proxying %s to %s", r.Method, destUrl)
|
||||
@@ -173,3 +144,56 @@ func passThroghResponse(proxyResonse *http.Response, w http.ResponseWriter) {
|
||||
w.WriteHeader(proxyResonse.StatusCode)
|
||||
io.Copy(w, proxyResonse.Body)
|
||||
}
|
||||
|
||||
func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.ReadCloser) (etag string, code ErrorCode) {
|
||||
|
||||
proxyReq, err := http.NewRequest("PUT", uploadUrl, dataReader)
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("NewRequest %s: %v", uploadUrl, err)
|
||||
return "", ErrInternalError
|
||||
}
|
||||
|
||||
proxyReq.Header.Set("Host", s3a.option.Filer)
|
||||
proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
|
||||
|
||||
for header, values := range r.Header {
|
||||
for _, value := range values {
|
||||
proxyReq.Header.Add(header, value)
|
||||
}
|
||||
}
|
||||
|
||||
resp, postErr := client.Do(proxyReq)
|
||||
|
||||
if postErr != nil {
|
||||
glog.Errorf("post to filer: %v", postErr)
|
||||
return "", ErrInternalError
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
etag = resp.Header.Get("ETag")
|
||||
|
||||
resp_body, ra_err := ioutil.ReadAll(resp.Body)
|
||||
if ra_err != nil {
|
||||
glog.Errorf("upload to filer response read: %v", ra_err)
|
||||
return etag, ErrInternalError
|
||||
}
|
||||
var ret UploadResult
|
||||
unmarshal_err := json.Unmarshal(resp_body, &ret)
|
||||
if unmarshal_err != nil {
|
||||
glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body))
|
||||
return etag, ErrInternalError
|
||||
}
|
||||
if ret.Error != "" {
|
||||
glog.Errorf("upload to filer error: %v", ret.Error)
|
||||
return etag, ErrInternalError
|
||||
}
|
||||
|
||||
return etag, ErrNone
|
||||
}
|
||||
|
||||
func setEtag(w http.ResponseWriter, etag string) {
|
||||
if etag != "" {
|
||||
w.Header().Set("ETag", "\""+etag+"\"")
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user