diff --git a/src/common/interserver.rs b/src/common/interserver.rs index ac37d65..a645f95 100644 --- a/src/common/interserver.rs +++ b/src/common/interserver.rs @@ -56,6 +56,5 @@ pub trait InterserverActor: Clone { async fn on_connect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)>; async fn on_action(&mut self, id: ServerId, msg: Self::RecvMessage) -> Result, Self::Error>; async fn on_disconnect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)>; - //fn set_sender(&mut self, server_id: ServerId, func: Arc Box>>>); - fn set_sender(&mut self, server_id: ServerId, tx: channel::Sender); + async fn set_sender(&mut self, server_id: ServerId, tx: channel::Sender); } diff --git a/src/common/mainloop/client.rs b/src/common/mainloop/client.rs index 456fb82..6cf7de4 100644 --- a/src/common/mainloop/client.rs +++ b/src/common/mainloop/client.rs @@ -1,10 +1,10 @@ -use std::pin::pin; +use std::collections::HashMap; +use std::fmt::Debug; +use async_std::channel; +use async_std::io::prelude::{ReadExt, WriteExt}; +use async_std::sync::{Arc, RwLock}; use futures::future::Future; use log::{trace, info, warn}; -use async_std::sync::{Arc, Mutex}; -use async_std::io::prelude::{ReadExt, WriteExt}; -use std::collections::HashMap; -use std::pin::Pin; use libpso::crypto::{PSOCipher, NullCipher, CipherError}; use libpso::PacketParseError; @@ -42,7 +42,6 @@ impl From for NetworkError { pub struct PacketReceiver { socket: async_std::net::TcpStream, - //cipher: Arc>>, cipher: C, recv_buffer: Vec, incoming_data: Vec, @@ -115,209 +114,160 @@ impl PacketReceiver { Ok(result) } } -/* - -async fn send_pkt(socket: Arc, - cipher: Arc>>, pkt: S) - -> Result<(), NetworkError> -{ - let buf = pkt.as_bytes(); - trace!("[send buf] {:?}", buf); - let cbuf = cipher.lock().await.encrypt(&buf)?; - let mut ssock = &*socket; - ssock.write_all(&cbuf).await?; - Ok(()) -} - -enum ClientAction { - NewClient(ClientId, async_std::channel::Sender), - Packet(ClientId, R), - Disconnect(ClientId), -} - -enum ServerStateAction { - Cipher(Box, Box), - Packet(S), - Disconnect, -} - -fn client_recv_loop(client_id: ClientId, - socket: Arc, - cipher: Arc>>, - server_sender: async_std::channel::Sender, R>>, - client_sender: async_std::channel::Sender>) +async fn recv_loop(mut state: STATE, + socket: async_std::net::TcpStream, + client_id: ClientId, + cipher: C, + clients: Arc>>>) where - S: SendServerPacket + std::fmt::Debug + Send + 'static, - R: RecvServerPacket + std::fmt::Debug + Send + 'static, + STATE: ServerState + Send, + S: SendServerPacket + Debug + Send, + R: RecvServerPacket + Debug + Send, + C: PSOCipher + Send, + E: std::fmt::Debug + Send, { - async_std::task::spawn(async move { - server_sender.send(ClientAction::NewClient(client_id, client_sender)).await.unwrap(); - /* - let mut pkt_receiver = PacketReceiver::new(*socket, cipher); - - loop { - match pkt_receiver.recv_pkts().await { - Ok(pkts) => { - for pkt in pkts { - info!("[recv from {:?}] {:#?}", client_id, pkt); - server_sender.send(ClientAction::Packet(client_id, pkt)).await.unwrap(); - } - }, - Err(err) => { - match err { - NetworkError::ClientDisconnected => { - trace!("[client disconnected] {:?}", client_id); - server_sender.send(ClientAction::Disconnect(client_id)).await.unwrap(); - break; + let mut pkt_receiver = PacketReceiver::new(socket, cipher); + loop { + match pkt_receiver.recv_pkts::().await { + Ok(pkts) => { + for pkt in pkts { + info!("[recv from {:?}] {:#?}", client_id, pkt); + match state.handle(client_id, pkt).await { + Ok(response) => { + for resp in response { + clients + .read() + .await + .get(&resp.0) + .unwrap() + .send(resp.1) + .await + .unwrap(); + } + }, + Err(err) => { + warn!("[client recv {:?}] error {:?} ", client_id, err); } - _ => { - warn!("[client {:?} recv error] {:?}", client_id, err); + } + } + }, + Err(err) => { + match err { + NetworkError::ClientDisconnected => { + info!("[client recv {:?}] disconnected", client_id); + for pkt in state.on_disconnect(client_id).await.unwrap() { + clients + .read() + .await + .get(&pkt.0) + .unwrap() + .send(pkt.1) + .await + .unwrap(); } + clients + .write() + .await + .remove(&client_id); + break; + } + _ => { + warn!("[client {:?} recv error] {:?}", client_id, err); } } } } - */ - }); + } } -fn client_send_loop(client_id: ClientId, - socket: Arc, - cipher_in: Arc>>, - cipher_out: Arc>>, - client_receiver: async_std::channel::Receiver>) + +async fn send_pkt(socket: &mut async_std::net::TcpStream, + cipher: &mut C, + pkt: &S) + -> Result<(), NetworkError> where - S: SendServerPacket + std::fmt::Debug + Send + 'static, + S: SendServerPacket + std::fmt::Debug, + C: PSOCipher, { - async_std::task::spawn(async move { - loop { - let action = client_receiver.recv().await.unwrap(); - match action { - ServerStateAction::Cipher(inc, outc) => { - *cipher_in.lock().await = inc; - *cipher_out.lock().await = outc; - } - ServerStateAction::Packet(pkt) => { - info!("[send to {:?}] {:#?}", client_id, pkt); - if let Err(err) = send_pkt(socket.clone(), cipher_out.clone(), pkt).await { - warn!("[client {:?} send error ] {:?}", client_id, err); - } - }, - ServerStateAction::Disconnect => { - break; - } - }; - } - }); + let buf = pkt.as_bytes(); + trace!("[send buf] {:?}", buf); + let cbuf = cipher.encrypt(&buf)?; + socket.write_all(&cbuf).await?; + Ok(()) } -fn state_client_loop(state: Arc>, - server_state_receiver: async_std::channel::Receiver, R>>) where - STATE: ServerState + Send + 'static, - S: SendServerPacket + std::fmt::Debug + Send + 'static, - R: RecvServerPacket + std::fmt::Debug + Send + 'static, - E: std::fmt::Debug + Send, +async fn send_loop(mut socket: async_std::net::TcpStream, client_id: ClientId, mut cipher: C, packet_queue: channel::Receiver) +where + S: SendServerPacket + std::fmt::Debug, + C: PSOCipher, { - async_std::task::spawn(async move { - let mut clients = HashMap::new(); - - loop { - let action = server_state_receiver.recv().await.unwrap(); - let mut state = state.lock().await; - - match action { - ClientAction::NewClient(client_id, sender) => { - let actions = state.on_connect(client_id).await; - match actions { - Ok(actions) => { - for action in actions { - match action { - OnConnect::Cipher((inc, outc)) => { - sender.send(ServerStateAction::Cipher(inc, outc)).await.unwrap(); - }, - OnConnect::Packet(pkt) => { - sender.send(ServerStateAction::Packet(pkt)).await.unwrap(); - } - } - } - }, - Err(err) => { - warn!("[client {:?} state on_connect error] {:?}", client_id, err); - } - } - clients.insert(client_id, sender); - }, - ClientAction::Packet(client_id, pkt) => { - let pkts = state.handle(client_id, &pkt).await; - match pkts { - Ok(pkts) => { - for (client_id, pkt) in pkts { - if let Some(client) = clients.get_mut(&client_id) { - client.send(ServerStateAction::Packet(pkt)).await.unwrap(); - } - } - }, - Err(err) => { - warn!("[client {:?} state handler error] {:?}", client_id, err); - } - } - }, - ClientAction::Disconnect(client_id) => { - let pkts = state.on_disconnect(client_id).await; - match pkts { - Ok(pkts) => { - for (client_id, pkt) in pkts { - if let Some(client) = clients.get_mut(&client_id) { - client.send(ServerStateAction::Packet(pkt)).await.unwrap(); - } - } - - if let Some(client) = clients.get_mut(&client_id) { - client.send(ServerStateAction::Disconnect).await.unwrap(); - } - } - Err(err) => { - warn!("[client {:?} state on_disconnect error] {:?}", client_id, err); - } - } + loop { + match packet_queue.recv().await { + Ok(pkt) => { + if let Err(err) = send_pkt(&mut socket, &mut cipher, &pkt).await { + warn!("error sending pkt {:#?} to {:?} {:?}", pkt, client_id, err); } + }, + Err(err) => { + info!("send to {:?} failed: {:?}", client_id, err); + break; } } - }); + } } - -pub fn client_accept_mainloop(state: Arc>, client_port: u16) -> Pin>> +pub async fn run_server(mut state: STATE, port: u16) where - STATE: ServerState + Send + 'static, - S: SendServerPacket + std::fmt::Debug + Send + Sync + 'static, - R: RecvServerPacket + std::fmt::Debug + Send + Sync + 'static, + STATE: ServerState + Send + 'static, + S: SendServerPacket + std::fmt::Debug + Send + 'static, + R: RecvServerPacket + std::fmt::Debug + Send, + C: PSOCipher + Send + 'static, E: std::fmt::Debug + Send, { - Box::pin(async_std::task::spawn(async move { - let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), client_port))).await.unwrap(); - let mut id = 0; + let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap(); + let mut id = 0; - let (server_state_sender, server_state_receiver) = async_std::channel::bounded(1024); - state_client_loop(state, server_state_receiver); + let clients = Arc::new(RwLock::new(HashMap::new())); - loop { - let (sock, addr) = listener.accept().await.unwrap(); - id += 1; - let client_id = crate::common::serverstate::ClientId(id); + loop { + let (mut socket, addr) = listener.accept().await.unwrap(); + id += 1; - info!("new client {:?} {:?} {:?}", client_id, sock, addr); + let client_id = crate::common::serverstate::ClientId(id); + info!("new client {:?} {:?} {:?}", client_id, socket, addr); - let (client_sender, client_receiver) = async_std::channel::bounded(64); - let socket = Arc::new(sock); - let cipher_in: Arc>> = Arc::new(Mutex::new(Box::new(NullCipher {}))); - let cipher_out: Arc>> = Arc::new(Mutex::new(Box::new(NullCipher {}))); + let (client_tx, client_rx) = async_std::channel::unbounded(); - client_recv_loop(client_id, socket.clone(), cipher_in.clone(), server_state_sender.clone(), client_sender); - client_send_loop(client_id, socket.clone(), cipher_in.clone(), cipher_out.clone(), client_receiver); + clients + .write() + .await + .insert(client_id, client_tx.clone()); + + let mut cipher_in: Option = None; + let mut cipher_out: Option = None; + + for action in state.on_connect(client_id).await.unwrap() { + match action { + OnConnect::Cipher(cin, cout) => { + cipher_in = Some(cin); + cipher_out = Some(cout); + }, + OnConnect::Packet(pkt) => { + send_pkt(&mut socket, &mut NullCipher {}, &pkt).await.unwrap(); + } + } } - })) -} -*/ + let rstate = state.clone(); + let rsocket = socket.clone(); + let rclients = clients.clone(); + async_std::task::spawn(async move { + recv_loop(rstate, rsocket, client_id, cipher_in.unwrap(), rclients).await + }); + + async_std::task::spawn(async move { + send_loop(socket, client_id, cipher_out.unwrap(), client_rx).await + }); + } +} diff --git a/src/common/mainloop/interserver.rs b/src/common/mainloop/interserver.rs index aab00bb..dbe940d 100644 --- a/src/common/mainloop/interserver.rs +++ b/src/common/mainloop/interserver.rs @@ -2,7 +2,7 @@ use std::time::Duration; use std::pin::Pin; use futures::future::Future; use log::{info, warn}; -use async_std::sync::{Arc, Mutex}; +use async_std::sync::{Arc, RwLock}; use async_std::io::prelude::{ReadExt, WriteExt}; use std::collections::HashMap; use serde::Serialize; @@ -17,6 +17,8 @@ use crate::login::character::CharacterServerState; use crate::entity::gateway::entitygateway::EntityGateway; use async_std::channel; +use std::fmt::Debug; + #[derive(Debug)] enum MessageReceiverError { @@ -37,7 +39,7 @@ impl MessageReceiver { } } - async fn recv(&mut self) -> Result { + async fn recv(&mut self) -> Result { let mut size_buf = [0u8; 4]; self.socket.read_exact(&mut size_buf).await.map_err(|_| MessageReceiverError::Disconnected)?; let size = u32::from_le_bytes(size_buf) as usize; @@ -50,219 +52,182 @@ impl MessageReceiver { Ok(msg) } } -/* - -#[derive(Debug)] -enum InterserverInputAction { - NewConnection(ServerId, async_std::channel::Sender), - Message(ServerId, R), - Disconnect(ServerId), -} -async fn interserver_state_loop(state: Arc>, action_receiver: async_std::channel::Receiver>) +async fn interserver_recv_loop(mut state: STATE, server_id: ServerId, socket: async_std::net::TcpStream, ships: Arc>>>) where - A: InterserverActor + Send + 'static, - S: Serialize + Send + 'static, - R: DeserializeOwned + Send + 'static, + STATE: InterserverActor + Send, + S: serde::Serialize + Debug + Send, + R: serde::de::DeserializeOwned + Debug + Send, + E: Debug + Send, { - async_std::task::spawn(async move { - let mut ships = HashMap::new(); - - loop { - info!("interserver loop"); - let action = match action_receiver.recv().await { - Ok(action) => action, - Err(err) => { - warn!("error in iterserver state loop {:?}", err); - continue; - } - }; - let mut state = state.lock().await; - - match action { - InterserverInputAction::NewConnection(server_id, ship_action_sender) => { - ships.insert(server_id, ship_action_sender); - for (server, action) in state.on_connect(server_id).await { - if let Some(sender) = ships.get_mut(&server) { - sender.send(action).await.unwrap(); - } - } - }, - InterserverInputAction::Message(server_id, message) => { - let actions = state.action(server_id, message).await; - match actions { - Ok(actions) => { - for (server, action) in actions{ - if let Some(sender) = ships.get_mut(&server) { - sender.send(action).await.unwrap(); - } - } - }, - Err(err) => { - warn!("[server {:?} state handler error] {:?}", server_id, err); + let mut msg_receiver = MessageReceiver::new(socket); + + loop { + match msg_receiver.recv::().await { + Ok(msg) => { + info!("[interserver recv {:?}] {:?}", server_id, msg); + match state.on_action(server_id, msg).await { + Ok(response) => { + for resp in response { + ships + .read() + .await + .get(&resp.0) + .unwrap() + .send(resp.1) + .await + .unwrap(); } + }, + Err(err) => { + warn!("[interserver recv {:?}] error {:?}", server_id, err); } - }, - InterserverInputAction::Disconnect(server_id) => { - let actions = state.on_disconnect(server_id).await; - ships.remove(&server_id); - for (server, action) in actions { - if let Some(sender) = ships.get_mut(&server) { - sender.send(action).await.unwrap(); + } + }, + Err(err) => { + if let MessageReceiverError::Disconnected = err { + info!("[interserver recv {:?}] disconnected", server_id); + for (_, _sender) in ships.read().await.iter() { + for pkt in state.on_disconnect(server_id).await { + ships + .read() + .await + .get(&pkt.0) + .unwrap() + .send(pkt.1) + .await + .unwrap(); } } + ships + .write() + .await + .remove(&server_id); + break; } + info!("[interserver recv {:?}] error {:?}", server_id, err); } } - }); + } } -async fn login_recv_loop(server_id: ServerId, - socket: async_std::net::TcpStream, - state_loop_sender: async_std::channel::Sender>, - output_loop_sender: async_std::channel::Sender) +async fn interserver_send_loop(server_id: ServerId, mut socket: async_std::net::TcpStream, to_send: channel::Receiver) where - S: Serialize + std::fmt::Debug + Send + 'static, - R: DeserializeOwned + std::fmt::Debug + Send + 'static, + S: serde::Serialize + std::fmt::Debug, { - async_std::task::spawn(async move { - state_loop_sender.send(InterserverInputAction::NewConnection(server_id, output_loop_sender)).await.unwrap(); - let mut msg_receiver = MessageReceiver::new(socket); - - loop { - info!("login recv loop"); - match msg_receiver.recv().await { - Ok(msg) => { - info!("[login recv loop msg] {:?}", msg); - state_loop_sender.send(InterserverInputAction::Message(server_id, msg)).await.unwrap(); - }, - Err(err) => { - if let MessageReceiverError::Disconnected = err { - info!("[login recv loop disconnect] {:?}", server_id); - state_loop_sender.send(InterserverInputAction::Disconnect(server_id)).await.unwrap(); - break; - } - info!("[login recv loop err] {:?}", err); - } + loop { + let msg = to_send.recv().await.unwrap(); + let payload = serde_json::to_string(&msg); + + if let Ok(payload) = payload { + let len_bytes = u32::to_le_bytes(payload.len() as u32); + if let Err(err) = socket.write_all(&len_bytes).await { + warn!("[interserver send {:?}] failed: {:?}", server_id, err); + break; + } + if let Err(err) = socket.write_all(payload.as_bytes()).await { + warn!("[interserver send {:?}] failed: {:?}", server_id, err); + break; } } - }); + } } -async fn interserver_send_loop(server_id: ServerId, - mut socket: async_std::net::TcpStream, - output_loop_receiver: async_std::channel::Receiver) +pub async fn run_interserver_listen(mut state: STATE, port: u16) where - S: Serialize + std::fmt::Debug + Send + 'static, + STATE: InterserverActor + Send + 'static, + S: serde::Serialize + Debug + Send + 'static, + R: serde::de::DeserializeOwned + Debug + Send, + E: Debug + Send, { - async_std::task::spawn(async move { - loop { - info!("login send loop"); - match output_loop_receiver.recv().await { - Ok(msg) => { - let payload = serde_json::to_string(&msg); - if let Ok(payload) = payload { - let len_bytes = u32::to_le_bytes(payload.len() as u32); - - if let Err(err) = socket.write_all(&len_bytes).await { - warn!("interserver send failed: {:?}", err); - break; - } - if let Err(err) = socket.write_all(payload.as_bytes()).await { - warn!("intserserver send failed: {:?}", err); - break; - } - } - }, - Err(err) => { - warn!("error in send_loop: {:?}, {:?}", server_id, err); - break; - } + let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap(); + let mut id = 0; + let ships = Arc::new(RwLock::new(HashMap::new())); + + loop { + let (socket, addr) = listener.accept().await.unwrap(); + info!("[interserver listen] new server: {:?} {:?}", socket, addr); + + id += 1; + let server_id = crate::common::interserver::ServerId(id); + let (client_tx, client_rx) = async_std::channel::unbounded(); + state.set_sender(server_id, client_tx.clone()).await; + + ships + .write() + .await + .insert(server_id, client_tx.clone()); + + for msg in state.on_connect(server_id).await { + if let Some(ship_sender) = ships.read().await.get(&msg.0) { + ship_sender.send(msg.1).await.unwrap(); } } - }); -} - - - -pub fn login_listen_mainloop(state: Arc>>, port: u16) -> Pin>> { - Box::pin(async_std::task::spawn(async move { - let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap(); - let mut id = 0; - - let (server_state_sender, server_state_receiver) = async_std::channel::bounded(1024); - interserver_state_loop(state.clone(), server_state_receiver).await; - loop { - let (socket, addr) = listener.accept().await.unwrap(); - info!("new ship server: {:?} {:?}", socket, addr); - - id += 1; - let server_id = crate::common::interserver::ServerId(id); - let (client_sender, client_receiver) = async_std::channel::bounded(64); + let rstate = state.clone(); + let rsocket = socket.clone(); + let rships = ships.clone(); + async_std::task::spawn(async move { + interserver_recv_loop(rstate, server_id, rsocket, rships).await; + }); + async_std::task::spawn(async move { + interserver_send_loop(server_id, socket, client_rx).await; + }); + } +} - { - let mut state = state.lock().await; - let local_sender = client_sender.clone(); - state.set_sender(server_id, Box::new(move |message| { - async_std::task::block_on(local_sender.send(message)).unwrap(); - })) +pub async fn run_interserver_connect(mut state: STATE, ip: std::net::Ipv4Addr, port: u16) +where + STATE: InterserverActor + Send + 'static, + S: serde::Serialize + Debug + Send + 'static, + R: serde::de::DeserializeOwned + Debug + Send, + E: Debug + Send, +{ + let mut id = 0; + + loop { + info!("[interserver connect] trying to connect to server"); + let socket = match async_std::net::TcpStream::connect((ip, port)).await { + Ok(socket) => socket, + Err(err) => { + info!("err trying to connect to loginserv {:?}", err); + async_std::task::sleep(std::time::Duration::from_secs(10)).await; + continue; } + }; + id += 1; + let server_id = crate::common::interserver::ServerId(id); + info!("[interserver connect] found loginserv: {:?} {:?}", server_id, socket); - login_recv_loop(server_id, socket.clone(), server_state_sender.clone(), client_sender).await; - interserver_send_loop(server_id, socket.clone(), client_receiver).await; - } - })) -} -*/ - -/* -pub fn ship_connect_mainloop(state: Arc>>, ip: std::net::Ipv4Addr, port: u16) -> Pin>> { - Box::pin(async_std::task::spawn(async move { - let mut id = 0; - let (server_state_sender, server_state_receiver) = async_std::channel::bounded(1024); + let (client_tx, client_rx) = async_std::channel::unbounded(); + state.set_sender(server_id, client_tx.clone()).await; - interserver_state_loop(state.clone(), server_state_receiver).await; + for msg in state.on_connect(server_id).await { + client_tx.send(msg.1).await.unwrap(); + } + let other_server = vec![(server_id, client_tx.clone())].into_iter().collect(); + let rstate = state.clone(); + let rsocket = socket.clone(); + async_std::task::spawn(async move { + interserver_recv_loop(rstate, server_id, rsocket, Arc::new(RwLock::new(other_server))).await; + }); + let ssocket = socket.clone(); + async_std::task::spawn(async move { + interserver_send_loop(server_id, ssocket, client_rx).await; + }); + + let mut buf = [0u8; 1]; loop { - info!("trying to connect to loginserv"); - let socket = match async_std::net::TcpStream::connect((ip, port)).await { - Ok(socket) => socket, - Err(err) => { - info!("err trying to connect to loginserv {:?}", err); - async_std::task::sleep(Duration::from_secs(10)).await; - continue; - } - }; - id += 1; - let server_id = crate::common::interserver::ServerId(id); - info!("found loginserv: {:?} {:?}", server_id, socket); - let (client_sender, client_receiver) = async_std::channel::bounded(64); - - { - let mut state = state.lock().await; - let local_sender = client_sender.clone(); - state.set_sender(Box::new(move |message| { - async_std::task::block_on(local_sender.send(message)).unwrap(); - })) - } - - login_recv_loop(server_id, socket.clone(), server_state_sender.clone(), client_sender).await; - interserver_send_loop(server_id, socket.clone(), client_receiver).await; - - let mut buf = [0u8; 1]; - loop { - let peek = socket.peek(&mut buf).await; - match peek { - Ok(len) if len == 0 => { - break - }, - _ => { - } + let peek = socket.peek(&mut buf).await; + match peek { + Ok(len) if len == 0 => { + break + }, + _ => { } } } - })) -} + } -*/ +} diff --git a/src/common/mainloop/mod.rs b/src/common/mainloop/mod.rs index a83375d..d63d682 100644 --- a/src/common/mainloop/mod.rs +++ b/src/common/mainloop/mod.rs @@ -2,547 +2,5 @@ mod client; mod interserver; -use std::collections::HashMap; -use log::{trace, info, warn}; -use std::pin::Pin; -use futures::future::{Future, join_all, FutureExt}; -use async_std::sync::{Arc, Mutex, RwLock}; - -use std::fmt::Debug; - -use async_std::io::prelude::{ReadExt, WriteExt}; -//use crate::common::mainloop::client::client_accept_mainloop; -//use crate::common::mainloop::interserver::{ship_connect_mainloop, login_listen_mainloop}; -pub use crate::common::mainloop::client::NetworkError; -use crate::common::mainloop::client::PacketReceiver; - -use crate::common::serverstate::ClientId; -use crate::common::serverstate::{RecvServerPacket, SendServerPacket, ServerState, OnConnect}; - -use crate::common::interserver::{ServerId, InterserverActor}; - -use crate::patch::patch::PatchServerState; -use crate::login::login::LoginServerState; -use crate::login::character::CharacterServerState; -//use crate::ship::ship::ShipServerState; -use crate::entity::gateway::entitygateway::EntityGateway; - -use libpso::crypto::{PSOCipher, NullCipher, CipherError}; - -use async_std::channel; - - - - -/* -pub fn patch_mainloop(patch_state: PatchServerState, patch_port: u16) -> Pin>> { - let patch_state = Arc::new(Mutex::new(patch_state)); - let client_mainloop = client_accept_mainloop(patch_state, patch_port); - Box::pin(client_mainloop) -} - -pub fn login_mainloop(login_state: LoginServerState, login_port: u16) -> Pin>> { - let login_state = Arc::new(Mutex::new(login_state)); - let client_mainloop = client_accept_mainloop(login_state, login_port); - Box::pin(client_mainloop) -} - -pub fn character_mainloop(character_state: CharacterServerState, character_port: u16, comm_port: u16) -> Pin>> { - let character_state = Arc::new(Mutex::new(character_state)); - let client_mainloop = client_accept_mainloop(character_state.clone(), character_port); - let ship_communication_mainloop = login_listen_mainloop(character_state, comm_port); - Box::pin(join_all(vec![client_mainloop, ship_communication_mainloop]).map(|_| ())) -} - - -pub fn ship_mainloop(ship_state: ShipServerState, ship_port: u16, comm_ip: std::net::Ipv4Addr, comm_port: u16) -> Pin>> { - let ship_state = Arc::new(Mutex::new(ship_state)); - let client_mainloop = client_accept_mainloop(ship_state.clone(), ship_port); - let login_communication_mainloop = ship_connect_mainloop(ship_state, comm_ip, comm_port); - Box::pin(join_all(vec![client_mainloop, login_communication_mainloop]).map(|_| ())) -} -*/ - - -#[derive(Debug)] -enum MessageReceiverError { - //InvalidSize, - InvalidPayload, - //NetworkError(std::io::Error), - Disconnected, -} - -struct MessageReceiver { - socket: async_std::net::TcpStream, -} - -impl MessageReceiver { - fn new(socket: async_std::net::TcpStream) -> MessageReceiver { - MessageReceiver { - socket, - } - } - - async fn recv(&mut self) -> Result { - let mut size_buf = [0u8; 4]; - self.socket.read_exact(&mut size_buf).await.map_err(|_| MessageReceiverError::Disconnected)?; - let size = u32::from_le_bytes(size_buf) as usize; - - let mut payload = vec![0u8; size]; - self.socket.read_exact(&mut payload).await.map_err(|_| MessageReceiverError::Disconnected)?; - let payload = String::from_utf8(payload).map_err(|_| MessageReceiverError::InvalidPayload)?; - - let msg = serde_json::from_str(&payload).map_err(|_| MessageReceiverError::InvalidPayload)?; - Ok(msg) - } -} - - -/* -enum ServerAction { - NewClient(ClientId, channel::Sender), - Packet(ClientId, S), - Disconnect(ClientId), -} -*/ - -async fn recv_loop(mut state: STATE, - socket: async_std::net::TcpStream, - client_id: ClientId, - cipher: C, - clients: Arc>>>) -where - STATE: ServerState + Send, - S: SendServerPacket + Debug + Send, - R: RecvServerPacket + Debug + Send, - C: PSOCipher + Send, - E: std::fmt::Debug + Send, -{ - let mut pkt_receiver = PacketReceiver::new(socket, cipher); - loop { - match pkt_receiver.recv_pkts::().await { - Ok(pkts) => { - for pkt in pkts { - info!("[recv from {:?}] {:#?}", client_id, pkt); - match state.handle(client_id, pkt).await { - Ok(response) => { - for resp in response { - clients - .read() - .await - .get(&resp.0) - .unwrap() - .send(resp.1) - .await; - } - }, - Err(err) => { - warn!("[client recv {:?}] error {:?} ", client_id, err); - } - } - } - }, - Err(err) => { - match err { - NetworkError::ClientDisconnected => { - info!("[client recv {:?}] disconnected", client_id); - for pkt in state.on_disconnect(client_id).await.unwrap() { - clients - .read() - .await - .get(&pkt.0) - .unwrap() - .send(pkt.1) - .await; - } - clients - .write() - .await - .remove(&client_id); - break; - } - _ => { - warn!("[client {:?} recv error] {:?}", client_id, err); - } - } - } - } - } -} - - -async fn send_pkt(socket: &mut async_std::net::TcpStream, - cipher: &mut C, - pkt: &S) - -> Result<(), NetworkError> -where - S: SendServerPacket + std::fmt::Debug, - C: PSOCipher, -{ - let buf = pkt.as_bytes(); - trace!("[send buf] {:?}", buf); - let cbuf = cipher.encrypt(&buf)?; - socket.write_all(&cbuf).await?; - Ok(()) -} - -async fn send_loop(mut socket: async_std::net::TcpStream, client_id: ClientId, mut cipher: C, packet_queue: channel::Receiver) -where - S: SendServerPacket + std::fmt::Debug, - C: PSOCipher, -{ - loop { - let pkt = packet_queue.recv().await.unwrap(); - if let Err(err) = send_pkt(&mut socket, &mut cipher, &pkt).await { - warn!("error sending pkt {:#?} to {:?} {:?}", pkt, client_id, err); - } - } -} - -/* -pub async fn server_multiplex(state: STATE, packet_queue: channel::Receiver>) -where - STATE: ServerState, - S: SendServerPacket + std::fmt::Debug, - R: RecvServerPacket + std::fmt::Debug, - E: std::fmt::Debug, -{ - let mut clients = HashMap::new(); - loop { - let action = packet_queue.recv().await.unwrap(); - - match action { - ServerAction::NewClient(client_id, sender) => { - clients.insert(client_id, sender); - }, - ServerAction::Packet(client_id, pkt) => { - if let Some(sender) = clients.get(&client_id) { - sender.send(pkt).await; - } - }, - ServerAction::Disconnect(client_id) => { - clients.remove(&client_id); - } - } - - } -} -*/ - -pub async fn run_server(mut state: STATE, port: u16) -where - STATE: ServerState + Send + 'static, - S: SendServerPacket + std::fmt::Debug + Send + 'static, - R: RecvServerPacket + std::fmt::Debug + Send, - C: PSOCipher + Send + 'static, - E: std::fmt::Debug + Send, -{ - let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap(); - let mut id = 0; - //let (packet_sender, packet_receiver) = async_std::channel::unbounded(); - - let clients = Arc::new(RwLock::new(HashMap::new())); - - //let cstate = state.clone(); - /* - async_std::task::spawn(async move { - server_multiplex(cstate, packet_receiver).await - }); - */ - - loop { - let (mut socket, addr) = listener.accept().await.unwrap(); - id += 1; - - let client_id = crate::common::serverstate::ClientId(id); - info!("new client {:?} {:?} {:?}", client_id, socket, addr); - - let (client_tx, client_rx) = async_std::channel::unbounded(); - //packet_sender.send(ServerAction::NewClient()).await; - - clients - .write() - .await - .insert(client_id, client_tx.clone()); - - let mut cipher_in: Option = None; - let mut cipher_out: Option = None; - - for action in state.on_connect(client_id).await.unwrap() { - match action { - OnConnect::Cipher(cin, cout) => { - cipher_in = Some(cin); - cipher_out = Some(cout); - }, - OnConnect::Packet(pkt) => { - send_pkt(&mut socket, &mut NullCipher {}, &pkt).await; - } - } - } - - let rstate = state.clone(); - let rsocket = socket.clone(); - let rclients = clients.clone(); - async_std::task::spawn(async move { - /* - rstate; - rsocket; - client_id; - cipher_in.unwrap(); - rclients; - */ - //client_tx.send(12).await - recv_loop(rstate, rsocket, client_id, cipher_in.unwrap(), rclients).await - //recv_loop2(rstate, rsocket, client_id, cipher_in.unwrap()).await - }); - - //let sstate = state.clone(); - async_std::task::spawn(async move { - send_loop(socket, client_id, cipher_out.unwrap(), client_rx).await - }); - } -} - -/* -pub async fn listen_interserver(state: STATE, port: u16) -where - STATE: InterserverActor, - S: serde::Serialize, - R: serde::de::DeserializeOwned, -{ - let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap(); - let mut id = 0; - - loop { - let (socket, addr) = listener.accept().await.unwrap(); - info!("new interserver connection: {:?} {:?}", socket, addr); - - id += 1; - let server_id = crate::common::interserver::ServerId(id); - } -} - - -pub async fn run_interserver_receiver(state: STATE, ip: std::net::Ipv4Addr, port: u16) -where - STATE: InterserverActor, - S: serde::Serialize, - R: serde::de::DeserializeOwned, -{ - loop { - - } - - -} - -pub async fn run_interserver_sender(state: STATE, to_send: channel::Receiver) -where - STATE: InterserverActor, - S: serde::Serialize, - R: serde::de::DeserializeOwned, -{ - loop { - let msg = to_send.recv().await.unwrap(); - - let response = state.on_action(msg); - - - - - } - -} -*/ - -async fn interserver_recv_loop(mut state: STATE, server_id: ServerId, socket: async_std::net::TcpStream, ships: Arc>>>) -where - STATE: InterserverActor + Send, - S: serde::Serialize + Debug + Send, - R: serde::de::DeserializeOwned + Debug + Send, - E: Debug + Send, -{ - let mut msg_receiver = MessageReceiver::new(socket); - - loop { - match msg_receiver.recv::().await { - Ok(msg) => { - info!("[interserver recv {:?}] {:?}", server_id, msg); - match state.on_action(server_id, msg).await { - Ok(response) => { - for resp in response { - ships - .read() - .await - .get(&resp.0) - .unwrap() - .send(resp.1) - .await; - } - }, - Err(err) => { - warn!("[interserver recv {:?}] error {:?}", server_id, err); - } - } - }, - Err(err) => { - if let MessageReceiverError::Disconnected = err { - info!("[interserver recv {:?}] disconnected", server_id); - for (_, sender) in ships.read().await.iter() { - for pkt in state.on_disconnect(server_id).await { - ships - .read() - .await - .get(&pkt.0) - .unwrap() - .send(pkt.1) - .await; - } - } - ships - .write() - .await - .remove(&server_id); - break; - } - info!("[interserver recv {:?}] error {:?}", server_id, err); - } - } - } -} - -async fn interserver_send_loop(server_id: ServerId, mut socket: async_std::net::TcpStream, to_send: channel::Receiver) -where - S: serde::Serialize + std::fmt::Debug, -{ - loop { - let msg = to_send.recv().await.unwrap(); - let payload = serde_json::to_string(&msg); - - if let Ok(payload) = payload { - let len_bytes = u32::to_le_bytes(payload.len() as u32); - if let Err(err) = socket.write_all(&len_bytes).await { - warn!("[interserver send {:?}] failed: {:?}", server_id, err); - break; - } - if let Err(err) = socket.write_all(payload.as_bytes()).await { - warn!("[interserver send {:?}] failed: {:?}", server_id, err); - break; - } - } - } -} - -pub async fn run_interserver_listen(mut state: STATE, port: u16) -where - STATE: InterserverActor + Send + 'static, - S: serde::Serialize + Debug + Send + 'static, - R: serde::de::DeserializeOwned + Debug + Send, - E: Debug + Send, -{ - let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap(); - let mut id = 0; - let ships = Arc::new(RwLock::new(HashMap::new())); - - loop { - let (socket, addr) = listener.accept().await.unwrap(); - info!("[interserver listen] new server: {:?} {:?}", socket, addr); - - id += 1; - let server_id = crate::common::interserver::ServerId(id); - let (client_tx, client_rx) = async_std::channel::unbounded(); - - //let sclient_tx = client_tx.clone(); - /* - state.set_sender(server_id, Arc::new(Box::new(move |msg| { - let sclient_tx = sclient_tx.clone(); - Box::new(async move { - sclient_tx.send(msg).await; - })}))); - */ - state.set_sender(server_id, client_tx.clone()); - - ships - .write() - .await - .insert(server_id, client_tx.clone()); - - for msg in state.on_connect(server_id).await { - if let Some(ship_sender) = ships.read().await.get(&msg.0) { - ship_sender.send(msg.1).await; - } - } - - let rstate = state.clone(); - let rsocket = socket.clone(); - let rships = ships.clone(); - async_std::task::spawn(async move { - interserver_recv_loop(rstate, server_id, rsocket, rships).await; - }); - async_std::task::spawn(async move { - interserver_send_loop(server_id, socket, client_rx).await; - }); - } -} - -pub async fn run_interserver_connect(mut state: STATE, ip: std::net::Ipv4Addr, port: u16) -where - STATE: InterserverActor + Send + 'static, - S: serde::Serialize + Debug + Send + 'static, - R: serde::de::DeserializeOwned + Debug + Send, - E: Debug + Send, -{ - let mut id = 0; - - loop { - info!("[interserver connect] trying to connect to server"); - let socket = match async_std::net::TcpStream::connect((ip, port)).await { - Ok(socket) => socket, - Err(err) => { - info!("err trying to connect to loginserv {:?}", err); - async_std::task::sleep(std::time::Duration::from_secs(10)).await; - continue; - } - }; - id += 1; - let server_id = crate::common::interserver::ServerId(id); - info!("[interserver connect] found loginserv: {:?} {:?}", server_id, socket); - - let (client_tx, client_rx) = async_std::channel::unbounded(); - - state.set_sender(server_id, client_tx.clone()); - /* - let sclient_tx = client_tx.clone(); - state.set_sender(server_id, Arc::new(Box::new(move |msg| { - let sclient_tx = sclient_tx.clone(); - Box::new(async move { - sclient_tx.send(msg).await; - })}))); - */ - - let other_server = vec![(server_id, client_tx.clone())].into_iter().collect(); - - let rstate = state.clone(); - let rsocket = socket.clone(); - async_std::task::spawn(async move { - interserver_recv_loop(rstate, server_id, rsocket, Arc::new(RwLock::new(other_server))).await; - }); - let ssocket = socket.clone(); - async_std::task::spawn(async move { - interserver_send_loop(server_id, ssocket, client_rx).await; - }); - - let mut buf = [0u8; 1]; - loop { - let peek = socket.peek(&mut buf).await; - match peek { - Ok(len) if len == 0 => { - break - }, - _ => { - } - } - } - } - -} +pub use self::client::*; +pub use self::interserver::*;