You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

343 lines
10 KiB

10 years ago
12 years ago
12 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package storage
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "math/rand"
  7. "strconv"
  8. "strings"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/security"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. "github.com/golang/protobuf/proto"
  14. )
  15. const (
  16. MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
  17. )
  18. type MasterNodes struct {
  19. nodes []string
  20. lastNode int
  21. }
  22. func (mn *MasterNodes) String() string {
  23. return fmt.Sprintf("nodes:%v, lastNode:%d", mn.nodes, mn.lastNode)
  24. }
  25. func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) {
  26. mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1}
  27. return
  28. }
  29. func (mn *MasterNodes) Reset() {
  30. glog.V(4).Infof("Resetting master nodes: %v", mn)
  31. if len(mn.nodes) > 1 && mn.lastNode >= 0 {
  32. glog.V(0).Infof("Reset master %s from: %v", mn.nodes[mn.lastNode], mn.nodes)
  33. mn.lastNode = -mn.lastNode - 1
  34. }
  35. }
  36. func (mn *MasterNodes) FindMaster() (string, error) {
  37. if len(mn.nodes) == 0 {
  38. return "", errors.New("No master node found!")
  39. }
  40. if mn.lastNode < 0 {
  41. for _, m := range mn.nodes {
  42. glog.V(4).Infof("Listing masters on %s", m)
  43. if masters, e := operation.ListMasters(m); e == nil {
  44. if len(masters) == 0 {
  45. continue
  46. }
  47. mn.nodes = append(masters, m)
  48. mn.lastNode = rand.Intn(len(mn.nodes))
  49. glog.V(2).Infof("current master nodes is %v", mn)
  50. break
  51. } else {
  52. glog.V(4).Infof("Failed listing masters on %s: %v", m, e)
  53. }
  54. }
  55. }
  56. if mn.lastNode < 0 {
  57. return "", errors.New("No master node available!")
  58. }
  59. return mn.nodes[mn.lastNode], nil
  60. }
  61. /*
  62. * A VolumeServer contains one Store
  63. */
  64. type Store struct {
  65. Ip string
  66. Port int
  67. PublicUrl string
  68. Locations []*DiskLocation
  69. dataCenter string //optional informaton, overwriting master setting if exists
  70. rack string //optional information, overwriting master setting if exists
  71. connected bool
  72. volumeSizeLimit uint64 //read from the master
  73. masterNodes *MasterNodes
  74. }
  75. func (s *Store) String() (str string) {
  76. str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d, masterNodes:%s", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.volumeSizeLimit, s.masterNodes)
  77. return
  78. }
  79. func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) {
  80. s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
  81. s.Locations = make([]*DiskLocation, 0)
  82. for i := 0; i < len(dirnames); i++ {
  83. location := NewDiskLocation(dirnames[i], maxVolumeCounts[i])
  84. location.loadExistingVolumes(needleMapKind)
  85. s.Locations = append(s.Locations, location)
  86. }
  87. return
  88. }
  89. func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string) error {
  90. rt, e := NewReplicaPlacementFromString(replicaPlacement)
  91. if e != nil {
  92. return e
  93. }
  94. ttl, e := ReadTTL(ttlString)
  95. if e != nil {
  96. return e
  97. }
  98. for _, range_string := range strings.Split(volumeListString, ",") {
  99. if strings.Index(range_string, "-") < 0 {
  100. id_string := range_string
  101. id, err := NewVolumeId(id_string)
  102. if err != nil {
  103. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
  104. }
  105. e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl)
  106. } else {
  107. pair := strings.Split(range_string, "-")
  108. start, start_err := strconv.ParseUint(pair[0], 10, 64)
  109. if start_err != nil {
  110. return fmt.Errorf("Volume Start Id %s is not a valid unsigned integer!", pair[0])
  111. }
  112. end, end_err := strconv.ParseUint(pair[1], 10, 64)
  113. if end_err != nil {
  114. return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
  115. }
  116. for id := start; id <= end; id++ {
  117. if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl); err != nil {
  118. e = err
  119. }
  120. }
  121. }
  122. }
  123. return e
  124. }
  125. func (s *Store) DeleteCollection(collection string) (e error) {
  126. for _, location := range s.Locations {
  127. e = location.DeleteCollectionFromDiskLocation(collection)
  128. if e != nil {
  129. return
  130. }
  131. }
  132. return
  133. }
  134. func (s *Store) findVolume(vid VolumeId) *Volume {
  135. for _, location := range s.Locations {
  136. if v, found := location.FindVolume(vid); found {
  137. return v
  138. }
  139. }
  140. return nil
  141. }
  142. func (s *Store) findFreeLocation() (ret *DiskLocation) {
  143. max := 0
  144. for _, location := range s.Locations {
  145. currentFreeCount := location.MaxVolumeCount - location.VolumesLen()
  146. if currentFreeCount > max {
  147. max = currentFreeCount
  148. ret = location
  149. }
  150. }
  151. return ret
  152. }
  153. func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
  154. if s.findVolume(vid) != nil {
  155. return fmt.Errorf("Volume Id %d already exists!", vid)
  156. }
  157. if location := s.findFreeLocation(); location != nil {
  158. glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
  159. location.Directory, vid, collection, replicaPlacement, ttl)
  160. if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil {
  161. location.SetVolume(vid, volume)
  162. return nil
  163. } else {
  164. return err
  165. }
  166. }
  167. return fmt.Errorf("No more free space left")
  168. }
  169. func (s *Store) Status() []*VolumeInfo {
  170. var stats []*VolumeInfo
  171. for _, location := range s.Locations {
  172. location.RLock()
  173. for k, v := range location.volumes {
  174. s := &VolumeInfo{
  175. Id: VolumeId(k),
  176. Size: v.ContentSize(),
  177. Collection: v.Collection,
  178. ReplicaPlacement: v.ReplicaPlacement,
  179. Version: v.Version(),
  180. FileCount: v.nm.FileCount(),
  181. DeleteCount: v.nm.DeletedCount(),
  182. DeletedByteCount: v.nm.DeletedSize(),
  183. ReadOnly: v.readOnly,
  184. Ttl: v.Ttl}
  185. stats = append(stats, s)
  186. }
  187. location.RUnlock()
  188. }
  189. sortVolumeInfos(stats)
  190. return stats
  191. }
  192. func (s *Store) SetDataCenter(dataCenter string) {
  193. s.dataCenter = dataCenter
  194. }
  195. func (s *Store) SetRack(rack string) {
  196. s.rack = rack
  197. }
  198. func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
  199. s.masterNodes = NewMasterNodes(bootstrapMaster)
  200. }
  201. func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) {
  202. masterNode, e = s.masterNodes.FindMaster()
  203. if e != nil {
  204. return
  205. }
  206. var volumeMessages []*operation.VolumeInformationMessage
  207. maxVolumeCount := 0
  208. var maxFileKey uint64
  209. for _, location := range s.Locations {
  210. maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
  211. location.Lock()
  212. for k, v := range location.volumes {
  213. if maxFileKey < v.nm.MaxFileKey() {
  214. maxFileKey = v.nm.MaxFileKey()
  215. }
  216. if !v.expired(s.volumeSizeLimit) {
  217. volumeMessage := &operation.VolumeInformationMessage{
  218. Id: proto.Uint32(uint32(k)),
  219. Size: proto.Uint64(uint64(v.Size())),
  220. Collection: proto.String(v.Collection),
  221. FileCount: proto.Uint64(uint64(v.nm.FileCount())),
  222. DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
  223. DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
  224. ReadOnly: proto.Bool(v.readOnly),
  225. ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
  226. Version: proto.Uint32(uint32(v.Version())),
  227. Ttl: proto.Uint32(v.Ttl.ToUint32()),
  228. }
  229. volumeMessages = append(volumeMessages, volumeMessage)
  230. } else {
  231. if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
  232. location.deleteVolumeById(v.Id)
  233. glog.V(0).Infoln("volume", v.Id, "is deleted.")
  234. } else {
  235. glog.V(0).Infoln("volume", v.Id, "is expired.")
  236. }
  237. }
  238. }
  239. location.Unlock()
  240. }
  241. joinMessage := &operation.JoinMessage{
  242. IsInit: proto.Bool(!s.connected),
  243. Ip: proto.String(s.Ip),
  244. Port: proto.Uint32(uint32(s.Port)),
  245. PublicUrl: proto.String(s.PublicUrl),
  246. MaxVolumeCount: proto.Uint32(uint32(maxVolumeCount)),
  247. MaxFileKey: proto.Uint64(maxFileKey),
  248. DataCenter: proto.String(s.dataCenter),
  249. Rack: proto.String(s.rack),
  250. Volumes: volumeMessages,
  251. }
  252. data, err := proto.Marshal(joinMessage)
  253. if err != nil {
  254. return "", "", err
  255. }
  256. joinUrl := "http://" + masterNode + "/dir/join"
  257. glog.V(4).Infof("Connecting to %s ...", joinUrl)
  258. jsonBlob, err := util.PostBytes(joinUrl, data)
  259. if err != nil {
  260. s.masterNodes.Reset()
  261. return "", "", err
  262. }
  263. var ret operation.JoinResult
  264. if err := json.Unmarshal(jsonBlob, &ret); err != nil {
  265. glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob))
  266. s.masterNodes.Reset()
  267. return masterNode, "", err
  268. }
  269. if ret.Error != "" {
  270. s.masterNodes.Reset()
  271. return masterNode, "", errors.New(ret.Error)
  272. }
  273. s.volumeSizeLimit = ret.VolumeSizeLimit
  274. secretKey = security.Secret(ret.SecretKey)
  275. s.connected = true
  276. return
  277. }
  278. func (s *Store) Close() {
  279. for _, location := range s.Locations {
  280. location.Close()
  281. }
  282. }
  283. func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
  284. if v := s.findVolume(i); v != nil {
  285. if v.readOnly {
  286. err = fmt.Errorf("Volume %d is read only", i)
  287. return
  288. }
  289. // TODO: count needle size ahead
  290. if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
  291. size, err = v.writeNeedle(n)
  292. } else {
  293. err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.volumeSizeLimit, v.ContentSize())
  294. }
  295. if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) {
  296. glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit)
  297. if _, _, e := s.SendHeartbeatToMaster(); e != nil {
  298. glog.V(0).Infoln("error when reporting size:", e)
  299. }
  300. }
  301. return
  302. }
  303. glog.V(0).Infoln("volume", i, "not found!")
  304. err = fmt.Errorf("Volume %d not found!", i)
  305. return
  306. }
  307. func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
  308. if v := s.findVolume(i); v != nil && !v.readOnly {
  309. return v.deleteNeedle(n)
  310. }
  311. return 0, nil
  312. }
  313. func (s *Store) ReadVolumeNeedle(i VolumeId, n *Needle) (int, error) {
  314. if v := s.findVolume(i); v != nil {
  315. return v.readNeedle(n)
  316. }
  317. return 0, fmt.Errorf("Volume %v not found!", i)
  318. }
  319. func (s *Store) GetVolume(i VolumeId) *Volume {
  320. return s.findVolume(i)
  321. }
  322. func (s *Store) HasVolume(i VolumeId) bool {
  323. v := s.findVolume(i)
  324. return v != nil
  325. }