You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							164 lines
						
					
					
						
							4.6 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							164 lines
						
					
					
						
							4.6 KiB
						
					
					
				| package filer | |
| 
 | |
| import ( | |
| 	"context" | |
| 	"io" | |
| 	"sync" | |
| 
 | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" | |
| 	"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache" | |
| 	"github.com/seaweedfs/seaweedfs/weed/wdclient" | |
| ) | |
| 
 | |
| type ChunkGroup struct { | |
| 	lookupFn     wdclient.LookupFileIdFunctionType | |
| 	sections     map[SectionIndex]*FileChunkSection | |
| 	sectionsLock sync.RWMutex | |
| 	readerCache  *ReaderCache | |
| } | |
| 
 | |
| func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error) { | |
| 	group := &ChunkGroup{ | |
| 		lookupFn:    lookupFn, | |
| 		sections:    make(map[SectionIndex]*FileChunkSection), | |
| 		readerCache: NewReaderCache(32, chunkCache, lookupFn), | |
| 	} | |
| 
 | |
| 	err := group.SetChunks(chunks) | |
| 	return group, err | |
| } | |
| 
 | |
| func (group *ChunkGroup) AddChunk(chunk *filer_pb.FileChunk) error { | |
| 
 | |
| 	group.sectionsLock.Lock() | |
| 	defer group.sectionsLock.Unlock() | |
| 
 | |
| 	sectionIndexStart, sectionIndexStop := SectionIndex(chunk.Offset/SectionSize), SectionIndex((chunk.Offset+int64(chunk.Size))/SectionSize) | |
| 	for si := sectionIndexStart; si < sectionIndexStop+1; si++ { | |
| 		section, found := group.sections[si] | |
| 		if !found { | |
| 			section = NewFileChunkSection(si) | |
| 			group.sections[si] = section | |
| 		} | |
| 		section.addChunk(chunk) | |
| 	} | |
| 	return nil | |
| } | |
| 
 | |
| func (group *ChunkGroup) ReadDataAt(ctx context.Context, fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error) { | |
| 	if offset >= fileSize { | |
| 		return 0, 0, io.EOF | |
| 	} | |
| 
 | |
| 	group.sectionsLock.RLock() | |
| 	defer group.sectionsLock.RUnlock() | |
| 
 | |
| 	sectionIndexStart, sectionIndexStop := SectionIndex(offset/SectionSize), SectionIndex((offset+int64(len(buff)))/SectionSize) | |
| 	for si := sectionIndexStart; si < sectionIndexStop+1; si++ { | |
| 		section, found := group.sections[si] | |
| 		rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize)) | |
| 		if rangeStart >= rangeStop { | |
| 			continue | |
| 		} | |
| 		if !found { | |
| 			rangeStop = min(rangeStop, fileSize) | |
| 			for i := rangeStart; i < rangeStop; i++ { | |
| 				buff[i-offset] = 0 | |
| 			} | |
| 			n = int(int64(n) + rangeStop - rangeStart) | |
| 			continue | |
| 		} | |
| 		xn, xTsNs, xErr := section.readDataAt(ctx, group, fileSize, buff[rangeStart-offset:rangeStop-offset], rangeStart) | |
| 		if xErr != nil { | |
| 			return n + xn, max(tsNs, xTsNs), xErr | |
| 		} | |
| 		n += xn | |
| 		tsNs = max(tsNs, xTsNs) | |
| 	} | |
| 	return | |
| } | |
| 
 | |
| func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error { | |
| 	group.sectionsLock.RLock() | |
| 	defer group.sectionsLock.RUnlock() | |
| 
 | |
| 	var dataChunks []*filer_pb.FileChunk | |
| 	for _, chunk := range chunks { | |
| 
 | |
| 		if !chunk.IsChunkManifest { | |
| 			dataChunks = append(dataChunks, chunk) | |
| 			continue | |
| 		} | |
| 
 | |
| 		resolvedChunks, err := ResolveOneChunkManifest(context.Background(), group.lookupFn, chunk) | |
| 		if err != nil { | |
| 			return err | |
| 		} | |
| 
 | |
| 		dataChunks = append(dataChunks, resolvedChunks...) | |
| 	} | |
| 
 | |
| 	sections := make(map[SectionIndex]*FileChunkSection) | |
| 
 | |
| 	for _, chunk := range dataChunks { | |
| 		sectionIndexStart, sectionIndexStop := SectionIndex(chunk.Offset/SectionSize), SectionIndex((chunk.Offset+int64(chunk.Size))/SectionSize) | |
| 		for si := sectionIndexStart; si < sectionIndexStop+1; si++ { | |
| 			section, found := sections[si] | |
| 			if !found { | |
| 				section = NewFileChunkSection(si) | |
| 				sections[si] = section | |
| 			} | |
| 			section.chunks = append(section.chunks, chunk) | |
| 		} | |
| 	} | |
| 
 | |
| 	group.sections = sections | |
| 	return nil | |
| } | |
| 
 | |
| const ( | |
| 	// see weedfs_file_lseek.go | |
| 	SEEK_DATA uint32 = 3 // seek to next data after the offset | |
| 	// SEEK_HOLE uint32 = 4 // seek to next hole after the offset | |
| ) | |
| 
 | |
| // FIXME: needa tests | |
| func (group *ChunkGroup) SearchChunks(ctx context.Context, offset, fileSize int64, whence uint32) (found bool, out int64) { | |
| 	group.sectionsLock.RLock() | |
| 	defer group.sectionsLock.RUnlock() | |
| 
 | |
| 	return group.doSearchChunks(ctx, offset, fileSize, whence) | |
| } | |
| 
 | |
| func (group *ChunkGroup) doSearchChunks(ctx context.Context, offset, fileSize int64, whence uint32) (found bool, out int64) { | |
| 
 | |
| 	sectionIndex, maxSectionIndex := SectionIndex(offset/SectionSize), SectionIndex(fileSize/SectionSize) | |
| 	if whence == SEEK_DATA { | |
| 		for si := sectionIndex; si < maxSectionIndex+1; si++ { | |
| 			section, foundSection := group.sections[si] | |
| 			if !foundSection { | |
| 				continue | |
| 			} | |
| 			sectionStart := section.DataStartOffset(ctx, group, offset, fileSize) | |
| 			if sectionStart == -1 { | |
| 				continue | |
| 			} | |
| 			return true, sectionStart | |
| 		} | |
| 		return false, 0 | |
| 	} else { | |
| 		// whence == SEEK_HOLE | |
| 		for si := sectionIndex; si < maxSectionIndex; si++ { | |
| 			section, foundSection := group.sections[si] | |
| 			if !foundSection { | |
| 				return true, offset | |
| 			} | |
| 			holeStart := section.NextStopOffset(ctx, group, offset, fileSize) | |
| 			if holeStart%SectionSize == 0 { | |
| 				continue | |
| 			} | |
| 			return true, holeStart | |
| 		} | |
| 		return true, fileSize | |
| 	} | |
| }
 |