Browse Source

refactoring

pull/991/head
Chris Lu 6 years ago
parent
commit
3fa1f150d9
  1. 16
      weed/filer2/filechunks.go
  2. 2
      weed/filer2/filer_deletion.go
  3. 2
      weed/filesys/wfs_deletion.go
  4. 8
      weed/replication/sink/filersink/fetch_write.go
  5. 22
      weed/replication/sink/filersink/filer_sink.go
  6. 2
      weed/s3api/filer_multipart.go
  7. 2
      weed/server/filer_grpc_server.go
  8. 2
      weed/server/filer_server_handlers_read.go

16
weed/filer2/filechunks.go

@ -40,7 +40,7 @@ func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*file
fileIds[interval.fileId] = true
}
for _, chunk := range chunks {
if found := fileIds[chunk.FileId]; found {
if _, found := fileIds[chunk.GetFileIdString()]; found {
compacted = append(compacted, chunk)
} else {
garbage = append(garbage, chunk)
@ -50,15 +50,15 @@ func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*file
return
}
func FindUnusedFileChunks(oldChunks, newChunks []*filer_pb.FileChunk) (unused []*filer_pb.FileChunk) {
func MinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
fileIds := make(map[string]bool)
for _, interval := range newChunks {
fileIds[interval.FileId] = true
for _, interval := range bs {
fileIds[interval.GetFileIdString()] = true
}
for _, chunk := range oldChunks {
if found := fileIds[chunk.FileId]; !found {
unused = append(unused, chunk)
for _, chunk := range as {
if _, found := fileIds[chunk.GetFileIdString()]; !found {
delta = append(delta, chunk)
}
}
@ -123,7 +123,7 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.
newV := newVisibleInterval(
chunk.Offset,
chunk.Offset+int64(chunk.Size),
chunk.FileId,
chunk.GetFileIdString(),
chunk.Mtime,
true,
)

2
weed/filer2/filer_deletion.go

@ -54,7 +54,7 @@ func (f *Filer) loopProcessingDeletion() {
func (f *Filer) DeleteChunks(fullpath FullPath, chunks []*filer_pb.FileChunk) {
for _, chunk := range chunks {
glog.V(3).Infof("deleting %s chunk %s", fullpath, chunk.String())
f.fileIdDeletionChan <- chunk.FileId
f.fileIdDeletionChan <- chunk.GetFileIdString()
}
}

2
weed/filesys/wfs_deletion.go

@ -17,7 +17,7 @@ func (wfs *WFS) deleteFileChunks(ctx context.Context, chunks []*filer_pb.FileChu
var fileIds []string
for _, chunk := range chunks {
fileIds = append(fileIds, chunk.FileId)
fileIds = append(fileIds, chunk.GetFileIdString())
}
wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {

8
weed/replication/sink/filersink/fetch_write.go

@ -39,7 +39,7 @@ func (fs *FilerSink) replicateOneChunk(ctx context.Context, sourceChunk *filer_p
fileId, err := fs.fetchAndWrite(ctx, sourceChunk)
if err != nil {
return nil, fmt.Errorf("copy %s: %v", sourceChunk.FileId, err)
return nil, fmt.Errorf("copy %s: %v", sourceChunk.GetFileIdString(), err)
}
return &filer_pb.FileChunk{
@ -48,15 +48,15 @@ func (fs *FilerSink) replicateOneChunk(ctx context.Context, sourceChunk *filer_p
Size: sourceChunk.Size,
Mtime: sourceChunk.Mtime,
ETag: sourceChunk.ETag,
SourceFileId: sourceChunk.FileId,
SourceFileId: sourceChunk.GetFileIdString(),
}, nil
}
func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.FileChunk) (fileId string, err error) {
filename, header, readCloser, err := fs.filerSource.ReadPart(ctx, sourceChunk.FileId)
filename, header, readCloser, err := fs.filerSource.ReadPart(ctx, sourceChunk.GetFileIdString())
if err != nil {
return "", fmt.Errorf("read part %s: %v", sourceChunk.FileId, err)
return "", fmt.Errorf("read part %s: %v", sourceChunk.GetFileIdString(), err)
}
defer readCloser.Close()

22
weed/replication/sink/filersink/filer_sink.go

@ -179,7 +179,7 @@ func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry *file
// delete the chunks that are deleted from the source
if deleteIncludeChunks {
// remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks
existingEntry.Chunks = minusChunks(existingEntry.Chunks, deletedChunks)
existingEntry.Chunks = filer2.MinusChunks(existingEntry.Chunks, deletedChunks)
}
// replicate the chunks that are new in the source
@ -207,23 +207,7 @@ func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry *file
}
func compareChunks(oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk) {
deletedChunks = minusChunks(oldEntry.Chunks, newEntry.Chunks)
newChunks = minusChunks(newEntry.Chunks, oldEntry.Chunks)
return
}
func minusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
for _, a := range as {
found := false
for _, b := range bs {
if a.FileId == b.FileId {
found = true
break
}
}
if !found {
delta = append(delta, a)
}
}
deletedChunks = filer2.MinusChunks(oldEntry.Chunks, newEntry.Chunks)
newChunks = filer2.MinusChunks(newEntry.Chunks, oldEntry.Chunks)
return
}

2
weed/s3api/filer_multipart.go

@ -69,7 +69,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(ctx context.Context, input *s3.C
if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
for _, chunk := range entry.Chunks {
p := &filer_pb.FileChunk{
FileId: chunk.FileId,
FileId: chunk.GetFileIdString(),
Offset: offset,
Size: chunk.Size,
Mtime: chunk.Mtime,

2
weed/server/filer_grpc_server.go

@ -140,7 +140,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
}
// remove old chunks if not included in the new ones
unusedChunks := filer2.FindUnusedFileChunks(entry.Chunks, req.Entry.Chunks)
unusedChunks := filer2.MinusChunks(entry.Chunks, req.Entry.Chunks)
chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)

2
weed/server/filer_server_handlers_read.go

@ -78,7 +78,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request, entry *filer2.Entry) {
fileId := entry.Chunks[0].FileId
fileId := entry.Chunks[0].GetFileIdString()
urlString, err := fs.filer.MasterClient.LookupFileId(fileId)
if err != nil {

Loading…
Cancel
Save