Browse Source

add InterserverActor trait for cross-server communication

pbs
jake 4 years ago
parent
commit
2926938201
  1. 4
      src/bin/main.rs
  2. 57
      src/common/interserver.rs
  3. 66
      src/common/mainloop/client.rs
  4. 238
      src/common/mainloop/interserver.rs
  5. 53
      src/common/mainloop/mod.rs
  6. 1
      src/common/mod.rs
  7. 1
      src/common/serverstate.rs
  8. 3
      src/entity/character.rs
  9. 23
      src/login/login.rs
  10. 21
      src/ship/ship.rs

4
src/bin/main.rs

@ -196,7 +196,7 @@ fn main() {
let thread_entity_gateway = entity_gateway.clone();
info!("[auth] starting server");
let login_state = LoginServerState::new(thread_entity_gateway);
let login_loop = login_mainloop(login_state, elseware::login::login::LOGIN_PORT);
let login_loop = login_mainloop(login_state, elseware::login::login::LOGIN_PORT, elseware::login::login::COMMUNICATION_PORT);
let thread_entity_gateway = entity_gateway.clone();
info!("[character] starting server");
@ -206,7 +206,7 @@ fn main() {
let thread_entity_gateway = entity_gateway.clone();
info!("[ship] starting server");
let ship_state = ShipServerState::new(thread_entity_gateway);
let ship_loop = ship_mainloop(ship_state, elseware::ship::ship::SHIP_PORT);
let ship_loop = ship_mainloop(ship_state, elseware::ship::ship::SHIP_PORT, std::net::Ipv4Addr::new(127, 0, 0, 1), elseware::login::login::COMMUNICATION_PORT);
futures::future::join_all(vec![patch_loop, login_loop, character_loop, ship_loop]).await;
});

57
src/common/interserver.rs

@ -0,0 +1,57 @@
use serde::{Serialize, Deserialize};
use serde::de::DeserializeOwned;
use crate::entity::character::CharacterEntityId;
#[derive(Debug, Copy, Clone, Serialize, Deserialize, Hash, PartialEq, Eq)]
pub struct ServerId(pub usize);
#[derive(Debug, Serialize, Deserialize)]
pub struct AuthToken(pub String);
#[derive(Debug, Serialize, Deserialize)]
pub struct Ship {
name: String,
ip: String,
port: u16,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum LoginMessage {
SendMail {
character_id: CharacterEntityId,
title: String,
message: String,
},
ShipList {
ships: Vec<Ship>,
}
}
#[derive(Debug, Serialize, Deserialize)]
pub enum ShipMessage {
Authenticate(AuthToken),
NewShip(Ship),
SendMail {
character_id: CharacterEntityId,
title: String,
message: String,
},
RequestShipList,
}
#[async_trait::async_trait]
pub trait InterserverActor {
type SendMessage: Serialize;
type RecvMessage: DeserializeOwned;
type Error;
async fn on_connect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)>;
async fn action(&mut self, id: ServerId, msg: Self::RecvMessage) -> Result<Vec<(ServerId, Self::SendMessage)>, Self::Error>;
async fn on_disconnect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)>;
}

66
src/common/mainloop.rs → src/common/mainloop/client.rs

