You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							141 lines
						
					
					
						
							4.3 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							141 lines
						
					
					
						
							4.3 KiB
						
					
					
				| package weed_server | |
| 
 | |
| import ( | |
| 	"context" | |
| 	"fmt" | |
| 	"time" | |
| 
 | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb" | |
| 
 | |
| 	"github.com/seaweedfs/seaweedfs/weed/glog" | |
| 	"github.com/seaweedfs/seaweedfs/weed/operation" | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" | |
| 	"github.com/seaweedfs/seaweedfs/weed/storage" | |
| 	"github.com/seaweedfs/seaweedfs/weed/storage/needle" | |
| 	"github.com/seaweedfs/seaweedfs/weed/storage/super_block" | |
| ) | |
| 
 | |
| func (vs *VolumeServer) VolumeTailSender(req *volume_server_pb.VolumeTailSenderRequest, stream volume_server_pb.VolumeServer_VolumeTailSenderServer) error { | |
| 
 | |
| 	v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) | |
| 	if v == nil { | |
| 		return fmt.Errorf("not found volume id %d", req.VolumeId) | |
| 	} | |
| 
 | |
| 	defer glog.V(1).Infof("tailing volume %d finished", v.Id) | |
| 
 | |
| 	lastTimestampNs := req.SinceNs | |
| 	drainingSeconds := req.IdleTimeoutSeconds | |
| 
 | |
| 	for { | |
| 		lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs) | |
| 		if err != nil { | |
| 			glog.Infof("sendNeedlesSince: %v", err) | |
| 			return fmt.Errorf("streamFollow: %v", err) | |
| 		} | |
| 		time.Sleep(2 * time.Second) | |
| 
 | |
| 		if req.IdleTimeoutSeconds == 0 { | |
| 			lastTimestampNs = lastProcessedTimestampNs | |
| 			continue | |
| 		} | |
| 		if lastProcessedTimestampNs == lastTimestampNs { | |
| 			drainingSeconds-- | |
| 			if drainingSeconds <= 0 { | |
| 				return nil | |
| 			} | |
| 			glog.V(1).Infof("tailing volume %d drains requests with %d seconds remaining", v.Id, drainingSeconds) | |
| 		} else { | |
| 			lastTimestampNs = lastProcessedTimestampNs | |
| 			drainingSeconds = req.IdleTimeoutSeconds | |
| 			glog.V(1).Infof("tailing volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds) | |
| 		} | |
| 
 | |
| 	} | |
| 
 | |
| } | |
| 
 | |
| func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) { | |
| 
 | |
| 	foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs) | |
| 	if err != nil { | |
| 		return 0, fmt.Errorf("fail to locate by appendAtNs %d: %s", lastTimestampNs, err) | |
| 	} | |
| 
 | |
| 	// log.Printf("reading ts %d offset %d isLast %v", lastTimestampNs, foundOffset, isLastOne) | |
|  | |
| 	if isLastOne { | |
| 		// need to heart beat to the client to ensure the connection health | |
| 		sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{IsLastChunk: true, Version: uint32(v.Version())}) | |
| 		return lastTimestampNs, sendErr | |
| 	} | |
| 
 | |
| 	scanner := &VolumeFileScanner4Tailing{ | |
| 		stream:  stream, | |
| 		version: uint32(v.Version()), | |
| 	} | |
| 
 | |
| 	err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, foundOffset.ToActualOffset(), scanner) | |
| 
 | |
| 	return scanner.lastProcessedTimestampNs, err | |
| 
 | |
| } | |
| 
 | |
| func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_server_pb.VolumeTailReceiverRequest) (*volume_server_pb.VolumeTailReceiverResponse, error) { | |
| 
 | |
| 	resp := &volume_server_pb.VolumeTailReceiverResponse{} | |
| 
 | |
| 	v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) | |
| 	if v == nil { | |
| 		return resp, fmt.Errorf("receiver not found volume id %d", req.VolumeId) | |
| 	} | |
| 
 | |
| 	defer glog.V(1).Infof("receive tailing volume %d finished", v.Id) | |
| 
 | |
| 	return resp, operation.TailVolumeFromSource(pb.ServerAddress(req.SourceVolumeServer), vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error { | |
| 		_, err := vs.store.WriteVolumeNeedle(v.Id, n, false, false) | |
| 		return err | |
| 	}) | |
| 
 | |
| } | |
| 
 | |
| // generate the volume idx | |
| type VolumeFileScanner4Tailing struct { | |
| 	stream                   volume_server_pb.VolumeServer_VolumeTailSenderServer | |
| 	lastProcessedTimestampNs uint64 | |
| 	version                  uint32 | |
| } | |
| 
 | |
| func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock super_block.SuperBlock) error { | |
| 	return nil | |
| 
 | |
| } | |
| func (scanner *VolumeFileScanner4Tailing) ReadNeedleBody() bool { | |
| 	return true | |
| } | |
| 
 | |
| func (scanner *VolumeFileScanner4Tailing) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { | |
| 	isLastChunk := false | |
| 
 | |
| 	// need to send body by chunks | |
| 	for i := 0; i < len(needleBody); i += BufferSizeLimit { | |
| 		stopOffset := i + BufferSizeLimit | |
| 		if stopOffset >= len(needleBody) { | |
| 			isLastChunk = true | |
| 			stopOffset = len(needleBody) | |
| 		} | |
| 
 | |
| 		sendErr := scanner.stream.Send(&volume_server_pb.VolumeTailSenderResponse{ | |
| 			NeedleHeader: needleHeader, | |
| 			NeedleBody:   needleBody[i:stopOffset], | |
| 			IsLastChunk:  isLastChunk, | |
| 			Version:      scanner.version, | |
| 		}) | |
| 		if sendErr != nil { | |
| 			return sendErr | |
| 		} | |
| 	} | |
| 
 | |
| 	scanner.lastProcessedTimestampNs = n.AppendAtNs | |
| 	return nil | |
| }
 |