248 lines
6.7 KiB

6 years ago
6 years ago
6 years ago
6 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "path/filepath"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  8. "github.com/chrislusf/seaweedfs/weed/stats"
  9. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  10. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  11. "github.com/chrislusf/seaweedfs/weed/storage/types"
  12. )
  13. func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server_pb.DeleteCollectionRequest) (*volume_server_pb.DeleteCollectionResponse, error) {
  14. resp := &volume_server_pb.DeleteCollectionResponse{}
  15. err := vs.store.DeleteCollection(req.Collection)
  16. if err != nil {
  17. glog.Errorf("delete collection %s: %v", req.Collection, err)
  18. } else {
  19. glog.V(2).Infof("delete collection %v", req)
  20. }
  21. return resp, err
  22. }
  23. func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_pb.AllocateVolumeRequest) (*volume_server_pb.AllocateVolumeResponse, error) {
  24. resp := &volume_server_pb.AllocateVolumeResponse{}
  25. err := vs.store.AddVolume(
  26. needle.VolumeId(req.VolumeId),
  27. req.Collection,
  28. vs.needleMapKind,
  29. req.Replication,
  30. req.Ttl,
  31. req.Preallocate,
  32. req.MemoryMapMaxSizeMb,
  33. )
  34. if err != nil {
  35. glog.Errorf("assign volume %v: %v", req, err)
  36. } else {
  37. glog.V(2).Infof("assign volume %v", req)
  38. }
  39. return resp, err
  40. }
  41. func (vs *VolumeServer) VolumeMount(ctx context.Context, req *volume_server_pb.VolumeMountRequest) (*volume_server_pb.VolumeMountResponse, error) {
  42. resp := &volume_server_pb.VolumeMountResponse{}
  43. err := vs.store.MountVolume(needle.VolumeId(req.VolumeId))
  44. if err != nil {
  45. glog.Errorf("volume mount %v: %v", req, err)
  46. } else {
  47. glog.V(2).Infof("volume mount %v", req)
  48. }
  49. return resp, err
  50. }
  51. func (vs *VolumeServer) VolumeUnmount(ctx context.Context, req *volume_server_pb.VolumeUnmountRequest) (*volume_server_pb.VolumeUnmountResponse, error) {
  52. resp := &volume_server_pb.VolumeUnmountResponse{}
  53. err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId))
  54. if err != nil {
  55. glog.Errorf("volume unmount %v: %v", req, err)
  56. } else {
  57. glog.V(2).Infof("volume unmount %v", req)
  58. }
  59. return resp, err
  60. }
  61. func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.VolumeDeleteRequest) (*volume_server_pb.VolumeDeleteResponse, error) {
  62. resp := &volume_server_pb.VolumeDeleteResponse{}
  63. err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
  64. if err != nil {
  65. glog.Errorf("volume delete %v: %v", req, err)
  66. } else {
  67. glog.V(2).Infof("volume delete %v", req)
  68. }
  69. return resp, err
  70. }
  71. func (vs *VolumeServer) VolumeConfigure(ctx context.Context, req *volume_server_pb.VolumeConfigureRequest) (*volume_server_pb.VolumeConfigureResponse, error) {
  72. resp := &volume_server_pb.VolumeConfigureResponse{}
  73. // check replication format
  74. if _, err := super_block.NewReplicaPlacementFromString(req.Replication); err != nil {
  75. resp.Error = fmt.Sprintf("volume configure replication %v: %v", req, err)
  76. return resp, nil
  77. }
  78. // unmount
  79. if err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId)); err != nil {
  80. glog.Errorf("volume configure unmount %v: %v", req, err)
  81. resp.Error = fmt.Sprintf("volume configure unmount %v: %v", req, err)
  82. return resp, nil
  83. }
  84. // modify the volume info file
  85. if err := vs.store.ConfigureVolume(needle.VolumeId(req.VolumeId), req.Replication); err != nil {
  86. glog.Errorf("volume configure %v: %v", req, err)
  87. resp.Error = fmt.Sprintf("volume configure %v: %v", req, err)
  88. return resp, nil
  89. }
  90. // mount
  91. if err := vs.store.MountVolume(needle.VolumeId(req.VolumeId)); err != nil {
  92. glog.Errorf("volume configure mount %v: %v", req, err)
  93. resp.Error = fmt.Sprintf("volume configure mount %v: %v", req, err)
  94. return resp, nil
  95. }
  96. return resp, nil
  97. }
  98. func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_server_pb.VolumeMarkReadonlyRequest) (*volume_server_pb.VolumeMarkReadonlyResponse, error) {
  99. resp := &volume_server_pb.VolumeMarkReadonlyResponse{}
  100. err := vs.store.MarkVolumeReadonly(needle.VolumeId(req.VolumeId))
  101. if err != nil {
  102. glog.Errorf("volume mark readonly %v: %v", req, err)
  103. } else {
  104. glog.V(2).Infof("volume mark readonly %v", req)
  105. }
  106. return resp, err
  107. }
  108. func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) {
  109. resp := &volume_server_pb.VolumeMarkWritableResponse{}
  110. err := vs.store.MarkVolumeWritable(needle.VolumeId(req.VolumeId))
  111. if err != nil {
  112. glog.Errorf("volume mark writable %v: %v", req, err)
  113. } else {
  114. glog.V(2).Infof("volume mark writable %v", req)
  115. }
  116. return resp, err
  117. }
  118. func (vs *VolumeServer) VolumeStatus(ctx context.Context, req *volume_server_pb.VolumeStatusRequest) (*volume_server_pb.VolumeStatusResponse, error) {
  119. resp := &volume_server_pb.VolumeStatusResponse{}
  120. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  121. if v == nil {
  122. return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
  123. }
  124. resp.IsReadOnly = v.IsReadOnly()
  125. return resp, nil
  126. }
  127. func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) {
  128. resp := &volume_server_pb.VolumeServerStatusResponse{}
  129. for _, loc := range vs.store.Locations {
  130. if dir, e := filepath.Abs(loc.Directory); e == nil {
  131. resp.DiskStatuses = append(resp.DiskStatuses, stats.NewDiskStatus(dir))
  132. }
  133. }
  134. resp.MemoryStatus = stats.MemStat()
  135. return resp, nil
  136. }
  137. func (vs *VolumeServer) VolumeServerLeave(ctx context.Context, req *volume_server_pb.VolumeServerLeaveRequest) (*volume_server_pb.VolumeServerLeaveResponse, error) {
  138. resp := &volume_server_pb.VolumeServerLeaveResponse{}
  139. vs.StopHeartbeat()
  140. return resp, nil
  141. }
  142. func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_server_pb.VolumeNeedleStatusRequest) (*volume_server_pb.VolumeNeedleStatusResponse, error) {
  143. resp := &volume_server_pb.VolumeNeedleStatusResponse{}
  144. volumeId := needle.VolumeId(req.VolumeId)
  145. n := &needle.Needle{
  146. Id: types.NeedleId(req.NeedleId),
  147. }
  148. var count int
  149. var err error
  150. hasVolume := vs.store.HasVolume(volumeId)
  151. if !hasVolume {
  152. _, hasEcVolume := vs.store.FindEcVolume(volumeId)
  153. if !hasEcVolume {
  154. return nil, fmt.Errorf("volume not found %d", req.VolumeId)
  155. }
  156. count, err = vs.store.ReadEcShardNeedle(volumeId, n)
  157. } else {
  158. count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil)
  159. }
  160. if err != nil {
  161. return nil, err
  162. }
  163. if count < 0 {
  164. return nil, fmt.Errorf("needle not found %d", n.Id)
  165. }
  166. resp.NeedleId = uint64(n.Id)
  167. resp.Cookie = uint32(n.Cookie)
  168. resp.Size = uint32(n.Size)
  169. resp.LastModified = n.LastModified
  170. resp.Crc = n.Checksum.Value()
  171. if n.HasTtl() {
  172. resp.Ttl = n.Ttl.String()
  173. }
  174. return resp, nil
  175. }