package page_writer

import (
	"fmt"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"sync"
	"sync/atomic"
	"time"
)

// LogicChunkIndex identifies a chunk by its position within the file. The
// code in this file derives it as offset / ChunkSize.
type LogicChunkIndex int

// UploadPipeline buffers written data in per-chunk writable buffers, moves
// chunks to a "sealed" set when they are complete, evicted or flushed, and
// hands each sealed chunk to an executor that saves it via saveToStorageFn.
type UploadPipeline struct {
	// uploaderCount is incremented when a chunk is sealed and decremented
	// when its SaveContent call returns; it is updated with sync/atomic.
	uploaderCount        int32
	// uploaderCountCond is broadcast after each uploaderCount decrement.
	uploaderCountCond    *sync.Cond
	// filepath appears only in log messages in this file and is not set by
	// NewUploadPipeline.
	filepath             util.FullPath
	// ChunkSize is the divisor used to map a file offset to a LogicChunkIndex.
	ChunkSize            int64
	// writableChunks holds the chunks still accepting writes, guarded by
	// writableChunksLock.
	writableChunks       map[LogicChunkIndex]PageChunk
	writableChunksLock   sync.Mutex
	// sealedChunks holds chunks handed to the uploaders, guarded by
	// sealedChunksLock.
	sealedChunks         map[LogicChunkIndex]*SealedChunk
	sealedChunksLock     sync.Mutex
	// uploaders runs the upload jobs scheduled by moveToSealed.
	uploaders            *util.LimitedConcurrentExecutor
	// saveToStorageFn is passed to PageChunk.SaveContent for each sealed chunk.
	saveToStorageFn      SaveToStorageFunc
	// activeReadChunks and its lock are not touched in this part of the file;
	// presumably they back IsLocked (per-chunk reader counts) — TODO confirm.
	activeReadChunks     map[LogicChunkIndex]int
	activeReadChunksLock sync.Mutex
	// writableChunkLimit is the per-file threshold on len(writableChunks)
	// used by SaveDataAt for eviction and for choosing in-memory chunks.
	writableChunkLimit   int
	// swapFile supplies temp-file backed chunks when a memory chunk is not used.
	swapFile             *SwapFile
}

// SealedChunk pairs a chunk that has been handed to the upload path with a
// reference counter. In this file the counter starts at 1 (held by the upload
// job), is incremented for each read in MaybeReadDataAt, and the chunk's
// resources are freed when FreeReference brings it to zero.
type SealedChunk struct {
	chunk            PageChunk
	referenceCounter int // track uploading or reading processes
}

// FreeReference drops one reference to the sealed chunk. When the counter
// lands exactly on zero, messageOnFree is logged at verbosity 4 and the
// underlying chunk's FreeResource is invoked.
//
// The method performs no locking of its own.
func (sc *SealedChunk) FreeReference(messageOnFree string) {
	remaining := sc.referenceCounter - 1
	sc.referenceCounter = remaining
	if remaining != 0 {
		return
	}

	glog.V(4).Infof("Free sealed chunk: %s", messageOnFree)
	sc.chunk.FreeResource()
}

// NewUploadPipeline builds an UploadPipeline with empty chunk maps.
//
// writers executes the upload jobs, chunkSize is the size used to map offsets
// to chunk indexes (and is passed to the swap file), saveToStorageFn persists
// sealed chunks, bufferChunkLimit becomes the per-file writable chunk limit,
// and swapFileDir is passed to NewSwapFile.
func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int, swapFileDir string) *UploadPipeline {
	pipeline := new(UploadPipeline)

	// sizing and limits
	pipeline.ChunkSize = chunkSize
	pipeline.writableChunkLimit = bufferChunkLimit

	// chunk bookkeeping
	pipeline.writableChunks = make(map[LogicChunkIndex]PageChunk)
	pipeline.sealedChunks = make(map[LogicChunkIndex]*SealedChunk)
	pipeline.activeReadChunks = make(map[LogicChunkIndex]int)

	// upload machinery
	pipeline.uploaders = writers
	pipeline.uploaderCountCond = sync.NewCond(&sync.Mutex{})
	pipeline.saveToStorageFn = saveToStorageFn

	pipeline.swapFile = NewSwapFile(swapFileDir, chunkSize)

	return pipeline
}

