Chris Lu
5 years ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with
12 additions and
9 deletions
-
weed/replication/sink/filersink/fetch_write.go
-
weed/replication/sink/s3sink/s3_sink.go
|
|
@ -19,17 +19,20 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, dir str |
|
|
|
if len(sourceChunks) == 0 { |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
replicatedChunks = make([]*filer_pb.FileChunk, len(sourceChunks)) |
|
|
|
|
|
|
|
var wg sync.WaitGroup |
|
|
|
for _, sourceChunk := range sourceChunks { |
|
|
|
for chunkIndex, sourceChunk := range sourceChunks { |
|
|
|
wg.Add(1) |
|
|
|
go func(chunk *filer_pb.FileChunk) { |
|
|
|
go func(chunk *filer_pb.FileChunk, index int) { |
|
|
|
defer wg.Done() |
|
|
|
replicatedChunk, e := fs.replicateOneChunk(chunk, dir) |
|
|
|
if e != nil { |
|
|
|
err = e |
|
|
|
} |
|
|
|
replicatedChunks = append(replicatedChunks, replicatedChunk) |
|
|
|
}(sourceChunk) |
|
|
|
replicatedChunks[index] = replicatedChunk |
|
|
|
}(sourceChunk, chunkIndex) |
|
|
|
} |
|
|
|
wg.Wait() |
|
|
|
|
|
|
|
|
|
@ -91,7 +91,6 @@ func (s3sink *S3Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks b |
|
|
|
} |
|
|
|
|
|
|
|
func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error { |
|
|
|
|
|
|
|
key = cleanKey(key) |
|
|
|
|
|
|
|
if entry.IsDirectory { |
|
|
@ -106,19 +105,20 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error { |
|
|
|
totalSize := filer2.TotalSize(entry.Chunks) |
|
|
|
chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize)) |
|
|
|
|
|
|
|
var parts []*s3.CompletedPart |
|
|
|
parts := make([]*s3.CompletedPart, len(chunkViews)) |
|
|
|
|
|
|
|
var wg sync.WaitGroup |
|
|
|
for chunkIndex, chunk := range chunkViews { |
|
|
|
partId := chunkIndex + 1 |
|
|
|
wg.Add(1) |
|
|
|
go func(chunk *filer2.ChunkView) { |
|
|
|
go func(chunk *filer2.ChunkView, index int) { |
|
|
|
defer wg.Done() |
|
|
|
if part, uploadErr := s3sink.uploadPart(key, uploadId, partId, chunk); uploadErr != nil { |
|
|
|
err = uploadErr |
|
|
|
} else { |
|
|
|
parts = append(parts, part) |
|
|
|
parts[index] = part |
|
|
|
} |
|
|
|
}(chunk) |
|
|
|
}(chunk, chunkIndex) |
|
|
|
} |
|
|
|
wg.Wait() |
|
|
|
|
|
|
|