From 22f6d0b8d696d9100679d428f1f8515f6dc3eddc Mon Sep 17 00:00:00 2001 From: Anton Anisimov Date: Wed, 26 Mar 2025 00:49:13 +0300 Subject: [PATCH 1/2] -> add | raw --- Cargo.toml | 2 +- src/executors/web_socket_executors.rs | 27 +++++++++++++++++++++++++-- src/websockets/mod.rs | 16 +++++++++++++--- 3 files changed, 39 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f8dfc15c4..3d7810394 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ name = "robyn" crate-type = ["cdylib", "rlib"] [dependencies] -pyo3 = { version = "0.20.0", features = ["extension-module"] } +pyo3 = { version = "0.20.0", features = ["extension-module",] } pyo3-asyncio = { version="0.20.0" , features = ["attributes", "tokio-runtime"] } pyo3-log = "0.8.4" tokio = { version = "1.26.0", features = ["full"] } diff --git a/src/executors/web_socket_executors.rs b/src/executors/web_socket_executors.rs index 3d71bc2f1..4acf4fee0 100644 --- a/src/executors/web_socket_executors.rs +++ b/src/executors/web_socket_executors.rs @@ -2,14 +2,36 @@ use actix::prelude::*; use actix::AsyncContext; use actix_web_actors::ws; use pyo3::prelude::*; +use pyo3::types::PyString; use pyo3_asyncio::TaskLocals; use crate::types::function_info::FunctionInfo; use crate::websockets::WebSocketConnector; +pub enum WsMsgIn { + String(String), + Bytes(Vec), +} + +impl Default for WsMsgIn { + fn default() -> Self { + WsMsgIn::String("".to_owned()) + } +} + + +impl IntoPy for WsMsgIn { + fn into_py(self, py: Python<'_>) -> PyObject { + match self { + WsMsgIn::String(val) => val.into_py(py), + WsMsgIn::Bytes(val) => val.into_py(py), + } + } +} + fn get_function_output<'a>( function: &'a FunctionInfo, - fn_msg: Option, + fn_msg: Option, py: Python<'a>, ws: &WebSocketConnector, ) -> Result<&'a PyAny, PyErr> { @@ -57,9 +79,10 @@ fn get_function_output<'a>( } } + pub fn execute_ws_function( function: &FunctionInfo, - text: Option, + text: Option, task_locals: &TaskLocals, ctx: &mut ws::WebsocketContext, ws: &WebSocketConnector, diff --git a/src/websockets/mod.rs b/src/websockets/mod.rs index 51f51a47c..e03c27641 100644 --- a/src/websockets/mod.rs +++ b/src/websockets/mod.rs @@ -1,6 +1,6 @@ pub mod registry; -use crate::executors::web_socket_executors::execute_ws_function; +use crate::executors::web_socket_executors::{execute_ws_function, WsMsgIn}; use crate::types::function_info::FunctionInfo; use crate::types::multimap::QueryParams; use registry::{Close, SendMessageToAll, SendText}; @@ -90,13 +90,23 @@ impl StreamHandler> for WebSocketConnecto let function = self.router.get("message").unwrap(); execute_ws_function( function, - Some(text.to_string()), + Some(WsMsgIn::String(text.to_string())), + &self.task_locals, + ctx, + self, + ); + } + Ok(ws::Message::Binary(bin)) => { + debug!("Bin data received"); + let function = self.router.get("message").unwrap(); + execute_ws_function( + function, + Some(WsMsgIn::Bytes(bin.to_vec())), &self.task_locals, ctx, self, ); } - Ok(ws::Message::Binary(bin)) => ctx.binary(bin), Ok(ws::Message::Close(_close_reason)) => { debug!("Socket was closed"); let function = self.router.get("close").unwrap(); From 9997da92c9b0ca00882f3fcef70c5521a21b5273 Mon Sep 17 00:00:00 2001 From: Anton Anisimov Date: Wed, 26 Mar 2025 00:57:59 +0300 Subject: [PATCH 2/2] -> add | raw --- Cargo.toml | 2 +- src/executors/web_socket_executors.rs | 12 +++++++----- src/websockets/mod.rs | 3 ++- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3d7810394..f8dfc15c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ name = "robyn" crate-type = ["cdylib", "rlib"] [dependencies] -pyo3 = { version = "0.20.0", features = ["extension-module",] } +pyo3 = { version = "0.20.0", features = ["extension-module"] } pyo3-asyncio = { version="0.20.0" , features = ["attributes", "tokio-runtime"] } pyo3-log = "0.8.4" tokio = { version = "1.26.0", features = ["full"] } diff --git a/src/executors/web_socket_executors.rs b/src/executors/web_socket_executors.rs index 4acf4fee0..a3b28d7e1 100644 --- a/src/executors/web_socket_executors.rs +++ b/src/executors/web_socket_executors.rs @@ -1,3 +1,5 @@ +use std::borrow::Cow; + use actix::prelude::*; use actix::AsyncContext; use actix_web_actors::ws; @@ -8,19 +10,19 @@ use pyo3_asyncio::TaskLocals; use crate::types::function_info::FunctionInfo; use crate::websockets::WebSocketConnector; -pub enum WsMsgIn { +pub enum WsMsgIn<'a> { String(String), - Bytes(Vec), + Bytes(Cow<'a, [u8]>), } -impl Default for WsMsgIn { +impl <'a>Default for WsMsgIn<'a> { fn default() -> Self { - WsMsgIn::String("".to_owned()) + WsMsgIn::String(Default::default()) } } -impl IntoPy for WsMsgIn { +impl <'a>IntoPy for WsMsgIn<'a> { fn into_py(self, py: Python<'_>) -> PyObject { match self { WsMsgIn::String(val) => val.into_py(py), diff --git a/src/websockets/mod.rs b/src/websockets/mod.rs index e03c27641..aa45794c9 100644 --- a/src/websockets/mod.rs +++ b/src/websockets/mod.rs @@ -17,6 +17,7 @@ use pyo3_asyncio::TaskLocals; use uuid::Uuid; use registry::{Register, WebSocketRegistry}; +use std::borrow::Cow; use std::collections::HashMap; /// Define HTTP actor @@ -101,7 +102,7 @@ impl StreamHandler> for WebSocketConnecto let function = self.router.get("message").unwrap(); execute_ws_function( function, - Some(WsMsgIn::Bytes(bin.to_vec())), + Some(WsMsgIn::Bytes(Cow::from(bin.to_vec()))), &self.task_locals, ctx, self,