// SaveDataAt writes p at file offset off into the writable chunk covering
// off, creating that chunk on first use, and returns the byte count reported
// by the chunk's WriteDataAt. If the chunk reports itself complete after the
// write it is sealed for upload.
//
// When a new chunk is needed and the number of writable chunks already
// exceeds writableChunkLimit, the chunk with the most written bytes is sealed
// first. A new chunk is a memory chunk only when the write is sequential, the
// writable chunk count is below the limit, and memChunkCounter is below four
// times the limit; otherwise it is backed by the swap file.
//
// The whole call runs under writableChunksLock.
func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n int) {
	up.writableChunksLock.Lock()
	defer up.writableChunksLock.Unlock()

	logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)

	pageChunk, found := up.writableChunks[logicChunkIndex]
	if !found {
		if len(up.writableChunks) > up.writableChunkLimit {
			// if current file chunks is over the per file buffer count limit,
			// seal the chunk holding the most written bytes
			fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
			for lci, mc := range up.writableChunks {
				if chunkFullness := mc.WrittenSize(); fullness < chunkFullness {
					fullestChunkIndex, fullness = lci, chunkFullness
				}
			}
			// No candidate is selected when every chunk reports zero written
			// bytes. Sealing the missing entry would register a nil chunk
			// that the upload job later dereferences, so skip eviction then.
			if fullestChunk, ok := up.writableChunks[fullestChunkIndex]; ok {
				up.moveToSealed(fullestChunk, fullestChunkIndex)
				delete(up.writableChunks, fullestChunkIndex)
			}
		}
		if isSequential &&
			len(up.writableChunks) < up.writableChunkLimit &&
			atomic.LoadInt64(&memChunkCounter) < 4*int64(up.writableChunkLimit) {
			pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
		} else {
			pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex)
		}
		up.writableChunks[logicChunkIndex] = pageChunk
	}
	n = pageChunk.WriteDataAt(p, off)
	up.maybeMoveToSealed(pageChunk, logicChunkIndex)

	return n
}

// MaybeReadDataAt reads buffered data for the chunk containing off into p and
// returns the larger of the stop offsets reported by the sealed and the
// writable chunk for that index. It returns 0 when neither chunk exists.
//
// The sealed chunk is read first and the writable chunk second, so the
// writable chunk's ReadDataAt runs last on p.
func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
	logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)

	// read from sealed chunks first
	up.sealedChunksLock.Lock()
	sealedChunk, found := up.sealedChunks[logicChunkIndex]
	if found {
		// hold a reference so the chunk is not freed while it is being read
		sealedChunk.referenceCounter++
	}
	up.sealedChunksLock.Unlock()
	if found {
		maxStop = sealedChunk.chunk.ReadDataAt(p, off)
		glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop)
		// referenceCounter is guarded by sealedChunksLock everywhere else it
		// is modified; releasing the reference without the lock would race
		// with the upload job and with moveToSealed.
		up.sealedChunksLock.Lock()
		sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex))
		up.sealedChunksLock.Unlock()
	}

	// read from writable chunks last
	up.writableChunksLock.Lock()
	defer up.writableChunksLock.Unlock()
	writableChunk, found := up.writableChunks[logicChunkIndex]
	if !found {
		return maxStop
	}
	writableMaxStop := writableChunk.ReadDataAt(p, off)
	glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop)
	maxStop = max(maxStop, writableMaxStop)

	return maxStop
}

