You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							222 lines
						
					
					
						
							7.0 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							222 lines
						
					
					
						
							7.0 KiB
						
					
					
				| package page_writer | |
| 
 | |
| import ( | |
| 	"fmt" | |
| 	"github.com/seaweedfs/seaweedfs/weed/glog" | |
| 	"github.com/seaweedfs/seaweedfs/weed/util" | |
| 	"sync" | |
| 	"sync/atomic" | |
| ) | |
| 
 | |
| type LogicChunkIndex int | |
| 
 | |
| type UploadPipeline struct { | |
| 	uploaderCount      int32 | |
| 	uploaderCountCond  *sync.Cond | |
| 	filepath           util.FullPath | |
| 	ChunkSize          int64 | |
| 	uploaders          *util.LimitedConcurrentExecutor | |
| 	saveToStorageFn    SaveToStorageFunc | |
| 	writableChunkLimit int | |
| 	swapFile           *SwapFile | |
| 	chunksLock         sync.Mutex | |
| 	writableChunks     map[LogicChunkIndex]PageChunk | |
| 	sealedChunks       map[LogicChunkIndex]*SealedChunk | |
| 	activeReadChunks   map[LogicChunkIndex]int | |
| 	readerCountCond    *sync.Cond | |
| } | |
| 
 | |
| type SealedChunk struct { | |
| 	chunk            PageChunk | |
| 	referenceCounter int // track uploading or reading processes | |
| } | |
| 
 | |
| func (sc *SealedChunk) FreeReference(messageOnFree string) { | |
| 	sc.referenceCounter-- | |
| 	if sc.referenceCounter == 0 { | |
| 		glog.V(4).Infof("Free sealed chunk: %s", messageOnFree) | |
| 		sc.chunk.FreeResource() | |
| 	} | |
| } | |
| 
 | |
| func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int, swapFileDir string) *UploadPipeline { | |
| 	t := &UploadPipeline{ | |
| 		ChunkSize:          chunkSize, | |
| 		writableChunks:     make(map[LogicChunkIndex]PageChunk), | |
| 		sealedChunks:       make(map[LogicChunkIndex]*SealedChunk), | |
| 		uploaders:          writers, | |
| 		uploaderCountCond:  sync.NewCond(&sync.Mutex{}), | |
| 		saveToStorageFn:    saveToStorageFn, | |
| 		activeReadChunks:   make(map[LogicChunkIndex]int), | |
| 		writableChunkLimit: bufferChunkLimit, | |
| 		swapFile:           NewSwapFile(swapFileDir, chunkSize), | |
| 	} | |
| 	t.readerCountCond = sync.NewCond(&t.chunksLock) | |
| 	return t | |
| } | |
| 
 | |
| func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, tsNs int64) (n int) { | |
| 
 | |
| 	up.chunksLock.Lock() | |
| 	defer up.chunksLock.Unlock() | |
| 
 | |
| 	logicChunkIndex := LogicChunkIndex(off / up.ChunkSize) | |
| 
 | |
| 	pageChunk, found := up.writableChunks[logicChunkIndex] | |
| 	if !found { | |
| 		if len(up.writableChunks) > up.writableChunkLimit { | |
| 			// if current file chunks is over the per file buffer count limit | |
| 			candidateChunkIndex, fullness := LogicChunkIndex(-1), int64(0) | |
| 			for lci, mc := range up.writableChunks { | |
| 				chunkFullness := mc.WrittenSize() | |
| 				if fullness < chunkFullness { | |
| 					candidateChunkIndex = lci | |
| 					fullness = chunkFullness | |
| 				} | |
| 			} | |
| 			/*  // this algo generates too many chunks | |
| 			candidateChunkIndex, lowestActivityScore := LogicChunkIndex(-1), int64(math.MaxInt64) | |
| 			for wci, wc := range up.writableChunks { | |
| 				activityScore := wc.ActivityScore() | |
| 				if lowestActivityScore >= activityScore { | |
| 					if lowestActivityScore == activityScore { | |
| 						chunkFullness := wc.WrittenSize() | |
| 						if fullness < chunkFullness { | |
| 							candidateChunkIndex = lci | |
| 							fullness = chunkFullness | |
| 						} | |
| 					} | |
| 					candidateChunkIndex = wci | |
| 					lowestActivityScore = activityScore | |
| 				} | |
| 			} | |
| 			*/ | |
| 			up.moveToSealed(up.writableChunks[candidateChunkIndex], candidateChunkIndex) | |
| 			// fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness) | |
| 		} | |
| 		// fmt.Printf("isSequential:%v len(up.writableChunks):%v memChunkCounter:%v", isSequential, len(up.writableChunks), memChunkCounter) | |
| 		if isSequential && | |
| 			len(up.writableChunks) < up.writableChunkLimit && | |
| 			atomic.LoadInt64(&memChunkCounter) < 4*int64(up.writableChunkLimit) { | |
| 			pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) | |
| 			// fmt.Printf(" create mem  chunk %d\n", logicChunkIndex) | |
| 		} else { | |
| 			pageChunk = up.swapFile.NewSwapFileChunk(logicChunkIndex) | |
| 			// fmt.Printf(" create file chunk %d\n", logicChunkIndex) | |
| 		} | |
| 		up.writableChunks[logicChunkIndex] = pageChunk | |
| 	} | |
| 	//if _, foundSealed := up.sealedChunks[logicChunkIndex]; foundSealed { | |
| 	//	println("found already sealed chunk", logicChunkIndex) | |
| 	//} | |
| 	//if _, foundReading := up.activeReadChunks[logicChunkIndex]; foundReading { | |
| 	//	println("found active read chunk", logicChunkIndex) | |
| 	//} | |
| 	n = pageChunk.WriteDataAt(p, off, tsNs) | |
| 	up.maybeMoveToSealed(pageChunk, logicChunkIndex) | |
| 
 | |
