You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

465 lines
12 KiB

5 years ago
5 years ago
6 years ago
5 years ago
4 years ago
6 years ago
6 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
4 years ago
6 years ago
6 years ago
6 years ago
5 years ago
6 years ago
6 years ago
6 years ago
5 years ago
  1. package storage
  2. import (
  3. "fmt"
  4. "os"
  5. "path/filepath"
  6. "runtime"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/google/uuid"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. "github.com/seaweedfs/seaweedfs/weed/stats"
  14. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  15. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  16. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  17. "github.com/seaweedfs/seaweedfs/weed/util"
  18. )
  19. type DiskLocation struct {
  20. Directory string
  21. DirectoryUuid string
  22. IdxDirectory string
  23. DiskType types.DiskType
  24. MaxVolumeCount int32
  25. OriginalMaxVolumeCount int32
  26. MinFreeSpace util.MinFreeSpace
  27. volumes map[needle.VolumeId]*Volume
  28. volumesLock sync.RWMutex
  29. // erasure coding
  30. ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume
  31. ecVolumesLock sync.RWMutex
  32. isDiskSpaceLow bool
  33. closeCh chan struct{}
  34. }
  35. func GenerateDirUuid(dir string) (dirUuidString string, err error) {
  36. glog.V(1).Infof("Getting uuid of volume directory:%s", dir)
  37. fileName := dir + "/vol_dir.uuid"
  38. if !util.FileExists(fileName) {
  39. dirUuidString, err = writeNewUuid(fileName)
  40. } else {
  41. uuidData, readErr := os.ReadFile(fileName)
  42. if readErr != nil {
  43. return "", fmt.Errorf("failed to read uuid from %s : %v", fileName, readErr)
  44. }
  45. if len(uuidData) > 0 {
  46. dirUuidString = string(uuidData)
  47. } else {
  48. dirUuidString, err = writeNewUuid(fileName)
  49. }
  50. }
  51. return dirUuidString, err
  52. }
  53. func writeNewUuid(fileName string) (string, error) {
  54. dirUuid, _ := uuid.NewRandom()
  55. dirUuidString := dirUuid.String()
  56. if err := util.WriteFile(fileName, []byte(dirUuidString), 0644); err != nil {
  57. return "", fmt.Errorf("failed to write uuid to %s : %v", fileName, err)
  58. }
  59. return dirUuidString, nil
  60. }
  61. func NewDiskLocation(dir string, maxVolumeCount int32, minFreeSpace util.MinFreeSpace, idxDir string, diskType types.DiskType) *DiskLocation {
  62. glog.V(4).Infof("Added new Disk %s: maxVolumes=%d", dir, maxVolumeCount)
  63. dir = util.ResolvePath(dir)
  64. if idxDir == "" {
  65. idxDir = dir
  66. } else {
  67. idxDir = util.ResolvePath(idxDir)
  68. }
  69. dirUuid, err := GenerateDirUuid(dir)
  70. if err != nil {
  71. glog.Fatalf("cannot generate uuid of dir %s: %v", dir, err)
  72. }
  73. location := &DiskLocation{
  74. Directory: dir,
  75. DirectoryUuid: dirUuid,
  76. IdxDirectory: idxDir,
  77. DiskType: diskType,
  78. MaxVolumeCount: maxVolumeCount,
  79. OriginalMaxVolumeCount: maxVolumeCount,
  80. MinFreeSpace: minFreeSpace,
  81. }
  82. location.volumes = make(map[needle.VolumeId]*Volume)
  83. location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume)
  84. location.closeCh = make(chan struct{})
  85. go func() {
  86. location.CheckDiskSpace()
  87. for {
  88. select {
  89. case <-location.closeCh:
  90. return
  91. case <-time.After(time.Minute):
  92. location.CheckDiskSpace()
  93. }
  94. }
  95. }()
  96. return location
  97. }
  98. func volumeIdFromFileName(filename string) (needle.VolumeId, string, error) {
  99. if isValidVolume(filename) {
  100. base := filename[:len(filename)-4]
  101. collection, volumeId, err := parseCollectionVolumeId(base)
  102. return volumeId, collection, err
  103. }
  104. return 0, "", fmt.Errorf("file is not a volume: %s", filename)
  105. }
  106. func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeId, err error) {
  107. i := strings.LastIndex(base, "_")
  108. if i > 0 {
  109. collection, base = base[0:i], base[i+1:]
  110. }
  111. vol, err := needle.NewVolumeId(base)
  112. return collection, vol, err
  113. }
  // isValidVolume reports whether basename ends in one of the extensions that
  // identify a volume: ".idx" or ".vif". The comparison is case-sensitive.
  func isValidVolume(basename string) bool {
  	for _, ext := range []string{".idx", ".vif"} {
  		if strings.HasSuffix(basename, ext) {
  			return true
  		}
  	}
  	return false
  }
  117. func getValidVolumeName(basename string) string {
  118. if isValidVolume(basename) {
  119. return basename[:len(basename)-4]
  120. }
  121. return ""
  122. }
  123. func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind NeedleMapKind, skipIfEcVolumesExists bool, ldbTimeout int64) bool {
  124. basename := dirEntry.Name()
  125. if dirEntry.IsDir() {
  126. return false
  127. }
  128. volumeName := getValidVolumeName(basename)
  129. if volumeName == "" {
  130. return false
  131. }
  132. // skip if ec volumes exists
  133. if skipIfEcVolumesExists {
  134. if util.FileExists(l.IdxDirectory + "/" + volumeName + ".ecx") {
  135. return false
  136. }
  137. }
  138. // check for incomplete volume
  139. noteFile := l.Directory + "/" + volumeName + ".note"
  140. if util.FileExists(noteFile) {
  141. note, _ := os.ReadFile(noteFile)
  142. glog.Warningf("volume %s was not completed: %s", volumeName, string(note))
  143. removeVolumeFiles(l.Directory + "/" + volumeName)
  144. removeVolumeFiles(l.IdxDirectory + "/" + volumeName)
  145. return false
  146. }
  147. // parse out collection, volume id
  148. vid, collection, err := volumeIdFromFileName(basename)
  149. if err != nil {
  150. glog.Warningf("get volume id failed, %s, err : %s", volumeName, err)
  151. return false
  152. }
  153. // avoid loading one volume more than once
  154. l.volumesLock.RLock()
  155. _, found := l.volumes[vid]
  156. l.volumesLock.RUnlock()
  157. if found {
  158. glog.V(1).Infof("loaded volume, %v", vid)
  159. return true
  160. }
  161. // load the volume
  162. v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, 0, ldbTimeout)
  163. if e != nil {
  164. glog.V(0).Infof("new volume %s error %s", volumeName, e)
  165. return false
  166. }
  167. l.SetVolume(vid, v)
  168. size, _, _ := v.FileStat()
  169. glog.V(0).Infof("data file %s, replication=%s v=%d size=%d ttl=%s",
  170. l.Directory+"/"+volumeName+".dat", v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
  171. return true
  172. }
  173. func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, concurrency int, ldbTimeout int64) {
  174. task_queue := make(chan os.DirEntry, 10*concurrency)
  175. go func() {
  176. foundVolumeNames := make(map[string]bool)
  177. if dirEntries, err := os.ReadDir(l.Directory); err == nil {
  178. for _, entry := range dirEntries {
  179. volumeName := getValidVolumeName(entry.Name())
  180. if volumeName == "" {
  181. continue
  182. }
  183. if _, found := foundVolumeNames[volumeName]; !found {
  184. foundVolumeNames[volumeName] = true
  185. task_queue <- entry
  186. }
  187. }
  188. }
  189. close(task_queue)
  190. }()
  191. var wg sync.WaitGroup
  192. for workerNum := 0; workerNum < concurrency; workerNum++ {
  193. wg.Add(1)
  194. go func() {
  195. defer wg.Done()
  196. for fi := range task_queue {
  197. _ = l.loadExistingVolume(fi, needleMapKind, true, ldbTimeout)
  198. }
  199. }()
  200. }
  201. wg.Wait()
  202. }
  203. func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind, ldbTimeout int64) {
  204. workerNum := runtime.NumCPU()
  205. val, ok := os.LookupEnv("GOMAXPROCS")
  206. if ok {
  207. num, err := strconv.Atoi(val)
  208. if err != nil || num < 1 {
  209. num = 10
  210. glog.Warningf("failed to set worker number from GOMAXPROCS , set to default:10")
  211. }
  212. workerNum = num
  213. } else {
  214. if workerNum <= 10 {
  215. workerNum = 10
  216. }
  217. }
  218. l.concurrentLoadingVolumes(needleMapKind, workerNum, ldbTimeout)
  219. glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount)
  220. l.loadAllEcShards()
  221. glog.V(0).Infof("Store started on dir: %s with %d ec shards", l.Directory, len(l.ecVolumes))
  222. }
  223. func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
  224. l.volumesLock.Lock()
  225. delVolsMap := l.unmountVolumeByCollection(collection)
  226. l.volumesLock.Unlock()
  227. l.ecVolumesLock.Lock()
  228. delEcVolsMap := l.unmountEcVolumeByCollection(collection)
  229. l.ecVolumesLock.Unlock()
  230. errChain := make(chan error, 2)
  231. var wg sync.WaitGroup
  232. wg.Add(2)
  233. go func() {
  234. for _, v := range delVolsMap {
  235. if err := v.Destroy(false); err != nil {
  236. errChain <- err
  237. }
  238. }
  239. wg.Done()
  240. }()
  241. go func() {
  242. for _, v := range delEcVolsMap {
  243. v.Destroy()
  244. }
  245. wg.Done()
  246. }()
  247. go func() {
  248. wg.Wait()
  249. close(errChain)
  250. }()
  251. errBuilder := strings.Builder{}
  252. for err := range errChain {
  253. errBuilder.WriteString(err.Error())
  254. errBuilder.WriteString("; ")
  255. }
  256. if errBuilder.Len() > 0 {
  257. e = fmt.Errorf(errBuilder.String())
  258. }
  259. return
  260. }
  261. func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId, onlyEmpty bool) (found bool, e error) {
  262. v, ok := l.volumes[vid]
  263. if !ok {
  264. return
  265. }
  266. e = v.Destroy(onlyEmpty)
  267. if e != nil {
  268. return
  269. }
  270. found = true
  271. delete(l.volumes, vid)
  272. return
  273. }
  274. func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKind) bool {
  275. if fileInfo, found := l.LocateVolume(vid); found {
  276. return l.loadExistingVolume(fileInfo, needleMapKind, false, 0)
  277. }
  278. return false
  279. }
  280. var ErrVolumeNotFound = fmt.Errorf("volume not found")
  281. func (l *DiskLocation) DeleteVolume(vid needle.VolumeId, onlyEmpty bool) error {
  282. l.volumesLock.Lock()
  283. defer l.volumesLock.Unlock()
  284. _, ok := l.volumes[vid]
  285. if !ok {
  286. return ErrVolumeNotFound
  287. }
  288. _, err := l.deleteVolumeById(vid, onlyEmpty)
  289. return err
  290. }
  291. func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error {
  292. l.volumesLock.Lock()
  293. defer l.volumesLock.Unlock()
  294. v, ok := l.volumes[vid]
  295. if !ok {
  296. return ErrVolumeNotFound
  297. }
  298. v.Close()
  299. delete(l.volumes, vid)
  300. return nil
  301. }
  302. func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume {
  303. deltaVols := make(map[needle.VolumeId]*Volume, 0)
  304. for k, v := range l.volumes {
  305. if v.Collection == collectionName && !v.isCompacting && !v.isCommitCompacting {
  306. deltaVols[k] = v
  307. }
  308. }
  309. for k := range deltaVols {
  310. delete(l.volumes, k)
  311. }
  312. return deltaVols
  313. }
  314. func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) {
  315. l.volumesLock.Lock()
  316. defer l.volumesLock.Unlock()
  317. l.volumes[vid] = volume
  318. volume.location = l
  319. }
  320. func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool) {
  321. l.volumesLock.RLock()
  322. defer l.volumesLock.RUnlock()
  323. v, ok := l.volumes[vid]
  324. return v, ok
  325. }
  326. func (l *DiskLocation) VolumesLen() int {
  327. l.volumesLock.RLock()
  328. defer l.volumesLock.RUnlock()
  329. return len(l.volumes)
  330. }
  331. func (l *DiskLocation) SetStopping() {
  332. l.volumesLock.Lock()
  333. for _, v := range l.volumes {
  334. v.SyncToDisk()
  335. }
  336. l.volumesLock.Unlock()
  337. return
  338. }
  339. func (l *DiskLocation) Close() {
  340. l.volumesLock.Lock()
  341. for _, v := range l.volumes {
  342. v.Close()
  343. }
  344. l.volumesLock.Unlock()
  345. l.ecVolumesLock.Lock()
  346. for _, ecVolume := range l.ecVolumes {
  347. ecVolume.Close()
  348. }
  349. l.ecVolumesLock.Unlock()
  350. close(l.closeCh)
  351. return
  352. }
  353. func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.DirEntry, bool) {
  354. // println("LocateVolume", vid, "on", l.Directory)
  355. if dirEntries, err := os.ReadDir(l.Directory); err == nil {
  356. for _, entry := range dirEntries {
  357. // println("checking", entry.Name(), "...")
  358. volId, _, err := volumeIdFromFileName(entry.Name())
  359. // println("volId", volId, "err", err)
  360. if vid == volId && err == nil {
  361. return entry, true
  362. }
  363. }
  364. }
  365. return nil, false
  366. }
  367. func (l *DiskLocation) UnUsedSpace(volumeSizeLimit uint64) (unUsedSpace uint64) {
  368. l.volumesLock.RLock()
  369. defer l.volumesLock.RUnlock()
  370. for _, vol := range l.volumes {
  371. if vol.IsReadOnly() {
  372. continue
  373. }
  374. datSize, idxSize, _ := vol.FileStat()
  375. unUsedSpaceVolume := int64(volumeSizeLimit) - int64(datSize+idxSize)
  376. glog.V(4).Infof("Volume stats for %d: volumeSizeLimit=%d, datSize=%d idxSize=%d unused=%d", vol.Id, volumeSizeLimit, datSize, idxSize, unUsedSpaceVolume)
  377. if unUsedSpaceVolume >= 0 {
  378. unUsedSpace += uint64(unUsedSpaceVolume)
  379. }
  380. }
  381. return
  382. }
  383. func (l *DiskLocation) CheckDiskSpace() {
  384. if dir, e := filepath.Abs(l.Directory); e == nil {
  385. s := stats.NewDiskStatus(dir)
  386. stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "all").Set(float64(s.All))
  387. stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "used").Set(float64(s.Used))
  388. stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "free").Set(float64(s.Free))
  389. isLow, desc := l.MinFreeSpace.IsLow(s.Free, s.PercentFree)
  390. if isLow != l.isDiskSpaceLow {
  391. l.isDiskSpaceLow = !l.isDiskSpaceLow
  392. }
  393. logLevel := glog.Level(4)
  394. if l.isDiskSpaceLow {
  395. logLevel = glog.Level(0)
  396. }
  397. glog.V(logLevel).Infof("dir %s %s", dir, desc)
  398. }
  399. }