|
|
@ -91,7 +91,6 @@ func (s3sink *S3Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks b |
|
|
|
} |
|
|
|
|
|
|
|
func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error { |
|
|
|
|
|
|
|
key = cleanKey(key) |
|
|
|
|
|
|
|
if entry.IsDirectory { |
|
|
@ -106,19 +105,20 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error { |
|
|
|
totalSize := filer2.TotalSize(entry.Chunks) |
|
|
|
chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize)) |
|
|
|
|
|
|
|
var parts []*s3.CompletedPart |
|
|
|
parts := make([]*s3.CompletedPart, len(chunkViews)) |
|
|
|
|
|
|
|
var wg sync.WaitGroup |
|
|
|
for chunkIndex, chunk := range chunkViews { |
|
|
|
partId := chunkIndex + 1 |
|
|
|
wg.Add(1) |
|
|
|
go func(chunk *filer2.ChunkView) { |
|
|
|
go func(chunk *filer2.ChunkView, index int) { |
|
|
|
defer wg.Done() |
|
|
|
if part, uploadErr := s3sink.uploadPart(key, uploadId, partId, chunk); uploadErr != nil { |
|
|
|
err = uploadErr |
|
|
|
} else { |
|
|
|
parts = append(parts, part) |
|
|
|
parts[index] = part |
|
|
|
} |
|
|
|
}(chunk) |
|
|
|
}(chunk, chunkIndex) |
|
|
|
} |
|
|
|
wg.Wait() |
|
|
|
|
|
|
|