You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							90 lines
						
					
					
						
							2.9 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							90 lines
						
					
					
						
							2.9 KiB
						
					
					
				
								package weed_server
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"context"
							 | 
						|
									"fmt"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/operation"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/remote_storage"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/security"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/storage/needle"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/storage/types"
							 | 
						|
									"sync"
							 | 
						|
									"time"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_server_pb.FetchAndWriteNeedleRequest) (resp *volume_server_pb.FetchAndWriteNeedleResponse, err error) {
							 | 
						|
									resp = &volume_server_pb.FetchAndWriteNeedleResponse{}
							 | 
						|
									v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
							 | 
						|
									if v == nil {
							 | 
						|
										return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									remoteConf := req.RemoteConf
							 | 
						|
								
							 | 
						|
									client, getClientErr := remote_storage.GetRemoteStorage(remoteConf)
							 | 
						|
									if getClientErr != nil {
							 | 
						|
										return nil, fmt.Errorf("get remote client: %w", getClientErr)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									remoteStorageLocation := req.RemoteLocation
							 | 
						|
								
							 | 
						|
									data, ReadRemoteErr := client.ReadFile(remoteStorageLocation, req.Offset, req.Size)
							 | 
						|
									if ReadRemoteErr != nil {
							 | 
						|
										return nil, fmt.Errorf("read from remote %+v: %v", remoteStorageLocation, ReadRemoteErr)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									var wg sync.WaitGroup
							 | 
						|
									wg.Add(1)
							 | 
						|
									go func() {
							 | 
						|
										defer wg.Done()
							 | 
						|
										n := new(needle.Needle)
							 | 
						|
										n.Id = types.NeedleId(req.NeedleId)
							 | 
						|
										n.Cookie = types.Cookie(req.Cookie)
							 | 
						|
										n.Data, n.DataSize = data, uint32(len(data))
							 | 
						|
										// copied from *Needle.prepareWriteBuffer()
							 | 
						|
										n.Size = 4 + types.Size(n.DataSize) + 1
							 | 
						|
										n.Checksum = needle.NewCRC(n.Data)
							 | 
						|
										n.LastModified = uint64(time.Now().Unix())
							 | 
						|
										n.SetHasLastModifiedDate()
							 | 
						|
										if _, localWriteErr := vs.store.WriteVolumeNeedle(v.Id, n, true, false); localWriteErr != nil {
							 | 
						|
											if err == nil {
							 | 
						|
												err = fmt.Errorf("local write needle %d size %d: %v", req.NeedleId, req.Size, localWriteErr)
							 | 
						|
											}
							 | 
						|
										} else {
							 | 
						|
											resp.ETag = n.Etag()
							 | 
						|
										}
							 | 
						|
									}()
							 | 
						|
									if len(req.Replicas) > 0 {
							 | 
						|
										fileId := needle.NewFileId(v.Id, req.NeedleId, req.Cookie)
							 | 
						|
										for _, replica := range req.Replicas {
							 | 
						|
											wg.Add(1)
							 | 
						|
											go func(targetVolumeServer string) {
							 | 
						|
												defer wg.Done()
							 | 
						|
												uploadOption := &operation.UploadOption{
							 | 
						|
													UploadUrl:         fmt.Sprintf("http://%s/%s?type=replicate", targetVolumeServer, fileId.String()),
							 | 
						|
													Filename:          "",
							 | 
						|
													Cipher:            false,
							 | 
						|
													IsInputCompressed: false,
							 | 
						|
													MimeType:          "",
							 | 
						|
													PairMap:           nil,
							 | 
						|
													Jwt:               security.EncodedJwt(req.Auth),
							 | 
						|
												}
							 | 
						|
								
							 | 
						|
												uploader, uploaderErr := operation.NewUploader()
							 | 
						|
												if uploaderErr != nil && err == nil {
							 | 
						|
													err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, uploaderErr)
							 | 
						|
													return
							 | 
						|
												}
							 | 
						|
								
							 | 
						|
												if _, replicaWriteErr := uploader.UploadData(ctx, data, uploadOption); replicaWriteErr != nil && err == nil {
							 | 
						|
													err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, replicaWriteErr)
							 | 
						|
												}
							 | 
						|
											}(replica.Url)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									wg.Wait()
							 | 
						|
								
							 | 
						|
									return resp, err
							 | 
						|
								}
							 |