@@ -553,6 +553,15 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
 	})
 	if createErr != nil {
 		glog.Errorf("putToFiler: failed to create entry for %s: %v", filePath, createErr)
+
+		// CRITICAL: Cleanup orphaned chunks before returning error
+		// If CreateEntry fails, the uploaded chunks are orphaned and must be deleted
+		// to prevent resource leaks and wasted storage
+		if len(chunkResult.FileChunks) > 0 {
+			glog.Warningf("putToFiler: CreateEntry failed, attempting to cleanup %d orphaned chunks", len(chunkResult.FileChunks))
+			s3a.deleteOrphanedChunks(chunkResult.FileChunks)
+		}
+
 		return "", filerErrorToS3Error(createErr.Error()), SSEResponseMetadata{}
 	}
 	glog.V(3).Infof("putToFiler: CreateEntry SUCCESS for %s", filePath)
@@ -1705,3 +1714,88 @@ func (s3a *S3ApiServer) checkConditionalHeadersForReadsWithGetter(getter EntryGetter
 func (s3a *S3ApiServer) checkConditionalHeadersForReads(r *http.Request, bucket, object string) ConditionalHeaderResult {
 	return s3a.checkConditionalHeadersForReadsWithGetter(s3a, r, bucket, object)
 }
+
+// deleteOrphanedChunks attempts to delete chunks that were uploaded but whose entry creation failed
+// This prevents resource leaks and wasted storage. Errors are logged but don't prevent cleanup attempts.
+func (s3a *S3ApiServer) deleteOrphanedChunks(chunks []*filer_pb.FileChunk) {
+	if len(chunks) == 0 {
+		return
+	}
+
+	// Extract file IDs from chunks
+	var fileIds []string
+	for _, chunk := range chunks {
+		if chunk.GetFileIdString() != "" {
+			fileIds = append(fileIds, chunk.GetFileIdString())
+		}
+	}
+
+	if len(fileIds) == 0 {
+		glog.Warningf("deleteOrphanedChunks: no valid file IDs found in %d chunks", len(chunks))
+		return
+	}
+
+	glog.V(3).Infof("deleteOrphanedChunks: attempting to delete %d file IDs: %v", len(fileIds), fileIds)
+
+	// Create a lookup function that queries the filer for volume locations
+	// This is similar to createLookupFileIdFunction but returns the format needed by DeleteFileIdsWithLookupVolumeId
+	lookupFunc := func(vids []string) (map[string]*operation.LookupResult, error) {
+		results := make(map[string]*operation.LookupResult)
+
+		err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+			// Query filer for all volume IDs at once
+			resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
+				VolumeIds: vids,
+			})
+			if err != nil {
+				return err
+			}
+
+			// Convert filer response to operation.LookupResult format
+			for vid, locs := range resp.LocationsMap {
+				result := &operation.LookupResult{
+					VolumeOrFileId: vid,
+				}
+
+				for _, loc := range locs.Locations {
+					result.Locations = append(result.Locations, operation.Location{
+						Url:        loc.Url,
+						PublicUrl:  loc.PublicUrl,
+						DataCenter: loc.DataCenter,
+						GrpcPort:   int(loc.GrpcPort),
+					})
+				}
+
+				results[vid] = result
+			}
+			return nil
+		})
+
+		return results, err
+	}
+
+	// Attempt deletion using the operation package's batch delete with custom lookup
+	deleteResults := operation.DeleteFileIdsWithLookupVolumeId(s3a.option.GrpcDialOption, fileIds, lookupFunc)
+
+	// Log results - track successes and failures
+	successCount := 0
+	failureCount := 0
+	for _, result := range deleteResults {
+		if result.Error != "" {
+			glog.Warningf("deleteOrphanedChunks: failed to delete chunk %s: %s (status: %d)",
+				result.FileId, result.Error, result.Status)
+			failureCount++
+		} else {
+			glog.V(4).Infof("deleteOrphanedChunks: successfully deleted chunk %s (size: %d bytes)",
+				result.FileId, result.Size)
+			successCount++
+		}
+	}
+
+	if failureCount > 0 {
+		glog.Warningf("deleteOrphanedChunks: cleanup completed with %d successes and %d failures out of %d chunks",
+			successCount, failureCount, len(fileIds))
+	} else {
+		glog.V(3).Infof("deleteOrphanedChunks: successfully deleted all %d orphaned chunks", successCount)
+	}
+}
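
The sketch below is illustrative only and not part of the patch: it distills the control flow the first hunk adds to putToFiler (best-effort cleanup of already-uploaded chunks when the final metadata commit fails). The chunk type, createEntry, and deleteChunks are hypothetical stand-ins for filer_pb.FileChunk, the filer CreateEntry call, and the batch volume delete (operation.DeleteFileIdsWithLookupVolumeId in the patch).

```go
package main

import (
	"errors"
	"fmt"
)

// chunk stands in for filer_pb.FileChunk: data already written to volume servers.
type chunk struct{ fileID string }

// createEntry stands in for the filer CreateEntry call that commits the object's
// metadata. It always fails here so the cleanup path is exercised.
func createEntry(chunks []chunk) error {
	return errors.New("filer unreachable")
}

// deleteChunks stands in for the best-effort batch delete against volume servers.
func deleteChunks(chunks []chunk) {
	for _, c := range chunks {
		fmt.Printf("deleting orphaned chunk %s\n", c.fileID)
	}
}

// put mirrors the flow added to putToFiler: if the metadata commit fails after
// the chunks were uploaded, delete the now-unreferenced chunks before returning
// the error, so they are not leaked.
func put(chunks []chunk) error {
	if err := createEntry(chunks); err != nil {
		if len(chunks) > 0 {
			deleteChunks(chunks)
		}
		return err
	}
	return nil
}

func main() {
	if err := put([]chunk{{fileID: "3,01637037d6"}}); err != nil {
		fmt.Println("put failed:", err)
	}
}
```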