168 lines
5.1 KiB

3 years ago
4 years ago
3 years ago
  1. package weed_server
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "net/http"
  6. "strconv"
  7. "strings"
  8. "github.com/seaweedfs/seaweedfs/weed/operation"
  9. "github.com/seaweedfs/seaweedfs/weed/security"
  10. "github.com/seaweedfs/seaweedfs/weed/stats"
  11. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  12. "github.com/seaweedfs/seaweedfs/weed/topology"
  13. )
  14. func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volumeLocations map[string]operation.LookupResult) {
  15. volumeLocations = make(map[string]operation.LookupResult)
  16. for _, vid := range vids {
  17. commaSep := strings.Index(vid, ",")
  18. if commaSep > 0 {
  19. vid = vid[0:commaSep]
  20. }
  21. if _, ok := volumeLocations[vid]; ok {
  22. continue
  23. }
  24. volumeLocations[vid] = ms.findVolumeLocation(collection, vid)
  25. }
  26. return
  27. }
  28. // If "fileId" is provided, this returns the fileId location and a JWT to update or delete the file.
  29. // If "volumeId" is provided, this only returns the volumeId location
  30. func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request) {
  31. vid := r.FormValue("volumeId")
  32. if vid != "" {
  33. // backward compatible
  34. commaSep := strings.Index(vid, ",")
  35. if commaSep > 0 {
  36. vid = vid[0:commaSep]
  37. }
  38. }
  39. fileId := r.FormValue("fileId")
  40. if fileId != "" {
  41. commaSep := strings.Index(fileId, ",")
  42. if commaSep > 0 {
  43. vid = fileId[0:commaSep]
  44. }
  45. }
  46. collection := r.FormValue("collection") // optional, but can be faster if too many collections
  47. location := ms.findVolumeLocation(collection, vid)
  48. httpStatus := http.StatusOK
  49. if location.Error != "" || location.Locations == nil {
  50. httpStatus = http.StatusNotFound
  51. } else {
  52. forRead := r.FormValue("read")
  53. isRead := forRead == "yes"
  54. ms.maybeAddJwtAuthorization(w, fileId, !isRead)
  55. }
  56. writeJsonQuiet(w, r, httpStatus, location)
  57. }
  58. // findVolumeLocation finds the volume location from master topo if it is leader,
  59. // or from master client if not leader
  60. func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.LookupResult {
  61. var locations []operation.Location
  62. var err error
  63. if ms.Topo.IsLeader() {
  64. volumeId, newVolumeIdErr := needle.NewVolumeId(vid)
  65. if newVolumeIdErr != nil {
  66. err = fmt.Errorf("Unknown volume id %s", vid)
  67. } else {
  68. machines := ms.Topo.Lookup(collection, volumeId)
  69. for _, loc := range machines {
  70. locations = append(locations, operation.Location{
  71. Url: loc.Url(), PublicUrl: loc.PublicUrl, DataCenter: loc.GetDataCenterId(),
  72. })
  73. }
  74. }
  75. } else {
  76. machines, getVidLocationsErr := ms.MasterClient.GetVidLocations(vid)
  77. for _, loc := range machines {
  78. locations = append(locations, operation.Location{
  79. Url: loc.Url, PublicUrl: loc.PublicUrl, DataCenter: loc.DataCenter,
  80. })
  81. }
  82. err = getVidLocationsErr
  83. }
  84. if len(locations) == 0 && err == nil {
  85. err = fmt.Errorf("volume id %s not found", vid)
  86. }
  87. ret := operation.LookupResult{
  88. VolumeOrFileId: vid,
  89. Locations: locations,
  90. }
  91. if err != nil {
  92. ret.Error = err.Error()
  93. }
  94. return ret
  95. }
  96. func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) {
  97. stats.AssignRequest()
  98. requestedCount, e := strconv.ParseUint(r.FormValue("count"), 10, 64)
  99. if e != nil || requestedCount == 0 {
  100. requestedCount = 1
  101. }
  102. writableVolumeCount, e := strconv.Atoi(r.FormValue("writableVolumeCount"))
  103. if e != nil {
  104. writableVolumeCount = 0
  105. }
  106. option, err := ms.getVolumeGrowOption(r)
  107. if err != nil {
  108. writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()})
  109. return
  110. }
  111. vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
  112. fid, count, dnList, shouldGrow, err := ms.Topo.PickForWrite(requestedCount, option, vl)
  113. if shouldGrow && !vl.HasGrowRequest() {
  114. // if picked volume is almost full, trigger a volume-grow request
  115. glog.V(0).Infof("dirAssign volume growth %v from %v", option.String(), r.RemoteAddr)
  116. if ms.Topo.AvailableSpaceFor(option) <= 0 {
  117. writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left for " + option.String()})
  118. return
  119. }
  120. errCh := make(chan error, 1)
  121. vl.AddGrowRequest()
  122. ms.vgCh <- &topology.VolumeGrowRequest{
  123. Option: option,
  124. Count: writableVolumeCount,
  125. ErrCh: errCh,
  126. }
  127. if err := <-errCh; err != nil {
  128. writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("cannot grow volume group! %v", err))
  129. return
  130. }
  131. }
  132. if err == nil {
  133. ms.maybeAddJwtAuthorization(w, fid, true)
  134. dn := dnList.Head()
  135. writeJsonQuiet(w, r, http.StatusOK, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count})
  136. } else {
  137. writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()})
  138. }
  139. }
  140. func (ms *MasterServer) maybeAddJwtAuthorization(w http.ResponseWriter, fileId string, isWrite bool) {
  141. if fileId == "" {
  142. return
  143. }
  144. var encodedJwt security.EncodedJwt
  145. if isWrite {
  146. encodedJwt = security.GenJwtForVolumeServer(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fileId)
  147. } else {
  148. encodedJwt = security.GenJwtForVolumeServer(ms.guard.ReadSigningKey, ms.guard.ReadExpiresAfterSec, fileId)
  149. }
  150. if encodedJwt == "" {
  151. return
  152. }
  153. w.Header().Set("Authorization", "BEARER "+string(encodedJwt))
  154. }