diff --git a/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go b/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go index 4b37a64fb..bbb6f6d9a 100644 --- a/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go +++ b/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go @@ -40,7 +40,7 @@ func (scanner *VolumeFileScanner4SeeDat) ReadNeedleBody() bool { return true } -func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64) error { +func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { if scanner.dat == nil { newDatFile, err := os.Create(filepath.Join(*volumePath, "dat_fixed")) diff --git a/unmaintained/see_dat/see_dat.go b/unmaintained/see_dat/see_dat.go index e07704fc6..84a06c625 100644 --- a/unmaintained/see_dat/see_dat.go +++ b/unmaintained/see_dat/see_dat.go @@ -28,7 +28,7 @@ func (scanner *VolumeFileScanner4SeeDat) ReadNeedleBody() bool { return true } -func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64) error { +func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { t := time.Unix(int64(n.AppendAtNs)/int64(time.Second), int64(n.AppendAtNs)%int64(time.Second)) glog.V(0).Infof("%d,%s%x offset %d size %d cookie %x appendedAt %v", *volumeId, n.Id, n.Cookie, offset, n.Size, n.Cookie, t) return nil diff --git a/weed/command/export.go b/weed/command/export.go index 7e94ec11c..d3a765e09 100644 --- a/weed/command/export.go +++ b/weed/command/export.go @@ -102,7 +102,7 @@ func (scanner *VolumeFileScanner4Export) ReadNeedleBody() bool { return true } -func (scanner *VolumeFileScanner4Export) VisitNeedle(n *needle.Needle, offset int64) error { +func (scanner *VolumeFileScanner4Export) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { needleMap := scanner.needleMap vid := scanner.vid diff --git a/weed/command/fix.go b/weed/command/fix.go index bf33490cc..2fbbca5e6 100644 --- a/weed/command/fix.go +++ b/weed/command/fix.go @@ -43,7 +43,7 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool { return false } -func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64) error { +func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsGzipped()) if n.Size > 0 && n.Size != types.TombstoneFileSize { pe := scanner.nm.Put(n.Id, types.ToOffset(offset), n.Size) diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go index 34c55a599..1e0c91274 100644 --- a/weed/server/volume_grpc_tail.go +++ b/weed/server/volume_grpc_tail.go @@ -67,34 +67,13 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServe return lastTimestampNs, sendErr } - err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error { - - isLastChunk := false - - // need to send body by chunks - for i := 0; i < len(needleBody); i += BufferSizeLimit { - stopOffset := i + BufferSizeLimit - if stopOffset >= len(needleBody) { - isLastChunk = true - stopOffset = len(needleBody) - } - - sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{ - NeedleHeader: needleHeader, - NeedleBody: needleBody[i:stopOffset], - IsLastChunk: isLastChunk, - }) - if sendErr != nil { - return sendErr - } - } - - lastProcessedTimestampNs = needleAppendAtNs - return nil + scanner := &VolumeFileScanner4Tailing{ + stream:stream, + } - }) + err = storage.ScanVolumeFileFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), scanner) - return + return scanner.lastProcessedTimestampNs, err } @@ -115,3 +94,42 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv }) } + +// generate the volume idx +type VolumeFileScanner4Tailing struct { + stream volume_server_pb.VolumeServer_VolumeTailSenderServer + lastProcessedTimestampNs uint64 +} + +func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock storage.SuperBlock) error { + return nil + +} +func (scanner *VolumeFileScanner4Tailing) ReadNeedleBody() bool { + return true +} + +func (scanner *VolumeFileScanner4Tailing) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { + isLastChunk := false + + // need to send body by chunks + for i := 0; i < len(needleBody); i += BufferSizeLimit { + stopOffset := i + BufferSizeLimit + if stopOffset >= len(needleBody) { + isLastChunk = true + stopOffset = len(needleBody) + } + + sendErr := scanner.stream.Send(&volume_server_pb.VolumeTailSenderResponse{ + NeedleHeader: needleHeader, + NeedleBody: needleBody[i:stopOffset], + IsLastChunk: isLastChunk, + }) + if sendErr != nil { + return sendErr + } + } + + scanner.lastProcessedTimestampNs = n.AppendAtNs + return nil +} diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go index 86d13da7a..f48ccbb68 100644 --- a/weed/storage/volume_backup.go +++ b/weed/storage/volume_backup.go @@ -251,7 +251,7 @@ func (scanner *VolumeFileScanner4GenIdx) ReadNeedleBody() bool { return false } -func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset int64) error { +func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { if n.Size > 0 && n.Size != TombstoneFileSize { return scanner.v.nm.Put(n.Id, ToOffset(offset), n.Size) } diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 767a318c8..7a216b77e 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -179,7 +179,7 @@ func (v *Volume) readNeedle(n *needle.Needle) (int, error) { type VolumeFileScanner interface { VisitSuperBlock(SuperBlock) error ReadNeedleBody() bool - VisitNeedle(n *needle.Needle, offset int64) error + VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error } func ScanVolumeFile(dirname string, collection string, id needle.VolumeId, @@ -202,7 +202,7 @@ func ScanVolumeFile(dirname string, collection string, id needle.VolumeId, } func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64, volumeFileScanner VolumeFileScanner) (err error) { - n, _, rest, e := needle.ReadNeedleHeader(dataFile, version, offset) + n, nh, rest, e := needle.ReadNeedleHeader(dataFile, version, offset) if e != nil { if e == io.EOF { return nil @@ -210,14 +210,15 @@ func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64, return fmt.Errorf("cannot read %s at offset %d: %v", dataFile.Name(), offset, e) } for n != nil { + var needleBody []byte if volumeFileScanner.ReadNeedleBody() { - if _, err = n.ReadNeedleBody(dataFile, version, offset+NeedleHeaderSize, rest); err != nil { + if needleBody, err = n.ReadNeedleBody(dataFile, version, offset+NeedleHeaderSize, rest); err != nil { glog.V(0).Infof("cannot read needle body: %v", err) //err = fmt.Errorf("cannot read needle body: %v", err) //return } } - err := volumeFileScanner.VisitNeedle(n, offset) + err := volumeFileScanner.VisitNeedle(n, offset, nh, needleBody) if err == io.EOF { return nil } @@ -237,36 +238,3 @@ func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64, } return nil } - -func ScanVolumeFileNeedleFrom(version needle.Version, dataFile *os.File, offset int64, fn func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error) (err error) { - n, nh, rest, e := needle.ReadNeedleHeader(dataFile, version, offset) - if e != nil { - if e == io.EOF { - return nil - } - return fmt.Errorf("cannot read %s at offset %d: %v", dataFile.Name(), offset, e) - } - for n != nil { - var needleBody []byte - if needleBody, err = n.ReadNeedleBody(dataFile, version, offset+NeedleHeaderSize, rest); err != nil { - glog.V(0).Infof("cannot read needle body: %v", err) - //err = fmt.Errorf("cannot read needle body: %v", err) - //return - } - err = fn(nh, needleBody, n.AppendAtNs) - if err != nil { - glog.V(0).Infof("visit needle error: %v", err) - return - } - offset += NeedleHeaderSize + rest - glog.V(4).Infof("==> new entry offset %d", offset) - if n, nh, rest, err = needle.ReadNeedleHeader(dataFile, version, offset); err != nil { - if err == io.EOF { - return nil - } - return fmt.Errorf("cannot read needle header at offset %d: %v", offset, err) - } - glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest) - } - return nil -} diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 522d227c0..73314f022 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -286,7 +286,7 @@ func (scanner *VolumeFileScanner4Vacuum) ReadNeedleBody() bool { return true } -func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset int64) error { +func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { if n.HasTtl() && scanner.now >= n.LastModified+uint64(scanner.v.Ttl.Minutes()*60) { return nil }