You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

454 lines
11 KiB

5 years ago
5 years ago
6 years ago
5 years ago
4 years ago
6 years ago
6 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
4 years ago
6 years ago
6 years ago
6 years ago
5 years ago
6 years ago
6 years ago
6 years ago
5 years ago
  1. package storage
  2. import (
  3. "fmt"
  4. "os"
  5. "path/filepath"
  6. "runtime"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/google/uuid"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. "github.com/seaweedfs/seaweedfs/weed/stats"
  14. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  15. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  16. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  17. "github.com/seaweedfs/seaweedfs/weed/util"
  18. )
  19. type DiskLocation struct {
  20. Directory string
  21. DirectoryUuid string
  22. IdxDirectory string
  23. DiskType types.DiskType
  24. MaxVolumeCount int32
  25. OriginalMaxVolumeCount int32
  26. MinFreeSpace util.MinFreeSpace
  27. volumes map[needle.VolumeId]*Volume
  28. volumesLock sync.RWMutex
  29. // erasure coding
  30. ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume
  31. ecVolumesLock sync.RWMutex
  32. isDiskSpaceLow bool
  33. closeCh chan struct{}
  34. }
  35. func GenerateDirUuid(dir string) (dirUuidString string, err error) {
  36. glog.V(1).Infof("Getting uuid of volume directory:%s", dir)
  37. dirUuidString = ""
  38. fileName := dir + "/vol_dir.uuid"
  39. if !util.FileExists(fileName) {
  40. dirUuid, _ := uuid.NewRandom()
  41. dirUuidString = dirUuid.String()
  42. writeErr := util.WriteFile(fileName, []byte(dirUuidString), 0644)
  43. if writeErr != nil {
  44. return "", fmt.Errorf("failed to write uuid to %s : %v", fileName, writeErr)
  45. }
  46. } else {
  47. uuidData, readErr := os.ReadFile(fileName)
  48. if readErr != nil {
  49. return "", fmt.Errorf("failed to read uuid from %s : %v", fileName, readErr)
  50. }
  51. dirUuidString = string(uuidData)
  52. }
  53. return dirUuidString, nil
  54. }
  55. func NewDiskLocation(dir string, maxVolumeCount int32, minFreeSpace util.MinFreeSpace, idxDir string, diskType types.DiskType) *DiskLocation {
  56. dir = util.ResolvePath(dir)
  57. if idxDir == "" {
  58. idxDir = dir
  59. } else {
  60. idxDir = util.ResolvePath(idxDir)
  61. }
  62. dirUuid, err := GenerateDirUuid(dir)
  63. if err != nil {
  64. glog.Fatalf("cannot generate uuid of dir %s: %v", dir, err)
  65. }
  66. location := &DiskLocation{
  67. Directory: dir,
  68. DirectoryUuid: dirUuid,
  69. IdxDirectory: idxDir,
  70. DiskType: diskType,
  71. MaxVolumeCount: maxVolumeCount,
  72. OriginalMaxVolumeCount: maxVolumeCount,
  73. MinFreeSpace: minFreeSpace,
  74. }
  75. location.volumes = make(map[needle.VolumeId]*Volume)
  76. location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume)
  77. location.closeCh = make(chan struct{})
  78. go func() {
  79. location.CheckDiskSpace()
  80. for {
  81. select {
  82. case <-location.closeCh:
  83. return
  84. case <-time.After(time.Minute):
  85. location.CheckDiskSpace()
  86. }
  87. }
  88. }()
  89. return location
  90. }
  91. func volumeIdFromFileName(filename string) (needle.VolumeId, string, error) {
  92. if isValidVolume(filename) {
  93. base := filename[:len(filename)-4]
  94. collection, volumeId, err := parseCollectionVolumeId(base)
  95. return volumeId, collection, err
  96. }
  97. return 0, "", fmt.Errorf("file is not a volume: %s", filename)
  98. }
  99. func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeId, err error) {
  100. i := strings.LastIndex(base, "_")
  101. if i > 0 {
  102. collection, base = base[0:i], base[i+1:]
  103. }
  104. vol, err := needle.NewVolumeId(base)
  105. return collection, vol, err
  106. }
  107. func isValidVolume(basename string) bool {
  108. return strings.HasSuffix(basename, ".idx") || strings.HasSuffix(basename, ".vif")
  109. }
  110. func getValidVolumeName(basename string) string {
  111. if isValidVolume(basename) {
  112. return basename[:len(basename)-4]
  113. }
  114. return ""
  115. }
  116. func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind NeedleMapKind, skipIfEcVolumesExists bool, ldbTimeout int64) bool {
  117. basename := dirEntry.Name()
  118. if dirEntry.IsDir() {
  119. return false
  120. }
  121. volumeName := getValidVolumeName(basename)
  122. if volumeName == "" {
  123. return false
  124. }
  125. // skip if ec volumes exists
  126. if skipIfEcVolumesExists {
  127. if util.FileExists(l.Directory + "/" + volumeName + ".ecx") {
  128. return false
  129. }
  130. }
  131. // check for incomplete volume
  132. noteFile := l.Directory + "/" + volumeName + ".note"
  133. if util.FileExists(noteFile) {
  134. note, _ := os.ReadFile(noteFile)
  135. glog.Warningf("volume %s was not completed: %s", volumeName, string(note))
  136. removeVolumeFiles(l.Directory + "/" + volumeName)
  137. removeVolumeFiles(l.IdxDirectory + "/" + volumeName)
  138. return false
  139. }
  140. // parse out collection, volume id
  141. vid, collection, err := volumeIdFromFileName(basename)
  142. if err != nil {
  143. glog.Warningf("get volume id failed, %s, err : %s", volumeName, err)
  144. return false
  145. }
  146. // avoid loading one volume more than once
  147. l.volumesLock.RLock()
  148. _, found := l.volumes[vid]
  149. l.volumesLock.RUnlock()
  150. if found {
  151. glog.V(1).Infof("loaded volume, %v", vid)
  152. return true
  153. }
  154. // load the volume
  155. v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, 0, ldbTimeout)
  156. if e != nil {
  157. glog.V(0).Infof("new volume %s error %s", volumeName, e)
  158. return false
  159. }
  160. l.SetVolume(vid, v)
  161. size, _, _ := v.FileStat()
  162. glog.V(0).Infof("data file %s, replication=%s v=%d size=%d ttl=%s",
  163. l.Directory+"/"+volumeName+".dat", v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
  164. return true
  165. }
  166. func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, concurrency int, ldbTimeout int64) {
  167. task_queue := make(chan os.DirEntry, 10*concurrency)
  168. go func() {
  169. foundVolumeNames := make(map[string]bool)
  170. if dirEntries, err := os.ReadDir(l.Directory); err == nil {
  171. for _, entry := range dirEntries {
  172. volumeName := getValidVolumeName(entry.Name())
  173. if volumeName == "" {
  174. continue
  175. }
  176. if _, found := foundVolumeNames[volumeName]; !found {
  177. foundVolumeNames[volumeName] = true
  178. task_queue <- entry
  179. }
  180. }
  181. }
  182. close(task_queue)
  183. }()
  184. var wg sync.WaitGroup
  185. for workerNum := 0; workerNum < concurrency; workerNum++ {
  186. wg.Add(1)
  187. go func() {
  188. defer wg.Done()
  189. for fi := range task_queue {
  190. _ = l.loadExistingVolume(fi, needleMapKind, true, ldbTimeout)
  191. }
  192. }()
  193. }
  194. wg.Wait()
  195. }
  196. func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind, ldbTimeout int64) {
  197. workerNum := runtime.NumCPU()
  198. val, ok := os.LookupEnv("GOMAXPROCS")
  199. if ok {
  200. num, err := strconv.Atoi(val)
  201. if err != nil || num < 1 {
  202. num = 10
  203. glog.Warningf("failed to set worker number from GOMAXPROCS , set to default:10")
  204. }
  205. workerNum = num
  206. } else {
  207. if workerNum <= 10 {
  208. workerNum = 10
  209. }
  210. }
  211. l.concurrentLoadingVolumes(needleMapKind, workerNum, ldbTimeout)
  212. glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount)
  213. l.loadAllEcShards()
  214. glog.V(0).Infof("Store started on dir: %s with %d ec shards", l.Directory, len(l.ecVolumes))
  215. }
  216. func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
  217. l.volumesLock.Lock()
  218. delVolsMap := l.unmountVolumeByCollection(collection)
  219. l.volumesLock.Unlock()
  220. l.ecVolumesLock.Lock()
  221. delEcVolsMap := l.unmountEcVolumeByCollection(collection)
  222. l.ecVolumesLock.Unlock()
  223. errChain := make(chan error, 2)
  224. var wg sync.WaitGroup
  225. wg.Add(2)
  226. go func() {
  227. for _, v := range delVolsMap {
  228. if err := v.Destroy(false); err != nil {
  229. errChain <- err
  230. }
  231. }
  232. wg.Done()
  233. }()
  234. go func() {
  235. for _, v := range delEcVolsMap {
  236. v.Destroy()
  237. }
  238. wg.Done()
  239. }()
  240. go func() {
  241. wg.Wait()
  242. close(errChain)
  243. }()
  244. errBuilder := strings.Builder{}
  245. for err := range errChain {
  246. errBuilder.WriteString(err.Error())
  247. errBuilder.WriteString("; ")
  248. }
  249. if errBuilder.Len() > 0 {
  250. e = fmt.Errorf(errBuilder.String())
  251. }
  252. return
  253. }
  254. func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId, onlyEmpty bool) (found bool, e error) {
  255. v, ok := l.volumes[vid]
  256. if !ok {
  257. return
  258. }
  259. e = v.Destroy(onlyEmpty)
  260. if e != nil {
  261. return
  262. }
  263. found = true
  264. delete(l.volumes, vid)
  265. return
  266. }
  267. func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKind) bool {
  268. if fileInfo, found := l.LocateVolume(vid); found {
  269. return l.loadExistingVolume(fileInfo, needleMapKind, false, 0)
  270. }
  271. return false
  272. }
  273. var ErrVolumeNotFound = fmt.Errorf("volume not found")
  274. func (l *DiskLocation) DeleteVolume(vid needle.VolumeId, onlyEmpty bool) error {
  275. l.volumesLock.Lock()
  276. defer l.volumesLock.Unlock()
  277. _, ok := l.volumes[vid]
  278. if !ok {
  279. return ErrVolumeNotFound
  280. }
  281. _, err := l.deleteVolumeById(vid, onlyEmpty)
  282. return err
  283. }
  284. func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error {
  285. l.volumesLock.Lock()
  286. defer l.volumesLock.Unlock()
  287. v, ok := l.volumes[vid]
  288. if !ok {
  289. return ErrVolumeNotFound
  290. }
  291. v.Close()
  292. delete(l.volumes, vid)
  293. return nil
  294. }
  295. func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume {
  296. deltaVols := make(map[needle.VolumeId]*Volume, 0)
  297. for k, v := range l.volumes {
  298. if v.Collection == collectionName && !v.isCompacting && !v.isCommitCompacting {
  299. deltaVols[k] = v
  300. }
  301. }
  302. for k := range deltaVols {
  303. delete(l.volumes, k)
  304. }
  305. return deltaVols
  306. }
  307. func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) {
  308. l.volumesLock.Lock()
  309. defer l.volumesLock.Unlock()
  310. l.volumes[vid] = volume
  311. volume.location = l
  312. }
  313. func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool) {
  314. l.volumesLock.RLock()
  315. defer l.volumesLock.RUnlock()
  316. v, ok := l.volumes[vid]
  317. return v, ok
  318. }
  319. func (l *DiskLocation) VolumesLen() int {
  320. l.volumesLock.RLock()
  321. defer l.volumesLock.RUnlock()
  322. return len(l.volumes)
  323. }
  324. func (l *DiskLocation) SetStopping() {
  325. l.volumesLock.Lock()
  326. for _, v := range l.volumes {
  327. v.SyncToDisk()
  328. }
  329. l.volumesLock.Unlock()
  330. return
  331. }
  332. func (l *DiskLocation) Close() {
  333. l.volumesLock.Lock()
  334. for _, v := range l.volumes {
  335. v.Close()
  336. }
  337. l.volumesLock.Unlock()
  338. l.ecVolumesLock.Lock()
  339. for _, ecVolume := range l.ecVolumes {
  340. ecVolume.Close()
  341. }
  342. l.ecVolumesLock.Unlock()
  343. close(l.closeCh)
  344. return
  345. }
  346. func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.DirEntry, bool) {
  347. // println("LocateVolume", vid, "on", l.Directory)
  348. if dirEntries, err := os.ReadDir(l.Directory); err == nil {
  349. for _, entry := range dirEntries {
  350. // println("checking", entry.Name(), "...")
  351. volId, _, err := volumeIdFromFileName(entry.Name())
  352. // println("volId", volId, "err", err)
  353. if vid == volId && err == nil {
  354. return entry, true
  355. }
  356. }
  357. }
  358. return nil, false
  359. }
  360. func (l *DiskLocation) UnUsedSpace(volumeSizeLimit uint64) (unUsedSpace uint64) {
  361. l.volumesLock.RLock()
  362. defer l.volumesLock.RUnlock()
  363. for _, vol := range l.volumes {
  364. if vol.IsReadOnly() {
  365. continue
  366. }
  367. datSize, idxSize, _ := vol.FileStat()
  368. unUsedSpace += volumeSizeLimit - (datSize + idxSize)
  369. }
  370. return
  371. }
  372. func (l *DiskLocation) CheckDiskSpace() {
  373. if dir, e := filepath.Abs(l.Directory); e == nil {
  374. s := stats.NewDiskStatus(dir)
  375. stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "all").Set(float64(s.All))
  376. stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "used").Set(float64(s.Used))
  377. stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "free").Set(float64(s.Free))
  378. isLow, desc := l.MinFreeSpace.IsLow(s.Free, s.PercentFree)
  379. if isLow != l.isDiskSpaceLow {
  380. l.isDiskSpaceLow = !l.isDiskSpaceLow
  381. }
  382. logLevel := glog.Level(4)
  383. if l.isDiskSpaceLow {
  384. logLevel = glog.Level(0)
  385. }
  386. glog.V(logLevel).Infof("dir %s %s", dir, desc)
  387. }
  388. }