#![allow(unused_imports)] mod client; mod interserver; use std::collections::HashMap; use log::{trace, info, warn}; use std::pin::Pin; use futures::future::{Future, join_all, FutureExt}; use async_std::sync::{Arc, Mutex, RwLock}; use std::fmt::Debug; use async_std::io::prelude::{ReadExt, WriteExt}; //use crate::common::mainloop::client::client_accept_mainloop; //use crate::common::mainloop::interserver::{ship_connect_mainloop, login_listen_mainloop}; pub use crate::common::mainloop::client::NetworkError; use crate::common::mainloop::client::PacketReceiver; use crate::common::serverstate::ClientId; use crate::common::serverstate::{RecvServerPacket, SendServerPacket, ServerState, OnConnect}; use crate::common::interserver::{ServerId, InterserverActor}; use crate::patch::patch::PatchServerState; use crate::login::login::LoginServerState; use crate::login::character::CharacterServerState; //use crate::ship::ship::ShipServerState; use crate::entity::gateway::entitygateway::EntityGateway; use libpso::crypto::{PSOCipher, NullCipher, CipherError}; use async_std::channel; /* pub fn patch_mainloop(patch_state: PatchServerState, patch_port: u16) -> Pin>> { let patch_state = Arc::new(Mutex::new(patch_state)); let client_mainloop = client_accept_mainloop(patch_state, patch_port); Box::pin(client_mainloop) } pub fn login_mainloop(login_state: LoginServerState, login_port: u16) -> Pin>> { let login_state = Arc::new(Mutex::new(login_state)); let client_mainloop = client_accept_mainloop(login_state, login_port); Box::pin(client_mainloop) } pub fn character_mainloop(character_state: CharacterServerState, character_port: u16, comm_port: u16) -> Pin>> { let character_state = Arc::new(Mutex::new(character_state)); let client_mainloop = client_accept_mainloop(character_state.clone(), character_port); let ship_communication_mainloop = login_listen_mainloop(character_state, comm_port); Box::pin(join_all(vec![client_mainloop, ship_communication_mainloop]).map(|_| ())) } pub fn ship_mainloop(ship_state: ShipServerState, ship_port: u16, comm_ip: std::net::Ipv4Addr, comm_port: u16) -> Pin>> { let ship_state = Arc::new(Mutex::new(ship_state)); let client_mainloop = client_accept_mainloop(ship_state.clone(), ship_port); let login_communication_mainloop = ship_connect_mainloop(ship_state, comm_ip, comm_port); Box::pin(join_all(vec![client_mainloop, login_communication_mainloop]).map(|_| ())) } */ #[derive(Debug)] enum MessageReceiverError { //InvalidSize, InvalidPayload, //NetworkError(std::io::Error), Disconnected, } struct MessageReceiver { socket: async_std::net::TcpStream, } impl MessageReceiver { fn new(socket: async_std::net::TcpStream) -> MessageReceiver { MessageReceiver { socket, } } async fn recv(&mut self) -> Result { let mut size_buf = [0u8; 4]; self.socket.read_exact(&mut size_buf).await.map_err(|_| MessageReceiverError::Disconnected)?; let size = u32::from_le_bytes(size_buf) as usize; let mut payload = vec![0u8; size]; self.socket.read_exact(&mut payload).await.map_err(|_| MessageReceiverError::Disconnected)?; let payload = String::from_utf8(payload).map_err(|_| MessageReceiverError::InvalidPayload)?; let msg = serde_json::from_str(&payload).map_err(|_| MessageReceiverError::InvalidPayload)?; Ok(msg) } } /* enum ServerAction { NewClient(ClientId, channel::Sender), Packet(ClientId, S), Disconnect(ClientId), } */ async fn recv_loop(mut state: STATE, socket: async_std::net::TcpStream, client_id: ClientId, cipher: C, clients: Arc>>>) where STATE: ServerState + Send, S: SendServerPacket + Debug + Send, R: RecvServerPacket + Debug + Send, C: PSOCipher + Send, E: std::fmt::Debug + Send, { let mut pkt_receiver = PacketReceiver::new(socket, cipher); loop { match pkt_receiver.recv_pkts::().await { Ok(pkts) => { for pkt in pkts { info!("[recv from {:?}] {:#?}", client_id, pkt); match state.handle(client_id, pkt).await { Ok(response) => { for resp in response { clients .read() .await .get(&resp.0) .unwrap() .send(resp.1) .await; } }, Err(err) => { warn!("[client recv {:?}] error {:?} ", client_id, err); } } } }, Err(err) => { match err { NetworkError::ClientDisconnected => { info!("[client recv {:?}] disconnected", client_id); for pkt in state.on_disconnect(client_id).await.unwrap() { clients .read() .await .get(&pkt.0) .unwrap() .send(pkt.1) .await; } clients .write() .await .remove(&client_id); break; } _ => { warn!("[client {:?} recv error] {:?}", client_id, err); } } } } } } async fn send_pkt(socket: &mut async_std::net::TcpStream, cipher: &mut C, pkt: &S) -> Result<(), NetworkError> where S: SendServerPacket + std::fmt::Debug, C: PSOCipher, { let buf = pkt.as_bytes(); trace!("[send buf] {:?}", buf); let cbuf = cipher.encrypt(&buf)?; socket.write_all(&cbuf).await?; Ok(()) } async fn send_loop(mut socket: async_std::net::TcpStream, client_id: ClientId, mut cipher: C, packet_queue: channel::Receiver) where S: SendServerPacket + std::fmt::Debug, C: PSOCipher, { loop { let pkt = packet_queue.recv().await.unwrap(); if let Err(err) = send_pkt(&mut socket, &mut cipher, &pkt).await { warn!("error sending pkt {:#?} to {:?} {:?}", pkt, client_id, err); } } } /* pub async fn server_multiplex(state: STATE, packet_queue: channel::Receiver>) where STATE: ServerState, S: SendServerPacket + std::fmt::Debug, R: RecvServerPacket + std::fmt::Debug, E: std::fmt::Debug, { let mut clients = HashMap::new(); loop { let action = packet_queue.recv().await.unwrap(); match action { ServerAction::NewClient(client_id, sender) => { clients.insert(client_id, sender); }, ServerAction::Packet(client_id, pkt) => { if let Some(sender) = clients.get(&client_id) { sender.send(pkt).await; } }, ServerAction::Disconnect(client_id) => { clients.remove(&client_id); } } } } */ pub async fn run_server(mut state: STATE, port: u16) where STATE: ServerState + Send + 'static, S: SendServerPacket + std::fmt::Debug + Send + 'static, R: RecvServerPacket + std::fmt::Debug + Send, C: PSOCipher + Send + 'static, E: std::fmt::Debug + Send, { let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap(); let mut id = 0; //let (packet_sender, packet_receiver) = async_std::channel::unbounded(); let clients = Arc::new(RwLock::new(HashMap::new())); //let cstate = state.clone(); /* async_std::task::spawn(async move { server_multiplex(cstate, packet_receiver).await }); */ loop { let (mut socket, addr) = listener.accept().await.unwrap(); id += 1; let client_id = crate::common::serverstate::ClientId(id); info!("new client {:?} {:?} {:?}", client_id, socket, addr); let (client_tx, client_rx) = async_std::channel::unbounded(); //packet_sender.send(ServerAction::NewClient()).await; clients .write() .await .insert(client_id, client_tx.clone()); let mut cipher_in: Option = None; let mut cipher_out: Option = None; for action in state.on_connect(client_id).await.unwrap() { match action { OnConnect::Cipher(cin, cout) => { cipher_in = Some(cin); cipher_out = Some(cout); }, OnConnect::Packet(pkt) => { send_pkt(&mut socket, &mut NullCipher {}, &pkt).await; } } } let rstate = state.clone(); let rsocket = socket.clone(); let rclients = clients.clone(); async_std::task::spawn(async move { /* rstate; rsocket; client_id; cipher_in.unwrap(); rclients; */ //client_tx.send(12).await recv_loop(rstate, rsocket, client_id, cipher_in.unwrap(), rclients).await //recv_loop2(rstate, rsocket, client_id, cipher_in.unwrap()).await }); //let sstate = state.clone(); async_std::task::spawn(async move { send_loop(socket, client_id, cipher_out.unwrap(), client_rx).await }); } } /* pub async fn listen_interserver(state: STATE, port: u16) where STATE: InterserverActor, S: serde::Serialize, R: serde::de::DeserializeOwned, { let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap(); let mut id = 0; loop { let (socket, addr) = listener.accept().await.unwrap(); info!("new interserver connection: {:?} {:?}", socket, addr); id += 1; let server_id = crate::common::interserver::ServerId(id); } } pub async fn run_interserver_receiver(state: STATE, ip: std::net::Ipv4Addr, port: u16) where STATE: InterserverActor, S: serde::Serialize, R: serde::de::DeserializeOwned, { loop { } } pub async fn run_interserver_sender(state: STATE, to_send: channel::Receiver) where STATE: InterserverActor, S: serde::Serialize, R: serde::de::DeserializeOwned, { loop { let msg = to_send.recv().await.unwrap(); let response = state.on_action(msg); } } */ async fn interserver_recv_loop(mut state: STATE, server_id: ServerId, socket: async_std::net::TcpStream, ships: Arc>>>) where STATE: InterserverActor + Send, S: serde::Serialize + Debug + Send, R: serde::de::DeserializeOwned + Debug + Send, E: Debug + Send, { let mut msg_receiver = MessageReceiver::new(socket); loop { match msg_receiver.recv::().await { Ok(msg) => { info!("[interserver recv {:?}] {:?}", server_id, msg); match state.on_action(server_id, msg).await { Ok(response) => { for resp in response { ships .read() .await .get(&resp.0) .unwrap() .send(resp.1) .await; } }, Err(err) => { warn!("[interserver recv {:?}] error {:?}", server_id, err); } } }, Err(err) => { if let MessageReceiverError::Disconnected = err { info!("[interserver recv {:?}] disconnected", server_id); for (_, sender) in ships.read().await.iter() { for pkt in state.on_disconnect(server_id).await { ships .read() .await .get(&pkt.0) .unwrap() .send(pkt.1) .await; } } ships .write() .await .remove(&server_id); break; } info!("[interserver recv {:?}] error {:?}", server_id, err); } } } } async fn interserver_send_loop(server_id: ServerId, mut socket: async_std::net::TcpStream, to_send: channel::Receiver) where S: serde::Serialize + std::fmt::Debug, { loop { let msg = to_send.recv().await.unwrap(); let payload = serde_json::to_string(&msg); if let Ok(payload) = payload { let len_bytes = u32::to_le_bytes(payload.len() as u32); if let Err(err) = socket.write_all(&len_bytes).await { warn!("[interserver send {:?}] failed: {:?}", server_id, err); break; } if let Err(err) = socket.write_all(payload.as_bytes()).await { warn!("[interserver send {:?}] failed: {:?}", server_id, err); break; } } } } pub async fn run_interserver_listen(mut state: STATE, port: u16) where STATE: InterserverActor + Send + 'static, S: serde::Serialize + Debug + Send + 'static, R: serde::de::DeserializeOwned + Debug + Send, E: Debug + Send, { let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap(); let mut id = 0; let ships = Arc::new(RwLock::new(HashMap::new())); loop { let (socket, addr) = listener.accept().await.unwrap(); info!("[interserver listen] new server: {:?} {:?}", socket, addr); id += 1; let server_id = crate::common::interserver::ServerId(id); let (client_tx, client_rx) = async_std::channel::unbounded(); //let sclient_tx = client_tx.clone(); /* state.set_sender(server_id, Arc::new(Box::new(move |msg| { let sclient_tx = sclient_tx.clone(); Box::new(async move { sclient_tx.send(msg).await; })}))); */ state.set_sender(server_id, client_tx.clone()); ships .write() .await .insert(server_id, client_tx.clone()); for msg in state.on_connect(server_id).await { if let Some(ship_sender) = ships.read().await.get(&msg.0) { ship_sender.send(msg.1).await; } } let rstate = state.clone(); let rsocket = socket.clone(); let rships = ships.clone(); async_std::task::spawn(async move { interserver_recv_loop(rstate, server_id, rsocket, rships).await; }); async_std::task::spawn(async move { interserver_send_loop(server_id, socket, client_rx).await; }); } } pub async fn run_interserver_connect(mut state: STATE, ip: std::net::Ipv4Addr, port: u16) where STATE: InterserverActor + Send + 'static, S: serde::Serialize + Debug + Send + 'static, R: serde::de::DeserializeOwned + Debug + Send, E: Debug + Send, { let mut id = 0; loop { info!("[interserver connect] trying to connect to server"); let socket = match async_std::net::TcpStream::connect((ip, port)).await { Ok(socket) => socket, Err(err) => { info!("err trying to connect to loginserv {:?}", err); async_std::task::sleep(std::time::Duration::from_secs(10)).await; continue; } }; id += 1; let server_id = crate::common::interserver::ServerId(id); info!("[interserver connect] found loginserv: {:?} {:?}", server_id, socket); let (client_tx, client_rx) = async_std::channel::unbounded(); state.set_sender(server_id, client_tx.clone()); /* let sclient_tx = client_tx.clone(); state.set_sender(server_id, Arc::new(Box::new(move |msg| { let sclient_tx = sclient_tx.clone(); Box::new(async move { sclient_tx.send(msg).await; })}))); */ let other_server = vec![(server_id, client_tx.clone())].into_iter().collect(); let rstate = state.clone(); let rsocket = socket.clone(); async_std::task::spawn(async move { interserver_recv_loop(rstate, server_id, rsocket, Arc::new(RwLock::new(other_server))).await; }); let ssocket = socket.clone(); async_std::task::spawn(async move { interserver_send_loop(server_id, ssocket, client_rx).await; }); let mut buf = [0u8; 1]; loop { let peek = socket.peek(&mut buf).await; match peek { Ok(len) if len == 0 => { break }, _ => { } } } } }