mirror of
				https://github.com/seaweedfs/seaweedfs.git
				synced 2025-10-22 07:17:23 +08:00 
			
		
		
		
	 5f7b024891
			
		
	
	5f7b024891
	
	
	
		
			
			Currently the file size of only one volume location is taken into account in the stats. This commit multiplies the disk usages by the amount of nodes holding a replica of the volume. This will yield the expected amount of disk usage and matches the total size calculations from before.
		
			
				
	
	
		
			193 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			193 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package weed_server
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"github.com/chrislusf/raft"
 | |
| 	"github.com/chrislusf/seaweedfs/weed/storage/types"
 | |
| 
 | |
| 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 | |
| 	"github.com/chrislusf/seaweedfs/weed/security"
 | |
| 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 | |
| 	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
 | |
| 	"github.com/chrislusf/seaweedfs/weed/topology"
 | |
| )
 | |
| 
 | |
| func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupVolumeRequest) (*master_pb.LookupVolumeResponse, error) {
 | |
| 
 | |
| 	if !ms.Topo.IsLeader() {
 | |
| 		return nil, raft.NotLeaderError
 | |
| 	}
 | |
| 
 | |
| 	resp := &master_pb.LookupVolumeResponse{}
 | |
| 	volumeLocations := ms.lookupVolumeId(req.VolumeIds, req.Collection)
 | |
| 
 | |
| 	for _, result := range volumeLocations {
 | |
| 		var locations []*master_pb.Location
 | |
| 		for _, loc := range result.Locations {
 | |
| 			locations = append(locations, &master_pb.Location{
 | |
| 				Url:       loc.Url,
 | |
| 				PublicUrl: loc.PublicUrl,
 | |
| 			})
 | |
| 		}
 | |
| 		resp.VolumeIdLocations = append(resp.VolumeIdLocations, &master_pb.LookupVolumeResponse_VolumeIdLocation{
 | |
| 			VolumeId:  result.VolumeId,
 | |
| 			Locations: locations,
 | |
| 			Error:     result.Error,
 | |
| 		})
 | |
| 	}
 | |
| 
 | |
| 	return resp, nil
 | |
| }
 | |
| 
 | |
| func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest) (*master_pb.AssignResponse, error) {
 | |
| 
 | |
| 	if !ms.Topo.IsLeader() {
 | |
| 		return nil, raft.NotLeaderError
 | |
| 	}
 | |
| 
 | |
| 	if req.Count == 0 {
 | |
| 		req.Count = 1
 | |
| 	}
 | |
| 
 | |
| 	if req.Replication == "" {
 | |
| 		req.Replication = ms.option.DefaultReplicaPlacement
 | |
| 	}
 | |
| 	replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	ttl, err := needle.ReadTTL(req.Ttl)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	diskType := types.ToDiskType(req.DiskType)
 | |
| 
 | |
| 	option := &topology.VolumeGrowOption{
 | |
| 		Collection:         req.Collection,
 | |
| 		ReplicaPlacement:   replicaPlacement,
 | |
| 		Ttl:                ttl,
 | |
| 		DiskType:           diskType,
 | |
| 		Prealloacte:        ms.preallocateSize,
 | |
| 		DataCenter:         req.DataCenter,
 | |
| 		Rack:               req.Rack,
 | |
| 		DataNode:           req.DataNode,
 | |
| 		MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb,
 | |
| 	}
 | |
| 
 | |
| 	if !ms.Topo.HasWritableVolume(option) {
 | |
| 		if ms.Topo.AvailableSpaceFor(option) <= 0 {
 | |
| 			return nil, fmt.Errorf("no free volumes left for " + option.String())
 | |
| 		}
 | |
| 		ms.vgLock.Lock()
 | |
| 		if !ms.Topo.HasWritableVolume(option) {
 | |
| 			if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOption, ms.Topo, int(req.WritableVolumeCount)); err != nil {
 | |
| 				ms.vgLock.Unlock()
 | |
| 				return nil, fmt.Errorf("Cannot grow volume group! %v", err)
 | |
| 			}
 | |
| 		}
 | |
| 		ms.vgLock.Unlock()
 | |
| 	}
 | |
