You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
454 lines
14 KiB
454 lines
14 KiB
package logstore
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"fmt"
|
|
"github.com/parquet-go/parquet-go"
|
|
"github.com/parquet-go/parquet-go/compress/zstd"
|
|
"github.com/seaweedfs/seaweedfs/weed/filer"
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
|
"github.com/seaweedfs/seaweedfs/weed/operation"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
|
|
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
|
|
"google.golang.org/protobuf/proto"
|
|
"io"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// Reserved column names that writeLogFilesToParquet adds to every record
// before it is converted into a parquet row.
const (
	// SW_COLUMN_NAME_TS is the field that receives the log entry's TsNs.
	SW_COLUMN_NAME_TS = "_ts_ns"
	// SW_COLUMN_NAME_KEY is the field that receives the log entry's Key bytes.
	SW_COLUMN_NAME_KEY = "_key"
)
|
|
|
|
func CompactTopicPartitions(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration, recordType *schema_pb.RecordType, preference *operation.StoragePreference) error {
|
|
// list the topic partition versions
|
|
topicVersions, err := collectTopicVersions(filerClient, t, timeAgo)
|
|
if err != nil {
|
|
return fmt.Errorf("list topic files: %v", err)
|
|
}
|
|
|
|
// compact the partitions
|
|
for _, topicVersion := range topicVersions {
|
|
partitions, err := collectTopicVersionsPartitions(filerClient, t, topicVersion)
|
|
if err != nil {
|
|
return fmt.Errorf("list partitions %s/%s/%s: %v", t.Namespace, t.Name, topicVersion, err)
|
|
}
|
|
for _, partition := range partitions {
|
|
err := compactTopicPartition(filerClient, t, timeAgo, recordType, partition, preference)
|
|
if err != nil {
|
|
return fmt.Errorf("compact partition %s/%s/%s/%s: %v", t.Namespace, t.Name, topicVersion, partition, err)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func collectTopicVersions(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration) (partitionVersions []time.Time, err error) {
|
|
err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(t.Dir()), "", func(entry *filer_pb.Entry, isLast bool) error {
|
|
t, err := topic.ParseTopicVersion(entry.Name)
|
|
if err != nil {
|
|
// skip non-partition directories
|
|
return nil
|
|
}
|
|
if t.Unix() < time.Now().Unix()-int64(timeAgo/time.Second) {
|
|
partitionVersions = append(partitionVersions, t)
|
|
}
|
|
return nil
|
|
})
|
|
return
|
|
}
|
|
|
|
func collectTopicVersionsPartitions(filerClient filer_pb.FilerClient, t topic.Topic, topicVersion time.Time) (partitions []topic.Partition, err error) {
|
|
version := topicVersion.Format(topic.PartitionGenerationFormat)
|
|
err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(t.Dir()).Child(version), "", func(entry *filer_pb.Entry, isLast bool) error {
|
|
if !entry.IsDirectory {
|
|
return nil
|
|
}
|
|
start, stop := topic.ParsePartitionBoundary(entry.Name)
|
|
if start != stop {
|
|
partitions = append(partitions, topic.Partition{
|
|
RangeStart: start,
|
|
RangeStop: stop,
|
|
RingSize: topic.PartitionCount,
|
|
UnixTimeNs: topicVersion.UnixNano(),
|
|
})
|
|
}
|
|
return nil
|
|
})
|
|
return
|
|
}
|
|
|
|
func compactTopicPartition(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration, recordType *schema_pb.RecordType, partition topic.Partition, preference *operation.StoragePreference) error {
|
|
partitionDir := topic.PartitionDir(t, partition)
|
|
|
|
// compact the partition directory
|
|
return compactTopicPartitionDir(filerClient, t.Name, partitionDir, timeAgo, recordType, preference)
|
|
}
|
|
|
|
func compactTopicPartitionDir(filerClient filer_pb.FilerClient, topicName, partitionDir string, timeAgo time.Duration, recordType *schema_pb.RecordType, preference *operation.StoragePreference) error {
|
|
// read all existing parquet files
|
|
minTsNs, maxTsNs, err := readAllParquetFiles(filerClient, partitionDir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// read all log files
|
|
logFiles, err := readAllLogFiles(filerClient, partitionDir, timeAgo, minTsNs, maxTsNs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(logFiles) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// divide log files into groups of 128MB
|
|
logFileGroups := groupFilesBySize(logFiles, 128*1024*1024)
|
|
|
|
// write to parquet file
|
|
parquetLevels, err := schema.ToParquetLevels(recordType)
|
|
if err != nil {
|
|
return fmt.Errorf("ToParquetLevels failed %+v: %v", recordType, err)
|
|
}
|
|
|
|
// create a parquet schema
|
|
parquetSchema, err := schema.ToParquetSchema(topicName, recordType)
|
|
if err != nil {
|
|
return fmt.Errorf("ToParquetSchema failed: %v", err)
|
|
}
|
|
|
|
// TODO parallelize the writing
|
|
for _, logFileGroup := range logFileGroups {
|
|
if err = writeLogFilesToParquet(filerClient, partitionDir, recordType, logFileGroup, parquetSchema, parquetLevels, preference); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func groupFilesBySize(logFiles []*filer_pb.Entry, maxGroupSize int64) (logFileGroups [][]*filer_pb.Entry) {
|
|
var logFileGroup []*filer_pb.Entry
|
|
var groupSize int64
|
|
for _, logFile := range logFiles {
|
|
if groupSize+int64(logFile.Attributes.FileSize) > maxGroupSize {
|
|
logFileGroups = append(logFileGroups, logFileGroup)
|
|
logFileGroup = nil
|
|
groupSize = 0
|
|
}
|
|
logFileGroup = append(logFileGroup, logFile)
|
|
groupSize += int64(logFile.Attributes.FileSize)
|
|
}
|
|
if len(logFileGroup) > 0 {
|
|
logFileGroups = append(logFileGroups, logFileGroup)
|
|
}
|
|
return
|
|
}
|
|
|
|
func readAllLogFiles(filerClient filer_pb.FilerClient, partitionDir string, timeAgo time.Duration, minTsNs, maxTsNs int64) (logFiles []*filer_pb.Entry, err error) {
|
|
err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error {
|
|
if strings.HasSuffix(entry.Name, ".parquet") {
|
|
return nil
|
|
}
|
|
if entry.Attributes.Crtime > time.Now().Unix()-int64(timeAgo/time.Second) {
|
|
return nil
|
|
}
|
|
logTime, err := time.Parse(topic.TIME_FORMAT, entry.Name)
|
|
if err != nil {
|
|
// glog.Warningf("parse log time %s: %v", entry.Name, err)
|
|
return nil
|
|
}
|
|
if maxTsNs > 0 && logTime.UnixNano() <= maxTsNs {
|
|
return nil
|
|
}
|
|
logFiles = append(logFiles, entry)
|
|
return nil
|
|
})
|
|
return
|
|
}
|
|
|
|
func readAllParquetFiles(filerClient filer_pb.FilerClient, partitionDir string) (minTsNs, maxTsNs int64, err error) {
|
|
err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error {
|
|
if !strings.HasSuffix(entry.Name, ".parquet") {
|
|
return nil
|
|
}
|
|
if len(entry.Extended) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// read min ts
|
|
minTsBytes := entry.Extended["min"]
|
|
if len(minTsBytes) != 8 {
|
|
return nil
|
|
}
|
|
minTs := int64(binary.BigEndian.Uint64(minTsBytes))
|
|
if minTsNs == 0 || minTs < minTsNs {
|
|
minTsNs = minTs
|
|
}
|
|
|
|
// read max ts
|
|
maxTsBytes := entry.Extended["max"]
|
|
if len(maxTsBytes) != 8 {
|
|
return nil
|
|
}
|
|
maxTs := int64(binary.BigEndian.Uint64(maxTsBytes))
|
|
if maxTsNs == 0 || maxTs > maxTsNs {
|
|
maxTsNs = maxTs
|
|
}
|
|
return nil
|
|
})
|
|
return
|
|
}
|
|
|
|
func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir string, recordType *schema_pb.RecordType, logFileGroups []*filer_pb.Entry, parquetSchema *parquet.Schema, parquetLevels *schema.ParquetLevels, preference *operation.StoragePreference) (err error) {
|
|
|
|
tempFile, err := os.CreateTemp(".", "t*.parquet")
|
|
if err != nil {
|
|
return fmt.Errorf("create temp file: %v", err)
|
|
}
|
|
defer func() {
|
|
tempFile.Close()
|
|
os.Remove(tempFile.Name())
|
|
}()
|
|
|
|
writer := parquet.NewWriter(tempFile, parquetSchema, parquet.Compression(&zstd.Codec{Level: zstd.DefaultLevel}))
|
|
rowBuilder := parquet.NewRowBuilder(parquetSchema)
|
|
|
|
var startTsNs, stopTsNs int64
|
|
|
|
for _, logFile := range logFileGroups {
|
|
fmt.Printf("compact %s/%s ", partitionDir, logFile.Name)
|
|
var rows []parquet.Row
|
|
if err := iterateLogEntries(filerClient, logFile, func(entry *filer_pb.LogEntry) error {
|
|
|
|
if startTsNs == 0 {
|
|
startTsNs = entry.TsNs
|
|
}
|
|
stopTsNs = entry.TsNs
|
|
|
|
if len(entry.Key) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// write to parquet file
|
|
rowBuilder.Reset()
|
|
|
|
record := &schema_pb.RecordValue{}
|
|
if err := proto.Unmarshal(entry.Data, record); err != nil {
|
|
return fmt.Errorf("unmarshal record value: %v", err)
|
|
}
|
|
|
|
record.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{
|
|
Kind: &schema_pb.Value_Int64Value{
|
|
Int64Value: entry.TsNs,
|
|
},
|
|
}
|
|
record.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
|
|
Kind: &schema_pb.Value_BytesValue{
|
|
BytesValue: entry.Key,
|
|
},
|
|
}
|
|
|
|
if err := schema.AddRecordValue(rowBuilder, recordType, parquetLevels, record); err != nil {
|
|
return fmt.Errorf("add record value: %v", err)
|
|
}
|
|
|
|
rows = append(rows, rowBuilder.Row())
|
|
|
|
return nil
|
|
|
|
}); err != nil {
|
|
return fmt.Errorf("iterate log entry %v/%v: %v", partitionDir, logFile.Name, err)
|
|
}
|
|
|
|
fmt.Printf("processed %d rows\n", len(rows))
|
|
|
|
if _, err := writer.WriteRows(rows); err != nil {
|
|
return fmt.Errorf("write rows: %v", err)
|
|
}
|
|
}
|
|
|
|
if err := writer.Close(); err != nil {
|
|
return fmt.Errorf("close writer: %v", err)
|
|
}
|
|
|
|
// write to parquet file to partitionDir
|
|
parquetFileName := fmt.Sprintf("%s.parquet", time.Unix(0, startTsNs).UTC().Format("2006-01-02-15-04-05"))
|
|
if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs); err != nil {
|
|
return fmt.Errorf("save parquet file %s: %v", parquetFileName, err)
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64) error {
|
|
uploader, err := operation.NewUploader()
|
|
if err != nil {
|
|
return fmt.Errorf("new uploader: %v", err)
|
|
}
|
|
|
|
// get file size
|
|
fileInfo, err := sourceFile.Stat()
|
|
if err != nil {
|
|
return fmt.Errorf("stat source file: %v", err)
|
|
}
|
|
|
|
// upload file in chunks
|
|
chunkSize := int64(4 * 1024 * 1024)
|
|
chunkCount := (fileInfo.Size() + chunkSize - 1) / chunkSize
|
|
entry := &filer_pb.Entry{
|
|
Name: parquetFileName,
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
Crtime: time.Now().Unix(),
|
|
Mtime: time.Now().Unix(),
|
|
FileMode: uint32(os.FileMode(0644)),
|
|
FileSize: uint64(fileInfo.Size()),
|
|
Mime: "application/vnd.apache.parquet",
|
|
},
|
|
}
|
|
entry.Extended = make(map[string][]byte)
|
|
minTsBytes := make([]byte, 8)
|
|
binary.BigEndian.PutUint64(minTsBytes, uint64(startTsNs))
|
|
entry.Extended["min"] = minTsBytes
|
|
maxTsBytes := make([]byte, 8)
|
|
binary.BigEndian.PutUint64(maxTsBytes, uint64(stopTsNs))
|
|
entry.Extended["max"] = maxTsBytes
|
|
|
|
for i := int64(0); i < chunkCount; i++ {
|
|
fileId, uploadResult, err, _ := uploader.UploadWithRetry(
|
|
filerClient,
|
|
&filer_pb.AssignVolumeRequest{
|
|
Count: 1,
|
|
Replication: preference.Replication,
|
|
Collection: preference.Collection,
|
|
TtlSec: 0, // TODO set ttl
|
|
DiskType: preference.DiskType,
|
|
Path: partitionDir + "/" + parquetFileName,
|
|
},
|
|
&operation.UploadOption{
|
|
Filename: parquetFileName,
|
|
Cipher: false,
|
|
IsInputCompressed: false,
|
|
MimeType: "application/vnd.apache.parquet",
|
|
PairMap: nil,
|
|
},
|
|
func(host, fileId string) string {
|
|
return fmt.Sprintf("http://%s/%s", host, fileId)
|
|
},
|
|
io.NewSectionReader(sourceFile, i*chunkSize, chunkSize),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("upload chunk %d: %v", i, err)
|
|
}
|
|
if uploadResult.Error != "" {
|
|
return fmt.Errorf("upload result: %v", uploadResult.Error)
|
|
}
|
|
entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(fileId, i*chunkSize, time.Now().UnixNano()))
|
|
}
|
|
|
|
// write the entry to partitionDir
|
|
if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
|
|
Directory: partitionDir,
|
|
Entry: entry,
|
|
})
|
|
}); err != nil {
|
|
return fmt.Errorf("create entry: %v", err)
|
|
}
|
|
fmt.Printf("saved to %s/%s\n", partitionDir, parquetFileName)
|
|
|
|
return nil
|
|
}
|
|
|
|
func iterateLogEntries(filerClient filer_pb.FilerClient, logFile *filer_pb.Entry, eachLogEntryFn func(entry *filer_pb.LogEntry) error) error {
|
|
lookupFn := filer.LookupFn(filerClient)
|
|
_, err := eachFile(logFile, lookupFn, func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
|
|
if err := eachLogEntryFn(logEntry); err != nil {
|
|
return true, err
|
|
}
|
|
return false, nil
|
|
})
|
|
return err
|
|
}
|
|
|
|
func eachFile(entry *filer_pb.Entry, lookupFileIdFn func(fileId string) (targetUrls []string, err error), eachLogEntryFn log_buffer.EachLogEntryFuncType) (processedTsNs int64, err error) {
|
|
if len(entry.Content) > 0 {
|
|
// skip .offset files
|
|
return
|
|
}
|
|
var urlStrings []string
|
|
for _, chunk := range entry.Chunks {
|
|
if chunk.Size == 0 {
|
|
continue
|
|
}
|
|
if chunk.IsChunkManifest {
|
|
fmt.Printf("this should not happen. unexpected chunk manifest in %s", entry.Name)
|
|
return
|
|
}
|
|
urlStrings, err = lookupFileIdFn(chunk.FileId)
|
|
if err != nil {
|
|
err = fmt.Errorf("lookup %s: %v", chunk.FileId, err)
|
|
return
|
|
}
|
|
if len(urlStrings) == 0 {
|
|
err = fmt.Errorf("no url found for %s", chunk.FileId)
|
|
return
|
|
}
|
|
|
|
// try one of the urlString until util.Get(urlString) succeeds
|
|
var processed bool
|
|
for _, urlString := range urlStrings {
|
|
var data []byte
|
|
if data, _, err = util_http.Get(urlString); err == nil {
|
|
processed = true
|
|
if processedTsNs, err = eachChunk(data, eachLogEntryFn); err != nil {
|
|
return
|
|
}
|
|
break
|
|
}
|
|
}
|
|
if !processed {
|
|
err = fmt.Errorf("no data processed for %s %s", entry.Name, chunk.FileId)
|
|
return
|
|
}
|
|
|
|
}
|
|
return
|
|
}
|
|
|
|
func eachChunk(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType) (processedTsNs int64, err error) {
|
|
for pos := 0; pos+4 < len(buf); {
|
|
|
|
size := util.BytesToUint32(buf[pos : pos+4])
|
|
if pos+4+int(size) > len(buf) {
|
|
err = fmt.Errorf("reach each log chunk: read [%d,%d) from [0,%d)", pos, pos+int(size)+4, len(buf))
|
|
return
|
|
}
|
|
entryData := buf[pos+4 : pos+4+int(size)]
|
|
|
|
logEntry := &filer_pb.LogEntry{}
|
|
if err = proto.Unmarshal(entryData, logEntry); err != nil {
|
|
pos += 4 + int(size)
|
|
err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
|
|
return
|
|
}
|
|
|
|
if _, err = eachLogEntryFn(logEntry); err != nil {
|
|
err = fmt.Errorf("process log entry %v: %v", logEntry, err)
|
|
return
|
|
}
|
|
|
|
processedTsNs = logEntry.TsNs
|
|
|
|
pos += 4 + int(size)
|
|
|
|
}
|
|
|
|
return
|
|
}
|