From da0819c2144be0309ea2332c2b98c108905dcac2 Mon Sep 17 00:00:00 2001 From: Jovi Hsu Date: Mon, 8 Nov 2021 13:58:09 +0800 Subject: [PATCH] frontend websocket init --- backend/src/agent/agent.rs | 9 ++- backend/src/agent/mod.rs | 4 +- backend/src/agent/remote.rs | 15 ++--- backend/src/agent/ws.rs | 13 ++-- backend/src/main.rs | 5 +- backend/src/user/auth.rs | 15 ++--- frontend/src/components/mod.rs | 3 +- frontend/src/components/ws.rs | 106 +++++++++++++++++++++++++++++++++ frontend/src/pages/page_ssh.rs | 14 ++++- 9 files changed, 159 insertions(+), 25 deletions(-) create mode 100644 frontend/src/components/ws.rs diff --git a/backend/src/agent/agent.rs b/backend/src/agent/agent.rs index dd7662f..c6ee786 100644 --- a/backend/src/agent/agent.rs +++ b/backend/src/agent/agent.rs @@ -27,7 +27,12 @@ impl Decoder for TcpCodec { fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { info!("recv from server: {:?}", src); - Ok(Some(Bytes::from(src.to_vec()))) + if 0 == src.len() { + return Ok(None); + } + let web_bytes = Bytes::from(src.to_vec()); + src.clear(); + Ok(Some(web_bytes)) } } @@ -181,10 +186,12 @@ impl Handler for AgentManager { fn handle(&mut self, msg: AgentManagerMsg, _ctx: &mut Context) -> Self::Result { match msg { AgentManagerMsg::Add(addr) => { + info!("add agent: {:?}", addr.0); self.agents.insert(addr.0, addr.1); AgentManagerResult::NoReturn } AgentManagerMsg::Get(aid) => { + info!("get agent: {}", aid); if let Some(addr) = self.agents.get(&aid) { AgentManagerResult::Success(addr.clone()) } else { diff --git a/backend/src/agent/mod.rs b/backend/src/agent/mod.rs index 21052b4..20548c9 100644 --- a/backend/src/agent/mod.rs +++ b/backend/src/agent/mod.rs @@ -1,4 +1,4 @@ +pub mod agent; pub mod remote; -pub mod ws; pub mod resolver; -pub mod agent; \ No newline at end of file +pub mod ws; diff --git a/backend/src/agent/remote.rs b/backend/src/agent/remote.rs index ae0eca7..0ffa059 100644 --- a/backend/src/agent/remote.rs +++ b/backend/src/agent/remote.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use actix_session::Session; use actix_web::*; use serde::{Deserialize, Serialize}; @@ -6,8 +8,6 @@ use serde_json::json; use log::info; use rand::Rng; -use crate::AppData; - use super::agent; #[derive(Debug, Serialize, Deserialize)] @@ -22,14 +22,14 @@ pub struct RemoteInfo { #[post("/target/validate")] pub async fn target_validate( - data: web::Data, + req: HttpRequest, params: web::Json, ) -> Result { let remote = params.into_inner(); info!("{:?}", remote); - // let resolved = data.resolver.send(ResolveMsg::Resolve(remote.host)).await; + let app_data = req.app_data::>().unwrap(); - match data.resolver.lockup(remote.host).await { + match app_data.resolver.lockup(remote.host).await { Some(ipaddr) => { let json = json!({ "status": "success", @@ -49,18 +49,19 @@ pub async fn target_validate( #[post("/target/ssh")] pub async fn target_ssh( + req: HttpRequest, session: Session, - data: web::Data, params: web::Json, ) -> Result { let aid = rand::thread_rng().gen::(); + let app_data = req.app_data::>().unwrap(); let remote = params.into_inner(); let agent = agent::Agent::new(aid, (remote.ip, remote.port)).await; match agent { Some(addr) => { // add to agent list - let _ = data + let _ = app_data .agents .send(agent::AgentManagerMsg::Add((aid, addr))) .await; diff --git a/backend/src/agent/ws.rs b/backend/src/agent/ws.rs index 974ebb7..3881a99 100644 --- a/backend/src/agent/ws.rs +++ b/backend/src/agent/ws.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use actix::{Actor, Addr, Message, StreamHandler}; use actix::{AsyncContext, Handler}; use actix_session::Session; @@ -7,8 +9,6 @@ use actix_web::{web, Error, HttpRequest, HttpResponse}; use actix_web_actors::ws; use log::*; -use crate::AppData; - use super::agent::*; #[derive(Message)] @@ -63,12 +63,17 @@ impl StreamHandler> for WsSession { pub async fn ws_index( req: HttpRequest, session: Session, - data: web::Data, stream: web::Payload, ) -> Result { let aid = session.get::("aid").unwrap_or(Some(0)).unwrap(); + let app_data = req.app_data::>().unwrap(); - let resp = match data.agents.send(AgentManagerMsg::Get(aid)).await.unwrap() { + let resp = match app_data + .agents + .send(AgentManagerMsg::Get(aid)) + .await + .unwrap() + { AgentManagerResult::Success(agent) => ws::start(WsSession { agent }, &req, stream), _ => Err(Error::from(actix_web::error::ErrorInternalServerError( "Agent not found", diff --git a/backend/src/main.rs b/backend/src/main.rs index bad95c3..20c1079 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use actix::Addr; use actix_files as fs; use actix_session::CookieSession; @@ -61,9 +63,10 @@ async fn main() -> std::io::Result<()> { info!("Server starts at http://127.0.0.1:8080"); let private_key = rand::thread_rng().gen::<[u8; 32]>(); + let app_data = Arc::new(AppData::new()); HttpServer::new(move || { App::new() - .app_data(AppData::new()) + .app_data(app_data.clone()) .wrap(CookieSession::signed(&private_key).secure(false)) .wrap(middleware::Compress::new(ContentEncoding::Gzip)) .service(index) diff --git a/backend/src/user/auth.rs b/backend/src/user/auth.rs index 9738005..b622bf4 100644 --- a/backend/src/user/auth.rs +++ b/backend/src/user/auth.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use actix::{Actor, Addr, Context, Handler, Message, MessageResponse}; use actix_web::*; use serde::{Deserialize, Serialize}; @@ -5,8 +7,6 @@ use serde_json::json; use log::info; -use crate::AppData; - #[derive(MessageResponse)] #[allow(dead_code)] enum AuthResult { @@ -63,12 +63,13 @@ impl Handler for Authenticator { } #[post("/auth")] -pub async fn auth( - params: web::Json, - data: web::Data, -) -> Result { +pub async fn auth(params: web::Json, req: HttpRequest) -> Result { let auth_info = params.into_inner(); - let res = data.authenticator.send(AuthMsg::DoAuth(auth_info)).await; + let app_data = req.app_data::>().unwrap(); + let res = app_data + .authenticator + .send(AuthMsg::DoAuth(auth_info)) + .await; match res { Ok(AuthResult::AuthSuccess) => Ok(HttpResponse::Ok().json(json!({ diff --git a/frontend/src/components/mod.rs b/frontend/src/components/mod.rs index a7832bd..8d25617 100644 --- a/frontend/src/components/mod.rs +++ b/frontend/src/components/mod.rs @@ -1,2 +1,3 @@ pub mod auth; -pub mod host; \ No newline at end of file +pub mod host; +pub mod ws; diff --git a/frontend/src/components/ws.rs b/frontend/src/components/ws.rs new file mode 100644 index 0000000..fe475dc --- /dev/null +++ b/frontend/src/components/ws.rs @@ -0,0 +1,106 @@ +use yew::prelude::*; +use yew::services::websocket::{WebSocketService, WebSocketStatus, WebSocketTask}; +use yew::services::ConsoleService; +use yew::{format::Binary, utils::host}; + +pub struct WebsocketCtx { + ws: Option, + link: ComponentLink, + error_msg: String, + onrecv: Callback>, +} + +#[derive(Clone, PartialEq, Properties)] +pub struct WebsocketProps { + #[prop_or_default] + pub onrecv: Callback>, +} + +pub enum WebsocketMsg { + Connect, + Disconnected, + Ignore, + Send(Binary), + Recv(Binary), +} + +impl Component for WebsocketCtx { + type Message = WebsocketMsg; + type Properties = WebsocketProps; + + fn create(props: Self::Properties, link: ComponentLink) -> Self { + Self { + ws: None, + link: link, + error_msg: String::new(), + onrecv: props.onrecv, + } + } + + fn update(&mut self, msg: Self::Message) -> ShouldRender { + match msg { + WebsocketMsg::Connect => { + ConsoleService::log("Connecting"); + let cbout = self.link.callback(|data| WebsocketMsg::Recv(data)); + let cbnot = self.link.callback(|input| { + ConsoleService::log(&format!("Notification: {:?}", input)); + match input { + WebSocketStatus::Closed | WebSocketStatus::Error => { + WebsocketMsg::Disconnected + } + _ => WebsocketMsg::Ignore, + } + }); + if self.ws.is_none() { + let task = WebSocketService::connect_binary( + &format!("ws://{}/ws", host().unwrap()), + cbout, + cbnot, + ); + self.ws = Some(task.unwrap()); + } + true + } + WebsocketMsg::Disconnected => { + self.ws = None; + self.error_msg = "Disconnected".to_string(); + true + } + WebsocketMsg::Ignore => false, + WebsocketMsg::Send(data) => { + if let Some(ref mut ws) = self.ws { + ws.send_binary(data); + } + false + } + WebsocketMsg::Recv(Ok(s)) => { + // ConsoleService::log(&format!("recv {:?}", s)); + self.onrecv.emit(s); + false + } + WebsocketMsg::Recv(Err(s)) => { + self.error_msg = format!("Error when reading from server: {}\n", &s.to_string()); + self.link.send_message(WebsocketMsg::Disconnected); + true + } + } + } + + fn change(&mut self, _prop: Self::Properties) -> ShouldRender { + false + } + + fn view(&self) -> Html { + html! { + <> + {self.error_msg.clone()} + + } + } + + fn rendered(&mut self, first_render: bool) { + if first_render && self.ws.is_none() { + self.link.send_message(WebsocketMsg::Connect); + } + } +} diff --git a/frontend/src/pages/page_ssh.rs b/frontend/src/pages/page_ssh.rs index 679cff1..0c6b44e 100644 --- a/frontend/src/pages/page_ssh.rs +++ b/frontend/src/pages/page_ssh.rs @@ -22,6 +22,7 @@ pub enum SshMsg { SshConnect((String, u16)), SshConnectResp(Result), SshConnected, + SshRecv(Vec), } impl Component for PageSsh { @@ -88,6 +89,10 @@ impl Component for PageSsh { self.connected = true; true } + SshMsg::SshRecv(v) => { + self.error_msg = String::from_utf8(v).unwrap(); + true + } } } @@ -96,8 +101,8 @@ impl Component for PageSsh { } fn view(&self) -> Html { - let connect_ssh = self.link.callback(SshMsg::SshConnect); if !self.connected { + let connect_ssh = self.link.callback(SshMsg::SshConnect); html! { <> @@ -105,8 +110,13 @@ impl Component for PageSsh { } } else { + let recv_msg = self.link.callback(|v| SshMsg::SshRecv(v)); html! { - <> + <> + + {self.error_msg.clone()} + } } }