You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

153 lines
3.2 KiB

  1. package storage
  2. import (
  3. "io/ioutil"
  4. "os"
  5. "strings"
  6. "sync"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. )
  9. type DiskLocation struct {
  10. Directory string
  11. MaxVolumeCount int
  12. volumes map[VolumeId]*Volume
  13. sync.RWMutex
  14. }
  15. func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation {
  16. location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount}
  17. location.volumes = make(map[VolumeId]*Volume)
  18. return location
  19. }
  20. func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleMapType, mutex *sync.RWMutex) {
  21. name := dir.Name()
  22. if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
  23. collection := ""
  24. base := name[:len(name)-len(".dat")]
  25. i := strings.LastIndex(base, "_")
  26. if i > 0 {
  27. collection, base = base[0:i], base[i+1:]
  28. }
  29. if vid, err := NewVolumeId(base); err == nil {
  30. mutex.RLock()
  31. _, found := l.volumes[vid]
  32. mutex.RUnlock()
  33. if !found {
  34. if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil {
  35. mutex.Lock()
  36. l.volumes[vid] = v
  37. mutex.Unlock()
  38. glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
  39. } else {
  40. glog.V(0).Infof("new volume %s error %s", name, e)
  41. }
  42. }
  43. }
  44. }
  45. }
  46. func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrentFlag bool) {
  47. var concurrency int
  48. if concurrentFlag {
  49. //You could choose a better optimized concurency value after testing at your environment
  50. concurrency = 10
  51. } else {
  52. concurrency = 1
  53. }
  54. task_queue := make(chan os.FileInfo, 10*concurrency)
  55. go func() {
  56. if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
  57. for _, dir := range dirs {
  58. task_queue <- dir
  59. }
  60. }
  61. close(task_queue)
  62. }()
  63. var wg sync.WaitGroup
  64. var mutex sync.RWMutex
  65. for workerNum := 0; workerNum < concurrency; workerNum++ {
  66. wg.Add(1)
  67. go func() {
  68. defer wg.Done()
  69. for dir := range task_queue {
  70. l.loadExistingVolume(dir, needleMapKind, &mutex)
  71. }
  72. }()
  73. }
  74. wg.Wait()
  75. }
  76. func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
  77. l.Lock()
  78. defer l.Unlock()
  79. l.concurrentLoadingVolumes(needleMapKind, true)
  80. glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount)
  81. }
  82. func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
  83. l.Lock()
  84. defer l.Unlock()
  85. for k, v := range l.volumes {
  86. if v.Collection == collection {
  87. e = l.deleteVolumeById(k)
  88. if e != nil {
  89. return
  90. }
  91. }
  92. }
  93. return
  94. }
  95. func (l *DiskLocation) deleteVolumeById(vid VolumeId) (e error) {
  96. v, ok := l.volumes[vid]
  97. if !ok {
  98. return
  99. }
  100. e = v.Destroy()
  101. if e != nil {
  102. return
  103. }
  104. delete(l.volumes, vid)
  105. return
  106. }
  107. func (l *DiskLocation) SetVolume(vid VolumeId, volume *Volume) {
  108. l.Lock()
  109. defer l.Unlock()
  110. l.volumes[vid] = volume
  111. }
  112. func (l *DiskLocation) FindVolume(vid VolumeId) (*Volume, bool) {
  113. l.RLock()
  114. defer l.RUnlock()
  115. v, ok := l.volumes[vid]
  116. return v, ok
  117. }
  118. func (l *DiskLocation) VolumesLen() int {
  119. l.RLock()
  120. defer l.RUnlock()
  121. return len(l.volumes)
  122. }
  123. func (l *DiskLocation) Close() {
  124. l.Lock()
  125. defer l.Unlock()
  126. for _, v := range l.volumes {
  127. v.Close()
  128. }
  129. return
  130. }