diff --git a/Cargo.lock b/Cargo.lock index 00d4ae3..cfcc2c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -282,9 +282,9 @@ dependencies = [ [[package]] name = "code0-flow" -version = "0.0.37" +version = "0.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15072d95451694ffcdb7fcd85cb44453e86fab6b9c2c23487c7699d2662918c9" +checksum = "708985c5041a712c44a26f15b5658946469ff79dc092e5ffb37e8861029f6bd9" dependencies = [ "async-nats", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 6121302..4b37159 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ version = "0.0.0" edition = "2024" [workspace.dependencies] -code0-flow = { version = "0.0.37" } +code0-flow = { version = "0.0.38" } tucana = { version = "0.0.75", features = ["aquila"] } serde_json = { version = "1.0.138" } log = "0.4.27" diff --git a/crates/base/src/client/mod.rs b/crates/base/src/client/mod.rs index 9189f74..8acd621 100644 --- a/crates/base/src/client/mod.rs +++ b/crates/base/src/client/mod.rs @@ -22,8 +22,12 @@ pub struct DracoRuntimeStatusService { const MAX_BACKOFF: u64 = 2000 * 60; const MAX_RETRIES: i8 = 10; -// Will create a channel and retry if its not possible -pub async fn create_channel_with_retry(channel_name: &str, url: String) -> Channel { +pub async fn create_channel_with_retry( + channel_name: &str, + url: String, + connect_timeout: std::time::Duration, + request_timeout: std::time::Duration, +) -> Channel { let mut backoff = 100; let mut retries = 0; @@ -31,8 +35,7 @@ pub async fn create_channel_with_retry(channel_name: &str, url: String) -> Chann let channel = match Endpoint::from_shared(url.clone()) { Ok(c) => { log::debug!("Creating a new endpoint for the: {} Service", channel_name); - c.connect_timeout(std::time::Duration::from_secs(2)) - .timeout(std::time::Duration::from_secs(10)) + c.connect_timeout(connect_timeout).timeout(request_timeout) } Err(err) => { panic!( @@ -67,8 +70,15 @@ pub async fn create_channel_with_retry(channel_name: &str, url: String) -> Chann } } impl DracoRuntimeStatusService { - pub async fn from_url(aquila_url: String, aquila_token: String, identifier: String) -> Self { - let channel = create_channel_with_retry("Aquila", aquila_url).await; + pub async fn from_url( + aquila_url: String, + aquila_token: String, + identifier: String, + connect_timeout: std::time::Duration, + request_timeout: std::time::Duration, + ) -> Self { + let channel = + create_channel_with_retry("Aquila", aquila_url, connect_timeout, request_timeout).await; Self::new(channel, identifier, aquila_token) } diff --git a/crates/base/src/config.rs b/crates/base/src/config.rs index bccb8be..69e0178 100644 --- a/crates/base/src/config.rs +++ b/crates/base/src/config.rs @@ -67,6 +67,12 @@ pub struct AdapterConfig { /// Interval for runtime status heartbeat updates while the adapter is running. /// Set to 0 to disable periodic heartbeat updates. pub adapter_status_update_interval_seconds: u64, + + /// Timeout in seconds for establishing Aquila gRPC channels. + pub aquila_grpc_connect_timeout_secs: u64, + + /// Timeout in seconds for Aquila gRPC requests. + pub aquila_grpc_request_timeout_secs: u64, } impl AdapterConfig { @@ -103,6 +109,10 @@ impl AdapterConfig { "ADAPTER_STATUS_UPDATE_INTERVAL_SECONDS", 30_u64, ); + let aquila_grpc_connect_timeout_secs = + code0_flow::flow_config::env_with_default("AQUILA_GRPC_CONNECT_TIMEOUT_SECS", 2_u64); + let aquila_grpc_request_timeout_secs = + code0_flow::flow_config::env_with_default("AQUILA_GRPC_REQUEST_TIMEOUT_SECS", 10_u64); Self { environment, nats_bucket, @@ -116,6 +126,8 @@ impl AdapterConfig { with_health_service, draco_variant, adapter_status_update_interval_seconds, + aquila_grpc_connect_timeout_secs, + aquila_grpc_request_timeout_secs, } } diff --git a/crates/base/src/runner.rs b/crates/base/src/runner.rs index 348a33c..0995447 100644 --- a/crates/base/src/runner.rs +++ b/crates/base/src/runner.rs @@ -67,6 +67,8 @@ impl ServerRunner { config.aquila_url.clone(), config.aquila_token.clone(), config.draco_variant.clone(), + Duration::from_secs(config.aquila_grpc_connect_timeout_secs), + Duration::from_secs(config.aquila_grpc_request_timeout_secs), ) .await, )); @@ -84,6 +86,8 @@ impl ServerRunner { config.aquila_url.clone(), config.definition_path.as_str(), config.aquila_token.clone(), + Duration::from_secs(config.aquila_grpc_connect_timeout_secs), + Duration::from_secs(config.aquila_grpc_request_timeout_secs), ) .await .with_definition_source(service_name)