Browse Source

refactor mainloops

pbs
jake 4 years ago
parent
commit
c00f851f7e
  1. 24
      src/bin/main.rs
  2. 144
      src/common/mainloop.rs

24
src/bin/main.rs

@ -11,6 +11,7 @@ use elseware::entity::character::NewCharacterEntity;
use elseware::entity::item::{NewItemEntity, ItemDetail, ItemLocation}; use elseware::entity::item::{NewItemEntity, ItemDetail, ItemLocation};
use elseware::entity::item; use elseware::entity::item;
use elseware::common::mainloop::*;
fn setup_logger() { fn setup_logger() {
let colors = fern::colors::ColoredLevelConfig::new() let colors = fern::colors::ColoredLevelConfig::new()
@ -185,39 +186,28 @@ fn main() {
}).await; }).await;
} }
let patch = async_std::task::spawn(async {
info!("[patch] starting server"); info!("[patch] starting server");
let patch_config = load_config(); let patch_config = load_config();
let patch_motd = load_motd(); let patch_motd = load_motd();
let (patch_file_tree, patch_file_lookup) = generate_patch_tree(patch_config.path.as_str()); let (patch_file_tree, patch_file_lookup) = generate_patch_tree(patch_config.path.as_str());
let patch_state = PatchServerState::new(patch_file_tree, patch_file_lookup, patch_motd); let patch_state = PatchServerState::new(patch_file_tree, patch_file_lookup, patch_motd);
elseware::common::mainloop::mainloop_async(patch_state, patch_config.port).await;
});
let patch_loop = patch_mainloop(patch_state, patch_config.port);
let thread_entity_gateway = entity_gateway.clone(); let thread_entity_gateway = entity_gateway.clone();
let auth = async_std::task::spawn(async {
info!("[auth] starting server"); info!("[auth] starting server");
let auth_state = LoginServerState::new(thread_entity_gateway);
elseware::common::mainloop::mainloop_async(auth_state, elseware::login::login::LOGIN_PORT).await;
});
let login_state = LoginServerState::new(thread_entity_gateway);
let login_loop = login_mainloop(login_state, elseware::login::login::LOGIN_PORT);
let thread_entity_gateway = entity_gateway.clone(); let thread_entity_gateway = entity_gateway.clone();
let character = async_std::task::spawn(async {
info!("[character] starting server"); info!("[character] starting server");
let char_state = CharacterServerState::new(thread_entity_gateway); let char_state = CharacterServerState::new(thread_entity_gateway);
elseware::common::mainloop::mainloop_async(char_state, elseware::login::character::CHARACTER_PORT).await;
});
let character_loop = character_mainloop(char_state, elseware::login::character::CHARACTER_PORT);
let thread_entity_gateway = entity_gateway.clone(); let thread_entity_gateway = entity_gateway.clone();
let ship = async_std::task::spawn(async {
info!("[ship] starting server"); info!("[ship] starting server");
let ship_state = ShipServerState::new(thread_entity_gateway); let ship_state = ShipServerState::new(thread_entity_gateway);
elseware::common::mainloop::mainloop_async(ship_state, elseware::ship::ship::SHIP_PORT).await;
});
let ship_loop = ship_mainloop(ship_state, elseware::ship::ship::SHIP_PORT);
futures::join!(patch, auth, character, ship);
futures::future::join_all(vec![patch_loop, login_loop, character_loop, ship_loop]).await;
}); });
} }

144
src/common/mainloop.rs

