You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							282 lines
						
					
					
						
							8.3 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							282 lines
						
					
					
						
							8.3 KiB
						
					
					
				| package erasure_coding | |
| 
 | |
| import ( | |
| 	"errors" | |
| 	"fmt" | |
| 	"math" | |
| 	"os" | |
| 	"slices" | |
| 	"sync" | |
| 	"time" | |
| 
 | |
| 	"github.com/seaweedfs/seaweedfs/weed/glog" | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb" | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb" | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" | |
| 	"github.com/seaweedfs/seaweedfs/weed/storage/idx" | |
| 	"github.com/seaweedfs/seaweedfs/weed/storage/needle" | |
| 	"github.com/seaweedfs/seaweedfs/weed/storage/types" | |
| 	"github.com/seaweedfs/seaweedfs/weed/storage/volume_info" | |
| ) | |
| 
 | |
| var ( | |
| 	NotFoundError             = errors.New("needle not found") | |
| 	destroyDelaySeconds int64 = 0 | |
| ) | |
| 
 | |
| type EcVolume struct { | |
| 	VolumeId                  needle.VolumeId | |
| 	Collection                string | |
| 	dir                       string | |
| 	dirIdx                    string | |
| 	ecxFile                   *os.File | |
| 	ecxFileSize               int64 | |
| 	ecxCreatedAt              time.Time | |
| 	Shards                    []*EcVolumeShard | |
| 	ShardLocations            map[ShardId][]pb.ServerAddress | |
| 	ShardLocationsRefreshTime time.Time | |
| 	ShardLocationsLock        sync.RWMutex | |
| 	Version                   needle.Version | |
| 	ecjFile                   *os.File | |
| 	ecjFileAccessLock         sync.Mutex | |
| 	diskType                  types.DiskType | |
| 	datFileSize               int64 | |
| 	ExpireAtSec               uint64 //ec volume destroy time, calculated from the ec volume was created | |
| } | |
| 
 | |
| func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) { | |
| 	ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid, diskType: diskType} | |
| 
 | |
| 	dataBaseFileName := EcShardFileName(collection, dir, int(vid)) | |
| 	indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid)) | |
| 
 | |
| 	// open ecx file | |
| 	if ev.ecxFile, err = os.OpenFile(indexBaseFileName+".ecx", os.O_RDWR, 0644); err != nil { | |
| 		return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", indexBaseFileName, err) | |
| 	} | |
| 	ecxFi, statErr := ev.ecxFile.Stat() | |
| 	if statErr != nil { | |
| 		_ = ev.ecxFile.Close() | |
| 		return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", indexBaseFileName, statErr) | |
| 	} | |
| 	ev.ecxFileSize = ecxFi.Size() | |
| 	ev.ecxCreatedAt = ecxFi.ModTime() | |
| 
 | |
| 	// open ecj file | |
| 	if ev.ecjFile, err = os.OpenFile(indexBaseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil { | |
| 		return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", indexBaseFileName, err) | |
| 	} | |
| 
 | |
| 	// read volume info | |
| 	ev.Version = needle.Version3 | |
| 	if volumeInfo, _, found, _ := volume_info.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found { | |
| 		ev.Version = needle.Version(volumeInfo.Version) | |
| 		ev.datFileSize = volumeInfo.DatFileSize | |
| 		ev.ExpireAtSec = volumeInfo.ExpireAtSec | |
| 	} else { | |
| 		glog.Warningf("vif file not found,volumeId:%d, filename:%s", vid, dataBaseFileName) | |
| 		volume_info.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)}) | |
| 	} | |
| 
 | |
| 	ev.ShardLocations = make(map[ShardId][]pb.ServerAddress) | |
| 
 | |
| 	return | |
| } | |
| 
 | |
