You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

387 lines
9.1 KiB

5 years ago
5 years ago
5 years ago
5 years ago
6 years ago
5 years ago
4 years ago
6 years ago
6 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
4 years ago
6 years ago
6 years ago
6 years ago
6 years ago
5 years ago
6 years ago
6 years ago
6 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package storage
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/storage/types"
  5. "io/ioutil"
  6. "os"
  7. "path/filepath"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/stats"
  13. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  14. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. )
  17. type DiskLocation struct {
  18. Directory string
  19. IdxDirectory string
  20. DiskType types.DiskType
  21. MaxVolumeCount int
  22. OriginalMaxVolumeCount int
  23. MinFreeSpace util.MinFreeSpace
  24. volumes map[needle.VolumeId]*Volume
  25. volumesLock sync.RWMutex
  26. // erasure coding
  27. ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume
  28. ecVolumesLock sync.RWMutex
  29. isDiskSpaceLow bool
  30. }
  31. func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpace util.MinFreeSpace, idxDir string, diskType types.DiskType) *DiskLocation {
  32. dir = util.ResolvePath(dir)
  33. if idxDir == "" {
  34. idxDir = dir
  35. } else {
  36. idxDir = util.ResolvePath(idxDir)
  37. }
  38. location := &DiskLocation{
  39. Directory: dir,
  40. IdxDirectory: idxDir,
  41. DiskType: diskType,
  42. MaxVolumeCount: maxVolumeCount,
  43. OriginalMaxVolumeCount: maxVolumeCount,
  44. MinFreeSpace: minFreeSpace,
  45. }
  46. location.volumes = make(map[needle.VolumeId]*Volume)
  47. location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume)
  48. go location.CheckDiskSpace()
  49. return location
  50. }
  51. func volumeIdFromFileName(filename string) (needle.VolumeId, string, error) {
  52. if isValidVolume(filename) {
  53. base := filename[:len(filename)-4]
  54. collection, volumeId, err := parseCollectionVolumeId(base)
  55. return volumeId, collection, err
  56. }
  57. return 0, "", fmt.Errorf("file is not a volume: %s", filename)
  58. }
  59. func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeId, err error) {
  60. i := strings.LastIndex(base, "_")
  61. if i > 0 {
  62. collection, base = base[0:i], base[i+1:]
  63. }
  64. vol, err := needle.NewVolumeId(base)
  65. return collection, vol, err
  66. }
  67. func isValidVolume(basename string) bool {
  68. return strings.HasSuffix(basename, ".idx") || strings.HasSuffix(basename, ".vif")
  69. }
  70. func getValidVolumeName(basename string) string {
  71. if isValidVolume(basename) {
  72. return basename[:len(basename)-4]
  73. }
  74. return ""
  75. }
  76. func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapKind) bool {
  77. basename := fileInfo.Name()
  78. if fileInfo.IsDir() {
  79. return false
  80. }
  81. volumeName := getValidVolumeName(basename)
  82. if volumeName == "" {
  83. return false
  84. }
  85. // skip ec volumes
  86. if util.FileExists(l.Directory + "/" + volumeName + ".ecx") {
  87. return false
  88. }
  89. // check for incomplete volume
  90. noteFile := l.Directory + "/" + volumeName + ".note"
  91. if util.FileExists(noteFile) {
  92. note, _ := ioutil.ReadFile(noteFile)
  93. glog.Warningf("volume %s was not completed: %s", volumeName, string(note))
  94. removeVolumeFiles(l.Directory + "/" + volumeName)
  95. removeVolumeFiles(l.IdxDirectory + "/" + volumeName)
  96. return false
  97. }
  98. // parse out collection, volume id
  99. vid, collection, err := volumeIdFromFileName(basename)
  100. if err != nil {
  101. glog.Warningf("get volume id failed, %s, err : %s", volumeName, err)
  102. return false
  103. }
  104. // avoid loading one volume more than once
  105. l.volumesLock.RLock()
  106. _, found := l.volumes[vid]
  107. l.volumesLock.RUnlock()
  108. if found {
  109. glog.V(1).Infof("loaded volume, %v", vid)
  110. return true
  111. }
  112. // load the volume
  113. v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, 0)
  114. if e != nil {
  115. glog.V(0).Infof("new volume %s error %s", volumeName, e)
  116. return false
  117. }
  118. l.SetVolume(vid, v)
  119. size, _, _ := v.FileStat()
  120. glog.V(0).Infof("data file %s, replication=%s v=%d size=%d ttl=%s",
  121. l.Directory+"/"+volumeName+".dat", v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
  122. return true
  123. }
  124. func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, concurrency int) {
  125. task_queue := make(chan os.FileInfo, 10*concurrency)
  126. go func() {
  127. foundVolumeNames := make(map[string]bool)
  128. if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil {
  129. for _, fi := range fileInfos {
  130. volumeName := getValidVolumeName(fi.Name())
  131. if volumeName == "" {
  132. continue
  133. }
  134. if _, found := foundVolumeNames[volumeName]; !found {
  135. foundVolumeNames[volumeName] = true
  136. task_queue <- fi
  137. }
  138. }
  139. }
  140. close(task_queue)
  141. }()
  142. var wg sync.WaitGroup
  143. for workerNum := 0; workerNum < concurrency; workerNum++ {
  144. wg.Add(1)
  145. go func() {
  146. defer wg.Done()
  147. for fi := range task_queue {
  148. _ = l.loadExistingVolume(fi, needleMapKind)
  149. }
  150. }()
  151. }
  152. wg.Wait()
  153. }
  154. func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind) {
  155. l.concurrentLoadingVolumes(needleMapKind, 10)
  156. glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount)
  157. l.loadAllEcShards()
  158. glog.V(0).Infof("Store started on dir: %s with %d ec shards", l.Directory, len(l.ecVolumes))
  159. }
  160. func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
  161. l.volumesLock.Lock()
  162. delVolsMap := l.unmountVolumeByCollection(collection)
  163. l.volumesLock.Unlock()
  164. l.ecVolumesLock.Lock()
  165. delEcVolsMap := l.unmountEcVolumeByCollection(collection)
  166. l.ecVolumesLock.Unlock()
  167. errChain := make(chan error, 2)
  168. var wg sync.WaitGroup
  169. wg.Add(2)
  170. go func() {
  171. for _, v := range delVolsMap {
  172. if err := v.Destroy(); err != nil {
  173. errChain <- err
  174. }
  175. }
  176. wg.Done()
  177. }()
  178. go func() {
  179. for _, v := range delEcVolsMap {
  180. v.Destroy()
  181. }
  182. wg.Done()
  183. }()
  184. go func() {
  185. wg.Wait()
  186. close(errChain)
  187. }()
  188. errBuilder := strings.Builder{}
  189. for err := range errChain {
  190. errBuilder.WriteString(err.Error())
  191. errBuilder.WriteString("; ")
  192. }
  193. if errBuilder.Len() > 0 {
  194. e = fmt.Errorf(errBuilder.String())
  195. }
  196. return
  197. }
  198. func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (found bool, e error) {
  199. v, ok := l.volumes[vid]
  200. if !ok {
  201. return
  202. }
  203. e = v.Destroy()
  204. if e != nil {
  205. return
  206. }
  207. found = true
  208. delete(l.volumes, vid)
  209. return
  210. }
  211. func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKind) bool {
  212. if fileInfo, found := l.LocateVolume(vid); found {
  213. return l.loadExistingVolume(fileInfo, needleMapKind)
  214. }
  215. return false
  216. }
