diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 826d06f..915d18e 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -15,17 +15,29 @@ repository = "https://www.github.com/HsuJv/webgateway" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -actix = "0.10.0" +tokio = {version="1.13.0", feature="io-util"} +tokio-io = "0.1.13" +tokio-core = "0.1.18" +tokio-codec = "0.1.2" +tokio-util = "0.6.9" + +actix = "0.10" actix-session = "0.4" actix-web = "3.3.2" actix-files = "0.5.0" actix-web-actors = "3.0.0" +actix-codec = "0.4" + urlencoding = "2.1.0" +bytes = "1.1.0" serde = "1.0" serde_json = "1.0" trust-dns-resolver = "0.20" rand = "0.8" +futures = "0.3.17" +futures-util= "0.3" + # log systems femme = "1.3" log = "0.4" diff --git a/backend/src/agent/agent.rs b/backend/src/agent/agent.rs index c89e3a3..9051aa2 100644 --- a/backend/src/agent/agent.rs +++ b/backend/src/agent/agent.rs @@ -1,64 +1,198 @@ -use actix::{Actor, Addr, Context, Handler, Message, MessageResponse}; +use actix::prelude::*; +use actix_codec::{Decoder, Encoder}; use actix_web::web::Bytes; -use std::net::*; +use bytes::BytesMut; +use futures::stream::*; +use std::io; +use std::{collections::HashMap, task::Poll}; +use tokio::net::{tcp::OwnedWriteHalf, TcpStream}; +use tokio::runtime::{self, Runtime}; +use tokio_util::codec::FramedRead; + +use log::*; + +struct TcpCodec; + +impl Encoder for TcpCodec { + type Error = io::Error; + + fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> { + info!("encoding: {:?}", item); + Ok(()) + } +} + +impl Decoder for TcpCodec { + type Item = Bytes; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + info!("recv from server: {:?}", src); + Ok(Some(Bytes::from(src.to_vec()))) + } +} -use log::info; #[derive(MessageResponse)] -pub enum AgentResp { +pub enum AgentResult { Success, Failed, } #[derive(Message)] -#[rtype(result = "AgentResp")] +#[rtype(result = "AgentResult")] pub enum AgentMsg { - ConnectServer(SocketAddr), - SendToServer(Bytes), + ReadReady, + SendToServer(String), SendToClient(Bytes), } pub struct Agent { id: u32, - server_info: Option, - server_stream: Option, - // client_info: SocketAddr, + server_info: String, + writer: OwnedWriteHalf, + runtime: Runtime, } impl Actor for Agent { type Context = Context; + + fn started(&mut self, _ctx: &mut Self::Context) { + info!("Agent {} started", self.id); + // ctx.address().do_send(AgentMsg::ReadReady); + } } impl Handler for Agent { - type Result = AgentResp; + type Result = AgentResult; fn handle(&mut self, msg: AgentMsg, _ctx: &mut Context) -> Self::Result { match msg { - AgentMsg::ConnectServer(addr) => { - info!("connect to server: {}", addr); - self.server_info = Some(addr); - if let Ok(stream) = TcpStream::connect(addr) { - stream - .set_nonblocking(true) - .expect("set_nonblocking call failed"); - self.server_stream = Some(stream); - AgentResp::Success - } else { - AgentResp::Failed - } + AgentMsg::SendToServer(data) => { + self.writer.try_write(data.as_bytes()).unwrap(); + AgentResult::Success } - AgentMsg::SendToServer(_data) => AgentResp::Success, - AgentMsg::SendToClient(_data) => AgentResp::Success, + AgentMsg::SendToClient(_data) => panic!("unexpected message"), + _ => panic!("unexpected message"), } } } +impl StreamHandler for Agent { + fn handle(&mut self, msg: Bytes, ctx: &mut Context) { + info!("recv from server: {:?}", msg); + ctx.address().do_send(AgentMsg::SendToClient(msg)); + } +} + impl Agent { - pub fn new(id: u32) -> Addr { + pub async fn new(id: u32, target: (String, u16)) -> Option> { + let (host, port) = target; + let server_info = format!("{}:{}", host, port); + info!("connect to server: {}", server_info); + let addr = Agent::create(move |ctx| { + let mut builder = runtime::Builder::new_current_thread(); + builder.enable_all(); + + let runtime = builder.build().unwrap(); + + let server_stream = runtime.block_on(TcpStream::connect(&server_info)); + + if server_stream.is_err() { + info!("connect to server failed: {}", server_info); + } + let server_stream = server_stream.unwrap(); + let (r, w) = server_stream.into_split(); + // let r = FramedRead::new(r, TcpCodec {}); + let xx = poll_fn(move |_a| { + let mut buf = [0; 16384]; + match r.try_read(&mut buf) { + Ok(n) => { + if n == 0 { + return Poll::Pending; + } + return Poll::Ready(Some(Bytes::from(buf[..n].to_vec()))); + } + Err(e) => { + error!("error: {}", e); + if e.kind() == io::ErrorKind::WouldBlock { + return Poll::Pending; + } + return Poll::Ready(None); + } + }; + }); + Agent::add_stream(xx, ctx); + Self { + id, + server_info, + writer: w, + runtime, + } + }); + Some(addr) + } +} + +#[derive(MessageResponse)] +pub enum AgentManagerResult { + Success(Addr), + Failed, + NoReturn, +} + +#[derive(Message)] +#[rtype(result = "AgentManagerResult")] +pub enum AgentManagerMsg { + Add((u32, Addr)), + Get(u32), + Del(u32), +} + +pub struct AgentManager { + agents: HashMap>, +} + +impl AgentManager { + pub fn new() -> Addr { Self { - id, - server_info: None, - server_stream: None, + agents: HashMap::new(), } .start() } } + +impl Actor for AgentManager { + type Context = Context; + + fn started(&mut self, _ctx: &mut Context) { + info!("AgentManager started"); + } + + fn stopped(&mut self, _ctx: &mut Context) { + info!("AgentManager stopped"); + } +} + +impl Handler for AgentManager { + type Result = AgentManagerResult; + + fn handle(&mut self, msg: AgentManagerMsg, _ctx: &mut Context) -> Self::Result { + match msg { + AgentManagerMsg::Add(addr) => { + self.agents.insert(addr.0, addr.1); + AgentManagerResult::NoReturn + } + AgentManagerMsg::Get(aid) => { + if let Some(addr) = self.agents.get(&aid) { + AgentManagerResult::Success(addr.clone()) + } else { + AgentManagerResult::Failed + } + } + AgentManagerMsg::Del(id) => { + self.agents.remove(&id); + AgentManagerResult::NoReturn + } + } + } +} diff --git a/backend/src/agent/remote.rs b/backend/src/agent/remote.rs index 89223b1..5b95289 100644 --- a/backend/src/agent/remote.rs +++ b/backend/src/agent/remote.rs @@ -6,7 +6,6 @@ use serde_json::json; use log::info; use rand::Rng; -use crate::agent::resolver::*; use crate::AppData; use super::agent; @@ -28,10 +27,10 @@ pub async fn target_validate( ) -> Result { let remote = params.into_inner(); info!("{:?}", remote); - let resolved = data.resolver.send(ResolveMsg::Resolve(remote.host)).await; + // let resolved = data.resolver.send(ResolveMsg::Resolve(remote.host)).await; - match resolved.unwrap() { - ResolveResp::Success(ipaddr) => { + match data.resolver.lockup(remote.host).await { + Some(ipaddr) => { let json = json!({ "status": "success", "ip": ipaddr @@ -48,6 +47,7 @@ pub async fn target_validate( } } +const SSH_VER: &str = "SSH-2.0-OpenSSH_7.4p1 Ubuntu-4ubuntu2.1"; #[post("/target/ssh")] pub async fn target_ssh( session: Session, @@ -56,17 +56,18 @@ pub async fn target_ssh( ) -> Result { let aid = rand::thread_rng().gen::(); let remote = params.into_inner(); - let agent = agent::Agent::new(aid); + let agent = agent::Agent::new(aid, (remote.ip, remote.port)).await; - match agent - .send(agent::AgentMsg::ConnectServer( - format!("{}:{}", remote.ip, remote.port).parse().unwrap(), - )) - .await - { - Ok(agent::AgentResp::Success) => { + match agent { + Some(addr) => { + let _ = addr + .send(agent::AgentMsg::SendToServer(SSH_VER.to_string())) + .await; // add to agent list - data.agents.write().unwrap().insert(aid, agent); + let _ = data + .agents + .send(agent::AgentManagerMsg::Add((aid, addr))) + .await; // add session, so that the websocket can send message to the agent let _ = session.set::("aid", aid); diff --git a/backend/src/agent/resolver.rs b/backend/src/agent/resolver.rs index b1a0c6b..4b09505 100644 --- a/backend/src/agent/resolver.rs +++ b/backend/src/agent/resolver.rs @@ -1,75 +1,108 @@ -use actix::{Actor, Addr, Context, Handler, Message, MessageResponse}; +// use actix::{Actor, Addr, Context, Handler, Message, MessageResponse}; -use std::net::*; -use trust_dns_resolver::config::*; -use trust_dns_resolver::Resolver; +// use std::net::*; +// use trust_dns_resolver::config::*; +// use trust_dns_resolver::Resolver; -use log::info; +// use log::info; -#[derive(MessageResponse)] -pub enum ResolveResp { - Success(IpAddr), - Failed, -} +// #[derive(MessageResponse)] +// pub enum ResolveResp { +// Success(IpAddr), +// Failed, +// } -#[derive(Message)] -#[rtype(result = "ResolveResp")] -pub enum ResolveMsg { - Resolve(String), -} +// #[derive(Message)] +// #[rtype(result = "ResolveResp")] +// pub enum ResolveMsg { +// Resolve(String), +// } + +// pub struct DnsResolver { +// resolver: Resolver, +// } + +// impl Actor for DnsResolver { +// type Context = Context; +// } + +// impl Handler for DnsResolver { +// type Result = ResolveResp; + +// fn handle(&mut self, msg: ResolveMsg, _: &mut Context) -> Self::Result { +// match msg { +// ResolveMsg::Resolve(name) => { +// if let Ok(response) = self.resolver.lookup_ip(name.clone()) { +// if let Some(address) = response.iter().next() { +// info!("Resolved {} to {}", name, address); +// ResolveResp::Success(address) +// } else { +// ResolveResp::Failed +// } +// } else { +// info!("Failed to resolve {}", name); +// ResolveResp::Failed +// } +// } +// } +// } +// } + +// impl DnsResolver { +// pub fn new() -> Addr { +// let resolver = Resolver::new(ResolverConfig::default(), ResolverOpts::default()).unwrap(); + +// DnsResolver { resolver }.start() +// } +// } +use std::net::IpAddr; + +use tokio::runtime::{self, Runtime}; +use trust_dns_resolver::{ + config::*, + name_server::{GenericConnection, GenericConnectionProvider, TokioRuntime}, +}; +use trust_dns_resolver::{AsyncResolver, TokioHandle}; + +use log::*; pub struct DnsResolver { - resolver: Resolver, -} - -impl Actor for DnsResolver { - type Context = Context; -} - -impl Handler for DnsResolver { - type Result = ResolveResp; - - fn handle(&mut self, msg: ResolveMsg, _: &mut Context) -> Self::Result { - match msg { - ResolveMsg::Resolve(name) => { - if let Ok(response) = self.resolver.lookup_ip(name.clone()) { - if let Some(address) = response.iter().next() { - info!("Resolved {} to {}", name, address); - ResolveResp::Success(address) - } else { - ResolveResp::Failed - } - } else { - info!("Failed to resolve {}", name); - ResolveResp::Failed - } - } - } - } + resolver: AsyncResolver>, + runtime: Runtime, } impl DnsResolver { - pub fn new() -> Addr { - let resolver = Resolver::new(ResolverConfig::default(), ResolverOpts::default()).unwrap(); + pub fn new() -> Self { + let resolver = AsyncResolver::new( + ResolverConfig::default(), + ResolverOpts::default(), + TokioHandle, + ) + .unwrap(); - DnsResolver { resolver }.start() + let mut builder = runtime::Builder::new_current_thread(); + builder.enable_all(); + + let runtime = builder.build().unwrap(); + + Self { resolver, runtime } + } + + pub async fn lockup(&self, name: String) -> Option { + let lookup = self.resolver.lookup_ip(name.clone()); + + // todo!("work out how to run it async"); + if let Ok(response) = self.runtime.block_on(lookup) { + if let Some(address) = response.iter().next() { + info!("Resolved {} to {}", name, address); + Some(address) + } else { + info!("Failed to resolve {}", name); + None + } + } else { + info!("Failed to resolve {}", name); + None + } } } - -// Construct a new Resolver with default configuration options -// let mut resolver = Resolver::new(ResolverConfig::default(), ResolverOpts::default()).unwrap(); - -// On Unix/Posix systems, this will read the /etc/resolv.conf -// let mut resolver = Resolver::from_system_conf().unwrap(); - -// Lookup the IP addresses associated with a name. -// let mut response = resolver.lookup_ip("www.example.com.").unwrap(); - -// There can be many addresses associated with the name, -// this can return IPv4 and/or IPv6 addresses -// let address = response.iter().next().expect("no addresses returned!"); -// if address.is_ipv4() { -// assert_eq!(address, IpAddr::V4(Ipv4Addr::new(93, 184, 216, 34))); -// } else { -// assert_eq!(address, IpAddr::V6(Ipv6Addr::new(0x2606, 0x2800, 0x220, 0x1, 0x248, 0x1893, 0x25c8, 0x1946))); -// } diff --git a/backend/src/agent/ws.rs b/backend/src/agent/ws.rs index 2eee717..070080e 100644 --- a/backend/src/agent/ws.rs +++ b/backend/src/agent/ws.rs @@ -7,9 +7,9 @@ use log::*; use crate::AppData; -use super::agent::Agent; +use super::agent::{Agent, AgentManagerMsg, AgentManagerResult}; -/// Define HTTP actor +/// Define Websocket actor struct WsSession { agent: Addr, } @@ -38,8 +38,13 @@ pub async fn ws_index( stream: web::Payload, ) -> Result { let aid = session.get::("aid").unwrap_or(Some(0)).unwrap(); - let agent = data.agents.read().unwrap().get(&aid).unwrap().clone(); - let resp = ws::start(WsSession { agent }, &req, stream); + + let resp = match data.agents.send(AgentManagerMsg::Get(aid)).await.unwrap() { + AgentManagerResult::Success(agent) => ws::start(WsSession { agent }, &req, stream), + _ => Err(Error::from(actix_web::error::ErrorInternalServerError( + "Agent not found", + ))), + }; match &resp { Ok(resp) => info!("{:?}", resp), diff --git a/backend/src/main.rs b/backend/src/main.rs index f5827ec..00b2e97 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -1,14 +1,13 @@ -use std::{collections::HashMap, sync::RwLock}; - use actix::Addr; use actix_files as fs; use actix_session::CookieSession; use actix_web::http::{ContentEncoding, StatusCode}; use actix_web::*; -use agent::{agent::Agent, resolver::DnsResolver}; +use agent::{agent::AgentManager, resolver::DnsResolver}; use log::info; use rand::Rng; +use user::auth::Authenticator; mod agent; mod user; @@ -19,15 +18,17 @@ const PAGE_NOT_FOUND: &str = "./static/p404.html"; pub struct AppData { // session: CookieSession, - resolver: Addr, - agents: RwLock>>, + resolver: DnsResolver, + authenticator: Addr, + agents: Addr, } impl AppData { pub fn new() -> Self { Self { resolver: DnsResolver::new(), - agents: RwLock::new(HashMap::new()), + authenticator: Authenticator::new(), + agents: AgentManager::new(), } } } diff --git a/backend/src/user/auth.rs b/backend/src/user/auth.rs index 64a41aa..9738005 100644 --- a/backend/src/user/auth.rs +++ b/backend/src/user/auth.rs @@ -1,22 +1,23 @@ -use actix::{Actor, Context, Handler, Message, MessageResponse}; -use actix_session::Session; +use actix::{Actor, Addr, Context, Handler, Message, MessageResponse}; use actix_web::*; use serde::{Deserialize, Serialize}; use serde_json::json; use log::info; +use crate::AppData; + #[derive(MessageResponse)] #[allow(dead_code)] -enum AuthResp { +enum AuthResult { AuthSuccess, AuthFailure, } #[derive(Message)] -#[rtype(result = "AuthResp")] +#[rtype(result = "AuthResult")] enum AuthMsg { - DoAuth, + DoAuth(AuthInfo), } #[derive(Serialize, Deserialize, Debug)] @@ -25,12 +26,19 @@ pub struct AuthInfo { password: String, } -impl Actor for AuthInfo { +pub struct Authenticator; + +impl Authenticator { + pub fn new() -> Addr { + Self {}.start() + } +} + +impl Actor for Authenticator { type Context = Context; fn started(&mut self, _ctx: &mut Self::Context) { info!("AuthInfo started"); - info!("{:?}", self); } fn stopped(&mut self, _ctx: &mut Self::Context) { @@ -38,23 +46,32 @@ impl Actor for AuthInfo { } } -impl Handler for AuthInfo { - type Result = AuthResp; +impl Handler for Authenticator { + type Result = AuthResult; - fn handle(&mut self, _msg: AuthMsg, _ctx: &mut Context) -> Self::Result { - info!("AuthInfo handle"); - AuthResp::AuthSuccess + fn handle(&mut self, msg: AuthMsg, _ctx: &mut Context) -> Self::Result { + match msg { + AuthMsg::DoAuth(auth_info) => { + if auth_info.username == "admin" && auth_info.password == "admin" { + AuthResult::AuthSuccess + } else { + AuthResult::AuthFailure + } + } + } } } #[post("/auth")] -pub async fn auth(params: web::Json) -> Result { - let auth = params.into_inner(); - let auth_addr = auth.start(); - let res = auth_addr.send(AuthMsg::DoAuth).await; +pub async fn auth( + params: web::Json, + data: web::Data, +) -> Result { + let auth_info = params.into_inner(); + let res = data.authenticator.send(AuthMsg::DoAuth(auth_info)).await; match res { - Ok(AuthResp::AuthSuccess) => Ok(HttpResponse::Ok().json(json!({ + Ok(AuthResult::AuthSuccess) => Ok(HttpResponse::Ok().json(json!({ "status": "success", }))), _ => Ok(HttpResponse::Ok().json(json!({ diff --git a/frontend/src/components/host.rs b/frontend/src/components/host.rs index a1aa19b..9653638 100644 --- a/frontend/src/components/host.rs +++ b/frontend/src/components/host.rs @@ -131,6 +131,7 @@ impl Component for Host {
+