Skip to content

Commit 2a7467b

Browse files
Merge pull request #42 from code0-tech/fine-grained-logging
feat: added more logs for easier debugging
2 parents efecd19 + 62d235d commit 2a7467b

File tree

6 files changed

+113
-60
lines changed

6 files changed

+113
-60
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
version = "0.0.0"
33
name = "code0-flow"
4-
edition = "2021"
4+
edition = "2024"
55
description = "Crate for managing the code0-flows inside of the Flow Queue & FlowStore"
66
repository = "https://github.com/code0-tech/code0-flow"
77
homepage = "https://code0.tech"

src/flow_definition/mod.rs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use tucana::{
22
aquila::{
3+
DataTypeUpdateRequest, FlowTypeUpdateRequest, RuntimeFunctionDefinitionUpdateRequest,
34
data_type_service_client::DataTypeServiceClient,
45
flow_type_service_client::FlowTypeServiceClient,
56
runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient,
6-
DataTypeUpdateRequest, FlowTypeUpdateRequest, RuntimeFunctionDefinitionUpdateRequest,
77
},
88
shared::{DataType, FlowType, RuntimeFunctionDefinition},
99
};
@@ -51,14 +51,18 @@ impl FlowUpdateService {
5151

5252
async fn update_data_types(&self) {
5353
if self.data_types.is_empty() {
54+
log::info!("No data types to update");
5455
return;
5556
}
5657

5758
log::info!("Updating the current DataTypes!");
5859
let mut client = match DataTypeServiceClient::connect(self.aquila_url.clone()).await {
59-
Ok(client) => client,
60+
Ok(client) => {
61+
log::info!("Successfully connected to the DataTypeService");
62+
client
63+
}
6064
Err(err) => {
61-
log::error!("Failed to connect to DataTypeService: {}", err);
65+
log::error!("Failed to connect to the DataTypeService: {:?}", err);
6266
return;
6367
}
6468
};
@@ -75,23 +79,27 @@ impl FlowUpdateService {
7579
);
7680
}
7781
Err(err) => {
78-
log::error!("Failed to update data types: {}", err);
82+
log::error!("Failed to update data types: {:?}", err);
7983
}
8084
}
8185
}
8286

8387
async fn update_runtime_definitions(&self) {
8488
if self.runtime_definitions.is_empty() {
89+
log::info!("No runtime definitions to update");
8590
return;
8691
}
8792

8893
log::info!("Updating the current RuntimeDefinitions!");
8994
let mut client =
9095
match RuntimeFunctionDefinitionServiceClient::connect(self.aquila_url.clone()).await {
91-
Ok(client) => client,
96+
Ok(client) => {
97+
log::info!("Connected to RuntimeFunctionDefinitionService");
98+
client
99+
}
92100
Err(err) => {
93101
log::error!(
94-
"Failed to connect to RuntimeFunctionDefinitionService: {}",
102+
"Failed to connect to RuntimeFunctionDefinitionService: {:?}",
95103
err
96104
);
97105
return;
@@ -110,21 +118,25 @@ impl FlowUpdateService {
110118
);
111119
}
112120
Err(err) => {
113-
log::error!("Failed to update runtime function definitions: {}", err);
121+
log::error!("Failed to update runtime function definitions: {:?}", err);
114122
}
115123
}
116124
}
117125

118126
async fn update_flow_types(&self) {
119127
if self.flow_types.is_empty() {
128+
log::info!("No FlowTypes to update!");
120129
return;
121130
}
122131

123132
log::info!("Updating the current FlowTypes!");
124133
let mut client = match FlowTypeServiceClient::connect(self.aquila_url.clone()).await {
125-
Ok(client) => client,
134+
Ok(client) => {
135+
log::info!("Connected to FlowTypeService!");
136+
client
137+
}
126138
Err(err) => {
127-
log::error!("Failed to connect to FlowTypeService: {}", err);
139+
log::error!("Failed to connect to FlowTypeService: {:?}", err);
128140
return;
129141
}
130142
};
@@ -141,7 +153,7 @@ impl FlowUpdateService {
141153
);
142154
}
143155
Err(err) => {
144-
log::error!("Failed to update flow types: {}", err);
156+
log::error!("Failed to update flow types: {:?}", err);
145157
}
146158
}
147159
}

