You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

445 lines
11 KiB

5 years ago
5 years ago
6 years ago
5 years ago
4 years ago
6 years ago
6 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
4 years ago
6 years ago
6 years ago
6 years ago
6 years ago
5 years ago
6 years ago
6 years ago
6 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package storage
  2. import (
  3. "fmt"
  4. "os"
  5. "path/filepath"
  6. "runtime"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/google/uuid"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. "github.com/seaweedfs/seaweedfs/weed/stats"
  14. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  15. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  16. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  17. "github.com/seaweedfs/seaweedfs/weed/util"
  18. )
// DiskLocation describes one storage directory together with the regular
// volumes and erasure-coded volumes currently registered under it.
type DiskLocation struct {
	// Directory is the storage directory; NewDiskLocation stores the resolved path here.
	Directory string
	// DirectoryUuid is the value GenerateDirUuid returned for Directory.
	DirectoryUuid string
	// IdxDirectory defaults to Directory when NewDiskLocation is given an empty idxDir.
	IdxDirectory string
	// DiskType is stored as passed to NewDiskLocation.
	DiskType types.DiskType
	// MaxVolumeCount is initialised from NewDiskLocation's maxVolumeCount argument.
	MaxVolumeCount int
	// OriginalMaxVolumeCount is initialised to the same value as MaxVolumeCount.
	OriginalMaxVolumeCount int
	// MinFreeSpace is consulted by CheckDiskSpace to decide whether free space is low.
	MinFreeSpace util.MinFreeSpace
	// volumes maps a volume id to its registered volume; guarded by volumesLock.
	volumes map[needle.VolumeId]*Volume
	volumesLock sync.RWMutex

	// erasure coding
	// ecVolumes maps a volume id to its erasure-coded volume; guarded by ecVolumesLock.
	ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume
	ecVolumesLock sync.RWMutex

	// isDiskSpaceLow holds the most recent low-space verdict recorded by CheckDiskSpace.
	isDiskSpaceLow bool
}
  34. func GenerateDirUuid(dir string) (dirUuidString string, err error) {
  35. glog.V(1).Infof("Getting uuid of volume directory:%s", dir)
  36. dirUuidString = ""
  37. fileName := dir + "/vol_dir.uuid"
  38. if !util.FileExists(fileName) {
  39. dirUuid, _ := uuid.NewRandom()
  40. dirUuidString = dirUuid.String()
  41. writeErr := util.WriteFile(fileName, []byte(dirUuidString), 0644)
  42. if writeErr != nil {
  43. return "", fmt.Errorf("failed to write uuid to %s : %v", fileName, writeErr)
  44. }
  45. } else {
  46. uuidData, readErr := os.ReadFile(fileName)
  47. if readErr != nil {
  48. return "", fmt.Errorf("failed to read uuid from %s : %v", fileName, readErr)
  49. }
  50. dirUuidString = string(uuidData)
  51. }
  52. return dirUuidString, nil
  53. }
  54. func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpace util.MinFreeSpace, idxDir string, diskType types.DiskType) *DiskLocation {
  55. dir = util.ResolvePath(dir)
  56. if idxDir == "" {
  57. idxDir = dir
  58. } else {
  59. idxDir = util.ResolvePath(idxDir)
  60. }
  61. dirUuid, err := GenerateDirUuid(dir)
  62. if err != nil {
  63. glog.Fatalf("cannot generate uuid of dir %s: %v", dir, err)
  64. }
  65. location := &DiskLocation{
  66. Directory: dir,
  67. DirectoryUuid: dirUuid,
  68. IdxDirectory: idxDir,
  69. DiskType: diskType,
  70. MaxVolumeCount: maxVolumeCount,
  71. OriginalMaxVolumeCount: maxVolumeCount,
  72. MinFreeSpace: minFreeSpace,
  73. }
  74. location.volumes = make(map[needle.VolumeId]*Volume)
  75. location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume)
  76. go location.CheckDiskSpace()
  77. return location
  78. }
  79. func volumeIdFromFileName(filename string) (needle.VolumeId, string, error) {
  80. if isValidVolume(filename) {
  81. base := filename[:len(filename)-4]
  82. collection, volumeId, err := parseCollectionVolumeId(base)
  83. return volumeId, collection, err
  84. }
  85. return 0, "", fmt.Errorf("file is not a volume: %s", filename)
  86. }
  87. func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeId, err error) {
  88. i := strings.LastIndex(base, "_")
  89. if i > 0 {
  90. collection, base = base[0:i], base[i+1:]
  91. }
  92. vol, err := needle.NewVolumeId(base)
  93. return collection, vol, err
  94. }
  95. func isValidVolume(basename string) bool {
  96. return strings.HasSuffix(basename, ".idx") || strings.HasSuffix(basename, ".vif")
  97. }
  98. func getValidVolumeName(basename string) string {
  99. if isValidVolume(basename) {
  100. return basename[:len(basename)-4]
  101. }
  102. return ""
  103. }
  104. func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind NeedleMapKind, skipIfEcVolumesExists bool) bool {
  105. basename := dirEntry.Name()
  106. if dirEntry.IsDir() {
  107. return false
  108. }
  109. volumeName := getValidVolumeName(basename)
  110. if volumeName == "" {
  111. return false
  112. }
  113. // skip if ec volumes exists
  114. if skipIfEcVolumesExists {
  115. if util.FileExists(l.Directory + "/" + volumeName + ".ecx") {
  116. return false
  117. }
  118. }
  119. // check for incomplete volume
  120. noteFile := l.Directory + "/" + volumeName + ".note"
  121. if util.FileExists(noteFile) {
  122. note, _ := os.ReadFile(noteFile)
  123. glog.Warningf("volume %s was not completed: %s", volumeName, string(note))
  124. removeVolumeFiles(l.Directory + "/" + volumeName)
  125. removeVolumeFiles(l.IdxDirectory + "/" + volumeName)
  126. return false
  127. }
  128. // parse out collection, volume id
  129. vid, collection, err := volumeIdFromFileName(basename)
  130. if err != nil {
  131. glog.Warningf("get volume id failed, %s, err : %s", volumeName, err)
  132. return false
  133. }
  134. // avoid loading one volume more than once
  135. l.volumesLock.RLock()
  136. _, found := l.volumes[vid]
  137. l.volumesLock.RUnlock()
  138. if found {
  139. glog.V(1).Infof("loaded volume, %v", vid)
  140. return true
  141. }
  142. // load the volume
  143. v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, 0)
  144. if e != nil {
  145. glog.V(0).Infof("new volume %s error %s", volumeName, e)
  146. return false
  147. }
  148. l.SetVolume(vid, v)
  149. size, _, _ := v.FileStat()
  150. glog.V(0).Infof("data file %s, replication=%s v=%d size=%d ttl=%s",
  151. l.Directory+"/"+volumeName+".dat", v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
  152. return true
  153. }
  154. func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, concurrency int) {
  155. task_queue := make(chan os.DirEntry, 10*concurrency)
  156. go func() {
  157. foundVolumeNames := make(map[string]bool)
  158. if dirEntries, err := os.ReadDir(l.Directory); err == nil {
  159. for _, entry := range dirEntries {
  160. volumeName := getValidVolumeName(entry.Name())
  161. if volumeName == "" {
  162. continue
  163. }
  164. if _, found := foundVolumeNames[volumeName]; !found {
  165. foundVolumeNames[volumeName] = true
  166. task_queue <- entry
  167. }
  168. }
  169. }
  170. close(task_queue)
  171. }()
  172. var wg sync.WaitGroup
  173. for workerNum := 0; workerNum < concurrency; workerNum++ {
  174. wg.Add(1)
  175. go func() {
  176. defer wg.Done()
  177. for fi := range task_queue {
  178. _ = l.loadExistingVolume(fi, needleMapKind, true)
  179. }
  180. }()
  181. }
  182. wg.Wait()
  183. }
  184. func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind) {
  185. workerNum := runtime.NumCPU()
  186. val, ok := os.LookupEnv("GOMAXPROCS")
  187. if ok {
  188. num, err := strconv.Atoi(val)
  189. if err != nil || num < 1 {
  190. num = 10
  191. glog.Warningf("failed to set worker number from GOMAXPROCS , set to default:10")
  192. }
  193. workerNum = num
  194. } else {
  195. if workerNum <= 10 {
  196. workerNum = 10
  197. }
  198. }
  199. l.concurrentLoadingVolumes(needleMapKind, workerNum)
  200. glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount)
  201. l.loadAllEcShards()
  202. glog.V(0).Infof("Store started on dir: %s with %d ec shards", l.Directory, len(l.ecVolumes))
  203. }
  204. func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
  205. l.volumesLock.Lock()
  206. delVolsMap := l.unmountVolumeByCollection(collection)
  207. l.volumesLock.Unlock()
  208. l.ecVolumesLock.Lock()
  209. delEcVolsMap := l.unmountEcVolumeByCollection(collection)
  210. l.ecVolumesLock.Unlock()
  211. errChain := make(chan error, 2)
  212. var wg sync.WaitGroup
  213. wg.Add(2)
  214. go func() {
  215. for _, v := range delVolsMap {
  216. if err := v.Destroy(); err != nil {
  217. errChain <- err
  218. }
  219. }
  220. wg.Done()
  221. }()
  222. go func() {
  223. for _, v := range delEcVolsMap {
  224. v.Destroy()
  225. }
  226. wg.Done()
  227. }()
  228. go func() {
  229. wg.Wait()
  230. close(errChain)
  231. }()
  232. errBuilder := strings.Builder{}
  233. for err := range errChain {
  234. errBuilder.WriteString(err.Error())
  235. errBuilder.WriteString("; ")
  236. }
  237. if errBuilder.Len() > 0 {
  238. e = fmt.Errorf(errBuilder.String())
  239. }
  240. return
  241. }
  242. func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (found bool, e error) {
  243. v, ok := l.volumes[vid]
  244. if !ok {
  245. return
  246. }
  247. e = v.Destroy()
  248. if e != nil {
  249. return
  250. }
  251. found = true
  252. delete(l.volumes, vid)
  253. return
  254. }
  255. func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKind) bool {
  256. if fileInfo, found := l.LocateVolume(vid); found {
  257. return l.loadExistingVolume(fileInfo, needleMapKind, false)
  258. }
  259. return false
  260. }
