mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-09-20 04:07:57 +08:00
refactor
This commit is contained in:
@@ -147,8 +147,9 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
|
|||||||
)
|
)
|
||||||
|
|
||||||
for time.Now().Sub(startTime) < maxTimeout {
|
for time.Now().Sub(startTime) < maxTimeout {
|
||||||
fid, count, dn, err := ms.Topo.PickForWrite(req.Count, option)
|
fid, count, dnList, err := ms.Topo.PickForWrite(req.Count, option)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
dn := dnList.Head()
|
||||||
return &master_pb.AssignResponse{
|
return &master_pb.AssignResponse{
|
||||||
Fid: fid,
|
Fid: fid,
|
||||||
Url: dn.Url(),
|
Url: dn.Url(),
|
||||||
|
@@ -130,9 +130,10 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fid, count, dn, err := ms.Topo.PickForWrite(requestedCount, option)
|
fid, count, dnList, err := ms.Topo.PickForWrite(requestedCount, option)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
ms.maybeAddJwtAuthorization(w, fid, true)
|
ms.maybeAddJwtAuthorization(w, fid, true)
|
||||||
|
dn := dnList.Head()
|
||||||
writeJsonQuiet(w, r, http.StatusOK, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count})
|
writeJsonQuiet(w, r, http.StatusOK, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count})
|
||||||
} else {
|
} else {
|
||||||
writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()})
|
writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()})
|
||||||
|
@@ -136,7 +136,7 @@ func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
|
|||||||
return active > 0
|
return active > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) {
|
func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *VolumeLocationList, error) {
|
||||||
vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType).PickForWrite(count, option)
|
vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType).PickForWrite(count, option)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", 0, nil, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err)
|
return "", 0, nil, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err)
|
||||||
@@ -145,7 +145,7 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string,
|
|||||||
return "", 0, nil, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
|
return "", 0, nil, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
|
||||||
}
|
}
|
||||||
fileId := t.Sequence.NextFileId(count)
|
fileId := t.Sequence.NextFileId(count)
|
||||||
return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
|
return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) *VolumeLayout {
|
func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) *VolumeLayout {
|
||||||
|
Reference in New Issue
Block a user