From b2d56e28c49f9bb8d1aefee55de1b62a4ddda2fd Mon Sep 17 00:00:00 2001 From: Jovi Hsu Date: Mon, 8 Nov 2021 09:04:20 +0800 Subject: [PATCH] update actix to make it can handle socket stream --- backend/Cargo.toml | 10 +-- backend/src/agent/agent.rs | 112 +++++++++++++++++----------------- backend/src/agent/remote.rs | 6 +- backend/src/agent/resolver.rs | 12 +--- backend/src/agent/ws.rs | 37 +++++++++-- backend/src/main.rs | 2 +- 6 files changed, 99 insertions(+), 80 deletions(-) diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 915d18e..78cabd7 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -21,11 +21,11 @@ tokio-core = "0.1.18" tokio-codec = "0.1.2" tokio-util = "0.6.9" -actix = "0.10" -actix-session = "0.4" -actix-web = "3.3.2" -actix-files = "0.5.0" -actix-web-actors = "3.0.0" +actix = "0.12.0" +actix-session = "0.5.0-beta.3" +actix-web = "4.0.0-beta.10" +actix-files = "0.6.0-beta.8" +actix-web-actors = "4.0.0-beta.7" actix-codec = "0.4" urlencoding = "2.1.0" diff --git a/backend/src/agent/agent.rs b/backend/src/agent/agent.rs index 9051aa2..dd7662f 100644 --- a/backend/src/agent/agent.rs +++ b/backend/src/agent/agent.rs @@ -1,12 +1,11 @@ +use crate::agent::ws; use actix::prelude::*; use actix_codec::{Decoder, Encoder}; use actix_web::web::Bytes; use bytes::BytesMut; -use futures::stream::*; +use std::collections::HashMap; use std::io; -use std::{collections::HashMap, task::Poll}; use tokio::net::{tcp::OwnedWriteHalf, TcpStream}; -use tokio::runtime::{self, Runtime}; use tokio_util::codec::FramedRead; use log::*; @@ -16,7 +15,7 @@ struct TcpCodec; impl Encoder for TcpCodec { type Error = io::Error; - fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode(&mut self, item: Bytes, _dst: &mut BytesMut) -> Result<(), Self::Error> { info!("encoding: {:?}", item); Ok(()) } @@ -32,25 +31,21 @@ impl Decoder for TcpCodec { } } -#[derive(MessageResponse)] -pub enum AgentResult { - Success, - Failed, -} - #[derive(Message)] -#[rtype(result = "AgentResult")] +#[rtype(result = "()")] pub enum AgentMsg { - ReadReady, - SendToServer(String), + Ready(Addr), + SendToServer(Bytes), SendToClient(Bytes), + Shutdown, } pub struct Agent { id: u32, server_info: String, writer: OwnedWriteHalf, - runtime: Runtime, + ws_addr: Option>, + pending: Vec, } impl Actor for Agent { @@ -63,24 +58,54 @@ impl Actor for Agent { } impl Handler for Agent { - type Result = AgentResult; + type Result = (); fn handle(&mut self, msg: AgentMsg, _ctx: &mut Context) -> Self::Result { match msg { - AgentMsg::SendToServer(data) => { - self.writer.try_write(data.as_bytes()).unwrap(); - AgentResult::Success + AgentMsg::Ready(ws_addr) => { + self.ws_addr = Some(ws_addr); + info!("Agent {} - Websocket connect ready", self.server_info); + for msg in self.pending.drain(..) { + self.ws_addr + .as_ref() + .unwrap() + .do_send(ws::WsMsg::SendToClient(msg)); + } + } + AgentMsg::SendToServer(data) => { + let to_send = data.to_vec(); + self.writer.try_write(&to_send).unwrap(); + } + AgentMsg::SendToClient(data) => { + if self.ws_addr.is_some() { + self.ws_addr + .as_ref() + .unwrap() + .do_send(ws::WsMsg::SendToClient(data)); + } } - AgentMsg::SendToClient(_data) => panic!("unexpected message"), _ => panic!("unexpected message"), } } } -impl StreamHandler for Agent { - fn handle(&mut self, msg: Bytes, ctx: &mut Context) { - info!("recv from server: {:?}", msg); - ctx.address().do_send(AgentMsg::SendToClient(msg)); +impl StreamHandler> for Agent { + fn handle(&mut self, msg: Result, ctx: &mut Context) { + match msg { + Ok(data) => { + info!("recv from server: {:?}", data); + if self.ws_addr.is_some() { + ctx.address().do_send(AgentMsg::SendToClient(data)); + } else { + info!("Websocket session not ready"); + self.pending.push(data); + } + } + Err(err) => { + error!("error: {:?}", err); + ctx.address().do_send(AgentMsg::Shutdown); + } + } } } @@ -89,44 +114,21 @@ impl Agent { let (host, port) = target; let server_info = format!("{}:{}", host, port); info!("connect to server: {}", server_info); + let server_stream = TcpStream::connect(&server_info).await; + if server_stream.is_err() { + info!("connect to server failed: {}", server_info); + } + let server_stream = server_stream.unwrap(); let addr = Agent::create(move |ctx| { - let mut builder = runtime::Builder::new_current_thread(); - builder.enable_all(); - - let runtime = builder.build().unwrap(); - - let server_stream = runtime.block_on(TcpStream::connect(&server_info)); - - if server_stream.is_err() { - info!("connect to server failed: {}", server_info); - } - let server_stream = server_stream.unwrap(); let (r, w) = server_stream.into_split(); - // let r = FramedRead::new(r, TcpCodec {}); - let xx = poll_fn(move |_a| { - let mut buf = [0; 16384]; - match r.try_read(&mut buf) { - Ok(n) => { - if n == 0 { - return Poll::Pending; - } - return Poll::Ready(Some(Bytes::from(buf[..n].to_vec()))); - } - Err(e) => { - error!("error: {}", e); - if e.kind() == io::ErrorKind::WouldBlock { - return Poll::Pending; - } - return Poll::Ready(None); - } - }; - }); - Agent::add_stream(xx, ctx); + let r = FramedRead::new(r, TcpCodec {}); + Agent::add_stream(r, ctx); Self { id, server_info, writer: w, - runtime, + ws_addr: None, + pending: vec![], } }); Some(addr) diff --git a/backend/src/agent/remote.rs b/backend/src/agent/remote.rs index 5b95289..ae0eca7 100644 --- a/backend/src/agent/remote.rs +++ b/backend/src/agent/remote.rs @@ -47,7 +47,6 @@ pub async fn target_validate( } } -const SSH_VER: &str = "SSH-2.0-OpenSSH_7.4p1 Ubuntu-4ubuntu2.1"; #[post("/target/ssh")] pub async fn target_ssh( session: Session, @@ -60,9 +59,6 @@ pub async fn target_ssh( match agent { Some(addr) => { - let _ = addr - .send(agent::AgentMsg::SendToServer(SSH_VER.to_string())) - .await; // add to agent list let _ = data .agents @@ -70,7 +66,7 @@ pub async fn target_ssh( .await; // add session, so that the websocket can send message to the agent - let _ = session.set::("aid", aid); + let _ = session.insert("aid", aid); // send response let json = json!({ diff --git a/backend/src/agent/resolver.rs b/backend/src/agent/resolver.rs index 4b09505..a72519e 100644 --- a/backend/src/agent/resolver.rs +++ b/backend/src/agent/resolver.rs @@ -57,7 +57,6 @@ // } use std::net::IpAddr; -use tokio::runtime::{self, Runtime}; use trust_dns_resolver::{ config::*, name_server::{GenericConnection, GenericConnectionProvider, TokioRuntime}, @@ -68,7 +67,6 @@ use log::*; pub struct DnsResolver { resolver: AsyncResolver>, - runtime: Runtime, } impl DnsResolver { @@ -80,19 +78,13 @@ impl DnsResolver { ) .unwrap(); - let mut builder = runtime::Builder::new_current_thread(); - builder.enable_all(); - - let runtime = builder.build().unwrap(); - - Self { resolver, runtime } + Self { resolver } } pub async fn lockup(&self, name: String) -> Option { let lookup = self.resolver.lookup_ip(name.clone()); - // todo!("work out how to run it async"); - if let Ok(response) = self.runtime.block_on(lookup) { + if let Ok(response) = lookup.await { if let Some(address) = response.iter().next() { info!("Resolved {} to {}", name, address); Some(address) diff --git a/backend/src/agent/ws.rs b/backend/src/agent/ws.rs index 070080e..974ebb7 100644 --- a/backend/src/agent/ws.rs +++ b/backend/src/agent/ws.rs @@ -1,5 +1,7 @@ -use actix::{Actor, Addr, StreamHandler}; +use actix::{Actor, Addr, Message, StreamHandler}; +use actix::{AsyncContext, Handler}; use actix_session::Session; +use actix_web::web::Bytes; use actix_web::*; use actix_web::{web, Error, HttpRequest, HttpResponse}; use actix_web_actors::ws; @@ -7,15 +9,40 @@ use log::*; use crate::AppData; -use super::agent::{Agent, AgentManagerMsg, AgentManagerResult}; +use super::agent::*; + +#[derive(Message)] +#[rtype(result = "()")] +pub enum WsMsg { + SendToClient(Bytes), +} /// Define Websocket actor -struct WsSession { +pub struct WsSession { agent: Addr, } impl Actor for WsSession { type Context = ws::WebsocketContext; + + fn started(&mut self, ctx: &mut Self::Context) { + // start heartbeats otherwise server will disconnect after 10 seconds + self.agent.do_send(AgentMsg::Ready(ctx.address())); + info!("Websocket connection is established."); + } +} + +impl Handler for WsSession { + type Result = (); + + fn handle(&mut self, msg: WsMsg, ctx: &mut Self::Context) -> () { + match msg { + WsMsg::SendToClient(data) => { + ctx.binary(data); + } + }; + () + } } /// Handler for ws::Message message @@ -24,7 +51,9 @@ impl StreamHandler> for WsSession { match msg { Ok(ws::Message::Ping(msg)) => ctx.pong(&msg), Ok(ws::Message::Text(text)) => ctx.text(text), - Ok(ws::Message::Binary(bin)) => ctx.binary(bin), + Ok(ws::Message::Binary(bin)) => { + self.agent.do_send(AgentMsg::SendToServer(bin)); + } _ => (), } } diff --git a/backend/src/main.rs b/backend/src/main.rs index 00b2e97..bad95c3 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -63,7 +63,7 @@ async fn main() -> std::io::Result<()> { let private_key = rand::thread_rng().gen::<[u8; 32]>(); HttpServer::new(move || { App::new() - .data(AppData::new()) + .app_data(AppData::new()) .wrap(CookieSession::signed(&private_key).secure(false)) .wrap(middleware::Compress::new(ContentEncoding::Gzip)) .service(index)