249 lines
6.7 KiB

6 years ago
6 years ago
6 years ago
6 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "path/filepath"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  8. "github.com/chrislusf/seaweedfs/weed/stats"
  9. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  10. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  11. "github.com/chrislusf/seaweedfs/weed/storage/types"
  12. )
  13. func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server_pb.DeleteCollectionRequest) (*volume_server_pb.DeleteCollectionResponse, error) {
  14. resp := &volume_server_pb.DeleteCollectionResponse{}
  15. err := vs.store.DeleteCollection(req.Collection)
  16. if err != nil {
  17. glog.Errorf("delete collection %s: %v", req.Collection, err)
  18. } else {
  19. glog.V(2).Infof("delete collection %v", req)
  20. }
  21. return resp, err
  22. }
  23. func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_pb.AllocateVolumeRequest) (*volume_server_pb.AllocateVolumeResponse, error) {
  24. resp := &volume_server_pb.AllocateVolumeResponse{}
  25. err := vs.store.AddVolume(
  26. needle.VolumeId(req.VolumeId),
  27. req.Collection,
  28. vs.needleMapKind,
  29. req.Replication,
  30. req.Ttl,
  31. req.Preallocate,
  32. req.MemoryMapMaxSizeMb,
  33. types.ToDiskType(req.DiskType),
  34. )
  35. if err != nil {
  36. glog.Errorf("assign volume %v: %v", req, err)
  37. } else {
  38. glog.V(2).Infof("assign volume %v", req)
  39. }
  40. return resp, err
  41. }
  42. func (vs *VolumeServer) VolumeMount(ctx context.Context, req *volume_server_pb.VolumeMountRequest) (*volume_server_pb.VolumeMountResponse, error) {
  43. resp := &volume_server_pb.VolumeMountResponse{}
  44. err := vs.store.MountVolume(needle.VolumeId(req.VolumeId))
  45. if err != nil {
  46. glog.Errorf("volume mount %v: %v", req, err)
  47. } else {
  48. glog.V(2).Infof("volume mount %v", req)
  49. }
  50. return resp, err
  51. }
  52. func (vs *VolumeServer) VolumeUnmount(ctx context.Context, req *volume_server_pb.VolumeUnmountRequest) (*volume_server_pb.VolumeUnmountResponse, error) {
  53. resp := &volume_server_pb.VolumeUnmountResponse{}
  54. err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId))
  55. if err != nil {
  56. glog.Errorf("volume unmount %v: %v", req, err)
  57. } else {
  58. glog.V(2).Infof("volume unmount %v", req)
  59. }
  60. return resp, err
  61. }
  62. func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.VolumeDeleteRequest) (*volume_server_pb.VolumeDeleteResponse, error) {
  63. resp := &volume_server_pb.VolumeDeleteResponse{}
  64. err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
  65. if err != nil {
  66. glog.Errorf("volume delete %v: %v", req, err)
  67. } else {
  68. glog.V(2).Infof("volume delete %v", req)
  69. }
  70. return resp, err
  71. }
  72. func (vs *VolumeServer) VolumeConfigure(ctx context.Context, req *volume_server_pb.VolumeConfigureRequest) (*volume_server_pb.VolumeConfigureResponse, error) {
  73. resp := &volume_server_pb.VolumeConfigureResponse{}
  74. // check replication format
  75. if _, err := super_block.NewReplicaPlacementFromString(req.Replication); err != nil {
  76. resp.Error = fmt.Sprintf("volume configure replication %v: %v", req, err)
  77. return resp, nil
  78. }
  79. // unmount
  80. if err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId)); err != nil {
  81. glog.Errorf("volume configure unmount %v: %v", req, err)
  82. resp.Error = fmt.Sprintf("volume configure unmount %v: %v", req, err)
  83. return resp, nil
  84. }
  85. // modify the volume info file
  86. if err := vs.store.ConfigureVolume(needle.VolumeId(req.VolumeId), req.Replication); err != nil {
  87. glog.Errorf("volume configure %v: %v", req, err)
  88. resp.Error = fmt.Sprintf("volume configure %v: %v", req, err)
  89. return resp, nil
  90. }
  91. // mount
  92. if err := vs.store.MountVolume(needle.VolumeId(req.VolumeId)); err != nil {
  93. glog.Errorf("volume configure mount %v: %v", req, err)
  94. resp.Error = fmt.Sprintf("volume configure mount %v: %v", req, err)
  95. return resp, nil
  96. }
  97. return resp, nil
  98. }
  99. func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_server_pb.VolumeMarkReadonlyRequest) (*volume_server_pb.VolumeMarkReadonlyResponse, error) {
  100. resp := &volume_server_pb.VolumeMarkReadonlyResponse{}
  101. err := vs.store.MarkVolumeReadonly(needle.VolumeId(req.VolumeId))
  102. if err != nil {
  103. glog.Errorf("volume mark readonly %v: %v", req, err)
  104. } else {
  105. glog.V(2).Infof("volume mark readonly %v", req)
  106. }
  107. return resp, err
  108. }
  109. func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) {
  110. resp := &volume_server_pb.VolumeMarkWritableResponse{}
  111. err := vs.store.MarkVolumeWritable(needle.VolumeId(req.VolumeId))
  112. if err != nil {
  113. glog.Errorf("volume mark writable %v: %v", req, err)
  114. } else {
  115. glog.V(2).Infof("volume mark writable %v", req)
  116. }
  117. return resp, err
  118. }
  119. func (vs *VolumeServer) VolumeStatus(ctx context.Context, req *volume_server_pb.VolumeStatusRequest) (*volume_server_pb.VolumeStatusResponse, error) {
  120. resp := &volume_server_pb.VolumeStatusResponse{}
  121. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  122. if v == nil {
  123. return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
  124. }
  125. resp.IsReadOnly = v.IsReadOnly()
  126. return resp, nil
  127. }
  128. func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) {
  129. resp := &volume_server_pb.VolumeServerStatusResponse{}
  130. for _, loc := range vs.store.Locations {
  131. if dir, e := filepath.Abs(loc.Directory); e == nil {
  132. resp.DiskStatuses = append(resp.DiskStatuses, stats.NewDiskStatus(dir))
  133. }
  134. }
  135. resp.MemoryStatus = stats.MemStat()
  136. return resp, nil
  137. }
  138. func (vs *VolumeServer) VolumeServerLeave(ctx context.Context, req *volume_server_pb.VolumeServerLeaveRequest) (*volume_server_pb.VolumeServerLeaveResponse, error) {
  139. resp := &volume_server_pb.VolumeServerLeaveResponse{}
  140. vs.StopHeartbeat()
  141. return resp, nil
  142. }
  143. func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_server_pb.VolumeNeedleStatusRequest) (*volume_server_pb.VolumeNeedleStatusResponse, error) {
  144. resp := &volume_server_pb.VolumeNeedleStatusResponse{}
  145. volumeId := needle.VolumeId(req.VolumeId)
  146. n := &needle.Needle{
  147. Id: types.NeedleId(req.NeedleId),
  148. }
  149. var count int
  150. var err error
  151. hasVolume := vs.store.HasVolume(volumeId)
  152. if !hasVolume {
  153. _, hasEcVolume := vs.store.FindEcVolume(volumeId)
  154. if !hasEcVolume {
  155. return nil, fmt.Errorf("volume not found %d", req.VolumeId)
  156. }
  157. count, err = vs.store.ReadEcShardNeedle(volumeId, n, nil)
  158. } else {
  159. count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil, nil)
  160. }
  161. if err != nil {
  162. return nil, err
  163. }
  164. if count < 0 {
  165. return nil, fmt.Errorf("needle not found %d", n.Id)
  166. }
  167. resp.NeedleId = uint64(n.Id)
  168. resp.Cookie = uint32(n.Cookie)
  169. resp.Size = uint32(n.Size)
  170. resp.LastModified = n.LastModified
  171. resp.Crc = n.Checksum.Value()
  172. if n.HasTtl() {
  173. resp.Ttl = n.Ttl.String()
  174. }
  175. return resp, nil
  176. }