You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

434 lines
10 KiB

5 years ago
5 years ago
5 years ago
6 years ago
5 years ago
4 years ago
6 years ago
6 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
4 years ago
6 years ago
6 years ago
6 years ago
6 years ago
5 years ago
6 years ago
6 years ago
6 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package storage
  2. import (
  3. "fmt"
  4. "os"
  5. "path/filepath"
  6. "runtime"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/stats"
  12. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  13. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  14. "github.com/chrislusf/seaweedfs/weed/storage/types"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. "github.com/google/uuid"
  17. )
// DiskLocation describes one storage directory together with the regular
// volumes and erasure-coded volumes currently registered under it.
type DiskLocation struct {
	Directory              string            // directory scanned for volume files (.idx/.vif, .ecx, .note, .dat)
	DirectoryUuid          string            // value produced by GenerateDirUuid for Directory
	IdxDirectory           string            // index directory; NewDiskLocation falls back to Directory when none is given
	DiskType               types.DiskType    // disk type passed to NewDiskLocation
	MaxVolumeCount         int               // initialised from the maxVolumeCount constructor argument
	OriginalMaxVolumeCount int               // initialised to the same value as MaxVolumeCount
	MinFreeSpace           util.MinFreeSpace // threshold consulted by CheckDiskSpace via IsLow
	volumes                map[needle.VolumeId]*Volume // registered volumes, keyed by volume id
	volumesLock            sync.RWMutex                // guards volumes

	// erasure coding
	ecVolumes     map[needle.VolumeId]*erasure_coding.EcVolume // registered EC volumes, keyed by volume id
	ecVolumesLock sync.RWMutex                                 // guards ecVolumes

	isDiskSpaceLow bool // last low-space verdict recorded by CheckDiskSpace
}
  33. func GenerateDirUuid(dir string) (dirUuidString string, err error) {
  34. glog.V(1).Infof("Getting uuid of volume directory:%s", dir)
  35. dirUuidString = ""
  36. fileName := dir + "/vol_dir.uuid"
  37. if !util.FileExists(fileName) {
  38. dirUuid, _ := uuid.NewRandom()
  39. dirUuidString = dirUuid.String()
  40. writeErr := util.WriteFile(fileName, []byte(dirUuidString), 0644)
  41. if writeErr != nil {
  42. return "", fmt.Errorf("failed to write uuid to %s : %v", fileName, writeErr)
  43. }
  44. } else {
  45. uuidData, readErr := os.ReadFile(fileName)
  46. if readErr != nil {
  47. return "", fmt.Errorf("failed to read uuid from %s : %v", fileName, readErr)
  48. }
  49. dirUuidString = string(uuidData)
  50. }
  51. return dirUuidString, nil
  52. }
  53. func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpace util.MinFreeSpace, idxDir string, diskType types.DiskType) *DiskLocation {
  54. dir = util.ResolvePath(dir)
  55. if idxDir == "" {
  56. idxDir = dir
  57. } else {
  58. idxDir = util.ResolvePath(idxDir)
  59. }
  60. dirUuid, err := GenerateDirUuid(dir)
  61. if err != nil {
  62. glog.Fatalf("cannot generate uuid of dir %s: %v", dir, err)
  63. }
  64. location := &DiskLocation{
  65. Directory: dir,
  66. DirectoryUuid: dirUuid,
  67. IdxDirectory: idxDir,
  68. DiskType: diskType,
  69. MaxVolumeCount: maxVolumeCount,
  70. OriginalMaxVolumeCount: maxVolumeCount,
  71. MinFreeSpace: minFreeSpace,
  72. }
  73. location.volumes = make(map[needle.VolumeId]*Volume)
  74. location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume)
  75. go location.CheckDiskSpace()
  76. return location
  77. }
  78. func volumeIdFromFileName(filename string) (needle.VolumeId, string, error) {
  79. if isValidVolume(filename) {
  80. base := filename[:len(filename)-4]
  81. collection, volumeId, err := parseCollectionVolumeId(base)
  82. return volumeId, collection, err
  83. }
  84. return 0, "", fmt.Errorf("file is not a volume: %s", filename)
  85. }
  86. func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeId, err error) {
  87. i := strings.LastIndex(base, "_")
  88. if i > 0 {
  89. collection, base = base[0:i], base[i+1:]
  90. }
  91. vol, err := needle.NewVolumeId(base)
  92. return collection, vol, err
  93. }
  94. func isValidVolume(basename string) bool {
  95. return strings.HasSuffix(basename, ".idx") || strings.HasSuffix(basename, ".vif")
  96. }
  97. func getValidVolumeName(basename string) string {
  98. if isValidVolume(basename) {
  99. return basename[:len(basename)-4]
  100. }
  101. return ""
  102. }
  103. func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind NeedleMapKind, skipIfEcVolumesExists bool) bool {
  104. basename := dirEntry.Name()
  105. if dirEntry.IsDir() {
  106. return false
  107. }
  108. volumeName := getValidVolumeName(basename)
  109. if volumeName == "" {
  110. return false
  111. }
  112. // skip if ec volumes exists
  113. if skipIfEcVolumesExists {
  114. if util.FileExists(l.Directory + "/" + volumeName + ".ecx") {
  115. return false
  116. }
  117. }
  118. // check for incomplete volume
  119. noteFile := l.Directory + "/" + volumeName + ".note"
  120. if util.FileExists(noteFile) {
  121. note, _ := os.ReadFile(noteFile)
  122. glog.Warningf("volume %s was not completed: %s", volumeName, string(note))
  123. removeVolumeFiles(l.Directory + "/" + volumeName)
  124. removeVolumeFiles(l.IdxDirectory + "/" + volumeName)
  125. return false
  126. }
  127. // parse out collection, volume id
  128. vid, collection, err := volumeIdFromFileName(basename)
  129. if err != nil {
  130. glog.Warningf("get volume id failed, %s, err : %s", volumeName, err)
  131. return false
  132. }
  133. // avoid loading one volume more than once
  134. l.volumesLock.RLock()
  135. _, found := l.volumes[vid]
  136. l.volumesLock.RUnlock()
  137. if found {
  138. glog.V(1).Infof("loaded volume, %v", vid)
  139. return true
  140. }
  141. // load the volume
  142. v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, 0)
  143. if e != nil {
  144. glog.V(0).Infof("new volume %s error %s", volumeName, e)
  145. return false
  146. }
  147. l.SetVolume(vid, v)
  148. size, _, _ := v.FileStat()
  149. glog.V(0).Infof("data file %s, replication=%s v=%d size=%d ttl=%s",
  150. l.Directory+"/"+volumeName+".dat", v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
  151. return true
  152. }
  153. func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, concurrency int) {
  154. task_queue := make(chan os.DirEntry, 10*concurrency)
  155. go func() {
  156. foundVolumeNames := make(map[string]bool)
  157. if dirEntries, err := os.ReadDir(l.Directory); err == nil {
  158. for _, entry := range dirEntries {
  159. volumeName := getValidVolumeName(entry.Name())
  160. if volumeName == "" {
  161. continue
  162. }
  163. if _, found := foundVolumeNames[volumeName]; !found {
  164. foundVolumeNames[volumeName] = true
  165. task_queue <- entry
  166. }
  167. }
  168. }
  169. close(task_queue)
  170. }()
  171. var wg sync.WaitGroup
  172. for workerNum := 0; workerNum < concurrency; workerNum++ {
  173. wg.Add(1)
  174. go func() {
  175. defer wg.Done()
  176. for fi := range task_queue {
  177. _ = l.loadExistingVolume(fi, needleMapKind, true)
  178. }
  179. }()
  180. }
  181. wg.Wait()
  182. }
  183. func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind) {
  184. workerNum := runtime.NumCPU()
  185. if workerNum <= 10 {
  186. workerNum = 10
  187. }
  188. l.concurrentLoadingVolumes(needleMapKind, workerNum)
  189. glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount)
  190. l.loadAllEcShards()
  191. glog.V(0).Infof("Store started on dir: %s with %d ec shards", l.Directory, len(l.ecVolumes))
  192. }
  193. func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
  194. l.volumesLock.Lock()
  195. delVolsMap := l.unmountVolumeByCollection(collection)
  196. l.volumesLock.Unlock()
  197. l.ecVolumesLock.Lock()
  198. delEcVolsMap := l.unmountEcVolumeByCollection(collection)
  199. l.ecVolumesLock.Unlock()
  200. errChain := make(chan error, 2)
  201. var wg sync.WaitGroup
  202. wg.Add(2)
  203. go func() {
  204. for _, v := range delVolsMap {
  205. if err := v.Destroy(); err != nil {
  206. errChain <- err
  207. }
  208. }
  209. wg.Done()
  210. }()
  211. go func() {
  212. for _, v := range delEcVolsMap {
  213. v.Destroy()
  214. }
  215. wg.Done()
  216. }()
  217. go func() {
  218. wg.Wait()
  219. close(errChain)
  220. }()
  221. errBuilder := strings.Builder{}
  222. for err := range errChain {
  223. errBuilder.WriteString(err.Error())
  224. errBuilder.WriteString("; ")
  225. }
  226. if errBuilder.Len() > 0 {
  227. e = fmt.Errorf(errBuilder.String())
  228. }
  229. return
  230. }
  231. func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (found bool, e error) {
  232. v, ok := l.volumes[vid]
  233. if !ok {
  234. return
  235. }
  236. e = v.Destroy()
  237. if e != nil {
  238. return
  239. }
  240. found = true
  241. delete(l.volumes, vid)
  242. return
  243. }
  244. func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKind) bool {
  245. if fileInfo, found := l.LocateVolume(vid); found {
  246. return l.loadExistingVolume(fileInfo, needleMapKind, false)
  247. }
  248. return false
  249. }