| 	return | |
| } | |
| 
 | |
| func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) { | |
| 	logicChunkIndex := LogicChunkIndex(off / up.ChunkSize) | |
| 
 | |
| 	up.chunksLock.Lock() | |
| 	defer func() { | |
| 		up.readerCountCond.Signal() | |
| 		up.chunksLock.Unlock() | |
| 	}() | |
| 
 | |
| 	// read from sealed chunks first | |
| 	sealedChunk, found := up.sealedChunks[logicChunkIndex] | |
| 	if found { | |
| 		maxStop = sealedChunk.chunk.ReadDataAt(p, off, tsNs) | |
| 		glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop) | |
| 	} | |
| 
 | |
| 	// read from writable chunks last | |
| 	writableChunk, found := up.writableChunks[logicChunkIndex] | |
| 	if !found { | |
| 		return | |
| 	} | |
| 	writableMaxStop := writableChunk.ReadDataAt(p, off, tsNs) | |
| 	glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop) | |
| 	maxStop = max(maxStop, writableMaxStop) | |
| 
 | |
| 	return | |
| } | |
| 
 | |
| func (up *UploadPipeline) FlushAll() { | |
| 	up.flushChunks() | |
| 	up.waitForCurrentWritersToComplete() | |
| } | |
| 
 | |
| func (up *UploadPipeline) flushChunks() { | |
| 	up.chunksLock.Lock() | |
| 	defer up.chunksLock.Unlock() | |
| 
 | |
| 	for logicChunkIndex, memChunk := range up.writableChunks { | |
| 		up.moveToSealed(memChunk, logicChunkIndex) | |
| 	} | |
| } | |
| 
 | |
| func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) { | |
| 	if memChunk.IsComplete() { | |
| 		up.moveToSealed(memChunk, logicChunkIndex) | |
| 	} | |
| } | |
| 
 | |
| func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) { | |
| 	atomic.AddInt32(&up.uploaderCount, 1) | |
| 	glog.V(4).Infof("%s uploaderCount %d ++> %d", up.filepath, up.uploaderCount-1, up.uploaderCount) | |
| 
 | |
| 	if oldMemChunk, found := up.sealedChunks[logicChunkIndex]; found { | |
| 		oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", up.filepath, logicChunkIndex)) | |
| 	} | |
| 	sealedChunk := &SealedChunk{ | |
| 		chunk:            memChunk, | |
| 		referenceCounter: 1, // default 1 is for uploading process | |
| 	} | |
| 	up.sealedChunks[logicChunkIndex] = sealedChunk | |
| 	delete(up.writableChunks, logicChunkIndex) | |
| 
 | |
| 	// unlock before submitting the uploading jobs | |
| 	up.chunksLock.Unlock() | |
| 	up.uploaders.Execute(func() { | |
| 		// first add to the file chunks | |
| 		sealedChunk.chunk.SaveContent(up.saveToStorageFn) | |
| 
 | |
| 		// notify waiting process | |
| 		atomic.AddInt32(&up.uploaderCount, -1) | |
| 		glog.V(4).Infof("%s uploaderCount %d --> %d", up.filepath, up.uploaderCount+1, up.uploaderCount) | |
| 		// Lock and Unlock are not required, | |
| 		// but it may signal multiple times during one wakeup, | |
| 		// and the waiting goroutine may miss some of them! | |
| 		up.uploaderCountCond.L.Lock() | |
| 		up.uploaderCountCond.Broadcast() | |
| 		up.uploaderCountCond.L.Unlock() | |
| 
 | |
| 		// wait for readers | |
| 		up.chunksLock.Lock() | |
| 		defer up.chunksLock.Unlock() | |
| 		for up.IsLocked(logicChunkIndex) { | |
| 			up.readerCountCond.Wait() | |
| 		} | |
| 
 | |
| 		// then remove from sealed chunks | |
| 		delete(up.sealedChunks, logicChunkIndex) | |
| 		sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex)) | |
| 
 | |
| 	}) | |
| 	up.chunksLock.Lock() | |
| } | |
| 
 | |
| func (up *UploadPipeline) Shutdown() { | |
| 	up.swapFile.FreeResource() | |
| 
 | |
| 	up.chunksLock.Lock() | |
| 	defer up.chunksLock.Unlock() | |
| 	for logicChunkIndex, sealedChunk := range up.sealedChunks { | |
| 		sealedChunk.FreeReference(fmt.Sprintf("%s uploadpipeline shutdown chunk %d", up.filepath, logicChunkIndex)) | |
| 	} | |
| }
 |