Merge branch 'seaweedfs:master' into feature/suport_dameng_as_filer_store
pull/6061/head
Vegetable540
2 months ago
committed by GitHub
220 changed files with 5350 additions and 3897 deletions
.github/workflows/binaries_dev.yml | 8
.github/workflows/binaries_release0.yml | 4
.github/workflows/binaries_release1.yml | 4
.github/workflows/binaries_release2.yml | 4
.github/workflows/binaries_release3.yml | 4
.github/workflows/binaries_release4.yml | 8
.github/workflows/binaries_release5.yml | 4
.github/workflows/container_dev.yml | 4
.github/workflows/container_latest.yml | 4
.github/workflows/container_release1.yml | 4
.github/workflows/container_release2.yml | 4
.github/workflows/container_release3.yml | 4
.github/workflows/container_release4.yml | 6
.github/workflows/container_release5.yml | 6
Makefile | 2
go.mod | 125
go.sum | 275
k8s/charts/seaweedfs/Chart.yaml | 4
k8s/charts/seaweedfs/templates/filer-cert.yaml | 4
k8s/charts/seaweedfs/templates/filer-service-client.yaml | 4
k8s/charts/seaweedfs/templates/filer-service.yaml | 4
k8s/charts/seaweedfs/templates/filer-servicemonitor.yaml | 4
k8s/charts/seaweedfs/templates/filer-statefulset.yaml | 25
k8s/charts/seaweedfs/templates/master-cert.yaml | 4
k8s/charts/seaweedfs/templates/master-configmap.yaml | 4
k8s/charts/seaweedfs/templates/master-service.yaml | 3
k8s/charts/seaweedfs/templates/master-servicemonitor.yaml | 4
k8s/charts/seaweedfs/templates/master-statefulset.yaml | 8
k8s/charts/seaweedfs/templates/notification-configmap.yaml | 19
k8s/charts/seaweedfs/templates/s3-deployment.yaml | 5
k8s/charts/seaweedfs/templates/s3-service.yaml | 4
k8s/charts/seaweedfs/templates/s3-servicemonitor.yaml | 4
k8s/charts/seaweedfs/templates/volume-cert.yaml | 4
k8s/charts/seaweedfs/templates/volume-service.yaml | 4
k8s/charts/seaweedfs/templates/volume-servicemonitor.yaml | 4
k8s/charts/seaweedfs/templates/volume-statefulset.yaml | 14
k8s/charts/seaweedfs/values.yaml | 63
other/metrics/grafana_seaweedfs.json | 14
weed/command/benchmark.go | 4
weed/command/filer_copy.go | 2
weed/command/mount_std.go | 1
weed/command/update_full.go | 4
weed/command/upload.go | 18
weed/filer/filechunk_section.go | 8
weed/filer/filer_notify_read.go | 26
weed/filer/filerstore_wrapper.go | 3
weed/filer/reader_at.go | 14
weed/filer/reader_at_test.go | 8
weed/filer/topics.go | 1
weed/filer_client/filer_client_accessor.go | 34
weed/mount/filehandle_read.go | 2
weed/mount/filer_conf.go | 53
weed/mount/meta_cache/meta_cache_subscribe.go | 43
weed/mount/page_writer.go | 2
weed/mount/weedfs.go | 7
weed/mount/weedfs_attr.go | 1
weed/mount/weedfs_file_io.go | 9
weed/mount/weedfs_rename.go | 14
weed/mq/broker/broker_grpc_assign.go | 3
weed/mq/broker/broker_grpc_configure.go | 2
weed/mq/broker/broker_grpc_pub_follow.go | 5
weed/mq/broker/broker_grpc_sub_follow.go | 9
weed/mq/broker/broker_topic_conf_read_write.go | 5
weed/mq/broker/broker_topic_partition_read_write.go | 137
weed/mq/client/cmd/weed_pub_record/publisher_record.go | 13
weed/mq/client/cmd/weed_sub_record/subscriber_record.go | 15
weed/mq/client/sub_client/on_each_partition.go | 12
weed/mq/client/sub_client/subscriber.go | 1
weed/mq/logstore/log_to_parquet.go | 454
weed/mq/logstore/merged_read.go | 41
weed/mq/logstore/read_log_from_disk.go | 144
weed/mq/logstore/read_parquet_to_log.go | 162
weed/mq/pub_balancer/broker_stats.go | 4
weed/mq/schema/schema.go | 27
weed/mq/schema/schema_builder.go | 7
weed/mq/sub_coordinator/partition_consumer_mapping.go | 10
weed/mq/sub_coordinator/partition_list.go | 14
weed/mq/topic/local_manager.go | 2
weed/mq/topic/local_partition.go | 1
weed/mq/topic/partition.go | 27
weed/mq/topic/topic.go | 44
weed/mq/topic/topic_partition.go | 2
weed/operation/submit.go | 64
weed/pb/filer_pb/filer.pb.go | 142
weed/pb/filer_pb/filer_grpc.pb.go | 322
weed/pb/iam_pb/iam.pb.go | 14
weed/pb/iam_pb/iam_grpc.pb.go | 29
weed/pb/master.proto | 2
weed/pb/master_pb/master.pb.go | 1307
weed/pb/master_pb/master_grpc.pb.go | 270
weed/pb/mount_pb/mount.pb.go | 10
weed/pb/mount_pb/mount_grpc.pb.go | 30
weed/pb/mq_pb/mq.pb.go | 128
weed/pb/mq_pb/mq_grpc.pb.go | 420
weed/pb/remote_pb/remote.pb.go | 12
weed/pb/s3_pb/s3.pb.go | 14
weed/pb/s3_pb/s3_grpc.pb.go | 30
weed/pb/schema_pb/schema.pb.go | 24
weed/pb/volume_server.proto | 13
weed/pb/volume_server_pb/volume_server.pb.go | 1766
k8s/charts/seaweedfs/Chart.yaml
@@ -1,6 +1,6 @@
 apiVersion: v1
 description: SeaweedFS
 name: seaweedfs
-appVersion: "3.73"
+appVersion: "3.79"
 # Dev note: Trigger a helm chart release by `git tag -a helm-<version>`
-version: 4.0.1
+version: 4.0.379
k8s/charts/seaweedfs/templates/notification-configmap.yaml (new file)
@@ -0,0 +1,19 @@
{{- if and .Values.filer.enabled .Values.filer.notificationConfig }}
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{ template "seaweedfs.name" . }}-notification-config
  namespace: {{ .Release.Namespace }}
  labels:
    app.kubernetes.io/name: {{ template "seaweedfs.name" . }}
    helm.sh/chart: {{ .Chart.Name }}-{{ .Chart.Version | replace "+" "_" }}
    app.kubernetes.io/managed-by: {{ .Release.Service }}
    app.kubernetes.io/instance: {{ .Release.Name }}
  {{- if .Values.filer.annotations }}
  annotations:
    {{- toYaml .Values.filer.annotations | nindent 4 }}
  {{- end }}
data:
  notification.toml: |-
    {{ .Values.filer.notificationConfig | nindent 4 }}
{{- end }}
weed/mq/logstore/log_to_parquet.go (new file)
@@ -0,0 +1,454 @@
package logstore

import (
	"encoding/binary"
	"fmt"
	"github.com/parquet-go/parquet-go"
	"github.com/parquet-go/parquet-go/compress/zstd"
	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/mq/schema"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
	"google.golang.org/protobuf/proto"
	"io"
	"os"
	"strings"
	"time"
)

const (
	SW_COLUMN_NAME_TS  = "_ts_ns"
	SW_COLUMN_NAME_KEY = "_key"
)

func CompactTopicPartitions(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration, recordType *schema_pb.RecordType, preference *operation.StoragePreference) error {
	// list the topic partition versions
	topicVersions, err := collectTopicVersions(filerClient, t, timeAgo)
	if err != nil {
		return fmt.Errorf("list topic files: %v", err)
	}

	// compact the partitions
	for _, topicVersion := range topicVersions {
		partitions, err := collectTopicVersionsPartitions(filerClient, t, topicVersion)
		if err != nil {
			return fmt.Errorf("list partitions %s/%s/%s: %v", t.Namespace, t.Name, topicVersion, err)
		}
		for _, partition := range partitions {
			err := compactTopicPartition(filerClient, t, timeAgo, recordType, partition, preference)
			if err != nil {
				return fmt.Errorf("compact partition %s/%s/%s/%s: %v", t.Namespace, t.Name, topicVersion, partition, err)
			}
		}
	}
	return nil
}

func collectTopicVersions(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration) (partitionVersions []time.Time, err error) {
	err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(t.Dir()), "", func(entry *filer_pb.Entry, isLast bool) error {
		t, err := topic.ParseTopicVersion(entry.Name)
		if err != nil {
			// skip non-partition directories
			return nil
		}
		if t.Unix() < time.Now().Unix()-int64(timeAgo/time.Second) {
			partitionVersions = append(partitionVersions, t)
		}
		return nil
	})
	return
}

func collectTopicVersionsPartitions(filerClient filer_pb.FilerClient, t topic.Topic, topicVersion time.Time) (partitions []topic.Partition, err error) {
	version := topicVersion.Format(topic.PartitionGenerationFormat)
	err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(t.Dir()).Child(version), "", func(entry *filer_pb.Entry, isLast bool) error {
		if !entry.IsDirectory {
			return nil
		}
		start, stop := topic.ParsePartitionBoundary(entry.Name)
		if start != stop {
			partitions = append(partitions, topic.Partition{
				RangeStart: start,
				RangeStop:  stop,
				RingSize:   topic.PartitionCount,
				UnixTimeNs: topicVersion.UnixNano(),
			})
		}
		return nil
	})
	return
}

func compactTopicPartition(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration, recordType *schema_pb.RecordType, partition topic.Partition, preference *operation.StoragePreference) error {
	partitionDir := topic.PartitionDir(t, partition)

	// compact the partition directory
	return compactTopicPartitionDir(filerClient, t.Name, partitionDir, timeAgo, recordType, preference)
}

func compactTopicPartitionDir(filerClient filer_pb.FilerClient, topicName, partitionDir string, timeAgo time.Duration, recordType *schema_pb.RecordType, preference *operation.StoragePreference) error {
	// read all existing parquet files
	minTsNs, maxTsNs, err := readAllParquetFiles(filerClient, partitionDir)
	if err != nil {
		return err
	}

	// read all log files
	logFiles, err := readAllLogFiles(filerClient, partitionDir, timeAgo, minTsNs, maxTsNs)
	if err != nil {
		return err
	}
	if len(logFiles) == 0 {
		return nil
	}

	// divide log files into groups of 128MB
	logFileGroups := groupFilesBySize(logFiles, 128*1024*1024)

	// write to parquet file
	parquetLevels, err := schema.ToParquetLevels(recordType)
	if err != nil {
		return fmt.Errorf("ToParquetLevels failed %+v: %v", recordType, err)
	}

	// create a parquet schema
	parquetSchema, err := schema.ToParquetSchema(topicName, recordType)
	if err != nil {
		return fmt.Errorf("ToParquetSchema failed: %v", err)
	}

	// TODO parallelize the writing
	for _, logFileGroup := range logFileGroups {
		if err = writeLogFilesToParquet(filerClient, partitionDir, recordType, logFileGroup, parquetSchema, parquetLevels, preference); err != nil {
			return err
		}
	}

	return nil
}

func groupFilesBySize(logFiles []*filer_pb.Entry, maxGroupSize int64) (logFileGroups [][]*filer_pb.Entry) {
	var logFileGroup []*filer_pb.Entry
	var groupSize int64
	for _, logFile := range logFiles {
		if groupSize+int64(logFile.Attributes.FileSize) > maxGroupSize {
			logFileGroups = append(logFileGroups, logFileGroup)
			logFileGroup = nil
			groupSize = 0
		}
		logFileGroup = append(logFileGroup, logFile)
		groupSize += int64(logFile.Attributes.FileSize)
	}
	if len(logFileGroup) > 0 {
		logFileGroups = append(logFileGroups, logFileGroup)
	}
	return
}

func readAllLogFiles(filerClient filer_pb.FilerClient, partitionDir string, timeAgo time.Duration, minTsNs, maxTsNs int64) (logFiles []*filer_pb.Entry, err error) {
	err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error {
		if strings.HasSuffix(entry.Name, ".parquet") {
			return nil
		}
		if entry.Attributes.Crtime > time.Now().Unix()-int64(timeAgo/time.Second) {
			return nil
		}
		logTime, err := time.Parse(topic.TIME_FORMAT, entry.Name)
		if err != nil {
			// glog.Warningf("parse log time %s: %v", entry.Name, err)
			return nil
		}
		if maxTsNs > 0 && logTime.UnixNano() <= maxTsNs {
			return nil
		}
		logFiles = append(logFiles, entry)
		return nil
	})
	return
}

func readAllParquetFiles(filerClient filer_pb.FilerClient, partitionDir string) (minTsNs, maxTsNs int64, err error) {
	err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error {
		if !strings.HasSuffix(entry.Name, ".parquet") {
			return nil
		}
		if len(entry.Extended) == 0 {
			return nil
		}

		// read min ts
		minTsBytes := entry.Extended["min"]
		if len(minTsBytes) != 8 {
			return nil
		}
		minTs := int64(binary.BigEndian.Uint64(minTsBytes))
		if minTsNs == 0 || minTs < minTsNs {
			minTsNs = minTs
		}

		// read max ts
		maxTsBytes := entry.Extended["max"]
		if len(maxTsBytes) != 8 {
			return nil
		}
		maxTs := int64(binary.BigEndian.Uint64(maxTsBytes))
		if maxTsNs == 0 || maxTs > maxTsNs {
			maxTsNs = maxTs
		}
		return nil
	})
	return
}

func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir string, recordType *schema_pb.RecordType, logFileGroups []*filer_pb.Entry, parquetSchema *parquet.Schema, parquetLevels *schema.ParquetLevels, preference *operation.StoragePreference) (err error) {

	tempFile, err := os.CreateTemp(".", "t*.parquet")
	if err != nil {
		return fmt.Errorf("create temp file: %v", err)
	}
	defer func() {
		tempFile.Close()
		os.Remove(tempFile.Name())
	}()

	writer := parquet.NewWriter(tempFile, parquetSchema, parquet.Compression(&zstd.Codec{Level: zstd.DefaultLevel}))
	rowBuilder := parquet.NewRowBuilder(parquetSchema)

	var startTsNs, stopTsNs int64

	for _, logFile := range logFileGroups {
		fmt.Printf("compact %s/%s ", partitionDir, logFile.Name)
		var rows []parquet.Row
		if err := iterateLogEntries(filerClient, logFile, func(entry *filer_pb.LogEntry) error {

			if startTsNs == 0 {
				startTsNs = entry.TsNs
			}
			stopTsNs = entry.TsNs

			if len(entry.Key) == 0 {
				return nil
			}

			// write to parquet file
			rowBuilder.Reset()

			record := &schema_pb.RecordValue{}
			if err := proto.Unmarshal(entry.Data, record); err != nil {
				return fmt.Errorf("unmarshal record value: %v", err)
			}

			record.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{
				Kind: &schema_pb.Value_Int64Value{
					Int64Value: entry.TsNs,
				},
			}
			record.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
				Kind: &schema_pb.Value_BytesValue{
					BytesValue: entry.Key,
				},
			}

			if err := schema.AddRecordValue(rowBuilder, recordType, parquetLevels, record); err != nil {
				return fmt.Errorf("add record value: %v", err)
			}

			rows = append(rows, rowBuilder.Row())

			return nil

		}); err != nil {
			return fmt.Errorf("iterate log entry %v/%v: %v", partitionDir, logFile.Name, err)
		}

		fmt.Printf("processed %d rows\n", len(rows))

		if _, err := writer.WriteRows(rows); err != nil {
			return fmt.Errorf("write rows: %v", err)
		}
	}

	if err := writer.Close(); err != nil {
		return fmt.Errorf("close writer: %v", err)
	}

	// write to parquet file to partitionDir
	parquetFileName := fmt.Sprintf("%s.parquet", time.Unix(0, startTsNs).UTC().Format("2006-01-02-15-04-05"))
	if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs); err != nil {
		return fmt.Errorf("save parquet file %s: %v", parquetFileName, err)
	}

	return nil

}

func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64) error {
	uploader, err := operation.NewUploader()
	if err != nil {
		return fmt.Errorf("new uploader: %v", err)
	}

	// get file size
	fileInfo, err := sourceFile.Stat()
	if err != nil {
		return fmt.Errorf("stat source file: %v", err)
	}

	// upload file in chunks
	chunkSize := int64(4 * 1024 * 1024)
	chunkCount := (fileInfo.Size() + chunkSize - 1) / chunkSize
	entry := &filer_pb.Entry{
		Name: parquetFileName,
		Attributes: &filer_pb.FuseAttributes{
			Crtime:   time.Now().Unix(),
			Mtime:    time.Now().Unix(),
			FileMode: uint32(os.FileMode(0644)),
			FileSize: uint64(fileInfo.Size()),
			Mime:     "application/vnd.apache.parquet",
		},
	}
	entry.Extended = make(map[string][]byte)
	minTsBytes := make([]byte, 8)
	binary.BigEndian.PutUint64(minTsBytes, uint64(startTsNs))
	entry.Extended["min"] = minTsBytes
	maxTsBytes := make([]byte, 8)
	binary.BigEndian.PutUint64(maxTsBytes, uint64(stopTsNs))
	entry.Extended["max"] = maxTsBytes

	for i := int64(0); i < chunkCount; i++ {
		fileId, uploadResult, err, _ := uploader.UploadWithRetry(
			filerClient,
			&filer_pb.AssignVolumeRequest{
				Count:       1,
				Replication: preference.Replication,
				Collection:  preference.Collection,
				TtlSec:      0, // TODO set ttl
				DiskType:    preference.DiskType,
				Path:        partitionDir + "/" + parquetFileName,
			},
			&operation.UploadOption{
				Filename:          parquetFileName,
				Cipher:            false,
				IsInputCompressed: false,
				MimeType:          "application/vnd.apache.parquet",
				PairMap:           nil,
			},
			func(host, fileId string) string {
				return fmt.Sprintf("http://%s/%s", host, fileId)
			},
			io.NewSectionReader(sourceFile, i*chunkSize, chunkSize),
		)
		if err != nil {
			return fmt.Errorf("upload chunk %d: %v", i, err)
		}
		if uploadResult.Error != "" {
			return fmt.Errorf("upload result: %v", uploadResult.Error)
		}
		entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(fileId, i*chunkSize, time.Now().UnixNano()))
	}

	// write the entry to partitionDir
	if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
			Directory: partitionDir,
			Entry:     entry,
		})
	}); err != nil {
		return fmt.Errorf("create entry: %v", err)
	}
	fmt.Printf("saved to %s/%s\n", partitionDir, parquetFileName)

	return nil
}

func iterateLogEntries(filerClient filer_pb.FilerClient, logFile *filer_pb.Entry, eachLogEntryFn func(entry *filer_pb.LogEntry) error) error {
	lookupFn := filer.LookupFn(filerClient)
	_, err := eachFile(logFile, lookupFn, func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
		if err := eachLogEntryFn(logEntry); err != nil {
			return true, err
		}
		return false, nil
	})
	return err
}

func eachFile(entry *filer_pb.Entry, lookupFileIdFn func(fileId string) (targetUrls []string, err error), eachLogEntryFn log_buffer.EachLogEntryFuncType) (processedTsNs int64, err error) {
	if len(entry.Content) > 0 {
		// skip .offset files
		return
	}
	var urlStrings []string
	for _, chunk := range entry.Chunks {
		if chunk.Size == 0 {
			continue
		}
		if chunk.IsChunkManifest {
			fmt.Printf("this should not happen. unexpected chunk manifest in %s", entry.Name)
			return
		}
		urlStrings, err = lookupFileIdFn(chunk.FileId)
		if err != nil {
			err = fmt.Errorf("lookup %s: %v", chunk.FileId, err)
			return
		}
		if len(urlStrings) == 0 {
			err = fmt.Errorf("no url found for %s", chunk.FileId)
			return
		}

		// try one of the urlString until util.Get(urlString) succeeds
		var processed bool
		for _, urlString := range urlStrings {
			var data []byte
			if data, _, err = util_http.Get(urlString); err == nil {
				processed = true
				if processedTsNs, err = eachChunk(data, eachLogEntryFn); err != nil {
					return
				}
				break
			}
		}
		if !processed {
			err = fmt.Errorf("no data processed for %s %s", entry.Name, chunk.FileId)
			return
		}

	}
	return
}

func eachChunk(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType) (processedTsNs int64, err error) {
	for pos := 0; pos+4 < len(buf); {

		size := util.BytesToUint32(buf[pos : pos+4])
		if pos+4+int(size) > len(buf) {
			err = fmt.Errorf("reach each log chunk: read [%d,%d) from [0,%d)", pos, pos+int(size)+4, len(buf))
			return
		}
		entryData := buf[pos+4 : pos+4+int(size)]

		logEntry := &filer_pb.LogEntry{}
		if err = proto.Unmarshal(entryData, logEntry); err != nil {
			pos += 4 + int(size)
			err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
			return
		}

		if _, err = eachLogEntryFn(logEntry); err != nil {
			err = fmt.Errorf("process log entry %v: %v", logEntry, err)
			return
		}

		processedTsNs = logEntry.TsNs

		pos += 4 + int(size)

	}

	return
}
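The compaction entry point above takes a filer client, a topic, a cutoff age, the topic's record type, and a storage preference. A minimal usage sketch follows; it is not part of this diff, the topic and preference literals are illustrative assumptions, and the filer client and record type are presumed to be obtained elsewhere (for example from the broker's topic configuration).

package example

import (
	"log"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/logstore"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

func compactExample(filerClient filer_pb.FilerClient, recordType *schema_pb.RecordType) {
	// hypothetical topic and storage preference, for illustration only
	t := topic.Topic{Namespace: "test", Name: "events"}
	preference := &operation.StoragePreference{
		Replication: "000",
		Collection:  "topics",
		DiskType:    "",
	}
	// compact log files older than one hour into a parquet file per 128MB group
	if err := logstore.CompactTopicPartitions(filerClient, t, time.Hour, recordType, preference); err != nil {
		log.Printf("compact: %v", err)
	}
}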
weed/mq/logstore/merged_read.go (new file)
@@ -0,0 +1,41 @@
package logstore

import (
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

func GenMergedReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType {
	fromParquetFn := GenParquetReadFunc(filerClient, t, p)
	readLogDirectFn := GenLogOnDiskReadFunc(filerClient, t, p)
	return mergeReadFuncs(fromParquetFn, readLogDirectFn)
}

func mergeReadFuncs(fromParquetFn, readLogDirectFn log_buffer.LogReadFromDiskFuncType) log_buffer.LogReadFromDiskFuncType {
	var exhaustedParquet bool
	var lastProcessedPosition log_buffer.MessagePosition
	return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
		if !exhaustedParquet {
			// glog.V(4).Infof("reading from parquet startPosition: %v\n", startPosition.UTC())
			lastReadPosition, isDone, err = fromParquetFn(startPosition, stopTsNs, eachLogEntryFn)
			// glog.V(4).Infof("read from parquet: %v %v %v %v\n", startPosition, lastReadPosition, isDone, err)
			if isDone {
				isDone = false
			}
			if err != nil {
				return
			}
			lastProcessedPosition = lastReadPosition
		}
		exhaustedParquet = true

		if startPosition.Before(lastProcessedPosition.Time) {
			startPosition = lastProcessedPosition
		}

		// glog.V(4).Infof("reading from direct log startPosition: %v\n", startPosition.UTC())
		lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn)
		return
	}
}
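A rough sketch of consuming the merged reader, not part of this diff: the returned function drains the parquet files first, then falls through to the raw log files. The message-position constructor and callback signature below follow the usage visible elsewhere in the diff and are assumptions beyond that.

package example

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/mq/logstore"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

func readAllOnDisk(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) error {
	readFn := logstore.GenMergedReadFunc(filerClient, t, p)

	// start from the epoch; batch index -2 mirrors the value used in the diff
	start := log_buffer.NewMessagePosition(0, -2)
	_, _, err := readFn(start, 0, func(logEntry *filer_pb.LogEntry) (bool, error) {
		fmt.Printf("key=%s ts=%d\n", logEntry.Key, logEntry.TsNs)
		return false, nil // keep reading
	})
	return err
}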
weed/mq/logstore/read_log_from_disk.go (new file)
@@ -0,0 +1,144 @@
package logstore

import (
	"fmt"
	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
	"google.golang.org/protobuf/proto"
	"math"
	"strings"
	"time"
)

func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType {
	partitionDir := topic.PartitionDir(t, p)

	lookupFileIdFn := filer.LookupFn(filerClient)

	eachChunkFn := func(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
		for pos := 0; pos+4 < len(buf); {

			size := util.BytesToUint32(buf[pos : pos+4])
			if pos+4+int(size) > len(buf) {
				err = fmt.Errorf("GenLogOnDiskReadFunc: read [%d,%d) from [0,%d)", pos, pos+int(size)+4, len(buf))
				return
			}
			entryData := buf[pos+4 : pos+4+int(size)]

			logEntry := &filer_pb.LogEntry{}
			if err = proto.Unmarshal(entryData, logEntry); err != nil {
				pos += 4 + int(size)
				err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
				return
			}
			if logEntry.TsNs < starTsNs {
				pos += 4 + int(size)
				continue
			}
			if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
				println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs)
				return
			}

			// fmt.Printf(" read logEntry: %v, ts %v\n", string(logEntry.Key), time.Unix(0, logEntry.TsNs).UTC())
			if _, err = eachLogEntryFn(logEntry); err != nil {
				err = fmt.Errorf("process log entry %v: %v", logEntry, err)
				return
			}

			processedTsNs = logEntry.TsNs

			pos += 4 + int(size)

		}

		return
	}

	eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
		if len(entry.Content) > 0 {
			// skip .offset files
			return
		}
		var urlStrings []string
		for _, chunk := range entry.Chunks {
			if chunk.Size == 0 {
				continue
			}
			if chunk.IsChunkManifest {
				glog.Warningf("this should not happen. unexpected chunk manifest in %s/%s", partitionDir, entry.Name)
				return
			}
			urlStrings, err = lookupFileIdFn(chunk.FileId)
			if err != nil {
				err = fmt.Errorf("lookup %s: %v", chunk.FileId, err)
				return
			}
			if len(urlStrings) == 0 {
				err = fmt.Errorf("no url found for %s", chunk.FileId)
				return
			}

			// try one of the urlString until util.Get(urlString) succeeds
			var processed bool
			for _, urlString := range urlStrings {
				// TODO optimization opportunity: reuse the buffer
				var data []byte
				// fmt.Printf("reading %s/%s %s\n", partitionDir, entry.Name, urlString)
				if data, _, err = util_http.Get(urlString); err == nil {
					processed = true
					if processedTsNs, err = eachChunkFn(data, eachLogEntryFn, starTsNs, stopTsNs); err != nil {
						return
					}
					break
				}
			}
			if !processed {
				err = fmt.Errorf("no data processed for %s %s", entry.Name, chunk.FileId)
				return
			}

		}
		return
	}

	return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
		startFileName := startPosition.UTC().Format(topic.TIME_FORMAT)
		startTsNs := startPosition.Time.UnixNano()
		stopTime := time.Unix(0, stopTsNs)
		var processedTsNs int64
		err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
			return filer_pb.SeaweedList(client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error {
				if entry.IsDirectory {
					return nil
				}
				if strings.HasSuffix(entry.Name, ".parquet") {
					return nil
				}
				// FIXME: this is a hack to skip the .offset files
				if strings.HasSuffix(entry.Name, ".offset") {
					return nil
				}
				if stopTsNs != 0 && entry.Name > stopTime.UTC().Format(topic.TIME_FORMAT) {
					isDone = true
					return nil
				}
				if entry.Name < startPosition.UTC().Format(topic.TIME_FORMAT) {
					return nil
				}
				if processedTsNs, err = eachFileFn(entry, eachLogEntryFn, startTsNs, stopTsNs); err != nil {
					return err
				}
				return nil

			}, startFileName, true, math.MaxInt32)
		})
		lastReadPosition = log_buffer.NewMessagePosition(processedTsNs, -2)
		return
	}
}
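The raw log files walked by eachChunkFn use a simple framing: each record is a 4-byte length prefix followed by a marshaled filer_pb.LogEntry, repeated to the end of the chunk. A hedged sketch of the writing side of that framing follows; it is not part of this diff and assumes the length prefix is big-endian, which is what util.BytesToUint32 appears to decode.

package example

import (
	"encoding/binary"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"google.golang.org/protobuf/proto"
)

// appendLogEntry frames one entry as [4-byte big-endian length][protobuf payload],
// the layout that the reader above walks position by position.
func appendLogEntry(buf []byte, logEntry *filer_pb.LogEntry) ([]byte, error) {
	data, err := proto.Marshal(logEntry)
	if err != nil {
		return buf, err
	}
	sizeBuf := make([]byte, 4)
	binary.BigEndian.PutUint32(sizeBuf, uint32(len(data)))
	buf = append(buf, sizeBuf...)
	buf = append(buf, data...)
	return buf, nil
}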
weed/mq/logstore/read_parquet_to_log.go (new file)
@@ -0,0 +1,162 @@
package logstore

import (
	"encoding/binary"
	"fmt"
	"github.com/parquet-go/parquet-go"
	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/mq/schema"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
	"google.golang.org/protobuf/proto"
	"io"
	"math"
	"strings"
)

var (
	chunkCache = chunk_cache.NewChunkCacheInMemory(256) // 256 entries, 8MB max per entry
)

func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType {
	partitionDir := topic.PartitionDir(t, p)

	lookupFileIdFn := filer.LookupFn(filerClient)

	// read topic conf from filer
	var topicConf *mq_pb.ConfigureTopicResponse
	var err error
	if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		topicConf, err = t.ReadConfFile(client)
		return err
	}); err != nil {
		return nil
	}
	recordType := topicConf.GetRecordType()
	recordType = schema.NewRecordTypeBuilder(recordType).
		WithField(SW_COLUMN_NAME_TS, schema.TypeInt64).
		WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
		RecordTypeEnd()

	parquetSchema, err := schema.ToParquetSchema(t.Name, recordType)
	if err != nil {
		return nil
	}
	parquetLevels, err := schema.ToParquetLevels(recordType)
	if err != nil {
		return nil
	}

	// eachFileFn reads a parquet file and calls eachLogEntryFn for each log entry
	eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
		// create readerAt for the parquet file
		fileSize := filer.FileSize(entry)
		visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
		chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
		readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
		readerAt := filer.NewChunkReaderAtFromClient(readerCache, chunkViews, int64(fileSize))

		// create parquet reader
		parquetReader := parquet.NewReader(readerAt, parquetSchema)
		rows := make([]parquet.Row, 128)
		for {
			rowCount, readErr := parquetReader.ReadRows(rows)

			for i := 0; i < rowCount; i++ {
				row := rows[i]
				// convert parquet row to schema_pb.RecordValue
				recordValue, err := schema.ToRecordValue(recordType, parquetLevels, row)
				if err != nil {
					return processedTsNs, fmt.Errorf("ToRecordValue failed: %v", err)
				}
				processedTsNs = recordValue.Fields[SW_COLUMN_NAME_TS].GetInt64Value()
				if processedTsNs < starTsNs {
					continue
				}
				if stopTsNs != 0 && processedTsNs >= stopTsNs {
					return processedTsNs, nil
				}

				data, marshalErr := proto.Marshal(recordValue)
				if marshalErr != nil {
					return processedTsNs, fmt.Errorf("marshal record value: %v", marshalErr)
				}

				logEntry := &filer_pb.LogEntry{
					Key:  recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue(),
					TsNs: processedTsNs,
					Data: data,
				}

				// fmt.Printf(" parquet entry %s ts %v\n", string(logEntry.Key), time.Unix(0, logEntry.TsNs).UTC())

				if _, err = eachLogEntryFn(logEntry); err != nil {
					return processedTsNs, fmt.Errorf("process log entry %v: %v", logEntry, err)
				}
			}

			if readErr != nil {
				if readErr == io.EOF {
					return processedTsNs, nil
				}
				return processedTsNs, readErr
			}
		}
		return
	}

	return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
		startFileName := startPosition.UTC().Format(topic.TIME_FORMAT)
		startTsNs := startPosition.Time.UnixNano()
		var processedTsNs int64

		err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {

			return filer_pb.SeaweedList(client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error {
				if entry.IsDirectory {
					return nil
				}
				if !strings.HasSuffix(entry.Name, ".parquet") {
					return nil
				}
				if len(entry.Extended) == 0 {
					return nil
				}

				// read minTs from the parquet file
				minTsBytes := entry.Extended["min"]
				if len(minTsBytes) != 8 {
					return nil
				}
				minTsNs := int64(binary.BigEndian.Uint64(minTsBytes))

				// read max ts
				maxTsBytes := entry.Extended["max"]
				if len(maxTsBytes) != 8 {
					return nil
				}
				maxTsNs := int64(binary.BigEndian.Uint64(maxTsBytes))

				if stopTsNs != 0 && stopTsNs <= minTsNs {
					isDone = true
					return nil
				}

				if maxTsNs < startTsNs {
					return nil
				}

				if processedTsNs, err = eachFileFn(entry, eachLogEntryFn, startTsNs, stopTsNs); err != nil {
					return err
				}
				return nil

			}, startFileName, true, math.MaxInt32)
		})
		lastReadPosition = log_buffer.NewMessagePosition(processedTsNs, -2)
		return
	}
}
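Both the compaction writer and this reader rely on the same convention for parquet file entries: the minimum and maximum timestamps covered by a file are stored as 8-byte big-endian values in the filer entry's Extended["min"] and Extended["max"] attributes, which is what lets the reader skip files outside the requested time range. A small round-trip sketch of that encoding, not part of this diff:

package example

import (
	"encoding/binary"
	"fmt"
)

// encodeTsRange mirrors how saveParquetFileToPartitionDir records the covered range.
func encodeTsRange(minTsNs, maxTsNs int64) map[string][]byte {
	extended := make(map[string][]byte)
	minBytes := make([]byte, 8)
	binary.BigEndian.PutUint64(minBytes, uint64(minTsNs))
	extended["min"] = minBytes
	maxBytes := make([]byte, 8)
	binary.BigEndian.PutUint64(maxBytes, uint64(maxTsNs))
	extended["max"] = maxBytes
	return extended
}

// decodeTsRange mirrors how GenParquetReadFunc filters files by their range.
func decodeTsRange(extended map[string][]byte) (minTsNs, maxTsNs int64, ok bool) {
	minBytes, maxBytes := extended["min"], extended["max"]
	if len(minBytes) != 8 || len(maxBytes) != 8 {
		return 0, 0, false
	}
	return int64(binary.BigEndian.Uint64(minBytes)), int64(binary.BigEndian.Uint64(maxBytes)), true
}

func exampleRoundTrip() {
	ext := encodeTsRange(1_700_000_000_000_000_000, 1_700_000_060_000_000_000)
	minTs, maxTs, _ := decodeTsRange(ext)
	fmt.Println(minTs, maxTs)
}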
weed/pb/master_pb/master.pb.go (1307 lines changed): file diff suppressed because it is too large
weed/pb/volume_server_pb/volume_server.pb.go (1766 lines changed): file diff suppressed because it is too large
Some files were not shown because too many files changed in this diff