Browse Source

s3: add option to fsync buckets

pull/1273/head
Chris Lu 5 years ago
parent
commit
1c65656fb4
  1. 4
      weed/command/scaffold.go
  2. 1
      weed/filer2/filer.go
  3. 19
      weed/filer2/filer_buckets.go
  4. 2
      weed/server/filer_grpc_server.go
  5. 3
      weed/server/filer_server.go
  6. 17
      weed/server/filer_server_handlers_write.go
  7. 8
      weed/server/filer_server_handlers_write_autochunk.go
  8. 4
      weed/server/filer_server_handlers_write_cipher.go

4
weed/command/scaffold.go

@ -76,6 +76,10 @@ const (
recursive_delete = false recursive_delete = false
# directories under this folder will be automatically creating a separate bucket # directories under this folder will be automatically creating a separate bucket
buckets_folder = "/buckets" buckets_folder = "/buckets"
buckets_fsync = [ # a list of buckets with all write requests fsync=true
"important_bucket",
"should_always_fsync",
]
# directories under this folder will be store message queue data # directories under this folder will be store message queue data
queues_folder = "/queues" queues_folder = "/queues"

1
weed/filer2/filer.go

@ -33,6 +33,7 @@ type Filer struct {
GrpcDialOption grpc.DialOption GrpcDialOption grpc.DialOption
DirBucketsPath string DirBucketsPath string
DirQueuesPath string DirQueuesPath string
FsyncBuckets []string
buckets *FilerBuckets buckets *FilerBuckets
Cipher bool Cipher bool
metaLogBuffer *log_buffer.LogBuffer metaLogBuffer *log_buffer.LogBuffer

19
weed/filer2/filer_buckets.go

@ -13,6 +13,7 @@ type BucketName string
type BucketOption struct { type BucketOption struct {
Name BucketName Name BucketName
Replication string Replication string
fsync bool
} }
type FilerBuckets struct { type FilerBuckets struct {
dirBucketsPath string dirBucketsPath string
@ -20,36 +21,42 @@ type FilerBuckets struct {
sync.RWMutex sync.RWMutex
} }
func (f *Filer) LoadBuckets(dirBucketsPath string) {
func (f *Filer) LoadBuckets() {
f.buckets = &FilerBuckets{ f.buckets = &FilerBuckets{
buckets: make(map[BucketName]*BucketOption), buckets: make(map[BucketName]*BucketOption),
} }
f.DirBucketsPath = dirBucketsPath
limit := math.MaxInt32 limit := math.MaxInt32
entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(dirBucketsPath), "", false, limit)
entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit)
if err != nil { if err != nil {
glog.V(1).Infof("no buckets found: %v", err) glog.V(1).Infof("no buckets found: %v", err)
return return
} }
shouldFsyncMap := make(map[string]bool)
for _, bucket := range f.FsyncBuckets {
shouldFsyncMap[bucket] = true
}
glog.V(1).Infof("buckets found: %d", len(entries)) glog.V(1).Infof("buckets found: %d", len(entries))
f.buckets.Lock() f.buckets.Lock()
for _, entry := range entries { for _, entry := range entries {
_, shouldFsnyc := shouldFsyncMap[entry.Name()]
f.buckets.buckets[BucketName(entry.Name())] = &BucketOption{ f.buckets.buckets[BucketName(entry.Name())] = &BucketOption{
Name: BucketName(entry.Name()), Name: BucketName(entry.Name()),
Replication: entry.Replication, Replication: entry.Replication,
fsync: shouldFsnyc,
} }
} }
f.buckets.Unlock() f.buckets.Unlock()
} }
func (f *Filer) ReadBucketOption(buketName string) (replication string) {
func (f *Filer) ReadBucketOption(buketName string) (replication string, fsync bool) {
f.buckets.RLock() f.buckets.RLock()
defer f.buckets.RUnlock() defer f.buckets.RUnlock()
@ -57,9 +64,9 @@ func (f *Filer) ReadBucketOption(buketName string) (replication string) {
option, found := f.buckets.buckets[BucketName(buketName)] option, found := f.buckets.buckets[BucketName(buketName)]
if !found { if !found {
return ""
return "", false
} }
return option.Replication
return option.Replication, option.fsync
} }

2
weed/server/filer_grpc_server.go

@ -232,7 +232,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
if req.TtlSec > 0 { if req.TtlSec > 0 {
ttlStr = strconv.Itoa(int(req.TtlSec)) ttlStr = strconv.Itoa(int(req.TtlSec))
} }
collection, replication := fs.detectCollection(req.ParentPath, req.Collection, req.Replication)
collection, replication, _ := fs.detectCollection(req.ParentPath, req.Collection, req.Replication)
var altRequest *operation.VolumeAssignRequest var altRequest *operation.VolumeAssignRequest

3
weed/server/filer_server.go

@ -95,6 +95,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
v.SetDefault("filer.options.queues_folder", "/queues") v.SetDefault("filer.options.queues_folder", "/queues")
fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder") fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
fs.filer.DirQueuesPath = v.GetString("filer.options.queues_folder") fs.filer.DirQueuesPath = v.GetString("filer.options.queues_folder")
fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
fs.filer.LoadConfiguration(v) fs.filer.LoadConfiguration(v)
notification.LoadConfiguration(v, "notification.") notification.LoadConfiguration(v, "notification.")
@ -107,7 +108,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler) readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
} }
fs.filer.LoadBuckets(fs.filer.DirBucketsPath)
fs.filer.LoadBuckets()
util.OnInterrupt(func() { util.OnInterrupt(func() {
fs.filer.Shutdown() fs.filer.Shutdown()

17
weed/server/filer_server_handlers_write.go

@ -40,7 +40,7 @@ type FilerPostResult struct {
Url string `json:"url,omitempty"` Url string `json:"url,omitempty"`
} }
func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection, dataCenter, ttlString string) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection, dataCenter, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
stats.FilerRequestCounter.WithLabelValues("assign").Inc() stats.FilerRequestCounter.WithLabelValues("assign").Inc()
start := time.Now() start := time.Now()
@ -73,6 +73,9 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request,
} }
fileId = assignResult.Fid fileId = assignResult.Fid
urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid
if fsync {
urlLocation += "?fsync=true"
}
auth = assignResult.Auth auth = assignResult.Auth
return return
} }
@ -82,7 +85,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
ctx := context.Background() ctx := context.Background()
query := r.URL.Query() query := r.URL.Query()
collection, replication := fs.detectCollection(r.RequestURI, query.Get("collection"), query.Get("replication"))
collection, replication, fsync := fs.detectCollection(r.RequestURI, query.Get("collection"), query.Get("replication"))
dataCenter := query.Get("dataCenter") dataCenter := query.Get("dataCenter")
if dataCenter == "" { if dataCenter == "" {
dataCenter = fs.option.DataCenter dataCenter = fs.option.DataCenter
@ -96,12 +99,12 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
ttlSeconds = int32(ttl.Minutes()) * 60 ttlSeconds = int32(ttl.Minutes()) * 60
} }
if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString); autoChunked {
if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync); autoChunked {
return return
} }
if fs.option.Cipher { if fs.option.Cipher {
reply, err := fs.encrypt(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString)
reply, err := fs.encrypt(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync)
if err != nil { if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err) writeJsonError(w, r, http.StatusInternalServerError, err)
} else if reply != nil { } else if reply != nil {
@ -111,7 +114,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString)
fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync)
if err != nil || fileId == "" || urlLocation == "" { if err != nil || fileId == "" || urlLocation == "" {
glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter) glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)
@ -327,7 +330,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
} }
func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication string) (collection, replication string) {
func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication string) (collection, replication string, fsync bool) {
// default // default
collection = fs.option.Collection collection = fs.option.Collection
replication = fs.option.DefaultReplication replication = fs.option.DefaultReplication
@ -350,7 +353,7 @@ func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication st
if t > 0 { if t > 0 {
collection = bucketAndObjectKey[:t] collection = bucketAndObjectKey[:t]
} }
replication = fs.filer.ReadBucketOption(collection)
replication, fsync = fs.filer.ReadBucketOption(collection)
} }
return return