src/flow_queue/connection.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,22 @@ use lapin::Connection;
33
pub async fn build_connection(rabbitmq_url: &str) -> Connection {
44
match Connection::connect(rabbitmq_url, lapin::ConnectionProperties::default()).await {
55
Ok(env) => env,
6-
Err(error) => panic!(
7-
"Cannot connect to FlowQueue (RabbitMQ) instance! Reason: {:?}",
8-
error
9-
),
6+
Err(error) => {
7+
log::error!(
8+
"Cannot connect to FlowQueue (RabbitMQ) instance! Reason: {:?}",
9+
error
10+
);
11+
panic!("Cannot connect to FlowQueue (RabbitMQ) instance!");
12+
}
1013
}
1114
}
1215

1316
#[cfg(test)]
1417
mod tests {
1518
use crate::flow_queue::connection::build_connection;
19+
use testcontainers::GenericImage;
1620
use testcontainers::core::{IntoContainerPort, WaitFor};
1721
use testcontainers::runners::AsyncRunner;
18-
use testcontainers::GenericImage;
1922

2023
macro_rules! rabbitmq_container_test {
2124
($test_name:ident, $consumer:expr) => {

src/flow_queue/service.rs

Lines changed: 57 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ use std::{sync::Arc, time::Duration};
22

33
use futures_lite::StreamExt;
44
use lapin::{
5+
Channel,
56
options::{BasicConsumeOptions, QueueDeclareOptions},
67
types::FieldTable,
7-
Channel,
88
};
99
use log::debug;
1010
use serde::{Deserialize, Serialize};
@@ -44,6 +44,7 @@ pub enum RabbitMqError {
4444
ConnectionError(String),
4545
TimeoutError,
4646
DeserializationError,
47+
SerializationError,
4748
}
4849

4950
impl From<lapin::Error> for RabbitMqError {
@@ -65,6 +66,7 @@ impl std::fmt::Display for RabbitMqError {
6566
RabbitMqError::ConnectionError(msg) => write!(f, "Connection error: {}", msg),
6667
RabbitMqError::TimeoutError => write!(f, "Operation timed out"),
6768
RabbitMqError::DeserializationError => write!(f, "Failed to deserialize message"),
69+
RabbitMqError::SerializationError => write!(f, "Failed to serialize message"),
6870
}
6971
}
7072
}
@@ -83,8 +85,10 @@ impl RabbitmqClient {
8385
)
8486
.await
8587
{
86-
Ok(_) => (),
87-
Err(err) => log::error!("Failed to declare send_queue: {}", err),
88+
Ok(_) => {
89+
log::info!("Successfully declared send_queue");
90+
}
91+
Err(err) => log::error!("Failed to declare send_queue: {:?}", err),
8892
}
8993

9094
match channel
@@ -95,8 +99,10 @@ impl RabbitmqClient {
9599
)
96100
.await
97101
{
98-
Ok(_) => (),
99-
Err(err) => log::error!("Failed to declare recieve_queue: {}", err),
102+
Ok(_) => {
103+
log::info!("Successfully declared recieve_queue");
104+
}
105+
Err(err) => log::error!("Failed to declare recieve_queue: {:?}", err),
100106
}
101107

102108
RabbitmqClient {
@@ -109,23 +115,27 @@ impl RabbitmqClient {
109115
&self,
110116
message_json: String,
111117
queue_name: &str,
112-
) -> Result<(), lapin::Error> {
118+
) -> Result<(), RabbitMqError> {
113119
let channel = self.channel.lock().await;
114120

115-
channel
121+
match channel
116122
.basic_publish(
117123
"", // exchange
118124
queue_name, // routing key (queue name)
119125
lapin::options::BasicPublishOptions::default(),
120126
message_json.as_bytes(),
121127
lapin::BasicProperties::default(),
122128
)
123-
.await?;
124-
125-
Ok(())
129+
.await
130+
{
131+
Err(err) => {
132+
log::error!("Failed to publish message: {:?}", err);
133+
Err(RabbitMqError::LapinError(err))
134+
}
135+
Ok(_) => Ok(()),
136+
}
126137
}
127138

128-
// Receive messages from a queue
129139
// Receive messages from a queue with no timeout
130140
pub async fn await_message_no_timeout(
131141
&self,
@@ -146,8 +156,14 @@ impl RabbitmqClient {
146156
.await;
147157

148158
match consumer_res {
149-
Ok(consumer) => consumer,
150-
Err(err) => panic!("{}", err),
159+
Ok(consumer) => {
160+
log::info!("Established queue connection to {}", queue_name);
161+
consumer
162+
}
163+
Err(err) => {
164+
log::error!("Cannot create consumer for queue {}: {:?}", queue_name, err);
165+
return Err(RabbitMqError::LapinError(err));
166+
}
151167
}
152168
};
153169

@@ -172,17 +188,19 @@ impl RabbitmqClient {
172188
let message = match serde_json::from_str::<Message>(message_str) {
173189
Ok(m) => m,
174190
Err(e) => {
175-
log::error!("Failed to parse message: {}", e);
191+
log::error!("Failed to parse message: {:?}", e);
176192
return Err(RabbitMqError::DeserializationError);
177193
}
178194
};
179195

180196
if message.message_id == message_id {
181197
if ack_on_success {
182-
delivery
198+
if let Err(delivery_error) = delivery
183199
.ack(lapin::options::BasicAckOptions::default())
184200
.await
185-
.expect("Failed to acknowledge message");
201+
{
202+
log::error!("Failed to acknowledge message: {:?}", delivery_error);
203+
}
186204
}
187205

188206
return Ok(message);
@@ -196,7 +214,7 @@ impl RabbitmqClient {
196214
&self,
197215
queue_name: &str,
198216
handle_message: fn(Message) -> Result<Message, lapin::Error>,
199-
) -> Result<(), lapin::Error> {
217+
) -> Result<(), RabbitMqError> {
200218
let mut consumer = {
201219
let channel = self.channel.lock().await;
202220

@@ -210,8 +228,14 @@ impl RabbitmqClient {
210228
.await;
211229

212230
match consumer_res {
213-
Ok(consumer) => consumer,
214-
Err(err) => panic!("Cannot consume messages: {}", err),
231+
Ok(consumer) => {
232+
log::info!("Established queue connection to {}", queue_name);
233+
consumer
234+
}
235+
Err(err) => {
236+
log::error!("Cannot create consumer for queue {}: {:?}", queue_name, err);
237+
return Err(RabbitMqError::LapinError(err));
238+
}
215239
}
216240
};
217241

@@ -221,8 +245,8 @@ impl RabbitmqClient {
221245
let delivery = match delivery {
222246
Ok(del) => del,
223247
Err(err) => {
224-
log::error!("Error receiving message: {}", err);
225-
return Err(err);
248+
log::error!("Error receiving message: {:?}", err);
249+
return Err(RabbitMqError::LapinError(err));
226250
}
227251
};
228252

@@ -233,32 +257,32 @@ impl RabbitmqClient {
233257
str
234258
}
235259
Err(err) => {
236-
log::error!("Error decoding message: {}", err);
237-
return Ok(());
260+
log::error!("Error decoding message: {:?}", err);
261+
return Err(RabbitMqError::DeserializationError);
238262
}
239263
};
240264
// Parse the message
241265
let inc_message = match serde_json::from_str::<Message>(message_str) {
242266
Ok(mess) => mess,
243267
Err(err) => {
244-
log::error!("Error parsing message: {}", err);
245-
return Ok(());
268+
log::error!("Error parsing message: {:?}", err);
269+
return Err(RabbitMqError::DeserializationError);
246270
}
247271
};
248272

249273
let message = match handle_message(inc_message) {
250274
Ok(mess) => mess,
251275
Err(err) => {
252-
log::error!("Error handling message: {}", err);
253-
return Ok(());
276+
log::error!("Error handling message: {:?}", err);
277+
return Err(RabbitMqError::DeserializationError);
254278
}
255279
};
256280

257281
let message_json = match serde_json::to_string(&message) {
258282
Ok(json) => json,
259283
Err(err) => {
260-
log::error!("Error serializing message: {}", err);
261-
return Ok(());
284+
log::error!("Error serializing message: {:?}", err);
285+
return Err(RabbitMqError::SerializationError);
262286
}
263287
};
264288

@@ -267,10 +291,12 @@ impl RabbitmqClient {
267291
}
268292

269293
// Acknowledge the message
270-
delivery
294+
if let Err(delivery_error) = delivery
271295
.ack(lapin::options::BasicAckOptions::default())
272296
.await
273-
.expect("Failed to acknowledge message");
297+
{
298+
log::error!("Failed to acknowledge message: {:?}", delivery_error);
299+
}
274300
}
275301

276302
Ok(())

0 commit comments

Comments
 (0)