// ErrVolumeNotFound is returned by DeleteVolume and UnloadVolume when the
// requested volume id is not registered on the disk location.
var ErrVolumeNotFound = fmt.Errorf("volume not found")
  251. func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error {
  252. l.volumesLock.Lock()
  253. defer l.volumesLock.Unlock()
  254. _, ok := l.volumes[vid]
  255. if !ok {
  256. return ErrVolumeNotFound
  257. }
  258. _, err := l.deleteVolumeById(vid)
  259. return err
  260. }
  261. func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error {
  262. l.volumesLock.Lock()
  263. defer l.volumesLock.Unlock()
  264. v, ok := l.volumes[vid]
  265. if !ok {
  266. return ErrVolumeNotFound
  267. }
  268. v.Close()
  269. delete(l.volumes, vid)
  270. return nil
  271. }
  272. func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume {
  273. deltaVols := make(map[needle.VolumeId]*Volume, 0)
  274. for k, v := range l.volumes {
  275. if v.Collection == collectionName && !v.isCompacting && !v.isCommitCompacting {
  276. deltaVols[k] = v
  277. }
  278. }
  279. for k := range deltaVols {
  280. delete(l.volumes, k)
  281. }
  282. return deltaVols
  283. }
  284. func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) {
  285. l.volumesLock.Lock()
  286. defer l.volumesLock.Unlock()
  287. l.volumes[vid] = volume
  288. volume.location = l
  289. }
  290. func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool) {
  291. l.volumesLock.RLock()
  292. defer l.volumesLock.RUnlock()
  293. v, ok := l.volumes[vid]
  294. return v, ok
  295. }
  296. func (l *DiskLocation) VolumesLen() int {
  297. l.volumesLock.RLock()
  298. defer l.volumesLock.RUnlock()
  299. return len(l.volumes)
  300. }
  301. func (l *DiskLocation) SetStopping() {
  302. l.volumesLock.Lock()
  303. for _, v := range l.volumes {
  304. v.SetStopping()
  305. }
  306. l.volumesLock.Unlock()
  307. return
  308. }
  309. func (l *DiskLocation) Close() {
  310. l.volumesLock.Lock()
  311. for _, v := range l.volumes {
  312. v.Close()
  313. }
  314. l.volumesLock.Unlock()
  315. l.ecVolumesLock.Lock()
  316. for _, ecVolume := range l.ecVolumes {
  317. ecVolume.Close()
  318. }
  319. l.ecVolumesLock.Unlock()
  320. return
  321. }
  322. func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.DirEntry, bool) {
  323. // println("LocateVolume", vid, "on", l.Directory)
  324. if dirEntries, err := os.ReadDir(l.Directory); err == nil {
  325. for _, entry := range dirEntries {
  326. // println("checking", entry.Name(), "...")
  327. volId, _, err := volumeIdFromFileName(entry.Name())
  328. // println("volId", volId, "err", err)
  329. if vid == volId && err == nil {
  330. return entry, true
  331. }
  332. }
  333. }
  334. return nil, false
  335. }
  336. func (l *DiskLocation) UnUsedSpace(volumeSizeLimit uint64) (unUsedSpace uint64) {
  337. l.volumesLock.RLock()
  338. defer l.volumesLock.RUnlock()
  339. for _, vol := range l.volumes {
  340. if vol.IsReadOnly() {
  341. continue
  342. }
  343. datSize, idxSize, _ := vol.FileStat()
  344. unUsedSpace += volumeSizeLimit - (datSize + idxSize)
  345. }
  346. return
  347. }
  348. func (l *DiskLocation) CheckDiskSpace() {
  349. for {
  350. if dir, e := filepath.Abs(l.Directory); e == nil {
  351. s := stats.NewDiskStatus(dir)
  352. stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "all").Set(float64(s.All))
  353. stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "used").Set(float64(s.Used))
  354. stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "free").Set(float64(s.Free))
  355. isLow, desc := l.MinFreeSpace.IsLow(s.Free, s.PercentFree)
  356. if isLow != l.isDiskSpaceLow {
  357. l.isDiskSpaceLow = !l.isDiskSpaceLow
  358. }
  359. logLevel := glog.Level(4)
  360. if l.isDiskSpaceLow {
  361. logLevel = glog.Level(0)
  362. }
  363. glog.V(logLevel).Infof("dir %s %s", dir, desc)
  364. }
  365. time.Sleep(time.Minute)
  366. }
  367. }