// ErrVolumeNotFound is returned by DeleteVolume and UnloadVolume when the
// requested volume id is not registered in the disk location.
var ErrVolumeNotFound = fmt.Errorf("volume not found")
  262. func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error {
  263. l.volumesLock.Lock()
  264. defer l.volumesLock.Unlock()
  265. _, ok := l.volumes[vid]
  266. if !ok {
  267. return ErrVolumeNotFound
  268. }
  269. _, err := l.deleteVolumeById(vid)
  270. return err
  271. }
  272. func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error {
  273. l.volumesLock.Lock()
  274. defer l.volumesLock.Unlock()
  275. v, ok := l.volumes[vid]
  276. if !ok {
  277. return ErrVolumeNotFound
  278. }
  279. v.Close()
  280. delete(l.volumes, vid)
  281. return nil
  282. }
  283. func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume {
  284. deltaVols := make(map[needle.VolumeId]*Volume, 0)
  285. for k, v := range l.volumes {
  286. if v.Collection == collectionName && !v.isCompacting && !v.isCommitCompacting {
  287. deltaVols[k] = v
  288. }
  289. }
  290. for k := range deltaVols {
  291. delete(l.volumes, k)
  292. }
  293. return deltaVols
  294. }
// SetVolume registers volume under vid in this location, replacing any
// previous entry for that id, and points the volume's location back at l.
// Both updates happen while holding volumesLock.
func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) {
	l.volumesLock.Lock()
	defer l.volumesLock.Unlock()
	l.volumes[vid] = volume
	volume.location = l
}
  301. func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool) {
  302. l.volumesLock.RLock()
  303. defer l.volumesLock.RUnlock()
  304. v, ok := l.volumes[vid]
  305. return v, ok
  306. }
  307. func (l *DiskLocation) VolumesLen() int {
  308. l.volumesLock.RLock()
  309. defer l.volumesLock.RUnlock()
  310. return len(l.volumes)
  311. }
  312. func (l *DiskLocation) SetStopping() {
  313. l.volumesLock.Lock()
  314. for _, v := range l.volumes {
  315. v.SetStopping()
  316. }
  317. l.volumesLock.Unlock()
  318. return
  319. }
  320. func (l *DiskLocation) Close() {
  321. l.volumesLock.Lock()
  322. for _, v := range l.volumes {
  323. v.Close()
  324. }
  325. l.volumesLock.Unlock()
  326. l.ecVolumesLock.Lock()
  327. for _, ecVolume := range l.ecVolumes {
  328. ecVolume.Close()
  329. }
  330. l.ecVolumesLock.Unlock()
  331. return
  332. }
  333. func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.DirEntry, bool) {
  334. // println("LocateVolume", vid, "on", l.Directory)
  335. if dirEntries, err := os.ReadDir(l.Directory); err == nil {
  336. for _, entry := range dirEntries {
  337. // println("checking", entry.Name(), "...")
  338. volId, _, err := volumeIdFromFileName(entry.Name())
  339. // println("volId", volId, "err", err)
  340. if vid == volId && err == nil {
  341. return entry, true
  342. }
  343. }
  344. }
  345. return nil, false
  346. }
  347. func (l *DiskLocation) UnUsedSpace(volumeSizeLimit uint64) (unUsedSpace uint64) {
  348. l.volumesLock.RLock()
  349. defer l.volumesLock.RUnlock()
  350. for _, vol := range l.volumes {
  351. if vol.IsReadOnly() {
  352. continue
  353. }
  354. datSize, idxSize, _ := vol.FileStat()
  355. unUsedSpace += volumeSizeLimit - (datSize + idxSize)
  356. }
  357. return
  358. }
  359. func (l *DiskLocation) CheckDiskSpace() {
  360. for {
  361. if dir, e := filepath.Abs(l.Directory); e == nil {
  362. s := stats.NewDiskStatus(dir)
  363. stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "all").Set(float64(s.All))
  364. stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "used").Set(float64(s.Used))
  365. stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "free").Set(float64(s.Free))
  366. isLow, desc := l.MinFreeSpace.IsLow(s.Free, s.PercentFree)
  367. if isLow != l.isDiskSpaceLow {
  368. l.isDiskSpaceLow = !l.isDiskSpaceLow
  369. }
  370. logLevel := glog.Level(4)
  371. if l.isDiskSpaceLow {
  372. logLevel = glog.Level(0)
  373. }
  374. glog.V(logLevel).Infof("dir %s %s", dir, desc)
  375. }
  376. time.Sleep(time.Minute)
  377. }
  378. }