You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
134 lines
4.1 KiB
134 lines
4.1 KiB
package broker
|
|
|
|
import (
|
|
"context"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/filer"
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/operation"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
)
|
|
|
|
func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error {
|
|
return b.appendToFileWithBufferIndex(targetFile, data, 0)
|
|
}
|
|
|
|
func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data []byte, bufferIndex int64) error {
|
|
|
|
fileId, uploadResult, err2 := b.assignAndUpload(targetFile, data)
|
|
if err2 != nil {
|
|
return err2
|
|
}
|
|
|
|
// find out existing entry
|
|
fullpath := util.FullPath(targetFile)
|
|
dir, name := fullpath.DirAndName()
|
|
entry, err := filer_pb.GetEntry(context.Background(), b, fullpath)
|
|
var offset int64 = 0
|
|
if err == filer_pb.ErrNotFound {
|
|
entry = &filer_pb.Entry{
|
|
Name: name,
|
|
IsDirectory: false,
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
Crtime: time.Now().Unix(),
|
|
Mtime: time.Now().Unix(),
|
|
FileMode: uint32(os.FileMode(0644)),
|
|
Uid: uint32(os.Getuid()),
|
|
Gid: uint32(os.Getgid()),
|
|
},
|
|
}
|
|
|
|
// Add buffer start index for deduplication tracking (binary format)
|
|
if bufferIndex != 0 {
|
|
entry.Extended = make(map[string][]byte)
|
|
bufferStartBytes := make([]byte, 8)
|
|
binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferIndex))
|
|
entry.Extended["buffer_start"] = bufferStartBytes
|
|
}
|
|
} else if err != nil {
|
|
return fmt.Errorf("find %s: %v", fullpath, err)
|
|
} else {
|
|
offset = int64(filer.TotalSize(entry.GetChunks()))
|
|
|
|
// Verify buffer index continuity for existing files (append operations)
|
|
if bufferIndex != 0 {
|
|
if entry.Extended == nil {
|
|
entry.Extended = make(map[string][]byte)
|
|
}
|
|
|
|
// Check for existing buffer start (binary format)
|
|
if existingData, exists := entry.Extended["buffer_start"]; exists {
|
|
if len(existingData) == 8 {
|
|
existingStartIndex := int64(binary.BigEndian.Uint64(existingData))
|
|
|
|
// Verify that the new buffer index is consecutive
|
|
// Expected index = start + number of existing chunks
|
|
expectedIndex := existingStartIndex + int64(len(entry.GetChunks()))
|
|
if bufferIndex != expectedIndex {
|
|
// This shouldn't happen in normal operation
|
|
// Log warning but continue (don't crash the system)
|
|
glog.Warningf("non-consecutive buffer index for %s. Expected %d, got %d",
|
|
fullpath, expectedIndex, bufferIndex)
|
|
}
|
|
// Note: We don't update the start index - it stays the same
|
|
}
|
|
} else {
|
|
// No existing buffer start, create new one (shouldn't happen for existing files)
|
|
bufferStartBytes := make([]byte, 8)
|
|
binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferIndex))
|
|
entry.Extended["buffer_start"] = bufferStartBytes
|
|
}
|
|
}
|
|
}
|
|
|
|
// append to existing chunks
|
|
entry.Chunks = append(entry.GetChunks(), uploadResult.ToPbFileChunk(fileId, offset, time.Now().UnixNano()))
|
|
|
|
// update the entry
|
|
return b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
return filer_pb.CreateEntry(context.Background(), client, &filer_pb.CreateEntryRequest{
|
|
Directory: dir,
|
|
Entry: entry,
|
|
})
|
|
})
|
|
}
|
|
|
|
func (b *MessageQueueBroker) assignAndUpload(targetFile string, data []byte) (fileId string, uploadResult *operation.UploadResult, err error) {
|
|
|
|
reader := util.NewBytesReader(data)
|
|
|
|
uploader, err := operation.NewUploader()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
fileId, uploadResult, err, _ = uploader.UploadWithRetry(
|
|
b,
|
|
&filer_pb.AssignVolumeRequest{
|
|
Count: 1,
|
|
Replication: b.option.DefaultReplication,
|
|
Collection: "topics",
|
|
// TtlSec: wfs.option.TtlSec,
|
|
// DiskType: string(wfs.option.DiskType),
|
|
DataCenter: b.option.DataCenter,
|
|
Path: targetFile,
|
|
},
|
|
&operation.UploadOption{
|
|
Cipher: b.option.Cipher,
|
|
},
|
|
func(host, fileId string) string {
|
|
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
|
|
if b.option.VolumeServerAccess == "filerProxy" {
|
|
fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", b.currentFiler, fileId)
|
|
}
|
|
return fileUrl
|
|
},
|
|
reader,
|
|
)
|
|
return
|
|
}
|