Browse Source

refactor

pull/7411/head
chrislu 2 months ago
parent
commit
7df898e00b
  1. 81
      weed/server/filer_grpc_server.go
  2. 71
      weed/wdclient/masterclient.go

81
weed/server/filer_grpc_server.go

@ -5,8 +5,6 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster"
@ -96,65 +94,17 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
LocationsMap: make(map[string]*filer_pb.Locations),
}
// Collect volume IDs that are not in cache for batch lookup
var vidsToLookup []string
for _, vidString := range req.VolumeIds {
vid, err := strconv.ParseUint(vidString, 10, 32)
if err != nil {
glog.V(1).InfofCtx(ctx, "Invalid volume id (must be uint32): %s", vidString)
return nil, err
}
// Check cache first
locations, found := fs.filer.MasterClient.GetLocations(uint32(vid))
if found && len(locations) > 0 {
// Found in cache
resp.LocationsMap[vidString] = &filer_pb.Locations{
Locations: wdclientLocationsToPb(locations),
}
} else {
// Not in cache, need to query master
vidsToLookup = append(vidsToLookup, vidString)
}
// Use master client's lookup with fallback - it handles cache and master query
vidLocations, err := fs.filer.MasterClient.LookupVolumeIdsWithFallback(ctx, req.VolumeIds)
if err != nil {
// Return partial results even on error
glog.V(0).InfofCtx(ctx, "failed to lookup some volumes: %v", err)
}
// Query master for volumes not in cache
if len(vidsToLookup) > 0 {
glog.V(2).InfofCtx(ctx, "Looking up %d volumes from master: %v", len(vidsToLookup), vidsToLookup)
err := operation.WithMasterServerClient(false, fs.filer.GetMaster(ctx), fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
masterResp, err := masterClient.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
VolumeOrFileIds: vidsToLookup,
})
if err != nil {
return fmt.Errorf("master lookup failed: %v", err)
}
// Process master response
for _, vidLoc := range masterResp.VolumeIdLocations {
if vidLoc.Error != "" {
glog.V(0).InfofCtx(ctx, "volume %s lookup error: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
continue
}
vidString := vidLoc.VolumeOrFileId
// Parse volume ID from response (could be "123" or "123,abc")
parts := strings.Split(vidString, ",")
vidOnly := parts[0]
locs := masterLocationsToPb(vidLoc.Locations)
if len(locs) > 0 {
resp.LocationsMap[vidOnly] = &filer_pb.Locations{
Locations: locs,
}
}
}
return nil
})
if err != nil {
glog.V(0).InfofCtx(ctx, "failed to lookup volumes from master: %v", err)
// Don't return error, return partial results
// Convert wdclient.Location to filer_pb.Location
for vidString, locations := range vidLocations {
resp.LocationsMap[vidString] = &filer_pb.Locations{
Locations: wdclientLocationsToPb(locations),
}
}
@ -174,19 +124,6 @@ func wdclientLocationsToPb(locations []wdclient.Location) []*filer_pb.Location {
return locs
}
func masterLocationsToPb(masterLocs []*master_pb.Location) []*filer_pb.Location {
var locs []*filer_pb.Location
for _, masterLoc := range masterLocs {
locs = append(locs, &filer_pb.Location{
Url: masterLoc.Url,
PublicUrl: masterLoc.PublicUrl,
GrpcPort: masterLoc.GrpcPort,
DataCenter: masterLoc.DataCenter,
})
}
return locs
}
func (fs *FilerServer) lookupFileId(ctx context.Context, fileId string) (targetUrls []string, err error) {
fid, err := needle.ParseFileIdFromString(fileId)
if err != nil {

71
weed/wdclient/masterclient.go

@ -4,6 +4,8 @@ import (
"context"
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
"time"
@ -94,6 +96,75 @@ func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId str
return
}
// LookupVolumeIdsWithFallback looks up volume locations, querying master if not in cache
func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
result := make(map[string][]Location)
var missingVids []string
// Check cache first
for _, vidString := range volumeIds {
vid, err := strconv.ParseUint(vidString, 10, 32)
if err != nil {
return nil, fmt.Errorf("invalid volume id %s: %v", vidString, err)
}
locations, found := mc.GetLocations(uint32(vid))
if found && len(locations) > 0 {
result[vidString] = locations
} else {
missingVids = append(missingVids, vidString)
}
}
// Query master for missing volumes
if len(missingVids) > 0 {
glog.V(2).Infof("Looking up %d volumes from master: %v", len(missingVids), missingVids)
err := pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
VolumeOrFileIds: missingVids,
})
if err != nil {
return fmt.Errorf("master lookup failed: %v", err)
}
for _, vidLoc := range resp.VolumeIdLocations {
if vidLoc.Error != "" {
glog.V(0).Infof("volume %s lookup error: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
continue
}
vidString := vidLoc.VolumeOrFileId
// Parse volume ID from response (could be "123" or "123,abc")
parts := strings.Split(vidString, ",")
vidOnly := parts[0]
vid, _ := strconv.ParseUint(vidOnly, 10, 32)
var locations []Location
for _, masterLoc := range vidLoc.Locations {
loc := Location{
Url: masterLoc.Url,
PublicUrl: masterLoc.PublicUrl,
GrpcPort: int(masterLoc.GrpcPort),
DataCenter: masterLoc.DataCenter,
}
mc.vidMap.addLocation(uint32(vid), loc)
locations = append(locations, loc)
}
if len(locations) > 0 {
result[vidOnly] = locations
}
}
return nil
})
if err != nil {
return result, err
}
}
return result, nil
}
func (mc *MasterClient) getCurrentMaster() pb.ServerAddress {
mc.currentMasterLock.RLock()
defer mc.currentMasterLock.RUnlock()

Loading…
Cancel
Save