@ -1,6 +1,5 @@
#![allow(dead_code)]
use std::pin::Pin;
use futures::future::Future;
use futures::future::{Future, join_all};
use log::{trace, info, warn};
use async_std::sync::{Arc, Mutex};
use async_std::io::prelude::{ReadExt, WriteExt};
@ -12,13 +11,7 @@ use libpso::crypto::{PSOCipher, NullCipher, CipherError};
use libpso::PacketParseError;
use crate::common::serverstate::ClientId;
use crate::common::serverstate::{RecvServerPacket, SendServerPacket, ServerState, OnConnect};
use crate::common::interserver::InterserverActor;
use crate::patch::patch::PatchServerState;
use crate::login::login::LoginServerState;
use crate::login::character::CharacterServerState;
use crate::ship::ship::ShipServerState;
use crate::entity::gateway::entitygateway::EntityGateway;
#[derive(Debug)]
pub enum NetworkError {
@ -128,6 +121,7 @@ async fn send_pkt<S: SendServerPacket + Send + std::fmt::Debug>(socket: Arc<asyn
-> Result<(), NetworkError>
{
let buf = pkt.as_bytes();
println!("sndbuf: {:?}", buf);
let cbuf = cipher.lock().await.encrypt(&buf)?;
let mut ssock = &*socket;
ssock.write_all(&cbuf).await?;
@ -210,7 +204,8 @@ async fn client_recv_loop<S, R>(client_id: ClientId,
socket: Arc<async_std::net::TcpStream>,
cipher: Arc<Mutex<Box<dyn PSOCipher + Send>>>,
server_sender: async_std::sync::Sender<ClientAction<ServerStateAction<S>, R>>,
client_sender: async_std::sync::Sender<ServerStateAction<S>>) where
client_sender: async_std::sync::Sender<ServerStateAction<S>>)
where
S: SendServerPacket + std::fmt::Debug + Send + 'static,
R: RecvServerPacket + std::fmt::Debug + Send + 'static,
{
@ -244,12 +239,11 @@ async fn client_recv_loop<S, R>(client_id: ClientId,
}
async fn client_send_loop<S>(client_id: ClientId,
socket: Arc<async_std::net::TcpStream>,
cipher_in: Arc<Mutex<Box<dyn PSOCipher + Send>>>,
cipher_out: Arc<Mutex<Box<dyn PSOCipher + Send>>>,
client_receiver: async_std::sync::Receiver<ServerStateAction<S>>,
) where
socket: Arc<async_std::net::TcpStream>,
cipher_in: Arc<Mutex<Box<dyn PSOCipher + Send>>>,
cipher_out: Arc<Mutex<Box<dyn PSOCipher + Send>>>,
client_receiver: async_std::sync::Receiver<ServerStateAction<S>>)
where
S: SendServerPacket + std::fmt::Debug + Send + 'static,
{
async_std::task::spawn(async move {
@ -375,25 +369,24 @@ async fn state_client_loop<STATE, S, R, E>(state: Arc<Mutex<STATE>>,
}
pub fn client_accept_mainloop<STATE, S, R, E>(state: Arc<Mutex<STATE>>, client_port: u16) -> impl Future<Output = ()>
pub fn client_accept_mainloop<STATE, S, R, E>(state: Arc<Mutex<STATE>>, client_port: u16) -> Pin<Box<dyn Future<Output = ()>>>
where
STATE: ServerState<SendPacket=S, RecvPacket=R, PacketError=E> + Send + 'static,
S: SendServerPacket + std::fmt::Debug + Send + Sync + 'static,
R: RecvServerPacket + std::fmt::Debug + Send + Sync + 'static,
E: std::fmt::Debug + Send,
{
async_std::task::spawn(async move {
Box::pin(async_std::task::spawn(async move {
let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), client_port))).await.unwrap();
let mut id = 1;
let mut id = 0;
let (server_state_sender, server_state_receiver) = async_std::sync::channel(1024);
state_client_loop(state, server_state_receiver).await;
loop {
let (sock, addr) = listener.accept().await.unwrap();
let client_id = crate::common::serverstate::ClientId(id);
id += 1;
let client_id = crate::common::serverstate::ClientId(id);
info!("new client {:?} {:?} {:?}", client_id, sock, addr);
@ -405,37 +398,6 @@ where
client_recv_loop(client_id, socket.clone(), cipher_in.clone(), server_state_sender.clone(), client_sender).await;
client_send_loop(client_id, socket.clone(), cipher_in.clone(), cipher_out.clone(), client_receiver).await;
}
})
}))
}
pub fn patch_mainloop(patch_state: PatchServerState, patch_port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
let patch_state = Arc::new(Mutex::new(patch_state));
let client_mainloop = client_accept_mainloop(patch_state, patch_port);
Box::pin(client_mainloop)
}
pub fn login_mainloop<EG: EntityGateway + 'static>(login_state: LoginServerState<EG>, login_port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
let login_state = Arc::new(Mutex::new(login_state));
let client_mainloop = client_accept_mainloop(login_state.clone(), login_port);
//let ship_communication_mainloop = interserver_listen_mainloop(login_state.clone(), ship_listen_port);
Box::pin(client_mainloop)
}
pub fn character_mainloop<EG: EntityGateway + 'static>(character_state: CharacterServerState<EG>, character_port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
let character_state = Arc::new(Mutex::new(character_state));
let client_mainloop = client_accept_mainloop(character_state, character_port);
Box::pin(client_mainloop)
}
pub fn ship_mainloop<EG: EntityGateway + 'static>(ship_state: ShipServerState<EG>, ship_port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
let ship_state = Arc::new(Mutex::new(ship_state));
let client_mainloop = client_accept_mainloop(ship_state, ship_port);
//let login_mainloop = ship_to_login_mainloop(ship_state, login_port);
//let admin_mainloop = ship_admin_mainloop(ship_state, admin_port);
//futures::future::join_all(vec![client_mainloop, login_mainloop, admin_mainloop])
Box::pin(client_mainloop)
}

