You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

228 lines
8.0 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. use std::time::Duration;
  2. use std::pin::Pin;
  3. use futures::future::Future;
  4. use log::{info, warn};
  5. use async_std::sync::{Arc, RwLock};
  6. use async_std::io::prelude::{ReadExt, WriteExt};
  7. use std::collections::HashMap;
  8. use serde::Serialize;
  9. use serde::de::DeserializeOwned;
  10. use crate::common::interserver::{ServerId, InterserverActor};
  11. use libpso::crypto::{PSOCipher, NullCipher, CipherError};
  12. use crate::common::serverstate::{ServerState, SendServerPacket, RecvServerPacket};
  13. use crate::login::character::CharacterServerState;
  14. use entity::gateway::entitygateway::EntityGateway;
  15. use async_std::channel;
  16. use std::fmt::Debug;
  17. #[derive(Debug)]
  18. enum MessageReceiverError {
  19. //InvalidSize,
  20. InvalidPayload,
  21. //NetworkError(std::io::Error),
  22. Disconnected,
  23. }
  24. struct MessageReceiver {
  25. socket: async_std::net::TcpStream,
  26. }
  27. impl MessageReceiver {
  28. fn new(socket: async_std::net::TcpStream) -> MessageReceiver {
  29. MessageReceiver {
  30. socket,
  31. }
  32. }
  33. async fn recv<R: serde::de::DeserializeOwned + std::fmt::Debug>(&mut self) -> Result<R, MessageReceiverError> {
  34. let mut size_buf = [0u8; 4];
  35. self.socket.read_exact(&mut size_buf).await.map_err(|_| MessageReceiverError::Disconnected)?;
  36. let size = u32::from_le_bytes(size_buf) as usize;
  37. let mut payload = vec![0u8; size];
  38. self.socket.read_exact(&mut payload).await.map_err(|_| MessageReceiverError::Disconnected)?;
  39. let payload = String::from_utf8(payload).map_err(|_| MessageReceiverError::InvalidPayload)?;
  40. let msg = serde_json::from_str(&payload).map_err(|_| MessageReceiverError::InvalidPayload)?;
  41. Ok(msg)
  42. }
  43. }
  44. async fn interserver_recv_loop<STATE, S, R, E>(mut state: STATE, server_id: ServerId, socket: async_std::net::TcpStream, ships: Arc<RwLock<HashMap<ServerId, channel::Sender<S>>>>)
  45. where
  46. STATE: InterserverActor<SendMessage=S, RecvMessage=R, Error=E> + Send,
  47. S: serde::Serialize + Debug + Send,
  48. R: serde::de::DeserializeOwned + Debug + Send,
  49. E: Debug + Send,
  50. {
  51. let mut msg_receiver = MessageReceiver::new(socket);
  52. loop {
  53. match msg_receiver.recv::<R>().await {
  54. Ok(msg) => {
  55. info!("[interserver recv {:?}] {:?}", server_id, msg);
  56. match state.on_action(server_id, msg).await {
  57. Ok(response) => {
  58. for resp in response {
  59. ships
  60. .read()
  61. .await
  62. .get(&resp.0)
  63. .unwrap()
  64. .send(resp.1)
  65. .await
  66. .unwrap();
  67. }
  68. },
  69. Err(err) => {
  70. warn!("[interserver recv {:?}] error {:?}", server_id, err);
  71. }
  72. }
  73. },
  74. Err(err) => {
  75. if let MessageReceiverError::Disconnected = err {
  76. info!("[interserver recv {:?}] disconnected", server_id);
  77. for (_, _sender) in ships.read().await.iter() {
  78. for pkt in state.on_disconnect(server_id).await {
  79. ships
  80. .read()
  81. .await
  82. .get(&pkt.0)
  83. .unwrap()
  84. .send(pkt.1)
  85. .await
  86. .unwrap();
  87. }
  88. }
  89. ships
  90. .write()
  91. .await
  92. .remove(&server_id);
  93. break;
  94. }
  95. info!("[interserver recv {:?}] error {:?}", server_id, err);
  96. }
  97. }
  98. }
  99. }
  100. async fn interserver_send_loop<S>(server_id: ServerId, mut socket: async_std::net::TcpStream, to_send: channel::Receiver<S>)
  101. where
  102. S: serde::Serialize + std::fmt::Debug,
  103. {
  104. loop {
  105. let msg = to_send.recv().await.unwrap();
  106. let payload = serde_json::to_string(&msg);
  107. if let Ok(payload) = payload {
  108. let len_bytes = u32::to_le_bytes(payload.len() as u32);
  109. if let Err(err) = socket.write_all(&len_bytes).await {
  110. warn!("[interserver send {:?}] failed: {:?}", server_id, err);
  111. break;
  112. }
  113. if let Err(err) = socket.write_all(payload.as_bytes()).await {
  114. warn!("[interserver send {:?}] failed: {:?}", server_id, err);
  115. break;
  116. }
  117. }
  118. }
  119. }
  120. pub async fn run_interserver_listen<STATE, S, R, E>(mut state: STATE, port: u16)
  121. where
  122. STATE: InterserverActor<SendMessage=S, RecvMessage=R, Error=E> + Send + 'static,
  123. S: serde::Serialize + Debug + Send + 'static,
  124. R: serde::de::DeserializeOwned + Debug + Send,
  125. E: Debug + Send,
  126. {
  127. let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap();
  128. let mut id = 0;
  129. let ships = Arc::new(RwLock::new(HashMap::new()));
  130. loop {
  131. let (socket, addr) = listener.accept().await.unwrap();
  132. info!("[interserver listen] new server: {:?} {:?}", socket, addr);
  133. id += 1;
  134. let server_id = crate::common::interserver::ServerId(id);
  135. let (client_tx, client_rx) = async_std::channel::unbounded();
  136. state.set_sender(server_id, client_tx.clone()).await;
  137. ships
  138. .write()
  139. .await
  140. .insert(server_id, client_tx.clone());
  141. for msg in state.on_connect(server_id).await {
  142. if let Some(ship_sender) = ships.read().await.get(&msg.0) {
  143. ship_sender.send(msg.1).await.unwrap();
  144. }
  145. }
  146. let rstate = state.clone();
  147. let rsocket = socket.clone();
  148. let rships = ships.clone();
  149. async_std::task::spawn(async move {
  150. interserver_recv_loop(rstate, server_id, rsocket, rships).await;
  151. });
  152. async_std::task::spawn(async move {
  153. interserver_send_loop(server_id, socket, client_rx).await;
  154. });
  155. }
  156. }
  157. pub async fn run_interserver_connect<STATE, S, R, E>(mut state: STATE, ip: std::net::Ipv4Addr, port: u16)
  158. where
  159. STATE: InterserverActor<SendMessage=S, RecvMessage=R, Error=E> + Send + 'static,
  160. S: serde::Serialize + Debug + Send + 'static,
  161. R: serde::de::DeserializeOwned + Debug + Send,
  162. E: Debug + Send,
  163. {
  164. let mut id = 0;
  165. loop {
  166. info!("[interserver connect] trying to connect to server");
  167. let socket = match async_std::net::TcpStream::connect((ip, port)).await {
  168. Ok(socket) => socket,
  169. Err(err) => {
  170. info!("err trying to connect to loginserv {:?}", err);
  171. async_std::task::sleep(std::time::Duration::from_secs(10)).await;
  172. continue;
  173. }
  174. };
  175. id += 1;
  176. let server_id = crate::common::interserver::ServerId(id);
  177. info!("[interserver connect] found loginserv: {:?} {:?}", server_id, socket);
  178. let (client_tx, client_rx) = async_std::channel::unbounded();
  179. state.set_sender(server_id, client_tx.clone()).await;
  180. for msg in state.on_connect(server_id).await {
  181. client_tx.send(msg.1).await.unwrap();
  182. }
  183. let other_server = vec![(server_id, client_tx.clone())].into_iter().collect();
  184. let rstate = state.clone();
  185. let rsocket = socket.clone();
  186. async_std::task::spawn(async move {
  187. interserver_recv_loop(rstate, server_id, rsocket, Arc::new(RwLock::new(other_server))).await;
  188. });
  189. let ssocket = socket.clone();
  190. async_std::task::spawn(async move {
  191. interserver_send_loop(server_id, ssocket, client_rx).await;
  192. });
  193. let mut buf = [0u8; 1];
  194. loop {
  195. let peek = socket.peek(&mut buf).await;
  196. if let Ok(0) = peek {
  197. break
  198. }
  199. }
  200. }
  201. }