Browse Source

add ability for servers to manually send messages to eachother

pbs
jake 4 years ago
parent
commit
a3db5b65e2
  1. 24
      src/common/mainloop/interserver.rs
  2. 7
      src/login/character.rs
  3. 7
      src/ship/ship.rs

24
src/common/mainloop/interserver.rs

@ -186,7 +186,7 @@ pub fn login_listen_mainloop<EG: EntityGateway + 'static>(state: Arc<Mutex<Chara
let mut id = 0; let mut id = 0;
let (server_state_sender, server_state_receiver) = async_std::sync::channel(1024); let (server_state_sender, server_state_receiver) = async_std::sync::channel(1024);
interserver_state_loop(state, server_state_receiver).await;
interserver_state_loop(state.clone(), server_state_receiver).await;
loop { loop {
let (socket, addr) = listener.accept().await.unwrap(); let (socket, addr) = listener.accept().await.unwrap();
@ -194,9 +194,17 @@ pub fn login_listen_mainloop<EG: EntityGateway + 'static>(state: Arc<Mutex<Chara
id += 1; id += 1;
let server_id = crate::common::interserver::ServerId(id); let server_id = crate::common::interserver::ServerId(id);
let (client_sender, client_receiver) = async_std::sync::channel(64); let (client_sender, client_receiver) = async_std::sync::channel(64);
{
let mut state = state.lock().await;
let local_sender = client_sender.clone();
state.set_sender(server_id, Box::new(|message| {
async_std::task::block_on(local_sender.send(message));
}))
}
login_recv_loop(server_id, socket.clone(), server_state_sender.clone(), client_sender).await; login_recv_loop(server_id, socket.clone(), server_state_sender.clone(), client_sender).await;
interserver_send_loop(server_id, socket.clone(), client_receiver).await; interserver_send_loop(server_id, socket.clone(), client_receiver).await;
} }
@ -206,9 +214,9 @@ pub fn login_listen_mainloop<EG: EntityGateway + 'static>(state: Arc<Mutex<Chara
pub fn ship_connect_mainloop<EG: EntityGateway + 'static>(state: Arc<Mutex<ShipServerState<EG>>>, ip: std::net::Ipv4Addr, port: u16) -> Pin<Box<dyn Future<Output = ()>>> { pub fn ship_connect_mainloop<EG: EntityGateway + 'static>(state: Arc<Mutex<ShipServerState<EG>>>, ip: std::net::Ipv4Addr, port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
Box::pin(async_std::task::spawn(async move { Box::pin(async_std::task::spawn(async move {
let mut id = 0; let mut id = 0;
let (server_state_sender, server_state_receiver) = async_std::sync::channel(1024); let (server_state_sender, server_state_receiver) = async_std::sync::channel(1024);
interserver_state_loop(state, server_state_receiver).await;
interserver_state_loop(state.clone(), server_state_receiver).await;
loop { loop {
info!("trying to connect to loginserv"); info!("trying to connect to loginserv");
@ -225,6 +233,14 @@ pub fn ship_connect_mainloop<EG: EntityGateway + 'static>(state: Arc<Mutex<ShipS
info!("found loginserv: {:?} {:?}", server_id, socket); info!("found loginserv: {:?} {:?}", server_id, socket);
let (client_sender, client_receiver) = async_std::sync::channel(64); let (client_sender, client_receiver) = async_std::sync::channel(64);
{
let mut state = state.lock().await;
let local_sender = client_sender.clone();
state.set_sender(Box::new(move |message| {
async_std::task::block_on(local_sender.send(message));
}))
}
login_recv_loop(server_id, socket.clone(), server_state_sender.clone(), client_sender).await; login_recv_loop(server_id, socket.clone(), server_state_sender.clone(), client_sender).await;
interserver_send_loop(server_id, socket.clone(), client_receiver).await; interserver_send_loop(server_id, socket.clone(), client_receiver).await;

7
src/login/character.rs

@ -178,6 +178,7 @@ pub struct CharacterServerState<EG: EntityGateway> {
level_table: CharacterLevelTable, level_table: CharacterLevelTable,
auth_token: AuthToken, auth_token: AuthToken,
authenticated_ships: BTreeSet<ServerId>, authenticated_ships: BTreeSet<ServerId>,
ship_sender: BTreeMap<ServerId, Box<dyn Fn(LoginMessage) + Send>>,
} }
@ -296,9 +297,14 @@ impl<EG: EntityGateway> CharacterServerState<EG> {
level_table: CharacterLevelTable::new(), level_table: CharacterLevelTable::new(),
auth_token: auth_token, auth_token: auth_token,
authenticated_ships: BTreeSet::new(), authenticated_ships: BTreeSet::new(),
ship_sender: BTreeMap::new(),
} }
} }
pub fn set_sender(&mut self, server_id: ServerId, sender: Box<dyn Fn(LoginMessage) + Send>) {
self.ship_sender.insert(server_id, sender);
}
async fn validate_login(&mut self, id: ClientId, pkt: &Login) -> Result<Vec<SendCharacterPacket>, anyhow::Error> { async fn validate_login(&mut self, id: ClientId, pkt: &Login) -> Result<Vec<SendCharacterPacket>, anyhow::Error> {
let client = self.clients.get_mut(&id).ok_or(CharacterError::ClientNotFound(id))?; let client = self.clients.get_mut(&id).ok_or(CharacterError::ClientNotFound(id))?;
Ok(match get_login_status(&self.entity_gateway, pkt).await.and_then(check_if_already_online) { Ok(match get_login_status(&self.entity_gateway, pkt).await.and_then(check_if_already_online) {
@ -595,6 +601,7 @@ impl<EG: EntityGateway> InterserverActor for CharacterServerState<EG> {
async fn on_disconnect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)> { async fn on_disconnect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)> {
self.ships.remove(&id); self.ships.remove(&id);
self.ship_sender.remove(&id);
Vec::new() Vec::new()
} }
} }

7
src/ship/ship.rs

@ -351,6 +351,7 @@ impl<EG: EntityGateway> ShipServerStateBuilder<EG> {
port: self.port.unwrap_or(SHIP_PORT), port: self.port.unwrap_or(SHIP_PORT),
shops: Box::new(ItemShops::new()), shops: Box::new(ItemShops::new()),
auth_token: self.auth_token.unwrap_or(AuthToken("".into())), auth_token: self.auth_token.unwrap_or(AuthToken("".into())),
shipgate_sender: None,
} }
} }
} }
@ -368,6 +369,8 @@ pub struct ShipServerState<EG: EntityGateway> {
port: u16, port: u16,
shops: Box<ItemShops>, shops: Box<ItemShops>,
auth_token: AuthToken, auth_token: AuthToken,
shipgate_sender: Option<Box<dyn Fn(ShipMessage) + Send>>,
} }
impl<EG: EntityGateway> ShipServerState<EG> { impl<EG: EntityGateway> ShipServerState<EG> {
@ -375,6 +378,10 @@ impl<EG: EntityGateway> ShipServerState<EG> {
ShipServerStateBuilder::new() ShipServerStateBuilder::new()
} }
pub fn set_sender(&mut self, sender: Box<dyn Fn(ShipMessage) + Send>) {
self.shipgate_sender = Some(sender);
}
async fn message(&mut self, id: ClientId, msg: &Message) -> Result<Box<dyn Iterator<Item = (ClientId, SendShipPacket)> + Send>, anyhow::Error> { async fn message(&mut self, id: ClientId, msg: &Message) -> Result<Box<dyn Iterator<Item = (ClientId, SendShipPacket)> + Send>, anyhow::Error> {
Ok(match &msg.msg { Ok(match &msg.msg {
GameMessage::RequestExp(request_exp) => { GameMessage::RequestExp(request_exp) => {

Loading…
Cancel
Save