| func (ev *EcVolume) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool { | |
| 	for _, s := range ev.Shards { | |
| 		if s.ShardId == ecVolumeShard.ShardId { | |
| 			return false | |
| 		} | |
| 	} | |
| 	ev.Shards = append(ev.Shards, ecVolumeShard) | |
| 	slices.SortFunc(ev.Shards, func(a, b *EcVolumeShard) int { | |
| 		if a.VolumeId != b.VolumeId { | |
| 			return int(a.VolumeId - b.VolumeId) | |
| 		} | |
| 		return int(a.ShardId - b.ShardId) | |
| 	}) | |
| 	return true | |
| } | |
| 
 | |
| func (ev *EcVolume) DeleteEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, deleted bool) { | |
| 	foundPosition := -1 | |
| 	for i, s := range ev.Shards { | |
| 		if s.ShardId == shardId { | |
| 			foundPosition = i | |
| 		} | |
| 	} | |
| 	if foundPosition < 0 { | |
| 		return nil, false | |
| 	} | |
| 
 | |
| 	ecVolumeShard = ev.Shards[foundPosition] | |
| 	ecVolumeShard.Unmount() | |
| 	ev.Shards = append(ev.Shards[:foundPosition], ev.Shards[foundPosition+1:]...) | |
| 	return ecVolumeShard, true | |
| } | |
| 
 | |
| func (ev *EcVolume) FindEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, found bool) { | |
| 	for _, s := range ev.Shards { | |
| 		if s.ShardId == shardId { | |
| 			return s, true | |
| 		} | |
| 	} | |
| 	return nil, false | |
| } | |
| 
 | |
| func (ev *EcVolume) Close() { | |
| 	for _, s := range ev.Shards { | |
| 		s.Close() | |
| 	} | |
| 	if ev.ecjFile != nil { | |
| 		ev.ecjFileAccessLock.Lock() | |
| 		_ = ev.ecjFile.Close() | |
| 		ev.ecjFile = nil | |
| 		ev.ecjFileAccessLock.Unlock() | |
| 	} | |
| 	if ev.ecxFile != nil { | |
| 		_ = ev.ecxFile.Sync() | |
| 		_ = ev.ecxFile.Close() | |
| 		ev.ecxFile = nil | |
| 	} | |
| } | |
| 
 | |
| func (ev *EcVolume) Destroy() { | |
| 
 | |
| 	ev.Close() | |
| 
 | |
| 	for _, s := range ev.Shards { | |
| 		s.Destroy() | |
| 	} | |
| 	os.Remove(ev.FileName(".ecx")) | |
| 	os.Remove(ev.FileName(".ecj")) | |
| 	os.Remove(ev.FileName(".vif")) | |
| } | |
| 
 | |
| func (ev *EcVolume) FileName(ext string) string { | |
| 	switch ext { | |
| 	case ".ecx", ".ecj": | |
| 		return ev.IndexBaseFileName() + ext | |
| 	} | |
| 	// .vif | |
| 	return ev.DataBaseFileName() + ext | |
| } | |
| 
 | |
| func (ev *EcVolume) DataBaseFileName() string { | |
| 	return EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId)) | |
| } | |
| 
 | |
| func (ev *EcVolume) IndexBaseFileName() string { | |
| 	return EcShardFileName(ev.Collection, ev.dirIdx, int(ev.VolumeId)) | |
| } | |
| 
 | |
| func (ev *EcVolume) ShardSize() uint64 { | |
| 	if len(ev.Shards) > 0 { | |
| 		return uint64(ev.Shards[0].Size()) | |
| 	} | |
| 	return 0 | |
| } | |
| 
 | |
| func (ev *EcVolume) Size() (size int64) { | |
| 	for _, shard := range ev.Shards { | |
| 		size += shard.Size() | |
| 	} | |
| 	return | |
| } | |
| 
 | |
| func (ev *EcVolume) CreatedAt() time.Time { | |
| 	return ev.ecxCreatedAt | |
| } | |
| 
 | |
