|
|
@ -677,6 +677,9 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R |
|
|
|
|
|
|
|
|
// Get chunks
|
|
|
// Get chunks
|
|
|
chunks := entry.GetChunks() |
|
|
chunks := entry.GetChunks() |
|
|
|
|
|
glog.V(3).Infof("streamFromVolumeServers: entry has %d chunks, totalSize=%d, isRange=%v, offset=%d, size=%d", |
|
|
|
|
|
len(chunks), totalSize, isRangeRequest, offset, size) |
|
|
|
|
|
|
|
|
if len(chunks) == 0 { |
|
|
if len(chunks) == 0 { |
|
|
// BUG FIX: If totalSize > 0 but no chunks and no content, this is a data integrity issue
|
|
|
// BUG FIX: If totalSize > 0 but no chunks and no content, this is a data integrity issue
|
|
|
if totalSize > 0 && len(entry.Content) == 0 { |
|
|
if totalSize > 0 && len(entry.Content) == 0 { |
|
|
@ -691,6 +694,11 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R |
|
|
return nil |
|
|
return nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Log chunk details
|
|
|
|
|
|
for i, chunk := range chunks { |
|
|
|
|
|
glog.V(3).Infof(" Chunk[%d]: fid=%s, offset=%d, size=%d", i, chunk.GetFileIdString(), chunk.Offset, chunk.Size) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// Create lookup function via filer client (reuse shared helper)
|
|
|
// Create lookup function via filer client (reuse shared helper)
|
|
|
ctx := r.Context() |
|
|
ctx := r.Context() |
|
|
lookupFileIdFn := s3a.createLookupFileIdFunction() |
|
|
lookupFileIdFn := s3a.createLookupFileIdFunction() |
|
|
@ -700,9 +708,8 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R |
|
|
resolvedChunks, _, err := filer.ResolveChunkManifest(ctx, lookupFileIdFn, chunks, offset, offset+size) |
|
|
resolvedChunks, _, err := filer.ResolveChunkManifest(ctx, lookupFileIdFn, chunks, offset, offset+size) |
|
|
chunkResolveTime = time.Since(tChunkResolve) |
|
|
chunkResolveTime = time.Since(tChunkResolve) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
if !isRangeRequest { |
|
|
|
|
|
w.WriteHeader(http.StatusInternalServerError) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
glog.Errorf("streamFromVolumeServers: failed to resolve chunks: %v", err) |
|
|
|
|
|
// Don't try to write headers if we already wrote them for range request
|
|
|
return fmt.Errorf("failed to resolve chunks: %v", err) |
|
|
return fmt.Errorf("failed to resolve chunks: %v", err) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@ -723,9 +730,8 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R |
|
|
) |
|
|
) |
|
|
streamPrepTime = time.Since(tStreamPrep) |
|
|
streamPrepTime = time.Since(tStreamPrep) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
if !isRangeRequest { |
|
|
|
|
|
w.WriteHeader(http.StatusInternalServerError) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
glog.Errorf("streamFromVolumeServers: failed to prepare stream: %v", err) |
|
|
|
|
|
// Don't try to write headers if we already wrote them for range request
|
|
|
return fmt.Errorf("failed to prepare stream: %v", err) |
|
|
return fmt.Errorf("failed to prepare stream: %v", err) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|