Browse Source
Include meta in ReadAllNeedles (#3991 )
This is useful for doing backups on the data so we can accurately store the
last modified time, the compression state, and verify the crc.
Previously we were doing VolumeNeedleStatus and then an HTTP request which
needlessly read from the dat file twice.
pull/3995/head
James Hartig
2 years ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with
736 additions and
698 deletions
unmaintained/stream_read_volume/stream_read_volume.go
weed/pb/volume_server.proto
weed/pb/volume_server_pb/volume_server.pb.go
weed/storage/volume_read_all.go
@ -5,13 +5,14 @@ import (
"errors"
"errors"
"flag"
"flag"
"fmt"
"fmt"
"io"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
"google.golang.org/grpc"
"io"
)
)
var (
var (
@ -29,7 +30,7 @@ func main() {
vid := uint32 ( * volumeId )
vid := uint32 ( * volumeId )
eachNeedleFunc := func ( resp * volume_server_pb . ReadAllNeedlesResponse ) error {
eachNeedleFunc := func ( resp * volume_server_pb . ReadAllNeedlesResponse ) error {
fmt . Printf ( "%d,%x%08x %d\n" , resp . VolumeId , resp . NeedleId , resp . Cookie , len ( resp . NeedleBlob ) )
fmt . Printf ( "%d,%x%08x %d %v %d %x \n" , resp . VolumeId , resp . NeedleId , resp . Cookie , len ( resp . NeedleBlob ) , resp . NeedleBlobCompressed , resp . LastModified , resp . Crc )
return nil
return nil
}
}
@ -317,6 +317,9 @@ message ReadAllNeedlesResponse {
uint64 needle_id = 2 ;
uint64 needle_id = 2 ;
uint32 cookie = 3 ;
uint32 cookie = 3 ;
bytes needle_blob = 5 ;
bytes needle_blob = 5 ;
bool needle_blob_compressed = 6 ;
uint64 last_modified = 7 ;
uint32 crc = 8 ;
}
}
message VolumeTailSenderRequest {
message VolumeTailSenderRequest {
@ -30,10 +30,13 @@ func (scanner *VolumeFileScanner4ReadAll) VisitNeedle(n *needle.Needle, offset i
}
}
sendErr := scanner . Stream . Send ( & volume_server_pb . ReadAllNeedlesResponse {
sendErr := scanner . Stream . Send ( & volume_server_pb . ReadAllNeedlesResponse {
VolumeId : uint32 ( scanner . V . Id ) ,
NeedleId : uint64 ( n . Id ) ,
Cookie : uint32 ( n . Cookie ) ,
NeedleBlob : n . Data ,
VolumeId : uint32 ( scanner . V . Id ) ,
NeedleId : uint64 ( n . Id ) ,
Cookie : uint32 ( n . Cookie ) ,
NeedleBlob : n . Data ,
NeedleBlobCompressed : n . IsCompressed ( ) ,
LastModified : n . LastModified ,
Crc : n . Checksum . Value ( ) ,
} )
} )
if sendErr != nil {
if sendErr != nil {
return sendErr
return sendErr