You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

455 lines
11 KiB

5 years ago
5 years ago
6 years ago
5 years ago
4 years ago
6 years ago
6 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
4 years ago
6 years ago
6 years ago
6 years ago
5 years ago
6 years ago
6 years ago
6 years ago
5 years ago
  1. package storage
  2. import (
  3. "fmt"
  4. "os"
  5. "path/filepath"
  6. "runtime"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/google/uuid"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. "github.com/seaweedfs/seaweedfs/weed/stats"
  14. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  15. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  16. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  17. "github.com/seaweedfs/seaweedfs/weed/util"
  18. )
// DiskLocation tracks the regular and erasure-coded volumes stored under one
// data directory, together with that directory's capacity settings.
type DiskLocation struct {
	// Directory is the data directory, as resolved by NewDiskLocation.
	Directory string
	// DirectoryUuid is the identifier persisted in the directory's vol_dir.uuid file.
	DirectoryUuid string
	// IdxDirectory holds index files; NewDiskLocation defaults it to Directory.
	IdxDirectory string
	DiskType     types.DiskType
	// Both counts are initialised from the same value in NewDiskLocation;
	// presumably MaxVolumeCount may be adjusted later — TODO confirm.
	MaxVolumeCount         int32
	OriginalMaxVolumeCount int32
	// MinFreeSpace is consulted by CheckDiskSpace via IsLow.
	MinFreeSpace util.MinFreeSpace

	// volumes maps volume ids to mounted volumes; guarded by volumesLock.
	volumes     map[needle.VolumeId]*Volume
	volumesLock sync.RWMutex

	// erasure coding
	// ecVolumes maps volume ids to mounted EC volumes; guarded by ecVolumesLock.
	ecVolumes     map[needle.VolumeId]*erasure_coding.EcVolume
	ecVolumesLock sync.RWMutex

	// isDiskSpaceLow holds the last result of CheckDiskSpace.
	// NOTE(review): written by the background goroutine started in
	// NewDiskLocation without synchronization; if other goroutines read it
	// this is a data race — consider an atomic flag.
	isDiskSpaceLow bool
	// closeCh is closed by Close to stop the background disk-space checker.
	closeCh chan struct{}
}
  35. func GenerateDirUuid(dir string) (dirUuidString string, err error) {
  36. glog.V(1).Infof("Getting uuid of volume directory:%s", dir)
  37. dirUuidString = ""
  38. fileName := dir + "/vol_dir.uuid"
  39. if !util.FileExists(fileName) {
  40. dirUuid, _ := uuid.NewRandom()
  41. dirUuidString = dirUuid.String()
  42. writeErr := util.WriteFile(fileName, []byte(dirUuidString), 0644)
  43. if writeErr != nil {
  44. return "", fmt.Errorf("failed to write uuid to %s : %v", fileName, writeErr)
  45. }
  46. } else {
  47. uuidData, readErr := os.ReadFile(fileName)
  48. if readErr != nil {
  49. return "", fmt.Errorf("failed to read uuid from %s : %v", fileName, readErr)
  50. }
  51. dirUuidString = string(uuidData)
  52. }
  53. return dirUuidString, nil
  54. }
  55. func NewDiskLocation(dir string, maxVolumeCount int32, minFreeSpace util.MinFreeSpace, idxDir string, diskType types.DiskType) *DiskLocation {
  56. dir = util.ResolvePath(dir)
  57. if idxDir == "" {
  58. idxDir = dir
  59. } else {
  60. idxDir = util.ResolvePath(idxDir)
  61. }
  62. dirUuid, err := GenerateDirUuid(dir)
  63. if err != nil {
  64. glog.Fatalf("cannot generate uuid of dir %s: %v", dir, err)
  65. }
  66. location := &DiskLocation{
  67. Directory: dir,
  68. DirectoryUuid: dirUuid,
  69. IdxDirectory: idxDir,
  70. DiskType: diskType,
  71. MaxVolumeCount: maxVolumeCount,
  72. OriginalMaxVolumeCount: maxVolumeCount,
  73. MinFreeSpace: minFreeSpace,
  74. }
  75. location.volumes = make(map[needle.VolumeId]*Volume)
  76. location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume)
  77. location.closeCh = make(chan struct{})
  78. go func() {
  79. location.CheckDiskSpace()
  80. for {
  81. select {
  82. case <-location.closeCh:
  83. return
  84. case <-time.After(time.Minute):
  85. location.CheckDiskSpace()
  86. }
  87. }
  88. }()
  89. return location
  90. }
  91. func volumeIdFromFileName(filename string) (needle.VolumeId, string, error) {
  92. if isValidVolume(filename) {
  93. base := filename[:len(filename)-4]
  94. collection, volumeId, err := parseCollectionVolumeId(base)
  95. return volumeId, collection, err
  96. }
  97. return 0, "", fmt.Errorf("file is not a volume: %s", filename)
  98. }
  99. func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeId, err error) {
  100. i := strings.LastIndex(base, "_")
  101. if i > 0 {
  102. collection, base = base[0:i], base[i+1:]
  103. }
  104. vol, err := needle.NewVolumeId(base)
  105. return collection, vol, err
  106. }
  107. func isValidVolume(basename string) bool {
  108. return strings.HasSuffix(basename, ".idx") || strings.HasSuffix(basename, ".vif")
  109. }
  110. func getValidVolumeName(basename string) string {
  111. if isValidVolume(basename) {
  112. return basename[:len(basename)-4]
  113. }
  114. return ""
  115. }
  116. func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind NeedleMapKind, skipIfEcVolumesExists bool, ldbTimeout int64) bool {
  117. basename := dirEntry.Name()
  118. if dirEntry.IsDir() {
  119. return false
  120. }
  121. volumeName := getValidVolumeName(basename)
  122. if volumeName == "" {
  123. return false
  124. }
  125. // skip if ec volumes exists
  126. if skipIfEcVolumesExists {
  127. if util.FileExists(l.Directory + "/" + volumeName + ".ecx") {
  128. return false
  129. }
  130. }
  131. // check for incomplete volume
  132. noteFile := l.Directory + "/" + volumeName + ".note"
  133. if util.FileExists(noteFile) {
  134. note, _ := os.ReadFile(noteFile)
  135. glog.Warningf("volume %s was not completed: %s", volumeName, string(note))
  136. removeVolumeFiles(l.Directory + "/" + volumeName)
  137. removeVolumeFiles(l.IdxDirectory + "/" + volumeName)
  138. return false
  139. }
  140. // parse out collection, volume id
  141. vid, collection, err := volumeIdFromFileName(basename)
  142. if err != nil {
  143. glog.Warningf("get volume id failed, %s, err : %s", volumeName, err)
  144. return false
  145. }
  146. // avoid loading one volume more than once
  147. l.volumesLock.RLock()
  148. _, found := l.volumes[vid]
  149. l.volumesLock.RUnlock()
  150. if found {
  151. glog.V(1).Infof("loaded volume, %v", vid)
  152. return true
  153. }
  154. // load the volume
  155. v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, 0, ldbTimeout)
  156. if e != nil {
  157. glog.V(0).Infof("new volume %s error %s", volumeName, e)
  158. return false
  159. }
  160. l.SetVolume(vid, v)
  161. stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume", v.DiskType().ReadableString()).Inc()
  162. size, _, _ := v.FileStat()
  163. glog.V(0).Infof("data file %s, replication=%s v=%d size=%d ttl=%s",
  164. l.Directory+"/"+volumeName+".dat", v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
  165. return true
  166. }
  167. func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, concurrency int, ldbTimeout int64) {
  168. task_queue := make(chan os.DirEntry, 10*concurrency)
  169. go func() {
  170. foundVolumeNames := make(map[string]bool)
  171. if dirEntries, err := os.ReadDir(l.Directory); err == nil {
  172. for _, entry := range dirEntries {
  173. volumeName := getValidVolumeName(entry.Name())
  174. if volumeName == "" {
  175. continue
  176. }
  177. if _, found := foundVolumeNames[volumeName]; !found {
  178. foundVolumeNames[volumeName] = true
  179. task_queue <- entry
  180. }
  181. }
  182. }
  183. close(task_queue)
  184. }()
  185. var wg sync.WaitGroup
  186. for workerNum := 0; workerNum < concurrency; workerNum++ {
  187. wg.Add(1)
  188. go func() {
  189. defer wg.Done()
  190. for fi := range task_queue {
  191. _ = l.loadExistingVolume(fi, needleMapKind, true, ldbTimeout)
  192. }
  193. }()
  194. }
  195. wg.Wait()
  196. }
  197. func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind, ldbTimeout int64) {
  198. workerNum := runtime.NumCPU()
  199. val, ok := os.LookupEnv("GOMAXPROCS")
  200. if ok {
  201. num, err := strconv.Atoi(val)
  202. if err != nil || num < 1 {
  203. num = 10
  204. glog.Warningf("failed to set worker number from GOMAXPROCS , set to default:10")
  205. }
  206. workerNum = num
  207. } else {
  208. if workerNum <= 10 {
  209. workerNum = 10
  210. }
  211. }
  212. l.concurrentLoadingVolumes(needleMapKind, workerNum, ldbTimeout)
  213. glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount)
  214. l.loadAllEcShards()
  215. glog.V(0).Infof("Store started on dir: %s with %d ec shards", l.Directory, len(l.ecVolumes))
  216. }
  217. func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
  218. l.volumesLock.Lock()
  219. delVolsMap := l.unmountVolumeByCollection(collection)
  220. l.volumesLock.Unlock()
  221. l.ecVolumesLock.Lock()
  222. delEcVolsMap := l.unmountEcVolumeByCollection(collection)
  223. l.ecVolumesLock.Unlock()
  224. errChain := make(chan error, 2)
  225. var wg sync.WaitGroup
  226. wg.Add(2)
  227. go func() {
  228. for _, v := range delVolsMap {
  229. if err := v.Destroy(false); err != nil {
  230. errChain <- err
  231. }
  232. }
  233. wg.Done()
  234. }()
  235. go func() {
  236. for _, v := range delEcVolsMap {
  237. v.Destroy()
  238. }
  239. wg.Done()
  240. }()
  241. go func() {
  242. wg.Wait()
  243. close(errChain)
  244. }()
  245. errBuilder := strings.Builder{}
  246. for err := range errChain {
  247. errBuilder.WriteString(err.Error())
  248. errBuilder.WriteString("; ")
  249. }
  250. if errBuilder.Len() > 0 {
  251. e = fmt.Errorf(errBuilder.String())
  252. }
  253. return
  254. }
  255. func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId, onlyEmpty bool) (found bool, e error) {
  256. v, ok := l.volumes[vid]
  257. if !ok {
  258. return
  259. }
  260. e = v.Destroy(onlyEmpty)
  261. if e != nil {
  262. return
  263. }
  264. found = true
  265. delete(l.volumes, vid)
  266. return
  267. }
  268. func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKind) bool {
  269. if fileInfo, found := l.LocateVolume(vid); found {
  270. return l.loadExistingVolume(fileInfo, needleMapKind, false, 0)
  271. }
  272. return false
  273. }