@ -1,14 +1,24 @@
#![allow(dead_code)] #![allow(dead_code)]
use std::pin::Pin;
use futures::future::Future;
use log::{trace, info, warn}; use log::{trace, info, warn};
use async_std::sync::{Arc, Mutex}; use async_std::sync::{Arc, Mutex};
use async_std::io::prelude::{ReadExt, WriteExt}; use async_std::io::prelude::{ReadExt, WriteExt};
use std::collections::HashMap; use std::collections::HashMap;
use serde::{Serialize, Deserialize};
use serde::de::DeserializeOwned;
use libpso::crypto::{PSOCipher, NullCipher, CipherError}; use libpso::crypto::{PSOCipher, NullCipher, CipherError};
use libpso::PacketParseError; use libpso::PacketParseError;
use crate::common::serverstate::ClientId; use crate::common::serverstate::ClientId;
use crate::common::serverstate::{RecvServerPacket, SendServerPacket, ServerState, OnConnect}; use crate::common::serverstate::{RecvServerPacket, SendServerPacket, ServerState, OnConnect};
use crate::common::interserver::InterserverActor;
use crate::patch::patch::PatchServerState;
use crate::login::login::LoginServerState;
use crate::login::character::CharacterServerState;
use crate::ship::ship::ShipServerState;
use crate::entity::gateway::entitygateway::EntityGateway;
#[derive(Debug)] #[derive(Debug)]
pub enum NetworkError { pub enum NetworkError {
@ -265,7 +275,7 @@ async fn client_send_loop<S>(client_id: ClientId,
} }
pub async fn mainloop_async<STATE, S, R, E>(state: STATE, port: u16) where
pub async fn simple_mainloop<STATE, S, R, E>(state: STATE, port: u16) where
STATE: ServerState<SendPacket=S, RecvPacket=R, PacketError=E> + Send + 'static, STATE: ServerState<SendPacket=S, RecvPacket=R, PacketError=E> + Send + 'static,
S: SendServerPacket + std::fmt::Debug + Send + Sync + 'static, S: SendServerPacket + std::fmt::Debug + Send + Sync + 'static,
R: RecvServerPacket + std::fmt::Debug + Send + Sync + 'static, R: RecvServerPacket + std::fmt::Debug + Send + Sync + 'static,
@ -299,3 +309,135 @@ pub async fn mainloop_async<STATE, S, R, E>(state: STATE, port: u16) where
listener.await listener.await
} }
////////////////////////////////////////////////////////////
async fn state_client_loop<STATE, S, R, E>(state: Arc<Mutex<STATE>>,
server_state_receiver: async_std::sync::Receiver<ClientAction<ServerStateAction<S>, R>>) where
STATE: ServerState<SendPacket=S, RecvPacket=R, PacketError=E> + Send + 'static,
S: SendServerPacket + std::fmt::Debug + Send + 'static,
R: RecvServerPacket + std::fmt::Debug + Send + 'static,
E: std::fmt::Debug + Send,
{
async_std::task::spawn(async move {
let mut clients = HashMap::new();
loop {
let action = server_state_receiver.recv().await.unwrap();
let mut state = state.lock().await;
match action {
ClientAction::NewClient(client_id, sender) => {
clients.insert(client_id, sender.clone());
for action in state.on_connect(client_id) {
match action {
OnConnect::Cipher((inc, outc)) => {
sender.send(ServerStateAction::Cipher(inc, outc)).await;
},
OnConnect::Packet(pkt) => {
sender.send(ServerStateAction::Packet(pkt)).await;
}
}
}
},
ClientAction::Packet(client_id, pkt) => {
let pkts = state.handle(client_id, &pkt).await;
match pkts {
Ok(pkts) => {
for (client_id, pkt) in pkts {
if let Some(client) = clients.get_mut(&client_id) {
client.send(ServerStateAction::Packet(pkt)).await;
}
}
},
Err(err) => {
warn!("[client {:?} state handler error] {:?}", client_id, err);
}
}
},
ClientAction::Disconnect(client_id) => {
let pkts = state.on_disconnect(client_id);
for (client_id, pkt) in pkts {
if let Some(client) = clients.get_mut(&client_id) {
client.send(ServerStateAction::Packet(pkt)).await;
}
}
if let Some(client) = clients.get_mut(&client_id) {
client.send(ServerStateAction::Disconnect).await;
}
}
}
}
});
}
pub fn client_accept_mainloop<STATE, S, R, E>(state: Arc<Mutex<STATE>>, client_port: u16) -> impl Future<Output = ()>
where
STATE: ServerState<SendPacket=S, RecvPacket=R, PacketError=E> + Send + 'static,
S: SendServerPacket + std::fmt::Debug + Send + Sync + 'static,
R: RecvServerPacket + std::fmt::Debug + Send + Sync + 'static,
E: std::fmt::Debug + Send,
{
let listener = async_std::task::spawn(async move {
let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), client_port))).await.unwrap();
let mut id = 1;
let (server_state_sender, server_state_receiver) = async_std::sync::channel(1024);
state_client_loop(state, server_state_receiver).await;
loop {
let (sock, addr) = listener.accept().await.unwrap();
let client_id = crate::common::serverstate::ClientId(id);
id += 1;
info!("new client {:?} {:?} {:?}", client_id, sock, addr);
let (client_sender, client_receiver) = async_std::sync::channel(64);
let socket = Arc::new(sock);
let cipher_in: Arc<Mutex<Box<dyn PSOCipher + Send>>> = Arc::new(Mutex::new(Box::new(NullCipher {})));
let cipher_out: Arc<Mutex<Box<dyn PSOCipher + Send>>> = Arc::new(Mutex::new(Box::new(NullCipher {})));
client_recv_loop(client_id, socket.clone(), cipher_in.clone(), server_state_sender.clone(), client_sender).await;
client_send_loop(client_id, socket.clone(), cipher_in.clone(), cipher_out.clone(), client_receiver).await;
}
});
listener
}
pub fn patch_mainloop(patch_state: PatchServerState, patch_port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
let patch_state = Arc::new(Mutex::new(patch_state));
let client_mainloop = client_accept_mainloop(patch_state, patch_port);
Box::pin(client_mainloop)
}
pub fn login_mainloop<EG: EntityGateway + 'static>(login_state: LoginServerState<EG>, login_port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
let login_state = Arc::new(Mutex::new(login_state));
let client_mainloop = client_accept_mainloop(login_state.clone(), login_port);
//let ship_communication_mainloop = interserver_listen_mainloop(login_state.clone(), ship_listen_port);
Box::pin(client_mainloop)
}
pub fn character_mainloop<EG: EntityGateway + 'static>(character_state: CharacterServerState<EG>, character_port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
let character_state = Arc::new(Mutex::new(character_state));
let client_mainloop = client_accept_mainloop(character_state, character_port);
Box::pin(client_mainloop)
}
pub fn ship_mainloop<EG: EntityGateway + 'static>(ship_state: ShipServerState<EG>, ship_port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
let ship_state = Arc::new(Mutex::new(ship_state));
let client_mainloop = client_accept_mainloop(ship_state, ship_port);
//let login_mainloop = ship_to_login_mainloop(ship_state, login_port);
//let admin_mainloop = ship_admin_mainloop(ship_state, admin_port);
//futures::future::join_all(vec![client_mainloop, login_mainloop, admin_mainloop])
Box::pin(client_mainloop)
}
Loading…
Cancel
Save