You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

612 lines
18 KiB

4 years ago
5 years ago
4 years ago
5 years ago
5 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
5 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
4 years ago
4 years ago
5 years ago
  1. package shell
  2. import (
  3. "bufio"
  4. "context"
  5. "flag"
  6. "fmt"
  7. "io"
  8. "io/ioutil"
  9. "math"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "path/filepath"
  14. "sync"
  15. "github.com/chrislusf/seaweedfs/weed/filer"
  16. "github.com/chrislusf/seaweedfs/weed/operation"
  17. "github.com/chrislusf/seaweedfs/weed/pb"
  18. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  19. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  20. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  21. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  22. "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
  23. "github.com/chrislusf/seaweedfs/weed/storage/types"
  24. "github.com/chrislusf/seaweedfs/weed/util"
  25. )
  26. func init() {
  27. Commands = append(Commands, &commandVolumeFsck{})
  28. }
  29. type commandVolumeFsck struct {
  30. env *CommandEnv
  31. }
  32. func (c *commandVolumeFsck) Name() string {
  33. return "volume.fsck"
  34. }
  35. func (c *commandVolumeFsck) Help() string {
  36. return `check all volumes to find entries not used by the filer
  37. Important assumption!!!
  38. the system is all used by one filer.
  39. This command works this way:
  40. 1. collect all file ids from all volumes, as set A
  41. 2. collect all file ids from the filer, as set B
  42. 3. find out the set A subtract B
  43. If -findMissingChunksInFiler is enabled, this works
  44. in a reverse way:
  45. 1. collect all file ids from all volumes, as set A
  46. 2. collect all file ids from the filer, as set B
  47. 3. find out the set B subtract A
  48. `
  49. }
  50. func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  51. fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  52. verbose := fsckCommand.Bool("v", false, "verbose mode")
  53. findMissingChunksInFiler := fsckCommand.Bool("findMissingChunksInFiler", false, "see \"help volume.fsck\"")
  54. findMissingChunksInFilerPath := fsckCommand.String("findMissingChunksInFilerPath", "/", "used together with findMissingChunksInFiler")
  55. applyPurging := fsckCommand.Bool("forcePurge", false, "<expert only!> after detection, delete missing data from volumes / delete missing file entries from filer")
  56. if err = fsckCommand.Parse(args); err != nil {
  57. return nil
  58. }
  59. if err = commandEnv.confirmIsLocked(args); err != nil {
  60. return
  61. }
  62. c.env = commandEnv
  63. // create a temp folder
  64. tempFolder, err := os.MkdirTemp("", "sw_fsck")
  65. if err != nil {
  66. return fmt.Errorf("failed to create temp folder: %v", err)
  67. }
  68. if *verbose {
  69. fmt.Fprintf(writer, "working directory: %s\n", tempFolder)
  70. }
  71. defer os.RemoveAll(tempFolder)
  72. // collect all volume id locations
  73. volumeIdToVInfo, err := c.collectVolumeIds(commandEnv, *verbose, writer)
  74. if err != nil {
  75. return fmt.Errorf("failed to collect all volume locations: %v", err)
  76. }
  77. // collect each volume file ids
  78. for volumeId, vinfo := range volumeIdToVInfo {
  79. err = c.collectOneVolumeFileIds(tempFolder, volumeId, vinfo, *verbose, writer)
  80. if err != nil {
  81. return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
  82. }
  83. }
  84. if *findMissingChunksInFiler {
  85. // collect all filer file ids and paths
  86. if err = c.collectFilerFileIdAndPaths(volumeIdToVInfo, tempFolder, writer, *findMissingChunksInFilerPath, *verbose); err != nil {
  87. return fmt.Errorf("collectFilerFileIdAndPaths: %v", err)
  88. }
  89. // for each volume, check filer file ids
  90. if err = c.findFilerChunksMissingInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, applyPurging); err != nil {
  91. return fmt.Errorf("findFilerChunksMissingInVolumeServers: %v", err)
  92. }
  93. } else {
  94. // collect all filer file ids
  95. if err = c.collectFilerFileIds(volumeIdToVInfo, tempFolder, writer, *verbose); err != nil {
  96. return fmt.Errorf("failed to collect file ids from filer: %v", err)
  97. }
  98. // volume file ids subtract filer file ids
  99. if err = c.findExtraChunksInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, applyPurging); err != nil {
  100. return fmt.Errorf("findExtraChunksInVolumeServers: %v", err)
  101. }
  102. }
  103. return nil
  104. }
  105. func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint32]VInfo, tempFolder string, writer io.Writer, filerPath string, verbose bool) error {
  106. if verbose {
  107. fmt.Fprintf(writer, "checking each file from filer ...\n")
  108. }
  109. files := make(map[uint32]*os.File)
  110. for vid := range volumeIdToServer {
  111. dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  112. if openErr != nil {
  113. return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr)
  114. }
  115. files[vid] = dst
  116. }
  117. defer func() {
  118. for _, f := range files {
  119. f.Close()
  120. }
  121. }()
  122. type Item struct {
  123. vid uint32
  124. fileKey uint64
  125. cookie uint32
  126. path util.FullPath
  127. }
  128. return doTraverseBfsAndSaving(c.env, nil, filerPath, false, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
  129. if verbose && entry.Entry.IsDirectory {
  130. fmt.Fprintf(writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name))
  131. }
  132. dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
  133. if resolveErr != nil {
  134. return nil
  135. }
  136. dChunks = append(dChunks, mChunks...)
  137. for _, chunk := range dChunks {
  138. outputChan <- &Item{
  139. vid: chunk.Fid.VolumeId,
  140. fileKey: chunk.Fid.FileKey,
  141. cookie: chunk.Fid.Cookie,
  142. path: util.NewFullPath(entry.Dir, entry.Entry.Name),
  143. }
  144. }
  145. return nil
  146. }, func(outputChan chan interface{}) {
  147. buffer := make([]byte, 16)
  148. for item := range outputChan {
  149. i := item.(*Item)
  150. if f, ok := files[i.vid]; ok {
  151. util.Uint64toBytes(buffer, i.fileKey)
  152. util.Uint32toBytes(buffer[8:], i.cookie)
  153. util.Uint32toBytes(buffer[12:], uint32(len(i.path)))
  154. f.Write(buffer)
  155. f.Write([]byte(i.path))
  156. // fmt.Fprintf(writer, "%d,%x%08x %d %s\n", i.vid, i.fileKey, i.cookie, len(i.path), i.path)
  157. } else {
  158. fmt.Fprintf(writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path)
  159. }
  160. }
  161. })
  162. }
  163. func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging *bool) error {
  164. for volumeId, vinfo := range volumeIdToVInfo {
  165. checkErr := c.oneVolumeFileIdsCheckOneVolume(tempFolder, volumeId, writer, verbose, applyPurging)
  166. if checkErr != nil {
  167. return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
  168. }
  169. }
  170. return nil
  171. }
  172. func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging *bool) error {
  173. var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64
  174. for volumeId, vinfo := range volumeIdToVInfo {
  175. inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer, verbose)
  176. if checkErr != nil {
  177. return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
  178. }
  179. totalInUseCount += inUseCount
  180. totalOrphanChunkCount += uint64(len(orphanFileIds))
  181. totalOrphanDataSize += orphanDataSize
  182. if verbose {
  183. for _, fid := range orphanFileIds {
  184. fmt.Fprintf(writer, "%s\n", fid)
  185. }
  186. }
  187. if len(orphanFileIds) > 0 {
  188. if *applyPurging {
  189. if verbose {
  190. fmt.Fprintf(writer, "purging process for volume %d", volumeId)
  191. }
  192. if vinfo.isEcVolume {
  193. fmt.Fprintf(writer, "skip purging for Erasure Coded volume %d.\n", volumeId)
  194. continue
  195. }
  196. needleVID := needle.VolumeId(volumeId)
  197. if vinfo.isReadOnly {
  198. err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, vinfo.server, true)
  199. if err != nil {
  200. return fmt.Errorf("mark volume %d read/write: %v", volumeId, err)
  201. }
  202. fmt.Fprintf(writer, "temporarily marked %d on server %v writable for forced purge\n", volumeId, vinfo.server)
  203. defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, vinfo.server, false)
  204. }
  205. fmt.Fprintf(writer, "marked %d on server %v writable for forced purge\n", volumeId, vinfo.server)
  206. // FIXME Does not delete all replicas of this volumeId on all volume servers. Using fileId based deletion in all cases for now.
  207. // if inUseCount < 1 {
  208. // if verbose {
  209. // fmt.Fprintf(writer, "removing empty (all content orphaned) volume %d\n", volumeId)
  210. // }
  211. // if err := deleteVolume(c.env.option.GrpcDialOption, needleVID, vinfo.server); err != nil {
  212. // return fmt.Errorf("removing empty volume %d: %v", volumeId, err)
  213. // }
  214. // } else {
  215. if verbose {
  216. fmt.Fprintf(writer, "purging files from volume %d\n", volumeId)
  217. }
  218. if err := c.purgeFileIdsForOneVolume(volumeId, orphanFileIds, writer); err != nil {
  219. return fmt.Errorf("purging volume %d: %v", volumeId, err)
  220. }
  221. }
  222. }
  223. }
  224. if !*applyPurging {
  225. pct := float64(totalOrphanChunkCount*100) / (float64(totalOrphanChunkCount + totalInUseCount))
  226. fmt.Fprintf(writer, "\nTotal\t\tentries:%d\torphan:%d\t%.2f%%\t%dB\n",
  227. totalOrphanChunkCount+totalInUseCount, totalOrphanChunkCount, pct, totalOrphanDataSize)
  228. fmt.Fprintf(writer, "This could be normal if multiple filers or no filers are used.\n")
  229. }
  230. if totalOrphanChunkCount == 0 {
  231. fmt.Fprintf(writer, "no orphan data\n")
  232. //return nil
  233. }
  234. return nil
  235. }
  236. func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId uint32, vinfo VInfo, verbose bool, writer io.Writer) error {
  237. if verbose {
  238. fmt.Fprintf(writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server)
  239. }
  240. return operation.WithVolumeServerClient(false, vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  241. ext := ".idx"
  242. if vinfo.isEcVolume {
  243. ext = ".ecx"
  244. }
  245. copyFileClient, err := volumeServerClient.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
  246. VolumeId: volumeId,
  247. Ext: ext,
  248. CompactionRevision: math.MaxUint32,
  249. StopOffset: math.MaxInt64,
  250. Collection: vinfo.collection,
  251. IsEcVolume: vinfo.isEcVolume,
  252. IgnoreSourceFileNotFound: false,
  253. })
  254. if err != nil {
  255. return fmt.Errorf("failed to start copying volume %d%s: %v", volumeId, ext, err)
  256. }
  257. err = writeToFile(copyFileClient, getVolumeFileIdFile(tempFolder, volumeId))
  258. if err != nil {
  259. return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, vinfo.server, err)
  260. }
  261. return nil
  262. })
  263. }
  264. func (c *commandVolumeFsck) collectFilerFileIds(volumeIdToServer map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool) error {
  265. if verbose {
  266. fmt.Fprintf(writer, "collecting file ids from filer ...\n")
  267. }
  268. files := make(map[uint32]*os.File)
  269. for vid := range volumeIdToServer {
  270. dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  271. if openErr != nil {
  272. return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr)
  273. }
  274. files[vid] = dst
  275. }
  276. defer func() {
  277. for _, f := range files {
  278. f.Close()
  279. }
  280. }()
  281. type Item struct {
  282. vid uint32
  283. fileKey uint64
  284. }
  285. return doTraverseBfsAndSaving(c.env, nil, "/", false, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
  286. dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
  287. if resolveErr != nil {
  288. if verbose {
  289. fmt.Fprintf(writer, "resolving manifest chunks in %s: %v\n", util.NewFullPath(entry.Dir, entry.Entry.Name), resolveErr)
  290. }
  291. return nil
  292. }
  293. dChunks = append(dChunks, mChunks...)
  294. for _, chunk := range dChunks {
  295. outputChan <- &Item{
  296. vid: chunk.Fid.VolumeId,
  297. fileKey: chunk.Fid.FileKey,
  298. }
  299. }
  300. return nil
  301. }, func(outputChan chan interface{}) {
  302. buffer := make([]byte, 8)
  303. for item := range outputChan {
  304. i := item.(*Item)
  305. util.Uint64toBytes(buffer, i.fileKey)
  306. files[i.vid].Write(buffer)
  307. }
  308. })
  309. }
  310. func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, volumeId uint32, writer io.Writer, verbose bool, applyPurging *bool) (err error) {
  311. if verbose {
  312. fmt.Fprintf(writer, "find missing file chunks in volume %d ...\n", volumeId)
  313. }
  314. db := needle_map.NewMemDb()
  315. defer db.Close()
  316. if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, volumeId)); err != nil {
  317. return
  318. }
  319. file := getFilerFileIdFile(tempFolder, volumeId)
  320. fp, err := os.Open(file)
  321. if err != nil {
  322. return
  323. }
  324. defer fp.Close()
  325. type Item struct {
  326. fileKey uint64
  327. cookie uint32
  328. path util.FullPath
  329. }
  330. br := bufio.NewReader(fp)
  331. buffer := make([]byte, 16)
  332. item := &Item{}
  333. var readSize int
  334. for {
  335. readSize, err = io.ReadFull(br, buffer)
  336. if err != nil || readSize != 16 {
  337. break
  338. }
  339. item.fileKey = util.BytesToUint64(buffer[:8])
  340. item.cookie = util.BytesToUint32(buffer[8:12])
  341. pathSize := util.BytesToUint32(buffer[12:16])
  342. pathBytes := make([]byte, int(pathSize))
  343. n, err := io.ReadFull(br, pathBytes)
  344. if err != nil {
  345. fmt.Fprintf(writer, "%d,%x%08x in unexpected error: %v\n", volumeId, item.fileKey, item.cookie, err)
  346. }
  347. if n != int(pathSize) {
  348. fmt.Fprintf(writer, "%d,%x%08x %d unexpected file name size %d\n", volumeId, item.fileKey, item.cookie, pathSize, n)
  349. }
  350. item.path = util.FullPath(string(pathBytes))
  351. needleId := types.NeedleId(item.fileKey)
  352. if _, found := db.Get(needleId); !found {
  353. fmt.Fprintf(writer, "%s\n", item.path)
  354. if *applyPurging {
  355. req, err := http.NewRequest(http.MethodDelete, "", nil)
  356. // defining the URL this way automatically escapes complex path names
  357. req.URL = &url.URL{
  358. Scheme: "http",
  359. Host: c.env.option.FilerAddress.ToHttpAddress(),
  360. Path: string(item.path),
  361. }
  362. if verbose {
  363. fmt.Printf("full HTTP delete request to be sent: %v\n", req)
  364. }
  365. if err != nil {
  366. fmt.Errorf("HTTP delete request error: %v\n", err)
  367. }
  368. client := &http.Client{}
  369. resp, err := client.Do(req)
  370. if err != nil {
  371. fmt.Errorf("DELETE fetch error: %v\n", err)
  372. }
  373. defer resp.Body.Close()
  374. _, err = ioutil.ReadAll(resp.Body)
  375. if err != nil {
  376. fmt.Errorf("DELETE response error: %v\n", err)
  377. }
  378. if verbose {
  379. fmt.Println("delete response Status : ", resp.Status)
  380. fmt.Println("delete response Headers : ", resp.Header)
  381. }
  382. }
  383. }
  384. }
  385. return nil
  386. }
  387. func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
  388. db := needle_map.NewMemDb()
  389. defer db.Close()
  390. if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, volumeId)); err != nil {
  391. return
  392. }
  393. filerFileIdsData, err := os.ReadFile(getFilerFileIdFile(tempFolder, volumeId))
  394. if err != nil {
  395. return
  396. }
  397. dataLen := len(filerFileIdsData)
  398. if dataLen%8 != 0 {
  399. return 0, nil, 0, fmt.Errorf("filer data is corrupted")
  400. }
  401. for i := 0; i < len(filerFileIdsData); i += 8 {
  402. fileKey := util.BytesToUint64(filerFileIdsData[i : i+8])
  403. db.Delete(types.NeedleId(fileKey))
  404. inUseCount++
  405. }
  406. var orphanFileCount uint64
  407. db.AscendingVisit(func(n needle_map.NeedleValue) error {
  408. // fmt.Printf("%d,%x\n", volumeId, n.Key)
  409. orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s00000000", volumeId, n.Key.String()))
  410. orphanFileCount++
  411. orphanDataSize += uint64(n.Size)
  412. return nil
  413. })
  414. if orphanFileCount > 0 {
  415. pct := float64(orphanFileCount*100) / (float64(orphanFileCount + inUseCount))
  416. fmt.Fprintf(writer, "volume:%d\tentries:%d\torphan:%d\t%.2f%%\t%dB\n",
  417. volumeId, orphanFileCount+inUseCount, orphanFileCount, pct, orphanDataSize)
  418. }
  419. return
  420. }
  421. type VInfo struct {
  422. server pb.ServerAddress
  423. collection string
  424. isEcVolume bool
  425. isReadOnly bool
  426. }
  427. func (c *commandVolumeFsck) collectVolumeIds(commandEnv *CommandEnv, verbose bool, writer io.Writer) (volumeIdToServer map[uint32]VInfo, err error) {
  428. if verbose {
  429. fmt.Fprintf(writer, "collecting volume id and locations from master ...\n")
  430. }
  431. volumeIdToServer = make(map[uint32]VInfo)
  432. // collect topology information
  433. topologyInfo, _, err := collectTopologyInfo(commandEnv)
  434. if err != nil {
  435. return
  436. }
  437. eachDataNode(topologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) {
  438. for _, diskInfo := range t.DiskInfos {
  439. for _, vi := range diskInfo.VolumeInfos {
  440. volumeIdToServer[vi.Id] = VInfo{
  441. server: pb.NewServerAddressFromDataNode(t),
  442. collection: vi.Collection,
  443. isEcVolume: false,
  444. isReadOnly: vi.ReadOnly,
  445. }
  446. }
  447. for _, ecShardInfo := range diskInfo.EcShardInfos {
  448. volumeIdToServer[ecShardInfo.Id] = VInfo{
  449. server: pb.NewServerAddressFromDataNode(t),
  450. collection: ecShardInfo.Collection,
  451. isEcVolume: true,
  452. isReadOnly: true,
  453. }
  454. }
  455. }
  456. })
  457. if verbose {
  458. fmt.Fprintf(writer, "collected %d volumes and locations.\n", len(volumeIdToServer))
  459. }
  460. return
  461. }
  462. func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []string, writer io.Writer) (err error) {
  463. fmt.Fprintf(writer, "purging orphan data for volume %d...\n", volumeId)
  464. locations, found := c.env.MasterClient.GetLocations(volumeId)
  465. if !found {
  466. return fmt.Errorf("failed to find volume %d locations", volumeId)
  467. }
  468. resultChan := make(chan []*volume_server_pb.DeleteResult, len(locations))
  469. var wg sync.WaitGroup
  470. for _, location := range locations {
  471. wg.Add(1)
  472. go func(server pb.ServerAddress, fidList []string) {
  473. defer wg.Done()
  474. if deleteResults, deleteErr := operation.DeleteFilesAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false); deleteErr != nil {
  475. err = deleteErr
  476. } else if deleteResults != nil {
  477. resultChan <- deleteResults
  478. }
  479. }(location.ServerAddress(), fileIds)
  480. }
  481. wg.Wait()
  482. close(resultChan)
  483. for results := range resultChan {
  484. for _, result := range results {
  485. if result.Error != "" {
  486. fmt.Fprintf(writer, "purge error: %s\n", result.Error)
  487. }
  488. }
  489. }
  490. return
  491. }
  492. func getVolumeFileIdFile(tempFolder string, vid uint32) string {
  493. return filepath.Join(tempFolder, fmt.Sprintf("%d.idx", vid))
  494. }
  495. func getFilerFileIdFile(tempFolder string, vid uint32) string {
  496. return filepath.Join(tempFolder, fmt.Sprintf("%d.fid", vid))
  497. }
  498. func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error {
  499. flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
  500. dst, err := os.OpenFile(fileName, flags, 0644)
  501. if err != nil {
  502. return nil
  503. }
  504. defer dst.Close()
  505. for {
  506. resp, receiveErr := client.Recv()
  507. if receiveErr == io.EOF {
  508. break
  509. }
  510. if receiveErr != nil {
  511. return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
  512. }
  513. dst.Write(resp.FileContent)
  514. }
  515. return nil
  516. }