// ErrVolumeNotFound is returned by DeleteVolume and UnloadVolume when the
// requested volume id is not mounted on the DiskLocation.
var ErrVolumeNotFound = fmt.Errorf("volume not found")
  275. func (l *DiskLocation) DeleteVolume(vid needle.VolumeId, onlyEmpty bool) error {
  276. l.volumesLock.Lock()
  277. defer l.volumesLock.Unlock()
  278. _, ok := l.volumes[vid]
  279. if !ok {
  280. return ErrVolumeNotFound
  281. }
  282. _, err := l.deleteVolumeById(vid, onlyEmpty)
  283. return err
  284. }
  285. func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error {
  286. l.volumesLock.Lock()
  287. defer l.volumesLock.Unlock()
  288. v, ok := l.volumes[vid]
  289. if !ok {
  290. return ErrVolumeNotFound
  291. }
  292. v.Close()
  293. delete(l.volumes, vid)
  294. return nil
  295. }
  296. func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume {
  297. deltaVols := make(map[needle.VolumeId]*Volume, 0)
  298. for k, v := range l.volumes {
  299. if v.Collection == collectionName && !v.isCompacting && !v.isCommitCompacting {
  300. deltaVols[k] = v
  301. }
  302. }
  303. for k := range deltaVols {
  304. delete(l.volumes, k)
  305. }
  306. return deltaVols
  307. }
