You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

199 lines
4.2 KiB

  1. package storage
  2. import (
  3. "io/ioutil"
  4. "os"
  5. "strings"
  6. "sync"
  7. "fmt"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. )
  10. type DiskLocation struct {
  11. Directory string
  12. MaxVolumeCount int
  13. volumes map[VolumeId]*Volume
  14. sync.RWMutex
  15. }
  16. func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation {
  17. location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount}
  18. location.volumes = make(map[VolumeId]*Volume)
  19. return location
  20. }
  21. func (l *DiskLocation) volumeIdFromPath(dir os.FileInfo) (VolumeId, string, error) {
  22. name := dir.Name()
  23. if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
  24. collection := ""
  25. base := name[:len(name)-len(".dat")]
  26. i := strings.LastIndex(base, "_")
  27. if i > 0 {
  28. collection, base = base[0:i], base[i+1:]
  29. }
  30. vol, err := NewVolumeId(base)
  31. return vol, collection, err
  32. }
  33. return 0, "", fmt.Errorf("Path is not a volume: %s", name)
  34. }
  35. func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleMapType, mutex *sync.RWMutex) {
  36. name := dir.Name()
  37. if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
  38. vid, collection, err := l.volumeIdFromPath(dir)
  39. if err == nil {
  40. mutex.RLock()
  41. _, found := l.volumes[vid]
  42. mutex.RUnlock()
  43. if !found {
  44. if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0); e == nil {
  45. mutex.Lock()
  46. l.volumes[vid] = v
  47. mutex.Unlock()
  48. glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s",
  49. l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
  50. } else {
  51. glog.V(0).Infof("new volume %s error %s", name, e)
  52. }
  53. }
  54. }
  55. }
  56. }
  57. func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrency int) {
  58. task_queue := make(chan os.FileInfo, 10*concurrency)
  59. go func() {
  60. if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
  61. for _, dir := range dirs {
  62. task_queue <- dir
  63. }
  64. }
  65. close(task_queue)
  66. }()
  67. var wg sync.WaitGroup
  68. var mutex sync.RWMutex
  69. for workerNum := 0; workerNum < concurrency; workerNum++ {
  70. wg.Add(1)
  71. go func() {
  72. defer wg.Done()
  73. for dir := range task_queue {
  74. l.loadExistingVolume(dir, needleMapKind, &mutex)
  75. }
  76. }()
  77. }
  78. wg.Wait()
  79. }
  80. func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
  81. l.Lock()
  82. defer l.Unlock()
  83. l.concurrentLoadingVolumes(needleMapKind, 10)
  84. glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount)
  85. }
  86. func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
  87. l.Lock()
  88. defer l.Unlock()
  89. for k, v := range l.volumes {
  90. if v.Collection == collection {
  91. e = l.deleteVolumeById(k)
  92. if e != nil {
  93. return
  94. }
  95. }
  96. }
  97. return
  98. }
  99. func (l *DiskLocation) deleteVolumeById(vid VolumeId) (e error) {
  100. v, ok := l.volumes[vid]
  101. if !ok {
  102. return
  103. }
  104. e = v.Destroy()
  105. if e != nil {
  106. return
  107. }
  108. delete(l.volumes, vid)
  109. return
  110. }
  111. func (l *DiskLocation) LoadVolume(vid VolumeId, needleMapKind NeedleMapType) bool {
  112. if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
  113. for _, dir := range dirs {
  114. volId, _, err := l.volumeIdFromPath(dir)
  115. if vid == volId && err == nil {
  116. var mutex sync.RWMutex
  117. l.loadExistingVolume(dir, needleMapKind, &mutex)
  118. return true
  119. }
  120. }
  121. }
  122. return false
  123. }
  124. func (l *DiskLocation) DeleteVolume(vid VolumeId) error {
  125. l.Lock()
  126. defer l.Unlock()
  127. _, ok := l.volumes[vid]
  128. if !ok {
  129. return fmt.Errorf("Volume not found, VolumeId: %d", vid)
  130. }
  131. return l.deleteVolumeById(vid)
  132. }
  133. func (l *DiskLocation) UnloadVolume(vid VolumeId) error {
  134. l.Lock()
  135. defer l.Unlock()
  136. v, ok := l.volumes[vid]
  137. if !ok {
  138. return fmt.Errorf("Volume not loaded, VolumeId: %d", vid)
  139. }
  140. v.Close()
  141. delete(l.volumes, vid)
  142. return nil
  143. }
  144. func (l *DiskLocation) SetVolume(vid VolumeId, volume *Volume) {
  145. l.Lock()
  146. defer l.Unlock()
  147. l.volumes[vid] = volume
  148. }
  149. func (l *DiskLocation) FindVolume(vid VolumeId) (*Volume, bool) {
  150. l.RLock()
  151. defer l.RUnlock()
  152. v, ok := l.volumes[vid]
  153. return v, ok
  154. }
  155. func (l *DiskLocation) VolumesLen() int {
  156. l.RLock()
  157. defer l.RUnlock()
  158. return len(l.volumes)
  159. }
  160. func (l *DiskLocation) Close() {
  161. l.Lock()
  162. defer l.Unlock()
  163. for _, v := range l.volumes {
  164. v.Close()
  165. }
  166. return
  167. }