Browse Source

move some volume lookup operations to grpc

jwt related lookup will come in next commit
pull/2274/head
Chris Lu 4 years ago
parent
commit
d1d1fc772c
  1. 2
      weed/command/backup.go
  2. 17
      weed/operation/lookup.go
  3. 2
      weed/operation/tail_volume.go
  4. 2
      weed/server/volume_server_handlers_read.go
  5. 4
      weed/server/volume_server_handlers_write.go
  6. 16
      weed/topology/store_replicate.go

2
weed/command/backup.go

@ -72,7 +72,7 @@ func runBackup(cmd *Command, args []string) bool {
vid := needle.VolumeId(*s.volumeId) vid := needle.VolumeId(*s.volumeId)
// find volume location, replication, ttl info // find volume location, replication, ttl info
lookup, err := operation.Lookup(func() string { return *s.master }, vid.String())
lookup, err := operation.LookupVolumeId(func() string { return *s.master }, grpcDialOption, vid.String())
if err != nil { if err != nil {
fmt.Printf("Error looking up volume %d: %v\n", vid, err) fmt.Printf("Error looking up volume %d: %v\n", vid, err)
return true return true

17
weed/operation/lookup.go

@ -79,16 +79,21 @@ func LookupFileId(masterFn GetMasterFn, fileId string) (fullUrl string, err erro
return "http://" + lookup.Locations[rand.Intn(len(lookup.Locations))].Url + "/" + fileId, nil return "http://" + lookup.Locations[rand.Intn(len(lookup.Locations))].Url + "/" + fileId, nil
} }
func LookupVolumeId(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid string) (*LookupResult, error) {
results, err := LookupVolumeIds(masterFn, grpcDialOption, []string{vid})
return results[vid], err
}
// LookupVolumeIds find volume locations by cache and actual lookup // LookupVolumeIds find volume locations by cache and actual lookup
func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) {
ret := make(map[string]LookupResult)
func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids []string) (map[string]*LookupResult, error) {
ret := make(map[string]*LookupResult)
var unknown_vids []string var unknown_vids []string
//check vid cache first //check vid cache first
for _, vid := range vids { for _, vid := range vids {
locations, cache_err := vc.Get(vid)
if cache_err == nil {
ret[vid] = LookupResult{VolumeId: vid, Locations: locations}
locations, cacheErr := vc.Get(vid)
if cacheErr == nil {
ret[vid] = &LookupResult{VolumeId: vid, Locations: locations}
} else { } else {
unknown_vids = append(unknown_vids, vid) unknown_vids = append(unknown_vids, vid)
} }
@ -122,7 +127,7 @@ func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids
if vidLocations.Error != "" { if vidLocations.Error != "" {
vc.Set(vidLocations.VolumeId, locations, 10*time.Minute) vc.Set(vidLocations.VolumeId, locations, 10*time.Minute)
} }
ret[vidLocations.VolumeId] = LookupResult{
ret[vidLocations.VolumeId] = &LookupResult{
VolumeId: vidLocations.VolumeId, VolumeId: vidLocations.VolumeId,
Locations: locations, Locations: locations,
Error: vidLocations.Error, Error: vidLocations.Error,

2
weed/operation/tail_volume.go

@ -13,7 +13,7 @@ import (
func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error { func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
// find volume location, replication, ttl info // find volume location, replication, ttl info
lookup, err := Lookup(masterFn, vid.String())
lookup, err := LookupVolumeId(masterFn, grpcDialOption, vid.String())
if err != nil { if err != nil {
return fmt.Errorf("look up volume %d: %v", vid, err) return fmt.Errorf("look up volume %d: %v", vid, err)
} }

2
weed/server/volume_server_handlers_read.go

@ -65,7 +65,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
return return
} }
lookupResult, err := operation.Lookup(vs.GetMaster, volumeId.String())
lookupResult, err := operation.LookupVolumeId(vs.GetMaster, vs.grpcDialOption, volumeId.String())
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
if err != nil || len(lookupResult.Locations) <= 0 { if err != nil || len(lookupResult.Locations) <= 0 {
glog.V(0).Infoln("lookup error:", err, r.URL.Path) glog.V(0).Infoln("lookup error:", err, r.URL.Path)

4
weed/server/volume_server_handlers_write.go

@ -53,7 +53,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
} }
ret := operation.UploadResult{} ret := operation.UploadResult{}
isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster, vs.store, volumeId, reqNeedle, r)
isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster, vs.grpcDialOption, vs.store, volumeId, reqNeedle, r)
// http 204 status code does not allow body // http 204 status code does not allow body
if writeError == nil && isUnchanged { if writeError == nil && isUnchanged {
@ -146,7 +146,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
} }
} }
_, err := topology.ReplicatedDelete(vs.GetMaster, vs.store, volumeId, n, r)
_, err := topology.ReplicatedDelete(vs.GetMaster, vs.grpcDialOption, vs.store, volumeId, n, r)
writeDeleteResult(err, count, w, r) writeDeleteResult(err, count, w, r)

16
weed/topology/store_replicate.go

@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"google.golang.org/grpc"
"net/http" "net/http"
"net/url" "net/url"
"strconv" "strconv"
@ -18,7 +19,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
func ReplicatedWrite(masterFn operation.GetMasterFn, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (isUnchanged bool, err error) {
func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (isUnchanged bool, err error) {
//check JWT //check JWT
jwt := security.GetJwt(r) jwt := security.GetJwt(r)
@ -27,7 +28,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, s *storage.Store, volumeId
var remoteLocations []operation.Location var remoteLocations []operation.Location
if r.FormValue("type") != "replicate" { if r.FormValue("type") != "replicate" {
// this is the initial request // this is the initial request
remoteLocations, err = getWritableRemoteReplications(s, volumeId, masterFn)
remoteLocations, err = getWritableRemoteReplications(s, grpcDialOption, volumeId, masterFn)
if err != nil { if err != nil {
glog.V(0).Infoln(err) glog.V(0).Infoln(err)
return return
@ -92,16 +93,14 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, s *storage.Store, volumeId
return return
} }
func ReplicatedDelete(masterFn operation.GetMasterFn, store *storage.Store,
volumeId needle.VolumeId, n *needle.Needle,
r *http.Request) (size types.Size, err error) {
func ReplicatedDelete(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, store *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (size types.Size, err error) {
//check JWT //check JWT
jwt := security.GetJwt(r) jwt := security.GetJwt(r)
var remoteLocations []operation.Location var remoteLocations []operation.Location
if r.FormValue("type") != "replicate" { if r.FormValue("type") != "replicate" {
remoteLocations, err = getWritableRemoteReplications(store, volumeId, masterFn)
remoteLocations, err = getWritableRemoteReplications(store, grpcDialOption, volumeId, masterFn)
if err != nil { if err != nil {
glog.V(0).Infoln(err) glog.V(0).Infoln(err)
return return
@ -161,8 +160,7 @@ func DistributedOperation(locations []operation.Location, op func(location opera
return ret.Error() return ret.Error()
} }
func getWritableRemoteReplications(s *storage.Store, volumeId needle.VolumeId, masterFn operation.GetMasterFn) (
remoteLocations []operation.Location, err error) {
func getWritableRemoteReplications(s *storage.Store, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, masterFn operation.GetMasterFn) (remoteLocations []operation.Location, err error) {
v := s.GetVolume(volumeId) v := s.GetVolume(volumeId)
if v != nil && v.ReplicaPlacement.GetCopyCount() == 1 { if v != nil && v.ReplicaPlacement.GetCopyCount() == 1 {
@ -170,7 +168,7 @@ func getWritableRemoteReplications(s *storage.Store, volumeId needle.VolumeId, m
} }
// not on local store, or has replications // not on local store, or has replications
lookupResult, lookupErr := operation.Lookup(masterFn, volumeId.String())
lookupResult, lookupErr := operation.LookupVolumeId(masterFn, grpcDialOption, volumeId.String())
if lookupErr == nil { if lookupErr == nil {
selfUrl := s.Ip + ":" + strconv.Itoa(s.Port) selfUrl := s.Ip + ":" + strconv.Itoa(s.Port)
for _, location := range lookupResult.Locations { for _, location := range lookupResult.Locations {

Loading…
Cancel
Save