// SetVolume stores volume under vid in this location, replacing any volume
// already stored under that id, and points volume.location back at l.
// Both updates happen while holding the volumes write lock.
func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) {
	l.volumesLock.Lock()
	defer l.volumesLock.Unlock()
	l.volumes[vid] = volume
	volume.location = l
}
  314. func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool) {
  315. l.volumesLock.RLock()
  316. defer l.volumesLock.RUnlock()
  317. v, ok := l.volumes[vid]
  318. return v, ok
  319. }
  320. func (l *DiskLocation) VolumesLen() int {
  321. l.volumesLock.RLock()
  322. defer l.volumesLock.RUnlock()
  323. return len(l.volumes)
  324. }
  325. func (l *DiskLocation) SetStopping() {
  326. l.volumesLock.Lock()
  327. for _, v := range l.volumes {
  328. v.SyncToDisk()
  329. }
  330. l.volumesLock.Unlock()
  331. return
  332. }
  333. func (l *DiskLocation) Close() {
  334. l.volumesLock.Lock()
  335. for _, v := range l.volumes {
  336. v.Close()
  337. }
  338. l.volumesLock.Unlock()
  339. l.ecVolumesLock.Lock()
  340. for _, ecVolume := range l.ecVolumes {
  341. ecVolume.Close()
  342. }
  343. l.ecVolumesLock.Unlock()
  344. close(l.closeCh)
  345. return
  346. }
  347. func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.DirEntry, bool) {
  348. // println("LocateVolume", vid, "on", l.Directory)
  349. if dirEntries, err := os.ReadDir(l.Directory); err == nil {
  350. for _, entry := range dirEntries {
  351. // println("checking", entry.Name(), "...")
  352. volId, _, err := volumeIdFromFileName(entry.Name())
  353. // println("volId", volId, "err", err)
  354. if vid == volId && err == nil {
  355. return entry, true
  356. }
  357. }
  358. }
  359. return nil, false
  360. }
  361. func (l *DiskLocation) UnUsedSpace(volumeSizeLimit uint64) (unUsedSpace uint64) {
  362. l.volumesLock.RLock()
  363. defer l.volumesLock.RUnlock()
  364. for _, vol := range l.volumes {
  365. if vol.IsReadOnly() {
  366. continue
  367. }
  368. datSize, idxSize, _ := vol.FileStat()
  369. unUsedSpace += volumeSizeLimit - (datSize + idxSize)
  370. }
  371. return
  372. }
  373. func (l *DiskLocation) CheckDiskSpace() {
  374. if dir, e := filepath.Abs(l.Directory); e == nil {
  375. s := stats.NewDiskStatus(dir)
  376. stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "all").Set(float64(s.All))
  377. stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "used").Set(float64(s.Used))
  378. stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "free").Set(float64(s.Free))
  379. isLow, desc := l.MinFreeSpace.IsLow(s.Free, s.PercentFree)
  380. if isLow != l.isDiskSpaceLow {
  381. l.isDiskSpaceLow = !l.isDiskSpaceLow
  382. }
  383. logLevel := glog.Level(4)
  384. if l.isDiskSpaceLow {
  385. logLevel = glog.Level(0)
  386. }
  387. glog.V(logLevel).Infof("dir %s %s", dir, desc)
  388. }
  389. }