mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-09-18 19:07:55 +08:00
filer: streaming file listing
This commit is contained in:
@@ -3,11 +3,13 @@ package shell
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/filer2"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"google.golang.org/grpc"
|
||||
"io"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -43,62 +45,12 @@ func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer
|
||||
path = path + "/"
|
||||
}
|
||||
|
||||
|
||||
var blockCount, byteCount uint64
|
||||
dir, name := filer2.FullPath(path).DirAndName()
|
||||
blockCount, byteCount, err = duTraverseDirectory(ctx, writer, commandEnv.getFilerClient(filerServer, filerPort), dir, name)
|
||||
|
||||
return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
|
||||
|
||||
_, _, err = paginateDirectory(ctx, writer, client, dir, name, 1000)
|
||||
|
||||
return err
|
||||
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func paginateDirectory(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, dir, name string, paginateSize int) (blockCount uint64, byteCount uint64, err error) {
|
||||
|
||||
paginatedCount := -1
|
||||
startFromFileName := ""
|
||||
|
||||
for paginatedCount == -1 || paginatedCount == paginateSize {
|
||||
resp, listErr := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
|
||||
Directory: dir,
|
||||
Prefix: name,
|
||||
StartFromFileName: startFromFileName,
|
||||
InclusiveStartFrom: false,
|
||||
Limit: uint32(paginateSize),
|
||||
})
|
||||
if listErr != nil {
|
||||
err = listErr
|
||||
return
|
||||
}
|
||||
|
||||
paginatedCount = len(resp.Entries)
|
||||
|
||||
for _, entry := range resp.Entries {
|
||||
if entry.IsDirectory {
|
||||
subDir := fmt.Sprintf("%s/%s", dir, entry.Name)
|
||||
if dir == "/" {
|
||||
subDir = "/" + entry.Name
|
||||
}
|
||||
numBlock, numByte, err := paginateDirectory(ctx, writer, client, subDir, "", paginateSize)
|
||||
if err == nil {
|
||||
blockCount += numBlock
|
||||
byteCount += numByte
|
||||
}
|
||||
} else {
|
||||
blockCount += uint64(len(entry.Chunks))
|
||||
byteCount += filer2.TotalSize(entry.Chunks)
|
||||
}
|
||||
startFromFileName = entry.Name
|
||||
|
||||
if name != "" && !entry.IsDirectory {
|
||||
fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s/%s\n", blockCount, byteCount, dir, name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if name == "" {
|
||||
if name == "" && err == nil {
|
||||
fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s\n", blockCount, byteCount, dir)
|
||||
}
|
||||
|
||||
@@ -106,6 +58,31 @@ func paginateDirectory(ctx context.Context, writer io.Writer, client filer_pb.Se
|
||||
|
||||
}
|
||||
|
||||
func duTraverseDirectory(ctx context.Context, writer io.Writer, filerClient filer2.FilerClient, dir, name string) (blockCount uint64, byteCount uint64, err error) {
|
||||
|
||||
err = filer2.ReadDirAllEntries(ctx, filerClient, dir, name, func(entry *filer_pb.Entry, isLast bool) {
|
||||
if entry.IsDirectory {
|
||||
subDir := fmt.Sprintf("%s/%s", dir, entry.Name)
|
||||
if dir == "/" {
|
||||
subDir = "/" + entry.Name
|
||||
}
|
||||
numBlock, numByte, err := duTraverseDirectory(ctx, writer, filerClient, subDir, "")
|
||||
if err == nil {
|
||||
blockCount += numBlock
|
||||
byteCount += numByte
|
||||
}
|
||||
} else {
|
||||
blockCount += uint64(len(entry.Chunks))
|
||||
byteCount += filer2.TotalSize(entry.Chunks)
|
||||
}
|
||||
|
||||
if name != "" && !entry.IsDirectory {
|
||||
fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s/%s\n", blockCount, byteCount, dir, name)
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (env *CommandEnv) withFilerClient(ctx context.Context, filerServer string, filerPort int64, fn func(filer_pb.SeaweedFilerClient) error) error {
|
||||
|
||||
filerGrpcAddress := fmt.Sprintf("%s:%d", filerServer, filerPort+10000)
|
||||
@@ -115,3 +92,20 @@ func (env *CommandEnv) withFilerClient(ctx context.Context, filerServer string,
|
||||
}, filerGrpcAddress, env.option.GrpcDialOption)
|
||||
|
||||
}
|
||||
|
||||
type commandFilerClient struct {
|
||||
env *CommandEnv
|
||||
filerServer string
|
||||
filerPort int64
|
||||
}
|
||||
|
||||
func (env *CommandEnv) getFilerClient(filerServer string, filerPort int64) *commandFilerClient {
|
||||
return &commandFilerClient{
|
||||
env: env,
|
||||
filerServer: filerServer,
|
||||
filerPort: filerPort,
|
||||
}
|
||||
}
|
||||
func (c *commandFilerClient) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
|
||||
return c.env.withFilerClient(ctx, c.filerServer, c.filerPort, fn)
|
||||
}
|
||||
|
Reference in New Issue
Block a user