update actix to make it can handle socket stream

This commit is contained in:
Jovi Hsu 2021-11-08 09:04:20 +08:00
parent 1f4a535110
commit b2d56e28c4
6 changed files with 99 additions and 80 deletions

View File

@ -21,11 +21,11 @@ tokio-core = "0.1.18"
tokio-codec = "0.1.2" tokio-codec = "0.1.2"
tokio-util = "0.6.9" tokio-util = "0.6.9"
actix = "0.10" actix = "0.12.0"
actix-session = "0.4" actix-session = "0.5.0-beta.3"
actix-web = "3.3.2" actix-web = "4.0.0-beta.10"
actix-files = "0.5.0" actix-files = "0.6.0-beta.8"
actix-web-actors = "3.0.0" actix-web-actors = "4.0.0-beta.7"
actix-codec = "0.4" actix-codec = "0.4"
urlencoding = "2.1.0" urlencoding = "2.1.0"

View File

@ -1,12 +1,11 @@
use crate::agent::ws;
use actix::prelude::*; use actix::prelude::*;
use actix_codec::{Decoder, Encoder}; use actix_codec::{Decoder, Encoder};
use actix_web::web::Bytes; use actix_web::web::Bytes;
use bytes::BytesMut; use bytes::BytesMut;
use futures::stream::*; use std::collections::HashMap;
use std::io; use std::io;
use std::{collections::HashMap, task::Poll};
use tokio::net::{tcp::OwnedWriteHalf, TcpStream}; use tokio::net::{tcp::OwnedWriteHalf, TcpStream};
use tokio::runtime::{self, Runtime};
use tokio_util::codec::FramedRead; use tokio_util::codec::FramedRead;
use log::*; use log::*;
@ -16,7 +15,7 @@ struct TcpCodec;
impl Encoder<Bytes> for TcpCodec { impl Encoder<Bytes> for TcpCodec {
type Error = io::Error; type Error = io::Error;
fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> { fn encode(&mut self, item: Bytes, _dst: &mut BytesMut) -> Result<(), Self::Error> {
info!("encoding: {:?}", item); info!("encoding: {:?}", item);
Ok(()) Ok(())
} }
@ -32,25 +31,21 @@ impl Decoder for TcpCodec {
} }
} }
#[derive(MessageResponse)]
pub enum AgentResult {
Success,
Failed,
}
#[derive(Message)] #[derive(Message)]
#[rtype(result = "AgentResult")] #[rtype(result = "()")]
pub enum AgentMsg { pub enum AgentMsg {
ReadReady, Ready(Addr<ws::WsSession>),
SendToServer(String), SendToServer(Bytes),
SendToClient(Bytes), SendToClient(Bytes),
Shutdown,
} }
pub struct Agent { pub struct Agent {
id: u32, id: u32,
server_info: String, server_info: String,
writer: OwnedWriteHalf, writer: OwnedWriteHalf,
runtime: Runtime, ws_addr: Option<Addr<ws::WsSession>>,
pending: Vec<Bytes>,
} }
impl Actor for Agent { impl Actor for Agent {
@ -63,24 +58,54 @@ impl Actor for Agent {
} }
impl Handler<AgentMsg> for Agent { impl Handler<AgentMsg> for Agent {
type Result = AgentResult; type Result = ();
fn handle(&mut self, msg: AgentMsg, _ctx: &mut Context<Self>) -> Self::Result { fn handle(&mut self, msg: AgentMsg, _ctx: &mut Context<Self>) -> Self::Result {
match msg { match msg {
AgentMsg::SendToServer(data) => { AgentMsg::Ready(ws_addr) => {
self.writer.try_write(data.as_bytes()).unwrap(); self.ws_addr = Some(ws_addr);
AgentResult::Success info!("Agent {} - Websocket connect ready", self.server_info);
for msg in self.pending.drain(..) {
self.ws_addr
.as_ref()
.unwrap()
.do_send(ws::WsMsg::SendToClient(msg));
}
}
AgentMsg::SendToServer(data) => {
let to_send = data.to_vec();
self.writer.try_write(&to_send).unwrap();
}
AgentMsg::SendToClient(data) => {
if self.ws_addr.is_some() {
self.ws_addr
.as_ref()
.unwrap()
.do_send(ws::WsMsg::SendToClient(data));
}
} }
AgentMsg::SendToClient(_data) => panic!("unexpected message"),
_ => panic!("unexpected message"), _ => panic!("unexpected message"),
} }
} }
} }
impl StreamHandler<Bytes> for Agent { impl StreamHandler<Result<Bytes, io::Error>> for Agent {
fn handle(&mut self, msg: Bytes, ctx: &mut Context<Self>) { fn handle(&mut self, msg: Result<Bytes, io::Error>, ctx: &mut Context<Self>) {
info!("recv from server: {:?}", msg); match msg {
ctx.address().do_send(AgentMsg::SendToClient(msg)); Ok(data) => {
info!("recv from server: {:?}", data);
if self.ws_addr.is_some() {
ctx.address().do_send(AgentMsg::SendToClient(data));
} else {
info!("Websocket session not ready");
self.pending.push(data);
}
}
Err(err) => {
error!("error: {:?}", err);
ctx.address().do_send(AgentMsg::Shutdown);
}
}
} }
} }
@ -89,44 +114,21 @@ impl Agent {
let (host, port) = target; let (host, port) = target;
let server_info = format!("{}:{}", host, port); let server_info = format!("{}:{}", host, port);
info!("connect to server: {}", server_info); info!("connect to server: {}", server_info);
let server_stream = TcpStream::connect(&server_info).await;
if server_stream.is_err() {
info!("connect to server failed: {}", server_info);
}
let server_stream = server_stream.unwrap();
let addr = Agent::create(move |ctx| { let addr = Agent::create(move |ctx| {
let mut builder = runtime::Builder::new_current_thread();
builder.enable_all();
let runtime = builder.build().unwrap();
let server_stream = runtime.block_on(TcpStream::connect(&server_info));
if server_stream.is_err() {
info!("connect to server failed: {}", server_info);
}
let server_stream = server_stream.unwrap();
let (r, w) = server_stream.into_split(); let (r, w) = server_stream.into_split();
// let r = FramedRead::new(r, TcpCodec {}); let r = FramedRead::new(r, TcpCodec {});
let xx = poll_fn(move |_a| { Agent::add_stream(r, ctx);
let mut buf = [0; 16384];
match r.try_read(&mut buf) {
Ok(n) => {
if n == 0 {
return Poll::Pending;
}
return Poll::Ready(Some(Bytes::from(buf[..n].to_vec())));
}
Err(e) => {
error!("error: {}", e);
if e.kind() == io::ErrorKind::WouldBlock {
return Poll::Pending;
}
return Poll::Ready(None);
}
};
});
Agent::add_stream(xx, ctx);
Self { Self {
id, id,
server_info, server_info,
writer: w, writer: w,
runtime, ws_addr: None,
pending: vec![],
} }
}); });
Some(addr) Some(addr)

View File

@ -47,7 +47,6 @@ pub async fn target_validate(
} }
} }
const SSH_VER: &str = "SSH-2.0-OpenSSH_7.4p1 Ubuntu-4ubuntu2.1";
#[post("/target/ssh")] #[post("/target/ssh")]
pub async fn target_ssh( pub async fn target_ssh(
session: Session, session: Session,
@ -60,9 +59,6 @@ pub async fn target_ssh(
match agent { match agent {
Some(addr) => { Some(addr) => {
let _ = addr
.send(agent::AgentMsg::SendToServer(SSH_VER.to_string()))
.await;
// add to agent list // add to agent list
let _ = data let _ = data
.agents .agents
@ -70,7 +66,7 @@ pub async fn target_ssh(
.await; .await;
// add session, so that the websocket can send message to the agent // add session, so that the websocket can send message to the agent
let _ = session.set::<u32>("aid", aid); let _ = session.insert("aid", aid);
// send response // send response
let json = json!({ let json = json!({

View File

@ -57,7 +57,6 @@
// } // }
use std::net::IpAddr; use std::net::IpAddr;
use tokio::runtime::{self, Runtime};
use trust_dns_resolver::{ use trust_dns_resolver::{
config::*, config::*,
name_server::{GenericConnection, GenericConnectionProvider, TokioRuntime}, name_server::{GenericConnection, GenericConnectionProvider, TokioRuntime},
@ -68,7 +67,6 @@ use log::*;
pub struct DnsResolver { pub struct DnsResolver {
resolver: AsyncResolver<GenericConnection, GenericConnectionProvider<TokioRuntime>>, resolver: AsyncResolver<GenericConnection, GenericConnectionProvider<TokioRuntime>>,
runtime: Runtime,
} }
impl DnsResolver { impl DnsResolver {
@ -80,19 +78,13 @@ impl DnsResolver {
) )
.unwrap(); .unwrap();
let mut builder = runtime::Builder::new_current_thread(); Self { resolver }
builder.enable_all();
let runtime = builder.build().unwrap();
Self { resolver, runtime }
} }
pub async fn lockup(&self, name: String) -> Option<IpAddr> { pub async fn lockup(&self, name: String) -> Option<IpAddr> {
let lookup = self.resolver.lookup_ip(name.clone()); let lookup = self.resolver.lookup_ip(name.clone());
// todo!("work out how to run it async"); if let Ok(response) = lookup.await {
if let Ok(response) = self.runtime.block_on(lookup) {
if let Some(address) = response.iter().next() { if let Some(address) = response.iter().next() {
info!("Resolved {} to {}", name, address); info!("Resolved {} to {}", name, address);
Some(address) Some(address)

View File

@ -1,5 +1,7 @@
use actix::{Actor, Addr, StreamHandler}; use actix::{Actor, Addr, Message, StreamHandler};
use actix::{AsyncContext, Handler};
use actix_session::Session; use actix_session::Session;
use actix_web::web::Bytes;
use actix_web::*; use actix_web::*;
use actix_web::{web, Error, HttpRequest, HttpResponse}; use actix_web::{web, Error, HttpRequest, HttpResponse};
use actix_web_actors::ws; use actix_web_actors::ws;
@ -7,15 +9,40 @@ use log::*;
use crate::AppData; use crate::AppData;
use super::agent::{Agent, AgentManagerMsg, AgentManagerResult}; use super::agent::*;
#[derive(Message)]
#[rtype(result = "()")]
pub enum WsMsg {
SendToClient(Bytes),
}
/// Define Websocket actor /// Define Websocket actor
struct WsSession { pub struct WsSession {
agent: Addr<Agent>, agent: Addr<Agent>,
} }
impl Actor for WsSession { impl Actor for WsSession {
type Context = ws::WebsocketContext<Self>; type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
// start heartbeats otherwise server will disconnect after 10 seconds
self.agent.do_send(AgentMsg::Ready(ctx.address()));
info!("Websocket connection is established.");
}
}
impl Handler<WsMsg> for WsSession {
type Result = ();
fn handle(&mut self, msg: WsMsg, ctx: &mut Self::Context) -> () {
match msg {
WsMsg::SendToClient(data) => {
ctx.binary(data);
}
};
()
}
} }
/// Handler for ws::Message message /// Handler for ws::Message message
@ -24,7 +51,9 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsSession {
match msg { match msg {
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg), Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Text(text)) => ctx.text(text), Ok(ws::Message::Text(text)) => ctx.text(text),
Ok(ws::Message::Binary(bin)) => ctx.binary(bin), Ok(ws::Message::Binary(bin)) => {
self.agent.do_send(AgentMsg::SendToServer(bin));
}
_ => (), _ => (),
} }
} }

View File

@ -63,7 +63,7 @@ async fn main() -> std::io::Result<()> {
let private_key = rand::thread_rng().gen::<[u8; 32]>(); let private_key = rand::thread_rng().gen::<[u8; 32]>();
HttpServer::new(move || { HttpServer::new(move || {
App::new() App::new()
.data(AppData::new()) .app_data(AppData::new())
.wrap(CookieSession::signed(&private_key).secure(false)) .wrap(CookieSession::signed(&private_key).secure(false))
.wrap(middleware::Compress::new(ContentEncoding::Gzip)) .wrap(middleware::Compress::new(ContentEncoding::Gzip))
.service(index) .service(index)