238
src/common/mainloop/interserver.rs

@ -0,0 +1,238 @@
use std::time::Duration;
use std::pin::Pin;
use futures::future::{Future, join_all, FutureExt};
use log::{trace, info, warn};
use async_std::sync::{Arc, Mutex};
use async_std::io::prelude::{ReadExt, WriteExt};
use std::collections::HashMap;
use serde::{Serialize, Deserialize};
use serde::de::DeserializeOwned;
use crate::common::serverstate::{RecvServerPacket, SendServerPacket, ServerState, OnConnect};
use crate::common::interserver::{ServerId, InterserverActor, LoginMessage, ShipMessage};
use crate::common::mainloop::client::client_accept_mainloop;
pub use crate::common::mainloop::client::NetworkError;
use crate::patch::patch::PatchServerState;
use crate::login::login::LoginServerState;
use crate::login::character::CharacterServerState;
use crate::ship::ship::ShipServerState;
use crate::entity::gateway::entitygateway::EntityGateway;
#[derive(Debug)]
enum MessageReceiverError {
InvalidSize,
InvalidPayload,
NetworkError(std::io::Error),
Disconnected,
}
struct MessageReceiver {
socket: async_std::net::TcpStream,
}
impl MessageReceiver {
fn new(socket: async_std::net::TcpStream) -> MessageReceiver {
MessageReceiver {
socket: socket,
}
}
async fn recv<R: DeserializeOwned + std::fmt::Debug + Send>(&mut self) -> Result<R, MessageReceiverError> {
let mut size_buf = [0u8; 4];
self.socket.read_exact(&mut size_buf).await.map_err(|_| MessageReceiverError::Disconnected)?;
let size = u32::from_le_bytes(size_buf) as usize;
info!("expected len: {:?}", size);
let mut payload = vec![0u8; size];
self.socket.read_exact(&mut payload).await.map_err(|err| MessageReceiverError::Disconnected)?;
let payload = String::from_utf8(payload).map_err(|_| MessageReceiverError::InvalidPayload)?;
info!("payload: {:?}", payload);
let msg = serde_json::from_str(&payload).map_err(|_| MessageReceiverError::InvalidPayload)?;
info!("msg: {:?}", msg);
Ok(msg)
}
}
#[derive(Debug)]
enum InterserverInputAction<S, R> {
NewConnection(ServerId, async_std::sync::Sender<S>),
Message(ServerId, R),
Disconnect(ServerId),
}
/*struct LoginOutputAction {
Message
}*/
async fn interserver_state_loop<A, S, R>(state: Arc<Mutex<A>>, action_receiver: async_std::sync::Receiver<InterserverInputAction<S, R>>)
where
A: InterserverActor<SendMessage=S, RecvMessage=R, Error=()> + Send + 'static,
S: Serialize + Send + 'static,
R: DeserializeOwned + Send + 'static,
{
async_std::task::spawn(async move {
let mut ships = HashMap::new();
loop {
info!("interserver loop");
let action = action_receiver.recv().await.unwrap();
let mut state = state.lock().await;
match action {
InterserverInputAction::NewConnection(server_id, ship_action_sender) => {
ships.insert(server_id, ship_action_sender);
for (server, action) in state.on_connect(server_id).await {
if let Some(sender) = ships.get_mut(&server) {
sender.send(action).await;
}
}
},
InterserverInputAction::Message(server_id, message) => {
let actions = state.action(server_id, message).await;
match actions {
Ok(actions) => {
for (server, action) in actions{
if let Some(sender) = ships.get_mut(&server) {
sender.send(action).await;
}
}
},
Err(err) => {
warn!("[server {:?} state handler error] {:?}", server_id, err);
}
}
},
InterserverInputAction::Disconnect(server_id) => {
let actions = state.on_disconnect(server_id).await;
for (server, action) in actions {
if let Some(sender) = ships.get_mut(&server) {
sender.send(action).await;
}
}
break;
}
}
}
});
}
async fn login_recv_loop<S, R>(server_id: ServerId,
socket: async_std::net::TcpStream,
state_loop_sender: async_std::sync::Sender<InterserverInputAction<S, R>>,
output_loop_sender: async_std::sync::Sender<S>)
where
S: Serialize + std::fmt::Debug + Send + 'static,
R: DeserializeOwned + std::fmt::Debug + Send + 'static,
{
async_std::task::spawn(async move {
state_loop_sender.send(InterserverInputAction::NewConnection(server_id, output_loop_sender)).await;
let mut msg_receiver = MessageReceiver::new(socket);
loop {
info!("login recv loop");
match msg_receiver.recv().await {
Ok(msg) => {
info!("[login recv loop msg] {:?}", msg);
state_loop_sender.send(InterserverInputAction::Message(server_id, msg)).await
},
Err(err) => {
if let MessageReceiverError::Disconnected = err {
info!("[login recv loop disconnect] {:?}", server_id);
state_loop_sender.send(InterserverInputAction::Disconnect(server_id)).await;
break;
}
info!("[login recv loop err] {:?}", err);
}
}
}
});
}
async fn interserver_send_loop<S>(server_id: ServerId,
mut socket: async_std::net::TcpStream,
output_loop_receiver: async_std::sync::Receiver<S>)
where
S: Serialize + std::fmt::Debug + Send + 'static,
{
async_std::task::spawn(async move {
loop {
info!("login send loop");
let msg = output_loop_receiver.recv().await.unwrap();
let payload = serde_json::to_string(&msg);
if let Ok(payload) = payload {
let len_bytes = u32::to_le_bytes(payload.len() as u32);
match socket.write_all(&len_bytes).await {
Ok(_) => {},
Err(err) => warn!("send failed: {:?}", err),
}
match socket.write_all(&payload.as_bytes()).await {
Ok(_) => {},
Err(err) => warn!("send failed: {:?}", err),
}
}
}
});
}
pub fn login_listen_mainloop<EG: EntityGateway + 'static>(state: Arc<Mutex<LoginServerState<EG>>>, port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
Box::pin(async_std::task::spawn(async move {
let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap();
let mut id = 0;
let (server_state_sender, server_state_receiver) = async_std::sync::channel(1024);
interserver_state_loop(state, server_state_receiver).await;
loop {
let (socket, addr) = listener.accept().await.unwrap();
info!("new ship server: {:?} {:?}", socket, addr);
id += 1;
let server_id = crate::common::interserver::ServerId(id);
let (client_sender, client_receiver) = async_std::sync::channel(64);
login_recv_loop(server_id, socket.clone(), server_state_sender.clone(), client_sender).await;
interserver_send_loop(server_id, socket.clone(), client_receiver).await;
}
}))
}
pub fn ship_connect_mainloop<EG: EntityGateway + 'static>(state: Arc<Mutex<ShipServerState<EG>>>, ip: std::net::Ipv4Addr, port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
Box::pin(async_std::task::spawn(async move {
let mut id = 0;
let (server_state_sender, server_state_receiver) = async_std::sync::channel(1024);
interserver_state_loop(state, server_state_receiver).await;
loop {
//let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap();
// TOOD: err check and loop with timeout
let socket = async_std::net::TcpStream::connect((ip, port)).await.unwrap();
//let (socket, addr) = listener.accept().await.unwrap();
info!("ship connected to login: {:?}", socket);
id += 1;
let server_id = crate::common::interserver::ServerId(id);
let (client_sender, client_receiver) = async_std::sync::channel(64);
login_recv_loop(server_id, socket.clone(), server_state_sender.clone(), client_sender).await;
interserver_send_loop(server_id, socket.clone(), client_receiver).await;
loop {
if let Err(_) = socket.peer_addr() {
info!("ship connected to login: {:?}", socket);
break;
}
async_std::task::sleep(Duration::from_secs(10)).await;
}
}
}))
}

