Browse Source

refactor

pull/3645/head
chrislu 2 years ago
parent
commit
b834027c5a
  1. 3
      weed/command/filer_remote_gateway_buckets.go
  2. 12
      weed/command/filer_remote_sync_dir.go

3
weed/command/filer_remote_gateway_buckets.go

@ -9,7 +9,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb" "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
"github.com/seaweedfs/seaweedfs/weed/remote_storage" "github.com/seaweedfs/seaweedfs/weed/remote_storage"
"github.com/seaweedfs/seaweedfs/weed/replication/source" "github.com/seaweedfs/seaweedfs/weed/replication/source"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"math" "math"
@ -183,7 +182,7 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour
if message.NewParentPath == option.bucketsDir { if message.NewParentPath == option.bucketsDir {
return handleCreateBucket(message.NewEntry) return handleCreateBucket(message.NewEntry)
} }
if strings.HasPrefix(message.NewParentPath, option.bucketsDir) && strings.Contains(message.NewParentPath, "/"+s3_constants.MultipartUploadsFolder+"/") {
if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) {
return nil return nil
} }
if !filer.HasData(message.NewEntry) { if !filer.HasData(message.NewEntry) {

12
weed/command/filer_remote_sync_dir.go

@ -113,7 +113,7 @@ func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string,
return nil return nil
} }
if filer_pb.IsCreate(resp) { if filer_pb.IsCreate(resp) {
if strings.Contains(message.NewParentPath, "/"+s3_constants.MultipartUploadsFolder+"/") {
if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) {
return nil return nil
} }
if !filer.HasData(message.NewEntry) { if !filer.HasData(message.NewEntry) {
@ -165,8 +165,8 @@ func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string,
glog.V(2).Infof("update: %+v", resp) glog.V(2).Infof("update: %+v", resp)
glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest)) glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
if err := client.DeleteFile(oldDest); err != nil { if err := client.DeleteFile(oldDest); err != nil {
if !strings.Contains(resp.Directory, "/"+s3_constants.MultipartUploadsFolder+"/") {
return err
if isMultipartUploadFile(resp.Directory, message.OldEntry.Name) {
return nil
} }
} }
remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest) remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest)
@ -258,3 +258,9 @@ func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer
return err return err
}) })
} }
func isMultipartUploadFile(dir string, name string) bool {
return strings.HasPrefix(dir, "/buckets/") &&
strings.Contains(dir, "/"+s3_constants.MultipartUploadsFolder+"/") &&
strings.HasSuffix(name, ".part")
}
Loading…
Cancel
Save