|
@@ -5,10 +5,10 @@ use crate::{
|
|
|
};
|
|
|
use activitystreams::iri_string::types::IriString;
|
|
|
use actix_web::http::header::Date;
|
|
|
-use awc::{error::SendRequestError, Client, ClientResponse, Connector};
|
|
|
use base64::{engine::general_purpose::STANDARD, Engine};
|
|
|
use dashmap::DashMap;
|
|
|
-use http_signature_normalization_actix::{digest::ring::Sha256, prelude::*};
|
|
|
+use http_signature_normalization_reqwest::{digest::ring::Sha256, prelude::*};
|
|
|
+use reqwest_middleware::ClientWithMiddleware;
|
|
|
use ring::{
|
|
|
rand::SystemRandom,
|
|
|
signature::{RsaKeyPair, RSA_PKCS1_SHA256},
|
|
@@ -18,13 +18,22 @@ use std::{
|
|
|
sync::Arc,
|
|
|
time::{Duration, SystemTime},
|
|
|
};
|
|
|
-use tracing_awc::Tracing;
|
|
|
|
|
|
const ONE_SECOND: u64 = 1;
|
|
|
const ONE_MINUTE: u64 = 60 * ONE_SECOND;
|
|
|
const ONE_HOUR: u64 = 60 * ONE_MINUTE;
|
|
|
const ONE_DAY: u64 = 24 * ONE_HOUR;
|
|
|
|
|
|
+#[derive(Debug)]
|
|
|
+pub(crate) enum BreakerStrategy {
|
|
|
+ // Requires a successful response
|
|
|
+ Require2XX,
|
|
|
+ // Allows HTTP 2xx-401
|
|
|
+ Allow401AndBelow,
|
|
|
+ // Allows HTTP 2xx-404
|
|
|
+ Allow404AndBelow,
|
|
|
+}
|
|
|
+
|
|
|
#[derive(Clone)]
|
|
|
pub(crate) struct Breakers {
|
|
|
inner: Arc<DashMap<String, Breaker>>,
|
|
@@ -139,10 +148,8 @@ impl Default for Breaker {
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
pub(crate) struct Requests {
|
|
|
- pool_size: usize,
|
|
|
- client: Client,
|
|
|
+ client: ClientWithMiddleware,
|
|
|
key_id: String,
|
|
|
- user_agent: String,
|
|
|
private_key: Arc<RsaKeyPair>,
|
|
|
rng: SystemRandom,
|
|
|
config: Config<Spawner>,
|
|
@@ -153,66 +160,39 @@ pub(crate) struct Requests {
|
|
|
impl std::fmt::Debug for Requests {
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
|
f.debug_struct("Requests")
|
|
|
- .field("pool_size", &self.pool_size)
|
|
|
.field("key_id", &self.key_id)
|
|
|
- .field("user_agent", &self.user_agent)
|
|
|
.field("config", &self.config)
|
|
|
.field("breakers", &self.breakers)
|
|
|
.finish()
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-thread_local! {
|
|
|
- static CLIENT: std::cell::OnceCell<Client> = std::cell::OnceCell::new();
|
|
|
-}
|
|
|
-
|
|
|
-pub(crate) fn build_client(user_agent: &str, pool_size: usize, timeout_seconds: u64) -> Client {
|
|
|
- CLIENT.with(|client| {
|
|
|
- client
|
|
|
- .get_or_init(|| {
|
|
|
- let connector = Connector::new().limit(pool_size);
|
|
|
-
|
|
|
- Client::builder()
|
|
|
- .connector(connector)
|
|
|
- .wrap(Tracing)
|
|
|
- .add_default_header(("User-Agent", user_agent.to_string()))
|
|
|
- .timeout(Duration::from_secs(timeout_seconds))
|
|
|
- .finish()
|
|
|
- })
|
|
|
- .clone()
|
|
|
- })
|
|
|
-}
|
|
|
-
|
|
|
impl Requests {
|
|
|
#[allow(clippy::too_many_arguments)]
|
|
|
pub(crate) fn new(
|
|
|
key_id: String,
|
|
|
private_key: RsaPrivateKey,
|
|
|
- user_agent: String,
|
|
|
breakers: Breakers,
|
|
|
last_online: Arc<LastOnline>,
|
|
|
- pool_size: usize,
|
|
|
- timeout_seconds: u64,
|
|
|
spawner: Spawner,
|
|
|
+ client: ClientWithMiddleware,
|
|
|
) -> Self {
|
|
|
let private_key_der = private_key.to_pkcs1_der().expect("Can encode der");
|
|
|
let private_key = ring::signature::RsaKeyPair::from_der(private_key_der.as_bytes())
|
|
|
.expect("Key is valid");
|
|
|
Requests {
|
|
|
- pool_size,
|
|
|
- client: build_client(&user_agent, pool_size, timeout_seconds),
|
|
|
+ client,
|
|
|
key_id,
|
|
|
- user_agent,
|
|
|
private_key: Arc::new(private_key),
|
|
|
rng: SystemRandom::new(),
|
|
|
- config: Config::new().mastodon_compat().spawner(spawner),
|
|
|
+ config: Config::new_with_spawner(spawner).mastodon_compat(),
|
|
|
breakers,
|
|
|
last_online,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
pub(crate) fn spawner(mut self, spawner: Spawner) -> Self {
|
|
|
- self.config = self.config.spawner(spawner);
|
|
|
+ self.config = self.config.set_spawner(spawner);
|
|
|
self
|
|
|
}
|
|
|
|
|
@@ -223,97 +203,126 @@ impl Requests {
|
|
|
async fn check_response(
|
|
|
&self,
|
|
|
parsed_url: &IriString,
|
|
|
- res: Result<ClientResponse, SendRequestError>,
|
|
|
- ) -> Result<ClientResponse, Error> {
|
|
|
+ strategy: BreakerStrategy,
|
|
|
+ res: Result<reqwest::Response, reqwest_middleware::Error>,
|
|
|
+ ) -> Result<reqwest::Response, Error> {
|
|
|
if res.is_err() {
|
|
|
self.breakers.fail(&parsed_url);
|
|
|
}
|
|
|
|
|
|
- let mut res =
|
|
|
- res.map_err(|e| ErrorKind::SendRequest(parsed_url.to_string(), e.to_string()))?;
|
|
|
+ let res = res?;
|
|
|
+
|
|
|
+ let status = res.status();
|
|
|
|
|
|
- if res.status().is_server_error() {
|
|
|
+ let success = match strategy {
|
|
|
+ BreakerStrategy::Require2XX => status.is_success(),
|
|
|
+ BreakerStrategy::Allow401AndBelow => (200..=401).contains(&status.as_u16()),
|
|
|
+ BreakerStrategy::Allow404AndBelow => (200..=404).contains(&status.as_u16()),
|
|
|
+ };
|
|
|
+
|
|
|
+ if !success {
|
|
|
self.breakers.fail(&parsed_url);
|
|
|
|
|
|
- if let Ok(bytes) = res.body().await {
|
|
|
- if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
|
|
|
- if !s.is_empty() {
|
|
|
- tracing::debug!("Response from {parsed_url}, {s}");
|
|
|
- }
|
|
|
+ if let Ok(s) = res.text().await {
|
|
|
+ if !s.is_empty() {
|
|
|
+ tracing::debug!("Response from {parsed_url}, {s}");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- return Err(ErrorKind::Status(parsed_url.to_string(), res.status()).into());
|
|
|
+ return Err(ErrorKind::Status(parsed_url.to_string(), status).into());
|
|
|
}
|
|
|
|
|
|
- self.last_online.mark_seen(&parsed_url);
|
|
|
- self.breakers.succeed(&parsed_url);
|
|
|
+ // only actually succeed a breaker on 2xx response
|
|
|
+ if status.is_success() {
|
|
|
+ self.last_online.mark_seen(&parsed_url);
|
|
|
+ self.breakers.succeed(&parsed_url);
|
|
|
+ }
|
|
|
|
|
|
Ok(res)
|
|
|
}
|
|
|
|
|
|
#[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))]
|
|
|
- pub(crate) async fn fetch_json<T>(&self, url: &IriString) -> Result<T, Error>
|
|
|
+ pub(crate) async fn fetch_json<T>(
|
|
|
+ &self,
|
|
|
+ url: &IriString,
|
|
|
+ strategy: BreakerStrategy,
|
|
|
+ ) -> Result<T, Error>
|
|
|
where
|
|
|
T: serde::de::DeserializeOwned,
|
|
|
{
|
|
|
- self.do_fetch(url, "application/json").await
|
|
|
+ self.do_fetch(url, "application/json", strategy).await
|
|
|
}
|
|
|
|
|
|
#[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))]
|
|
|
- pub(crate) async fn fetch_json_msky<T>(&self, url: &IriString) -> Result<T, Error>
|
|
|
+ pub(crate) async fn fetch_json_msky<T>(
|
|
|
+ &self,
|
|
|
+ url: &IriString,
|
|
|
+ strategy: BreakerStrategy,
|
|
|
+ ) -> Result<T, Error>
|
|
|
where
|
|
|
T: serde::de::DeserializeOwned,
|
|
|
{
|
|
|
- let mut res = self
|
|
|
+ let body = self
|
|
|
.do_deliver(
|
|
|
url,
|
|
|
&serde_json::json!({}),
|
|
|
"application/json",
|
|
|
"application/json",
|
|
|
+ strategy,
|
|
|
)
|
|
|
+ .await?
|
|
|
+ .bytes()
|
|
|
.await?;
|
|
|
|
|
|
- let body = res
|
|
|
- .body()
|
|
|
- .await
|
|
|
- .map_err(|e| ErrorKind::ReceiveResponse(url.to_string(), e.to_string()))?;
|
|
|
-
|
|
|
- Ok(serde_json::from_slice(body.as_ref())?)
|
|
|
+ Ok(serde_json::from_slice(&body)?)
|
|
|
}
|
|
|
|
|
|
#[tracing::instrument(name = "Fetch Activity+Json", skip(self), fields(signing_string))]
|
|
|
- pub(crate) async fn fetch<T>(&self, url: &IriString) -> Result<T, Error>
|
|
|
+ pub(crate) async fn fetch<T>(
|
|
|
+ &self,
|
|
|
+ url: &IriString,
|
|
|
+ strategy: BreakerStrategy,
|
|
|
+ ) -> Result<T, Error>
|
|
|
where
|
|
|
T: serde::de::DeserializeOwned,
|
|
|
{
|
|
|
- self.do_fetch(url, "application/activity+json").await
|
|
|
+ self.do_fetch(url, "application/activity+json", strategy)
|
|
|
+ .await
|
|
|
}
|
|
|
|
|
|
- async fn do_fetch<T>(&self, url: &IriString, accept: &str) -> Result<T, Error>
|
|
|
+ async fn do_fetch<T>(
|
|
|
+ &self,
|
|
|
+ url: &IriString,
|
|
|
+ accept: &str,
|
|
|
+ strategy: BreakerStrategy,
|
|
|
+ ) -> Result<T, Error>
|
|
|
where
|
|
|
T: serde::de::DeserializeOwned,
|
|
|
{
|
|
|
- let mut res = self.do_fetch_response(url, accept).await?;
|
|
|
-
|
|
|
- let body = res
|
|
|
- .body()
|
|
|
- .await
|
|
|
- .map_err(|e| ErrorKind::ReceiveResponse(url.to_string(), e.to_string()))?;
|
|
|
+ let body = self
|
|
|
+ .do_fetch_response(url, accept, strategy)
|
|
|
+ .await?
|
|
|
+ .bytes()
|
|
|
+ .await?;
|
|
|
|
|
|
- Ok(serde_json::from_slice(body.as_ref())?)
|
|
|
+ Ok(serde_json::from_slice(&body)?)
|
|
|
}
|
|
|
|
|
|
#[tracing::instrument(name = "Fetch response", skip(self), fields(signing_string))]
|
|
|
- pub(crate) async fn fetch_response(&self, url: &IriString) -> Result<ClientResponse, Error> {
|
|
|
- self.do_fetch_response(url, "*/*").await
|
|
|
+ pub(crate) async fn fetch_response(
|
|
|
+ &self,
|
|
|
+ url: &IriString,
|
|
|
+ strategy: BreakerStrategy,
|
|
|
+ ) -> Result<reqwest::Response, Error> {
|
|
|
+ self.do_fetch_response(url, "*/*", strategy).await
|
|
|
}
|
|
|
|
|
|
pub(crate) async fn do_fetch_response(
|
|
|
&self,
|
|
|
url: &IriString,
|
|
|
accept: &str,
|
|
|
- ) -> Result<ClientResponse, Error> {
|
|
|
+ strategy: BreakerStrategy,
|
|
|
+ ) -> Result<reqwest::Response, Error> {
|
|
|
if !self.breakers.should_try(url) {
|
|
|
return Err(ErrorKind::Breaker.into());
|
|
|
}
|
|
@@ -321,25 +330,20 @@ impl Requests {
|
|
|
let signer = self.signer();
|
|
|
let span = tracing::Span::current();
|
|
|
|
|
|
- let res = self
|
|
|
+ let request = self
|
|
|
.client
|
|
|
.get(url.as_str())
|
|
|
- .insert_header(("Accept", accept))
|
|
|
- .insert_header(Date(SystemTime::now().into()))
|
|
|
- .no_decompress()
|
|
|
- .signature(
|
|
|
- self.config.clone(),
|
|
|
- self.key_id.clone(),
|
|
|
- move |signing_string| {
|
|
|
- span.record("signing_string", signing_string);
|
|
|
- span.in_scope(|| signer.sign(signing_string))
|
|
|
- },
|
|
|
- )
|
|
|
- .await?
|
|
|
- .send()
|
|
|
- .await;
|
|
|
+ .header("Accept", accept)
|
|
|
+ .header("Date", Date(SystemTime::now().into()).to_string())
|
|
|
+ .signature(&self.config, self.key_id.clone(), move |signing_string| {
|
|
|
+ span.record("signing_string", signing_string);
|
|
|
+ span.in_scope(|| signer.sign(signing_string))
|
|
|
+ })
|
|
|
+ .await?;
|
|
|
+
|
|
|
+ let res = self.client.execute(request).await;
|
|
|
|
|
|
- let res = self.check_response(url, res).await?;
|
|
|
+ let res = self.check_response(url, strategy, res).await?;
|
|
|
|
|
|
Ok(res)
|
|
|
}
|
|
@@ -349,7 +353,12 @@ impl Requests {
|
|
|
skip_all,
|
|
|
fields(inbox = inbox.to_string().as_str(), signing_string)
|
|
|
)]
|
|
|
- pub(crate) async fn deliver<T>(&self, inbox: &IriString, item: &T) -> Result<(), Error>
|
|
|
+ pub(crate) async fn deliver<T>(
|
|
|
+ &self,
|
|
|
+ inbox: &IriString,
|
|
|
+ item: &T,
|
|
|
+ strategy: BreakerStrategy,
|
|
|
+ ) -> Result<(), Error>
|
|
|
where
|
|
|
T: serde::ser::Serialize + std::fmt::Debug,
|
|
|
{
|
|
@@ -358,6 +367,7 @@ impl Requests {
|
|
|
item,
|
|
|
"application/activity+json",
|
|
|
"application/activity+json",
|
|
|
+ strategy,
|
|
|
)
|
|
|
.await?;
|
|
|
Ok(())
|
|
@@ -369,7 +379,8 @@ impl Requests {
|
|
|
item: &T,
|
|
|
content_type: &str,
|
|
|
accept: &str,
|
|
|
- ) -> Result<ClientResponse, Error>
|
|
|
+ strategy: BreakerStrategy,
|
|
|
+ ) -> Result<reqwest::Response, Error>
|
|
|
where
|
|
|
T: serde::ser::Serialize + std::fmt::Debug,
|
|
|
{
|
|
@@ -381,12 +392,12 @@ impl Requests {
|
|
|
let span = tracing::Span::current();
|
|
|
let item_string = serde_json::to_string(item)?;
|
|
|
|
|
|
- let (req, body) = self
|
|
|
+ let request = self
|
|
|
.client
|
|
|
.post(inbox.as_str())
|
|
|
- .insert_header(("Accept", accept))
|
|
|
- .insert_header(("Content-Type", content_type))
|
|
|
- .insert_header(Date(SystemTime::now().into()))
|
|
|
+ .header("Accept", accept)
|
|
|
+ .header("Content-Type", content_type)
|
|
|
+ .header("Date", Date(SystemTime::now().into()).to_string())
|
|
|
.signature_with_digest(
|
|
|
self.config.clone(),
|
|
|
self.key_id.clone(),
|
|
@@ -397,12 +408,11 @@ impl Requests {
|
|
|
span.in_scope(|| signer.sign(signing_string))
|
|
|
},
|
|
|
)
|
|
|
- .await?
|
|
|
- .split();
|
|
|
+ .await?;
|
|
|
|
|
|
- let res = req.send_body(body).await;
|
|
|
+ let res = self.client.execute(request).await;
|
|
|
|
|
|
- let res = self.check_response(inbox, res).await?;
|
|
|
+ let res = self.check_response(inbox, strategy, res).await?;
|
|
|
|
|
|
Ok(res)
|
|
|
}
|