Browse Source

properly handle disconnections in interserver connections

pbs
jake 4 years ago
parent
commit
578e4c7cf8
  1. 40
      src/common/mainloop/interserver.rs

40
src/common/mainloop/interserver.rs

@ -39,7 +39,7 @@ impl MessageReceiver {
let size = u32::from_le_bytes(size_buf) as usize; let size = u32::from_le_bytes(size_buf) as usize;
let mut payload = vec![0u8; size]; let mut payload = vec![0u8; size];
self.socket.read_exact(&mut payload).await.map_err(|err| MessageReceiverError::Disconnected)?;
self.socket.read_exact(&mut payload).await.map_err(|_| MessageReceiverError::Disconnected)?;
let payload = String::from_utf8(payload).map_err(|_| MessageReceiverError::InvalidPayload)?; let payload = String::from_utf8(payload).map_err(|_| MessageReceiverError::InvalidPayload)?;
let msg = serde_json::from_str(&payload).map_err(|_| MessageReceiverError::InvalidPayload)?; let msg = serde_json::from_str(&payload).map_err(|_| MessageReceiverError::InvalidPayload)?;
@ -65,7 +65,13 @@ where
loop { loop {
info!("interserver loop"); info!("interserver loop");
let action = action_receiver.recv().await.unwrap();
let action = match action_receiver.recv().await {
Ok(action) => action,
Err(err) => {
warn!("error in iterserver state loop {:?}", err);
continue;
}
};
let mut state = state.lock().await; let mut state = state.lock().await;
match action { match action {
@ -94,12 +100,12 @@ where
}, },
InterserverInputAction::Disconnect(server_id) => { InterserverInputAction::Disconnect(server_id) => {
let actions = state.on_disconnect(server_id).await; let actions = state.on_disconnect(server_id).await;
ships.remove(&server_id);
for (server, action) in actions { for (server, action) in actions {
if let Some(sender) = ships.get_mut(&server) { if let Some(sender) = ships.get_mut(&server) {
sender.send(action).await; sender.send(action).await;
} }
} }
break;
} }
} }
} }
@ -164,7 +170,8 @@ where
} }
}, },
Err(err) => { Err(err) => {
warn!("error in send_loop: {:?}, {:?}", server_id, err)
warn!("error in send_loop: {:?}, {:?}", server_id, err);
break;
} }
} }
} }
@ -204,20 +211,33 @@ pub fn ship_connect_mainloop<EG: EntityGateway + 'static>(state: Arc<Mutex<ShipS
interserver_state_loop(state, server_state_receiver).await; interserver_state_loop(state, server_state_receiver).await;
loop { loop {
let socket = async_std::net::TcpStream::connect((ip, port)).await.unwrap();
info!("trying to connect to loginserv");
let socket = match async_std::net::TcpStream::connect((ip, port)).await {
Ok(socket) => socket,
Err(err) => {
info!("err trying to connect to loginserv {:?}", err);
async_std::task::sleep(Duration::from_secs(10)).await;
continue;
}
};
id += 1; id += 1;
let server_id = crate::common::interserver::ServerId(id); let server_id = crate::common::interserver::ServerId(id);
info!("found loginserv: {:?} {:?}", server_id, socket);
let (client_sender, client_receiver) = async_std::sync::channel(64); let (client_sender, client_receiver) = async_std::sync::channel(64);
info!("ship connected to login: {:?}", socket);
login_recv_loop(server_id, socket.clone(), server_state_sender.clone(), client_sender).await; login_recv_loop(server_id, socket.clone(), server_state_sender.clone(), client_sender).await;
interserver_send_loop(server_id, socket.clone(), client_receiver).await; interserver_send_loop(server_id, socket.clone(), client_receiver).await;
let mut buf = [0u8; 1];
loop { loop {
if let Err(_) = socket.peer_addr() {
info!("ship connected to login: {:?}", socket);
break;
let peek = socket.peek(&mut buf).await;
match peek {
Ok(len) if len == 0 => {
break
},
_ => {
}
} }
async_std::task::sleep(Duration::from_secs(10)).await;
} }
} }
})) }))

Loading…
Cancel
Save