Browse Source

[Filer] post add param:saveInside (#4434)

* fix:mount deadlock

* feat: filer http upload to metadata

* feat: /etc save inside

---------

Co-authored-by: zemul <zhouzemiao@ihuman.com>
pull/4439/head
zemul 2 years ago
committed by GitHub
parent
commit
e9fda774f4
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      weed/operation/assign_file_id.go
  2. 12
      weed/server/filer_server_handlers_write.go
  3. 11
      weed/server/filer_server_handlers_write_autochunk.go

1
weed/operation/assign_file_id.go

@ -137,6 +137,7 @@ type StorageOption struct {
TtlSeconds int32 TtlSeconds int32
Fsync bool Fsync bool
VolumeGrowthCount uint32 VolumeGrowthCount uint32
SaveInside bool
} }
func (so *StorageOption) TtlString() string { func (so *StorageOption) TtlString() string {

12
weed/server/filer_server_handlers_write.go

@ -87,6 +87,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte
query.Get("dataCenter"), query.Get("dataCenter"),
query.Get("rack"), query.Get("rack"),
query.Get("dataNode"), query.Get("dataNode"),
query.Get("saveInside"),
) )
if err != nil { if err != nil {
if err == ErrReadOnly { if err == ErrReadOnly {
@ -103,6 +104,10 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte
so.DiskType = fs.option.DiskType so.DiskType = fs.option.DiskType
} }
if strings.HasPrefix(r.URL.Path, "/etc") {
so.SaveInside = true
}
if query.Has("mv.from") { if query.Has("mv.from") {
fs.move(ctx, w, r, so) fs.move(ctx, w, r, so)
} else { } else {
@ -246,7 +251,7 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication
}, nil }, nil
} }
func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, fsync string, dataCenter, rack, dataNode string) (*operation.StorageOption, error) {
func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, fsync string, dataCenter, rack, dataNode, saveInside string) (*operation.StorageOption, error) {
ttl, err := needle.ReadTTL(qTtl) ttl, err := needle.ReadTTL(qTtl)
if err != nil { if err != nil {
@ -260,6 +265,11 @@ func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplicatio
} else if fsync == "true" { } else if fsync == "true" {
so.Fsync = true so.Fsync = true
} }
if saveInside == "true" {
so.SaveInside = true
} else {
so.SaveInside = false
}
} }
return so, err return so, err

11
weed/server/filer_server_handlers_write_autochunk.go

@ -1,6 +1,7 @@
package weed_server package weed_server
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
//"github.com/seaweedfs/seaweedfs/weed/s3api" //"github.com/seaweedfs/seaweedfs/weed/s3api"
@ -64,7 +65,6 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
} }
func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, contentLength int64, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, contentLength int64, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
multipartReader, multipartReaderErr := r.MultipartReader() multipartReader, multipartReaderErr := r.MultipartReader()
if multipartReaderErr != nil { if multipartReaderErr != nil {
return nil, nil, multipartReaderErr return nil, nil, multipartReaderErr
@ -84,6 +84,15 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
contentType = "" contentType = ""
} }
if so.SaveInside {
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset()
buf.ReadFrom(part1)
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, nil, nil, 0, buf.Bytes())
bufPool.Put(buf)
return
}
fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so) fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err

Loading…
Cancel
Save