// FlushAll seals every chunk currently in writableChunks and then calls
// waitForCurrentWritersToComplete. writableChunksLock is held for the whole
// call, including the wait.
func (up *UploadPipeline) FlushAll() {
	up.writableChunksLock.Lock()
	defer up.writableChunksLock.Unlock()

	// moveToSealed removes the entry it is given from writableChunks;
	// deleting from a map while ranging over it is permitted in Go.
	for idx := range up.writableChunks {
		pending := up.writableChunks[idx]
		up.moveToSealed(pending, idx)
	}

	up.waitForCurrentWritersToComplete()
}

// maybeMoveToSealed seals memChunk for upload only when the chunk reports
// itself complete; otherwise it does nothing.
func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
	if !memChunk.IsComplete() {
		return
	}
	up.moveToSealed(memChunk, logicChunkIndex)
}

// moveToSealed registers memChunk as the sealed chunk for logicChunkIndex,
// removes the index from writableChunks, and schedules the upload on
// up.uploaders.
//
// writableChunks is modified without taking writableChunksLock; the callers
// visible in this file (SaveDataAt and FlushAll, directly or through
// maybeMoveToSealed) already hold that lock.
//
// The scheduled job saves the chunk content, decrements uploaderCount and
// wakes waiters on uploaderCountCond, polls until IsLocked reports false for
// the chunk index, and finally drops the sealed entry and the upload's
// reference to the chunk.
func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
	// Log the value returned by the atomic add; reading the field directly
	// would race with upload jobs updating it concurrently.
	newCount := atomic.AddInt32(&up.uploaderCount, 1)
	glog.V(4).Infof("%s uploaderCount %d ++> %d", up.filepath, newCount-1, newCount)

	up.sealedChunksLock.Lock()

	if oldMemChunk, found := up.sealedChunks[logicChunkIndex]; found {
		oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", up.filepath, logicChunkIndex))
	}
	sealedChunk := &SealedChunk{
		chunk:            memChunk,
		referenceCounter: 1, // default 1 is for uploading process
	}
	up.sealedChunks[logicChunkIndex] = sealedChunk
	delete(up.writableChunks, logicChunkIndex)

	up.sealedChunksLock.Unlock()

	up.uploaders.Execute(func() {
		// first add to the file chunks
		sealedChunk.chunk.SaveContent(up.saveToStorageFn)

		// notify waiting process
		remaining := atomic.AddInt32(&up.uploaderCount, -1)
		glog.V(4).Infof("%s uploaderCount %d --> %d", up.filepath, remaining+1, remaining)
		// Lock and Unlock are not required,
		// but it may signal multiple times during one wakeup,
		// and the waiting goroutine may miss some of them!
		up.uploaderCountCond.L.Lock()
		up.uploaderCountCond.Broadcast()
		up.uploaderCountCond.L.Unlock()

		// wait for readers
		for up.IsLocked(logicChunkIndex) {
			time.Sleep(59 * time.Millisecond)
		}

		// then remove from sealed chunks
		up.sealedChunksLock.Lock()
		defer up.sealedChunksLock.Unlock()
		// Only drop the map entry if it is still this job's chunk. A later
		// moveToSealed for the same index may have replaced it, and that
		// newer chunk must stay readable until its own upload job removes it.
		if up.sealedChunks[logicChunkIndex] == sealedChunk {
			delete(up.sealedChunks, logicChunkIndex)
		}
		sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex))
	})
}

// Shutdown frees the swap file's resources and then, under sealedChunksLock,
// drops one reference from every chunk still present in sealedChunks.
func (up *UploadPipeline) Shutdown() {
	up.swapFile.FreeResource()

	up.sealedChunksLock.Lock()
	defer up.sealedChunksLock.Unlock()

	for idx, sealed := range up.sealedChunks {
		reason := fmt.Sprintf("%s uploadpipeline shutdown chunk %d", up.filepath, idx)
		sealed.FreeReference(reason)
	}
}