| func (ev *EcVolume) ShardIdList() (shardIds []ShardId) { | |
| 	for _, s := range ev.Shards { | |
| 		shardIds = append(shardIds, s.ShardId) | |
| 	} | |
| 	return | |
| } | |
| 
 | |
| func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.VolumeEcShardInformationMessage) { | |
| 	prevVolumeId := needle.VolumeId(math.MaxUint32) | |
| 	var m *master_pb.VolumeEcShardInformationMessage | |
| 	for _, s := range ev.Shards { | |
| 		if s.VolumeId != prevVolumeId { | |
| 			m = &master_pb.VolumeEcShardInformationMessage{ | |
| 				Id:          uint32(s.VolumeId), | |
| 				Collection:  s.Collection, | |
| 				DiskType:    string(ev.diskType), | |
| 				ExpireAtSec: ev.ExpireAtSec, | |
| 			} | |
| 			messages = append(messages, m) | |
| 		} | |
| 		prevVolumeId = s.VolumeId | |
| 		m.EcIndexBits = uint32(ShardBits(m.EcIndexBits).AddShardId(s.ShardId)) | |
| 	} | |
| 	return | |
| } | |
| 
 | |
| func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size types.Size, intervals []Interval, err error) { | |
| 
 | |
| 	// find the needle from ecx file | |
| 	offset, size, err = ev.FindNeedleFromEcx(needleId) | |
| 	if err != nil { | |
| 		return types.Offset{}, 0, nil, fmt.Errorf("FindNeedleFromEcx: %v", err) | |
| 	} | |
| 
 | |
| 	intervals = ev.LocateEcShardNeedleInterval(version, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, version))) | |
| 	return | |
| } | |
| 
 | |
| func (ev *EcVolume) LocateEcShardNeedleInterval(version needle.Version, offset int64, size types.Size) (intervals []Interval) { | |
| 	shard := ev.Shards[0] | |
| 	// Usually shard will be padded to round of ErasureCodingSmallBlockSize. | |
| 	// So in most cases, if shardSize equals to n * ErasureCodingLargeBlockSize, | |
| 	// the data would be in small blocks. | |
| 	shardSize := shard.ecdFileSize - 1 | |
| 	if ev.datFileSize > 0 { | |
| 		// To get the correct LargeBlockRowsCount | |
| 		// use datFileSize to calculate the shardSize to match the EC encoding logic. | |
| 		shardSize = ev.datFileSize / DataShardsCount | |
| 	} | |
| 	// calculate the locations in the ec shards | |
| 	intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, shardSize, offset, types.Size(needle.GetActualSize(size, version))) | |
| 
 | |
| 	return | |
| } | |
| 
 | |
| func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size types.Size, err error) { | |
| 	return SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil) | |
| } | |
| 
 | |
| func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size types.Size, err error) { | |
| 	var key types.NeedleId | |
| 	buf := make([]byte, types.NeedleMapEntrySize) | |
| 	l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize | |
| 	for l < h { | |
| 		m := (l + h) / 2 | |
| 		if n, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil { | |
| 			if n != types.NeedleMapEntrySize { | |
| 				return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err) | |
| 			} | |
| 		} | |
| 		key, offset, size = idx.IdxFileEntry(buf) | |
| 		if key == needleId { | |
| 			if processNeedleFn != nil { | |
| 				err = processNeedleFn(ecxFile, m*types.NeedleMapEntrySize) | |
| 			} | |
| 			return | |
| 		} | |
| 		if key < needleId { | |
| 			l = m + 1 | |
| 		} else { | |
| 			h = m | |
| 		} | |
| 	} | |
| 
 | |
| 	err = NotFoundError | |
| 	return | |
| } | |
| 
 | |
| func (ev *EcVolume) IsTimeToDestroy() bool { | |
| 	return ev.ExpireAtSec > 0 && time.Now().Unix() > (int64(ev.ExpireAtSec)+destroyDelaySeconds) | |
| }
 |