read from alternative replica

related to https://github.com/chrislusf/seaweedfs/issues/1512
This commit is contained in:
Chris Lu
2020-10-07 22:49:04 -07:00
parent da4edf3651
commit a8624c2e4f
12 changed files with 185 additions and 150 deletions

View File

@@ -0,0 +1,40 @@
package repl_util
import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/util"
)
func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.FilerSource, writeFunc func(data []byte) error) error {
for _, chunk := range chunkViews {
fileUrls, err := filerSource.LookupFileId(chunk.FileId)
if err != nil {
return err
}
var writeErr error
for _, fileUrl := range fileUrls {
err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
writeErr = writeFunc(data)
})
if err != nil {
glog.V(1).Infof("read from %s: %v", fileUrl, err)
} else if writeErr != nil {
glog.V(1).Infof("copy from %s: %v", fileUrl, writeErr)
} else {
break
}
}
if err != nil {
return err
}
}
return nil
}

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/replication/repl_util"
"net/url"
"strings"
@@ -107,25 +108,13 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
return err
}
for _, chunk := range chunkViews {
fileUrl, err := g.filerSource.LookupFileId(chunk.FileId)
if err != nil {
return err
}
var writeErr error
readErr := util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
_, writeErr = appendBlobURL.AppendBlock(context.Background(), bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil)
})
if readErr != nil {
return readErr
}
if writeErr != nil {
return writeErr
}
writeFunc := func(data []byte) error {
_, writeErr := appendBlobURL.AppendBlock(context.Background(), bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil)
return writeErr
}
if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
return err
}
return nil

View File

@@ -2,6 +2,7 @@ package B2Sink
import (
"context"
"github.com/chrislusf/seaweedfs/weed/replication/repl_util"
"strings"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -95,31 +96,18 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int
targetObject := bucket.Object(key)
writer := targetObject.NewWriter(context.Background())
for _, chunk := range chunkViews {
fileUrl, err := g.filerSource.LookupFileId(chunk.FileId)
if err != nil {
return err
}
var writeErr error
readErr := util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
_, err := writer.Write(data)
if err != nil {
writeErr = err
}
})
if readErr != nil {
return readErr
}
if writeErr != nil {
return writeErr
}
writeFunc := func(data []byte) error {
_, writeErr := writer.Write(data)
return writeErr
}
return writer.Close()
defer writer.Close()
if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
return err
}
return nil
}

View File

@@ -3,6 +3,7 @@ package gcssink
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/replication/repl_util"
"os"
"cloud.google.com/go/storage"
@@ -93,25 +94,14 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []in
chunkViews := filer.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
wc := g.client.Bucket(g.bucket).Object(key).NewWriter(context.Background())
defer wc.Close()
for _, chunk := range chunkViews {
fileUrl, err := g.filerSource.LookupFileId(chunk.FileId)
if err != nil {
return err
}
err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
wc.Write(data)
})
if err != nil {
return err
}
writeFunc := func(data []byte) error {
_, writeErr := wc.Write(data)
return writeErr
}
if err := wc.Close(); err != nil {
if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
return err
}

View File

@@ -157,11 +157,18 @@ func (s3sink *S3Sink) uploadPartCopy(key, uploadId string, partId int64, copySou
}
func (s3sink *S3Sink) buildReadSeeker(chunk *filer.ChunkView) (io.ReadSeeker, error) {
fileUrl, err := s3sink.filerSource.LookupFileId(chunk.FileId)
fileUrls, err := s3sink.filerSource.LookupFileId(chunk.FileId)
if err != nil {
return nil, err
}
buf := make([]byte, chunk.Size)
util.ReadUrl(fileUrl, nil, false, false, chunk.Offset, int(chunk.Size), buf)
for _, fileUrl := range fileUrls {
_, err = util.ReadUrl(fileUrl, nil, false, false, chunk.Offset, int(chunk.Size), buf)
if err != nil {
glog.V(1).Infof("read from %s: %v", fileUrl, err)
} else {
break
}
}
return bytes.NewReader(buf), nil
}

View File

@@ -41,7 +41,7 @@ func (fs *FilerSource) DoInitialize(grpcAddress string, dir string) (err error)
return nil
}
func (fs *FilerSource) LookupFileId(part string) (fileUrl string, err error) {
func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) {
vid2Locations := make(map[string]*filer_pb.Locations)
@@ -64,29 +64,38 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrl string, err error) {
if err != nil {
glog.V(1).Infof("LookupFileId volume id %s: %v", vid, err)
return "", fmt.Errorf("LookupFileId volume id %s: %v", vid, err)
return nil, fmt.Errorf("LookupFileId volume id %s: %v", vid, err)
}
locations := vid2Locations[vid]
if locations == nil || len(locations.Locations) == 0 {
glog.V(1).Infof("LookupFileId locate volume id %s: %v", vid, err)
return "", fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err)
return nil, fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err)
}
fileUrl = fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, part)
for _, loc := range locations.Locations {
fileUrls = append(fileUrls, fmt.Sprintf("http://%s/%s", loc.Url, part))
}
return
}
func (fs *FilerSource) ReadPart(part string) (filename string, header http.Header, resp *http.Response, err error) {
fileUrl, err := fs.LookupFileId(part)
fileUrls, err := fs.LookupFileId(part)
if err != nil {
return "", nil, nil, err
}
filename, header, resp, err = util.DownloadFile(fileUrl)
for _, fileUrl := range fileUrls {
filename, header, resp, err = util.DownloadFile(fileUrl)
if err != nil {
glog.V(1).Infof("fail to read from %s: %v", fileUrl, err)
} else {
break
}
}
return filename, header, resp, err
}