// ErrVolumeNotFound is returned by DeleteVolume and UnloadVolume when the
// requested volume id is not present in the location's volume map.
var ErrVolumeNotFound = fmt.Errorf("volume not found")
  218. func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error {
  219. l.volumesLock.Lock()
  220. defer l.volumesLock.Unlock()
  221. _, ok := l.volumes[vid]
  222. if !ok {
  223. return ErrVolumeNotFound
  224. }
  225. _, err := l.deleteVolumeById(vid)
  226. return err
  227. }
  228. func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error {
  229. l.volumesLock.Lock()
  230. defer l.volumesLock.Unlock()
  231. v, ok := l.volumes[vid]
  232. if !ok {
  233. return ErrVolumeNotFound
  234. }
  235. v.Close()
  236. delete(l.volumes, vid)
  237. return nil
  238. }
  239. func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume {
  240. deltaVols := make(map[needle.VolumeId]*Volume, 0)
  241. for k, v := range l.volumes {
  242. if v.Collection == collectionName && !v.isCompacting {
  243. deltaVols[k] = v
  244. }
  245. }
  246. for k := range deltaVols {
  247. delete(l.volumes, k)
  248. }
  249. return deltaVols
  250. }
  251. func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) {
  252. l.volumesLock.Lock()
  253. defer l.volumesLock.Unlock()
  254. l.volumes[vid] = volume
  255. volume.location = l
  256. }
  257. func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool) {
  258. l.volumesLock.RLock()
  259. defer l.volumesLock.RUnlock()
  260. v, ok := l.volumes[vid]
  261. return v, ok
  262. }
  263. func (l *DiskLocation) VolumesLen() int {
  264. l.volumesLock.RLock()
  265. defer l.volumesLock.RUnlock()
  266. return len(l.volumes)
  267. }
  268. func (l *DiskLocation) Close() {
  269. l.volumesLock.Lock()
  270. for _, v := range l.volumes {
  271. v.Close()
  272. }
  273. l.volumesLock.Unlock()
  274. l.ecVolumesLock.Lock()
  275. for _, ecVolume := range l.ecVolumes {
  276. ecVolume.Close()
  277. }
  278. l.ecVolumesLock.Unlock()
  279. return
  280. }
  281. func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.FileInfo, bool) {
  282. if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil {
  283. for _, fileInfo := range fileInfos {
  284. volId, _, err := volumeIdFromFileName(fileInfo.Name())
  285. if vid == volId && err == nil {
  286. return fileInfo, true
  287. }
  288. }
  289. }
  290. return nil, false
  291. }
  292. func (l *DiskLocation) UnUsedSpace(volumeSizeLimit uint64) (unUsedSpace uint64) {
  293. l.volumesLock.RLock()
  294. defer l.volumesLock.RUnlock()
  295. for _, vol := range l.volumes {
  296. if vol.IsReadOnly() {
  297. continue
  298. }
  299. datSize, idxSize, _ := vol.FileStat()
  300. unUsedSpace += volumeSizeLimit - (datSize + idxSize)
  301. }
  302. return
  303. }
  304. func (l *DiskLocation) CheckDiskSpace() {
  305. for {
  306. if dir, e := filepath.Abs(l.Directory); e == nil {
  307. s := stats.NewDiskStatus(dir)
  308. stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "all").Set(float64(s.All))
  309. stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "used").Set(float64(s.Used))
  310. stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "free").Set(float64(s.Free))
  311. isLow, desc := l.MinFreeSpace.IsLow(s.Free, s.PercentFree)
  312. if isLow != l.isDiskSpaceLow {
  313. l.isDiskSpaceLow = !l.isDiskSpaceLow
  314. }
  315. logLevel := glog.Level(4)
  316. if l.isDiskSpaceLow {
  317. logLevel = glog.Level(0)
  318. }
  319. glog.V(logLevel).Infof("dir %s %s", dir, desc)
  320. }
  321. time.Sleep(time.Minute)
  322. }
  323. }