|
|
@ -60,7 +60,16 @@ func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source. |
|
|
|
if err := client.CreateBucket(entry.Name); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
return nil |
|
|
|
|
|
|
|
bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name) |
|
|
|
remoteLocation := &remote_pb.RemoteStorageLocation{ |
|
|
|
Name: *option.createBucketAt, |
|
|
|
Bucket: entry.Name, |
|
|
|
Path: "/", |
|
|
|
} |
|
|
|
|
|
|
|
return filer.InsertMountMapping(option, string(bucketPath), remoteLocation) |
|
|
|
|
|
|
|
} |
|
|
|
handleDeleteBucket := func(entry *filer_pb.Entry) error { |
|
|
|
if !entry.IsDirectory { |
|
|
@ -76,27 +85,39 @@ func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source. |
|
|
|
if err := client.DeleteBucket(entry.Name); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
return nil |
|
|
|
|
|
|
|
bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name) |
|
|
|
|
|
|
|
return filer.DeleteMountMapping(option, string(bucketPath)) |
|
|
|
} |
|
|
|
|
|
|
|
handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error { |
|
|
|
message := resp.EventNotification |
|
|
|
if message.NewEntry == nil { |
|
|
|
return nil |
|
|
|
} |
|
|
|
if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE { |
|
|
|
newMappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content) |
|
|
|
if readErr != nil { |
|
|
|
return fmt.Errorf("unmarshal mappings: %v", readErr) |
|
|
|
if message.NewEntry != nil { |
|
|
|
// update
|
|
|
|
if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE { |
|
|
|
newMappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content) |
|
|
|
if readErr != nil { |
|
|
|
return fmt.Errorf("unmarshal mappings: %v", readErr) |
|
|
|
} |
|
|
|
option.mappings = newMappings |
|
|
|
} |
|
|
|
option.mappings = newMappings |
|
|
|
} |
|
|
|
if strings.HasSuffix(message.NewEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) { |
|
|
|
conf := &remote_pb.RemoteConf{} |
|
|
|
if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil { |
|
|
|
return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err) |
|
|
|
if strings.HasSuffix(message.NewEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) { |
|
|
|
conf := &remote_pb.RemoteConf{} |
|
|
|
if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil { |
|
|
|
return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err) |
|
|
|
} |
|
|
|
option.remoteConfs[conf.Name] = conf |
|
|
|
} |
|
|
|
} else if message.OldEntry != nil { |
|
|
|
// deletion
|
|
|
|
if strings.HasSuffix(message.OldEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) { |
|
|
|
conf := &remote_pb.RemoteConf{} |
|
|
|
if err := proto.Unmarshal(message.OldEntry.Content, conf); err != nil { |
|
|
|
return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.OldEntry.Name, err) |
|
|
|
} |
|
|
|
delete(option.remoteConfs, conf.Name) |
|
|
|
} |
|
|
|
option.remoteConfs[conf.Name] = conf |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|