| 	fid, count, dn, err := ms.Topo.PickForWrite(req.Count, option)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("%v", err)
 | |
| 	}
 | |
| 
 | |
| 	return &master_pb.AssignResponse{
 | |
| 		Fid:       fid,
 | |
| 		Url:       dn.Url(),
 | |
| 		PublicUrl: dn.PublicUrl,
 | |
| 		Count:     count,
 | |
| 		Auth:      string(security.GenJwt(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)),
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.StatisticsRequest) (*master_pb.StatisticsResponse, error) {
 | |
| 
 | |
| 	if !ms.Topo.IsLeader() {
 | |
| 		return nil, raft.NotLeaderError
 | |
| 	}
 | |
| 
 | |
| 	if req.Replication == "" {
 | |
| 		req.Replication = ms.option.DefaultReplicaPlacement
 | |
| 	}
 | |
| 	replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	ttl, err := needle.ReadTTL(req.Ttl)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, types.ToDiskType(req.DiskType))
 | |
| 	stats := volumeLayout.Stats()
 | |
| 	resp := &master_pb.StatisticsResponse{
 | |
| 		TotalSize: stats.TotalSize,
 | |
| 		UsedSize:  stats.UsedSize,
 | |
| 		FileCount: stats.FileCount,
 | |
| 	}
 | |
| 
 | |
| 	return resp, nil
 | |
| }
 | |
| 
 | |
| func (ms *MasterServer) VolumeList(ctx context.Context, req *master_pb.VolumeListRequest) (*master_pb.VolumeListResponse, error) {
 | |
| 
 | |
| 	if !ms.Topo.IsLeader() {
 | |
| 		return nil, raft.NotLeaderError
 | |
| 	}
 | |
| 
 | |
| 	resp := &master_pb.VolumeListResponse{
 | |
| 		TopologyInfo:      ms.Topo.ToTopologyInfo(),
 | |
| 		VolumeSizeLimitMb: uint64(ms.option.VolumeSizeLimitMB),
 | |
| 	}
 | |
| 
 | |
| 	return resp, nil
 | |
| }
 | |
| 
 | |
| func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.LookupEcVolumeRequest) (*master_pb.LookupEcVolumeResponse, error) {
 | |
| 
 | |
| 	if !ms.Topo.IsLeader() {
 | |
| 		return nil, raft.NotLeaderError
 | |
| 	}
 | |
| 
 | |
| 	resp := &master_pb.LookupEcVolumeResponse{}
 | |
| 
 | |
| 	ecLocations, found := ms.Topo.LookupEcShards(needle.VolumeId(req.VolumeId))
 | |
| 
 | |
| 	if !found {
 | |
| 		return resp, fmt.Errorf("ec volume %d not found", req.VolumeId)
 | |
| 	}
 | |
| 
 | |
| 	resp.VolumeId = req.VolumeId
 | |
| 
 | |
| 	for shardId, shardLocations := range ecLocations.Locations {
 | |
| 		var locations []*master_pb.Location
 | |
| 		for _, dn := range shardLocations {
 | |
| 			locations = append(locations, &master_pb.Location{
 | |
| 				Url:       string(dn.Id()),
 | |
| 				PublicUrl: dn.PublicUrl,
 | |
| 			})
 | |
| 		}
 | |
| 		resp.ShardIdLocations = append(resp.ShardIdLocations, &master_pb.LookupEcVolumeResponse_EcShardIdLocation{
 | |
| 			ShardId:   uint32(shardId),
 | |
| 			Locations: locations,
 | |
| 		})
 | |
| 	}
 | |
| 
 | |
| 	return resp, nil
 | |
| }
 | |
| 
 | |
| func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumVolumeRequest) (*master_pb.VacuumVolumeResponse, error) {
 | |
| 
 | |
| 	if !ms.Topo.IsLeader() {
 | |
| 		return nil, raft.NotLeaderError
 | |
| 	}
 | |
| 
 | |
| 	resp := &master_pb.VacuumVolumeResponse{}
 | |
| 
 | |
| 	ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), ms.preallocateSize)
 | |
| 
 | |
| 	return resp, nil
 | |
| }
 |