You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

458 lines
11 KiB

5 years ago
5 years ago
6 years ago
5 years ago
4 years ago
6 years ago
6 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
4 years ago
6 years ago
6 years ago
6 years ago
5 years ago
6 years ago
6 years ago
6 years ago
5 years ago
  1. package storage
  2. import (
  3. "fmt"
  4. "os"
  5. "path/filepath"
  6. "runtime"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/google/uuid"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. "github.com/seaweedfs/seaweedfs/weed/stats"
  14. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  15. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  16. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  17. "github.com/seaweedfs/seaweedfs/weed/util"
  18. )
  19. type DiskLocation struct {
  20. Directory string
  21. DirectoryUuid string
  22. IdxDirectory string
  23. DiskType types.DiskType
  24. MaxVolumeCount int32
  25. OriginalMaxVolumeCount int32
  26. MinFreeSpace util.MinFreeSpace
  27. volumes map[needle.VolumeId]*Volume
  28. volumesLock sync.RWMutex
  29. // erasure coding
  30. ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume
  31. ecVolumesLock sync.RWMutex
  32. isDiskSpaceLow bool
  33. closeCh chan struct{}
  34. }
  35. func GenerateDirUuid(dir string) (dirUuidString string, err error) {
  36. glog.V(1).Infof("Getting uuid of volume directory:%s", dir)
  37. dirUuidString = ""
  38. fileName := dir + "/vol_dir.uuid"
  39. if !util.FileExists(fileName) {
  40. dirUuid, _ := uuid.NewRandom()
  41. dirUuidString = dirUuid.String()
  42. writeErr := util.WriteFile(fileName, []byte(dirUuidString), 0644)
  43. if writeErr != nil {
  44. return "", fmt.Errorf("failed to write uuid to %s : %v", fileName, writeErr)
  45. }
  46. } else {
  47. uuidData, readErr := os.ReadFile(fileName)
  48. if readErr != nil {
  49. return "", fmt.Errorf("failed to read uuid from %s : %v", fileName, readErr)
  50. }
  51. dirUuidString = string(uuidData)
  52. }
  53. return dirUuidString, nil
  54. }
  55. func NewDiskLocation(dir string, maxVolumeCount int32, minFreeSpace util.MinFreeSpace, idxDir string, diskType types.DiskType) *DiskLocation {
  56. glog.V(4).Infof("Added new Disk %s: maxVolumes=%d", dir, maxVolumeCount)
  57. dir = util.ResolvePath(dir)
  58. if idxDir == "" {
  59. idxDir = dir
  60. } else {
  61. idxDir = util.ResolvePath(idxDir)
  62. }
  63. dirUuid, err := GenerateDirUuid(dir)
  64. if err != nil {
  65. glog.Fatalf("cannot generate uuid of dir %s: %v", dir, err)
  66. }
  67. location := &DiskLocation{
  68. Directory: dir,
  69. DirectoryUuid: dirUuid,
  70. IdxDirectory: idxDir,
  71. DiskType: diskType,
  72. MaxVolumeCount: maxVolumeCount,
  73. OriginalMaxVolumeCount: maxVolumeCount,
  74. MinFreeSpace: minFreeSpace,
  75. }
  76. location.volumes = make(map[needle.VolumeId]*Volume)
  77. location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume)
  78. location.closeCh = make(chan struct{})
  79. go func() {
  80. location.CheckDiskSpace()
  81. for {
  82. select {
  83. case <-location.closeCh:
  84. return
  85. case <-time.After(time.Minute):
  86. location.CheckDiskSpace()
  87. }
  88. }
  89. }()
  90. return location
  91. }
  92. func volumeIdFromFileName(filename string) (needle.VolumeId, string, error) {
  93. if isValidVolume(filename) {
  94. base := filename[:len(filename)-4]
  95. collection, volumeId, err := parseCollectionVolumeId(base)
  96. return volumeId, collection, err
  97. }
  98. return 0, "", fmt.Errorf("file is not a volume: %s", filename)
  99. }
  100. func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeId, err error) {
  101. i := strings.LastIndex(base, "_")
  102. if i > 0 {
  103. collection, base = base[0:i], base[i+1:]
  104. }
  105. vol, err := needle.NewVolumeId(base)
  106. return collection, vol, err
  107. }
  108. func isValidVolume(basename string) bool {
  109. return strings.HasSuffix(basename, ".idx") || strings.HasSuffix(basename, ".vif")
  110. }
  111. func getValidVolumeName(basename string) string {
  112. if isValidVolume(basename) {
  113. return basename[:len(basename)-4]
  114. }
  115. return ""
  116. }
  117. func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind NeedleMapKind, skipIfEcVolumesExists bool, ldbTimeout int64) bool {
  118. basename := dirEntry.Name()
  119. if dirEntry.IsDir() {
  120. return false
  121. }
  122. volumeName := getValidVolumeName(basename)
  123. if volumeName == "" {
  124. return false
  125. }
  126. // skip if ec volumes exists
  127. if skipIfEcVolumesExists {
  128. if util.FileExists(l.IdxDirectory + "/" + volumeName + ".ecx") {
  129. return false
  130. }
  131. }
  132. // check for incomplete volume
  133. noteFile := l.Directory + "/" + volumeName + ".note"
  134. if util.FileExists(noteFile) {
  135. note, _ := os.ReadFile(noteFile)
  136. glog.Warningf("volume %s was not completed: %s", volumeName, string(note))
  137. removeVolumeFiles(l.Directory + "/" + volumeName)
  138. removeVolumeFiles(l.IdxDirectory + "/" + volumeName)
  139. return false
  140. }
  141. // parse out collection, volume id
  142. vid, collection, err := volumeIdFromFileName(basename)
  143. if err != nil {
  144. glog.Warningf("get volume id failed, %s, err : %s", volumeName, err)
  145. return false
  146. }
  147. // avoid loading one volume more than once
  148. l.volumesLock.RLock()
  149. _, found := l.volumes[vid]
  150. l.volumesLock.RUnlock()
  151. if found {
  152. glog.V(1).Infof("loaded volume, %v", vid)
  153. return true
  154. }
  155. // load the volume
  156. v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, 0, ldbTimeout)
  157. if e != nil {
  158. glog.V(0).Infof("new volume %s error %s", volumeName, e)
  159. return false
  160. }
  161. l.SetVolume(vid, v)
  162. size, _, _ := v.FileStat()
  163. glog.V(0).Infof("data file %s, replication=%s v=%d size=%d ttl=%s",
  164. l.Directory+"/"+volumeName+".dat", v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
  165. return true
  166. }
  167. func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, concurrency int, ldbTimeout int64) {
  168. task_queue := make(chan os.DirEntry, 10*concurrency)
  169. go func() {
  170. foundVolumeNames := make(map[string]bool)
  171. if dirEntries, err := os.ReadDir(l.Directory); err == nil {
  172. for _, entry := range dirEntries {
  173. volumeName := getValidVolumeName(entry.Name())
  174. if volumeName == "" {
  175. continue
  176. }
  177. if _, found := foundVolumeNames[volumeName]; !found {
  178. foundVolumeNames[volumeName] = true
  179. task_queue <- entry
  180. }
  181. }
  182. }
  183. close(task_queue)
  184. }()
  185. var wg sync.WaitGroup
  186. for workerNum := 0; workerNum < concurrency; workerNum++ {
  187. wg.Add(1)
  188. go func() {
  189. defer wg.Done()
  190. for fi := range task_queue {
  191. _ = l.loadExistingVolume(fi, needleMapKind, true, ldbTimeout)
  192. }
  193. }()
  194. }
  195. wg.Wait()
  196. }
  197. func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind, ldbTimeout int64) {
  198. workerNum := runtime.NumCPU()
  199. val, ok := os.LookupEnv("GOMAXPROCS")
  200. if ok {
  201. num, err := strconv.Atoi(val)
  202. if err != nil || num < 1 {
  203. num = 10
  204. glog.Warningf("failed to set worker number from GOMAXPROCS , set to default:10")
  205. }
  206. workerNum = num
  207. } else {
  208. if workerNum <= 10 {
  209. workerNum = 10
  210. }
  211. }
  212. l.concurrentLoadingVolumes(needleMapKind, workerNum, ldbTimeout)
  213. glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount)
  214. l.loadAllEcShards()
  215. glog.V(0).Infof("Store started on dir: %s with %d ec shards", l.Directory, len(l.ecVolumes))
  216. }
  217. func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
  218. l.volumesLock.Lock()
  219. delVolsMap := l.unmountVolumeByCollection(collection)
  220. l.volumesLock.Unlock()
  221. l.ecVolumesLock.Lock()
  222. delEcVolsMap := l.unmountEcVolumeByCollection(collection)
  223. l.ecVolumesLock.Unlock()
  224. errChain := make(chan error, 2)
  225. var wg sync.WaitGroup
  226. wg.Add(2)
  227. go func() {
  228. for _, v := range delVolsMap {
  229. if err := v.Destroy(false); err != nil {
  230. errChain <- err
  231. }
  232. }
  233. wg.Done()
  234. }()
  235. go func() {
  236. for _, v := range delEcVolsMap {
  237. v.Destroy()
  238. }
  239. wg.Done()
  240. }()
  241. go func() {
  242. wg.Wait()
  243. close(errChain)
  244. }()
  245. errBuilder := strings.Builder{}
  246. for err := range errChain {
  247. errBuilder.WriteString(err.Error())
  248. errBuilder.WriteString("; ")
  249. }
  250. if errBuilder.Len() > 0 {
  251. e = fmt.Errorf(errBuilder.String())
  252. }
  253. return
  254. }
  255. func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId, onlyEmpty bool) (found bool, e error) {
  256. v, ok := l.volumes[vid]
  257. if !ok {
  258. return
  259. }
  260. e = v.Destroy(onlyEmpty)
  261. if e != nil {
  262. return
  263. }
  264. found = true
  265. delete(l.volumes, vid)
  266. return
  267. }
  268. func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKind) bool {
  269. if fileInfo, found := l.LocateVolume(vid); found {
  270. return l.loadExistingVolume(fileInfo, needleMapKind, false, 0)
  271. }
  272. return false
  273. }
  274. var ErrVolumeNotFound = fmt.Errorf("volume not found")
  275. func (l *DiskLocation) DeleteVolume(vid needle.VolumeId, onlyEmpty bool) error {
  276. l.volumesLock.Lock()
  277. defer l.volumesLock.Unlock()
  278. _, ok := l.volumes[vid]
  279. if !ok {
  280. return ErrVolumeNotFound
  281. }
  282. _, err := l.deleteVolumeById(vid, onlyEmpty)
  283. return err
  284. }
  285. func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error {
  286. l.volumesLock.Lock()
  287. defer l.volumesLock.Unlock()
  288. v, ok := l.volumes[vid]
  289. if !ok {
  290. return ErrVolumeNotFound
  291. }
  292. v.Close()
  293. delete(l.volumes, vid)
  294. return nil
  295. }
  296. func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume {
  297. deltaVols := make(map[needle.VolumeId]*Volume, 0)
  298. for k, v := range l.volumes {
  299. if v.Collection == collectionName && !v.isCompacting && !v.isCommitCompacting {
  300. deltaVols[k] = v
  301. }
  302. }
  303. for k := range deltaVols {
  304. delete(l.volumes, k)
  305. }
  306. return deltaVols
  307. }
