diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index 363f3e7a9..87540383d 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -119,6 +119,10 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] return writeErr } + if len(entry.Content) > 0 { + return writeFunc(entry.Content) + } + if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil { return err } diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go index fe65a9447..52883644b 100644 --- a/weed/replication/sink/b2sink/b2_sink.go +++ b/weed/replication/sink/b2sink/b2_sink.go @@ -101,13 +101,16 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int targetObject := bucket.Object(key) writer := targetObject.NewWriter(context.Background()) + defer writer.Close() writeFunc := func(data []byte) error { _, writeErr := writer.Write(data) return writeErr } - defer writer.Close() + if len(entry.Content) > 0 { + return writeFunc(entry.Content) + } if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil { return err diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go index 2d48f3682..8c9fd5b15 100644 --- a/weed/replication/sink/gcssink/gcs_sink.go +++ b/weed/replication/sink/gcssink/gcs_sink.go @@ -107,6 +107,10 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []in return writeErr } + if len(entry.Content) > 0 { + return writeFunc(entry.Content) + } + if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil { return err } diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go index 56e102ebc..5a953353b 100644 --- a/weed/replication/sink/localsink/local_sink.go +++ b/weed/replication/sink/localsink/local_sink.go @@ -101,6 +101,10 @@ func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signa return writeErr } + if len(entry.Content) > 0 { + return writeFunc(entry.Content) + } + if err := repl_util.CopyFromChunkViews(chunkViews, localsink.filerSource, writeFunc); err != nil { return err } diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go index 4fb59fb37..55a56c6f5 100644 --- a/weed/replication/sink/s3sink/s3_sink.go +++ b/weed/replication/sink/s3sink/s3_sink.go @@ -1,6 +1,7 @@ package S3Sink import ( + "bytes" "context" "fmt" "strings" @@ -121,6 +122,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures } totalSize := filer.FileSize(entry) + chunkViews := filer.ViewFromChunks(s3sink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) parts := make([]*s3.CompletedPart, len(chunkViews)) @@ -141,6 +143,17 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures } wg.Wait() + // for small files + if len(entry.Content) > 0 { + parts = make([]*s3.CompletedPart, 1) + if part, uploadErr := s3sink.doUploadPart(key, uploadId, 1, bytes.NewReader(entry.Content)); uploadErr != nil { + err = uploadErr + glog.Errorf("uploadPart: %v", uploadErr) + } else { + parts[0] = part + } + } + if err != nil { s3sink.abortMultipartUpload(key, uploadId) return fmt.Errorf("uploadPart: %v", err) diff --git a/weed/replication/sink/s3sink/s3_write.go b/weed/replication/sink/s3sink/s3_write.go index 3f0f482fa..480859db8 100644 --- a/weed/replication/sink/s3sink/s3_write.go +++ b/weed/replication/sink/s3sink/s3_write.go @@ -116,6 +116,11 @@ func (s3sink *S3Sink) uploadPart(key, uploadId string, partId int, chunk *filer. return nil, fmt.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err) } + return s3sink.doUploadPart(key, uploadId, partId, readSeeker) +} + +func (s3sink *S3Sink) doUploadPart(key, uploadId string, partId int, readSeeker io.ReadSeeker) (*s3.CompletedPart, error) { + input := &s3.UploadPartInput{ Body: readSeeker, Bucket: aws.String(s3sink.bucket),