You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

353 lines
10 KiB

6 years ago
6 years ago
6 years ago
6 years ago
3 years ago
3 years ago
3 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "path/filepath"
  6. "time"
  7. "github.com/seaweedfs/seaweedfs/weed/storage"
  8. "github.com/seaweedfs/seaweedfs/weed/cluster"
  9. "github.com/seaweedfs/seaweedfs/weed/pb"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/util"
  13. "github.com/seaweedfs/seaweedfs/weed/glog"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/stats"
  16. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  17. "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
  18. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  19. )
  20. func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server_pb.DeleteCollectionRequest) (*volume_server_pb.DeleteCollectionResponse, error) {
  21. resp := &volume_server_pb.DeleteCollectionResponse{}
  22. err := vs.store.DeleteCollection(req.Collection)
  23. if err != nil {
  24. glog.Errorf("delete collection %s: %v", req.Collection, err)
  25. } else {
  26. glog.V(2).Infof("delete collection %v", req)
  27. }
  28. return resp, err
  29. }
  30. func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_pb.AllocateVolumeRequest) (*volume_server_pb.AllocateVolumeResponse, error) {
  31. resp := &volume_server_pb.AllocateVolumeResponse{}
  32. err := vs.store.AddVolume(
  33. needle.VolumeId(req.VolumeId),
  34. req.Collection,
  35. vs.needleMapKind,
  36. req.Replication,
  37. req.Ttl,
  38. req.Preallocate,
  39. req.MemoryMapMaxSizeMb,
  40. types.ToDiskType(req.DiskType),
  41. vs.ldbTimout,
  42. )
  43. if err != nil {
  44. glog.Errorf("assign volume %v: %v", req, err)
  45. } else {
  46. glog.V(2).Infof("assign volume %v", req)
  47. }
  48. return resp, err
  49. }
  50. func (vs *VolumeServer) VolumeMount(ctx context.Context, req *volume_server_pb.VolumeMountRequest) (*volume_server_pb.VolumeMountResponse, error) {
  51. resp := &volume_server_pb.VolumeMountResponse{}
  52. err := vs.store.MountVolume(needle.VolumeId(req.VolumeId))
  53. if err != nil {
  54. glog.Errorf("volume mount %v: %v", req, err)
  55. } else {
  56. glog.V(2).Infof("volume mount %v", req)
  57. }
  58. return resp, err
  59. }
  60. func (vs *VolumeServer) VolumeUnmount(ctx context.Context, req *volume_server_pb.VolumeUnmountRequest) (*volume_server_pb.VolumeUnmountResponse, error) {
  61. resp := &volume_server_pb.VolumeUnmountResponse{}
  62. err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId))
  63. if err != nil {
  64. glog.Errorf("volume unmount %v: %v", req, err)
  65. } else {
  66. glog.V(2).Infof("volume unmount %v", req)
  67. }
  68. return resp, err
  69. }
  70. func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.VolumeDeleteRequest) (*volume_server_pb.VolumeDeleteResponse, error) {
  71. resp := &volume_server_pb.VolumeDeleteResponse{}
  72. err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
  73. if err != nil {
  74. glog.Errorf("volume delete %v: %v", req, err)
  75. } else {
  76. glog.V(2).Infof("volume delete %v", req)
  77. }
  78. return resp, err
  79. }
  80. func (vs *VolumeServer) VolumeConfigure(ctx context.Context, req *volume_server_pb.VolumeConfigureRequest) (*volume_server_pb.VolumeConfigureResponse, error) {
  81. resp := &volume_server_pb.VolumeConfigureResponse{}
  82. // check replication format
  83. if _, err := super_block.NewReplicaPlacementFromString(req.Replication); err != nil {
  84. resp.Error = fmt.Sprintf("volume configure replication %v: %v", req, err)
  85. return resp, nil
  86. }
  87. // unmount
  88. if err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId)); err != nil {
  89. glog.Errorf("volume configure unmount %v: %v", req, err)
  90. resp.Error = fmt.Sprintf("volume configure unmount %v: %v", req, err)
  91. return resp, nil
  92. }
  93. // modify the volume info file
  94. if err := vs.store.ConfigureVolume(needle.VolumeId(req.VolumeId), req.Replication); err != nil {
  95. glog.Errorf("volume configure %v: %v", req, err)
  96. resp.Error = fmt.Sprintf("volume configure %v: %v", req, err)
  97. return resp, nil
  98. }
  99. // mount
  100. if err := vs.store.MountVolume(needle.VolumeId(req.VolumeId)); err != nil {
  101. glog.Errorf("volume configure mount %v: %v", req, err)
  102. resp.Error = fmt.Sprintf("volume configure mount %v: %v", req, err)
  103. return resp, nil
  104. }
  105. return resp, nil
  106. }
  107. func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_server_pb.VolumeMarkReadonlyRequest) (*volume_server_pb.VolumeMarkReadonlyResponse, error) {
  108. resp := &volume_server_pb.VolumeMarkReadonlyResponse{}
  109. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  110. if v == nil {
  111. return nil, fmt.Errorf("volume %d not found", req.VolumeId)
  112. }
  113. // step 1: stop master from redirecting traffic here
  114. if err := vs.notifyMasterVolumeReadonly(v, true); err != nil {
  115. return resp, err
  116. }
  117. // rare case 1.5: it will be unlucky if heartbeat happened between step 1 and 2.
  118. // step 2: mark local volume as readonly
  119. err := vs.store.MarkVolumeReadonly(needle.VolumeId(req.VolumeId))
  120. if err != nil {
  121. glog.Errorf("volume mark readonly %v: %v", req, err)
  122. } else {
  123. glog.V(2).Infof("volume mark readonly %v", req)
  124. }
  125. // step 3: tell master from redirecting traffic here again, to prevent rare case 1.5
  126. if err := vs.notifyMasterVolumeReadonly(v, true); err != nil {
  127. return resp, err
  128. }
  129. return resp, err
  130. }
  131. func (vs *VolumeServer) notifyMasterVolumeReadonly(v *storage.Volume, isReadOnly bool) error {
  132. if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
  133. _, err := client.VolumeMarkReadonly(context.Background(), &master_pb.VolumeMarkReadonlyRequest{
  134. Ip: vs.store.Ip,
  135. Port: uint32(vs.store.Port),
  136. VolumeId: uint32(v.Id),
  137. Collection: v.Collection,
  138. ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
  139. Ttl: v.Ttl.ToUint32(),
  140. DiskType: string(v.DiskType()),
  141. IsReadonly: isReadOnly,
  142. })
  143. if err != nil {
  144. return fmt.Errorf("set volume %d to read only on master: %v", v.Id, err)
  145. }
  146. return nil
  147. }); grpcErr != nil {
  148. glog.V(0).Infof("connect to %s: %v", vs.GetMaster(), grpcErr)
  149. return fmt.Errorf("grpc VolumeMarkReadonly with master %s: %v", vs.GetMaster(), grpcErr)
  150. }
  151. return nil
  152. }
  153. func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) {
  154. resp := &volume_server_pb.VolumeMarkWritableResponse{}
  155. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  156. if v == nil {
  157. return nil, fmt.Errorf("volume %d not found", req.VolumeId)
  158. }
  159. err := vs.store.MarkVolumeWritable(needle.VolumeId(req.VolumeId))
  160. if err != nil {
  161. glog.Errorf("volume mark writable %v: %v", req, err)
  162. } else {
  163. glog.V(2).Infof("volume mark writable %v", req)
  164. }
  165. // enable master to redirect traffic here
  166. if err := vs.notifyMasterVolumeReadonly(v, false); err != nil {
  167. return resp, err
  168. }
  169. return resp, err
  170. }
  171. func (vs *VolumeServer) VolumeStatus(ctx context.Context, req *volume_server_pb.VolumeStatusRequest) (*volume_server_pb.VolumeStatusResponse, error) {
  172. resp := &volume_server_pb.VolumeStatusResponse{}
  173. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  174. if v == nil {
  175. return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
  176. }
  177. volumeSize, _, _ := v.DataBackend.GetStat()
  178. resp.IsReadOnly = v.IsReadOnly()
  179. resp.VolumeSize = uint64(volumeSize)
  180. return resp, nil
  181. }
  182. func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) {
  183. resp := &volume_server_pb.VolumeServerStatusResponse{
  184. MemoryStatus: stats.MemStat(),
  185. Version: util.Version(),
  186. DataCenter: vs.dataCenter,
  187. Rack: vs.rack,
  188. }
  189. for _, loc := range vs.store.Locations {
  190. if dir, e := filepath.Abs(loc.Directory); e == nil {
  191. resp.DiskStatuses = append(resp.DiskStatuses, stats.NewDiskStatus(dir))
  192. }
  193. }
  194. return resp, nil
  195. }
  196. func (vs *VolumeServer) VolumeServerLeave(ctx context.Context, req *volume_server_pb.VolumeServerLeaveRequest) (*volume_server_pb.VolumeServerLeaveResponse, error) {
  197. resp := &volume_server_pb.VolumeServerLeaveResponse{}
  198. vs.StopHeartbeat()
  199. return resp, nil
  200. }
  201. func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_server_pb.VolumeNeedleStatusRequest) (*volume_server_pb.VolumeNeedleStatusResponse, error) {
  202. resp := &volume_server_pb.VolumeNeedleStatusResponse{}
  203. volumeId := needle.VolumeId(req.VolumeId)
  204. n := &needle.Needle{
  205. Id: types.NeedleId(req.NeedleId),
  206. }
  207. var count int
  208. var err error
  209. hasVolume := vs.store.HasVolume(volumeId)
  210. if !hasVolume {
  211. _, hasEcVolume := vs.store.FindEcVolume(volumeId)
  212. if !hasEcVolume {
  213. return nil, fmt.Errorf("volume not found %d", req.VolumeId)
  214. }
  215. count, err = vs.store.ReadEcShardNeedle(volumeId, n, nil)
  216. } else {
  217. count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil, nil)
  218. }
  219. if err != nil {
  220. return nil, err
  221. }
  222. if count < 0 {
  223. return nil, fmt.Errorf("needle not found %d", n.Id)
  224. }
  225. resp.NeedleId = uint64(n.Id)
  226. resp.Cookie = uint32(n.Cookie)
  227. resp.Size = uint32(n.Size)
  228. resp.LastModified = n.LastModified
  229. resp.Crc = n.Checksum.Value()
  230. if n.HasTtl() {
  231. resp.Ttl = n.Ttl.String()
  232. }
  233. return resp, nil
  234. }
  235. func (vs *VolumeServer) Ping(ctx context.Context, req *volume_server_pb.PingRequest) (resp *volume_server_pb.PingResponse, pingErr error) {
  236. resp = &volume_server_pb.PingResponse{
  237. StartTimeNs: time.Now().UnixNano(),
  238. }
  239. if req.TargetType == cluster.FilerType {
  240. pingErr = pb.WithFilerClient(false, 0, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  241. pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{})
  242. if pingResp != nil {
  243. resp.RemoteTimeNs = pingResp.StartTimeNs
  244. }
  245. return err
  246. })
  247. }
  248. if req.TargetType == cluster.VolumeServerType {
  249. pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  250. pingResp, err := client.Ping(ctx, &volume_server_pb.PingRequest{})
  251. if pingResp != nil {
  252. resp.RemoteTimeNs = pingResp.StartTimeNs
  253. }
  254. return err
  255. })
  256. }
  257. if req.TargetType == cluster.MasterType {
  258. pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
  259. pingResp, err := client.Ping(ctx, &master_pb.PingRequest{})
  260. if pingResp != nil {
  261. resp.RemoteTimeNs = pingResp.StartTimeNs
  262. }
  263. return err
  264. })
  265. }
  266. if pingErr != nil {
  267. pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr)
  268. }
  269. resp.StopTimeNs = time.Now().UnixNano()
  270. return
  271. }