You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

203 lines
4.4 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package storage
  2. import (
  3. "io/ioutil"
  4. "os"
  5. "strings"
  6. "sync"
  7. "fmt"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  10. )
  11. type DiskLocation struct {
  12. Directory string
  13. MaxVolumeCount int
  14. volumes map[needle.VolumeId]*Volume
  15. sync.RWMutex
  16. }
  17. func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation {
  18. location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount}
  19. location.volumes = make(map[needle.VolumeId]*Volume)
  20. return location
  21. }
  22. func (l *DiskLocation) volumeIdFromPath(dir os.FileInfo) (needle.VolumeId, string, error) {
  23. name := dir.Name()
  24. if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
  25. collection := ""
  26. base := name[:len(name)-len(".dat")]
  27. i := strings.LastIndex(base, "_")
  28. if i > 0 {
  29. collection, base = base[0:i], base[i+1:]
  30. }
  31. vol, err := needle.NewVolumeId(base)
  32. return vol, collection, err
  33. }
  34. return 0, "", fmt.Errorf("Path is not a volume: %s", name)
  35. }
  36. func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleMapType, mutex *sync.RWMutex) {
  37. name := dir.Name()
  38. if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
  39. vid, collection, err := l.volumeIdFromPath(dir)
  40. if err == nil {
  41. mutex.RLock()
  42. _, found := l.volumes[vid]
  43. mutex.RUnlock()
  44. if !found {
  45. if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0); e == nil {
  46. mutex.Lock()
  47. l.volumes[vid] = v
  48. mutex.Unlock()
  49. size, _, _ := v.FileStat()
  50. glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s",
  51. l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
  52. // println("volume", vid, "last append at", v.lastAppendAtNs)
  53. } else {
  54. glog.V(0).Infof("new volume %s error %s", name, e)
  55. }
  56. }
  57. }
  58. }
  59. }
  60. func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrency int) {
  61. task_queue := make(chan os.FileInfo, 10*concurrency)
  62. go func() {
  63. if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
  64. for _, dir := range dirs {
  65. task_queue <- dir
  66. }
  67. }
  68. close(task_queue)
  69. }()
  70. var wg sync.WaitGroup
  71. var mutex sync.RWMutex
  72. for workerNum := 0; workerNum < concurrency; workerNum++ {
  73. wg.Add(1)
  74. go func() {
  75. defer wg.Done()
  76. for dir := range task_queue {
  77. l.loadExistingVolume(dir, needleMapKind, &mutex)
  78. }
  79. }()
  80. }
  81. wg.Wait()
  82. }
  83. func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
  84. l.Lock()
  85. defer l.Unlock()
  86. l.concurrentLoadingVolumes(needleMapKind, 10)
  87. glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount)
  88. }
  89. func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
  90. l.Lock()
  91. defer l.Unlock()
  92. for k, v := range l.volumes {
  93. if v.Collection == collection {
  94. e = l.deleteVolumeById(k)
  95. if e != nil {
  96. return
  97. }
  98. }
  99. }
  100. return
  101. }
  102. func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (e error) {
  103. v, ok := l.volumes[vid]
  104. if !ok {
  105. return
  106. }
  107. e = v.Destroy()
  108. if e != nil {
  109. return
  110. }
  111. delete(l.volumes, vid)
  112. return
  113. }
  114. func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool {
  115. if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
  116. for _, dir := range dirs {
  117. volId, _, err := l.volumeIdFromPath(dir)
  118. if vid == volId && err == nil {
  119. var mutex sync.RWMutex
  120. l.loadExistingVolume(dir, needleMapKind, &mutex)
  121. return true
  122. }
  123. }
  124. }
  125. return false
  126. }
  127. func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error {
  128. l.Lock()
  129. defer l.Unlock()
  130. _, ok := l.volumes[vid]
  131. if !ok {
  132. return fmt.Errorf("Volume not found, VolumeId: %d", vid)
  133. }
  134. return l.deleteVolumeById(vid)
  135. }
  136. func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error {
  137. l.Lock()
  138. defer l.Unlock()
  139. v, ok := l.volumes[vid]
  140. if !ok {
  141. return fmt.Errorf("Volume not loaded, VolumeId: %d", vid)
  142. }
  143. v.Close()
  144. delete(l.volumes, vid)
  145. return nil
  146. }
  147. func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) {
  148. l.Lock()
  149. defer l.Unlock()
  150. l.volumes[vid] = volume
  151. }
  152. func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool) {
  153. l.RLock()
  154. defer l.RUnlock()
  155. v, ok := l.volumes[vid]
  156. return v, ok
  157. }
  158. func (l *DiskLocation) VolumesLen() int {
  159. l.RLock()
  160. defer l.RUnlock()
  161. return len(l.volumes)
  162. }
  163. func (l *DiskLocation) Close() {
  164. l.Lock()
  165. defer l.Unlock()
  166. for _, v := range l.volumes {
  167. v.Close()
  168. }
  169. return
  170. }