Browse Source

Merge pull request #3318 from kmlebedev/issues/3310

Use fallback if urls are not found
pull/3339/head
Chris Lu 3 years ago
committed by GitHub
parent
commit
9667588af0
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 22
      weed/filer/stream.go
  2. 6
      weed/wdclient/masterclient.go

22
weed/filer/stream.go

@ -18,6 +18,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/wdclient" "github.com/chrislusf/seaweedfs/weed/wdclient"
) )
var getLookupFileIdBackoffSchedule = []time.Duration{
150 * time.Millisecond,
600 * time.Millisecond,
1800 * time.Millisecond,
}
func HasData(entry *filer_pb.Entry) bool { func HasData(entry *filer_pb.Entry) bool {
if len(entry.Content) > 0 { if len(entry.Content) > 0 {
@ -69,14 +75,22 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ
fileId2Url := make(map[string][]string) fileId2Url := make(map[string][]string)
for _, chunkView := range chunkViews { for _, chunkView := range chunkViews {
urlStrings, err := masterClient.GetLookupFileIdFunction()(chunkView.FileId)
var urlStrings []string
var err error
for _, backoff := range getLookupFileIdBackoffSchedule {
urlStrings, err = masterClient.GetLookupFileIdFunction()(chunkView.FileId)
if err == nil && len(urlStrings) > 0 {
time.Sleep(backoff)
break
}
}
if err != nil { if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err return err
} else if len(urlStrings) == 0 { } else if len(urlStrings) == 0 {
glog.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
return fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
errUrlNotFound := fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
glog.Error(errUrlNotFound)
return errUrlNotFound
} }
fileId2Url[chunkView.FileId] = urlStrings fileId2Url[chunkView.FileId] = urlStrings
} }

6
weed/wdclient/masterclient.go

@ -2,6 +2,7 @@ package wdclient
import ( import (
"context" "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/stats"
"math/rand" "math/rand"
"time" "time"
@ -44,7 +45,7 @@ func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []string, err error) { func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []string, err error) {
fullUrls, err = mc.vidMap.LookupFileId(fileId) fullUrls, err = mc.vidMap.LookupFileId(fileId)
if err == nil {
if err == nil && len(fullUrls) > 0 {
return return
} }
err = pb.WithMasterClient(false, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { err = pb.WithMasterClient(false, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
@ -52,7 +53,7 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri
VolumeOrFileIds: []string{fileId}, VolumeOrFileIds: []string{fileId},
}) })
if err != nil { if err != nil {
return err
return fmt.Errorf("LookupVolume failed: %v", err)
} }
for vid, vidLocation := range resp.VolumeIdLocations { for vid, vidLocation := range resp.VolumeIdLocations {
for _, vidLoc := range vidLocation.Locations { for _, vidLoc := range vidLocation.Locations {
@ -65,7 +66,6 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri
fullUrls = append(fullUrls, "http://"+loc.Url+"/"+fileId) fullUrls = append(fullUrls, "http://"+loc.Url+"/"+fileId)
} }
} }
return nil return nil
}) })
return return

Loading…
Cancel
Save