You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

158 lines
3.4 KiB

  1. package storage
  2. import (
  3. "io/ioutil"
  4. "os"
  5. "strings"
  6. "sync"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. )
// DiskLocation tracks the volumes that live in a single directory on disk.
//
// The embedded RWMutex is taken by the exported methods of this type to guard
// access to the volumes map.
type DiskLocation struct {
	// Directory is the path that is scanned for volume data files.
	Directory string
	// MaxVolumeCount is stored by NewDiskLocation and reported in the startup
	// log; it is not enforced anywhere in this file.
	MaxVolumeCount int
	// volumes holds the loaded volumes, keyed by volume id.
	volumes map[VolumeId]*Volume
	sync.RWMutex
}
  15. func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation {
  16. location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount}
  17. location.volumes = make(map[VolumeId]*Volume)
  18. return location
  19. }
  20. func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleMapType, mutex *sync.RWMutex) {
  21. name := dir.Name()
  22. if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
  23. collection := ""
  24. base := name[:len(name)-len(".dat")]
  25. i := strings.LastIndex(base, "_")
  26. if i > 0 {
  27. collection, base = base[0:i], base[i+1:]
  28. }
  29. if vid, err := NewVolumeId(base); err == nil {
  30. mutex.RLock()
  31. _, found := l.volumes[vid]
  32. mutex.RUnlock()
  33. if !found {
  34. if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil {
  35. mutex.Lock()
  36. l.volumes[vid] = v
  37. mutex.Unlock()
  38. glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s",
  39. l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
  40. if v.Size() != v.dataFileSize {
  41. glog.V(0).Infof("data file %s, size=%d expected=%d",
  42. l.Directory+"/"+name, v.Size(), v.dataFileSize)
  43. }
  44. } else {
  45. glog.V(0).Infof("new volume %s error %s", name, e)
  46. }
  47. }
  48. }
  49. }
  50. }
  51. func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrentFlag bool) {
  52. var concurrency int
  53. if concurrentFlag {
  54. //You could choose a better optimized concurency value after testing at your environment
  55. concurrency = 10
  56. } else {
  57. concurrency = 1
  58. }
  59. task_queue := make(chan os.FileInfo, 10*concurrency)
  60. go func() {
  61. if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
  62. for _, dir := range dirs {
  63. task_queue <- dir
  64. }
  65. }
  66. close(task_queue)
  67. }()
  68. var wg sync.WaitGroup
  69. var mutex sync.RWMutex
  70. for workerNum := 0; workerNum < concurrency; workerNum++ {
  71. wg.Add(1)
  72. go func() {
  73. defer wg.Done()
  74. for dir := range task_queue {
  75. l.loadExistingVolume(dir, needleMapKind, &mutex)
  76. }
  77. }()
  78. }
  79. wg.Wait()
  80. }
  81. func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
  82. l.Lock()
  83. defer l.Unlock()
  84. l.concurrentLoadingVolumes(needleMapKind, true)
  85. glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount)
  86. }
  87. func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
  88. l.Lock()
  89. defer l.Unlock()
  90. for k, v := range l.volumes {
  91. if v.Collection == collection {
  92. e = l.deleteVolumeById(k)
  93. if e != nil {
  94. return
  95. }
  96. }
  97. }
  98. return
  99. }
  100. func (l *DiskLocation) deleteVolumeById(vid VolumeId) (e error) {
  101. v, ok := l.volumes[vid]
  102. if !ok {
  103. return
  104. }
  105. e = v.Destroy()
  106. if e != nil {
  107. return
  108. }
  109. delete(l.volumes, vid)
  110. return
  111. }
  112. func (l *DiskLocation) SetVolume(vid VolumeId, volume *Volume) {
  113. l.Lock()
  114. defer l.Unlock()
  115. l.volumes[vid] = volume
  116. }
  117. func (l *DiskLocation) FindVolume(vid VolumeId) (*Volume, bool) {
  118. l.RLock()
  119. defer l.RUnlock()
  120. v, ok := l.volumes[vid]
  121. return v, ok
  122. }
  123. func (l *DiskLocation) VolumesLen() int {
  124. l.RLock()
  125. defer l.RUnlock()
  126. return len(l.volumes)
  127. }
  128. func (l *DiskLocation) Close() {
  129. l.Lock()
  130. defer l.Unlock()
  131. for _, v := range l.volumes {
  132. v.Close()
  133. }
  134. return
  135. }