mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-08-20 07:02:09 +08:00
Fixed weed mount reads with jwt.signing.read.key (#7061)
This commit is contained in:
parent
f5c53b1bd8
commit
9fadd9def8
@ -122,7 +122,7 @@ func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunction
|
||||
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
|
||||
return 0, err
|
||||
}
|
||||
return util_http.RetriedFetchChunkData(context.Background(), buffer, urlStrings, cipherKey, isGzipped, false, offset)
|
||||
return util_http.RetriedFetchChunkData(context.Background(), buffer, urlStrings, cipherKey, isGzipped, false, offset, fileId)
|
||||
}
|
||||
|
||||
func retriedStreamFetchChunkData(ctx context.Context, writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
|
||||
|
@ -178,7 +178,7 @@ func (s *SingleChunkCacher) startCaching() {
|
||||
|
||||
s.data = mem.Allocate(s.chunkSize)
|
||||
|
||||
_, s.err = util_http.RetriedFetchChunkData(context.Background(), s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
|
||||
_, s.err = util_http.RetriedFetchChunkData(context.Background(), s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0, s.chunkFileId)
|
||||
if s.err != nil {
|
||||
mem.Free(s.data)
|
||||
s.data = nil
|
||||
|
@ -196,7 +196,7 @@ func ReadAll(ctx context.Context, buffer []byte, masterClient *wdclient.MasterCl
|
||||
return err
|
||||
}
|
||||
|
||||
n, err := util_http.RetriedFetchChunkData(ctx, buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk)
|
||||
n, err := util_http.RetriedFetchChunkData(ctx, buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, chunkView.FileId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util/mem"
|
||||
@ -18,10 +19,24 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/security"
|
||||
)
|
||||
|
||||
var ErrNotFound = fmt.Errorf("not found")
|
||||
|
||||
var (
|
||||
jwtSigningReadKey security.SigningKey
|
||||
jwtSigningReadKeyExpires int
|
||||
loadJwtConfigOnce sync.Once
|
||||
)
|
||||
|
||||
func loadJwtConfig() {
|
||||
v := util.GetViper()
|
||||
jwtSigningReadKey = security.SigningKey(v.GetString("jwt.signing.read.key"))
|
||||
jwtSigningReadKeyExpires = v.GetInt("jwt.signing.read.expires_after_seconds")
|
||||
}
|
||||
|
||||
func Post(url string, values url.Values) ([]byte, error) {
|
||||
r, err := GetGlobalHttpClient().PostForm(url, values)
|
||||
if err != nil {
|
||||
@ -452,7 +467,17 @@ func (r *CountingReader) Read(p []byte) (n int, err error) {
|
||||
return n, err
|
||||
}
|
||||
|
||||
func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {
|
||||
func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, fileId string) (n int, err error) {
|
||||
|
||||
loadJwtConfigOnce.Do(loadJwtConfig)
|
||||
var jwt security.EncodedJwt
|
||||
if len(jwtSigningReadKey) > 0 {
|
||||
jwt = security.GenJwtForVolumeServer(
|
||||
jwtSigningReadKey,
|
||||
jwtSigningReadKeyExpires,
|
||||
fileId,
|
||||
)
|
||||
}
|
||||
|
||||
var shouldRetry bool
|
||||
|
||||
@ -462,7 +487,7 @@ func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []stri
|
||||
if strings.Contains(urlString, "%") {
|
||||
urlString = url.PathEscape(urlString)
|
||||
}
|
||||
shouldRetry, err = ReadUrlAsStream(ctx, urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
|
||||
shouldRetry, err = ReadUrlAsStreamAuthenticated(ctx, urlString+"?readDeleted=true", string(jwt), cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
|
||||
if n < len(buffer) {
|
||||
x := copy(buffer[n:], data)
|
||||
n += x
|
||||
|
Loading…
Reference in New Issue
Block a user