// SetVolume registers volume under vid in this location and points the
// volume's location field back at l. The volume write lock is held for the
// whole call; an existing entry for vid is overwritten.
func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) {
	l.volumesLock.Lock()
	defer l.volumesLock.Unlock()
	l.volumes[vid] = volume
	volume.location = l
}
  314. func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool) {
  315. l.volumesLock.RLock()
  316. defer l.volumesLock.RUnlock()
  317. v, ok := l.volumes[vid]
  318. return v, ok
  319. }
  320. func (l *DiskLocation) VolumesLen() int {
  321. l.volumesLock.RLock()
  322. defer l.volumesLock.RUnlock()
  323. return len(l.volumes)
  324. }
  325. func (l *DiskLocation) SetStopping() {
  326. l.volumesLock.Lock()
  327. for _, v := range l.volumes {
  328. v.SyncToDisk()
  329. }
  330. l.volumesLock.Unlock()
  331. return
  332. }
  333. func (l *DiskLocation) Close() {
  334. l.volumesLock.Lock()
  335. for _, v := range l.volumes {
  336. v.Close()
  337. }
  338. l.volumesLock.Unlock()
  339. l.ecVolumesLock.Lock()
  340. for _, ecVolume := range l.ecVolumes {
  341. ecVolume.Close()
  342. }
  343. l.ecVolumesLock.Unlock()
  344. close(l.closeCh)
  345. return
  346. }
  347. func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.DirEntry, bool) {
  348. // println("LocateVolume", vid, "on", l.Directory)
  349. if dirEntries, err := os.ReadDir(l.Directory); err == nil {
  350. for _, entry := range dirEntries {
  351. // println("checking", entry.Name(), "...")
  352. volId, _, err := volumeIdFromFileName(entry.Name())
  353. // println("volId", volId, "err", err)
  354. if vid == volId && err == nil {
  355. return entry, true
  356. }
  357. }
  358. }
  359. return nil, false
  360. }
  361. func (l *DiskLocation) UnUsedSpace(volumeSizeLimit uint64) (unUsedSpace uint64) {
  362. l.volumesLock.RLock()
  363. defer l.volumesLock.RUnlock()
  364. for _, vol := range l.volumes {
  365. if vol.IsReadOnly() {
  366. continue
  367. }
  368. datSize, idxSize, _ := vol.FileStat()
  369. unUsedSpaceVolume := int64(volumeSizeLimit) - int64(datSize+idxSize)
  370. glog.V(4).Infof("Volume stats for %d: volumeSizeLimit=%d, datSize=%d idxSize=%d unused=%d", vol.Id, volumeSizeLimit, datSize, idxSize, unUsedSpaceVolume)
  371. if unUsedSpaceVolume >= 0 {
  372. unUsedSpace += uint64(unUsedSpaceVolume)
  373. }
  374. }
  375. return
  376. }
  377. func (l *DiskLocation) CheckDiskSpace() {
  378. if dir, e := filepath.Abs(l.Directory); e == nil {
  379. s := stats.NewDiskStatus(dir)
  380. stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "all").Set(float64(s.All))
  381. stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "used").Set(float64(s.Used))
  382. stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "free").Set(float64(s.Free))
  383. isLow, desc := l.MinFreeSpace.IsLow(s.Free, s.PercentFree)
  384. if isLow != l.isDiskSpaceLow {
  385. l.isDiskSpaceLow = !l.isDiskSpaceLow
  386. }
  387. logLevel := glog.Level(4)
  388. if l.isDiskSpaceLow {
  389. logLevel = glog.Level(0)
  390. }
  391. glog.V(logLevel).Infof("dir %s %s", dir, desc)
  392. }
  393. }