Changes logging function (#6919)

* updated logging methods for stores
* updated logging methods for filer
* updated logging methods for uploader and http_util
* updated logging methods for weed server

Co-authored-by: akosov <a.kosov@kryptonite.ru>
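The pattern throughout this diff is mechanical: every glog.V(n).Infof(...) / glog.Errorf(...) call inside a function that already receives a context.Context becomes glog.V(n).InfofCtx(ctx, ...) / glog.ErrorfCtx(ctx, ...) (and glog.Error becomes glog.ErrorCtx), so the logger can decorate each line with request-scoped metadata carried in the context, typically a request or trace ID. A minimal sketch of what such a context-aware wrapper can look like — this helper is illustrative only, not SeaweedFS's actual glog implementation; the requestIDKey type and the request-ID extraction are assumptions:

package main

import (
	"context"
	"log"
)

// requestIDKey is a private context key type, so values set by this
// package cannot collide with keys from other packages.
type requestIDKey struct{}

// WithRequestID returns a child context carrying a request ID.
func WithRequestID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, requestIDKey{}, id)
}

// InfofCtx logs like Infof, but prefixes the request ID when the
// context carries one - the kind of behavior the InfofCtx/ErrorfCtx
// call sites in this diff rely on.
func InfofCtx(ctx context.Context, format string, args ...interface{}) {
	if id, ok := ctx.Value(requestIDKey{}).(string); ok {
		log.Printf("[reqid=%s] "+format, append([]interface{}{id}, args...)...)
		return
	}
	log.Printf(format, args...)
}

func main() {
	ctx := WithRequestID(context.Background(), "a1b2c3")
	InfofCtx(ctx, "insert %s falls back to update", "/buckets/demo/file.txt")
}

The change stays a one-liner at every call site because the context already flows through the store methods — each takes ctx context.Context as its first argument — so no extra plumbing is needed to correlate a log line with the request that produced it.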
@@ -169,7 +169,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent
 if err != nil && strings.Contains(strings.ToLower(err.Error()), "duplicate entry") {
 // now the insert failed possibly due to duplication constraints
 sqlInsert = "falls back to update"
-glog.V(1).Infof("insert %s %s: %v", entry.FullPath, sqlInsert, err)
+glog.V(1).InfofCtx(ctx, "insert %s %s: %v", entry.FullPath, sqlInsert, err)
 res, err = db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir)
 }
 if err != nil {
@@ -277,7 +277,7 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat
 }
 }

-glog.V(4).Infof("delete %s SQL %s %d", string(shortPath), store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)))
+glog.V(4).InfofCtx(ctx, "delete %s SQL %s %d", string(shortPath), store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)))
 res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), string(shortPath))
 if err != nil {
 return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
@@ -312,7 +312,7 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context,
 var name string
 var data []byte
 if err = rows.Scan(&name, &data); err != nil {
-glog.V(0).Infof("scan %s : %v", dirPath, err)
+glog.V(0).InfofCtx(ctx, "scan %s : %v", dirPath, err)
 return lastFileName, fmt.Errorf("scan %s: %v", dirPath, err)
 }
 lastFileName = name
@@ -321,7 +321,7 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context,
 FullPath: util.NewFullPath(string(dirPath), name),
 }
 if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
-glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
+glog.V(0).InfofCtx(ctx, "scan decode %s : %v", entry.FullPath, err)
 return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
 }
 }

@@ -31,7 +31,7 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by
 }

 // now the insert failed possibly due to duplication constraints
-glog.V(1).Infof("kv insert falls back to update: %s", err)
+glog.V(1).InfofCtx(ctx, "kv insert falls back to update: %s", err)

 res, err = db.ExecContext(ctx, store.GetSqlUpdate(DEFAULT_TABLE), value, dirHash, name, dirStr)
 if err != nil {
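Both InsertEntry and KvPut in the abstract SQL store use the same insert-then-update fallback that the first and last hunks above are logging: try the INSERT, and if the driver reports a duplicate-key violation, rewrite the existing row with an UPDATE. A condensed sketch of that pattern under stated assumptions — the filemeta table and column names are illustrative placeholders, not the store's real schema:

package sketch

import (
	"context"
	"database/sql"
	"strings"
)

// upsertMeta mirrors the "falls back to update" branch: INSERT first,
// and on a duplicate-key error overwrite the existing row instead of
// failing. Matching the error text is exactly what the store does.
func upsertMeta(ctx context.Context, db *sql.DB, dirHash int64, name, dir string, meta []byte) error {
	_, err := db.ExecContext(ctx,
		"INSERT INTO filemeta (dirhash, name, directory, meta) VALUES (?, ?, ?, ?)",
		dirHash, name, dir, meta)
	if err != nil && strings.Contains(strings.ToLower(err.Error()), "duplicate entry") {
		// Row already exists: update its metadata in place.
		_, err = db.ExecContext(ctx,
			"UPDATE filemeta SET meta = ? WHERE dirhash = ? AND name = ? AND directory = ?",
			meta, dirHash, name, dir)
	}
	return err
}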
@@ -233,7 +233,7 @@ func (store *ArangodbStore) FindEntry(ctx context.Context, fullpath util.FullPat
 if driver.IsNotFound(err) {
 return nil, filer_pb.ErrNotFound
 }
-glog.Errorf("find %s: %v", fullpath, err)
+glog.ErrorfCtx(ctx, "find %s: %v", fullpath, err)
 return nil, filer_pb.ErrNotFound
 }
 if len(data.Meta) == 0 {
@@ -257,7 +257,7 @@ func (store *ArangodbStore) DeleteEntry(ctx context.Context, fullpath util.FullP
 }
 _, err = targetCollection.RemoveDocument(ctx, hashString(string(fullpath)))
 if err != nil && !driver.IsNotFound(err) {
-glog.Errorf("find %s: %v", fullpath, err)
+glog.ErrorfCtx(ctx, "find %s: %v", fullpath, err)
 return fmt.Errorf("delete %s : %v", fullpath, err)
 }
 return nil
@@ -331,7 +331,7 @@ sort d.name asc
 converted := arrayToBytes(data.Meta)
 if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(converted)); decodeErr != nil {
 err = decodeErr
-glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
 break
 }

@@ -38,7 +38,7 @@ func (store *ArangodbStore) KvGet(ctx context.Context, key []byte) (value []byte
 return nil, filer.ErrKvNotFound
 }
 if err != nil {
-glog.Errorf("kv get: %s %v", string(key), err)
+glog.ErrorfCtx(ctx, "kv get: %s %v", string(key), err)
 return nil, filer.ErrKvNotFound
 }
 return arrayToBytes(model.Meta), nil
@@ -47,7 +47,7 @@ func (store *ArangodbStore) KvGet(ctx context.Context, key []byte) (value []byte
 func (store *ArangodbStore) KvDelete(ctx context.Context, key []byte) (err error) {
 _, err = store.kvCollection.RemoveDocument(ctx, hashString(".kvstore."+string(key)))
 if err != nil {
-glog.Errorf("kv del: %v", err)
+glog.ErrorfCtx(ctx, "kv del: %v", err)
 return filer.ErrKvNotFound
 }
 return nil
@@ -4,9 +4,10 @@ import (
 "context"
 "errors"
 "fmt"
-"github.com/gocql/gocql"
 "time"

+"github.com/gocql/gocql"
+
 "github.com/seaweedfs/seaweedfs/weed/filer"
 "github.com/seaweedfs/seaweedfs/weed/glog"
 "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -202,7 +203,7 @@ func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, dirPath u
 lastFileName = name
 if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); decodeErr != nil {
 err = decodeErr
-glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
 break
 }
 if !eachEntryFunc(entry) {
@@ -210,7 +211,7 @@ func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, dirPath u
 }
 }
 if err = iter.Close(); err != nil {
-glog.V(0).Infof("list iterator close: %v", err)
+glog.V(0).InfofCtx(ctx, "list iterator close: %v", err)
 }

 return lastFileName, err
@@ -4,9 +4,10 @@ import (
 "context"
 "errors"
 "fmt"
-"github.com/gocql/gocql"
 "time"

+"github.com/gocql/gocql"
+
 "github.com/seaweedfs/seaweedfs/weed/filer"
 "github.com/seaweedfs/seaweedfs/weed/glog"
 "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -202,7 +203,7 @@ func (store *Cassandra2Store) ListDirectoryEntries(ctx context.Context, dirPath
 lastFileName = name
 if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); decodeErr != nil {
 err = decodeErr
-glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
 break
 }
 if !eachEntryFunc(entry) {
@@ -210,7 +211,7 @@ func (store *Cassandra2Store) ListDirectoryEntries(ctx context.Context, dirPath
 }
 }
 if err = iter.Close(); err != nil {
-glog.V(0).Infof("list iterator close: %v", err)
+glog.V(0).InfofCtx(ctx, "list iterator close: %v", err)
 }

 return lastFileName, err
@@ -113,7 +113,7 @@ func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry)
 }
 value, err := jsoniter.Marshal(esEntry)
 if err != nil {
-glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err)
+glog.ErrorfCtx(ctx, "insert entry(%s) %v.", string(entry.FullPath), err)
 return fmt.Errorf("insert entry marshal %v", err)
 }
 _, err = store.client.Index().
@@ -123,7 +123,7 @@ func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry)
 BodyJson(string(value)).
 Do(ctx)
 if err != nil {
-glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err)
+glog.ErrorfCtx(ctx, "insert entry(%s) %v.", string(entry.FullPath), err)
 return fmt.Errorf("insert entry %v", err)
 }
 return nil
@@ -152,7 +152,7 @@ func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.Ful
 err := jsoniter.Unmarshal(searchResult.Source, esEntry)
 return esEntry.Entry, err
 }
-glog.Errorf("find entry(%s),%v.", string(fullpath), err)
+glog.ErrorfCtx(ctx, "find entry(%s),%v.", string(fullpath), err)
 return nil, filer_pb.ErrNotFound
 }

@@ -178,7 +178,7 @@ func (store *ElasticStore) deleteIndex(ctx context.Context, index string) (err e
 if elastic.IsNotFound(err) || (err == nil && deleteResult.Acknowledged) {
 return nil
 }
-glog.Errorf("delete index(%s) %v.", index, err)
+glog.ErrorfCtx(ctx, "delete index(%s) %v.", index, err)
 return err
 }

@@ -193,14 +193,14 @@ func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (e
 return nil
 }
 }
-glog.Errorf("delete entry(index:%s,_id:%s) %v.", index, id, err)
+glog.ErrorfCtx(ctx, "delete entry(index:%s,_id:%s) %v.", index, id, err)
 return fmt.Errorf("delete entry %v", err)
 }

 func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
 _, err = store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32, func(entry *filer.Entry) bool {
 if err := store.DeleteEntry(ctx, entry.FullPath); err != nil {
-glog.Errorf("elastic delete %s: %v.", entry.FullPath, err)
+glog.ErrorfCtx(ctx, "elastic delete %s: %v.", entry.FullPath, err)
 return false
 }
 return true
@@ -228,7 +228,7 @@ func (store *ElasticStore) listDirectoryEntries(
 result := &elastic.SearchResult{}
 if (startFileName == "" && first) || inclusive {
 if result, err = store.search(ctx, index, parentId); err != nil {
-glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
+glog.ErrorfCtx(ctx, "search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
 return
 }
 } else {
@@ -238,7 +238,7 @@ func (store *ElasticStore) listDirectoryEntries(
 }
 after := weed_util.Md5String([]byte(fullPath))
 if result, err = store.searchAfter(ctx, index, parentId, after); err != nil {
-glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
+glog.ErrorfCtx(ctx, "searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
 return
 }
 }
@@ -25,7 +25,7 @@ func (store *ElasticStore) KvDelete(ctx context.Context, key []byte) (err error)
 return nil
 }
 }
-glog.Errorf("delete key(id:%s) %v.", string(key), err)
+glog.ErrorfCtx(ctx, "delete key(id:%s) %v.", string(key), err)
 return fmt.Errorf("delete key %v", err)
 }

@@ -44,7 +44,7 @@ func (store *ElasticStore) KvGet(ctx context.Context, key []byte) (value []byte,
 return esEntry.Value, nil
 }
 }
-glog.Errorf("find key(%s),%v.", string(key), err)
+glog.ErrorfCtx(ctx, "find key(%s),%v.", string(key), err)
 return value, filer.ErrKvNotFound
 }

@@ -52,7 +52,7 @@ func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte)
 esEntry := &ESKVEntry{value}
 val, err := jsoniter.Marshal(esEntry)
 if err != nil {
-glog.Errorf("insert key(%s) %v.", string(key), err)
+glog.ErrorfCtx(ctx, "insert key(%s) %v.", string(key), err)
 return fmt.Errorf("insert key %v", err)
 }
 _, err = store.client.Index().
@@ -4,10 +4,11 @@ import (
 "context"
 "crypto/tls"
 "fmt"
-"go.etcd.io/etcd/client/pkg/v3/transport"
 "strings"
 "time"

+"go.etcd.io/etcd/client/pkg/v3/transport"
+
 "go.etcd.io/etcd/client/v3"

 "github.com/seaweedfs/seaweedfs/weed/filer"
@@ -95,7 +96,7 @@ func (store *EtcdStore) initialize(servers, username, password string, timeout t
 return fmt.Errorf("error checking etcd connection: %s", err)
 }

-glog.V(0).Infof("сonnection to etcd has been successfully verified. etcd version: %s", resp.Version)
+glog.V(0).InfofCtx(ctx, "сonnection to etcd has been successfully verified. etcd version: %s", resp.Version)
 store.client = client

 return nil
@@ -208,7 +209,7 @@ func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPat
 }
 if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(kv.Value)); decodeErr != nil {
 err = decodeErr
-glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
 break
 }
 if !eachEntryFunc(entry) {
@@ -106,7 +106,7 @@ func ResolveOneChunkManifest(ctx context.Context, lookupFileIdFn wdclient.Lookup
 func fetchWholeChunk(ctx context.Context, bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) error {
 urlStrings, err := lookupFileIdFn(ctx, fileId)
 if err != nil {
-glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
+glog.ErrorfCtx(ctx, "operation LookupFileId %s failed, err: %v", fileId, err)
 return err
 }
 err = retriedStreamFetchChunkData(ctx, bytesBuffer, urlStrings, "", cipherKey, isGzipped, true, 0, 0)
@@ -159,7 +159,7 @@ func retriedStreamFetchChunkData(ctx context.Context, writer io.Writer, urlStrin
 break
 }
 if err != nil {
-glog.V(0).Infof("read %s failed, err: %v", urlString, err)
+glog.V(0).InfofCtx(ctx, "read %s failed, err: %v", urlString, err)
 } else {
 break
 }
@@ -169,7 +169,7 @@ func retriedStreamFetchChunkData(ctx context.Context, writer io.Writer, urlStrin
 break
 }
 if err != nil && shouldRetry {
-glog.V(0).Infof("retry reading in %v", waitTime)
+glog.V(0).InfofCtx(ctx, "retry reading in %v", waitTime)
 time.Sleep(waitTime)
 } else {
 break
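The two retriedStreamFetchChunkData hunks above sit inside nested loops: an inner loop tries each replica URL in turn, and an outer loop sleeps waitTime and retries the whole set while the error is still considered retryable. A stripped-down sketch of that shape — readOnce stands in for the real chunk read, the backoff progression is illustrative, and in the actual code shouldRetry depends on the error class rather than being set on any failure:

package sketch

import (
	"context"
	"log"
	"time"
)

// fetchWithRetry tries every replica URL, then backs off and retries
// the whole list while an attempt failed - the structure the retry
// hunks above are logging from.
func fetchWithRetry(ctx context.Context, urlStrings []string, readOnce func(string) error) error {
	var err error
	for waitTime := time.Second; waitTime < 10*time.Second; waitTime += waitTime / 2 {
		shouldRetry := false
		for _, urlString := range urlStrings {
			if ctx.Err() != nil {
				return ctx.Err() // caller cancelled; stop retrying
			}
			if err = readOnce(urlString); err == nil {
				return nil
			}
			shouldRetry = true
			log.Printf("read %s failed, err: %v", urlString, err)
		}
		if !shouldRetry {
			break
		}
		log.Printf("retry reading in %v", waitTime)
		time.Sleep(waitTime)
	}
	return err
}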
@@ -220,19 +220,19 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
 }
 }

-glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name())
+glog.V(4).InfofCtx(ctx, "InsertEntry %s: new entry: %v", entry.FullPath, entry.Name())
 if err := f.Store.InsertEntry(ctx, entry); err != nil {
-glog.Errorf("insert entry %s: %v", entry.FullPath, err)
+glog.ErrorfCtx(ctx, "insert entry %s: %v", entry.FullPath, err)
 return fmt.Errorf("insert entry %s: %v", entry.FullPath, err)
 }
 } else {
 if o_excl {
-glog.V(3).Infof("EEXIST: entry %s already exists", entry.FullPath)
+glog.V(3).InfofCtx(ctx, "EEXIST: entry %s already exists", entry.FullPath)
 return fmt.Errorf("EEXIST: entry %s already exists", entry.FullPath)
 }
-glog.V(4).Infof("UpdateEntry %s: old entry: %v", entry.FullPath, oldEntry.Name())
+glog.V(4).InfofCtx(ctx, "UpdateEntry %s: old entry: %v", entry.FullPath, oldEntry.Name())
 if err := f.UpdateEntry(ctx, oldEntry, entry); err != nil {
-glog.Errorf("update entry %s: %v", entry.FullPath, err)
+glog.ErrorfCtx(ctx, "update entry %s: %v", entry.FullPath, err)
 return fmt.Errorf("update entry %s: %v", entry.FullPath, err)
 }
 }
@@ -241,7 +241,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr

 f.deleteChunksIfNotNew(ctx, oldEntry, entry)

-glog.V(4).Infof("CreateEntry %s: created", entry.FullPath)
+glog.V(4).InfofCtx(ctx, "CreateEntry %s: created", entry.FullPath)

 return nil
 }
@@ -256,7 +256,7 @@ func (f *Filer) ensureParentDirectoryEntry(ctx context.Context, entry *Entry, di
 // fmt.Printf("%d dirPath: %+v\n", level, dirPath)

 // check the store directly
-glog.V(4).Infof("find uncached directory: %s", dirPath)
+glog.V(4).InfofCtx(ctx, "find uncached directory: %s", dirPath)
 dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath))

 // no such existing directory
@@ -291,11 +291,11 @@ func (f *Filer) ensureParentDirectoryEntry(ctx context.Context, entry *Entry, di
 },
 }

-glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
+glog.V(2).InfofCtx(ctx, "create directory: %s %v", dirPath, dirEntry.Mode)
 mkdirErr := f.Store.InsertEntry(ctx, dirEntry)
 if mkdirErr != nil {
 if fEntry, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound || fEntry == nil {
-glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
+glog.V(3).InfofCtx(ctx, "mkdir %s: %v", dirPath, mkdirErr)
 return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
 }
 } else {
@@ -305,7 +305,7 @@ func (f *Filer) ensureParentDirectoryEntry(ctx context.Context, entry *Entry, di
 }

 } else if !dirEntry.IsDirectory() {
-glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath)
+glog.ErrorfCtx(ctx, "CreateEntry %s: %s should be a directory", entry.FullPath, dirPath)
 return fmt.Errorf("%s is a file", dirPath)
 }

@@ -316,11 +316,11 @@ func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err er
 if oldEntry != nil {
 entry.Attr.Crtime = oldEntry.Attr.Crtime
 if oldEntry.IsDirectory() && !entry.IsDirectory() {
-glog.Errorf("existing %s is a directory", oldEntry.FullPath)
+glog.ErrorfCtx(ctx, "existing %s is a directory", oldEntry.FullPath)
 return fmt.Errorf("existing %s is a directory", oldEntry.FullPath)
 }
 if !oldEntry.IsDirectory() && entry.IsDirectory() {
-glog.Errorf("existing %s is a file", oldEntry.FullPath)
+glog.ErrorfCtx(ctx, "existing %s is a file", oldEntry.FullPath)
 return fmt.Errorf("existing %s is a file", oldEntry.FullPath)
 }
 }
@@ -41,7 +41,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
 return nil
 })
 if err != nil {
-glog.V(2).Infof("delete directory %s: %v", p, err)
+glog.V(2).InfofCtx(ctx, "delete directory %s: %v", p, err)
 return fmt.Errorf("delete directory %s: %v", p, err)
 }
 }
@@ -74,12 +74,12 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
 for {
 entries, _, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "", "", "")
 if err != nil {
-glog.Errorf("list folder %s: %v", entry.FullPath, err)
+glog.ErrorfCtx(ctx, "list folder %s: %v", entry.FullPath, err)
 return fmt.Errorf("list folder %s: %v", entry.FullPath, err)
 }
 if lastFileName == "" && !isRecursive && len(entries) > 0 {
 // only for first iteration in the loop
-glog.V(2).Infof("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name())
+glog.V(2).InfofCtx(ctx, "deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name())
 return fmt.Errorf("%s: %s", MsgFailDelNonEmptyFolder, entry.FullPath)
 }

@@ -110,7 +110,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
 }
 }

-glog.V(3).Infof("deleting directory %v delete chunks: %v", entry.FullPath, shouldDeleteChunks)
+glog.V(3).InfofCtx(ctx, "deleting directory %v delete chunks: %v", entry.FullPath, shouldDeleteChunks)

 if storeDeletionErr := f.Store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil {
 return fmt.Errorf("filer store delete: %v", storeDeletionErr)
@@ -124,7 +124,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry

 func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool, isFromOtherCluster bool, signatures []int32) (err error) {

-glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks)
+glog.V(3).InfofCtx(ctx, "deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks)

 if storeDeletionErr := f.Store.DeleteOneEntry(ctx, entry); storeDeletionErr != nil {
 return fmt.Errorf("filer store delete: %v", storeDeletionErr)
@@ -153,7 +153,7 @@ func (f *Filer) DoDeleteCollection(collectionName string) (err error) {
 func (f *Filer) maybeDeleteHardLinks(ctx context.Context, hardLinkIds []HardLinkId) {
 for _, hardLinkId := range hardLinkIds {
 if err := f.Store.DeleteHardLink(ctx, hardLinkId); err != nil {
-glog.Errorf("delete hard link id %d : %v", hardLinkId, err)
+glog.ErrorfCtx(ctx, "delete hard link id %d : %v", hardLinkId, err)
 }
 }
 }
@@ -93,7 +93,7 @@ func (f *Filer) doDeleteChunks(ctx context.Context, chunks []*filer_pb.FileChunk
 }
 dataChunks, manifestResolveErr := ResolveOneChunkManifest(ctx, f.MasterClient.LookupFileId, chunk)
 if manifestResolveErr != nil {
-glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
+glog.V(0).InfofCtx(ctx, "failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
 }
 for _, dChunk := range dataChunks {
 f.fileIdDeletionQueue.EnQueue(dChunk.GetFileIdString())
@@ -119,7 +119,7 @@ func (f *Filer) deleteChunksIfNotNew(ctx context.Context, oldEntry, newEntry *En

 toDelete, err := MinusChunks(ctx, f.MasterClient.GetLookupFileIdFunction(), oldChunks, newChunks)
 if err != nil {
-glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s", newChunks, oldChunks)
+glog.ErrorfCtx(ctx, "Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s", newChunks, oldChunks)
 return
 }
 f.DeleteChunksNotRecursive(toDelete)
@@ -4,6 +4,7 @@ import (
 "bytes"
 "context"
 "fmt"
+
 "github.com/seaweedfs/seaweedfs/weed/glog"
 "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 )
@@ -31,7 +32,7 @@ func (fsw *FilerStoreWrapper) handleUpdateToHardLinks(ctx context.Context, entry

 // remove old hard link
 if err == nil && len(existingEntry.HardLinkId) != 0 && bytes.Compare(existingEntry.HardLinkId, entry.HardLinkId) != 0 {
-glog.V(4).Infof("handleUpdateToHardLinks DeleteHardLink %s", entry.FullPath)
+glog.V(4).InfofCtx(ctx, "handleUpdateToHardLinks DeleteHardLink %s", entry.FullPath)
 if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
 return err
 }
@@ -50,7 +51,7 @@ func (fsw *FilerStoreWrapper) setHardLink(ctx context.Context, entry *Entry) err
 return encodeErr
 }

-glog.V(4).Infof("setHardLink %v nlink:%d", entry.FullPath, entry.HardLinkCounter)
+glog.V(4).InfofCtx(ctx, "setHardLink %v nlink:%d", entry.FullPath, entry.HardLinkCounter)

 return fsw.KvPut(ctx, key, newBlob)
 }
@@ -63,16 +64,16 @@ func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entr

 value, err := fsw.KvGet(ctx, key)
 if err != nil {
-glog.Errorf("read %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
+glog.ErrorfCtx(ctx, "read %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
 return err
 }

 if err = entry.DecodeAttributesAndChunks(value); err != nil {
-glog.Errorf("decode %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
+glog.ErrorfCtx(ctx, "decode %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
 return err
 }

-glog.V(4).Infof("maybeReadHardLink %v nlink:%d", entry.FullPath, entry.HardLinkCounter)
+glog.V(4).InfofCtx(ctx, "maybeReadHardLink %v nlink:%d", entry.FullPath, entry.HardLinkCounter)

 return nil
 }
@@ -94,7 +95,7 @@ func (fsw *FilerStoreWrapper) DeleteHardLink(ctx context.Context, hardLinkId Har

 entry.HardLinkCounter--
 if entry.HardLinkCounter <= 0 {
-glog.V(4).Infof("DeleteHardLink KvDelete %v", key)
+glog.V(4).InfofCtx(ctx, "DeleteHardLink KvDelete %v", key)
 return fsw.KvDelete(ctx, key)
 }

@@ -103,7 +104,7 @@ func (fsw *FilerStoreWrapper) DeleteHardLink(ctx context.Context, hardLinkId Har
 return encodeErr
 }

-glog.V(4).Infof("DeleteHardLink KvPut %v", key)
+glog.V(4).InfofCtx(ctx, "DeleteHardLink KvPut %v", key)
 return fsw.KvPut(ctx, key, newBlob)

 }
@@ -192,7 +192,7 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath)
 // remove hard link
 op := ctx.Value("OP")
 if op != "MV" {
-glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
+glog.V(4).InfofCtx(ctx, "DeleteHardLink %s", existingEntry.FullPath)
 if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
 return err
 }
@@ -215,7 +215,7 @@ func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry
 // remove hard link
 op := ctx.Value("OP")
 if op != "MV" {
-glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
+glog.V(4).InfofCtx(ctx, "DeleteHardLink %s", existingEntry.FullPath)
 if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
 return err
 }
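The two wrapper hunks above read op := ctx.Value("OP") to skip hard-link cleanup while a rename is in flight. A caller marking a move presumably sets that value before calling into the store; the setter below is a hypothetical illustration of how the marker gets into the context, not code from this commit. The bare string key "OP" matches the read shown in the diff, though a private key type is the collision-safe variant Go's vet tooling recommends:

package main

import (
	"context"
	"fmt"
)

func main() {
	// Mark the context so deeper layers (DeleteEntry/DeleteOneEntry
	// above) know this delete is half of a rename. Illustrative setter.
	ctx := context.WithValue(context.Background(), "OP", "MV")

	if op := ctx.Value("OP"); op != "MV" {
		fmt.Println("plain delete: drop the hard-link reference")
	} else {
		fmt.Println("move in progress: keep the hard-link target")
	}
}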
@@ -203,7 +203,7 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa
 }
 if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(value)); decodeErr != nil {
 err = decodeErr
-glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
 break
 }
 if !eachEntryFunc(entry) {
@@ -4,13 +4,14 @@ import (
 "bytes"
 "context"
 "fmt"
+"io"
+"os"
+
 "github.com/syndtr/goleveldb/leveldb"
 leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
 "github.com/syndtr/goleveldb/leveldb/filter"
 "github.com/syndtr/goleveldb/leveldb/opt"
 leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
-"io"
-"os"

 "github.com/seaweedfs/seaweedfs/weed/filer"
 "github.com/seaweedfs/seaweedfs/weed/glog"
@@ -205,7 +206,7 @@ func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
 }
 if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil {
 err = decodeErr
-glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
 break
 }
 if !eachEntryFunc(entry) {
@@ -213,7 +213,7 @@ func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, di
 // println("list", entry.FullPath, "chunks", len(entry.GetChunks()))
 if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil {
 err = decodeErr
-glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
 break
 }
 if !eachEntryFunc(entry) {
@@ -342,7 +342,7 @@ func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, di
 // println("list", entry.FullPath, "chunks", len(entry.GetChunks()))
 if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil {
 err = decodeErr
-glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
 break
 }
 if !eachEntryFunc(entry) {
@@ -187,7 +187,7 @@ func (store *MongodbStore) FindEntry(ctx context.Context, fullpath util.FullPath
 var where = bson.M{"directory": dir, "name": name}
 err = store.connect.Database(store.database).Collection(store.collectionName).FindOne(ctx, where).Decode(&data)
 if err != mongo.ErrNoDocuments && err != nil {
-glog.Errorf("find %s: %v", fullpath, err)
+glog.ErrorfCtx(ctx, "find %s: %v", fullpath, err)
 return nil, filer_pb.ErrNotFound
 }

@@ -272,7 +272,7 @@ func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
 lastFileName = data.Name
 if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data.Meta)); decodeErr != nil {
 err = decodeErr
-glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
 break
 }

@@ -283,7 +283,7 @@ func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
 }

 if err := cur.Close(ctx); err != nil {
-glog.V(0).Infof("list iterator close: %v", err)
+glog.V(0).InfofCtx(ctx, "list iterator close: %v", err)
 }

 return lastFileName, err
@@ -3,6 +3,7 @@ package mongodb
 import (
 "context"
 "fmt"
+
 "github.com/seaweedfs/seaweedfs/weed/filer"
 "github.com/seaweedfs/seaweedfs/weed/glog"
 "go.mongodb.org/mongo-driver/bson"
@@ -37,7 +38,7 @@ func (store *MongodbStore) KvGet(ctx context.Context, key []byte) (value []byte,
 var where = bson.M{"directory": dir, "name": name}
 err = store.connect.Database(store.database).Collection(store.collectionName).FindOne(ctx, where).Decode(&data)
 if err != mongo.ErrNoDocuments && err != nil {
-glog.Errorf("kv get: %v", err)
+glog.ErrorfCtx(ctx, "kv get: %v", err)
 return nil, filer.ErrKvNotFound
 }

@@ -47,7 +47,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp

 locations = resp.LocationsMap[vid]
 if locations == nil || len(locations.Locations) == 0 {
-glog.V(0).Infof("failed to locate %s", fileId)
+glog.V(0).InfofCtx(ctx, "failed to locate %s", fileId)
 return fmt.Errorf("failed to locate %s", fileId)
 }
 vicCacheLock.Lock()
@@ -179,7 +179,7 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, dirP
 entry, err := store.FindEntry(ctx, path)
 lastFileName = fileName
 if err != nil {
-glog.V(0).Infof("list %s : %v", path, err)
+glog.V(0).InfofCtx(ctx, "list %s : %v", path, err)
 if err == filer_pb.ErrNotFound {
 continue
 }
@@ -194,7 +194,7 @@ func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dir
 entry, err := store.FindEntry(ctx, path)
 lastFileName = fileName
 if err != nil {
-glog.V(0).Infof("list %s : %v", path, err)
+glog.V(0).InfofCtx(ctx, "list %s : %v", path, err)
 if err == filer_pb.ErrNotFound {
 continue
 }
@@ -3,6 +3,7 @@ package redis3
 import (
 "context"
 "fmt"
+
 "github.com/redis/go-redis/v9"
 "github.com/seaweedfs/seaweedfs/weed/glog"
 )
@@ -31,7 +32,7 @@ func insertChild(ctx context.Context, redisStore *UniversalRedis3Store, key stri
 nameList := LoadItemList([]byte(data), key, client, store, maxNameBatchSizeLimit)

 if err := nameList.WriteName(name); err != nil {
-glog.Errorf("add %s %s: %v", key, name, err)
+glog.ErrorfCtx(ctx, "add %s %s: %v", key, name, err)
 return err
 }

@@ -100,7 +101,7 @@ func removeChildren(ctx context.Context, redisStore *UniversalRedis3Store, key s

 if err = nameList.ListNames("", func(name string) bool {
 if err := onDeleteFn(name); err != nil {
-glog.Errorf("delete %s child %s: %v", key, name, err)
+glog.ErrorfCtx(ctx, "delete %s child %s: %v", key, name, err)
 return false
 }
 return true
@@ -151,7 +151,7 @@ func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dir
 entry, err := store.FindEntry(ctx, path)
 lastFileName = fileName
 if err != nil {
-glog.V(0).Infof("list %s : %v", path, err)
+glog.V(0).InfofCtx(ctx, "list %s : %v", path, err)
 if err == filer_pb.ErrNotFound {
 return true
 }
@@ -162,7 +162,7 @@ func (store *UniversalRedisLuaStore) ListDirectoryEntries(ctx context.Context, d
 entry, err := store.FindEntry(ctx, path)
 lastFileName = fileName
 if err != nil {
-glog.V(0).Infof("list %s : %v", path, err)
+glog.V(0).InfofCtx(ctx, "list %s : %v", path, err)
 if err == filer_pb.ErrNotFound {
 continue
 }
@@ -266,7 +266,7 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
 // println("list", entry.FullPath, "chunks", len(entry.GetChunks()))
 if decodeErr := entry.DecodeAttributesAndChunks(value); decodeErr != nil {
 err = decodeErr
-glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
 return false
 }
 if !eachEntryFunc(entry) {
@@ -82,7 +82,7 @@ func noJwtFunc(string) string {
 }

 func PrepareStreamContentWithThrottler(ctx context.Context, masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) {
-glog.V(4).Infof("prepare to stream content for chunks: %d", len(chunks))
+glog.V(4).InfofCtx(ctx, "prepare to stream content for chunks: %d", len(chunks))
 chunkViews := ViewFromChunks(ctx, masterClient.GetLookupFileIdFunction(), chunks, offset, size)

 fileId2Url := make(map[string][]string)
@@ -96,15 +96,15 @@ func PrepareStreamContentWithThrottler(ctx context.Context, masterClien
 if err == nil && len(urlStrings) > 0 {
 break
 }
-glog.V(4).Infof("waiting for chunk: %s", chunkView.FileId)
+glog.V(4).InfofCtx(ctx, "waiting for chunk: %s", chunkView.FileId)
 time.Sleep(backoff)
 }
 if err != nil {
-glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
+glog.V(1).InfofCtx(ctx, "operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
 return nil, err
 } else if len(urlStrings) == 0 {
 errUrlNotFound := fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
-glog.Error(errUrlNotFound)
+glog.ErrorCtx(ctx, errUrlNotFound)
 return nil, errUrlNotFound
 }
 fileId2Url[chunkView.FileId] = urlStrings
@@ -118,7 +118,7 @@ func PrepareStreamContentWithThrottler(ctx context.Context, masterClien
 if offset < chunkView.ViewOffset {
 gap := chunkView.ViewOffset - offset
 remaining -= gap
-glog.V(4).Infof("zero [%d,%d)", offset, chunkView.ViewOffset)
+glog.V(4).InfofCtx(ctx, "zero [%d,%d)", offset, chunkView.ViewOffset)
 err := writeZero(writer, gap)
 if err != nil {
 return fmt.Errorf("write zero [%d,%d)", offset, chunkView.ViewOffset)
@@ -140,7 +140,7 @@ func PrepareStreamContentWithThrottler(ctx context.Context, masterClien
 downloadThrottler.MaybeSlowdown(int64(chunkView.ViewSize))
 }
 if remaining > 0 {
-glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
+glog.V(4).InfofCtx(ctx, "zero [%d,%d)", offset, offset+remaining)
 err := writeZero(writer, remaining)
 if err != nil {
 return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining)
@@ -192,7 +192,7 @@ func ReadAll(ctx context.Context, buffer []byte, masterClient *wdclient.MasterCl
 chunkView := x.Value
 urlStrings, err := lookupFileIdFn(ctx, chunkView.FileId)
 if err != nil {
-glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
+glog.V(1).InfofCtx(ctx, "operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
 return err
 }

@@ -260,39 +260,39 @@ func (store *TarantoolStore) ListDirectoryEntries(ctx context.Context, dirPath w
 }

 if len(results) < 1 {
-glog.Errorf("Can't find results, data is empty")
+glog.ErrorfCtx(ctx, "Can't find results, data is empty")
 return
 }

 rows, ok := results[0].([]interface{})
 if !ok {
-glog.Errorf("Can't convert results[0] to list")
+glog.ErrorfCtx(ctx, "Can't convert results[0] to list")
 return
 }

 for _, result := range rows {
 row, ok := result.([]interface{})
 if !ok {
-glog.Errorf("Can't convert result to list")
+glog.ErrorfCtx(ctx, "Can't convert result to list")
 return
 }

 if len(row) < 5 {
-glog.Errorf("Length of result is less than needed: %v", len(row))
+glog.ErrorfCtx(ctx, "Length of result is less than needed: %v", len(row))
 return
 }

 nameRaw := row[2]
 name, ok := nameRaw.(string)
 if !ok {
-glog.Errorf("Can't convert name field to string. Actual type: %v, value: %v", reflect.TypeOf(nameRaw), nameRaw)
+glog.ErrorfCtx(ctx, "Can't convert name field to string. Actual type: %v, value: %v", reflect.TypeOf(nameRaw), nameRaw)
 return
 }

 dataRaw := row[4]
 data, ok := dataRaw.(string)
 if !ok {
-glog.Errorf("Can't convert data field to string. Actual type: %v, value: %v", reflect.TypeOf(dataRaw), dataRaw)
+glog.ErrorfCtx(ctx, "Can't convert data field to string. Actual type: %v, value: %v", reflect.TypeOf(dataRaw), dataRaw)
 return
 }

@@ -302,7 +302,7 @@ func (store *TarantoolStore) ListDirectoryEntries(ctx context.Context, dirPath w
 lastFileName = name
 if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data))); decodeErr != nil {
 err = decodeErr
-glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
 break
 }
 if !eachEntryFunc(entry) {
@@ -249,7 +249,7 @@ func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPat
 // println("list", entry.FullPath, "chunks", len(entry.GetChunks()))
 if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(iter.Value())); decodeErr != nil {
 err = decodeErr
-glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
 break
 }
 if err := iter.Next(); !eachEntryFunc(entry) || err != nil {
@@ -6,14 +6,15 @@ package ydb
 import (
 "context"
 "fmt"
-"github.com/ydb-platform/ydb-go-sdk/v3/query"
-"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
 "os"
 "path"
 "strings"
 "sync"
 "time"

+"github.com/ydb-platform/ydb-go-sdk/v3/query"
+"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
+
 "github.com/seaweedfs/seaweedfs/weed/filer"
 "github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql"
 "github.com/seaweedfs/seaweedfs/weed/glog"
@@ -234,7 +235,7 @@ func (store *YdbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath)
 dir, name := fullpath.DirAndName()
 tablePathPrefix, shortDir := store.getPrefix(ctx, &dir)
 q := withPragma(tablePathPrefix, deleteQuery)
-glog.V(4).Infof("DeleteEntry %s, tablePathPrefix %s, shortDir %s", fullpath, *tablePathPrefix, *shortDir)
+glog.V(4).InfofCtx(ctx, "DeleteEntry %s, tablePathPrefix %s, shortDir %s", fullpath, *tablePathPrefix, *shortDir)
 queryParams := table.NewQueryParameters(
 table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))),
 table.ValueParam("$directory", types.UTF8Value(*shortDir)),
@@ -433,7 +434,7 @@ func (store *YdbStore) deleteTable(ctx context.Context, prefix string) error {
 }); err != nil {
 return err
 }
-glog.V(4).Infof("deleted table %s", prefix)
+glog.V(4).InfofCtx(ctx, "deleted table %s", prefix)

 return nil
 }
@@ -446,11 +447,11 @@ func (store *YdbStore) getPrefix(ctx context.Context, dir *string) (tablePathPre
 }

 prefixBuckets := store.dirBuckets + "/"
-glog.V(4).Infof("dir: %s, prefixBuckets: %s", *dir, prefixBuckets)
+glog.V(4).InfofCtx(ctx, "dir: %s, prefixBuckets: %s", *dir, prefixBuckets)
 if strings.HasPrefix(*dir, prefixBuckets) {
 // detect bucket
 bucketAndDir := (*dir)[len(prefixBuckets):]
-glog.V(4).Infof("bucketAndDir: %s", bucketAndDir)
+glog.V(4).InfofCtx(ctx, "bucketAndDir: %s", bucketAndDir)
 var bucket string
 if t := strings.Index(bucketAndDir, "/"); t > 0 {
 bucket = bucketAndDir[:t]
@@ -465,17 +466,17 @@ func (store *YdbStore) getPrefix(ctx context.Context, dir *string) (tablePathPre
 defer store.dbsLock.Unlock()

 if _, found := store.dbs[bucket]; !found {
-glog.V(4).Infof("bucket %q not in cache, verifying existence via DescribeTable", bucket)
+glog.V(4).InfofCtx(ctx, "bucket %q not in cache, verifying existence via DescribeTable", bucket)
 tablePath := path.Join(store.tablePathPrefix, bucket, abstract_sql.DEFAULT_TABLE)
 err2 := store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
 _, err3 := s.DescribeTable(ctx, tablePath)
 return err3
 })
 if err2 != nil {
glog.V(4).Infof("bucket %q not found (DescribeTable %s failed)", bucket, tablePath)
|
glog.V(4).InfofCtx(ctx, "bucket %q not found (DescribeTable %s failed)", bucket, tablePath)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
glog.V(4).Infof("bucket %q exists, adding to cache", bucket)
|
glog.V(4).InfofCtx(ctx, "bucket %q exists, adding to cache", bucket)
|
||||||
store.dbs[bucket] = true
|
store.dbs[bucket] = true
|
||||||
}
|
}
|
||||||
bucketPrefix := path.Join(store.tablePathPrefix, bucket)
|
bucketPrefix := path.Join(store.tablePathPrefix, bucket)
|
||||||
@@ -487,7 +488,7 @@ func (store *YdbStore) getPrefix(ctx context.Context, dir *string) (tablePathPre
|
|||||||
func (store *YdbStore) ensureTables(ctx context.Context) error {
|
func (store *YdbStore) ensureTables(ctx context.Context) error {
|
||||||
prefixFull := store.tablePathPrefix
|
prefixFull := store.tablePathPrefix
|
||||||
|
|
||||||
glog.V(4).Infof("creating base table %s", prefixFull)
|
glog.V(4).InfofCtx(ctx, "creating base table %s", prefixFull)
|
||||||
baseTable := path.Join(prefixFull, abstract_sql.DEFAULT_TABLE)
|
baseTable := path.Join(prefixFull, abstract_sql.DEFAULT_TABLE)
|
||||||
if err := store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
|
if err := store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
|
||||||
return s.CreateTable(ctx, baseTable, store.createTableOptions()...)
|
return s.CreateTable(ctx, baseTable, store.createTableOptions()...)
|
||||||
@@ -495,17 +496,17 @@ func (store *YdbStore) ensureTables(ctx context.Context) error {
|
|||||||
return fmt.Errorf("failed to create base table %s: %v", baseTable, err)
|
return fmt.Errorf("failed to create base table %s: %v", baseTable, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(4).Infof("creating bucket tables")
|
glog.V(4).InfofCtx(ctx, "creating bucket tables")
|
||||||
if store.SupportBucketTable {
|
if store.SupportBucketTable {
|
||||||
store.dbsLock.Lock()
|
store.dbsLock.Lock()
|
||||||
defer store.dbsLock.Unlock()
|
defer store.dbsLock.Unlock()
|
||||||
for bucket := range store.dbs {
|
for bucket := range store.dbs {
|
||||||
glog.V(4).Infof("creating bucket table %s", bucket)
|
glog.V(4).InfofCtx(ctx, "creating bucket table %s", bucket)
|
||||||
bucketTable := path.Join(prefixFull, bucket, abstract_sql.DEFAULT_TABLE)
|
bucketTable := path.Join(prefixFull, bucket, abstract_sql.DEFAULT_TABLE)
|
||||||
if err := store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
|
if err := store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
|
||||||
return s.CreateTable(ctx, bucketTable, store.createTableOptions()...)
|
return s.CreateTable(ctx, bucketTable, store.createTableOptions()...)
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
glog.Errorf("failed to create bucket table %s: %v", bucketTable, err)
|
glog.ErrorfCtx(ctx, "failed to create bucket table %s: %v", bucketTable, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
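The getPrefix hunks above log around a small existence cache: a bucket is probed once via DescribeTable and then remembered under store.dbsLock. A generic, self-contained sketch of that check-once pattern (names here are illustrative, not the YdbStore API):

package cachesketch

import (
	"context"
	"sync"
)

// bucketCache remembers which buckets have already been verified, so the
// expensive existence probe (DescribeTable in the ydb store) runs only once.
type bucketCache struct {
	mu   sync.Mutex
	seen map[string]bool
}

func (c *bucketCache) ensure(ctx context.Context, bucket string, probe func(context.Context, string) error) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.seen == nil {
		c.seen = make(map[string]bool)
	}
	if c.seen[bucket] {
		return nil
	}
	if err := probe(ctx, bucket); err != nil {
		return err // not found: stay out of the cache so a later call retries
	}
	c.seen[bucket] = true
	return nil
}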
@@ -189,7 +189,7 @@ func (uploader *Uploader) retriedUploadData(ctx context.Context, data []byte, op
 			uploadResult.RetryCount = i
 			return
 		}
-		glog.Warningf("uploading %d to %s: %v", i, option.UploadUrl, err)
+		glog.WarningfCtx(ctx, "uploading %d to %s: %v", i, option.UploadUrl, err)
 	}
 	return
 }
@@ -331,16 +331,16 @@ func (uploader *Uploader) upload_content(ctx context.Context, fillBufferFunction

 	file_writer, cp_err := body_writer.CreatePart(h)
 	if cp_err != nil {
-		glog.V(0).Infoln("error creating form file", cp_err.Error())
+		glog.V(0).InfolnCtx(ctx, "error creating form file", cp_err.Error())
 		return nil, cp_err
 	}
 	if err := fillBufferFunction(file_writer); err != nil {
-		glog.V(0).Infoln("error copying data", err)
+		glog.V(0).InfolnCtx(ctx, "error copying data", err)
 		return nil, err
 	}
 	content_type := body_writer.FormDataContentType()
 	if err := body_writer.Close(); err != nil {
-		glog.V(0).Infoln("error closing body", err)
+		glog.V(0).InfolnCtx(ctx, "error closing body", err)
 		return nil, err
 	}
 	if option.BytesBuffer == nil {
@@ -350,7 +350,7 @@ func (uploader *Uploader) upload_content(ctx context.Context, fillBufferFunction
 	}
 	req, postErr := http.NewRequest(http.MethodPost, option.UploadUrl, reqReader)
 	if postErr != nil {
-		glog.V(1).Infof("create upload request %s: %v", option.UploadUrl, postErr)
+		glog.V(1).InfofCtx(ctx, "create upload request %s: %v", option.UploadUrl, postErr)
 		return nil, fmt.Errorf("create upload request %s: %v", option.UploadUrl, postErr)
 	}
 	req.Header.Set("Content-Type", content_type)
@@ -369,7 +369,7 @@ func (uploader *Uploader) upload_content(ctx context.Context, fillBufferFunction
 	if post_err != nil {
 		if strings.Contains(post_err.Error(), "connection reset by peer") ||
 			strings.Contains(post_err.Error(), "use of closed network connection") {
-			glog.V(1).Infof("repeat error upload request %s: %v", option.UploadUrl, postErr)
+			glog.V(1).InfofCtx(ctx, "repeat error upload request %s: %v", option.UploadUrl, postErr)
 			stats.FilerHandlerCounter.WithLabelValues(stats.RepeatErrorUploadContent).Inc()
 			resp, post_err = uploader.httpClient.Do(req)
 			defer util_http.CloseResponse(resp)
@@ -394,7 +394,7 @@ func (uploader *Uploader) upload_content(ctx context.Context, fillBufferFunction

 	unmarshal_err := json.Unmarshal(resp_body, &ret)
 	if unmarshal_err != nil {
-		glog.Errorf("unmarshal %s: %v", option.UploadUrl, string(resp_body))
+		glog.ErrorfCtx(ctx, "unmarshal %s: %v", option.UploadUrl, string(resp_body))
 		return nil, fmt.Errorf("unmarshal %v: %v", option.UploadUrl, unmarshal_err)
 	}
 	if ret.Error != "" {
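The upload_content hunks log each stage of building a multipart request body. A rough, simplified sketch of that flow — the real code uses CreatePart with custom headers and an optional reusable byte buffer; the names below are illustrative:

package uploadsketch

import (
	"bytes"
	"io"
	"mime/multipart"
)

// buildMultipart assembles a one-file multipart body from a fill callback,
// mirroring the create-part / fill / close sequence the uploader logs around.
func buildMultipart(fill func(io.Writer) error) (*bytes.Buffer, string, error) {
	body := &bytes.Buffer{}
	w := multipart.NewWriter(body)
	part, err := w.CreateFormFile("file", "chunk.dat") // simplified part header
	if err != nil {
		return nil, "", err
	}
	if err := fill(part); err != nil {
		return nil, "", err
	}
	contentType := w.FormDataContentType()
	if err := w.Close(); err != nil { // Close writes the terminating boundary
		return nil, "", err
	}
	return body, contentType, nil
}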
@@ -39,7 +39,7 @@ func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath util.Fu
 		// glog.V(3).Infof("read %s request: %v", fullFilePath, request)
 		resp, err := LookupEntry(ctx, client, request)
 		if err != nil {
-			glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err)
+			glog.V(3).InfofCtx(ctx, "read %s %v: %v", fullFilePath, resp, err)
 			return err
 		}

@@ -117,7 +117,7 @@ func doSeaweedList(ctx context.Context, client SeaweedFilerClient, fullDirPath u
 		InclusiveStartFrom: inclusive,
 	}

-	glog.V(4).Infof("read directory: %v", request)
+	glog.V(4).InfofCtx(ctx, "read directory: %v", request)
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 	stream, err := client.ListEntries(ctx, request)
@@ -165,14 +165,14 @@ func Exists(ctx context.Context, filerClient FilerClient, parentDirectoryPath st
 			Name: entryName,
 		}

-		glog.V(4).Infof("exists entry %v/%v: %v", parentDirectoryPath, entryName, request)
+		glog.V(4).InfofCtx(ctx, "exists entry %v/%v: %v", parentDirectoryPath, entryName, request)
 		resp, err := LookupEntry(ctx, client, request)
 		if err != nil {
 			if err == ErrNotFound {
 				exists = false
 				return nil
 			}
-			glog.V(0).Infof("exists entry %v: %v", request, err)
+			glog.V(0).InfofCtx(ctx, "exists entry %v: %v", request, err)
 			return fmt.Errorf("exists entry %s/%s: %v", parentDirectoryPath, entryName, err)
 		}

@@ -193,9 +193,9 @@ func Touch(ctx context.Context, filerClient FilerClient, parentDirectoryPath str
 			Entry: entry,
 		}

-		glog.V(4).Infof("touch entry %v/%v: %v", parentDirectoryPath, entryName, request)
+		glog.V(4).InfofCtx(ctx, "touch entry %v/%v: %v", parentDirectoryPath, entryName, request)
 		if err := UpdateEntry(ctx, client, request); err != nil {
-			glog.V(0).Infof("touch exists entry %v: %v", request, err)
+			glog.V(0).InfofCtx(ctx, "touch exists entry %v: %v", request, err)
 			return fmt.Errorf("touch exists entry %s/%s: %v", parentDirectoryPath, entryName, err)
 		}

@@ -232,9 +232,9 @@ func DoMkdir(ctx context.Context, client SeaweedFilerClient, parentDirectoryPath
 		Entry: entry,
 	}

-	glog.V(1).Infof("mkdir: %v", request)
+	glog.V(1).InfofCtx(ctx, "mkdir: %v", request)
 	if err := CreateEntry(ctx, client, request); err != nil {
-		glog.V(0).Infof("mkdir %v: %v", request, err)
+		glog.V(0).InfofCtx(ctx, "mkdir %v: %v", request, err)
 		return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err)
 	}

@@ -266,9 +266,9 @@ func MkFile(ctx context.Context, filerClient FilerClient, parentDirectoryPath st
 		Entry: entry,
 	}

-	glog.V(1).Infof("create file: %s/%s", parentDirectoryPath, fileName)
+	glog.V(1).InfofCtx(ctx, "create file: %s/%s", parentDirectoryPath, fileName)
 	if err := CreateEntry(ctx, client, request); err != nil {
-		glog.V(0).Infof("create file %v:%v", request, err)
+		glog.V(0).InfofCtx(ctx, "create file %v:%v", request, err)
 		return fmt.Errorf("create file %s/%s: %v", parentDirectoryPath, fileName, err)
 	}

@@ -111,11 +111,11 @@ func AfterEntryDeserialization(chunks []*FileChunk) {
 func CreateEntry(ctx context.Context, client SeaweedFilerClient, request *CreateEntryRequest) error {
 	resp, err := client.CreateEntry(ctx, request)
 	if err != nil {
-		glog.V(1).Infof("create entry %s/%s %v: %v", request.Directory, request.Entry.Name, request.OExcl, err)
+		glog.V(1).InfofCtx(ctx, "create entry %s/%s %v: %v", request.Directory, request.Entry.Name, request.OExcl, err)
 		return fmt.Errorf("CreateEntry: %v", err)
 	}
 	if resp.Error != "" {
-		glog.V(1).Infof("create entry %s/%s %v: %v", request.Directory, request.Entry.Name, request.OExcl, resp.Error)
+		glog.V(1).InfofCtx(ctx, "create entry %s/%s %v: %v", request.Directory, request.Entry.Name, request.OExcl, resp.Error)
 		return fmt.Errorf("CreateEntry : %v", resp.Error)
 	}
 	return nil
@@ -124,7 +124,7 @@ func CreateEntry(ctx context.Context, client SeaweedFilerClient, request *Create
 func UpdateEntry(ctx context.Context, client SeaweedFilerClient, request *UpdateEntryRequest) error {
 	_, err := client.UpdateEntry(ctx, request)
 	if err != nil {
-		glog.V(1).Infof("update entry %s/%s :%v", request.Directory, request.Entry.Name, err)
+		glog.V(1).InfofCtx(ctx, "update entry %s/%s :%v", request.Directory, request.Entry.Name, err)
 		return fmt.Errorf("UpdateEntry: %v", err)
 	}
 	return nil
@@ -136,7 +136,7 @@ func LookupEntry(ctx context.Context, client SeaweedFilerClient, request *Lookup
 		if err == ErrNotFound || strings.Contains(err.Error(), ErrNotFound.Error()) {
 			return nil, ErrNotFound
 		}
-		glog.V(3).Infof("read %s/%v: %v", request.Directory, request.Name, err)
+		glog.V(3).InfofCtx(ctx, "read %s/%v: %v", request.Directory, request.Name, err)
 		return nil, fmt.Errorf("LookupEntry1: %v", err)
 	}
 	if resp.Entry == nil {
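LookupEntry above folds any transport error whose text contains the ErrNotFound sentinel back into ErrNotFound itself, which is what lets Exists and GetEntry use a plain equality check. A self-contained sketch of that normalization (the sentinel's message text here is illustrative):

package filersketch

import (
	"errors"
	"strings"
)

// ErrNotFound stands in for the sentinel the filer_pb helpers compare against.
var ErrNotFound = errors.New("filer: no entry is found in filer store")

// normalizeLookupErr maps gRPC-wrapped not-found errors onto the sentinel so
// callers can test err == ErrNotFound directly.
func normalizeLookupErr(err error) error {
	if err == nil {
		return nil
	}
	if errors.Is(err, ErrNotFound) || strings.Contains(err.Error(), ErrNotFound.Error()) {
		return ErrNotFound
	}
	return err
}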
@@ -76,14 +76,14 @@ func (fs *FilerSource) LookupFileId(ctx context.Context, part string) (fileUrls
 	})

 	if err != nil {
-		glog.V(1).Infof("LookupFileId volume id %s: %v", vid, err)
+		glog.V(1).InfofCtx(ctx, "LookupFileId volume id %s: %v", vid, err)
 		return nil, fmt.Errorf("LookupFileId volume id %s: %v", vid, err)
 	}

 	locations := vid2Locations[vid]

 	if locations == nil || len(locations.Locations) == 0 {
-		glog.V(1).Infof("LookupFileId locate volume id %s: %v", vid, err)
+		glog.V(1).InfofCtx(ctx, "LookupFileId locate volume id %s: %v", vid, err)
 		return nil, fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err)
 	}

@@ -21,14 +21,14 @@ import (

 func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) {

-	glog.V(4).Infof("LookupDirectoryEntry %s", filepath.Join(req.Directory, req.Name))
+	glog.V(4).InfofCtx(ctx, "LookupDirectoryEntry %s", filepath.Join(req.Directory, req.Name))

 	entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name))
 	if err == filer_pb.ErrNotFound {
 		return &filer_pb.LookupDirectoryEntryResponse{}, err
 	}
 	if err != nil {
-		glog.V(3).Infof("LookupDirectoryEntry %s: %+v, ", filepath.Join(req.Directory, req.Name), err)
+		glog.V(3).InfofCtx(ctx, "LookupDirectoryEntry %s: %+v, ", filepath.Join(req.Directory, req.Name), err)
 		return nil, err
 	}

@@ -97,7 +97,7 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
 	for _, vidString := range req.VolumeIds {
 		vid, err := strconv.Atoi(vidString)
 		if err != nil {
-			glog.V(1).Infof("Unknown volume id %d", vid)
+			glog.V(1).InfofCtx(ctx, "Unknown volume id %d", vid)
 			return nil, err
 		}
 		var locs []*filer_pb.Location
@@ -138,7 +138,7 @@ func (fs *FilerServer) lookupFileId(ctx context.Context, fileId string) (targetU

 func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) {

-	glog.V(4).Infof("CreateEntry %v/%v", req.Directory, req.Entry.Name)
+	glog.V(4).InfofCtx(ctx, "CreateEntry %v/%v", req.Directory, req.Entry.Name)

 	resp = &filer_pb.CreateEntryResponse{}

@@ -160,7 +160,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
 	if createErr == nil {
 		fs.filer.DeleteChunksNotRecursive(garbage)
 	} else {
-		glog.V(3).Infof("CreateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), createErr)
+		glog.V(3).InfofCtx(ctx, "CreateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), createErr)
 		resp.Error = createErr.Error()
 	}

@@ -169,7 +169,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr

 func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) {

-	glog.V(4).Infof("UpdateEntry %v", req)
+	glog.V(4).InfofCtx(ctx, "UpdateEntry %v", req)

 	fullpath := util.Join(req.Directory, req.Entry.Name)
 	entry, err := fs.filer.FindEntry(ctx, util.FullPath(fullpath))
@@ -195,7 +195,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
 		fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, req.IsFromOtherCluster, req.Signatures)

 	} else {
-		glog.V(3).Infof("UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err)
+		glog.V(3).InfofCtx(ctx, "UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err)
 	}

 	return &filer_pb.UpdateEntryResponse{}, err
@@ -230,7 +230,7 @@ func (fs *FilerServer) cleanupChunks(ctx context.Context, fullpath string, exist
 		chunks, err = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), chunks)
 		if err != nil {
 			// not good, but should be ok
-			glog.V(0).Infof("MaybeManifestize: %v", err)
+			glog.V(0).InfofCtx(ctx, "MaybeManifestize: %v", err)
 		}
 	}

@@ -241,7 +241,7 @@ func (fs *FilerServer) cleanupChunks(ctx context.Context, fullpath string, exist

 func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendToEntryRequest) (*filer_pb.AppendToEntryResponse, error) {

-	glog.V(4).Infof("AppendToEntry %v", req)
+	glog.V(4).InfofCtx(ctx, "AppendToEntry %v", req)
 	fullpath := util.NewFullPath(req.Directory, req.EntryName)

 	lockClient := cluster.NewLockClient(fs.grpcDialOption, fs.option.Host)
@@ -273,13 +273,13 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
 	entry.Chunks = append(entry.GetChunks(), req.Chunks...)
 	so, err := fs.detectStorageOption(ctx, string(fullpath), "", "", entry.TtlSec, "", "", "", "")
 	if err != nil {
-		glog.Warningf("detectStorageOption: %v", err)
+		glog.WarningfCtx(ctx, "detectStorageOption: %v", err)
 		return &filer_pb.AppendToEntryResponse{}, err
 	}
 	entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), entry.GetChunks())
 	if err != nil {
 		// not good, but should be ok
-		glog.V(0).Infof("MaybeManifestize: %v", err)
+		glog.V(0).InfofCtx(ctx, "MaybeManifestize: %v", err)
 	}

 	err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil, false, fs.filer.MaxFilenameLength)
@@ -289,7 +289,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo

 func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) {

-	glog.V(4).Infof("DeleteEntry %v", req)
+	glog.V(4).InfofCtx(ctx, "DeleteEntry %v", req)

 	err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures, req.IfNotModifiedAfter)
 	resp = &filer_pb.DeleteEntryResponse{}
@@ -307,7 +307,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol

 	so, err := fs.detectStorageOption(ctx, req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack, req.DataNode)
 	if err != nil {
-		glog.V(3).Infof("AssignVolume: %v", err)
+		glog.V(3).InfofCtx(ctx, "AssignVolume: %v", err)
 		return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
 	}

@@ -315,11 +315,11 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol

 	assignResult, err := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
 	if err != nil {
-		glog.V(3).Infof("AssignVolume: %v", err)
+		glog.V(3).InfofCtx(ctx, "AssignVolume: %v", err)
 		return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
 	}
 	if assignResult.Error != "" {
-		glog.V(3).Infof("AssignVolume error: %v", assignResult.Error)
+		glog.V(3).InfofCtx(ctx, "AssignVolume error: %v", assignResult.Error)
 		return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume result: %v", assignResult.Error)}, nil
 	}

@@ -339,7 +339,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol

 func (fs *FilerServer) CollectionList(ctx context.Context, req *filer_pb.CollectionListRequest) (resp *filer_pb.CollectionListResponse, err error) {

-	glog.V(4).Infof("CollectionList %v", req)
+	glog.V(4).InfofCtx(ctx, "CollectionList %v", req)
 	resp = &filer_pb.CollectionListResponse{}

 	err = fs.filer.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
@@ -361,7 +361,7 @@ func (fs *FilerServer) CollectionList(ctx context.Context, req *filer_pb.Collect

 func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.DeleteCollectionRequest) (resp *filer_pb.DeleteCollectionResponse, err error) {

-	glog.V(4).Infof("DeleteCollection %v", req)
+	glog.V(4).InfofCtx(ctx, "DeleteCollection %v", req)

 	err = fs.filer.DoDeleteCollection(req.GetCollection())

@@ -100,7 +100,7 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.
 		MinorVersion: version.MINOR_VERSION,
 	}

-	glog.V(4).Infof("GetFilerConfiguration: %v", t)
+	glog.V(4).InfofCtx(ctx, "GetFilerConfiguration: %v", t)

 	return t, nil
 }
@@ -36,7 +36,7 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques
 	ctx := r.Context()
 	urlStrings, err := fs.filer.MasterClient.GetLookupFileIdFunction()(ctx, fileId)
 	if err != nil {
-		glog.Errorf("locate %s: %v", fileId, err)
+		glog.ErrorfCtx(ctx, "locate %s: %v", fileId, err)
 		w.WriteHeader(http.StatusInternalServerError)
 		return
 	}
@@ -48,7 +48,7 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques

 	proxyReq, err := http.NewRequest(r.Method, urlStrings[rand.IntN(len(urlStrings))], r.Body)
 	if err != nil {
-		glog.Errorf("NewRequest %s: %v", urlStrings[0], err)
+		glog.ErrorfCtx(ctx, "NewRequest %s: %v", urlStrings[0], err)
 		w.WriteHeader(http.StatusInternalServerError)
 		return
 	}
@@ -66,7 +66,7 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques
 	proxyResponse, postErr := util_http.GetGlobalHttpClient().Do(proxyReq)

 	if postErr != nil {
-		glog.Errorf("post to filer: %v", postErr)
+		glog.ErrorfCtx(ctx, "post to filer: %v", postErr)
 		w.WriteHeader(http.StatusInternalServerError)
 		return
 	}
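The HTTP handler hunks all start from ctx := r.Context(), so cancellation of the client request propagates to lookups, and the log lines stay attached to the request. A minimal sketch of the shape, with doWork standing in for the filer call:

package handlersketch

import (
	"context"
	"log"
	"net/http"
)

func handle(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context() // canceled automatically when the client goes away
	if err := doWork(ctx); err != nil {
		// the real code calls glog.ErrorfCtx(ctx, ...) here
		log.Printf("work failed: %v", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
}

func doWork(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // stop early if the request was canceled
	default:
		return nil
	}
}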
@@ -102,11 +102,11 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
 		return
 	}
 	if err == filer_pb.ErrNotFound {
-		glog.V(2).Infof("Not found %s: %v", path, err)
+		glog.V(2).InfofCtx(ctx, "Not found %s: %v", path, err)
 		stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadNotFound).Inc()
 		w.WriteHeader(http.StatusNotFound)
 	} else {
-		glog.Errorf("Internal %s: %v", path, err)
+		glog.ErrorfCtx(ctx, "Internal %s: %v", path, err)
 		stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadInternal).Inc()
 		w.WriteHeader(http.StatusInternalServerError)
 	}
@@ -244,7 +244,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
 			defer mem.Free(data)
 			err := filer.ReadAll(ctx, data, fs.filer.MasterClient, entry.GetChunks())
 			if err != nil {
-				glog.Errorf("failed to read %s: %v", path, err)
+				glog.ErrorfCtx(ctx, "failed to read %s: %v", path, err)
 				w.WriteHeader(http.StatusInternalServerError)
 				return
 			}
@@ -260,7 +260,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
 			_, err := writer.Write(entry.Content[offset : offset+size])
 			if err != nil {
 				stats.FilerHandlerCounter.WithLabelValues(stats.ErrorWriteEntry).Inc()
-				glog.Errorf("failed to write entry content: %v", err)
+				glog.ErrorfCtx(ctx, "failed to write entry content: %v", err)
 			}
 			return err
 		}, nil
@@ -273,7 +273,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
 			Name: name,
 		}); err != nil {
 			stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadCache).Inc()
-			glog.Errorf("CacheRemoteObjectToLocalCluster %s: %v", entry.FullPath, err)
+			glog.ErrorfCtx(ctx, "CacheRemoteObjectToLocalCluster %s: %v", entry.FullPath, err)
 			return nil, fmt.Errorf("cache %s: %v", entry.FullPath, err)
 		} else {
 			chunks = resp.Entry.GetChunks()
@@ -283,14 +283,14 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
 	streamFn, err := filer.PrepareStreamContentWithThrottler(ctx, fs.filer.MasterClient, fs.maybeGetVolumeReadJwtAuthorizationToken, chunks, offset, size, fs.option.DownloadMaxBytesPs)
 	if err != nil {
 		stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc()
-		glog.Errorf("failed to prepare stream content %s: %v", r.URL, err)
+		glog.ErrorfCtx(ctx, "failed to prepare stream content %s: %v", r.URL, err)
 		return nil, err
 	}
 	return func(writer io.Writer) error {
 		err := streamFn(writer)
 		if err != nil {
 			stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc()
-			glog.Errorf("failed to stream content %s: %v", r.URL, err)
+			glog.ErrorfCtx(ctx, "failed to stream content %s: %v", r.URL, err)
 		}
 		return err
 	}, nil
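GetOrHeadHandler prepares a func(io.Writer) error up front and only runs it once headers are settled; stream errors are counted and logged with the request context. A stripped-down sketch of that deferred-stream shape:

package streamsketch

import "io"

// prepareStream returns a write function instead of writing immediately, so
// the caller can still set headers (status, content type) before any bytes go
// out; the error surfaces when the returned function is finally invoked.
func prepareStream(data []byte) func(io.Writer) error {
	return func(w io.Writer) error {
		_, err := w.Write(data)
		return err
	}
}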
@@ -43,7 +43,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
 	entries, shouldDisplayLoadMore, err := fs.filer.ListDirectoryEntries(ctx, util.FullPath(path), lastFileName, false, int64(limit), "", namePattern, namePatternExclude)

 	if err != nil {
-		glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err)
+		glog.V(0).InfofCtx(ctx, "listDirectory %s %s %d: %s", path, lastFileName, limit, err)
 		w.WriteHeader(http.StatusNotFound)
 		return
 	}
@@ -58,7 +58,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
 		emptyFolder = false
 	}

-	glog.V(4).Infof("listDirectory %s, last file %s, limit %d: %d items", path, lastFileName, limit, len(entries))
+	glog.V(4).InfofCtx(ctx, "listDirectory %s, last file %s, limit %d: %d items", path, lastFileName, limit, len(entries))

 	if r.Header.Get("Accept") == "application/json" {
 		writeJsonQuiet(w, r, http.StatusOK, struct {
@@ -103,7 +103,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
 		fs.option.ShowUIDirectoryDelete,
 	})
 	if err != nil {
-		glog.V(0).Infof("Template Execute Error: %v", err)
+		glog.V(0).InfofCtx(ctx, "Template Execute Error: %v", err)
 	}

 }
@@ -43,7 +43,7 @@ func (fs *FilerServer) PutTaggingHandler(w http.ResponseWriter, r *http.Request)
 	}

 	if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil, false, fs.filer.MaxFilenameLength); dbErr != nil {
-		glog.V(0).Infof("failing to update %s tagging : %v", path, dbErr)
+		glog.V(0).InfofCtx(ctx, "failing to update %s tagging : %v", path, dbErr)
 		writeJsonError(w, r, http.StatusInternalServerError, dbErr)
 		return
 	}
@@ -109,7 +109,7 @@ func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Reque
 	}

 	if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil, false, fs.filer.MaxFilenameLength); dbErr != nil {
-		glog.V(0).Infof("failing to delete %s tagging : %v", path, dbErr)
+		glog.V(0).InfofCtx(ctx, "failing to delete %s tagging : %v", path, dbErr)
 		writeJsonError(w, r, http.StatusInternalServerError, dbErr)
 		return
 	}
@@ -46,7 +46,7 @@ func (fs *FilerServer) assignNewFileInfo(ctx context.Context, so *operation.Stor

 	assignResult, ae := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest)
 	if ae != nil {
-		glog.Errorf("failing to assign a file id: %v", ae)
+		glog.ErrorfCtx(ctx, "failing to assign a file id: %v", ae)
 		err = ae
 		return
 	}
@@ -93,14 +93,14 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte
 		if err == ErrReadOnly {
 			w.WriteHeader(http.StatusInsufficientStorage)
 		} else {
-			glog.V(1).Infoln("post", r.RequestURI, ":", err.Error())
+			glog.V(1).InfolnCtx(ctx, "post", r.RequestURI, ":", err.Error())
 			w.WriteHeader(http.StatusInternalServerError)
 		}
 		return
 	}

 	if util.FullPath(r.URL.Path).IsLongerFileName(so.MaxFileNameLength) {
-		glog.V(1).Infoln("post", r.RequestURI, ": ", "entry name too long")
+		glog.V(1).InfolnCtx(ctx, "post", r.RequestURI, ": ", "entry name too long")
 		w.WriteHeader(http.StatusRequestURITooLong)
 		return
 	}
@@ -128,7 +128,7 @@ func (fs *FilerServer) move(ctx context.Context, w http.ResponseWriter, r *http.
 	src := r.URL.Query().Get("mv.from")
 	dst := r.URL.Path

-	glog.V(2).Infof("FilerServer.move %v to %v", src, dst)
+	glog.V(2).InfofCtx(ctx, "FilerServer.move %v to %v", src, dst)

 	var err error
 	if src, err = clearName(src); err != nil {
@@ -261,7 +261,7 @@ func (fs *FilerServer) detectStorageOption(ctx context.Context, requestURI, qCol
 	if ttlSeconds == 0 {
 		ttl, err := needle.ReadTTL(rule.GetTtl())
 		if err != nil {
-			glog.Errorf("fail to parse %s ttl setting %s: %v", rule.LocationPrefix, rule.Ttl, err)
+			glog.ErrorfCtx(ctx, "fail to parse %s ttl setting %s: %v", rule.LocationPrefix, rule.Ttl, err)
 		}
 		ttlSeconds = int32(ttl.Minutes()) * 60
 	}
@@ -284,7 +284,7 @@ func (fs *FilerServer) detectStorageOption0(ctx context.Context, requestURI, qCo

 	ttl, err := needle.ReadTTL(qTtl)
 	if err != nil {
-		glog.Errorf("fail to parse ttl %s: %v", qTtl, err)
+		glog.ErrorfCtx(ctx, "fail to parse ttl %s: %v", qTtl, err)
 	}

 	so, err := fs.detectStorageOption(ctx, requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack, dataNode)
@@ -5,7 +5,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"github.com/seaweedfs/seaweedfs/weed/util/version"
 	"io"
 	"net/http"
 	"os"
@@ -14,6 +13,8 @@ import (
 	"strings"
 	"time"

+	"github.com/seaweedfs/seaweedfs/weed/util/version"
+
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/operation"
@@ -240,7 +241,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
 	}
 	mode, err := strconv.ParseUint(modeStr, 8, 32)
 	if err != nil {
-		glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
+		glog.ErrorfCtx(ctx, "Invalid mode format: %s, use 0660 by default", modeStr)
 		mode = 0660
 	}

@@ -257,7 +258,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
 	if isAppend || isOffsetWrite {
 		existingEntry, findErr := fs.filer.FindEntry(ctx, util.FullPath(path))
 		if findErr != nil && findErr != filer_pb.ErrNotFound {
-			glog.V(0).Infof("failing to find %s: %v", path, findErr)
+			glog.V(0).InfofCtx(ctx, "failing to find %s: %v", path, findErr)
 		}
 		entry = existingEntry
 	}
@@ -280,7 +281,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
 		}

 	} else {
-		glog.V(4).Infoln("saving", path)
+		glog.V(4).InfolnCtx(ctx, "saving", path)
 		newChunks = fileChunks
 		entry = &filer.Entry{
 			FullPath: util.FullPath(path),
@@ -302,14 +303,14 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
 	// maybe concatenate small chunks into one whole chunk
 	mergedChunks, replyerr = fs.maybeMergeChunks(ctx, so, newChunks)
 	if replyerr != nil {
-		glog.V(0).Infof("merge chunks %s: %v", r.RequestURI, replyerr)
+		glog.V(0).InfofCtx(ctx, "merge chunks %s: %v", r.RequestURI, replyerr)
 		mergedChunks = newChunks
 	}

 	// maybe compact entry chunks
 	mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), mergedChunks)
 	if replyerr != nil {
-		glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
+		glog.V(0).InfofCtx(ctx, "manifestize %s: %v", r.RequestURI, replyerr)
 		return
 	}
 	entry.Chunks = mergedChunks
@@ -344,7 +345,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
 	if dbErr != nil {
 		replyerr = dbErr
 		filerResult.Error = dbErr.Error()
-		glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
+		glog.V(0).InfofCtx(ctx, "failing to write %s to filer server : %v", path, dbErr)
 	}
 	return filerResult, replyerr
 }
@@ -404,7 +405,7 @@ func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http
 	}
 	mode, err := strconv.ParseUint(modeStr, 8, 32)
 	if err != nil {
-		glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
+		glog.ErrorfCtx(ctx, "Invalid mode format: %s, use 0660 by default", modeStr)
 		mode = 0660
 	}

@@ -420,7 +421,7 @@ func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http
 		return
 	}

-	glog.V(4).Infoln("mkdir", path)
+	glog.V(4).InfolnCtx(ctx, "mkdir", path)
 	entry := &filer.Entry{
 		FullPath: util.FullPath(path),
 		Attr: filer.Attr{
@@ -440,7 +441,7 @@ func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http
 	if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, false, so.MaxFileNameLength); dbErr != nil {
 		replyerr = dbErr
 		filerResult.Error = dbErr.Error()
-		glog.V(0).Infof("failing to create dir %s on filer server : %v", path, dbErr)
+		glog.V(0).InfofCtx(ctx, "failing to create dir %s on filer server : %v", path, dbErr)
 	}
 	return filerResult, replyerr
 }
@@ -25,7 +25,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
 		return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, so.Collection, so.DataCenter)
 	}

-	glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation)
+	glog.V(4).InfofCtx(ctx, "write %s to %v", r.URL.Path, urlLocation)

 	// Note: encrypt(gzip(data)), encrypt data first, then gzip

@@ -2,13 +2,14 @@ package weed_server

 import (
 	"context"
+	"io"
+	"math"

 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/operation"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/stats"
-	"io"
-	"math"
 )

 const MergeChunkMinCount int = 1000
@@ -57,7 +58,7 @@ func (fs *FilerServer) mergeChunks(ctx context.Context, so *operation.StorageOpt

 	garbage, err := filer.MinusChunks(ctx, fs.lookupFileId, inputChunks, mergedChunks)
 	if err != nil {
-		glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s",
+		glog.ErrorfCtx(ctx, "Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s",
 			mergedChunks, inputChunks)
 		return mergedChunks, err
 	}
@@ -131,7 +131,7 @@ func (fs *FilerServer) uploadReaderToChunks(ctx context.Context, reader io.Reade
 			fileChunksSize := len(fileChunks) + len(chunks)
 			for _, chunk := range chunks {
 				fileChunks = append(fileChunks, chunk)
-				glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, fileChunksSize, chunk.FileId, offset, offset+int64(chunk.Size))
+				glog.V(4).InfofCtx(ctx, "uploaded %s chunk %d to %s [%d,%d)", fileName, fileChunksSize, chunk.FileId, offset, offset+int64(chunk.Size))
 			}
 			fileChunksLock.Unlock()
 		}
@@ -149,9 +149,9 @@ func (fs *FilerServer) uploadReaderToChunks(ctx context.Context, reader io.Reade
 	wg.Wait()

 	if uploadErr != nil {
-		glog.V(0).Infof("upload file %s error: %v", fileName, uploadErr)
+		glog.V(0).InfofCtx(ctx, "upload file %s error: %v", fileName, uploadErr)
 		for _, chunk := range fileChunks {
-			glog.V(4).Infof("purging failed uploaded %s chunk %s [%d,%d)", fileName, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
+			glog.V(4).InfofCtx(ctx, "purging failed uploaded %s chunk %s [%d,%d)", fileName, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
 		}
 		fs.filer.DeleteUncommittedChunks(ctx, fileChunks)
 		return nil, md5Hash, 0, uploadErr, nil
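uploadReaderToChunks fans chunk uploads out to goroutines and collects results under a lock, purging the partial chunks if any upload fails. A generic sketch of that shape, with upload standing in for the volume-server call and string for filer_pb.FileChunk:

package chunksketch

import "sync"

// uploadAll runs one goroutine per part, appends finished chunk IDs under a
// mutex, and reports the first error after all workers finish.
func uploadAll(parts [][]byte, upload func([]byte) (string, error)) ([]string, error) {
	var (
		mu       sync.Mutex
		wg       sync.WaitGroup
		chunks   []string
		firstErr error
	)
	for _, p := range parts {
		wg.Add(1)
		go func(p []byte) {
			defer wg.Done()
			id, err := upload(p)
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				if firstErr == nil {
					firstErr = err
				}
				return
			}
			chunks = append(chunks, id)
		}(p)
	}
	wg.Wait()
	if firstErr != nil {
		// the real code also purges the chunks that did upload
		return nil, firstErr
	}
	return chunks, nil
}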
@@ -206,14 +206,14 @@ func (fs *FilerServer) dataToChunk(ctx context.Context, fileName, contentType st
 		// assign one file id for one chunk
 		fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(ctx, so)
 		if uploadErr != nil {
-			glog.V(4).Infof("retry later due to assign error: %v", uploadErr)
+			glog.V(4).InfofCtx(ctx, "retry later due to assign error: %v", uploadErr)
 			stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssignRetry).Inc()
 			return uploadErr
 		}
 		// upload the chunk to the volume server
 		uploadResult, uploadErr, _ = fs.doUpload(ctx, urlLocation, dataReader, fileName, contentType, nil, auth)
 		if uploadErr != nil {
-			glog.V(4).Infof("retry later due to upload error: %v", uploadErr)
+			glog.V(4).InfofCtx(ctx, "retry later due to upload error: %v", uploadErr)
 			stats.FilerHandlerCounter.WithLabelValues(stats.ChunkDoUploadRetry).Inc()
 			fid, _ := filer_pb.ToFileIdObject(fileId)
 			fileChunk := filer_pb.FileChunk{
@@ -227,7 +227,7 @@ func (fs *FilerServer) dataToChunk(ctx context.Context, fileName, contentType st
 		return nil
 	})
 	if err != nil {
-		glog.Errorf("upload error: %v", err)
+		glog.ErrorfCtx(ctx, "upload error: %v", err)
 		return failedFileChunks, err
 	}

@@ -18,7 +18,7 @@ import (
 func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
 	ctx := r.Context()
 	if e := r.ParseForm(); e != nil {
-		glog.V(0).Infoln("form parse error:", e)
+		glog.V(0).InfolnCtx(ctx, "form parse error:", e)
 		writeJsonError(w, r, http.StatusBadRequest, e)
 		return
 	}
@@ -26,7 +26,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
 	vid, fid, _, _, _ := parseURLPath(r.URL.Path)
 	volumeId, ve := needle.NewVolumeId(vid)
 	if ve != nil {
-		glog.V(0).Infoln("NewVolumeId error:", ve)
+		glog.V(0).InfolnCtx(ctx, "NewVolumeId error:", ve)
 		writeJsonError(w, r, http.StatusBadRequest, ve)
 		return
 	}
@@ -285,7 +285,7 @@ func ReadUrl(ctx context.Context, fileUrl string, cipherKey []byte, isContentCom
 		// drains the response body to avoid memory leak
 		data, _ := io.ReadAll(reader)
 		if len(data) != 0 {
-			glog.V(1).Infof("%s reader has remaining %d bytes", contentEncoding, len(data))
+			glog.V(1).InfofCtx(ctx, "%s reader has remaining %d bytes", contentEncoding, len(data))
 		}
 		return n, err
 	}
@@ -368,7 +368,7 @@ func readEncryptedUrl(ctx context.Context, fileUrl, jwt string, cipherKey []byte
 	if isContentCompressed {
 		decryptedData, err = util.DecompressData(decryptedData)
 		if err != nil {
-			glog.V(0).Infof("unzip decrypt %s: %v", fileUrl, err)
+			glog.V(0).InfofCtx(ctx, "unzip decrypt %s: %v", fileUrl, err)
 		}
 	}
 	if len(decryptedData) < int(offset)+size {
@@ -472,13 +472,13 @@ func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []stri
 				break
 			}
 			if err != nil {
-				glog.V(0).Infof("read %s failed, err: %v", urlString, err)
+				glog.V(0).InfofCtx(ctx, "read %s failed, err: %v", urlString, err)
 			} else {
 				break
 			}
 		}
 		if err != nil && shouldRetry {
-			glog.V(0).Infof("retry reading in %v", waitTime)
+			glog.V(0).InfofCtx(ctx, "retry reading in %v", waitTime)
 			time.Sleep(waitTime)
 		} else {
 			break
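RetriedFetchChunkData sleeps between attempts with time.Sleep, which cannot be interrupted. Since the function already receives a ctx, a timer-based wait would let cancellation cut the retry loop short — a small sketch of that alternative, not what the committed code does:

package retrysketch

import (
	"context"
	"time"
)

// waitOrCancel blocks for waitTime or until the context is canceled,
// whichever comes first; a drop-in alternative to time.Sleep in retry loops.
func waitOrCancel(ctx context.Context, waitTime time.Duration) error {
	t := time.NewTimer(waitTime)
	defer t.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-t.C:
		return nil
	}
}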