From f07482382b067b84e917e4d1c3581fe1373957de Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 21 May 2018 00:00:28 -0700 Subject: [PATCH] able to update file content having some issue when vi reports file changed. --- weed/filer2/embedded/embedded_store.go | 2 +- weed/filer2/filechunks.go | 52 +++++---- weed/filer2/filechunks_test.go | 33 ++++++ weed/filer2/filer.go | 4 +- weed/filer2/filer_structure.go | 2 +- weed/filer2/memdb/memdb_store.go | 5 +- weed/filesys/dir.go | 2 +- weed/filesys/file.go | 42 +++++-- weed/pb/filer.proto | 6 +- weed/pb/filer_pb/filer.pb.go | 154 ++++++++++++------------- weed/server/filer_grpc_server.go | 31 ++++- 11 files changed, 209 insertions(+), 124 deletions(-) diff --git a/weed/filer2/embedded/embedded_store.go b/weed/filer2/embedded/embedded_store.go index f871f18e2..d0cfac3f9 100644 --- a/weed/filer2/embedded/embedded_store.go +++ b/weed/filer2/embedded/embedded_store.go @@ -26,7 +26,7 @@ func (filer *EmbeddedStore) AddDirectoryLink(directory *filer2.Entry, delta int3 return nil } -func (filer *EmbeddedStore) AppendFileChunk(fullpath filer2.FullPath, fileChunks []*filer_pb.FileChunk) (err error) { +func (filer *EmbeddedStore) SetFileChunks(fullpath filer2.FullPath, fileChunks []*filer_pb.FileChunk) (err error) { return nil } diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go index 88a2660cb..714bdab9c 100644 --- a/weed/filer2/filechunks.go +++ b/weed/filer2/filechunks.go @@ -19,12 +19,27 @@ func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) { } func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) { + + visibles := nonOverlappingVisibleIntervals(chunks) + + fileIds := make(map[string]bool) + for _, interval := range visibles { + fileIds[interval.fileId] = true + } + for _, chunk := range chunks { + if found := fileIds[chunk.FileId]; found { + compacted = append(compacted, chunk) + } else { + garbage = append(garbage, chunk) + } + } + return } func logPrintf(name string, visibles []*visibleInterval) { - return + // return log.Printf("%s len %d", name, len(visibles)) for _, v := range visibles { @@ -52,7 +67,7 @@ func nonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*v var minStopInterval, upToDateInterval *visibleInterval watermarkStart := chunks[0].Offset for _, chunk := range chunks { - // log.Printf("checking chunk: [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size)) + log.Printf("checking chunk: [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size)) logPrintf("parallelIntervals", parallelIntervals) for len(parallelIntervals) > 0 && watermarkStart < chunk.Offset { logPrintf("parallelIntervals loop 1", parallelIntervals) @@ -72,12 +87,7 @@ func nonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*v var remaining []*visibleInterval for _, interval := range parallelIntervals { if interval.stop != watermarkStart { - remaining = append(remaining, newVisibleInterval( - interval.start, - interval.stop, - interval.fileId, - interval.modifiedTime, - )) + remaining = append(remaining, interval) } } parallelIntervals = remaining @@ -108,12 +118,7 @@ func nonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*v var remaining []*visibleInterval for _, interval := range parallelIntervals { if interval.stop != watermarkStart { - remaining = append(remaining, newVisibleInterval( - interval.start, - interval.stop, - interval.fileId, - interval.modifiedTime, - )) + remaining = append(remaining, interval) } } parallelIntervals = remaining @@ -122,11 +127,12 @@ func nonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*v logPrintf("intervals", intervals) // merge connected intervals, now the intervals are non-intersecting - var lastInterval *visibleInterval + var lastIntervalIndex int var prevIntervalIndex int for i, interval := range intervals { if i == 0 { prevIntervalIndex = i + lastIntervalIndex = i continue } if intervals[i-1].fileId != interval.fileId || @@ -139,18 +145,16 @@ func nonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*v )) prevIntervalIndex = i } - lastInterval = intervals[i] + lastIntervalIndex = i logPrintf("intervals loop 1 visibles", visibles) } - if lastInterval != nil { - visibles = append(visibles, newVisibleInterval( - intervals[prevIntervalIndex].start, - lastInterval.stop, - intervals[prevIntervalIndex].fileId, - intervals[prevIntervalIndex].modifiedTime, - )) - } + visibles = append(visibles, newVisibleInterval( + intervals[prevIntervalIndex].start, + intervals[lastIntervalIndex].stop, + intervals[prevIntervalIndex].fileId, + intervals[prevIntervalIndex].modifiedTime, + )) logPrintf("visibles", visibles) diff --git a/weed/filer2/filechunks_test.go b/weed/filer2/filechunks_test.go index a1614641a..b87b61d3b 100644 --- a/weed/filer2/filechunks_test.go +++ b/weed/filer2/filechunks_test.go @@ -7,6 +7,28 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" ) +func TestCompactFileChunks(t *testing.T) { + chunks := []*filer_pb.FileChunk{ + {Offset:10, Size:100, FileId:"abc", Mtime:50}, + {Offset:100, Size:100, FileId:"def", Mtime:100}, + {Offset:200, Size:100, FileId:"ghi", Mtime:200}, + {Offset:110, Size:200, FileId:"jkl", Mtime:300}, + } + + compacted, garbarge := CompactFileChunks(chunks) + + log.Printf("Compacted: %+v", compacted) + log.Printf("Garbage : %+v", garbarge) + + if len(compacted) != 3 { + t.Fatalf("unexpected compacted: %d", len(compacted)) + } + if len(garbarge) != 1 { + t.Fatalf("unexpected garbarge: %d", len(garbarge)) + } + +} + func TestIntervalMerging(t *testing.T) { testcases := []struct { @@ -84,6 +106,17 @@ func TestIntervalMerging(t *testing.T) { {start: 200, stop: 220, fileId: "abc"}, }, }, + // case 6: same updates + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, + {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, + {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, + }, + Expected: []*visibleInterval{ + {start: 0, stop: 100, fileId: "abc"}, + }, + }, } for i, testcase := range testcases { diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index 2d5920cd8..fd27760bf 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -113,8 +113,8 @@ func (f *Filer) CreateEntry(entry *Entry) (error) { return nil } -func (f *Filer) AppendFileChunk(p FullPath, chunks []*filer_pb.FileChunk) (err error) { - return f.store.AppendFileChunk(p, chunks) +func (f *Filer) SetFileChunks(p FullPath, chunks []*filer_pb.FileChunk) (err error) { + return f.store.SetFileChunks(p, chunks) } func (f *Filer) FindEntry(p FullPath) (found bool, entry *Entry, err error) { diff --git a/weed/filer2/filer_structure.go b/weed/filer2/filer_structure.go index 1d2da752d..51de09718 100644 --- a/weed/filer2/filer_structure.go +++ b/weed/filer2/filer_structure.go @@ -68,7 +68,7 @@ var ErrNotFound = errors.New("filer: no entry is found in filer store") type FilerStore interface { InsertEntry(*Entry) (error) - AppendFileChunk(FullPath, []*filer_pb.FileChunk) (err error) + SetFileChunks(FullPath, []*filer_pb.FileChunk) (err error) FindEntry(FullPath) (found bool, entry *Entry, err error) DeleteEntry(FullPath) (fileEntry *Entry, err error) diff --git a/weed/filer2/memdb/memdb_store.go b/weed/filer2/memdb/memdb_store.go index 7e886f4ef..1e888cc45 100644 --- a/weed/filer2/memdb/memdb_store.go +++ b/weed/filer2/memdb/memdb_store.go @@ -33,14 +33,13 @@ func (filer *MemDbStore) InsertEntry(entry *filer2.Entry) (err error) { return nil } -func (filer *MemDbStore) AppendFileChunk(fullpath filer2.FullPath, fileChunks []*filer_pb.FileChunk) (err error) { +func (filer *MemDbStore) SetFileChunks(fullpath filer2.FullPath, fileChunks []*filer_pb.FileChunk) (err error) { found, entry, err := filer.FindEntry(fullpath) if !found { return fmt.Errorf("No such file: %s", fullpath) } - entry.Chunks = append(entry.Chunks, fileChunks...) + entry.Chunks = fileChunks entry.Mtime = time.Now() - println("appending to entry", entry.Name(), len(entry.Chunks)) filer.tree.ReplaceOrInsert(Entry{entry}) return nil } diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 732912a99..50b37f883 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -169,7 +169,7 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { } else { dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_File} ret = append(ret, dirent) - dir.wfs.listDirectoryEntriesCache.Set(dir.Path+"/"+entry.Name, entry.Attributes, 3*time.Second) + dir.wfs.listDirectoryEntriesCache.Set(dir.Path+"/"+entry.Name, entry.Attributes, 300*time.Millisecond) } } diff --git a/weed/filesys/file.go b/weed/filesys/file.go index 230ff272f..d4a9c8b05 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -13,6 +13,7 @@ import ( "time" "bytes" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/filer2" ) var _ = fs.Node(&File{}) @@ -24,6 +25,7 @@ var _ = fs.HandleReadAller(&File{}) var _ = fs.HandleFlusher(&File{}) var _ = fs.HandleWriter(&File{}) var _ = fs.HandleReleaser(&File{}) +var _ = fs.NodeSetattrer(&File{}) type File struct { Chunks []*filer_pb.FileChunk @@ -74,9 +76,32 @@ func (file *File) Attr(context context.Context, attr *fuse.Attr) error { } +func (file *File) xOpen(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) { + fullPath := filepath.Join(file.dir.Path, file.Name) + + fmt.Printf("Open %v %+v\n", fullPath, req) + + return file, nil + +} + +func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error { + fullPath := filepath.Join(file.dir.Path, file.Name) + + fmt.Printf("Setattr %v %+v\n", fullPath, req) + if req.Valid.Size() && req.Size == 0 { + fmt.Printf("truncate %v \n", fullPath) + file.Chunks = nil + resp.Attr.Size = 0 + } + + return nil + +} + func (file *File) ReadAll(ctx context.Context) (content []byte, err error) { - // fmt.Printf("read all file %+v/%v\n", file.dir.Path, file.Name) + fmt.Printf("read all file %+v/%v\n", file.dir.Path, file.Name) if len(file.Chunks) == 0 { glog.V(0).Infof("empty file %v/%v", file.dir.Path, file.Name) @@ -86,11 +111,14 @@ func (file *File) ReadAll(ctx context.Context) (content []byte, err error) { err = file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { // FIXME: need to either use Read() or implement differently + chunks, _ := filer2.CompactFileChunks(file.Chunks) + glog.V(1).Infof("read file %v/%v %d/%d chunks", file.dir.Path, file.Name, len(chunks), len(file.Chunks)) request := &filer_pb.GetFileContentRequest{ - FileId: file.Chunks[0].FileId, + FileId: chunks[0].FileId, } - glog.V(1).Infof("read file content: %v", request) + glog.V(1).Infof("read file content %d chunk %s [%d,%d): %v", len(chunks), + chunks[0].FileId, chunks[0].Offset, chunks[0].Offset+int64(chunks[0].Size), request) resp, err := client.GetFileContent(ctx, request) if err != nil { return err @@ -124,7 +152,7 @@ func (file *File) Flush(ctx context.Context, req *fuse.FlushRequest) error { err := file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - request := &filer_pb.AppendFileChunksRequest{ + request := &filer_pb.SetFileChunksRequest{ Directory: file.dir.Path, Entry: &filer_pb.Entry{ Name: file.Name, @@ -133,7 +161,7 @@ func (file *File) Flush(ctx context.Context, req *fuse.FlushRequest) error { } glog.V(1).Infof("append chunks: %v", request) - if _, err := client.AppendFileChunks(ctx, request); err != nil { + if _, err := client.SetFileChunks(ctx, request); err != nil { return fmt.Errorf("create file: %v", err) } @@ -180,7 +208,7 @@ func (file *File) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse. return fmt.Errorf("upload result: %v", uploadResult.Error) } - glog.V(1).Infof("uploaded %s/%s to: %v", file.dir.Path, file.Name, fileUrl) + resp.Size = int(uploadResult.Size) file.Chunks = append(file.Chunks, &filer_pb.FileChunk{ FileId: fileId, @@ -189,7 +217,7 @@ func (file *File) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse. Mtime: time.Now().UnixNano(), }) - resp.Size = int(uploadResult.Size) + glog.V(1).Infof("uploaded %s/%s to: %v, [%d,%d)", file.dir.Path, file.Name, fileUrl, req.Offset, req.Offset+int64(resp.Size)) return nil } diff --git a/weed/pb/filer.proto b/weed/pb/filer.proto index 461a76a58..36b633899 100644 --- a/weed/pb/filer.proto +++ b/weed/pb/filer.proto @@ -21,7 +21,7 @@ service SeaweedFiler { rpc CreateEntry (CreateEntryRequest) returns (CreateEntryResponse) { } - rpc AppendFileChunks (AppendFileChunksRequest) returns (AppendFileChunksResponse) { + rpc SetFileChunks (SetFileChunksRequest) returns (SetFileChunksResponse) { } rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) { @@ -121,9 +121,9 @@ message AssignVolumeResponse { int32 count = 4; } -message AppendFileChunksRequest { +message SetFileChunksRequest { string directory = 1; Entry entry = 2; } -message AppendFileChunksResponse { +message SetFileChunksResponse { } diff --git a/weed/pb/filer_pb/filer.pb.go b/weed/pb/filer_pb/filer.pb.go index fc3bb65c6..6bcecb08c 100644 --- a/weed/pb/filer_pb/filer.pb.go +++ b/weed/pb/filer_pb/filer.pb.go @@ -26,8 +26,8 @@ It has these top-level messages: DeleteEntryResponse AssignVolumeRequest AssignVolumeResponse - AppendFileChunksRequest - AppendFileChunksResponse + SetFileChunksRequest + SetFileChunksResponse */ package filer_pb @@ -475,37 +475,37 @@ func (m *AssignVolumeResponse) GetCount() int32 { return 0 } -type AppendFileChunksRequest struct { +type SetFileChunksRequest struct { Directory string `protobuf:"bytes,1,opt,name=directory" json:"directory,omitempty"` Entry *Entry `protobuf:"bytes,2,opt,name=entry" json:"entry,omitempty"` } -func (m *AppendFileChunksRequest) Reset() { *m = AppendFileChunksRequest{} } -func (m *AppendFileChunksRequest) String() string { return proto.CompactTextString(m) } -func (*AppendFileChunksRequest) ProtoMessage() {} -func (*AppendFileChunksRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{17} } +func (m *SetFileChunksRequest) Reset() { *m = SetFileChunksRequest{} } +func (m *SetFileChunksRequest) String() string { return proto.CompactTextString(m) } +func (*SetFileChunksRequest) ProtoMessage() {} +func (*SetFileChunksRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{17} } -func (m *AppendFileChunksRequest) GetDirectory() string { +func (m *SetFileChunksRequest) GetDirectory() string { if m != nil { return m.Directory } return "" } -func (m *AppendFileChunksRequest) GetEntry() *Entry { +func (m *SetFileChunksRequest) GetEntry() *Entry { if m != nil { return m.Entry } return nil } -type AppendFileChunksResponse struct { +type SetFileChunksResponse struct { } -func (m *AppendFileChunksResponse) Reset() { *m = AppendFileChunksResponse{} } -func (m *AppendFileChunksResponse) String() string { return proto.CompactTextString(m) } -func (*AppendFileChunksResponse) ProtoMessage() {} -func (*AppendFileChunksResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{18} } +func (m *SetFileChunksResponse) Reset() { *m = SetFileChunksResponse{} } +func (m *SetFileChunksResponse) String() string { return proto.CompactTextString(m) } +func (*SetFileChunksResponse) ProtoMessage() {} +func (*SetFileChunksResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{18} } func init() { proto.RegisterType((*LookupDirectoryEntryRequest)(nil), "filer_pb.LookupDirectoryEntryRequest") @@ -525,8 +525,8 @@ func init() { proto.RegisterType((*DeleteEntryResponse)(nil), "filer_pb.DeleteEntryResponse") proto.RegisterType((*AssignVolumeRequest)(nil), "filer_pb.AssignVolumeRequest") proto.RegisterType((*AssignVolumeResponse)(nil), "filer_pb.AssignVolumeResponse") - proto.RegisterType((*AppendFileChunksRequest)(nil), "filer_pb.AppendFileChunksRequest") - proto.RegisterType((*AppendFileChunksResponse)(nil), "filer_pb.AppendFileChunksResponse") + proto.RegisterType((*SetFileChunksRequest)(nil), "filer_pb.SetFileChunksRequest") + proto.RegisterType((*SetFileChunksResponse)(nil), "filer_pb.SetFileChunksResponse") } // Reference imports to suppress errors if they are not otherwise used. @@ -545,7 +545,7 @@ type SeaweedFilerClient interface { GetFileAttributes(ctx context.Context, in *GetFileAttributesRequest, opts ...grpc.CallOption) (*GetFileAttributesResponse, error) GetFileContent(ctx context.Context, in *GetFileContentRequest, opts ...grpc.CallOption) (*GetFileContentResponse, error) CreateEntry(ctx context.Context, in *CreateEntryRequest, opts ...grpc.CallOption) (*CreateEntryResponse, error) - AppendFileChunks(ctx context.Context, in *AppendFileChunksRequest, opts ...grpc.CallOption) (*AppendFileChunksResponse, error) + SetFileChunks(ctx context.Context, in *SetFileChunksRequest, opts ...grpc.CallOption) (*SetFileChunksResponse, error) DeleteEntry(ctx context.Context, in *DeleteEntryRequest, opts ...grpc.CallOption) (*DeleteEntryResponse, error) AssignVolume(ctx context.Context, in *AssignVolumeRequest, opts ...grpc.CallOption) (*AssignVolumeResponse, error) } @@ -603,9 +603,9 @@ func (c *seaweedFilerClient) CreateEntry(ctx context.Context, in *CreateEntryReq return out, nil } -func (c *seaweedFilerClient) AppendFileChunks(ctx context.Context, in *AppendFileChunksRequest, opts ...grpc.CallOption) (*AppendFileChunksResponse, error) { - out := new(AppendFileChunksResponse) - err := grpc.Invoke(ctx, "/filer_pb.SeaweedFiler/AppendFileChunks", in, out, c.cc, opts...) +func (c *seaweedFilerClient) SetFileChunks(ctx context.Context, in *SetFileChunksRequest, opts ...grpc.CallOption) (*SetFileChunksResponse, error) { + out := new(SetFileChunksResponse) + err := grpc.Invoke(ctx, "/filer_pb.SeaweedFiler/SetFileChunks", in, out, c.cc, opts...) if err != nil { return nil, err } @@ -638,7 +638,7 @@ type SeaweedFilerServer interface { GetFileAttributes(context.Context, *GetFileAttributesRequest) (*GetFileAttributesResponse, error) GetFileContent(context.Context, *GetFileContentRequest) (*GetFileContentResponse, error) CreateEntry(context.Context, *CreateEntryRequest) (*CreateEntryResponse, error) - AppendFileChunks(context.Context, *AppendFileChunksRequest) (*AppendFileChunksResponse, error) + SetFileChunks(context.Context, *SetFileChunksRequest) (*SetFileChunksResponse, error) DeleteEntry(context.Context, *DeleteEntryRequest) (*DeleteEntryResponse, error) AssignVolume(context.Context, *AssignVolumeRequest) (*AssignVolumeResponse, error) } @@ -737,20 +737,20 @@ func _SeaweedFiler_CreateEntry_Handler(srv interface{}, ctx context.Context, dec return interceptor(ctx, in, info, handler) } -func _SeaweedFiler_AppendFileChunks_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(AppendFileChunksRequest) +func _SeaweedFiler_SetFileChunks_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(SetFileChunksRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(SeaweedFilerServer).AppendFileChunks(ctx, in) + return srv.(SeaweedFilerServer).SetFileChunks(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/filer_pb.SeaweedFiler/AppendFileChunks", + FullMethod: "/filer_pb.SeaweedFiler/SetFileChunks", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(SeaweedFilerServer).AppendFileChunks(ctx, req.(*AppendFileChunksRequest)) + return srv.(SeaweedFilerServer).SetFileChunks(ctx, req.(*SetFileChunksRequest)) } return interceptor(ctx, in, info, handler) } @@ -816,8 +816,8 @@ var _SeaweedFiler_serviceDesc = grpc.ServiceDesc{ Handler: _SeaweedFiler_CreateEntry_Handler, }, { - MethodName: "AppendFileChunks", - Handler: _SeaweedFiler_AppendFileChunks_Handler, + MethodName: "SetFileChunks", + Handler: _SeaweedFiler_SetFileChunks_Handler, }, { MethodName: "DeleteEntry", @@ -835,53 +835,53 @@ var _SeaweedFiler_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("filer.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 767 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xac, 0x56, 0x6d, 0x4f, 0xdb, 0x48, - 0x10, 0xc6, 0x38, 0x09, 0x64, 0x12, 0x38, 0x6e, 0x13, 0xc0, 0x17, 0x5e, 0x2e, 0xec, 0x89, 0x13, - 0xa7, 0x93, 0xd0, 0x29, 0xf7, 0xa5, 0x1f, 0x8b, 0x80, 0x56, 0x95, 0xa8, 0x90, 0x8c, 0xa8, 0x54, - 0x55, 0x22, 0x4a, 0xec, 0x49, 0xba, 0xc2, 0xb1, 0x5d, 0x7b, 0xdd, 0x8a, 0x7e, 0xee, 0x5f, 0xe9, - 0x5f, 0xe9, 0xef, 0xaa, 0x76, 0xbd, 0xb1, 0xd7, 0xd8, 0x49, 0x8b, 0xd4, 0x6f, 0xde, 0x79, 0x79, - 0xe6, 0xd9, 0x99, 0x79, 0x36, 0x81, 0xd6, 0x84, 0x79, 0x18, 0x9d, 0x86, 0x51, 0xc0, 0x03, 0xb2, - 0x2e, 0x0f, 0xc3, 0x70, 0x4c, 0xaf, 0x61, 0xef, 0x2a, 0x08, 0xee, 0x93, 0xf0, 0x82, 0x45, 0xe8, - 0xf0, 0x20, 0x7a, 0xb8, 0xf4, 0x79, 0xf4, 0x60, 0xe3, 0x87, 0x04, 0x63, 0x4e, 0xf6, 0xa1, 0xe9, - 0xce, 0x1d, 0x96, 0xd1, 0x37, 0x4e, 0x9a, 0x76, 0x6e, 0x20, 0x04, 0x6a, 0xfe, 0x68, 0x86, 0xd6, - 0xaa, 0x74, 0xc8, 0x6f, 0x7a, 0x09, 0xfb, 0xd5, 0x80, 0x71, 0x18, 0xf8, 0x31, 0x92, 0x63, 0xa8, - 0xa3, 0x30, 0x48, 0xb4, 0xd6, 0xe0, 0xb7, 0xd3, 0x39, 0x95, 0xd3, 0x34, 0x2e, 0xf5, 0xd2, 0x01, - 0x90, 0x2b, 0x16, 0x73, 0x61, 0x63, 0x18, 0xff, 0x14, 0x1d, 0xfa, 0x1c, 0x3a, 0x85, 0x1c, 0x55, - 0xf1, 0x1f, 0x58, 0xc3, 0xd4, 0x64, 0x19, 0x7d, 0xb3, 0xaa, 0xe6, 0xdc, 0x4f, 0xbf, 0x1a, 0x50, - 0x97, 0xa6, 0xec, 0x6a, 0x46, 0x7e, 0x35, 0x72, 0x04, 0x6d, 0x16, 0x0f, 0x73, 0x02, 0xe2, 0xda, - 0xeb, 0x76, 0x8b, 0xc5, 0xd9, 0x55, 0xc9, 0xbf, 0xd0, 0x70, 0xde, 0x27, 0xfe, 0x7d, 0x6c, 0x99, - 0xb2, 0x54, 0x27, 0x2f, 0xf5, 0x82, 0x79, 0x78, 0x2e, 0x7c, 0xb6, 0x0a, 0x21, 0xcf, 0x00, 0x46, - 0x9c, 0x47, 0x6c, 0x9c, 0x70, 0x8c, 0xad, 0x9a, 0xec, 0x87, 0xa5, 0x25, 0x24, 0x31, 0x9e, 0x65, - 0x7e, 0x5b, 0x8b, 0xa5, 0x13, 0x68, 0x66, 0x70, 0x64, 0x17, 0xd6, 0x44, 0xce, 0x90, 0xb9, 0x8a, - 0x6d, 0x43, 0x1c, 0x5f, 0xb9, 0x64, 0x07, 0x1a, 0xc1, 0x64, 0x12, 0x23, 0x97, 0x4c, 0x4d, 0x5b, - 0x9d, 0xc4, 0xdd, 0x62, 0xf6, 0x19, 0x2d, 0xb3, 0x6f, 0x9c, 0xd4, 0x6c, 0xf9, 0x4d, 0xba, 0x50, - 0x9f, 0x71, 0x36, 0x43, 0x49, 0xc3, 0xb4, 0xd3, 0x03, 0xfd, 0x62, 0xc0, 0x66, 0x91, 0x06, 0xd9, - 0x83, 0xa6, 0xac, 0x26, 0x11, 0x0c, 0x89, 0x20, 0xb7, 0xe9, 0xa6, 0x80, 0xb2, 0xaa, 0xa1, 0x64, - 0x29, 0xb3, 0xc0, 0x4d, 0x8b, 0x6e, 0xa4, 0x29, 0xaf, 0x03, 0x17, 0xc9, 0x16, 0x98, 0x09, 0x73, - 0x65, 0xd9, 0x0d, 0x5b, 0x7c, 0x0a, 0xcb, 0x94, 0xb9, 0x56, 0x3d, 0xb5, 0x4c, 0x99, 0x4b, 0x27, - 0x60, 0xbd, 0x44, 0x2e, 0x6e, 0xac, 0xf5, 0x43, 0xad, 0x44, 0xd5, 0xa0, 0x0e, 0x00, 0xc2, 0x51, - 0x84, 0x3e, 0x17, 0xc3, 0x52, 0xdb, 0xd9, 0x4c, 0x2d, 0x17, 0x2c, 0xd2, 0x1b, 0x66, 0xea, 0x0d, - 0xa3, 0xb7, 0xf0, 0x47, 0x45, 0x1d, 0xb5, 0x46, 0xc5, 0x69, 0x19, 0x4f, 0x98, 0xd6, 0x7f, 0xb0, - 0xad, 0x60, 0xcf, 0x03, 0x9f, 0xa3, 0xcf, 0xe7, 0xdc, 0x17, 0x4d, 0x8e, 0x0e, 0x60, 0xe7, 0x71, - 0x86, 0x62, 0x61, 0xc1, 0x9a, 0x93, 0x9a, 0x64, 0x4a, 0xdb, 0x9e, 0x1f, 0xe9, 0x5b, 0x20, 0xe7, - 0x11, 0x8e, 0x38, 0x3e, 0x41, 0xc0, 0x99, 0x18, 0x57, 0x97, 0x8a, 0x71, 0x1b, 0x3a, 0x05, 0xe8, - 0x94, 0x0b, 0x65, 0x40, 0x2e, 0xd0, 0xc3, 0x27, 0x55, 0xac, 0x78, 0x32, 0x4a, 0xba, 0x32, 0x4b, - 0xba, 0x12, 0x0c, 0x0a, 0xa5, 0x14, 0x83, 0x19, 0x74, 0xce, 0xe2, 0x98, 0x4d, 0xfd, 0x37, 0x81, - 0x97, 0xcc, 0x70, 0x4e, 0xa1, 0x0b, 0x75, 0x27, 0x48, 0x54, 0x8b, 0xea, 0x76, 0x7a, 0x20, 0x87, - 0x00, 0x4e, 0xe0, 0x79, 0xe8, 0x70, 0x16, 0xf8, 0x8a, 0x80, 0x66, 0x21, 0x7d, 0x68, 0x45, 0x18, - 0x7a, 0xcc, 0x19, 0xc9, 0x80, 0x74, 0x35, 0x74, 0x13, 0xfd, 0x08, 0xdd, 0x62, 0x39, 0x35, 0x94, - 0x85, 0x0a, 0x14, 0xcb, 0x1d, 0x79, 0xaa, 0x96, 0xf8, 0x94, 0xab, 0x99, 0x8c, 0x3d, 0xe6, 0x0c, - 0x85, 0xc3, 0x54, 0xab, 0x29, 0x2d, 0xb7, 0x91, 0x97, 0x33, 0xaf, 0x69, 0xcc, 0xe9, 0x1d, 0xec, - 0x9e, 0x85, 0x21, 0xfa, 0x6e, 0x26, 0xfa, 0xf8, 0x97, 0xce, 0xb7, 0x07, 0x56, 0x19, 0x3f, 0xbd, - 0xdb, 0xe0, 0x5b, 0x1d, 0xda, 0x37, 0x38, 0xfa, 0x84, 0x28, 0xbd, 0x11, 0x99, 0x42, 0xb7, 0xea, - 0x81, 0x27, 0xc7, 0x39, 0xf8, 0x92, 0x5f, 0x94, 0xde, 0xdf, 0x3f, 0x0a, 0x53, 0xa3, 0x5d, 0x21, - 0x57, 0xd0, 0xd2, 0x9e, 0x73, 0xb2, 0xaf, 0x25, 0x96, 0x7e, 0x19, 0x7a, 0x07, 0x0b, 0xbc, 0x19, - 0xda, 0x1d, 0xfc, 0x5e, 0xd2, 0x36, 0xa1, 0x79, 0xd6, 0xa2, 0x07, 0xa6, 0xf7, 0xd7, 0xd2, 0x98, - 0x0c, 0xff, 0x16, 0x36, 0x8b, 0x92, 0x25, 0x7f, 0x96, 0x12, 0x8b, 0xf2, 0xef, 0xf5, 0x17, 0x07, - 0xe8, 0x4d, 0xd0, 0xa4, 0xa7, 0x37, 0xa1, 0x2c, 0x76, 0xbd, 0x09, 0x55, 0x7a, 0x5d, 0x21, 0xef, - 0x60, 0xeb, 0xf1, 0xa0, 0xc9, 0x51, 0x9e, 0xb4, 0x60, 0xc9, 0x7a, 0x74, 0x59, 0x88, 0x4e, 0x55, - 0xd3, 0xa8, 0x4e, 0xb5, 0xfc, 0x4a, 0xe8, 0x54, 0xab, 0x84, 0xbd, 0x42, 0xae, 0xa1, 0xad, 0x6b, - 0x8d, 0x68, 0x09, 0x15, 0x92, 0xef, 0x1d, 0x2e, 0x72, 0xcf, 0x01, 0xc7, 0x0d, 0xf9, 0xd7, 0xe7, - 0xff, 0xef, 0x01, 0x00, 0x00, 0xff, 0xff, 0x01, 0x52, 0xae, 0x82, 0x09, 0x09, 0x00, 0x00, + // 763 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xac, 0x56, 0xdd, 0x6e, 0xd3, 0x4c, + 0x10, 0xad, 0xeb, 0x24, 0x6d, 0x26, 0x69, 0xbf, 0x8f, 0x4d, 0xda, 0x9a, 0xf4, 0x2f, 0x2c, 0x2a, + 0x2a, 0x42, 0xaa, 0x50, 0xb8, 0xe1, 0x92, 0xaa, 0x2d, 0x08, 0xa9, 0xa8, 0x92, 0xab, 0x22, 0x21, + 0x24, 0xa2, 0xc4, 0x9e, 0x84, 0x55, 0x1d, 0x3b, 0x78, 0xd7, 0xa0, 0x72, 0xcd, 0xab, 0xf0, 0x18, + 0xbc, 0x1b, 0xda, 0xf5, 0xc6, 0x5e, 0x37, 0x4e, 0xa1, 0x12, 0x77, 0xde, 0x99, 0x9d, 0x33, 0x67, + 0x7e, 0xce, 0x26, 0xd0, 0x18, 0xb1, 0x00, 0xe3, 0xa3, 0x69, 0x1c, 0x89, 0x88, 0xac, 0xaa, 0x43, + 0x7f, 0x3a, 0xa4, 0x17, 0xb0, 0x7d, 0x1e, 0x45, 0xd7, 0xc9, 0xf4, 0x94, 0xc5, 0xe8, 0x89, 0x28, + 0xbe, 0x39, 0x0b, 0x45, 0x7c, 0xe3, 0xe2, 0x97, 0x04, 0xb9, 0x20, 0x3b, 0x50, 0xf7, 0x67, 0x0e, + 0xc7, 0xea, 0x5a, 0x87, 0x75, 0x37, 0x37, 0x10, 0x02, 0x95, 0x70, 0x30, 0x41, 0x67, 0x59, 0x39, + 0xd4, 0x37, 0x3d, 0x83, 0x9d, 0x72, 0x40, 0x3e, 0x8d, 0x42, 0x8e, 0xe4, 0x00, 0xaa, 0x28, 0x0d, + 0x0a, 0xad, 0xd1, 0xfb, 0xef, 0x68, 0x46, 0xe5, 0x28, 0xbd, 0x97, 0x7a, 0x69, 0x0f, 0xc8, 0x39, + 0xe3, 0x42, 0xda, 0x18, 0xf2, 0xbf, 0xa2, 0x43, 0x5f, 0x41, 0xab, 0x10, 0xa3, 0x33, 0x3e, 0x85, + 0x15, 0x4c, 0x4d, 0x8e, 0xd5, 0xb5, 0xcb, 0x72, 0xce, 0xfc, 0xf4, 0xa7, 0x05, 0x55, 0x65, 0xca, + 0x4a, 0xb3, 0xf2, 0xd2, 0xc8, 0x23, 0x68, 0x32, 0xde, 0xcf, 0x09, 0xc8, 0xb2, 0x57, 0xdd, 0x06, + 0xe3, 0x59, 0xa9, 0xe4, 0x19, 0xd4, 0xbc, 0xcf, 0x49, 0x78, 0xcd, 0x1d, 0x5b, 0xa5, 0x6a, 0xe5, + 0xa9, 0x5e, 0xb3, 0x00, 0x4f, 0xa4, 0xcf, 0xd5, 0x57, 0xc8, 0x4b, 0x80, 0x81, 0x10, 0x31, 0x1b, + 0x26, 0x02, 0xb9, 0x53, 0x51, 0xfd, 0x70, 0x8c, 0x80, 0x84, 0xe3, 0x71, 0xe6, 0x77, 0x8d, 0xbb, + 0x74, 0x04, 0xf5, 0x0c, 0x8e, 0x6c, 0xc1, 0x8a, 0x8c, 0xe9, 0x33, 0x5f, 0xb3, 0xad, 0xc9, 0xe3, + 0x5b, 0x9f, 0x6c, 0x42, 0x2d, 0x1a, 0x8d, 0x38, 0x0a, 0xc5, 0xd4, 0x76, 0xf5, 0x49, 0xd6, 0xc6, + 0xd9, 0x77, 0x74, 0xec, 0xae, 0x75, 0x58, 0x71, 0xd5, 0x37, 0x69, 0x43, 0x75, 0x22, 0xd8, 0x04, + 0x15, 0x0d, 0xdb, 0x4d, 0x0f, 0xf4, 0x87, 0x05, 0xeb, 0x45, 0x1a, 0x64, 0x1b, 0xea, 0x2a, 0x9b, + 0x42, 0xb0, 0x14, 0x82, 0xda, 0xa6, 0xcb, 0x02, 0xca, 0xb2, 0x81, 0x92, 0x85, 0x4c, 0x22, 0x3f, + 0x4d, 0xba, 0x96, 0x86, 0xbc, 0x8b, 0x7c, 0x24, 0xff, 0x83, 0x9d, 0x30, 0x5f, 0xa5, 0x5d, 0x73, + 0xe5, 0xa7, 0xb4, 0x8c, 0x99, 0xef, 0x54, 0x53, 0xcb, 0x98, 0xf9, 0x74, 0x04, 0xce, 0x1b, 0x14, + 0xb2, 0x62, 0xa3, 0x1f, 0x7a, 0x25, 0xca, 0x06, 0xb5, 0x0b, 0x30, 0x1d, 0xc4, 0x18, 0x0a, 0x39, + 0x2c, 0xbd, 0x9d, 0xf5, 0xd4, 0x72, 0xca, 0x62, 0xb3, 0x61, 0xb6, 0xd9, 0x30, 0x7a, 0x05, 0x0f, + 0x4b, 0xf2, 0xe8, 0x35, 0x2a, 0x4e, 0xcb, 0xba, 0xc7, 0xb4, 0x9e, 0xc3, 0x86, 0x86, 0x3d, 0x89, + 0x42, 0x81, 0xa1, 0x98, 0x71, 0x5f, 0x34, 0x39, 0xda, 0x83, 0xcd, 0xdb, 0x11, 0x9a, 0x85, 0x03, + 0x2b, 0x5e, 0x6a, 0x52, 0x21, 0x4d, 0x77, 0x76, 0xa4, 0x1f, 0x80, 0x9c, 0xc4, 0x38, 0x10, 0x78, + 0x0f, 0x01, 0x67, 0x62, 0x5c, 0xbe, 0x53, 0x8c, 0x1b, 0xd0, 0x2a, 0x40, 0xa7, 0x5c, 0x28, 0x03, + 0x72, 0x8a, 0x01, 0xde, 0x2b, 0x63, 0xc9, 0x93, 0x31, 0xa7, 0x2b, 0x7b, 0x4e, 0x57, 0x92, 0x41, + 0x21, 0x95, 0x66, 0x30, 0x81, 0xd6, 0x31, 0xe7, 0x6c, 0x1c, 0xbe, 0x8f, 0x82, 0x64, 0x82, 0x33, + 0x0a, 0x6d, 0xa8, 0x7a, 0x51, 0xa2, 0x5b, 0x54, 0x75, 0xd3, 0x03, 0xd9, 0x03, 0xf0, 0xa2, 0x20, + 0x40, 0x4f, 0xb0, 0x28, 0xd4, 0x04, 0x0c, 0x0b, 0xe9, 0x42, 0x23, 0xc6, 0x69, 0xc0, 0xbc, 0x81, + 0xba, 0x90, 0xae, 0x86, 0x69, 0xa2, 0x5f, 0xa1, 0x5d, 0x4c, 0xa7, 0x87, 0xb2, 0x50, 0x81, 0x72, + 0xb9, 0xe3, 0x40, 0xe7, 0x92, 0x9f, 0x6a, 0x35, 0x93, 0x61, 0xc0, 0xbc, 0xbe, 0x74, 0xd8, 0x7a, + 0x35, 0x95, 0xe5, 0x2a, 0x0e, 0x72, 0xe6, 0x15, 0x83, 0x39, 0xfd, 0x08, 0xed, 0x4b, 0xbd, 0x0e, + 0xea, 0xe5, 0xf8, 0xa7, 0xc3, 0xdd, 0x82, 0x8d, 0x5b, 0xe0, 0x69, 0x55, 0xbd, 0x5f, 0x55, 0x68, + 0x5e, 0xe2, 0xe0, 0x1b, 0xa2, 0x2f, 0xbd, 0x31, 0x19, 0x43, 0xbb, 0xec, 0x69, 0x27, 0x07, 0x39, + 0xf2, 0x1d, 0xbf, 0x25, 0x9d, 0x27, 0x7f, 0xba, 0xa6, 0x87, 0xba, 0x44, 0xce, 0xa1, 0x61, 0x3c, + 0xe4, 0x64, 0xc7, 0x08, 0x9c, 0xfb, 0x4d, 0xe8, 0xec, 0x2e, 0xf0, 0x66, 0x68, 0x9f, 0xe0, 0xc1, + 0x9c, 0xaa, 0x09, 0xcd, 0xa3, 0x16, 0x3d, 0x2d, 0x9d, 0xc7, 0x77, 0xde, 0xc9, 0xf0, 0xaf, 0x60, + 0xbd, 0x28, 0x56, 0xb2, 0x3f, 0x17, 0x58, 0x14, 0x7e, 0xa7, 0xbb, 0xf8, 0x82, 0xd9, 0x04, 0x43, + 0x74, 0x66, 0x13, 0xe6, 0x65, 0x6e, 0x36, 0xa1, 0x4c, 0xa9, 0x4b, 0xc4, 0x85, 0xb5, 0xc2, 0x94, + 0xc9, 0x5e, 0x1e, 0x51, 0xb6, 0x5b, 0x9d, 0xfd, 0x85, 0x7e, 0x93, 0xa1, 0x21, 0x4a, 0x93, 0xe1, + 0xfc, 0xb3, 0x60, 0x32, 0x2c, 0x53, 0xf2, 0x12, 0xb9, 0x80, 0xa6, 0x29, 0x2e, 0x62, 0x04, 0x94, + 0x68, 0xbc, 0xb3, 0xb7, 0xc8, 0x3d, 0x03, 0x1c, 0xd6, 0xd4, 0x7f, 0x9d, 0x17, 0xbf, 0x03, 0x00, + 0x00, 0xff, 0xff, 0x2a, 0xc2, 0xff, 0xcb, 0xfa, 0x08, 0x00, 0x00, } diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 6cc5021d6..a279b7dd7 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -116,13 +116,34 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr return &filer_pb.CreateEntryResponse{}, err } -func (fs *FilerServer) AppendFileChunks(ctx context.Context, req *filer_pb.AppendFileChunksRequest) (*filer_pb.AppendFileChunksResponse, error) { - err := fs.filer.AppendFileChunk( - filer2.FullPath(filepath.Join(req.Directory, req.Entry.Name)), - req.Entry.Chunks, +func (fs *FilerServer) SetFileChunks(ctx context.Context, req *filer_pb.SetFileChunksRequest) (*filer_pb.SetFileChunksResponse, error) { + + fullpath := filepath.Join(req.Directory, req.Entry.Name) + found, entry, err := fs.filer.FindEntry(filer2.FullPath(fullpath)) + if err != nil { + return &filer_pb.SetFileChunksResponse{}, err + } + if !found { + return &filer_pb.SetFileChunksResponse{}, fmt.Errorf("file not found: %s", fullpath) + } + + chunks := append(entry.Chunks, req.Entry.Chunks...) + + chunks, garbages := filer2.CompactFileChunks(chunks) + + err = fs.filer.SetFileChunks( + filer2.FullPath(fullpath), + chunks, ) - return &filer_pb.AppendFileChunksResponse{}, err + if err == nil { + for _, garbage := range garbages { + glog.V(0).Infof("deleting %s old chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size)) + operation.DeleteFile(fs.master, garbage.FileId, fs.jwt(garbage.FileId)) + } + } + + return &filer_pb.SetFileChunksResponse{}, err } func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) {