add stream list directory entries

This commit is contained in:
Chris Lu
2021-01-15 23:56:24 -08:00
parent 01dc8a43ba
commit a4063a5437
20 changed files with 267 additions and 242 deletions

View File

@@ -96,8 +96,8 @@ func (store *ElasticStore) RollbackTransaction(ctx context.Context) error {
return nil
}
func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) {
return nil, false, filer.ErrUnsupportedListDirectoryPrefixed
func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
@@ -187,26 +187,28 @@ func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (e
}
func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
if entries, _, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil {
for _, entry := range entries {
store.DeleteEntry(ctx, entry.FullPath)
_, err = store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32, func(entry *filer.Entry) bool {
if err := store.DeleteEntry(ctx, entry.FullPath); err != nil {
glog.Errorf("elastic delete %s: %v.", entry.FullPath, err)
return false
}
}
return nil
return true
})
return
}
func (store *ElasticStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) {
func (store *ElasticStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
if string(dirPath) == "/" {
return store.listRootDirectoryEntries(ctx, startFileName, includeStartFile, limit)
return store.listRootDirectoryEntries(ctx, startFileName, includeStartFile, limit, eachEntryFunc)
}
return store.listDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
return store.listDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, eachEntryFunc)
}
func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) {
func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
indexResult, err := store.client.CatIndices().Do(ctx)
if err != nil {
glog.Errorf("list indices %v.", err)
return entries, false, err
return
}
for _, index := range indexResult {
if index.Index == indexKV {
@@ -216,32 +218,33 @@ func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFi
if entry, err := store.FindEntry(ctx,
weed_util.FullPath("/"+strings.Replace(index.Index, indexPrefix, "", 1))); err == nil {
fileName := getFileName(entry.FullPath)
lastFileName = fileName
if fileName == startFileName && !inclusive {
continue
}
limit--
if limit < 0 {
hasMore = true
break
}
entries = append(entries, entry)
if !eachEntryFunc(entry) {
break
}
}
}
}
return entries, hasMore, nil
return
}
func (store *ElasticStore) listDirectoryEntries(
ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int64,
) (entries []*filer.Entry, hasMore bool, err error) {
ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
first := true
index := getIndex(fullpath)
nextStart := ""
parentId := weed_util.Md5String([]byte(fullpath))
if _, err := store.client.Refresh(index).Do(ctx); err != nil {
if _, err = store.client.Refresh(index).Do(ctx); err != nil {
if elastic.IsNotFound(err) {
store.client.CreateIndex(index).Do(ctx)
return entries, hasMore, nil
return
}
}
for {
@@ -249,7 +252,7 @@ func (store *ElasticStore) listDirectoryEntries(
if (startFileName == "" && first) || inclusive {
if result, err = store.search(ctx, index, parentId); err != nil {
glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
return entries, hasMore, err
return
}
} else {
fullPath := string(fullpath) + "/" + startFileName
@@ -259,7 +262,7 @@ func (store *ElasticStore) listDirectoryEntries(
after := weed_util.Md5String([]byte(fullPath))
if result, err = store.searchAfter(ctx, index, parentId, after); err != nil {
glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
return entries, hasMore, err
return
}
}
first = false
@@ -271,22 +274,21 @@ func (store *ElasticStore) listDirectoryEntries(
if err := jsoniter.Unmarshal(hit.Source, esEntry); err == nil {
limit--
if limit < 0 {
hasMore = true
return entries, hasMore, nil
return lastFileName, nil
}
nextStart = string(esEntry.Entry.FullPath)
fileName := getFileName(esEntry.Entry.FullPath)
lastFileName = fileName
if fileName == startFileName && !inclusive {
continue
}
entries = append(entries, esEntry.Entry)
if !eachEntryFunc(esEntry.Entry) {
break
}
}
}
if len(result.Hits.Hits) < store.maxPageSize {
break
}
}
return entries, hasMore, nil
return
}
func (store *ElasticStore) search(ctx context.Context, index, parentId string) (result *elastic.SearchResult, err error) {