| 
									
										
										
										
											2019-12-25 09:53:13 -08:00
										 |  |  | package weed_server | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"fmt" | 
					
						
							|  |  |  | 	"time" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" | 
					
						
							|  |  |  | 	"github.com/chrislusf/seaweedfs/weed/storage/backend" | 
					
						
							|  |  |  | 	"github.com/chrislusf/seaweedfs/weed/storage/needle" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // VolumeTierMoveDatFromRemote copy dat file from a remote tier to local volume server | 
					
						
							|  |  |  | func (vs *VolumeServer) VolumeTierMoveDatFromRemote(req *volume_server_pb.VolumeTierMoveDatFromRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierMoveDatFromRemoteServer) error { | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// find existing volume | 
					
						
							|  |  |  | 	v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) | 
					
						
							|  |  |  | 	if v == nil { | 
					
						
							|  |  |  | 		return fmt.Errorf("volume %d not found", req.VolumeId) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// verify the collection | 
					
						
							|  |  |  | 	if v.Collection != req.Collection { | 
					
						
							|  |  |  | 		return fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// locate the disk file | 
					
						
							|  |  |  | 	storageName, storageKey := v.RemoteStorageNameKey() | 
					
						
							|  |  |  | 	if storageName == "" || storageKey == "" { | 
					
						
							|  |  |  | 		return fmt.Errorf("volume %d is already on local disk", req.VolumeId) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// check whether the local .dat already exists | 
					
						
							|  |  |  | 	_, ok := v.DataBackend.(*backend.DiskFile) | 
					
						
							|  |  |  | 	if ok { | 
					
						
							|  |  |  | 		return fmt.Errorf("volume %d is already on local disk", req.VolumeId) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// check valid storage backend type | 
					
						
							|  |  |  | 	backendStorage, found := backend.BackendStorages[storageName] | 
					
						
							|  |  |  | 	if !found { | 
					
						
							|  |  |  | 		var keys []string | 
					
						
							|  |  |  | 		for key := range backend.BackendStorages { | 
					
						
							|  |  |  | 			keys = append(keys, key) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return fmt.Errorf("remote storage %s not found from suppported: %v", storageName, keys) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	startTime := time.Now() | 
					
						
							|  |  |  | 	fn := func(progressed int64, percentage float32) error { | 
					
						
							|  |  |  | 		now := time.Now() | 
					
						
							|  |  |  | 		if now.Sub(startTime) < time.Second { | 
					
						
							|  |  |  | 			return nil | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		startTime = now | 
					
						
							|  |  |  | 		return stream.Send(&volume_server_pb.VolumeTierMoveDatFromRemoteResponse{ | 
					
						
							|  |  |  | 			Processed:           progressed, | 
					
						
							|  |  |  | 			ProcessedPercentage: percentage, | 
					
						
							|  |  |  | 		}) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	// copy the data file | 
					
						
							| 
									
										
										
										
											2020-11-27 03:17:10 -08:00
										 |  |  | 	_, err := backendStorage.DownloadFile(v.FileName(".dat"), storageKey, fn) | 
					
						
							| 
									
										
										
										
											2019-12-25 09:53:13 -08:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2020-11-27 03:17:10 -08:00
										 |  |  | 		return fmt.Errorf("backend %s copy file %s: %v", storageName, v.FileName(".dat"), err) | 
					
						
							| 
									
										
										
										
											2019-12-25 09:53:13 -08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if req.KeepRemoteDatFile { | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// remove remote file | 
					
						
							|  |  |  | 	if err := backendStorage.DeleteFile(storageKey); err != nil { | 
					
						
							|  |  |  | 		return fmt.Errorf("volume %d fail to delete remote file %s: %v", v.Id, storageKey, err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// forget remote file | 
					
						
							| 
									
										
										
										
											2019-12-28 11:21:49 -08:00
										 |  |  | 	v.GetVolumeInfo().Files = v.GetVolumeInfo().Files[1:] | 
					
						
							|  |  |  | 	if err := v.SaveVolumeInfo(); err != nil { | 
					
						
							| 
									
										
										
										
											2019-12-25 09:53:13 -08:00
										 |  |  | 		return fmt.Errorf("volume %d fail to save remote file info: %v", v.Id, err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	v.DataBackend.Close() | 
					
						
							|  |  |  | 	v.DataBackend = nil | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } |