diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go index 6c3157e6c..b5876df82 100644 --- a/weed/filer2/filechunks.go +++ b/weed/filer2/filechunks.go @@ -40,7 +40,7 @@ func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*file fileIds[interval.fileId] = true } for _, chunk := range chunks { - if found := fileIds[chunk.FileId]; found { + if _, found := fileIds[chunk.GetFileIdString()]; found { compacted = append(compacted, chunk) } else { garbage = append(garbage, chunk) @@ -50,15 +50,15 @@ func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*file return } -func FindUnusedFileChunks(oldChunks, newChunks []*filer_pb.FileChunk) (unused []*filer_pb.FileChunk) { +func MinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) { fileIds := make(map[string]bool) - for _, interval := range newChunks { - fileIds[interval.FileId] = true + for _, interval := range bs { + fileIds[interval.GetFileIdString()] = true } - for _, chunk := range oldChunks { - if found := fileIds[chunk.FileId]; !found { - unused = append(unused, chunk) + for _, chunk := range as { + if _, found := fileIds[chunk.GetFileIdString()]; !found { + delta = append(delta, chunk) } } @@ -123,7 +123,7 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb. newV := newVisibleInterval( chunk.Offset, chunk.Offset+int64(chunk.Size), - chunk.FileId, + chunk.GetFileIdString(), chunk.Mtime, true, ) diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go index f8b09adf6..fea93d57f 100644 --- a/weed/filer2/filer_deletion.go +++ b/weed/filer2/filer_deletion.go @@ -54,7 +54,7 @@ func (f *Filer) loopProcessingDeletion() { func (f *Filer) DeleteChunks(fullpath FullPath, chunks []*filer_pb.FileChunk) { for _, chunk := range chunks { glog.V(3).Infof("deleting %s chunk %s", fullpath, chunk.String()) - f.fileIdDeletionChan <- chunk.FileId + f.fileIdDeletionChan <- chunk.GetFileIdString() } } diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go index 1378b6122..6e586b7df 100644 --- a/weed/filesys/wfs_deletion.go +++ b/weed/filesys/wfs_deletion.go @@ -17,7 +17,7 @@ func (wfs *WFS) deleteFileChunks(ctx context.Context, chunks []*filer_pb.FileChu var fileIds []string for _, chunk := range chunks { - fileIds = append(fileIds, chunk.FileId) + fileIds = append(fileIds, chunk.GetFileIdString()) } wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index d24770e3d..97e9671a3 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -39,7 +39,7 @@ func (fs *FilerSink) replicateOneChunk(ctx context.Context, sourceChunk *filer_p fileId, err := fs.fetchAndWrite(ctx, sourceChunk) if err != nil { - return nil, fmt.Errorf("copy %s: %v", sourceChunk.FileId, err) + return nil, fmt.Errorf("copy %s: %v", sourceChunk.GetFileIdString(), err) } return &filer_pb.FileChunk{ @@ -48,15 +48,15 @@ func (fs *FilerSink) replicateOneChunk(ctx context.Context, sourceChunk *filer_p Size: sourceChunk.Size, Mtime: sourceChunk.Mtime, ETag: sourceChunk.ETag, - SourceFileId: sourceChunk.FileId, + SourceFileId: sourceChunk.GetFileIdString(), }, nil } func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.FileChunk) (fileId string, err error) { - filename, header, readCloser, err := fs.filerSource.ReadPart(ctx, sourceChunk.FileId) + filename, header, readCloser, err := fs.filerSource.ReadPart(ctx, sourceChunk.GetFileIdString()) if err != nil { - return "", fmt.Errorf("read part %s: %v", sourceChunk.FileId, err) + return "", fmt.Errorf("read part %s: %v", sourceChunk.GetFileIdString(), err) } defer readCloser.Close() diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index ff0fe8b74..f99c7fdf6 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -179,7 +179,7 @@ func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry *file // delete the chunks that are deleted from the source if deleteIncludeChunks { // remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks - existingEntry.Chunks = minusChunks(existingEntry.Chunks, deletedChunks) + existingEntry.Chunks = filer2.MinusChunks(existingEntry.Chunks, deletedChunks) } // replicate the chunks that are new in the source @@ -207,23 +207,7 @@ func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry *file } func compareChunks(oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk) { - deletedChunks = minusChunks(oldEntry.Chunks, newEntry.Chunks) - newChunks = minusChunks(newEntry.Chunks, oldEntry.Chunks) - return -} - -func minusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) { - for _, a := range as { - found := false - for _, b := range bs { - if a.FileId == b.FileId { - found = true - break - } - } - if !found { - delta = append(delta, a) - } - } + deletedChunks = filer2.MinusChunks(oldEntry.Chunks, newEntry.Chunks) + newChunks = filer2.MinusChunks(newEntry.Chunks, oldEntry.Chunks) return } diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index e6af085fd..0739b2220 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -69,7 +69,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(ctx context.Context, input *s3.C if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory { for _, chunk := range entry.Chunks { p := &filer_pb.FileChunk{ - FileId: chunk.FileId, + FileId: chunk.GetFileIdString(), Offset: offset, Size: chunk.Size, Mtime: chunk.Mtime, diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index bcac4dbde..f8bb9cf07 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -140,7 +140,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr } // remove old chunks if not included in the new ones - unusedChunks := filer2.FindUnusedFileChunks(entry.Chunks, req.Entry.Chunks) + unusedChunks := filer2.MinusChunks(entry.Chunks, req.Entry.Chunks) chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks) diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 1d18bd75e..c7cb4f0cc 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -78,7 +78,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request, entry *filer2.Entry) { - fileId := entry.Chunks[0].FileId + fileId := entry.Chunks[0].GetFileIdString() urlString, err := fs.filer.MasterClient.LookupFileId(fileId) if err != nil {