Browse Source

able to update file content

having some issue when vi reports file changed.
pull/664/head
Chris Lu 7 years ago
parent
commit
f07482382b
  1. 2
      weed/filer2/embedded/embedded_store.go
  2. 52
      weed/filer2/filechunks.go
  3. 33
      weed/filer2/filechunks_test.go
  4. 4
      weed/filer2/filer.go
  5. 2
      weed/filer2/filer_structure.go
  6. 5
      weed/filer2/memdb/memdb_store.go
  7. 2
      weed/filesys/dir.go
  8. 42
      weed/filesys/file.go
  9. 6
      weed/pb/filer.proto
  10. 154
      weed/pb/filer_pb/filer.pb.go
  11. 31
      weed/server/filer_grpc_server.go

2
weed/filer2/embedded/embedded_store.go

@ -26,7 +26,7 @@ func (filer *EmbeddedStore) AddDirectoryLink(directory *filer2.Entry, delta int3
return nil
}
func (filer *EmbeddedStore) AppendFileChunk(fullpath filer2.FullPath, fileChunks []*filer_pb.FileChunk) (err error) {
func (filer *EmbeddedStore) SetFileChunks(fullpath filer2.FullPath, fileChunks []*filer_pb.FileChunk) (err error) {
return nil
}

52
weed/filer2/filechunks.go

@ -19,12 +19,27 @@ func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
}
func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
visibles := nonOverlappingVisibleIntervals(chunks)
fileIds := make(map[string]bool)
for _, interval := range visibles {
fileIds[interval.fileId] = true
}
for _, chunk := range chunks {
if found := fileIds[chunk.FileId]; found {
compacted = append(compacted, chunk)
} else {
garbage = append(garbage, chunk)
}
}
return
}
func logPrintf(name string, visibles []*visibleInterval) {
return
// return
log.Printf("%s len %d", name, len(visibles))
for _, v := range visibles {
@ -52,7 +67,7 @@ func nonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*v
var minStopInterval, upToDateInterval *visibleInterval
watermarkStart := chunks[0].Offset
for _, chunk := range chunks {
// log.Printf("checking chunk: [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size))
log.Printf("checking chunk: [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size))
logPrintf("parallelIntervals", parallelIntervals)
for len(parallelIntervals) > 0 && watermarkStart < chunk.Offset {
logPrintf("parallelIntervals loop 1", parallelIntervals)
@ -72,12 +87,7 @@ func nonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*v
var remaining []*visibleInterval
for _, interval := range parallelIntervals {
if interval.stop != watermarkStart {
remaining = append(remaining, newVisibleInterval(
interval.start,
interval.stop,
interval.fileId,
interval.modifiedTime,
))
remaining = append(remaining, interval)
}
}
parallelIntervals = remaining
@ -108,12 +118,7 @@ func nonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*v
var remaining []*visibleInterval
for _, interval := range parallelIntervals {
if interval.stop != watermarkStart {
remaining = append(remaining, newVisibleInterval(
interval.start,
interval.stop,
interval.fileId,
interval.modifiedTime,
))
remaining = append(remaining, interval)
}
}
parallelIntervals = remaining
@ -122,11 +127,12 @@ func nonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*v
logPrintf("intervals", intervals)
// merge connected intervals, now the intervals are non-intersecting
var lastInterval *visibleInterval
var lastIntervalIndex int
var prevIntervalIndex int
for i, interval := range intervals {
if i == 0 {
prevIntervalIndex = i
lastIntervalIndex = i
continue
}
if intervals[i-1].fileId != interval.fileId ||
@ -139,18 +145,16 @@ func nonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*v
))
prevIntervalIndex = i
}
lastInterval = intervals[i]
lastIntervalIndex = i
logPrintf("intervals loop 1 visibles", visibles)
}
if lastInterval != nil {
visibles = append(visibles, newVisibleInterval(
intervals[prevIntervalIndex].start,
lastInterval.stop,
intervals[prevIntervalIndex].fileId,
intervals[prevIntervalIndex].modifiedTime,
))
}
visibles = append(visibles, newVisibleInterval(
intervals[prevIntervalIndex].start,
intervals[lastIntervalIndex].stop,
intervals[prevIntervalIndex].fileId,
intervals[prevIntervalIndex].modifiedTime,
))
logPrintf("visibles", visibles)

33
weed/filer2/filechunks_test.go

@ -7,6 +7,28 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
func TestCompactFileChunks(t *testing.T) {
chunks := []*filer_pb.FileChunk{
{Offset:10, Size:100, FileId:"abc", Mtime:50},
{Offset:100, Size:100, FileId:"def", Mtime:100},
{Offset:200, Size:100, FileId:"ghi", Mtime:200},
{Offset:110, Size:200, FileId:"jkl", Mtime:300},
}
compacted, garbarge := CompactFileChunks(chunks)
log.Printf("Compacted: %+v", compacted)
log.Printf("Garbage : %+v", garbarge)
if len(compacted) != 3 {
t.Fatalf("unexpected compacted: %d", len(compacted))
}
if len(garbarge) != 1 {
t.Fatalf("unexpected garbarge: %d", len(garbarge))
}
}
func TestIntervalMerging(t *testing.T) {
testcases := []struct {
@ -84,6 +106,17 @@ func TestIntervalMerging(t *testing.T) {
{start: 200, stop: 220, fileId: "abc"},
},
},
// case 6: same updates
{
Chunks: []*filer_pb.FileChunk{
{Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
{Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
{Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
},
Expected: []*visibleInterval{
{start: 0, stop: 100, fileId: "abc"},
},
},
}
for i, testcase := range testcases {

4
weed/filer2/filer.go

@ -113,8 +113,8 @@ func (f *Filer) CreateEntry(entry *Entry) (error) {
return nil
}
func (f *Filer) AppendFileChunk(p FullPath, chunks []*filer_pb.FileChunk) (err error) {
return f.store.AppendFileChunk(p, chunks)
func (f *Filer) SetFileChunks(p FullPath, chunks []*filer_pb.FileChunk) (err error) {
return f.store.SetFileChunks(p, chunks)
}
func (f *Filer) FindEntry(p FullPath) (found bool, entry *Entry, err error) {

2
weed/filer2/filer_structure.go

@ -68,7 +68,7 @@ var ErrNotFound = errors.New("filer: no entry is found in filer store")
type FilerStore interface {
InsertEntry(*Entry) (error)
AppendFileChunk(FullPath, []*filer_pb.FileChunk) (err error)
SetFileChunks(FullPath, []*filer_pb.FileChunk) (err error)
FindEntry(FullPath) (found bool, entry *Entry, err error)
DeleteEntry(FullPath) (fileEntry *Entry, err error)

5
weed/filer2/memdb/memdb_store.go

@ -33,14 +33,13 @@ func (filer *MemDbStore) InsertEntry(entry *filer2.Entry) (err error) {
return nil
}
func (filer *MemDbStore) AppendFileChunk(fullpath filer2.FullPath, fileChunks []*filer_pb.FileChunk) (err error) {
func (filer *MemDbStore) SetFileChunks(fullpath filer2.FullPath, fileChunks []*filer_pb.FileChunk) (err error) {
found, entry, err := filer.FindEntry(fullpath)
if !found {
return fmt.Errorf("No such file: %s", fullpath)
}
entry.Chunks = append(entry.Chunks, fileChunks...)
entry.Chunks = fileChunks
entry.Mtime = time.Now()
println("appending to entry", entry.Name(), len(entry.Chunks))
filer.tree.ReplaceOrInsert(Entry{entry})
return nil
}

2
weed/filesys/dir.go

@ -169,7 +169,7 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
} else {
dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_File}
ret = append(ret, dirent)
dir.wfs.listDirectoryEntriesCache.Set(dir.Path+"/"+entry.Name, entry.Attributes, 3*time.Second)
dir.wfs.listDirectoryEntriesCache.Set(dir.Path+"/"+entry.Name, entry.Attributes, 300*time.Millisecond)
}
}

42
weed/filesys/file.go

@ -13,6 +13,7 @@ import (
"time"
"bytes"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/filer2"
)
var _ = fs.Node(&File{})
@ -24,6 +25,7 @@ var _ = fs.HandleReadAller(&File{})
var _ = fs.HandleFlusher(&File{})
var _ = fs.HandleWriter(&File{})
var _ = fs.HandleReleaser(&File{})
var _ = fs.NodeSetattrer(&File{})
type File struct {
Chunks []*filer_pb.FileChunk
@ -74,9 +76,32 @@ func (file *File) Attr(context context.Context, attr *fuse.Attr) error {
}
func (file *File) xOpen(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
fullPath := filepath.Join(file.dir.Path, file.Name)
fmt.Printf("Open %v %+v\n", fullPath, req)
return file, nil
}
func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
fullPath := filepath.Join(file.dir.Path, file.Name)
fmt.Printf("Setattr %v %+v\n", fullPath, req)
if req.Valid.Size() && req.Size == 0 {
fmt.Printf("truncate %v \n", fullPath)
file.Chunks = nil
resp.Attr.Size = 0
}
return nil
}
func (file *File) ReadAll(ctx context.Context) (content []byte, err error) {
// fmt.Printf("read all file %+v/%v\n", file.dir.Path, file.Name)
fmt.Printf("read all file %+v/%v\n", file.dir.Path, file.Name)
if len(file.Chunks) == 0 {
glog.V(0).Infof("empty file %v/%v", file.dir.Path, file.Name)
@ -86,11 +111,14 @@ func (file *File) ReadAll(ctx context.Context) (content []byte, err error) {
err = file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
// FIXME: need to either use Read() or implement differently
chunks, _ := filer2.CompactFileChunks(file.Chunks)
glog.V(1).Infof("read file %v/%v %d/%d chunks", file.dir.Path, file.Name, len(chunks), len(file.Chunks))
request := &filer_pb.GetFileContentRequest{
FileId: file.Chunks[0].FileId,
FileId: chunks[0].FileId,
}
glog.V(1).Infof("read file content: %v", request)
glog.V(1).Infof("read file content %d chunk %s [%d,%d): %v", len(chunks),
chunks[0].FileId, chunks[0].Offset, chunks[0].Offset+int64(chunks[0].Size), request)
resp, err := client.GetFileContent(ctx, request)
if err != nil {
return err
@ -124,7 +152,7 @@ func (file *File) Flush(ctx context.Context, req *fuse.FlushRequest) error {
err := file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AppendFileChunksRequest{
request := &filer_pb.SetFileChunksRequest{
Directory: file.dir.Path,
Entry: &filer_pb.Entry{
Name: file.Name,
@ -133,7 +161,7 @@ func (file *File) Flush(ctx context.Context, req *fuse.FlushRequest) error {
}
glog.V(1).Infof("append chunks: %v", request)
if _, err := client.AppendFileChunks(ctx, request); err != nil {
if _, err := client.SetFileChunks(ctx, request); err != nil {
return fmt.Errorf("create file: %v", err)
}
@ -180,7 +208,7 @@ func (file *File) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.
return fmt.Errorf("upload result: %v", uploadResult.Error)
}
glog.V(1).Infof("uploaded %s/%s to: %v", file.dir.Path, file.Name, fileUrl)
resp.Size = int(uploadResult.Size)
file.Chunks = append(file.Chunks, &filer_pb.FileChunk{
FileId: fileId,
@ -189,7 +217,7 @@ func (file *File) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.
Mtime: time.Now().UnixNano(),
})
resp.Size = int(uploadResult.Size)
glog.V(1).Infof("uploaded %s/%s to: %v, [%d,%d)", file.dir.Path, file.Name, fileUrl, req.Offset, req.Offset+int64(resp.Size))
return nil
}

6
weed/pb/filer.proto

@ -21,7 +21,7 @@ service SeaweedFiler {
rpc CreateEntry (CreateEntryRequest) returns (CreateEntryResponse) {
}
rpc AppendFileChunks (AppendFileChunksRequest) returns (AppendFileChunksResponse) {
rpc SetFileChunks (SetFileChunksRequest) returns (SetFileChunksResponse) {
}
rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) {
@ -121,9 +121,9 @@ message AssignVolumeResponse {
int32 count = 4;
}
message AppendFileChunksRequest {
message SetFileChunksRequest {
string directory = 1;
Entry entry = 2;
}
message AppendFileChunksResponse {
message SetFileChunksResponse {
}

154
weed/pb/filer_pb/filer.pb.go

@ -26,8 +26,8 @@ It has these top-level messages:
DeleteEntryResponse
AssignVolumeRequest
AssignVolumeResponse
AppendFileChunksRequest
AppendFileChunksResponse
SetFileChunksRequest
SetFileChunksResponse
*/
package filer_pb
@ -475,37 +475,37 @@ func (m *AssignVolumeResponse) GetCount() int32 {
return 0
}
type AppendFileChunksRequest struct {
type SetFileChunksRequest struct {
Directory string `protobuf:"bytes,1,opt,name=directory" json:"directory,omitempty"`
Entry *Entry `protobuf:"bytes,2,opt,name=entry" json:"entry,omitempty"`
}
func (m *AppendFileChunksRequest) Reset() { *m = AppendFileChunksRequest{} }
func (m *AppendFileChunksRequest) String() string { return proto.CompactTextString(m) }
func (*AppendFileChunksRequest) ProtoMessage() {}
func (*AppendFileChunksRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{17} }
func (m *SetFileChunksRequest) Reset() { *m = SetFileChunksRequest{} }
func (m *SetFileChunksRequest) String() string { return proto.CompactTextString(m) }
func (*SetFileChunksRequest) ProtoMessage() {}
func (*SetFileChunksRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{17} }
func (m *AppendFileChunksRequest) GetDirectory() string {
func (m *SetFileChunksRequest) GetDirectory() string {
if m != nil {
return m.Directory
}
return ""
}
func (m *AppendFileChunksRequest) GetEntry() *Entry {
func (m *SetFileChunksRequest) GetEntry() *Entry {
if m != nil {
return m.Entry
}
return nil
}
type AppendFileChunksResponse struct {
type SetFileChunksResponse struct {
}
func (m *AppendFileChunksResponse) Reset() { *m = AppendFileChunksResponse{} }
func (m *AppendFileChunksResponse) String() string { return proto.CompactTextString(m) }
func (*AppendFileChunksResponse) ProtoMessage() {}
func (*AppendFileChunksResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{18} }
func (m *SetFileChunksResponse) Reset() { *m = SetFileChunksResponse{} }
func (m *SetFileChunksResponse) String() string { return proto.CompactTextString(m) }
func (*SetFileChunksResponse) ProtoMessage() {}
func (*SetFileChunksResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{18} }
func init() {
proto.RegisterType((*LookupDirectoryEntryRequest)(nil), "filer_pb.LookupDirectoryEntryRequest")
@ -525,8 +525,8 @@ func init() {
proto.RegisterType((*DeleteEntryResponse)(nil), "filer_pb.DeleteEntryResponse")
proto.RegisterType((*AssignVolumeRequest)(nil), "filer_pb.AssignVolumeRequest")
proto.RegisterType((*AssignVolumeResponse)(nil), "filer_pb.AssignVolumeResponse")
proto.RegisterType((*AppendFileChunksRequest)(nil), "filer_pb.AppendFileChunksRequest")
proto.RegisterType((*AppendFileChunksResponse)(nil), "filer_pb.AppendFileChunksResponse")
proto.RegisterType((*SetFileChunksRequest)(nil), "filer_pb.SetFileChunksRequest")
proto.RegisterType((*SetFileChunksResponse)(nil), "filer_pb.SetFileChunksResponse")
}
// Reference imports to suppress errors if they are not otherwise used.
@ -545,7 +545,7 @@ type SeaweedFilerClient interface {
GetFileAttributes(ctx context.Context, in *GetFileAttributesRequest, opts ...grpc.CallOption) (*GetFileAttributesResponse, error)
GetFileContent(ctx context.Context, in *GetFileContentRequest, opts ...grpc.CallOption) (*GetFileContentResponse, error)
CreateEntry(ctx context.Context, in *CreateEntryRequest, opts ...grpc.CallOption) (*CreateEntryResponse, error)
AppendFileChunks(ctx context.Context, in *AppendFileChunksRequest, opts ...grpc.CallOption) (*AppendFileChunksResponse, error)
SetFileChunks(ctx context.Context, in *SetFileChunksRequest, opts ...grpc.CallOption) (*SetFileChunksResponse, error)
DeleteEntry(ctx context.Context, in *DeleteEntryRequest, opts ...grpc.CallOption) (*DeleteEntryResponse, error)
AssignVolume(ctx context.Context, in *AssignVolumeRequest, opts ...grpc.CallOption) (*AssignVolumeResponse, error)
}
@ -603,9 +603,9 @@ func (c *seaweedFilerClient) CreateEntry(ctx context.Context, in *CreateEntryReq
return out, nil
}
func (c *seaweedFilerClient) AppendFileChunks(ctx context.Context, in *AppendFileChunksRequest, opts ...grpc.CallOption) (*AppendFileChunksResponse, error) {
out := new(AppendFileChunksResponse)
err := grpc.Invoke(ctx, "/filer_pb.SeaweedFiler/AppendFileChunks", in, out, c.cc, opts...)
func (c *seaweedFilerClient) SetFileChunks(ctx context.Context, in *SetFileChunksRequest, opts ...grpc.CallOption) (*SetFileChunksResponse, error) {
out := new(SetFileChunksResponse)
err := grpc.Invoke(ctx, "/filer_pb.SeaweedFiler/SetFileChunks", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
@ -638,7 +638,7 @@ type SeaweedFilerServer interface {
GetFileAttributes(context.Context, *GetFileAttributesRequest) (*GetFileAttributesResponse, error)
GetFileContent(context.Context, *GetFileContentRequest) (*GetFileContentResponse, error)
CreateEntry(context.Context, *CreateEntryRequest) (*CreateEntryResponse, error)
AppendFileChunks(context.Context, *AppendFileChunksRequest) (*AppendFileChunksResponse, error)
SetFileChunks(context.Context, *SetFileChunksRequest) (*SetFileChunksResponse, error)
DeleteEntry(context.Context, *DeleteEntryRequest) (*DeleteEntryResponse, error)
AssignVolume(context.Context, *AssignVolumeRequest) (*AssignVolumeResponse, error)
}
@ -737,20 +737,20 @@ func _SeaweedFiler_CreateEntry_Handler(srv interface{}, ctx context.Context, dec
return interceptor(ctx, in, info, handler)
}
func _SeaweedFiler_AppendFileChunks_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(AppendFileChunksRequest)
func _SeaweedFiler_SetFileChunks_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(SetFileChunksRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SeaweedFilerServer).AppendFileChunks(ctx, in)
return srv.(SeaweedFilerServer).SetFileChunks(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/filer_pb.SeaweedFiler/AppendFileChunks",
FullMethod: "/filer_pb.SeaweedFiler/SetFileChunks",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedFilerServer).AppendFileChunks(ctx, req.(*AppendFileChunksRequest))
return srv.(SeaweedFilerServer).SetFileChunks(ctx, req.(*SetFileChunksRequest))
}
return interceptor(ctx, in, info, handler)
}
@ -816,8 +816,8 @@ var _SeaweedFiler_serviceDesc = grpc.ServiceDesc{
Handler: _SeaweedFiler_CreateEntry_Handler,
},
{
MethodName: "AppendFileChunks",
Handler: _SeaweedFiler_AppendFileChunks_Handler,
MethodName: "SetFileChunks",
Handler: _SeaweedFiler_SetFileChunks_Handler,
},
{
MethodName: "DeleteEntry",
@ -835,53 +835,53 @@ var _SeaweedFiler_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("filer.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 767 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xac, 0x56, 0x6d, 0x4f, 0xdb, 0x48,
0x10, 0xc6, 0x38, 0x09, 0x64, 0x12, 0x38, 0x6e, 0x13, 0xc0, 0x17, 0x5e, 0x2e, 0xec, 0x89, 0x13,
0xa7, 0x93, 0xd0, 0x29, 0xf7, 0xa5, 0x1f, 0x8b, 0x80, 0x56, 0x95, 0xa8, 0x90, 0x8c, 0xa8, 0x54,
0x55, 0x22, 0x4a, 0xec, 0x49, 0xba, 0xc2, 0xb1, 0x5d, 0x7b, 0xdd, 0x8a, 0x7e, 0xee, 0x5f, 0xe9,
0x5f, 0xe9, 0xef, 0xaa, 0x76, 0xbd, 0xb1, 0xd7, 0xd8, 0x49, 0x8b, 0xd4, 0x6f, 0xde, 0x79, 0x79,
0xe6, 0xd9, 0x99, 0x79, 0x36, 0x81, 0xd6, 0x84, 0x79, 0x18, 0x9d, 0x86, 0x51, 0xc0, 0x03, 0xb2,
0x2e, 0x0f, 0xc3, 0x70, 0x4c, 0xaf, 0x61, 0xef, 0x2a, 0x08, 0xee, 0x93, 0xf0, 0x82, 0x45, 0xe8,
0xf0, 0x20, 0x7a, 0xb8, 0xf4, 0x79, 0xf4, 0x60, 0xe3, 0x87, 0x04, 0x63, 0x4e, 0xf6, 0xa1, 0xe9,
0xce, 0x1d, 0x96, 0xd1, 0x37, 0x4e, 0x9a, 0x76, 0x6e, 0x20, 0x04, 0x6a, 0xfe, 0x68, 0x86, 0xd6,
0xaa, 0x74, 0xc8, 0x6f, 0x7a, 0x09, 0xfb, 0xd5, 0x80, 0x71, 0x18, 0xf8, 0x31, 0x92, 0x63, 0xa8,
0xa3, 0x30, 0x48, 0xb4, 0xd6, 0xe0, 0xb7, 0xd3, 0x39, 0x95, 0xd3, 0x34, 0x2e, 0xf5, 0xd2, 0x01,
0x90, 0x2b, 0x16, 0x73, 0x61, 0x63, 0x18, 0xff, 0x14, 0x1d, 0xfa, 0x1c, 0x3a, 0x85, 0x1c, 0x55,
0xf1, 0x1f, 0x58, 0xc3, 0xd4, 0x64, 0x19, 0x7d, 0xb3, 0xaa, 0xe6, 0xdc, 0x4f, 0xbf, 0x1a, 0x50,
0x97, 0xa6, 0xec, 0x6a, 0x46, 0x7e, 0x35, 0x72, 0x04, 0x6d, 0x16, 0x0f, 0x73, 0x02, 0xe2, 0xda,
0xeb, 0x76, 0x8b, 0xc5, 0xd9, 0x55, 0xc9, 0xbf, 0xd0, 0x70, 0xde, 0x27, 0xfe, 0x7d, 0x6c, 0x99,
0xb2, 0x54, 0x27, 0x2f, 0xf5, 0x82, 0x79, 0x78, 0x2e, 0x7c, 0xb6, 0x0a, 0x21, 0xcf, 0x00, 0x46,
0x9c, 0x47, 0x6c, 0x9c, 0x70, 0x8c, 0xad, 0x9a, 0xec, 0x87, 0xa5, 0x25, 0x24, 0x31, 0x9e, 0x65,
0x7e, 0x5b, 0x8b, 0xa5, 0x13, 0x68, 0x66, 0x70, 0x64, 0x17, 0xd6, 0x44, 0xce, 0x90, 0xb9, 0x8a,
0x6d, 0x43, 0x1c, 0x5f, 0xb9, 0x64, 0x07, 0x1a, 0xc1, 0x64, 0x12, 0x23, 0x97, 0x4c, 0x4d, 0x5b,
0x9d, 0xc4, 0xdd, 0x62, 0xf6, 0x19, 0x2d, 0xb3, 0x6f, 0x9c, 0xd4, 0x6c, 0xf9, 0x4d, 0xba, 0x50,
0x9f, 0x71, 0x36, 0x43, 0x49, 0xc3, 0xb4, 0xd3, 0x03, 0xfd, 0x62, 0xc0, 0x66, 0x91, 0x06, 0xd9,
0x83, 0xa6, 0xac, 0x26, 0x11, 0x0c, 0x89, 0x20, 0xb7, 0xe9, 0xa6, 0x80, 0xb2, 0xaa, 0xa1, 0x64,
0x29, 0xb3, 0xc0, 0x4d, 0x8b, 0x6e, 0xa4, 0x29, 0xaf, 0x03, 0x17, 0xc9, 0x16, 0x98, 0x09, 0x73,
0x65, 0xd9, 0x0d, 0x5b, 0x7c, 0x0a, 0xcb, 0x94, 0xb9, 0x56, 0x3d, 0xb5, 0x4c, 0x99, 0x4b, 0x27,
0x60, 0xbd, 0x44, 0x2e, 0x6e, 0xac, 0xf5, 0x43, 0xad, 0x44, 0xd5, 0xa0, 0x0e, 0x00, 0xc2, 0x51,
0x84, 0x3e, 0x17, 0xc3, 0x52, 0xdb, 0xd9, 0x4c, 0x2d, 0x17, 0x2c, 0xd2, 0x1b, 0x66, 0xea, 0x0d,
0xa3, 0xb7, 0xf0, 0x47, 0x45, 0x1d, 0xb5, 0x46, 0xc5, 0x69, 0x19, 0x4f, 0x98, 0xd6, 0x7f, 0xb0,
0xad, 0x60, 0xcf, 0x03, 0x9f, 0xa3, 0xcf, 0xe7, 0xdc, 0x17, 0x4d, 0x8e, 0x0e, 0x60, 0xe7, 0x71,
0x86, 0x62, 0x61, 0xc1, 0x9a, 0x93, 0x9a, 0x64, 0x4a, 0xdb, 0x9e, 0x1f, 0xe9, 0x5b, 0x20, 0xe7,
0x11, 0x8e, 0x38, 0x3e, 0x41, 0xc0, 0x99, 0x18, 0x57, 0x97, 0x8a, 0x71, 0x1b, 0x3a, 0x05, 0xe8,
0x94, 0x0b, 0x65, 0x40, 0x2e, 0xd0, 0xc3, 0x27, 0x55, 0xac, 0x78, 0x32, 0x4a, 0xba, 0x32, 0x4b,
0xba, 0x12, 0x0c, 0x0a, 0xa5, 0x14, 0x83, 0x19, 0x74, 0xce, 0xe2, 0x98, 0x4d, 0xfd, 0x37, 0x81,
0x97, 0xcc, 0x70, 0x4e, 0xa1, 0x0b, 0x75, 0x27, 0x48, 0x54, 0x8b, 0xea, 0x76, 0x7a, 0x20, 0x87,
0x00, 0x4e, 0xe0, 0x79, 0xe8, 0x70, 0x16, 0xf8, 0x8a, 0x80, 0x66, 0x21, 0x7d, 0x68, 0x45, 0x18,
0x7a, 0xcc, 0x19, 0xc9, 0x80, 0x74, 0x35, 0x74, 0x13, 0xfd, 0x08, 0xdd, 0x62, 0x39, 0x35, 0x94,
0x85, 0x0a, 0x14, 0xcb, 0x1d, 0x79, 0xaa, 0x96, 0xf8, 0x94, 0xab, 0x99, 0x8c, 0x3d, 0xe6, 0x0c,
0x85, 0xc3, 0x54, 0xab, 0x29, 0x2d, 0xb7, 0x91, 0x97, 0x33, 0xaf, 0x69, 0xcc, 0xe9, 0x1d, 0xec,
0x9e, 0x85, 0x21, 0xfa, 0x6e, 0x26, 0xfa, 0xf8, 0x97, 0xce, 0xb7, 0x07, 0x56, 0x19, 0x3f, 0xbd,
0xdb, 0xe0, 0x5b, 0x1d, 0xda, 0x37, 0x38, 0xfa, 0x84, 0x28, 0xbd, 0x11, 0x99, 0x42, 0xb7, 0xea,
0x81, 0x27, 0xc7, 0x39, 0xf8, 0x92, 0x5f, 0x94, 0xde, 0xdf, 0x3f, 0x0a, 0x53, 0xa3, 0x5d, 0x21,
0x57, 0xd0, 0xd2, 0x9e, 0x73, 0xb2, 0xaf, 0x25, 0x96, 0x7e, 0x19, 0x7a, 0x07, 0x0b, 0xbc, 0x19,
0xda, 0x1d, 0xfc, 0x5e, 0xd2, 0x36, 0xa1, 0x79, 0xd6, 0xa2, 0x07, 0xa6, 0xf7, 0xd7, 0xd2, 0x98,
0x0c, 0xff, 0x16, 0x36, 0x8b, 0x92, 0x25, 0x7f, 0x96, 0x12, 0x8b, 0xf2, 0xef, 0xf5, 0x17, 0x07,
0xe8, 0x4d, 0xd0, 0xa4, 0xa7, 0x37, 0xa1, 0x2c, 0x76, 0xbd, 0x09, 0x55, 0x7a, 0x5d, 0x21, 0xef,
0x60, 0xeb, 0xf1, 0xa0, 0xc9, 0x51, 0x9e, 0xb4, 0x60, 0xc9, 0x7a, 0x74, 0x59, 0x88, 0x4e, 0x55,
0xd3, 0xa8, 0x4e, 0xb5, 0xfc, 0x4a, 0xe8, 0x54, 0xab, 0x84, 0xbd, 0x42, 0xae, 0xa1, 0xad, 0x6b,
0x8d, 0x68, 0x09, 0x15, 0x92, 0xef, 0x1d, 0x2e, 0x72, 0xcf, 0x01, 0xc7, 0x0d, 0xf9, 0xd7, 0xe7,
0xff, 0xef, 0x01, 0x00, 0x00, 0xff, 0xff, 0x01, 0x52, 0xae, 0x82, 0x09, 0x09, 0x00, 0x00,
// 763 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xac, 0x56, 0xdd, 0x6e, 0xd3, 0x4c,
0x10, 0xad, 0xeb, 0x24, 0x6d, 0x26, 0x69, 0xbf, 0x8f, 0x4d, 0xda, 0x9a, 0xf4, 0x2f, 0x2c, 0x2a,
0x2a, 0x42, 0xaa, 0x50, 0xb8, 0xe1, 0x92, 0xaa, 0x2d, 0x08, 0xa9, 0xa8, 0x92, 0xab, 0x22, 0x21,
0x24, 0xa2, 0xc4, 0x9e, 0x84, 0x55, 0x1d, 0x3b, 0x78, 0xd7, 0xa0, 0x72, 0xcd, 0xab, 0xf0, 0x18,
0xbc, 0x1b, 0xda, 0xf5, 0xc6, 0x5e, 0x37, 0x4e, 0xa1, 0x12, 0x77, 0xde, 0x99, 0x9d, 0x33, 0x67,
0x7e, 0xce, 0x26, 0xd0, 0x18, 0xb1, 0x00, 0xe3, 0xa3, 0x69, 0x1c, 0x89, 0x88, 0xac, 0xaa, 0x43,
0x7f, 0x3a, 0xa4, 0x17, 0xb0, 0x7d, 0x1e, 0x45, 0xd7, 0xc9, 0xf4, 0x94, 0xc5, 0xe8, 0x89, 0x28,
0xbe, 0x39, 0x0b, 0x45, 0x7c, 0xe3, 0xe2, 0x97, 0x04, 0xb9, 0x20, 0x3b, 0x50, 0xf7, 0x67, 0x0e,
0xc7, 0xea, 0x5a, 0x87, 0x75, 0x37, 0x37, 0x10, 0x02, 0x95, 0x70, 0x30, 0x41, 0x67, 0x59, 0x39,
0xd4, 0x37, 0x3d, 0x83, 0x9d, 0x72, 0x40, 0x3e, 0x8d, 0x42, 0x8e, 0xe4, 0x00, 0xaa, 0x28, 0x0d,
0x0a, 0xad, 0xd1, 0xfb, 0xef, 0x68, 0x46, 0xe5, 0x28, 0xbd, 0x97, 0x7a, 0x69, 0x0f, 0xc8, 0x39,
0xe3, 0x42, 0xda, 0x18, 0xf2, 0xbf, 0xa2, 0x43, 0x5f, 0x41, 0xab, 0x10, 0xa3, 0x33, 0x3e, 0x85,
0x15, 0x4c, 0x4d, 0x8e, 0xd5, 0xb5, 0xcb, 0x72, 0xce, 0xfc, 0xf4, 0xa7, 0x05, 0x55, 0x65, 0xca,
0x4a, 0xb3, 0xf2, 0xd2, 0xc8, 0x23, 0x68, 0x32, 0xde, 0xcf, 0x09, 0xc8, 0xb2, 0x57, 0xdd, 0x06,
0xe3, 0x59, 0xa9, 0xe4, 0x19, 0xd4, 0xbc, 0xcf, 0x49, 0x78, 0xcd, 0x1d, 0x5b, 0xa5, 0x6a, 0xe5,
0xa9, 0x5e, 0xb3, 0x00, 0x4f, 0xa4, 0xcf, 0xd5, 0x57, 0xc8, 0x4b, 0x80, 0x81, 0x10, 0x31, 0x1b,
0x26, 0x02, 0xb9, 0x53, 0x51, 0xfd, 0x70, 0x8c, 0x80, 0x84, 0xe3, 0x71, 0xe6, 0x77, 0x8d, 0xbb,
0x74, 0x04, 0xf5, 0x0c, 0x8e, 0x6c, 0xc1, 0x8a, 0x8c, 0xe9, 0x33, 0x5f, 0xb3, 0xad, 0xc9, 0xe3,
0x5b, 0x9f, 0x6c, 0x42, 0x2d, 0x1a, 0x8d, 0x38, 0x0a, 0xc5, 0xd4, 0x76, 0xf5, 0x49, 0xd6, 0xc6,
0xd9, 0x77, 0x74, 0xec, 0xae, 0x75, 0x58, 0x71, 0xd5, 0x37, 0x69, 0x43, 0x75, 0x22, 0xd8, 0x04,
0x15, 0x0d, 0xdb, 0x4d, 0x0f, 0xf4, 0x87, 0x05, 0xeb, 0x45, 0x1a, 0x64, 0x1b, 0xea, 0x2a, 0x9b,
0x42, 0xb0, 0x14, 0x82, 0xda, 0xa6, 0xcb, 0x02, 0xca, 0xb2, 0x81, 0x92, 0x85, 0x4c, 0x22, 0x3f,
0x4d, 0xba, 0x96, 0x86, 0xbc, 0x8b, 0x7c, 0x24, 0xff, 0x83, 0x9d, 0x30, 0x5f, 0xa5, 0x5d, 0x73,
0xe5, 0xa7, 0xb4, 0x8c, 0x99, 0xef, 0x54, 0x53, 0xcb, 0x98, 0xf9, 0x74, 0x04, 0xce, 0x1b, 0x14,
0xb2, 0x62, 0xa3, 0x1f, 0x7a, 0x25, 0xca, 0x06, 0xb5, 0x0b, 0x30, 0x1d, 0xc4, 0x18, 0x0a, 0x39,
0x2c, 0xbd, 0x9d, 0xf5, 0xd4, 0x72, 0xca, 0x62, 0xb3, 0x61, 0xb6, 0xd9, 0x30, 0x7a, 0x05, 0x0f,
0x4b, 0xf2, 0xe8, 0x35, 0x2a, 0x4e, 0xcb, 0xba, 0xc7, 0xb4, 0x9e, 0xc3, 0x86, 0x86, 0x3d, 0x89,
0x42, 0x81, 0xa1, 0x98, 0x71, 0x5f, 0x34, 0x39, 0xda, 0x83, 0xcd, 0xdb, 0x11, 0x9a, 0x85, 0x03,
0x2b, 0x5e, 0x6a, 0x52, 0x21, 0x4d, 0x77, 0x76, 0xa4, 0x1f, 0x80, 0x9c, 0xc4, 0x38, 0x10, 0x78,
0x0f, 0x01, 0x67, 0x62, 0x5c, 0xbe, 0x53, 0x8c, 0x1b, 0xd0, 0x2a, 0x40, 0xa7, 0x5c, 0x28, 0x03,
0x72, 0x8a, 0x01, 0xde, 0x2b, 0x63, 0xc9, 0x93, 0x31, 0xa7, 0x2b, 0x7b, 0x4e, 0x57, 0x92, 0x41,
0x21, 0x95, 0x66, 0x30, 0x81, 0xd6, 0x31, 0xe7, 0x6c, 0x1c, 0xbe, 0x8f, 0x82, 0x64, 0x82, 0x33,
0x0a, 0x6d, 0xa8, 0x7a, 0x51, 0xa2, 0x5b, 0x54, 0x75, 0xd3, 0x03, 0xd9, 0x03, 0xf0, 0xa2, 0x20,
0x40, 0x4f, 0xb0, 0x28, 0xd4, 0x04, 0x0c, 0x0b, 0xe9, 0x42, 0x23, 0xc6, 0x69, 0xc0, 0xbc, 0x81,
0xba, 0x90, 0xae, 0x86, 0x69, 0xa2, 0x5f, 0xa1, 0x5d, 0x4c, 0xa7, 0x87, 0xb2, 0x50, 0x81, 0x72,
0xb9, 0xe3, 0x40, 0xe7, 0x92, 0x9f, 0x6a, 0x35, 0x93, 0x61, 0xc0, 0xbc, 0xbe, 0x74, 0xd8, 0x7a,
0x35, 0x95, 0xe5, 0x2a, 0x0e, 0x72, 0xe6, 0x15, 0x83, 0x39, 0xfd, 0x08, 0xed, 0x4b, 0xbd, 0x0e,
0xea, 0xe5, 0xf8, 0xa7, 0xc3, 0xdd, 0x82, 0x8d, 0x5b, 0xe0, 0x69, 0x55, 0xbd, 0x5f, 0x55, 0x68,
0x5e, 0xe2, 0xe0, 0x1b, 0xa2, 0x2f, 0xbd, 0x31, 0x19, 0x43, 0xbb, 0xec, 0x69, 0x27, 0x07, 0x39,
0xf2, 0x1d, 0xbf, 0x25, 0x9d, 0x27, 0x7f, 0xba, 0xa6, 0x87, 0xba, 0x44, 0xce, 0xa1, 0x61, 0x3c,
0xe4, 0x64, 0xc7, 0x08, 0x9c, 0xfb, 0x4d, 0xe8, 0xec, 0x2e, 0xf0, 0x66, 0x68, 0x9f, 0xe0, 0xc1,
0x9c, 0xaa, 0x09, 0xcd, 0xa3, 0x16, 0x3d, 0x2d, 0x9d, 0xc7, 0x77, 0xde, 0xc9, 0xf0, 0xaf, 0x60,
0xbd, 0x28, 0x56, 0xb2, 0x3f, 0x17, 0x58, 0x14, 0x7e, 0xa7, 0xbb, 0xf8, 0x82, 0xd9, 0x04, 0x43,
0x74, 0x66, 0x13, 0xe6, 0x65, 0x6e, 0x36, 0xa1, 0x4c, 0xa9, 0x4b, 0xc4, 0x85, 0xb5, 0xc2, 0x94,
0xc9, 0x5e, 0x1e, 0x51, 0xb6, 0x5b, 0x9d, 0xfd, 0x85, 0x7e, 0x93, 0xa1, 0x21, 0x4a, 0x93, 0xe1,
0xfc, 0xb3, 0x60, 0x32, 0x2c, 0x53, 0xf2, 0x12, 0xb9, 0x80, 0xa6, 0x29, 0x2e, 0x62, 0x04, 0x94,
0x68, 0xbc, 0xb3, 0xb7, 0xc8, 0x3d, 0x03, 0x1c, 0xd6, 0xd4, 0x7f, 0x9d, 0x17, 0xbf, 0x03, 0x00,
0x00, 0xff, 0xff, 0x2a, 0xc2, 0xff, 0xcb, 0xfa, 0x08, 0x00, 0x00,
}

31
weed/server/filer_grpc_server.go

@ -116,13 +116,34 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
return &filer_pb.CreateEntryResponse{}, err
}
func (fs *FilerServer) AppendFileChunks(ctx context.Context, req *filer_pb.AppendFileChunksRequest) (*filer_pb.AppendFileChunksResponse, error) {
err := fs.filer.AppendFileChunk(
filer2.FullPath(filepath.Join(req.Directory, req.Entry.Name)),
req.Entry.Chunks,
func (fs *FilerServer) SetFileChunks(ctx context.Context, req *filer_pb.SetFileChunksRequest) (*filer_pb.SetFileChunksResponse, error) {
fullpath := filepath.Join(req.Directory, req.Entry.Name)
found, entry, err := fs.filer.FindEntry(filer2.FullPath(fullpath))
if err != nil {
return &filer_pb.SetFileChunksResponse{}, err
}
if !found {
return &filer_pb.SetFileChunksResponse{}, fmt.Errorf("file not found: %s", fullpath)
}
chunks := append(entry.Chunks, req.Entry.Chunks...)
chunks, garbages := filer2.CompactFileChunks(chunks)
err = fs.filer.SetFileChunks(
filer2.FullPath(fullpath),
chunks,
)
return &filer_pb.AppendFileChunksResponse{}, err
if err == nil {
for _, garbage := range garbages {
glog.V(0).Infof("deleting %s old chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size))
operation.DeleteFile(fs.master, garbage.FileId, fs.jwt(garbage.FileId))
}
}
return &filer_pb.SetFileChunksResponse{}, err
}
func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) {

Loading…
Cancel
Save