Skip to content

Commit 5fa17c6

Browse files
authored
chore(extension): moves code outside of binary code (#830)
# What? Extension code was living in the `main.rs` for the binary, I don't think it was ideal to have it there, so I moved it to its own module. # Why? Allows us to then have a dedicated extension module as a library. # Notes Might need to move the TelemetryAPI code into this module, as it's extension related, that would be in another PR tho
1 parent 7cb691c commit 5fa17c6

File tree

7 files changed

+258
-173
lines changed

7 files changed

+258
-173
lines changed

bottlecap/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bottlecap/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ edition = "2024"
55
publish = false
66

77
[dependencies]
8+
anyhow = { version = "1.0", default-features = false }
89
async-trait = { version = "0.1", default-features = false }
910
bytes = { version = "1.2", default-features = false }
1011
chrono = { version = "0.4", features = ["serde", "std", "now"], default-features = false }

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 38 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,19 @@ use tikv_jemallocator::Jemalloc;
1717
static GLOBAL: Jemalloc = Jemalloc;
1818

1919
use bottlecap::{
20-
DOGSTATSD_PORT, EXTENSION_ACCEPT_FEATURE_HEADER, EXTENSION_FEATURES, EXTENSION_HOST,
21-
EXTENSION_HOST_IP, EXTENSION_ID_HEADER, EXTENSION_NAME, EXTENSION_NAME_HEADER, EXTENSION_ROUTE,
22-
LAMBDA_RUNTIME_SLUG, TELEMETRY_PORT,
20+
DOGSTATSD_PORT, LAMBDA_RUNTIME_SLUG, TELEMETRY_PORT,
2321
appsec::processor::{
2422
Error::FeatureDisabled as AppSecFeatureDisabled, Processor as AppSecProcessor,
2523
},
26-
base_url,
2724
config::{
2825
self, Config,
2926
aws::{AwsConfig, AwsCredentials, build_lambda_function_arn},
3027
},
3128
event_bus::{Event, EventBus},
29+
extension::{
30+
self, EXTENSION_HOST, EXTENSION_HOST_IP, ExtensionError, NextEventResponse,
31+
RegisterResponse,
32+
},
3233
fips::{log_fips_status, prepare_client_provider},
3334
lifecycle::{
3435
flush_control::{FlushControl, FlushDecision},
@@ -82,11 +83,9 @@ use dogstatsd::{
8283
};
8384
use futures::stream::{FuturesOrdered, StreamExt};
8485
use reqwest::Client;
85-
use serde::Deserialize;
8686
use std::{
87-
collections::{HashMap, hash_map},
87+
collections::hash_map,
8888
env,
89-
io::{Error, Result},
9089
path::Path,
9190
sync::Arc,
9291
time::{Duration, Instant},
@@ -229,113 +228,8 @@ impl PendingFlushHandles {
229228
}
230229
}
231230

232-
#[derive(Clone, Deserialize)]
233-
#[serde(rename_all = "camelCase")]
234-
struct RegisterResponse {
235-
// Skip deserialize because this field is not available in the response
236-
// body, but as a header. Header is extracted and set manually.
237-
#[serde(skip_deserializing)]
238-
extension_id: String,
239-
account_id: Option<String>,
240-
}
241-
242-
#[derive(Deserialize)]
243-
#[serde(tag = "eventType")]
244-
enum NextEventResponse {
245-
#[serde(rename(deserialize = "INVOKE"))]
246-
Invoke {
247-
#[serde(rename(deserialize = "deadlineMs"))]
248-
deadline_ms: u64,
249-
#[serde(rename(deserialize = "requestId"))]
250-
request_id: String,
251-
#[serde(rename(deserialize = "invokedFunctionArn"))]
252-
invoked_function_arn: String,
253-
},
254-
#[serde(rename(deserialize = "SHUTDOWN"))]
255-
Shutdown {
256-
#[serde(rename(deserialize = "shutdownReason"))]
257-
shutdown_reason: String,
258-
#[serde(rename(deserialize = "deadlineMs"))]
259-
deadline_ms: u64,
260-
},
261-
}
262-
263-
async fn next_event(client: &Client, ext_id: &str) -> Result<NextEventResponse> {
264-
let base_url = base_url(EXTENSION_ROUTE)
265-
.map_err(|e| Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
266-
let url = format!("{base_url}/event/next");
267-
268-
let response = client
269-
.get(&url)
270-
.header(EXTENSION_ID_HEADER, ext_id)
271-
.send()
272-
.await
273-
.map_err(|e| {
274-
error!("Next request failed: {}", e);
275-
Error::new(std::io::ErrorKind::InvalidData, e.to_string())
276-
})?;
277-
278-
let status = response.status();
279-
let text = response.text().await.map_err(|e| {
280-
error!("Next response: Failed to read response body: {}", e);
281-
Error::new(std::io::ErrorKind::InvalidData, e.to_string())
282-
})?;
283-
284-
if !status.is_success() {
285-
error!("Next response HTTP Error {} - Response: {}", status, text);
286-
return Err(Error::new(
287-
std::io::ErrorKind::InvalidData,
288-
format!("HTTP Error {status}"),
289-
));
290-
}
291-
292-
serde_json::from_str(&text).map_err(|e| {
293-
error!("Next JSON parse error on response: {}", text);
294-
Error::new(std::io::ErrorKind::InvalidData, e.to_string())
295-
})
296-
}
297-
298-
async fn register(client: &Client) -> Result<RegisterResponse> {
299-
let mut map = HashMap::new();
300-
let base_url = base_url(EXTENSION_ROUTE)
301-
.map_err(|e| Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
302-
map.insert("events", vec!["INVOKE", "SHUTDOWN"]);
303-
let url = format!("{base_url}/register");
304-
305-
let resp = client
306-
.post(&url)
307-
.header(EXTENSION_NAME_HEADER, EXTENSION_NAME)
308-
.header(EXTENSION_ACCEPT_FEATURE_HEADER, EXTENSION_FEATURES)
309-
.json(&map)
310-
.send()
311-
.await
312-
.map_err(|e| Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
313-
314-
if resp.status() != 200 {
315-
let err = resp.error_for_status_ref();
316-
panic!("Can't register extension {err:?}");
317-
}
318-
319-
let extension_id = resp
320-
.headers()
321-
.get(EXTENSION_ID_HEADER)
322-
.expect("Extension ID header not found")
323-
.to_str()
324-
.expect("Can't convert header to string")
325-
.to_string();
326-
let mut register_response: RegisterResponse = resp
327-
.json::<RegisterResponse>()
328-
.await
329-
.map_err(|e| Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
330-
331-
// Set manually since it's not part of the response body
332-
register_response.extension_id = extension_id;
333-
334-
Ok(register_response)
335-
}
336-
337231
#[tokio::main]
338-
async fn main() -> Result<()> {
232+
async fn main() -> anyhow::Result<()> {
339233
let start_time = Instant::now();
340234
init_ustr();
341235
let (aws_config, aws_credentials, config) = load_configs(start_time);
@@ -346,24 +240,14 @@ async fn main() -> Result<()> {
346240
debug!("Starting Datadog Extension {version_without_next}");
347241
prepare_client_provider()?;
348242
let client = create_reqwest_client_builder()
349-
.map_err(|e| {
350-
Error::new(
351-
std::io::ErrorKind::InvalidData,
352-
format!("Failed to create client builder: {e:?}"),
353-
)
354-
})?
243+
.map_err(|e| anyhow::anyhow!("Failed to create client builder: {e:?}"))?
355244
.no_proxy()
356245
.build()
357-
.map_err(|e| {
358-
Error::new(
359-
std::io::ErrorKind::InvalidData,
360-
format!("Failed to create client: {e:?}"),
361-
)
362-
})?;
246+
.map_err(|e| anyhow::anyhow!("Failed to create client: {e:?}"))?;
363247

364-
let r = register(&client)
248+
let r = extension::register(&client, &aws_config.runtime_api)
365249
.await
366-
.map_err(|e| Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
250+
.map_err(|e| anyhow::anyhow!("Failed to register extension: {e:?}"))?;
367251

368252
let aws_config = Arc::new(aws_config);
369253
let api_key_factory = create_api_key_factory(&config, &aws_config, aws_credentials);
@@ -384,7 +268,7 @@ async fn main() -> Result<()> {
384268
}
385269
Err(e) => {
386270
error!("Extension loop failed: {e:?}, Calling /next without Datadog instrumentation");
387-
extension_loop_idle(&client, &r).await
271+
extension_loop_idle(&client, &r, &aws_config).await
388272
}
389273
}
390274
}
@@ -448,15 +332,19 @@ fn create_api_key_factory(
448332
})))
449333
}
450334

451-
async fn extension_loop_idle(client: &Client, r: &RegisterResponse) -> Result<()> {
335+
async fn extension_loop_idle(
336+
client: &Client,
337+
r: &RegisterResponse,
338+
aws_config: &AwsConfig,
339+
) -> anyhow::Result<()> {
452340
loop {
453-
match next_event(client, &r.extension_id).await {
341+
match extension::next_event(client, &r.extension_id, &aws_config.runtime_api).await {
454342
Ok(_) => {
455343
debug!("Extension is idle, skipping next event");
456344
}
457345
Err(e) => {
458346
error!("Error getting next event: {e:?}");
459-
return Err(e);
347+
return Err(e.into());
460348
}
461349
};
462350
}
@@ -470,7 +358,7 @@ async fn extension_loop_active(
470358
r: &RegisterResponse,
471359
api_key_factory: Arc<ApiKeyFactory>,
472360
start_time: Instant,
473-
) -> Result<()> {
361+
) -> anyhow::Result<()> {
474362
let mut event_bus = EventBus::run();
475363

476364
let account_id = r
@@ -558,7 +446,7 @@ async fn extension_loop_active(
558446

559447
let api_runtime_proxy_shutdown_signal = start_api_runtime_proxy(
560448
config,
561-
aws_config,
449+
Arc::clone(&aws_config),
562450
&invocation_processor,
563451
appsec_processor.as_ref(),
564452
Arc::clone(&propagator),
@@ -578,7 +466,8 @@ async fn extension_loop_active(
578466
let dogstatsd_cancel_token = start_dogstatsd(metrics_aggr_handle.clone()).await;
579467

580468
let telemetry_listener_cancel_token =
581-
setup_telemetry_client(&r.extension_id, logs_agent_channel).await?;
469+
setup_telemetry_client(&r.extension_id, &aws_config.runtime_api, logs_agent_channel)
470+
.await?;
582471

583472
let otlp_cancel_token = start_otlp_agent(
584473
config,
@@ -597,7 +486,8 @@ async fn extension_loop_active(
597486
"Datadog Next-Gen Extension ready in {:}ms",
598487
start_time.elapsed().as_millis().to_string()
599488
);
600-
let next_lambda_response = next_event(client, &r.extension_id).await;
489+
let next_lambda_response =
490+
extension::next_event(client, &aws_config.runtime_api, &r.extension_id).await;
601491
// first invoke we must call next
602492
let mut pending_flush_handles = PendingFlushHandles::new();
603493
handle_next_invocation(next_lambda_response, invocation_processor.clone()).await;
@@ -647,7 +537,8 @@ async fn extension_loop_active(
647537
&metrics_aggr_handle.clone(),
648538
)
649539
.await;
650-
let next_response = next_event(client, &r.extension_id).await;
540+
let next_response =
541+
extension::next_event(client, &aws_config.runtime_api, &r.extension_id).await;
651542
maybe_shutdown_event =
652543
handle_next_invocation(next_response, invocation_processor.clone()).await;
653544
} else {
@@ -719,7 +610,8 @@ async fn extension_loop_active(
719610
// If we get platform.runtimeDone or platform.runtimeReport
720611
// That's fine, we still wait to break until we get the response from next
721612
// and then we break to determine if we'll flush or not
722-
let next_lambda_response = next_event(client, &r.extension_id);
613+
let next_lambda_response =
614+
extension::next_event(client, &aws_config.runtime_api, &r.extension_id);
723615
tokio::pin!(next_lambda_response);
724616
'next_invocation: loop {
725617
tokio::select! {
@@ -932,7 +824,7 @@ async fn handle_event_bus_event(
932824
}
933825

934826
async fn handle_next_invocation(
935-
next_response: Result<NextEventResponse>,
827+
next_response: Result<NextEventResponse, ExtensionError>,
936828
invocation_processor: Arc<TokioMutex<InvocationProcessor>>,
937829
) -> NextEventResponse {
938830
match next_response {
@@ -1210,8 +1102,9 @@ async fn start_dogstatsd(metrics_aggr_handle: MetricsAggregatorHandle) -> Cancel
12101102

12111103
async fn setup_telemetry_client(
12121104
extension_id: &str,
1105+
runtime_api: &str,
12131106
logs_agent_channel: Sender<TelemetryEvent>,
1214-
) -> Result<CancellationToken> {
1107+
) -> anyhow::Result<CancellationToken> {
12151108
let telemetry_listener =
12161109
TelemetryListener::new(EXTENSION_HOST_IP, TELEMETRY_PORT, logs_agent_channel);
12171110

@@ -1222,11 +1115,15 @@ async fn setup_telemetry_client(
12221115
}
12231116
});
12241117

1225-
let telemetry_client = TelemetryApiClient::new(extension_id.to_string(), TELEMETRY_PORT);
1118+
let telemetry_client = TelemetryApiClient::new(
1119+
extension_id.to_string(),
1120+
TELEMETRY_PORT,
1121+
runtime_api.to_string(),
1122+
);
12261123
telemetry_client
12271124
.subscribe()
12281125
.await
1229-
.map_err(|e| Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
1126+
.map_err(|e| anyhow::anyhow!("Failed to subscribe to telemetry: {e:?}"))?;
12301127
Ok(cancel_token)
12311128
}
12321129

0 commit comments

Comments
 (0)