shell vacuum volume by collection and volume id

This commit is contained in:
Konstantin Lebedev
2022-04-18 18:40:58 +05:00
parent cdb31c5b61
commit 1e35b4929f
20 changed files with 272 additions and 243 deletions

View File

@@ -60,6 +60,7 @@ func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid ne
}
return vacuumLocationList, errCount == 0 && len(vacuumLocationList.list) > 0
}
func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId,
locationlist *VolumeLocationList, preallocate int64) bool {
vl.accessLock.Lock()
@@ -116,6 +117,7 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *
}
return isVacuumSuccess
}
func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) bool {
isCommitSuccess := true
isReadOnly := false
@@ -144,6 +146,7 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V
}
return isCommitSuccess
}
func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) {
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
@@ -161,7 +164,7 @@ func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *
}
}
func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) {
func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, volumeId uint32, collection string, preallocate int64) {
// if there is vacuum going on, return immediately
swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1)
@@ -172,23 +175,30 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float
// now only one vacuum process going on
glog.V(1).Infof("Start vacuum on demand with threshold: %f", garbageThreshold)
glog.V(1).Infof("Start vacuum on demand with threshold: %f collection: %s volumeId: %d",
garbageThreshold, collection, volumeId)
for _, col := range t.collectionMap.Items() {
c := col.(*Collection)
if collection != "" && collection != c.Name {
continue
}
for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil {
volumeLayout := vl.(*VolumeLayout)
t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate)
t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, volumeId, preallocate)
}
}
}
}
func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) {
func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, volumeId uint32, preallocate int64) {
volumeLayout.accessLock.RLock()
tmpMap := make(map[needle.VolumeId]*VolumeLocationList)
for vid, locationList := range volumeLayout.vid2location {
if volumeId > 0 && volumeId != uint32(vid) {
continue
}
tmpMap[vid] = locationList.Copy()
}
volumeLayout.accessLock.RUnlock()