|
|
|
@ -911,28 +911,20 @@ async fn get_or_head_handler_inner( |
|
|
|
return proxy_or_redirect_to_target(&state, info, vid, false).await;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Download throttling
|
|
|
|
// Download throttling — matches Go's checkDownloadLimit + waitForDownloadSlot
|
|
|
|
let download_guard = if state.concurrent_download_limit > 0 {
|
|
|
|
let timeout = state.inflight_download_data_timeout;
|
|
|
|
let deadline = tokio::time::Instant::now() + timeout;
|
|
|
|
let query_string = request.uri().query().unwrap_or("").to_string();
|
|
|
|
let mut noted_over_limit = false;
|
|
|
|
|
|
|
|
loop {
|
|
|
|
let current = state.inflight_download_bytes.load(Ordering::Relaxed);
|
|
|
|
if current <= state.concurrent_download_limit {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
if !noted_over_limit {
|
|
|
|
metrics::HANDLER_COUNTER
|
|
|
|
.with_label_values(&[metrics::DOWNLOAD_LIMIT_COND])
|
|
|
|
.inc();
|
|
|
|
noted_over_limit = true;
|
|
|
|
}
|
|
|
|
let current = state.inflight_download_bytes.load(Ordering::Relaxed);
|
|
|
|
if current > state.concurrent_download_limit {
|
|
|
|
metrics::HANDLER_COUNTER
|
|
|
|
.with_label_values(&[metrics::DOWNLOAD_LIMIT_COND])
|
|
|
|
.inc();
|
|
|
|
|
|
|
|
// Match Go's checkDownloadLimit: when this request has not already been
|
|
|
|
// proxied and the local volume has replicas, try another replica first.
|
|
|
|
// Go tries proxy to replica ONCE before entering the blocking wait
|
|
|
|
// loop (checkDownloadLimit L65). It does NOT retry on each wakeup.
|
|
|
|
let should_try_replica =
|
|
|
|
!query_string.contains("proxied=true") && !state.master_url.is_empty() && {
|
|
|
|
let store = state.store.read().unwrap();
|
|
|
|
@ -948,15 +940,22 @@ async fn get_or_head_handler_inner( |
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if tokio::time::timeout_at(deadline, state.download_notify.notified())
|
|
|
|
.await
|
|
|
|
.is_err()
|
|
|
|
{
|
|
|
|
return json_error_with_query(
|
|
|
|
StatusCode::TOO_MANY_REQUESTS,
|
|
|
|
"download limit exceeded",
|
|
|
|
raw_query.as_deref(),
|
|
|
|
);
|
|
|
|
// Blocking wait loop (Go's waitForDownloadSlot)
|
|
|
|
loop {
|
|
|
|
if tokio::time::timeout_at(deadline, state.download_notify.notified())
|
|
|
|
.await
|
|
|
|
.is_err()
|
|
|
|
{
|
|
|
|
return json_error_with_query(
|
|
|
|
StatusCode::TOO_MANY_REQUESTS,
|
|
|
|
"download limit exceeded",
|
|
|
|
raw_query.as_deref(),
|
|
|
|
);
|
|
|
|
}
|
|
|
|
let current = state.inflight_download_bytes.load(Ordering::Relaxed);
|
|
|
|
if current <= state.concurrent_download_limit {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// We'll set the actual bytes after reading the needle (once we know the size)
|
|
|
|
|