From 2926938201589c049f5f0662bce9458b37a69280 Mon Sep 17 00:00:00 2001 From: jake Date: Wed, 19 Aug 2020 21:21:09 -0600 Subject: [PATCH] add InterserverActor trait for cross-server communication --- src/bin/main.rs | 4 +- src/common/interserver.rs | 57 +++++ .../{mainloop.rs => mainloop/client.rs} | 66 ++--- src/common/mainloop/interserver.rs | 238 ++++++++++++++++++ src/common/mainloop/mod.rs | 53 ++++ src/common/mod.rs | 1 + src/common/serverstate.rs | 1 + src/entity/character.rs | 3 +- src/login/login.rs | 23 ++ src/ship/ship.rs | 21 ++ 10 files changed, 412 insertions(+), 55 deletions(-) create mode 100644 src/common/interserver.rs rename src/common/{mainloop.rs => mainloop/client.rs} (86%) create mode 100644 src/common/mainloop/interserver.rs create mode 100644 src/common/mainloop/mod.rs diff --git a/src/bin/main.rs b/src/bin/main.rs index e3a2a83..0e23ac6 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -196,7 +196,7 @@ fn main() { let thread_entity_gateway = entity_gateway.clone(); info!("[auth] starting server"); let login_state = LoginServerState::new(thread_entity_gateway); - let login_loop = login_mainloop(login_state, elseware::login::login::LOGIN_PORT); + let login_loop = login_mainloop(login_state, elseware::login::login::LOGIN_PORT, elseware::login::login::COMMUNICATION_PORT); let thread_entity_gateway = entity_gateway.clone(); info!("[character] starting server"); @@ -206,7 +206,7 @@ fn main() { let thread_entity_gateway = entity_gateway.clone(); info!("[ship] starting server"); let ship_state = ShipServerState::new(thread_entity_gateway); - let ship_loop = ship_mainloop(ship_state, elseware::ship::ship::SHIP_PORT); + let ship_loop = ship_mainloop(ship_state, elseware::ship::ship::SHIP_PORT, std::net::Ipv4Addr::new(127, 0, 0, 1), elseware::login::login::COMMUNICATION_PORT); futures::future::join_all(vec![patch_loop, login_loop, character_loop, ship_loop]).await; }); diff --git a/src/common/interserver.rs b/src/common/interserver.rs new file mode 100644 index 0000000..26a2d65 --- /dev/null +++ b/src/common/interserver.rs @@ -0,0 +1,57 @@ +use serde::{Serialize, Deserialize}; +use serde::de::DeserializeOwned; +use crate::entity::character::CharacterEntityId; + +#[derive(Debug, Copy, Clone, Serialize, Deserialize, Hash, PartialEq, Eq)] +pub struct ServerId(pub usize); +#[derive(Debug, Serialize, Deserialize)] +pub struct AuthToken(pub String); + +#[derive(Debug, Serialize, Deserialize)] +pub struct Ship { + name: String, + ip: String, + port: u16, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum LoginMessage { + SendMail { + character_id: CharacterEntityId, + title: String, + message: String, + }, + ShipList { + ships: Vec, + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum ShipMessage { + Authenticate(AuthToken), + NewShip(Ship), + SendMail { + character_id: CharacterEntityId, + title: String, + message: String, + }, + RequestShipList, + +} + + + + + + + +#[async_trait::async_trait] +pub trait InterserverActor { + type SendMessage: Serialize; + type RecvMessage: DeserializeOwned; + type Error; + + async fn on_connect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)>; + async fn action(&mut self, id: ServerId, msg: Self::RecvMessage) -> Result, Self::Error>; + async fn on_disconnect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)>; +} diff --git a/src/common/mainloop.rs b/src/common/mainloop/client.rs similarity index 86% rename from src/common/mainloop.rs rename to src/common/mainloop/client.rs index 09cb6a7..7f037cd 100644 --- a/src/common/mainloop.rs +++ b/src/common/mainloop/client.rs @@ -1,6 +1,5 @@ -#![allow(dead_code)] use std::pin::Pin; -use futures::future::Future; +use futures::future::{Future, join_all}; use log::{trace, info, warn}; use async_std::sync::{Arc, Mutex}; use async_std::io::prelude::{ReadExt, WriteExt}; @@ -12,13 +11,7 @@ use libpso::crypto::{PSOCipher, NullCipher, CipherError}; use libpso::PacketParseError; use crate::common::serverstate::ClientId; use crate::common::serverstate::{RecvServerPacket, SendServerPacket, ServerState, OnConnect}; -use crate::common::interserver::InterserverActor; -use crate::patch::patch::PatchServerState; -use crate::login::login::LoginServerState; -use crate::login::character::CharacterServerState; -use crate::ship::ship::ShipServerState; -use crate::entity::gateway::entitygateway::EntityGateway; #[derive(Debug)] pub enum NetworkError { @@ -128,6 +121,7 @@ async fn send_pkt(socket: Arc Result<(), NetworkError> { let buf = pkt.as_bytes(); + println!("sndbuf: {:?}", buf); let cbuf = cipher.lock().await.encrypt(&buf)?; let mut ssock = &*socket; ssock.write_all(&cbuf).await?; @@ -210,7 +204,8 @@ async fn client_recv_loop(client_id: ClientId, socket: Arc, cipher: Arc>>, server_sender: async_std::sync::Sender, R>>, - client_sender: async_std::sync::Sender>) where + client_sender: async_std::sync::Sender>) +where S: SendServerPacket + std::fmt::Debug + Send + 'static, R: RecvServerPacket + std::fmt::Debug + Send + 'static, { @@ -244,12 +239,11 @@ async fn client_recv_loop(client_id: ClientId, } async fn client_send_loop(client_id: ClientId, - socket: Arc, - cipher_in: Arc>>, - cipher_out: Arc>>, - client_receiver: async_std::sync::Receiver>, - -) where + socket: Arc, + cipher_in: Arc>>, + cipher_out: Arc>>, + client_receiver: async_std::sync::Receiver>) +where S: SendServerPacket + std::fmt::Debug + Send + 'static, { async_std::task::spawn(async move { @@ -375,25 +369,24 @@ async fn state_client_loop(state: Arc>, } -pub fn client_accept_mainloop(state: Arc>, client_port: u16) -> impl Future +pub fn client_accept_mainloop(state: Arc>, client_port: u16) -> Pin>> where STATE: ServerState + Send + 'static, S: SendServerPacket + std::fmt::Debug + Send + Sync + 'static, R: RecvServerPacket + std::fmt::Debug + Send + Sync + 'static, E: std::fmt::Debug + Send, { - async_std::task::spawn(async move { + Box::pin(async_std::task::spawn(async move { let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), client_port))).await.unwrap(); - let mut id = 1; + let mut id = 0; let (server_state_sender, server_state_receiver) = async_std::sync::channel(1024); - state_client_loop(state, server_state_receiver).await; loop { let (sock, addr) = listener.accept().await.unwrap(); - let client_id = crate::common::serverstate::ClientId(id); id += 1; + let client_id = crate::common::serverstate::ClientId(id); info!("new client {:?} {:?} {:?}", client_id, sock, addr); @@ -405,37 +398,6 @@ where client_recv_loop(client_id, socket.clone(), cipher_in.clone(), server_state_sender.clone(), client_sender).await; client_send_loop(client_id, socket.clone(), cipher_in.clone(), cipher_out.clone(), client_receiver).await; } - }) + })) } - - -pub fn patch_mainloop(patch_state: PatchServerState, patch_port: u16) -> Pin>> { - let patch_state = Arc::new(Mutex::new(patch_state)); - let client_mainloop = client_accept_mainloop(patch_state, patch_port); - Box::pin(client_mainloop) -} - -pub fn login_mainloop(login_state: LoginServerState, login_port: u16) -> Pin>> { - let login_state = Arc::new(Mutex::new(login_state)); - let client_mainloop = client_accept_mainloop(login_state.clone(), login_port); - //let ship_communication_mainloop = interserver_listen_mainloop(login_state.clone(), ship_listen_port); - Box::pin(client_mainloop) -} - -pub fn character_mainloop(character_state: CharacterServerState, character_port: u16) -> Pin>> { - let character_state = Arc::new(Mutex::new(character_state)); - let client_mainloop = client_accept_mainloop(character_state, character_port); - Box::pin(client_mainloop) -} - - -pub fn ship_mainloop(ship_state: ShipServerState, ship_port: u16) -> Pin>> { - let ship_state = Arc::new(Mutex::new(ship_state)); - let client_mainloop = client_accept_mainloop(ship_state, ship_port); - //let login_mainloop = ship_to_login_mainloop(ship_state, login_port); - //let admin_mainloop = ship_admin_mainloop(ship_state, admin_port); - - //futures::future::join_all(vec![client_mainloop, login_mainloop, admin_mainloop]) - Box::pin(client_mainloop) -} diff --git a/src/common/mainloop/interserver.rs b/src/common/mainloop/interserver.rs new file mode 100644 index 0000000..1f112da --- /dev/null +++ b/src/common/mainloop/interserver.rs @@ -0,0 +1,238 @@ +use std::time::Duration; +use std::pin::Pin; +use futures::future::{Future, join_all, FutureExt}; +use log::{trace, info, warn}; +use async_std::sync::{Arc, Mutex}; +use async_std::io::prelude::{ReadExt, WriteExt}; +use std::collections::HashMap; +use serde::{Serialize, Deserialize}; +use serde::de::DeserializeOwned; + +use crate::common::serverstate::{RecvServerPacket, SendServerPacket, ServerState, OnConnect}; +use crate::common::interserver::{ServerId, InterserverActor, LoginMessage, ShipMessage}; +use crate::common::mainloop::client::client_accept_mainloop; +pub use crate::common::mainloop::client::NetworkError; + +use crate::patch::patch::PatchServerState; +use crate::login::login::LoginServerState; +use crate::login::character::CharacterServerState; +use crate::ship::ship::ShipServerState; +use crate::entity::gateway::entitygateway::EntityGateway; + +#[derive(Debug)] +enum MessageReceiverError { + InvalidSize, + InvalidPayload, + NetworkError(std::io::Error), + Disconnected, +} + +struct MessageReceiver { + socket: async_std::net::TcpStream, +} + +impl MessageReceiver { + fn new(socket: async_std::net::TcpStream) -> MessageReceiver { + MessageReceiver { + socket: socket, + } + } + + async fn recv(&mut self) -> Result { + let mut size_buf = [0u8; 4]; + self.socket.read_exact(&mut size_buf).await.map_err(|_| MessageReceiverError::Disconnected)?; + let size = u32::from_le_bytes(size_buf) as usize; + info!("expected len: {:?}", size); + + let mut payload = vec![0u8; size]; + self.socket.read_exact(&mut payload).await.map_err(|err| MessageReceiverError::Disconnected)?; + let payload = String::from_utf8(payload).map_err(|_| MessageReceiverError::InvalidPayload)?; + + info!("payload: {:?}", payload); + let msg = serde_json::from_str(&payload).map_err(|_| MessageReceiverError::InvalidPayload)?; + info!("msg: {:?}", msg); + Ok(msg) + } +} + + +#[derive(Debug)] +enum InterserverInputAction { + NewConnection(ServerId, async_std::sync::Sender), + Message(ServerId, R), + Disconnect(ServerId), +} + +/*struct LoginOutputAction { + Message +}*/ + +async fn interserver_state_loop(state: Arc>, action_receiver: async_std::sync::Receiver>) +where + A: InterserverActor + Send + 'static, + S: Serialize + Send + 'static, + R: DeserializeOwned + Send + 'static, +{ + async_std::task::spawn(async move { + let mut ships = HashMap::new(); + + loop { + info!("interserver loop"); + let action = action_receiver.recv().await.unwrap(); + let mut state = state.lock().await; + + match action { + InterserverInputAction::NewConnection(server_id, ship_action_sender) => { + ships.insert(server_id, ship_action_sender); + for (server, action) in state.on_connect(server_id).await { + if let Some(sender) = ships.get_mut(&server) { + sender.send(action).await; + } + } + }, + InterserverInputAction::Message(server_id, message) => { + let actions = state.action(server_id, message).await; + match actions { + Ok(actions) => { + for (server, action) in actions{ + if let Some(sender) = ships.get_mut(&server) { + sender.send(action).await; + } + } + }, + Err(err) => { + warn!("[server {:?} state handler error] {:?}", server_id, err); + } + } + }, + InterserverInputAction::Disconnect(server_id) => { + let actions = state.on_disconnect(server_id).await; + for (server, action) in actions { + if let Some(sender) = ships.get_mut(&server) { + sender.send(action).await; + } + } + break; + } + } + } + }); +} + +async fn login_recv_loop(server_id: ServerId, + socket: async_std::net::TcpStream, + state_loop_sender: async_std::sync::Sender>, + output_loop_sender: async_std::sync::Sender) +where + S: Serialize + std::fmt::Debug + Send + 'static, + R: DeserializeOwned + std::fmt::Debug + Send + 'static, +{ + async_std::task::spawn(async move { + state_loop_sender.send(InterserverInputAction::NewConnection(server_id, output_loop_sender)).await; + let mut msg_receiver = MessageReceiver::new(socket); + + loop { + info!("login recv loop"); + match msg_receiver.recv().await { + Ok(msg) => { + info!("[login recv loop msg] {:?}", msg); + state_loop_sender.send(InterserverInputAction::Message(server_id, msg)).await + }, + Err(err) => { + if let MessageReceiverError::Disconnected = err { + info!("[login recv loop disconnect] {:?}", server_id); + state_loop_sender.send(InterserverInputAction::Disconnect(server_id)).await; + break; + } + info!("[login recv loop err] {:?}", err); + } + } + } + }); +} + +async fn interserver_send_loop(server_id: ServerId, + mut socket: async_std::net::TcpStream, + output_loop_receiver: async_std::sync::Receiver) +where + S: Serialize + std::fmt::Debug + Send + 'static, +{ + async_std::task::spawn(async move { + loop { + info!("login send loop"); + let msg = output_loop_receiver.recv().await.unwrap(); + + let payload = serde_json::to_string(&msg); + if let Ok(payload) = payload { + let len_bytes = u32::to_le_bytes(payload.len() as u32); + + match socket.write_all(&len_bytes).await { + Ok(_) => {}, + Err(err) => warn!("send failed: {:?}", err), + } + match socket.write_all(&payload.as_bytes()).await { + Ok(_) => {}, + Err(err) => warn!("send failed: {:?}", err), + } + } + } + }); +} + + + +pub fn login_listen_mainloop(state: Arc>>, port: u16) -> Pin>> { + Box::pin(async_std::task::spawn(async move { + let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap(); + let mut id = 0; + + let (server_state_sender, server_state_receiver) = async_std::sync::channel(1024); + interserver_state_loop(state, server_state_receiver).await; + + loop { + let (socket, addr) = listener.accept().await.unwrap(); + info!("new ship server: {:?} {:?}", socket, addr); + + id += 1; + let server_id = crate::common::interserver::ServerId(id); + + let (client_sender, client_receiver) = async_std::sync::channel(64); + + login_recv_loop(server_id, socket.clone(), server_state_sender.clone(), client_sender).await; + interserver_send_loop(server_id, socket.clone(), client_receiver).await; + } + })) +} + +pub fn ship_connect_mainloop(state: Arc>>, ip: std::net::Ipv4Addr, port: u16) -> Pin>> { + Box::pin(async_std::task::spawn(async move { + let mut id = 0; + + let (server_state_sender, server_state_receiver) = async_std::sync::channel(1024); + interserver_state_loop(state, server_state_receiver).await; + + loop { + //let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap(); + // TOOD: err check and loop with timeout + let socket = async_std::net::TcpStream::connect((ip, port)).await.unwrap(); + //let (socket, addr) = listener.accept().await.unwrap(); + info!("ship connected to login: {:?}", socket); + + id += 1; + let server_id = crate::common::interserver::ServerId(id); + + let (client_sender, client_receiver) = async_std::sync::channel(64); + + login_recv_loop(server_id, socket.clone(), server_state_sender.clone(), client_sender).await; + interserver_send_loop(server_id, socket.clone(), client_receiver).await; + loop { + if let Err(_) = socket.peer_addr() { + info!("ship connected to login: {:?}", socket); + break; + } + async_std::task::sleep(Duration::from_secs(10)).await; + } + } + })) +} + diff --git a/src/common/mainloop/mod.rs b/src/common/mainloop/mod.rs new file mode 100644 index 0000000..31c41e7 --- /dev/null +++ b/src/common/mainloop/mod.rs @@ -0,0 +1,53 @@ +mod client; +mod interserver; + +use std::time::Duration; +use std::pin::Pin; +use futures::future::{Future, join_all, FutureExt}; +use log::{trace, info, warn}; +use async_std::sync::{Arc, Mutex}; +use async_std::io::prelude::{ReadExt, WriteExt}; +use std::collections::HashMap; +use serde::{Serialize, Deserialize}; +use serde::de::DeserializeOwned; + +use crate::common::serverstate::{RecvServerPacket, SendServerPacket, ServerState, OnConnect}; +use crate::common::interserver::{ServerId, InterserverActor, LoginMessage, ShipMessage}; +use crate::common::mainloop::client::client_accept_mainloop; +use crate::common::mainloop::interserver::{ship_connect_mainloop, login_listen_mainloop}; +pub use crate::common::mainloop::client::NetworkError; + +use crate::patch::patch::PatchServerState; +use crate::login::login::LoginServerState; +use crate::login::character::CharacterServerState; +use crate::ship::ship::ShipServerState; +use crate::entity::gateway::entitygateway::EntityGateway; + + + +pub fn patch_mainloop(patch_state: PatchServerState, patch_port: u16) -> Pin>> { + let patch_state = Arc::new(Mutex::new(patch_state)); + let client_mainloop = client_accept_mainloop(patch_state, patch_port); + Box::pin(client_mainloop) +} + +pub fn login_mainloop(login_state: LoginServerState, login_port: u16, comm_port: u16) -> Pin>> { + let login_state = Arc::new(Mutex::new(login_state)); + let client_mainloop = client_accept_mainloop(login_state.clone(), login_port); + let ship_communication_mainloop = login_listen_mainloop(login_state.clone(), comm_port); + Box::pin(join_all(vec![client_mainloop, ship_communication_mainloop]).map(|_| ())) +} + +pub fn character_mainloop(character_state: CharacterServerState, character_port: u16) -> Pin>> { + let character_state = Arc::new(Mutex::new(character_state)); + let client_mainloop = client_accept_mainloop(character_state, character_port); + Box::pin(client_mainloop) +} + + +pub fn ship_mainloop(ship_state: ShipServerState, ship_port: u16, comm_ip: std::net::Ipv4Addr, comm_port: u16) -> Pin>> { + let ship_state = Arc::new(Mutex::new(ship_state)); + let client_mainloop = client_accept_mainloop(ship_state.clone(), ship_port); + let login_communication_mainloop = ship_connect_mainloop(ship_state.clone(), comm_ip, comm_port); + Box::pin(join_all(vec![client_mainloop, login_communication_mainloop]).map(|_| ())) +} diff --git a/src/common/mod.rs b/src/common/mod.rs index 954e8d0..d02076e 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -2,6 +2,7 @@ pub mod cipherkeys; pub mod serverstate; pub mod mainloop; pub mod leveltable; +pub mod interserver; // https://www.reddit.com/r/rust/comments/33xhhu/how_to_create_an_array_of_structs_that_havent/ #[macro_export] diff --git a/src/common/serverstate.rs b/src/common/serverstate.rs index 8417f7a..f1573ae 100644 --- a/src/common/serverstate.rs +++ b/src/common/serverstate.rs @@ -17,6 +17,7 @@ pub trait SendServerPacket: Sized + Sync { fn as_bytes(&self) -> Vec; } +// TODO: rename this trait, this isn't the state but the actionability of the state re: the client #[async_trait::async_trait] pub trait ServerState { type SendPacket: SendServerPacket; diff --git a/src/entity/character.rs b/src/entity/character.rs index f146998..fc87569 100644 --- a/src/entity/character.rs +++ b/src/entity/character.rs @@ -1,5 +1,6 @@ use std::convert::{From, Into}; use std::collections::HashMap; +use serde::{Serialize, Deserialize}; use libpso::packet::ship::{UpdateConfig, WriteInfoboard}; use libpso::character::character::{DEFAULT_PALETTE_CONFIG, DEFAULT_TECH_MENU}; @@ -235,7 +236,7 @@ pub struct CharacterMaterials { pub tp: u32, } -#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] pub struct CharacterEntityId(pub u32); #[derive(Clone)] diff --git a/src/login/login.rs b/src/login/login.rs index bbced2a..3c2e21a 100644 --- a/src/login/login.rs +++ b/src/login/login.rs @@ -12,11 +12,13 @@ use libpso::util::array_to_utf8; use crate::common::cipherkeys::{ELSEWHERE_PRIVATE_KEY, ELSEWHERE_PARRAY}; use crate::common::serverstate::{SendServerPacket, RecvServerPacket, ServerState, OnConnect, ClientId}; +use crate::common::interserver::{ServerId, InterserverActor, LoginMessage, ShipMessage}; use crate::entity::gateway::EntityGateway; use crate::entity::account::{UserAccountEntity}; pub const LOGIN_PORT: u16 = 12000; +pub const COMMUNICATION_PORT: u16 = 12123; #[derive(Debug)] pub enum LoginError { @@ -138,6 +140,27 @@ impl ServerState for LoginServerState { } } + +#[async_trait::async_trait] +impl InterserverActor for LoginServerState { + type SendMessage = LoginMessage; + type RecvMessage = ShipMessage; + type Error = (); + + async fn on_connect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)> { + Vec::new() + } + + async fn action(&mut self, id: ServerId, msg: Self::RecvMessage) -> Result, Self::Error> { + Ok(Vec::new()) + } + + async fn on_disconnect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)> { + Vec::new() + } +} + + #[cfg(test)] mod test { use std::time::SystemTime; diff --git a/src/ship/ship.rs b/src/ship/ship.rs index 69c0870..91341ca 100644 --- a/src/ship/ship.rs +++ b/src/ship/ship.rs @@ -15,6 +15,7 @@ use libpso::packet::ship::{BLOCK_MENU_ID, ROOM_MENU_ID}; use crate::common::cipherkeys::{ELSEWHERE_PRIVATE_KEY, ELSEWHERE_PARRAY}; use crate::common::serverstate::{SendServerPacket, RecvServerPacket, ServerState, OnConnect, ClientId}; use crate::common::leveltable::CharacterLevelTable; +use crate::common::interserver::{ServerId, InterserverActor, LoginMessage, ShipMessage}; use crate::entity::gateway::EntityGateway; use crate::entity::account::{UserAccountEntity, UserSettingsEntity}; @@ -490,3 +491,23 @@ impl ServerState for ShipServerState { }).collect() } } + + +#[async_trait::async_trait] +impl InterserverActor for ShipServerState { + type SendMessage = ShipMessage; + type RecvMessage = LoginMessage; + type Error = (); + + async fn on_connect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)> { + Vec::new() + } + + async fn action(&mut self, id: ServerId, msg: Self::RecvMessage) -> Result, Self::Error> { + Ok(Vec::new()) + } + + async fn on_disconnect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)> { + Vec::new() + } +}