You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

210 lines
5.6 KiB

6 years ago
6 years ago
6 years ago
6 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "path/filepath"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  8. "github.com/chrislusf/seaweedfs/weed/stats"
  9. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  10. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  11. "github.com/chrislusf/seaweedfs/weed/storage/types"
  12. )
  13. func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server_pb.DeleteCollectionRequest) (*volume_server_pb.DeleteCollectionResponse, error) {
  14. resp := &volume_server_pb.DeleteCollectionResponse{}
  15. err := vs.store.DeleteCollection(req.Collection)
  16. if err != nil {
  17. glog.Errorf("delete collection %s: %v", req.Collection, err)
  18. } else {
  19. glog.V(2).Infof("delete collection %v", req)
  20. }
  21. return resp, err
  22. }
  23. func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_pb.AllocateVolumeRequest) (*volume_server_pb.AllocateVolumeResponse, error) {
  24. resp := &volume_server_pb.AllocateVolumeResponse{}
  25. err := vs.store.AddVolume(
  26. needle.VolumeId(req.VolumeId),
  27. req.Collection,
  28. vs.needleMapKind,
  29. req.Replication,
  30. req.Ttl,
  31. req.Preallocate,
  32. req.MemoryMapMaxSizeMb,
  33. )
  34. if err != nil {
  35. glog.Errorf("assign volume %v: %v", req, err)
  36. } else {
  37. glog.V(2).Infof("assign volume %v", req)
  38. }
  39. return resp, err
  40. }
  41. func (vs *VolumeServer) VolumeMount(ctx context.Context, req *volume_server_pb.VolumeMountRequest) (*volume_server_pb.VolumeMountResponse, error) {
  42. resp := &volume_server_pb.VolumeMountResponse{}
  43. err := vs.store.MountVolume(needle.VolumeId(req.VolumeId))
  44. if err != nil {
  45. glog.Errorf("volume mount %v: %v", req, err)
  46. } else {
  47. glog.V(2).Infof("volume mount %v", req)
  48. }
  49. return resp, err
  50. }
  51. func (vs *VolumeServer) VolumeUnmount(ctx context.Context, req *volume_server_pb.VolumeUnmountRequest) (*volume_server_pb.VolumeUnmountResponse, error) {
  52. resp := &volume_server_pb.VolumeUnmountResponse{}
  53. err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId))
  54. if err != nil {
  55. glog.Errorf("volume unmount %v: %v", req, err)
  56. } else {
  57. glog.V(2).Infof("volume unmount %v", req)
  58. }
  59. return resp, err
  60. }
  61. func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.VolumeDeleteRequest) (*volume_server_pb.VolumeDeleteResponse, error) {
  62. resp := &volume_server_pb.VolumeDeleteResponse{}
  63. err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
  64. if err != nil {
  65. glog.Errorf("volume delete %v: %v", req, err)
  66. } else {
  67. glog.V(2).Infof("volume delete %v", req)
  68. }
  69. return resp, err
  70. }
  71. func (vs *VolumeServer) VolumeConfigure(ctx context.Context, req *volume_server_pb.VolumeConfigureRequest) (*volume_server_pb.VolumeConfigureResponse, error) {
  72. resp := &volume_server_pb.VolumeConfigureResponse{}
  73. // check replication format
  74. if _, err := super_block.NewReplicaPlacementFromString(req.Replication); err != nil {
  75. resp.Error = fmt.Sprintf("volume configure replication %v: %v", req, err)
  76. return resp, nil
  77. }
  78. // unmount
  79. if err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId)); err != nil {
  80. glog.Errorf("volume configure unmount %v: %v", req, err)
  81. resp.Error = fmt.Sprintf("volume configure unmount %v: %v", req, err)
  82. return resp, nil
  83. }
  84. // modify the volume info file
  85. if err := vs.store.ConfigureVolume(needle.VolumeId(req.VolumeId), req.Replication); err != nil {
  86. glog.Errorf("volume configure %v: %v", req, err)
  87. resp.Error = fmt.Sprintf("volume configure %v: %v", req, err)
  88. return resp, nil
  89. }
  90. // mount
  91. if err := vs.store.MountVolume(needle.VolumeId(req.VolumeId)); err != nil {
  92. glog.Errorf("volume configure mount %v: %v", req, err)
  93. resp.Error = fmt.Sprintf("volume configure mount %v: %v", req, err)
  94. return resp, nil
  95. }
  96. return resp, nil
  97. }
  98. func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_server_pb.VolumeMarkReadonlyRequest) (*volume_server_pb.VolumeMarkReadonlyResponse, error) {
  99. resp := &volume_server_pb.VolumeMarkReadonlyResponse{}
  100. err := vs.store.MarkVolumeReadonly(needle.VolumeId(req.VolumeId))
  101. if err != nil {
  102. glog.Errorf("volume mark readonly %v: %v", req, err)
  103. } else {
  104. glog.V(2).Infof("volume mark readonly %v", req)
  105. }
  106. return resp, err
  107. }
  108. func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) {
  109. resp := &volume_server_pb.VolumeServerStatusResponse{}
  110. for _, loc := range vs.store.Locations {
  111. if dir, e := filepath.Abs(loc.Directory); e == nil {
  112. resp.DiskStatuses = append(resp.DiskStatuses, stats.NewDiskStatus(dir))
  113. }
  114. }
  115. resp.MemoryStatus = stats.MemStat()
  116. return resp, nil
  117. }
  118. func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_server_pb.VolumeNeedleStatusRequest) (*volume_server_pb.VolumeNeedleStatusResponse, error) {
  119. resp := &volume_server_pb.VolumeNeedleStatusResponse{}
  120. volumeId := needle.VolumeId(req.VolumeId)
  121. n := &needle.Needle{
  122. Id: types.NeedleId(req.NeedleId),
  123. }
  124. var count int
  125. var err error
  126. hasVolume := vs.store.HasVolume(volumeId)
  127. if !hasVolume {
  128. _, hasEcVolume := vs.store.FindEcVolume(volumeId)
  129. if !hasEcVolume {
  130. return nil, fmt.Errorf("volume not found %d", req.VolumeId)
  131. }
  132. count, err = vs.store.ReadEcShardNeedle(volumeId, n)
  133. } else {
  134. count, err = vs.store.ReadVolumeNeedle(volumeId, n)
  135. }
  136. if err != nil {
  137. return nil, err
  138. }
  139. if count < 0 {
  140. return nil, fmt.Errorf("needle not found %d", n.Id)
  141. }
  142. resp.NeedleId = uint64(n.Id)
  143. resp.Cookie = uint32(n.Cookie)
  144. resp.Size = uint32(n.Size)
  145. resp.LastModified = n.LastModified
  146. resp.Crc = n.Checksum.Value()
  147. if n.HasTtl() {
  148. resp.Ttl = n.Ttl.String()
  149. }
  150. return resp, nil
  151. }