Browse Source

use public url also for deletions

pull/1255/head
Chris Lu 5 years ago
parent
commit
d848d08944
  1. 2
      weed/command/benchmark.go
  2. 2
      weed/command/filer_copy.go
  3. 4
      weed/operation/chunked_file.go
  4. 14
      weed/operation/delete_content.go
  5. 10
      weed/operation/submit.go
  6. 2
      weed/server/volume_server_handlers_write.go

2
weed/command/benchmark.go

@ -241,7 +241,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
if !isSecure && assignResult.Auth != "" { if !isSecure && assignResult.Auth != "" {
isSecure = true isSecure = true
} }
if _, err := fp.Upload(0, b.masterClient.GetMaster(), assignResult.Auth, b.grpcDialOption); err == nil {
if _, err := fp.Upload(0, b.masterClient.GetMaster(), false, assignResult.Auth, b.grpcDialOption); err == nil {
if random.Intn(100) < *b.deletePercentage { if random.Intn(100) < *b.deletePercentage {
s.total++ s.total++
delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp} delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}

2
weed/command/filer_copy.go

@ -459,7 +459,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
for _, chunk := range chunks { for _, chunk := range chunks {
fileIds = append(fileIds, chunk.FileId) fileIds = append(fileIds, chunk.FileId)
} }
operation.DeleteFiles(copy.masters[0], worker.options.grpcDialOption, fileIds)
operation.DeleteFiles(copy.masters[0], false, worker.options.grpcDialOption, fileIds)
return uploadError return uploadError
} }

4
weed/operation/chunked_file.go

@ -72,12 +72,12 @@ func (cm *ChunkManifest) Marshal() ([]byte, error) {
return json.Marshal(cm) return json.Marshal(cm)
} }
func (cm *ChunkManifest) DeleteChunks(master string, grpcDialOption grpc.DialOption) error {
func (cm *ChunkManifest) DeleteChunks(master string, usePublicUrl bool, grpcDialOption grpc.DialOption) error {
var fileIds []string var fileIds []string
for _, ci := range cm.Chunks { for _, ci := range cm.Chunks {
fileIds = append(fileIds, ci.Fid) fileIds = append(fileIds, ci.Fid)
} }
results, err := DeleteFiles(master, grpcDialOption, fileIds)
results, err := DeleteFiles(master, usePublicUrl, grpcDialOption, fileIds)
if err != nil { if err != nil {
glog.V(0).Infof("delete %+v: %v", fileIds, err) glog.V(0).Infof("delete %+v: %v", fileIds, err)
return fmt.Errorf("chunk delete: %v", err) return fmt.Errorf("chunk delete: %v", err)

14
weed/operation/delete_content.go

@ -29,10 +29,18 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) {
} }
// DeleteFiles batch deletes a list of fileIds // DeleteFiles batch deletes a list of fileIds
func DeleteFiles(master string, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
func DeleteFiles(master string, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
lookupFunc := func(vids []string) (map[string]LookupResult, error) {
return LookupVolumeIds(master, grpcDialOption, vids)
lookupFunc := func(vids []string) (results map[string]LookupResult, err error) {
results, err = LookupVolumeIds(master, grpcDialOption, vids)
if err == nil && usePublicUrl {
for _, result := range results {
for _, loc := range result.Locations {
loc.Url = loc.PublicUrl
}
}
}
return
} }
return DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc) return DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc)

10
weed/operation/submit.go

@ -68,7 +68,7 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart
file.Replication = replication file.Replication = replication
file.Collection = collection file.Collection = collection
file.DataCenter = dataCenter file.DataCenter = dataCenter
results[index].Size, err = file.Upload(maxMB, master, ret.Auth, grpcDialOption)
results[index].Size, err = file.Upload(maxMB, master, usePublicUrl, ret.Auth, grpcDialOption)
if err != nil { if err != nil {
results[index].Error = err.Error() results[index].Error = err.Error()
} }
@ -111,7 +111,7 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
return ret, nil return ret, nil
} }
func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
fileUrl := "http://" + fi.Server + "/" + fi.Fid fileUrl := "http://" + fi.Server + "/" + fi.Fid
if fi.ModTime != 0 { if fi.ModTime != 0 {
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime)) fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
@ -155,7 +155,7 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grp
ret, err = Assign(master, grpcDialOption, ar) ret, err = Assign(master, grpcDialOption, ar)
if err != nil { if err != nil {
// delete all uploaded chunks // delete all uploaded chunks
cm.DeleteChunks(master, grpcDialOption)
cm.DeleteChunks(master, usePublicUrl, grpcDialOption)
return return
} }
id = ret.Fid id = ret.Fid
@ -173,7 +173,7 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grp
ret.Auth) ret.Auth)
if e != nil { if e != nil {
// delete all uploaded chunks // delete all uploaded chunks
cm.DeleteChunks(master, grpcDialOption)
cm.DeleteChunks(master, usePublicUrl, grpcDialOption)
return 0, e return 0, e
} }
cm.Chunks = append(cm.Chunks, cm.Chunks = append(cm.Chunks,
@ -188,7 +188,7 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grp
err = upload_chunked_file_manifest(fileUrl, &cm, jwt) err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
if err != nil { if err != nil {
// delete all uploaded chunks // delete all uploaded chunks
cm.DeleteChunks(master, grpcDialOption)
cm.DeleteChunks(master, usePublicUrl, grpcDialOption)
} }
} else { } else {
ret, e := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt) ret, e := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt)

2
weed/server/volume_server_handlers_write.go

@ -126,7 +126,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
// make sure all chunks had deleted before delete manifest // make sure all chunks had deleted before delete manifest
if e := chunkManifest.DeleteChunks(vs.GetMaster(), vs.grpcDialOption); e != nil {
if e := chunkManifest.DeleteChunks(vs.GetMaster(), false, vs.grpcDialOption); e != nil {
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e)) writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e))
return return
} }

Loading…
Cancel
Save