53
src/common/mainloop/mod.rs

@ -0,0 +1,53 @@
mod client;
mod interserver;
use std::time::Duration;
use std::pin::Pin;
use futures::future::{Future, join_all, FutureExt};
use log::{trace, info, warn};
use async_std::sync::{Arc, Mutex};
use async_std::io::prelude::{ReadExt, WriteExt};
use std::collections::HashMap;
use serde::{Serialize, Deserialize};
use serde::de::DeserializeOwned;
use crate::common::serverstate::{RecvServerPacket, SendServerPacket, ServerState, OnConnect};
use crate::common::interserver::{ServerId, InterserverActor, LoginMessage, ShipMessage};
use crate::common::mainloop::client::client_accept_mainloop;
use crate::common::mainloop::interserver::{ship_connect_mainloop, login_listen_mainloop};
pub use crate::common::mainloop::client::NetworkError;
use crate::patch::patch::PatchServerState;
use crate::login::login::LoginServerState;
use crate::login::character::CharacterServerState;
use crate::ship::ship::ShipServerState;
use crate::entity::gateway::entitygateway::EntityGateway;
pub fn patch_mainloop(patch_state: PatchServerState, patch_port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
let patch_state = Arc::new(Mutex::new(patch_state));
let client_mainloop = client_accept_mainloop(patch_state, patch_port);
Box::pin(client_mainloop)
}
pub fn login_mainloop<EG: EntityGateway + 'static>(login_state: LoginServerState<EG>, login_port: u16, comm_port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
let login_state = Arc::new(Mutex::new(login_state));
let client_mainloop = client_accept_mainloop(login_state.clone(), login_port);
let ship_communication_mainloop = login_listen_mainloop(login_state.clone(), comm_port);
Box::pin(join_all(vec![client_mainloop, ship_communication_mainloop]).map(|_| ()))
}
pub fn character_mainloop<EG: EntityGateway + 'static>(character_state: CharacterServerState<EG>, character_port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
let character_state = Arc::new(Mutex::new(character_state));
let client_mainloop = client_accept_mainloop(character_state, character_port);
Box::pin(client_mainloop)
}
pub fn ship_mainloop<EG: EntityGateway + 'static>(ship_state: ShipServerState<EG>, ship_port: u16, comm_ip: std::net::Ipv4Addr, comm_port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
let ship_state = Arc::new(Mutex::new(ship_state));
let client_mainloop = client_accept_mainloop(ship_state.clone(), ship_port);
let login_communication_mainloop = ship_connect_mainloop(ship_state.clone(), comm_ip, comm_port);
Box::pin(join_all(vec![client_mainloop, login_communication_mainloop]).map(|_| ()))
}