8
weed/server/filer_server_handlers_write_autochunk.go

@ -21,7 +21,7 @@ import (
) )
func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
replication string, collection string, dataCenter string, ttlSec int32, ttlString string) bool {
replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) bool {
if r.Method != "POST" { if r.Method != "POST" {
glog.V(4).Infoln("AutoChunking not supported for method", r.Method) glog.V(4).Infoln("AutoChunking not supported for method", r.Method)
return false return false
@ -57,7 +57,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
return false return false
} }
reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString)
reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
if err != nil { if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err) writeJsonError(w, r, http.StatusInternalServerError, err)
} else if reply != nil { } else if reply != nil {
@ -67,7 +67,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
} }
func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string) (filerResult *FilerPostResult, replyerr error) {
contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, replyerr error) {
stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc() stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
start := time.Now() start := time.Now()
@ -102,7 +102,7 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
limitedReader := io.LimitReader(partReader, int64(chunkSize)) limitedReader := io.LimitReader(partReader, int64(chunkSize))
// assign one file id for one chunk // assign one file id for one chunk
fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString)
fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync)
if assignErr != nil { if assignErr != nil {
return nil, assignErr return nil, assignErr
} }

4
weed/server/filer_server_handlers_write_cipher.go

@ -17,9 +17,9 @@ import (
// handling single chunk POST or PUT upload // handling single chunk POST or PUT upload
func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request,
replication string, collection string, dataCenter string, ttlSeconds int32, ttlString string) (filerResult *FilerPostResult, err error) {
replication string, collection string, dataCenter string, ttlSeconds int32, ttlString string, fsync bool) (filerResult *FilerPostResult, err error) {
fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString)
fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync)
if err != nil || fileId == "" || urlLocation == "" { if err != nil || fileId == "" || urlLocation == "" {
return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter) return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)

Loading…
Cancel
Save