mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-11-08 09:34:46 +08:00
refactoring to get master function, instead of passing master values directly
this will enable retrying later
This commit is contained in:
@@ -37,7 +37,7 @@ func main() {
|
|||||||
sinceTimeNs = time.Now().Add(-*rewindDuration).UnixNano()
|
sinceTimeNs = time.Now().Add(-*rewindDuration).UnixNano()
|
||||||
}
|
}
|
||||||
|
|
||||||
err := operation.TailVolume(*master, grpcDialOption, vid, uint64(sinceTimeNs), *timeoutSeconds, func(n *needle.Needle) (err error) {
|
err := operation.TailVolume(func()string{return *master}, grpcDialOption, vid, uint64(sinceTimeNs), *timeoutSeconds, func(n *needle.Needle) (err error) {
|
||||||
if n.Size == 0 {
|
if n.Size == 0 {
|
||||||
println("-", n.String())
|
println("-", n.String())
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -72,7 +72,7 @@ func runBackup(cmd *Command, args []string) bool {
|
|||||||
vid := needle.VolumeId(*s.volumeId)
|
vid := needle.VolumeId(*s.volumeId)
|
||||||
|
|
||||||
// find volume location, replication, ttl info
|
// find volume location, replication, ttl info
|
||||||
lookup, err := operation.Lookup(*s.master, vid.String())
|
lookup, err := operation.Lookup(func()string{return *s.master}, vid.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("Error looking up volume %d: %v\n", vid, err)
|
fmt.Printf("Error looking up volume %d: %v\n", vid, err)
|
||||||
return true
|
return true
|
||||||
|
|||||||
@@ -238,12 +238,12 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
|
|||||||
Replication: *b.replication,
|
Replication: *b.replication,
|
||||||
DiskType: *b.diskType,
|
DiskType: *b.diskType,
|
||||||
}
|
}
|
||||||
if assignResult, err := operation.Assign(b.masterClient.GetMaster(), b.grpcDialOption, ar); err == nil {
|
if assignResult, err := operation.Assign(b.masterClient.GetMaster, b.grpcDialOption, ar); err == nil {
|
||||||
fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
|
fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
|
||||||
if !isSecure && assignResult.Auth != "" {
|
if !isSecure && assignResult.Auth != "" {
|
||||||
isSecure = true
|
isSecure = true
|
||||||
}
|
}
|
||||||
if _, err := fp.Upload(0, b.masterClient.GetMaster(), false, assignResult.Auth, b.grpcDialOption); err == nil {
|
if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil {
|
||||||
if random.Intn(100) < *b.deletePercentage {
|
if random.Intn(100) < *b.deletePercentage {
|
||||||
s.total++
|
s.total++
|
||||||
delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
|
delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
|
||||||
|
|||||||
@@ -44,15 +44,15 @@ var cmdDownload = &Command{
|
|||||||
|
|
||||||
func runDownload(cmd *Command, args []string) bool {
|
func runDownload(cmd *Command, args []string) bool {
|
||||||
for _, fid := range args {
|
for _, fid := range args {
|
||||||
if e := downloadToFile(*d.server, fid, util.ResolvePath(*d.dir)); e != nil {
|
if e := downloadToFile(func()string{return *d.server}, fid, util.ResolvePath(*d.dir)); e != nil {
|
||||||
fmt.Println("Download Error: ", fid, e)
|
fmt.Println("Download Error: ", fid, e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func downloadToFile(server, fileId, saveDir string) error {
|
func downloadToFile(masterFn operation.GetMasterFn, fileId, saveDir string) error {
|
||||||
fileUrl, lookupError := operation.LookupFileId(server, fileId)
|
fileUrl, lookupError := operation.LookupFileId(masterFn, fileId)
|
||||||
if lookupError != nil {
|
if lookupError != nil {
|
||||||
return lookupError
|
return lookupError
|
||||||
}
|
}
|
||||||
@@ -83,7 +83,7 @@ func downloadToFile(server, fileId, saveDir string) error {
|
|||||||
fids := strings.Split(string(content), "\n")
|
fids := strings.Split(string(content), "\n")
|
||||||
for _, partId := range fids {
|
for _, partId := range fids {
|
||||||
var n int
|
var n int
|
||||||
_, part, err := fetchContent(*d.server, partId)
|
_, part, err := fetchContent(masterFn, partId)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
n, err = f.Write(part)
|
n, err = f.Write(part)
|
||||||
}
|
}
|
||||||
@@ -103,8 +103,8 @@ func downloadToFile(server, fileId, saveDir string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func fetchContent(server string, fileId string) (filename string, content []byte, e error) {
|
func fetchContent(masterFn operation.GetMasterFn, fileId string) (filename string, content []byte, e error) {
|
||||||
fileUrl, lookupError := operation.LookupFileId(server, fileId)
|
fileUrl, lookupError := operation.LookupFileId(masterFn, fileId)
|
||||||
if lookupError != nil {
|
if lookupError != nil {
|
||||||
return "", nil, lookupError
|
return "", nil, lookupError
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -463,7 +463,9 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
|
|||||||
for _, chunk := range chunks {
|
for _, chunk := range chunks {
|
||||||
fileIds = append(fileIds, chunk.FileId)
|
fileIds = append(fileIds, chunk.FileId)
|
||||||
}
|
}
|
||||||
operation.DeleteFiles(copy.masters[0], false, worker.options.grpcDialOption, fileIds)
|
operation.DeleteFiles(func() string {
|
||||||
|
return copy.masters[0]
|
||||||
|
}, false, worker.options.grpcDialOption, fileIds)
|
||||||
return uploadError
|
return uploadError
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -96,7 +96,7 @@ func runUpload(cmd *Command, args []string) bool {
|
|||||||
if e != nil {
|
if e != nil {
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
results, e := operation.SubmitFiles(*upload.master, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
|
results, e := operation.SubmitFiles(func()string {return *upload.master}, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
|
||||||
bytes, _ := json.Marshal(results)
|
bytes, _ := json.Marshal(results)
|
||||||
fmt.Println(string(bytes))
|
fmt.Println(string(bytes))
|
||||||
if e != nil {
|
if e != nil {
|
||||||
@@ -113,7 +113,7 @@ func runUpload(cmd *Command, args []string) bool {
|
|||||||
if e != nil {
|
if e != nil {
|
||||||
fmt.Println(e.Error())
|
fmt.Println(e.Error())
|
||||||
}
|
}
|
||||||
results, _ := operation.SubmitFiles(*upload.master, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
|
results, _ := operation.SubmitFiles(func()string {return *upload.master}, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
|
||||||
bytes, _ := json.Marshal(results)
|
bytes, _ := json.Marshal(results)
|
||||||
fmt.Println(string(bytes))
|
fmt.Println(string(bytes))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -56,7 +56,7 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi
|
|||||||
WritableVolumeCount: rule.VolumeGrowthCount,
|
WritableVolumeCount: rule.VolumeGrowthCount,
|
||||||
}
|
}
|
||||||
|
|
||||||
assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest)
|
assignResult, err := operation.Assign(f.GetMaster, f.GrpcDialOption, assignRequest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("AssignVolume: %v", err)
|
return nil, nil, fmt.Errorf("AssignVolume: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ type AssignResult struct {
|
|||||||
Auth security.EncodedJwt `json:"auth,omitempty"`
|
Auth security.EncodedJwt `json:"auth,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
|
func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
|
||||||
|
|
||||||
var requests []*VolumeAssignRequest
|
var requests []*VolumeAssignRequest
|
||||||
requests = append(requests, primaryRequest)
|
requests = append(requests, primaryRequest)
|
||||||
@@ -48,7 +48,7 @@ func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *Volum
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
lastError = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
|
lastError = WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
|
||||||
|
|
||||||
req := &master_pb.AssignRequest{
|
req := &master_pb.AssignRequest{
|
||||||
Count: request.Count,
|
Count: request.Count,
|
||||||
|
|||||||
@@ -72,12 +72,12 @@ func (cm *ChunkManifest) Marshal() ([]byte, error) {
|
|||||||
return json.Marshal(cm)
|
return json.Marshal(cm)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cm *ChunkManifest) DeleteChunks(master string, usePublicUrl bool, grpcDialOption grpc.DialOption) error {
|
func (cm *ChunkManifest) DeleteChunks(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption) error {
|
||||||
var fileIds []string
|
var fileIds []string
|
||||||
for _, ci := range cm.Chunks {
|
for _, ci := range cm.Chunks {
|
||||||
fileIds = append(fileIds, ci.Fid)
|
fileIds = append(fileIds, ci.Fid)
|
||||||
}
|
}
|
||||||
results, err := DeleteFiles(master, usePublicUrl, grpcDialOption, fileIds)
|
results, err := DeleteFiles(masterFn, usePublicUrl, grpcDialOption, fileIds)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infof("delete %+v: %v", fileIds, err)
|
glog.V(0).Infof("delete %+v: %v", fileIds, err)
|
||||||
return fmt.Errorf("chunk delete: %v", err)
|
return fmt.Errorf("chunk delete: %v", err)
|
||||||
@@ -174,7 +174,9 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
|
|||||||
for ; chunkIndex < len(cf.chunkList); chunkIndex++ {
|
for ; chunkIndex < len(cf.chunkList); chunkIndex++ {
|
||||||
ci := cf.chunkList[chunkIndex]
|
ci := cf.chunkList[chunkIndex]
|
||||||
// if we need read date from local volume server first?
|
// if we need read date from local volume server first?
|
||||||
fileUrl, lookupError := LookupFileId(cf.master, ci.Fid)
|
fileUrl, lookupError := LookupFileId(func() string {
|
||||||
|
return cf.master
|
||||||
|
}, ci.Fid)
|
||||||
if lookupError != nil {
|
if lookupError != nil {
|
||||||
return n, lookupError
|
return n, lookupError
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -28,10 +28,10 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DeleteFiles batch deletes a list of fileIds
|
// DeleteFiles batch deletes a list of fileIds
|
||||||
func DeleteFiles(master string, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
|
func DeleteFiles(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
|
||||||
|
|
||||||
lookupFunc := func(vids []string) (results map[string]LookupResult, err error) {
|
lookupFunc := func(vids []string) (results map[string]LookupResult, err error) {
|
||||||
results, err = LookupVolumeIds(master, grpcDialOption, vids)
|
results, err = LookupVolumeIds(masterFn, grpcDialOption, vids)
|
||||||
if err == nil && usePublicUrl {
|
if err == nil && usePublicUrl {
|
||||||
for _, result := range results {
|
for _, result := range results {
|
||||||
for _, loc := range result.Locations {
|
for _, loc := range result.Locations {
|
||||||
|
|||||||
@@ -33,10 +33,10 @@ var (
|
|||||||
vc VidCache // caching of volume locations, re-check if after 10 minutes
|
vc VidCache // caching of volume locations, re-check if after 10 minutes
|
||||||
)
|
)
|
||||||
|
|
||||||
func Lookup(server string, vid string) (ret *LookupResult, err error) {
|
func Lookup(masterFn GetMasterFn, vid string) (ret *LookupResult, err error) {
|
||||||
locations, cache_err := vc.Get(vid)
|
locations, cache_err := vc.Get(vid)
|
||||||
if cache_err != nil {
|
if cache_err != nil {
|
||||||
if ret, err = do_lookup(server, vid); err == nil {
|
if ret, err = do_lookup(masterFn, vid); err == nil {
|
||||||
vc.Set(vid, ret.Locations, 10*time.Minute)
|
vc.Set(vid, ret.Locations, 10*time.Minute)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@@ -45,9 +45,10 @@ func Lookup(server string, vid string) (ret *LookupResult, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func do_lookup(server string, vid string) (*LookupResult, error) {
|
func do_lookup(masterFn GetMasterFn, vid string) (*LookupResult, error) {
|
||||||
values := make(url.Values)
|
values := make(url.Values)
|
||||||
values.Add("volumeId", vid)
|
values.Add("volumeId", vid)
|
||||||
|
server := masterFn()
|
||||||
jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values)
|
jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -63,12 +64,12 @@ func do_lookup(server string, vid string) (*LookupResult, error) {
|
|||||||
return &ret, nil
|
return &ret, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func LookupFileId(server string, fileId string) (fullUrl string, err error) {
|
func LookupFileId(masterFn GetMasterFn, fileId string) (fullUrl string, err error) {
|
||||||
parts := strings.Split(fileId, ",")
|
parts := strings.Split(fileId, ",")
|
||||||
if len(parts) != 2 {
|
if len(parts) != 2 {
|
||||||
return "", errors.New("Invalid fileId " + fileId)
|
return "", errors.New("Invalid fileId " + fileId)
|
||||||
}
|
}
|
||||||
lookup, lookupError := Lookup(server, parts[0])
|
lookup, lookupError := Lookup(masterFn, parts[0])
|
||||||
if lookupError != nil {
|
if lookupError != nil {
|
||||||
return "", lookupError
|
return "", lookupError
|
||||||
}
|
}
|
||||||
@@ -79,7 +80,7 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// LookupVolumeIds find volume locations by cache and actual lookup
|
// LookupVolumeIds find volume locations by cache and actual lookup
|
||||||
func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) {
|
func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) {
|
||||||
ret := make(map[string]LookupResult)
|
ret := make(map[string]LookupResult)
|
||||||
var unknown_vids []string
|
var unknown_vids []string
|
||||||
|
|
||||||
@@ -99,7 +100,7 @@ func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []strin
|
|||||||
|
|
||||||
//only query unknown_vids
|
//only query unknown_vids
|
||||||
|
|
||||||
err := WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
|
err := WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
|
||||||
|
|
||||||
req := &master_pb.LookupVolumeRequest{
|
req := &master_pb.LookupVolumeRequest{
|
||||||
VolumeIds: unknown_vids,
|
VolumeIds: unknown_vids,
|
||||||
|
|||||||
@@ -39,7 +39,9 @@ type SubmitResult struct {
|
|||||||
Error string `json:"error,omitempty"`
|
Error string `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
|
type GetMasterFn func() string
|
||||||
|
|
||||||
|
func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
|
||||||
results := make([]SubmitResult, len(files))
|
results := make([]SubmitResult, len(files))
|
||||||
for index, file := range files {
|
for index, file := range files {
|
||||||
results[index].FileName = file.FileName
|
results[index].FileName = file.FileName
|
||||||
@@ -52,7 +54,7 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart
|
|||||||
Ttl: ttl,
|
Ttl: ttl,
|
||||||
DiskType: diskType,
|
DiskType: diskType,
|
||||||
}
|
}
|
||||||
ret, err := Assign(master, grpcDialOption, ar)
|
ret, err := Assign(masterFn, grpcDialOption, ar)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
for index := range files {
|
for index := range files {
|
||||||
results[index].Error = err.Error()
|
results[index].Error = err.Error()
|
||||||
@@ -73,7 +75,7 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart
|
|||||||
file.DataCenter = dataCenter
|
file.DataCenter = dataCenter
|
||||||
file.Ttl = ttl
|
file.Ttl = ttl
|
||||||
file.DiskType = diskType
|
file.DiskType = diskType
|
||||||
results[index].Size, err = file.Upload(maxMB, master, usePublicUrl, ret.Auth, grpcDialOption)
|
results[index].Size, err = file.Upload(maxMB, masterFn, usePublicUrl, ret.Auth, grpcDialOption)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
results[index].Error = err.Error()
|
results[index].Error = err.Error()
|
||||||
}
|
}
|
||||||
@@ -116,7 +118,7 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
|
|||||||
return ret, nil
|
return ret, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
|
func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
|
||||||
fileUrl := "http://" + fi.Server + "/" + fi.Fid
|
fileUrl := "http://" + fi.Server + "/" + fi.Fid
|
||||||
if fi.ModTime != 0 {
|
if fi.ModTime != 0 {
|
||||||
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
|
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
|
||||||
@@ -148,7 +150,7 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
|
|||||||
Ttl: fi.Ttl,
|
Ttl: fi.Ttl,
|
||||||
DiskType: fi.DiskType,
|
DiskType: fi.DiskType,
|
||||||
}
|
}
|
||||||
ret, err = Assign(master, grpcDialOption, ar)
|
ret, err = Assign(masterFn, grpcDialOption, ar)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -162,10 +164,10 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
|
|||||||
Ttl: fi.Ttl,
|
Ttl: fi.Ttl,
|
||||||
DiskType: fi.DiskType,
|
DiskType: fi.DiskType,
|
||||||
}
|
}
|
||||||
ret, err = Assign(master, grpcDialOption, ar)
|
ret, err = Assign(masterFn, grpcDialOption, ar)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// delete all uploaded chunks
|
// delete all uploaded chunks
|
||||||
cm.DeleteChunks(master, usePublicUrl, grpcDialOption)
|
cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
id = ret.Fid
|
id = ret.Fid
|
||||||
@@ -182,11 +184,11 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
|
|||||||
count, e := upload_one_chunk(
|
count, e := upload_one_chunk(
|
||||||
baseName+"-"+strconv.FormatInt(i+1, 10),
|
baseName+"-"+strconv.FormatInt(i+1, 10),
|
||||||
io.LimitReader(fi.Reader, chunkSize),
|
io.LimitReader(fi.Reader, chunkSize),
|
||||||
master, fileUrl,
|
masterFn, fileUrl,
|
||||||
ret.Auth)
|
ret.Auth)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
// delete all uploaded chunks
|
// delete all uploaded chunks
|
||||||
cm.DeleteChunks(master, usePublicUrl, grpcDialOption)
|
cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
|
||||||
return 0, e
|
return 0, e
|
||||||
}
|
}
|
||||||
cm.Chunks = append(cm.Chunks,
|
cm.Chunks = append(cm.Chunks,
|
||||||
@@ -201,7 +203,7 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
|
|||||||
err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
|
err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// delete all uploaded chunks
|
// delete all uploaded chunks
|
||||||
cm.DeleteChunks(master, usePublicUrl, grpcDialOption)
|
cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ret, e, _ := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt)
|
ret, e, _ := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt)
|
||||||
@@ -213,7 +215,7 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func upload_one_chunk(filename string, reader io.Reader, master,
|
func upload_one_chunk(filename string, reader io.Reader, masterFn GetMasterFn,
|
||||||
fileUrl string, jwt security.EncodedJwt,
|
fileUrl string, jwt security.EncodedJwt,
|
||||||
) (size uint32, e error) {
|
) (size uint32, e error) {
|
||||||
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
|
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
|
||||||
|
|||||||
@@ -11,9 +11,9 @@ import (
|
|||||||
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
|
func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
|
||||||
// find volume location, replication, ttl info
|
// find volume location, replication, ttl info
|
||||||
lookup, err := Lookup(master, vid.String())
|
lookup, err := Lookup(masterFn, vid.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("look up volume %d: %v", vid, err)
|
return fmt.Errorf("look up volume %d: %v", vid, err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -100,7 +100,7 @@ func debug(params ...interface{}) {
|
|||||||
glog.V(4).Infoln(params...)
|
glog.V(4).Infoln(params...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string, grpcDialOption grpc.DialOption) {
|
func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption) {
|
||||||
m := make(map[string]interface{})
|
m := make(map[string]interface{})
|
||||||
if r.Method != "POST" {
|
if r.Method != "POST" {
|
||||||
writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!"))
|
writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!"))
|
||||||
@@ -133,7 +133,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
|
|||||||
Ttl: r.FormValue("ttl"),
|
Ttl: r.FormValue("ttl"),
|
||||||
DiskType: r.FormValue("disk"),
|
DiskType: r.FormValue("disk"),
|
||||||
}
|
}
|
||||||
assignResult, ae := operation.Assign(masterUrl, grpcDialOption, ar)
|
assignResult, ae := operation.Assign(masterFn, grpcDialOption, ar)
|
||||||
if ae != nil {
|
if ae != nil {
|
||||||
writeJsonError(w, r, http.StatusInternalServerError, ae)
|
writeJsonError(w, r, http.StatusInternalServerError, ae)
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -337,7 +337,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
|
|||||||
|
|
||||||
assignRequest, altRequest := so.ToAssignRequests(int(req.Count))
|
assignRequest, altRequest := so.ToAssignRequests(int(req.Count))
|
||||||
|
|
||||||
assignResult, err := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, assignRequest, altRequest)
|
assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(3).Infof("AssignVolume: %v", err)
|
glog.V(3).Infof("AssignVolume: %v", err)
|
||||||
return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
|
return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u
|
|||||||
|
|
||||||
ar, altRequest := so.ToAssignRequests(1)
|
ar, altRequest := so.ToAssignRequests(1)
|
||||||
|
|
||||||
assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest)
|
assignResult, ae := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest)
|
||||||
if ae != nil {
|
if ae != nil {
|
||||||
glog.Errorf("failing to assign a file id: %v", ae)
|
glog.Errorf("failing to assign a file id: %v", ae)
|
||||||
err = ae
|
err = ae
|
||||||
|
|||||||
@@ -125,13 +125,13 @@ func (ms *MasterServer) selfUrl(r *http.Request) string {
|
|||||||
}
|
}
|
||||||
func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
|
func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
if ms.Topo.IsLeader() {
|
if ms.Topo.IsLeader() {
|
||||||
submitForClientHandler(w, r, ms.selfUrl(r), ms.grpcDialOption)
|
submitForClientHandler(w, r, func()string{return ms.selfUrl(r)}, ms.grpcDialOption)
|
||||||
} else {
|
} else {
|
||||||
masterUrl, err := ms.Topo.Leader()
|
masterUrl, err := ms.Topo.Leader()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeJsonError(w, r, http.StatusInternalServerError, err)
|
writeJsonError(w, r, http.StatusInternalServerError, err)
|
||||||
} else {
|
} else {
|
||||||
submitForClientHandler(w, r, masterUrl, ms.grpcDialOption)
|
submitForClientHandler(w, r, func()string{return masterUrl}, ms.grpcDialOption)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -63,7 +63,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
|
|||||||
w.WriteHeader(http.StatusNotFound)
|
w.WriteHeader(http.StatusNotFound)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
lookupResult, err := operation.Lookup(vs.GetMaster(), volumeId.String())
|
lookupResult, err := operation.Lookup(vs.GetMaster, volumeId.String())
|
||||||
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
|
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
|
||||||
if err == nil && len(lookupResult.Locations) > 0 {
|
if err == nil && len(lookupResult.Locations) > 0 {
|
||||||
u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl))
|
u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl))
|
||||||
|
|||||||
@@ -50,7 +50,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ret := operation.UploadResult{}
|
ret := operation.UploadResult{}
|
||||||
isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, reqNeedle, r)
|
isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster, vs.store, volumeId, reqNeedle, r)
|
||||||
|
|
||||||
// http 204 status code does not allow body
|
// http 204 status code does not allow body
|
||||||
if writeError == nil && isUnchanged {
|
if writeError == nil && isUnchanged {
|
||||||
@@ -128,7 +128,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
// make sure all chunks had deleted before delete manifest
|
// make sure all chunks had deleted before delete manifest
|
||||||
if e := chunkManifest.DeleteChunks(vs.GetMaster(), false, vs.grpcDialOption); e != nil {
|
if e := chunkManifest.DeleteChunks(vs.GetMaster, false, vs.grpcDialOption); e != nil {
|
||||||
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e))
|
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -143,7 +143,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := topology.ReplicatedDelete(vs.GetMaster(), vs.store, volumeId, n, r)
|
_, err := topology.ReplicatedDelete(vs.GetMaster, vs.store, volumeId, n, r)
|
||||||
|
|
||||||
writeDeleteResult(err, count, w, r)
|
writeDeleteResult(err, count, w, r)
|
||||||
|
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ import (
|
|||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (isUnchanged bool, err error) {
|
func ReplicatedWrite(masterFn operation.GetMasterFn, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (isUnchanged bool, err error) {
|
||||||
|
|
||||||
//check JWT
|
//check JWT
|
||||||
jwt := security.GetJwt(r)
|
jwt := security.GetJwt(r)
|
||||||
@@ -27,7 +27,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.Volume
|
|||||||
var remoteLocations []operation.Location
|
var remoteLocations []operation.Location
|
||||||
if r.FormValue("type") != "replicate" {
|
if r.FormValue("type") != "replicate" {
|
||||||
// this is the initial request
|
// this is the initial request
|
||||||
remoteLocations, err = getWritableRemoteReplications(s, volumeId, masterNode)
|
remoteLocations, err = getWritableRemoteReplications(s, volumeId, masterFn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infoln(err)
|
glog.V(0).Infoln(err)
|
||||||
return
|
return
|
||||||
@@ -92,7 +92,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.Volume
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func ReplicatedDelete(masterNode string, store *storage.Store,
|
func ReplicatedDelete(masterFn operation.GetMasterFn, store *storage.Store,
|
||||||
volumeId needle.VolumeId, n *needle.Needle,
|
volumeId needle.VolumeId, n *needle.Needle,
|
||||||
r *http.Request) (size types.Size, err error) {
|
r *http.Request) (size types.Size, err error) {
|
||||||
|
|
||||||
@@ -101,7 +101,7 @@ func ReplicatedDelete(masterNode string, store *storage.Store,
|
|||||||
|
|
||||||
var remoteLocations []operation.Location
|
var remoteLocations []operation.Location
|
||||||
if r.FormValue("type") != "replicate" {
|
if r.FormValue("type") != "replicate" {
|
||||||
remoteLocations, err = getWritableRemoteReplications(store, volumeId, masterNode)
|
remoteLocations, err = getWritableRemoteReplications(store, volumeId, masterFn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infoln(err)
|
glog.V(0).Infoln(err)
|
||||||
return
|
return
|
||||||
@@ -161,7 +161,7 @@ func distributedOperation(locations []operation.Location, store *storage.Store,
|
|||||||
return ret.Error()
|
return ret.Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
func getWritableRemoteReplications(s *storage.Store, volumeId needle.VolumeId, masterNode string) (
|
func getWritableRemoteReplications(s *storage.Store, volumeId needle.VolumeId, masterFn operation.GetMasterFn) (
|
||||||
remoteLocations []operation.Location, err error) {
|
remoteLocations []operation.Location, err error) {
|
||||||
|
|
||||||
v := s.GetVolume(volumeId)
|
v := s.GetVolume(volumeId)
|
||||||
@@ -170,7 +170,7 @@ func getWritableRemoteReplications(s *storage.Store, volumeId needle.VolumeId, m
|
|||||||
}
|
}
|
||||||
|
|
||||||
// not on local store, or has replications
|
// not on local store, or has replications
|
||||||
lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String())
|
lookupResult, lookupErr := operation.Lookup(masterFn, volumeId.String())
|
||||||
if lookupErr == nil {
|
if lookupErr == nil {
|
||||||
selfUrl := s.Ip + ":" + strconv.Itoa(s.Port)
|
selfUrl := s.Ip + ":" + strconv.Itoa(s.Port)
|
||||||
for _, location := range lookupResult.Locations {
|
for _, location := range lookupResult.Locations {
|
||||||
|
|||||||
Reference in New Issue
Block a user