1
src/common/mod.rs

@ -2,6 +2,7 @@ pub mod cipherkeys;
pub mod serverstate;
pub mod mainloop;
pub mod leveltable;
pub mod interserver;
// https://www.reddit.com/r/rust/comments/33xhhu/how_to_create_an_array_of_structs_that_havent/
#[macro_export]

1
src/common/serverstate.rs

@ -17,6 +17,7 @@ pub trait SendServerPacket: Sized + Sync {
fn as_bytes(&self) -> Vec<u8>;
}
// TODO: rename this trait, this isn't the state but the actionability of the state re: the client
#[async_trait::async_trait]
pub trait ServerState {
type SendPacket: SendServerPacket;

3
src/entity/character.rs

@ -1,5 +1,6 @@
use std::convert::{From, Into};
use std::collections::HashMap;
use serde::{Serialize, Deserialize};
use libpso::packet::ship::{UpdateConfig, WriteInfoboard};
use libpso::character::character::{DEFAULT_PALETTE_CONFIG, DEFAULT_TECH_MENU};
@ -235,7 +236,7 @@ pub struct CharacterMaterials {
pub tp: u32,
}
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct CharacterEntityId(pub u32);
#[derive(Clone)]

23
src/login/login.rs

@ -12,11 +12,13 @@ use libpso::util::array_to_utf8;
use crate::common::cipherkeys::{ELSEWHERE_PRIVATE_KEY, ELSEWHERE_PARRAY};
use crate::common::serverstate::{SendServerPacket, RecvServerPacket, ServerState, OnConnect, ClientId};
use crate::common::interserver::{ServerId, InterserverActor, LoginMessage, ShipMessage};
use crate::entity::gateway::EntityGateway;
use crate::entity::account::{UserAccountEntity};
pub const LOGIN_PORT: u16 = 12000;
pub const COMMUNICATION_PORT: u16 = 12123;
#[derive(Debug)]
pub enum LoginError {
@ -138,6 +140,27 @@ impl<EG: EntityGateway> ServerState for LoginServerState<EG> {
}
}
#[async_trait::async_trait]
impl<EG: EntityGateway> InterserverActor for LoginServerState<EG> {
type SendMessage = LoginMessage;
type RecvMessage = ShipMessage;
type Error = ();
async fn on_connect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)> {
Vec::new()
}
async fn action(&mut self, id: ServerId, msg: Self::RecvMessage) -> Result<Vec<(ServerId, Self::SendMessage)>, Self::Error> {
Ok(Vec::new())
}
async fn on_disconnect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)> {
Vec::new()
}
}
#[cfg(test)]
mod test {
use std::time::SystemTime;

21
src/ship/ship.rs

@ -15,6 +15,7 @@ use libpso::packet::ship::{BLOCK_MENU_ID, ROOM_MENU_ID};
use crate::common::cipherkeys::{ELSEWHERE_PRIVATE_KEY, ELSEWHERE_PARRAY};
use crate::common::serverstate::{SendServerPacket, RecvServerPacket, ServerState, OnConnect, ClientId};
use crate::common::leveltable::CharacterLevelTable;
use crate::common::interserver::{ServerId, InterserverActor, LoginMessage, ShipMessage};
use crate::entity::gateway::EntityGateway;
use crate::entity::account::{UserAccountEntity, UserSettingsEntity};
@ -490,3 +491,23 @@ impl<EG: EntityGateway> ServerState for ShipServerState<EG> {
}).collect()
}
}
#[async_trait::async_trait]
impl<EG: EntityGateway> InterserverActor for ShipServerState<EG> {
type SendMessage = ShipMessage;
type RecvMessage = LoginMessage;
type Error = ();
async fn on_connect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)> {
Vec::new()
}
async fn action(&mut self, id: ServerId, msg: Self::RecvMessage) -> Result<Vec<(ServerId, Self::SendMessage)>, Self::Error> {
Ok(Vec::new())
}
async fn on_disconnect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)> {
Vec::new()
}
}
Loading…
Cancel
Save