clean up new mainloop stuff
This commit is contained in:
parent
9843274bd8
commit
4b1ded6f7d
@ -56,6 +56,5 @@ pub trait InterserverActor: Clone {
|
|||||||
async fn on_connect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)>;
|
async fn on_connect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)>;
|
||||||
async fn on_action(&mut self, id: ServerId, msg: Self::RecvMessage) -> Result<Vec<(ServerId, Self::SendMessage)>, Self::Error>;
|
async fn on_action(&mut self, id: ServerId, msg: Self::RecvMessage) -> Result<Vec<(ServerId, Self::SendMessage)>, Self::Error>;
|
||||||
async fn on_disconnect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)>;
|
async fn on_disconnect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)>;
|
||||||
//fn set_sender(&mut self, server_id: ServerId, func: Arc<Box<dyn Fn(Self::SendMessage) -> Box<dyn futures::future::Future<Output = ()>>>>);
|
async fn set_sender(&mut self, server_id: ServerId, tx: channel::Sender<Self::SendMessage>);
|
||||||
fn set_sender(&mut self, server_id: ServerId, tx: channel::Sender<Self::SendMessage>);
|
|
||||||
}
|
}
|
||||||
|
@ -1,10 +1,10 @@
|
|||||||
use std::pin::pin;
|
use std::collections::HashMap;
|
||||||
|
use std::fmt::Debug;
|
||||||
|
use async_std::channel;
|
||||||
|
use async_std::io::prelude::{ReadExt, WriteExt};
|
||||||
|
use async_std::sync::{Arc, RwLock};
|
||||||
use futures::future::Future;
|
use futures::future::Future;
|
||||||
use log::{trace, info, warn};
|
use log::{trace, info, warn};
|
||||||
use async_std::sync::{Arc, Mutex};
|
|
||||||
use async_std::io::prelude::{ReadExt, WriteExt};
|
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::pin::Pin;
|
|
||||||
|
|
||||||
use libpso::crypto::{PSOCipher, NullCipher, CipherError};
|
use libpso::crypto::{PSOCipher, NullCipher, CipherError};
|
||||||
use libpso::PacketParseError;
|
use libpso::PacketParseError;
|
||||||
@ -42,7 +42,6 @@ impl From<PacketParseError> for NetworkError {
|
|||||||
|
|
||||||
pub struct PacketReceiver<C: PSOCipher> {
|
pub struct PacketReceiver<C: PSOCipher> {
|
||||||
socket: async_std::net::TcpStream,
|
socket: async_std::net::TcpStream,
|
||||||
//cipher: Arc<Mutex<Box<dyn PSOCipher + Send>>>,
|
|
||||||
cipher: C,
|
cipher: C,
|
||||||
recv_buffer: Vec<u8>,
|
recv_buffer: Vec<u8>,
|
||||||
incoming_data: Vec<u8>,
|
incoming_data: Vec<u8>,
|
||||||
@ -115,209 +114,160 @@ impl<C: PSOCipher> PacketReceiver<C> {
|
|||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
|
|
||||||
async fn send_pkt<S: SendServerPacket + Send + std::fmt::Debug>(socket: Arc<async_std::net::TcpStream>,
|
async fn recv_loop<STATE, S, R, C, E>(mut state: STATE,
|
||||||
cipher: Arc<Mutex<Box<dyn PSOCipher + Send>>>, pkt: S)
|
socket: async_std::net::TcpStream,
|
||||||
-> Result<(), NetworkError>
|
client_id: ClientId,
|
||||||
|
cipher: C,
|
||||||
|
clients: Arc<RwLock<HashMap<ClientId, channel::Sender<S>>>>)
|
||||||
|
where
|
||||||
|
STATE: ServerState<SendPacket=S, RecvPacket=R, Cipher=C, PacketError=E> + Send,
|
||||||
|
S: SendServerPacket + Debug + Send,
|
||||||
|
R: RecvServerPacket + Debug + Send,
|
||||||
|
C: PSOCipher + Send,
|
||||||
|
E: std::fmt::Debug + Send,
|
||||||
|
{
|
||||||
|
let mut pkt_receiver = PacketReceiver::new(socket, cipher);
|
||||||
|
loop {
|
||||||
|
match pkt_receiver.recv_pkts::<R>().await {
|
||||||
|
Ok(pkts) => {
|
||||||
|
for pkt in pkts {
|
||||||
|
info!("[recv from {:?}] {:#?}", client_id, pkt);
|
||||||
|
match state.handle(client_id, pkt).await {
|
||||||
|
Ok(response) => {
|
||||||
|
for resp in response {
|
||||||
|
clients
|
||||||
|
.read()
|
||||||
|
.await
|
||||||
|
.get(&resp.0)
|
||||||
|
.unwrap()
|
||||||
|
.send(resp.1)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
warn!("[client recv {:?}] error {:?} ", client_id, err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
match err {
|
||||||
|
NetworkError::ClientDisconnected => {
|
||||||
|
info!("[client recv {:?}] disconnected", client_id);
|
||||||
|
for pkt in state.on_disconnect(client_id).await.unwrap() {
|
||||||
|
clients
|
||||||
|
.read()
|
||||||
|
.await
|
||||||
|
.get(&pkt.0)
|
||||||
|
.unwrap()
|
||||||
|
.send(pkt.1)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
clients
|
||||||
|
.write()
|
||||||
|
.await
|
||||||
|
.remove(&client_id);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
warn!("[client {:?} recv error] {:?}", client_id, err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async fn send_pkt<S, C>(socket: &mut async_std::net::TcpStream,
|
||||||
|
cipher: &mut C,
|
||||||
|
pkt: &S)
|
||||||
|
-> Result<(), NetworkError>
|
||||||
|
where
|
||||||
|
S: SendServerPacket + std::fmt::Debug,
|
||||||
|
C: PSOCipher,
|
||||||
{
|
{
|
||||||
let buf = pkt.as_bytes();
|
let buf = pkt.as_bytes();
|
||||||
trace!("[send buf] {:?}", buf);
|
trace!("[send buf] {:?}", buf);
|
||||||
let cbuf = cipher.lock().await.encrypt(&buf)?;
|
let cbuf = cipher.encrypt(&buf)?;
|
||||||
let mut ssock = &*socket;
|
socket.write_all(&cbuf).await?;
|
||||||
ssock.write_all(&cbuf).await?;
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn send_loop<S, C>(mut socket: async_std::net::TcpStream, client_id: ClientId, mut cipher: C, packet_queue: channel::Receiver<S>)
|
||||||
enum ClientAction<S, R> {
|
|
||||||
NewClient(ClientId, async_std::channel::Sender<S>),
|
|
||||||
Packet(ClientId, R),
|
|
||||||
Disconnect(ClientId),
|
|
||||||
}
|
|
||||||
|
|
||||||
enum ServerStateAction<S> {
|
|
||||||
Cipher(Box<dyn PSOCipher + Send + Sync>, Box<dyn PSOCipher + Send + Sync>),
|
|
||||||
Packet(S),
|
|
||||||
Disconnect,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn client_recv_loop<S, R>(client_id: ClientId,
|
|
||||||
socket: Arc<async_std::net::TcpStream>,
|
|
||||||
cipher: Arc<Mutex<Box<dyn PSOCipher + Send>>>,
|
|
||||||
server_sender: async_std::channel::Sender<ClientAction<ServerStateAction<S>, R>>,
|
|
||||||
client_sender: async_std::channel::Sender<ServerStateAction<S>>)
|
|
||||||
where
|
where
|
||||||
S: SendServerPacket + std::fmt::Debug + Send + 'static,
|
S: SendServerPacket + std::fmt::Debug,
|
||||||
R: RecvServerPacket + std::fmt::Debug + Send + 'static,
|
C: PSOCipher,
|
||||||
{
|
{
|
||||||
async_std::task::spawn(async move {
|
loop {
|
||||||
server_sender.send(ClientAction::NewClient(client_id, client_sender)).await.unwrap();
|
match packet_queue.recv().await {
|
||||||
/*
|
Ok(pkt) => {
|
||||||
let mut pkt_receiver = PacketReceiver::new(*socket, cipher);
|
if let Err(err) = send_pkt(&mut socket, &mut cipher, &pkt).await {
|
||||||
|
warn!("error sending pkt {:#?} to {:?} {:?}", pkt, client_id, err);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
info!("send to {:?} failed: {:?}", client_id, err);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
loop {
|
pub async fn run_server<STATE, S, R, C, E>(mut state: STATE, port: u16)
|
||||||
match pkt_receiver.recv_pkts().await {
|
where
|
||||||
Ok(pkts) => {
|
STATE: ServerState<SendPacket=S, RecvPacket=R, Cipher=C, PacketError=E> + Send + 'static,
|
||||||
for pkt in pkts {
|
S: SendServerPacket + std::fmt::Debug + Send + 'static,
|
||||||
info!("[recv from {:?}] {:#?}", client_id, pkt);
|
R: RecvServerPacket + std::fmt::Debug + Send,
|
||||||
server_sender.send(ClientAction::Packet(client_id, pkt)).await.unwrap();
|
C: PSOCipher + Send + 'static,
|
||||||
}
|
E: std::fmt::Debug + Send,
|
||||||
|
{
|
||||||
|
let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap();
|
||||||
|
let mut id = 0;
|
||||||
|
|
||||||
|
let clients = Arc::new(RwLock::new(HashMap::new()));
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let (mut socket, addr) = listener.accept().await.unwrap();
|
||||||
|
id += 1;
|
||||||
|
|
||||||
|
let client_id = crate::common::serverstate::ClientId(id);
|
||||||
|
info!("new client {:?} {:?} {:?}", client_id, socket, addr);
|
||||||
|
|
||||||
|
let (client_tx, client_rx) = async_std::channel::unbounded();
|
||||||
|
|
||||||
|
clients
|
||||||
|
.write()
|
||||||
|
.await
|
||||||
|
.insert(client_id, client_tx.clone());
|
||||||
|
|
||||||
|
let mut cipher_in: Option<C> = None;
|
||||||
|
let mut cipher_out: Option<C> = None;
|
||||||
|
|
||||||
|
for action in state.on_connect(client_id).await.unwrap() {
|
||||||
|
match action {
|
||||||
|
OnConnect::Cipher(cin, cout) => {
|
||||||
|
cipher_in = Some(cin);
|
||||||
|
cipher_out = Some(cout);
|
||||||
},
|
},
|
||||||
Err(err) => {
|
OnConnect::Packet(pkt) => {
|
||||||
match err {
|
send_pkt(&mut socket, &mut NullCipher {}, &pkt).await.unwrap();
|
||||||
NetworkError::ClientDisconnected => {
|
|
||||||
trace!("[client disconnected] {:?}", client_id);
|
|
||||||
server_sender.send(ClientAction::Disconnect(client_id)).await.unwrap();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
warn!("[client {:?} recv error] {:?}", client_id, err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
});
|
let rstate = state.clone();
|
||||||
|
let rsocket = socket.clone();
|
||||||
|
let rclients = clients.clone();
|
||||||
|
async_std::task::spawn(async move {
|
||||||
|
recv_loop(rstate, rsocket, client_id, cipher_in.unwrap(), rclients).await
|
||||||
|
});
|
||||||
|
|
||||||
|
async_std::task::spawn(async move {
|
||||||
|
send_loop(socket, client_id, cipher_out.unwrap(), client_rx).await
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn client_send_loop<S>(client_id: ClientId,
|
|
||||||
socket: Arc<async_std::net::TcpStream>,
|
|
||||||
cipher_in: Arc<Mutex<Box<dyn PSOCipher + Send>>>,
|
|
||||||
cipher_out: Arc<Mutex<Box<dyn PSOCipher + Send>>>,
|
|
||||||
client_receiver: async_std::channel::Receiver<ServerStateAction<S>>)
|
|
||||||
where
|
|
||||||
S: SendServerPacket + std::fmt::Debug + Send + 'static,
|
|
||||||
{
|
|
||||||
async_std::task::spawn(async move {
|
|
||||||
loop {
|
|
||||||
let action = client_receiver.recv().await.unwrap();
|
|
||||||
match action {
|
|
||||||
ServerStateAction::Cipher(inc, outc) => {
|
|
||||||
*cipher_in.lock().await = inc;
|
|
||||||
*cipher_out.lock().await = outc;
|
|
||||||
}
|
|
||||||
ServerStateAction::Packet(pkt) => {
|
|
||||||
info!("[send to {:?}] {:#?}", client_id, pkt);
|
|
||||||
if let Err(err) = send_pkt(socket.clone(), cipher_out.clone(), pkt).await {
|
|
||||||
warn!("[client {:?} send error ] {:?}", client_id, err);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
ServerStateAction::Disconnect => {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
fn state_client_loop<STATE, S, R, E>(state: Arc<Mutex<STATE>>,
|
|
||||||
server_state_receiver: async_std::channel::Receiver<ClientAction<ServerStateAction<S>, R>>) where
|
|
||||||
STATE: ServerState<SendPacket=S, RecvPacket=R, PacketError=E> + Send + 'static,
|
|
||||||
S: SendServerPacket + std::fmt::Debug + Send + 'static,
|
|
||||||
R: RecvServerPacket + std::fmt::Debug + Send + 'static,
|
|
||||||
E: std::fmt::Debug + Send,
|
|
||||||
{
|
|
||||||
async_std::task::spawn(async move {
|
|
||||||
let mut clients = HashMap::new();
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let action = server_state_receiver.recv().await.unwrap();
|
|
||||||
let mut state = state.lock().await;
|
|
||||||
|
|
||||||
match action {
|
|
||||||
ClientAction::NewClient(client_id, sender) => {
|
|
||||||
let actions = state.on_connect(client_id).await;
|
|
||||||
match actions {
|
|
||||||
Ok(actions) => {
|
|
||||||
for action in actions {
|
|
||||||
match action {
|
|
||||||
OnConnect::Cipher((inc, outc)) => {
|
|
||||||
sender.send(ServerStateAction::Cipher(inc, outc)).await.unwrap();
|
|
||||||
},
|
|
||||||
OnConnect::Packet(pkt) => {
|
|
||||||
sender.send(ServerStateAction::Packet(pkt)).await.unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
warn!("[client {:?} state on_connect error] {:?}", client_id, err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
clients.insert(client_id, sender);
|
|
||||||
},
|
|
||||||
ClientAction::Packet(client_id, pkt) => {
|
|
||||||
let pkts = state.handle(client_id, &pkt).await;
|
|
||||||
match pkts {
|
|
||||||
Ok(pkts) => {
|
|
||||||
for (client_id, pkt) in pkts {
|
|
||||||
if let Some(client) = clients.get_mut(&client_id) {
|
|
||||||
client.send(ServerStateAction::Packet(pkt)).await.unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
warn!("[client {:?} state handler error] {:?}", client_id, err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
ClientAction::Disconnect(client_id) => {
|
|
||||||
let pkts = state.on_disconnect(client_id).await;
|
|
||||||
match pkts {
|
|
||||||
Ok(pkts) => {
|
|
||||||
for (client_id, pkt) in pkts {
|
|
||||||
if let Some(client) = clients.get_mut(&client_id) {
|
|
||||||
client.send(ServerStateAction::Packet(pkt)).await.unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(client) = clients.get_mut(&client_id) {
|
|
||||||
client.send(ServerStateAction::Disconnect).await.unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
warn!("[client {:?} state on_disconnect error] {:?}", client_id, err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
pub fn client_accept_mainloop<STATE, S, R, E>(state: Arc<Mutex<STATE>>, client_port: u16) -> Pin<Box<dyn Future<Output = ()>>>
|
|
||||||
where
|
|
||||||
STATE: ServerState<SendPacket=S, RecvPacket=R, PacketError=E> + Send + 'static,
|
|
||||||
S: SendServerPacket + std::fmt::Debug + Send + Sync + 'static,
|
|
||||||
R: RecvServerPacket + std::fmt::Debug + Send + Sync + 'static,
|
|
||||||
E: std::fmt::Debug + Send,
|
|
||||||
{
|
|
||||||
Box::pin(async_std::task::spawn(async move {
|
|
||||||
let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), client_port))).await.unwrap();
|
|
||||||
let mut id = 0;
|
|
||||||
|
|
||||||
let (server_state_sender, server_state_receiver) = async_std::channel::bounded(1024);
|
|
||||||
state_client_loop(state, server_state_receiver);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let (sock, addr) = listener.accept().await.unwrap();
|
|
||||||
id += 1;
|
|
||||||
let client_id = crate::common::serverstate::ClientId(id);
|
|
||||||
|
|
||||||
info!("new client {:?} {:?} {:?}", client_id, sock, addr);
|
|
||||||
|
|
||||||
let (client_sender, client_receiver) = async_std::channel::bounded(64);
|
|
||||||
let socket = Arc::new(sock);
|
|
||||||
let cipher_in: Arc<Mutex<Box<dyn PSOCipher + Send>>> = Arc::new(Mutex::new(Box::new(NullCipher {})));
|
|
||||||
let cipher_out: Arc<Mutex<Box<dyn PSOCipher + Send>>> = Arc::new(Mutex::new(Box::new(NullCipher {})));
|
|
||||||
|
|
||||||
client_recv_loop(client_id, socket.clone(), cipher_in.clone(), server_state_sender.clone(), client_sender);
|
|
||||||
client_send_loop(client_id, socket.clone(), cipher_in.clone(), cipher_out.clone(), client_receiver);
|
|
||||||
}
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
|
|
||||||
*/
|
|
||||||
|
@ -2,7 +2,7 @@ use std::time::Duration;
|
|||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use futures::future::Future;
|
use futures::future::Future;
|
||||||
use log::{info, warn};
|
use log::{info, warn};
|
||||||
use async_std::sync::{Arc, Mutex};
|
use async_std::sync::{Arc, RwLock};
|
||||||
use async_std::io::prelude::{ReadExt, WriteExt};
|
use async_std::io::prelude::{ReadExt, WriteExt};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
@ -17,6 +17,8 @@ use crate::login::character::CharacterServerState;
|
|||||||
use crate::entity::gateway::entitygateway::EntityGateway;
|
use crate::entity::gateway::entitygateway::EntityGateway;
|
||||||
|
|
||||||
use async_std::channel;
|
use async_std::channel;
|
||||||
|
use std::fmt::Debug;
|
||||||
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum MessageReceiverError {
|
enum MessageReceiverError {
|
||||||
@ -37,7 +39,7 @@ impl MessageReceiver {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn recv<R: DeserializeOwned + std::fmt::Debug + Send>(&mut self) -> Result<R, MessageReceiverError> {
|
async fn recv<R: serde::de::DeserializeOwned + std::fmt::Debug>(&mut self) -> Result<R, MessageReceiverError> {
|
||||||
let mut size_buf = [0u8; 4];
|
let mut size_buf = [0u8; 4];
|
||||||
self.socket.read_exact(&mut size_buf).await.map_err(|_| MessageReceiverError::Disconnected)?;
|
self.socket.read_exact(&mut size_buf).await.map_err(|_| MessageReceiverError::Disconnected)?;
|
||||||
let size = u32::from_le_bytes(size_buf) as usize;
|
let size = u32::from_le_bytes(size_buf) as usize;
|
||||||
@ -50,219 +52,182 @@ impl MessageReceiver {
|
|||||||
Ok(msg)
|
Ok(msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
async fn interserver_recv_loop<STATE, S, R, E>(mut state: STATE, server_id: ServerId, socket: async_std::net::TcpStream, ships: Arc<RwLock<HashMap<ServerId, channel::Sender<S>>>>)
|
||||||
enum InterserverInputAction<S, R> {
|
|
||||||
NewConnection(ServerId, async_std::channel::Sender<S>),
|
|
||||||
Message(ServerId, R),
|
|
||||||
Disconnect(ServerId),
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn interserver_state_loop<A, S, R>(state: Arc<Mutex<A>>, action_receiver: async_std::channel::Receiver<InterserverInputAction<S, R>>)
|
|
||||||
where
|
where
|
||||||
A: InterserverActor<SendMessage=S, RecvMessage=R, Error=()> + Send + 'static,
|
STATE: InterserverActor<SendMessage=S, RecvMessage=R, Error=E> + Send,
|
||||||
S: Serialize + Send + 'static,
|
S: serde::Serialize + Debug + Send,
|
||||||
R: DeserializeOwned + Send + 'static,
|
R: serde::de::DeserializeOwned + Debug + Send,
|
||||||
|
E: Debug + Send,
|
||||||
{
|
{
|
||||||
async_std::task::spawn(async move {
|
let mut msg_receiver = MessageReceiver::new(socket);
|
||||||
let mut ships = HashMap::new();
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
info!("interserver loop");
|
match msg_receiver.recv::<R>().await {
|
||||||
let action = match action_receiver.recv().await {
|
Ok(msg) => {
|
||||||
Ok(action) => action,
|
info!("[interserver recv {:?}] {:?}", server_id, msg);
|
||||||
Err(err) => {
|
match state.on_action(server_id, msg).await {
|
||||||
warn!("error in iterserver state loop {:?}", err);
|
Ok(response) => {
|
||||||
continue;
|
for resp in response {
|
||||||
}
|
ships
|
||||||
};
|
.read()
|
||||||
let mut state = state.lock().await;
|
.await
|
||||||
|
.get(&resp.0)
|
||||||
match action {
|
.unwrap()
|
||||||
InterserverInputAction::NewConnection(server_id, ship_action_sender) => {
|
.send(resp.1)
|
||||||
ships.insert(server_id, ship_action_sender);
|
.await
|
||||||
for (server, action) in state.on_connect(server_id).await {
|
.unwrap();
|
||||||
if let Some(sender) = ships.get_mut(&server) {
|
|
||||||
sender.send(action).await.unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
InterserverInputAction::Message(server_id, message) => {
|
|
||||||
let actions = state.action(server_id, message).await;
|
|
||||||
match actions {
|
|
||||||
Ok(actions) => {
|
|
||||||
for (server, action) in actions{
|
|
||||||
if let Some(sender) = ships.get_mut(&server) {
|
|
||||||
sender.send(action).await.unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
warn!("[server {:?} state handler error] {:?}", server_id, err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
InterserverInputAction::Disconnect(server_id) => {
|
|
||||||
let actions = state.on_disconnect(server_id).await;
|
|
||||||
ships.remove(&server_id);
|
|
||||||
for (server, action) in actions {
|
|
||||||
if let Some(sender) = ships.get_mut(&server) {
|
|
||||||
sender.send(action).await.unwrap();
|
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
warn!("[interserver recv {:?}] error {:?}", server_id, err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
}
|
Err(err) => {
|
||||||
});
|
if let MessageReceiverError::Disconnected = err {
|
||||||
}
|
info!("[interserver recv {:?}] disconnected", server_id);
|
||||||
|
for (_, _sender) in ships.read().await.iter() {
|
||||||
async fn login_recv_loop<S, R>(server_id: ServerId,
|
for pkt in state.on_disconnect(server_id).await {
|
||||||
socket: async_std::net::TcpStream,
|
ships
|
||||||
state_loop_sender: async_std::channel::Sender<InterserverInputAction<S, R>>,
|
.read()
|
||||||
output_loop_sender: async_std::channel::Sender<S>)
|
.await
|
||||||
where
|
.get(&pkt.0)
|
||||||
S: Serialize + std::fmt::Debug + Send + 'static,
|
.unwrap()
|
||||||
R: DeserializeOwned + std::fmt::Debug + Send + 'static,
|
.send(pkt.1)
|
||||||
{
|
.await
|
||||||
async_std::task::spawn(async move {
|
.unwrap();
|
||||||
state_loop_sender.send(InterserverInputAction::NewConnection(server_id, output_loop_sender)).await.unwrap();
|
|
||||||
let mut msg_receiver = MessageReceiver::new(socket);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
info!("login recv loop");
|
|
||||||
match msg_receiver.recv().await {
|
|
||||||
Ok(msg) => {
|
|
||||||
info!("[login recv loop msg] {:?}", msg);
|
|
||||||
state_loop_sender.send(InterserverInputAction::Message(server_id, msg)).await.unwrap();
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
if let MessageReceiverError::Disconnected = err {
|
|
||||||
info!("[login recv loop disconnect] {:?}", server_id);
|
|
||||||
state_loop_sender.send(InterserverInputAction::Disconnect(server_id)).await.unwrap();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
info!("[login recv loop err] {:?}", err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn interserver_send_loop<S>(server_id: ServerId,
|
|
||||||
mut socket: async_std::net::TcpStream,
|
|
||||||
output_loop_receiver: async_std::channel::Receiver<S>)
|
|
||||||
where
|
|
||||||
S: Serialize + std::fmt::Debug + Send + 'static,
|
|
||||||
{
|
|
||||||
async_std::task::spawn(async move {
|
|
||||||
loop {
|
|
||||||
info!("login send loop");
|
|
||||||
match output_loop_receiver.recv().await {
|
|
||||||
Ok(msg) => {
|
|
||||||
let payload = serde_json::to_string(&msg);
|
|
||||||
if let Ok(payload) = payload {
|
|
||||||
let len_bytes = u32::to_le_bytes(payload.len() as u32);
|
|
||||||
|
|
||||||
if let Err(err) = socket.write_all(&len_bytes).await {
|
|
||||||
warn!("interserver send failed: {:?}", err);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if let Err(err) = socket.write_all(payload.as_bytes()).await {
|
|
||||||
warn!("intserserver send failed: {:?}", err);
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
ships
|
||||||
Err(err) => {
|
.write()
|
||||||
warn!("error in send_loop: {:?}, {:?}", server_id, err);
|
.await
|
||||||
|
.remove(&server_id);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
info!("[interserver recv {:?}] error {:?}", server_id, err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn interserver_send_loop<S>(server_id: ServerId, mut socket: async_std::net::TcpStream, to_send: channel::Receiver<S>)
|
||||||
|
where
|
||||||
|
S: serde::Serialize + std::fmt::Debug,
|
||||||
|
{
|
||||||
|
loop {
|
||||||
|
let msg = to_send.recv().await.unwrap();
|
||||||
|
let payload = serde_json::to_string(&msg);
|
||||||
|
|
||||||
|
if let Ok(payload) = payload {
|
||||||
pub fn login_listen_mainloop<EG: EntityGateway + Clone + 'static>(state: Arc<Mutex<CharacterServerState<EG>>>, port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
|
let len_bytes = u32::to_le_bytes(payload.len() as u32);
|
||||||
Box::pin(async_std::task::spawn(async move {
|
if let Err(err) = socket.write_all(&len_bytes).await {
|
||||||
let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap();
|
warn!("[interserver send {:?}] failed: {:?}", server_id, err);
|
||||||
let mut id = 0;
|
break;
|
||||||
|
}
|
||||||
let (server_state_sender, server_state_receiver) = async_std::channel::bounded(1024);
|
if let Err(err) = socket.write_all(payload.as_bytes()).await {
|
||||||
interserver_state_loop(state.clone(), server_state_receiver).await;
|
warn!("[interserver send {:?}] failed: {:?}", server_id, err);
|
||||||
|
break;
|
||||||
loop {
|
|
||||||
let (socket, addr) = listener.accept().await.unwrap();
|
|
||||||
info!("new ship server: {:?} {:?}", socket, addr);
|
|
||||||
|
|
||||||
id += 1;
|
|
||||||
let server_id = crate::common::interserver::ServerId(id);
|
|
||||||
let (client_sender, client_receiver) = async_std::channel::bounded(64);
|
|
||||||
|
|
||||||
{
|
|
||||||
let mut state = state.lock().await;
|
|
||||||
let local_sender = client_sender.clone();
|
|
||||||
state.set_sender(server_id, Box::new(move |message| {
|
|
||||||
async_std::task::block_on(local_sender.send(message)).unwrap();
|
|
||||||
}))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
login_recv_loop(server_id, socket.clone(), server_state_sender.clone(), client_sender).await;
|
|
||||||
interserver_send_loop(server_id, socket.clone(), client_receiver).await;
|
|
||||||
}
|
}
|
||||||
}))
|
}
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
/*
|
pub async fn run_interserver_listen<STATE, S, R, E>(mut state: STATE, port: u16)
|
||||||
pub fn ship_connect_mainloop<EG: EntityGateway + Clone + 'static>(state: Arc<Mutex<ShipServerState<EG>>>, ip: std::net::Ipv4Addr, port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
|
where
|
||||||
Box::pin(async_std::task::spawn(async move {
|
STATE: InterserverActor<SendMessage=S, RecvMessage=R, Error=E> + Send + 'static,
|
||||||
let mut id = 0;
|
S: serde::Serialize + Debug + Send + 'static,
|
||||||
let (server_state_sender, server_state_receiver) = async_std::channel::bounded(1024);
|
R: serde::de::DeserializeOwned + Debug + Send,
|
||||||
|
E: Debug + Send,
|
||||||
|
{
|
||||||
|
let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap();
|
||||||
|
let mut id = 0;
|
||||||
|
let ships = Arc::new(RwLock::new(HashMap::new()));
|
||||||
|
|
||||||
interserver_state_loop(state.clone(), server_state_receiver).await;
|
loop {
|
||||||
|
let (socket, addr) = listener.accept().await.unwrap();
|
||||||
|
info!("[interserver listen] new server: {:?} {:?}", socket, addr);
|
||||||
|
|
||||||
loop {
|
id += 1;
|
||||||
info!("trying to connect to loginserv");
|
let server_id = crate::common::interserver::ServerId(id);
|
||||||
let socket = match async_std::net::TcpStream::connect((ip, port)).await {
|
let (client_tx, client_rx) = async_std::channel::unbounded();
|
||||||
Ok(socket) => socket,
|
state.set_sender(server_id, client_tx.clone()).await;
|
||||||
Err(err) => {
|
|
||||||
info!("err trying to connect to loginserv {:?}", err);
|
|
||||||
async_std::task::sleep(Duration::from_secs(10)).await;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
id += 1;
|
|
||||||
let server_id = crate::common::interserver::ServerId(id);
|
|
||||||
info!("found loginserv: {:?} {:?}", server_id, socket);
|
|
||||||
let (client_sender, client_receiver) = async_std::channel::bounded(64);
|
|
||||||
|
|
||||||
{
|
ships
|
||||||
let mut state = state.lock().await;
|
.write()
|
||||||
let local_sender = client_sender.clone();
|
.await
|
||||||
state.set_sender(Box::new(move |message| {
|
.insert(server_id, client_tx.clone());
|
||||||
async_std::task::block_on(local_sender.send(message)).unwrap();
|
|
||||||
}))
|
for msg in state.on_connect(server_id).await {
|
||||||
|
if let Some(ship_sender) = ships.read().await.get(&msg.0) {
|
||||||
|
ship_sender.send(msg.1).await.unwrap();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
login_recv_loop(server_id, socket.clone(), server_state_sender.clone(), client_sender).await;
|
let rstate = state.clone();
|
||||||
interserver_send_loop(server_id, socket.clone(), client_receiver).await;
|
let rsocket = socket.clone();
|
||||||
|
let rships = ships.clone();
|
||||||
|
async_std::task::spawn(async move {
|
||||||
|
interserver_recv_loop(rstate, server_id, rsocket, rships).await;
|
||||||
|
});
|
||||||
|
async_std::task::spawn(async move {
|
||||||
|
interserver_send_loop(server_id, socket, client_rx).await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let mut buf = [0u8; 1];
|
pub async fn run_interserver_connect<STATE, S, R, E>(mut state: STATE, ip: std::net::Ipv4Addr, port: u16)
|
||||||
loop {
|
where
|
||||||
let peek = socket.peek(&mut buf).await;
|
STATE: InterserverActor<SendMessage=S, RecvMessage=R, Error=E> + Send + 'static,
|
||||||
match peek {
|
S: serde::Serialize + Debug + Send + 'static,
|
||||||
Ok(len) if len == 0 => {
|
R: serde::de::DeserializeOwned + Debug + Send,
|
||||||
break
|
E: Debug + Send,
|
||||||
},
|
{
|
||||||
_ => {
|
let mut id = 0;
|
||||||
}
|
|
||||||
|
loop {
|
||||||
|
info!("[interserver connect] trying to connect to server");
|
||||||
|
let socket = match async_std::net::TcpStream::connect((ip, port)).await {
|
||||||
|
Ok(socket) => socket,
|
||||||
|
Err(err) => {
|
||||||
|
info!("err trying to connect to loginserv {:?}", err);
|
||||||
|
async_std::task::sleep(std::time::Duration::from_secs(10)).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
id += 1;
|
||||||
|
let server_id = crate::common::interserver::ServerId(id);
|
||||||
|
info!("[interserver connect] found loginserv: {:?} {:?}", server_id, socket);
|
||||||
|
|
||||||
|
let (client_tx, client_rx) = async_std::channel::unbounded();
|
||||||
|
state.set_sender(server_id, client_tx.clone()).await;
|
||||||
|
|
||||||
|
for msg in state.on_connect(server_id).await {
|
||||||
|
client_tx.send(msg.1).await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
let other_server = vec![(server_id, client_tx.clone())].into_iter().collect();
|
||||||
|
let rstate = state.clone();
|
||||||
|
let rsocket = socket.clone();
|
||||||
|
async_std::task::spawn(async move {
|
||||||
|
interserver_recv_loop(rstate, server_id, rsocket, Arc::new(RwLock::new(other_server))).await;
|
||||||
|
});
|
||||||
|
let ssocket = socket.clone();
|
||||||
|
async_std::task::spawn(async move {
|
||||||
|
interserver_send_loop(server_id, ssocket, client_rx).await;
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut buf = [0u8; 1];
|
||||||
|
loop {
|
||||||
|
let peek = socket.peek(&mut buf).await;
|
||||||
|
match peek {
|
||||||
|
Ok(len) if len == 0 => {
|
||||||
|
break
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}))
|
}
|
||||||
}
|
|
||||||
|
|
||||||
*/
|
}
|
||||||
|
@ -2,547 +2,5 @@
|
|||||||
mod client;
|
mod client;
|
||||||
mod interserver;
|
mod interserver;
|
||||||
|
|
||||||
use std::collections::HashMap;
|
pub use self::client::*;
|
||||||
use log::{trace, info, warn};
|
pub use self::interserver::*;
|
||||||
use std::pin::Pin;
|
|
||||||
use futures::future::{Future, join_all, FutureExt};
|
|
||||||
use async_std::sync::{Arc, Mutex, RwLock};
|
|
||||||
|
|
||||||
use std::fmt::Debug;
|
|
||||||
|
|
||||||
use async_std::io::prelude::{ReadExt, WriteExt};
|
|
||||||
//use crate::common::mainloop::client::client_accept_mainloop;
|
|
||||||
//use crate::common::mainloop::interserver::{ship_connect_mainloop, login_listen_mainloop};
|
|
||||||
pub use crate::common::mainloop::client::NetworkError;
|
|
||||||
use crate::common::mainloop::client::PacketReceiver;
|
|
||||||
|
|
||||||
use crate::common::serverstate::ClientId;
|
|
||||||
use crate::common::serverstate::{RecvServerPacket, SendServerPacket, ServerState, OnConnect};
|
|
||||||
|
|
||||||
use crate::common::interserver::{ServerId, InterserverActor};
|
|
||||||
|
|
||||||
use crate::patch::patch::PatchServerState;
|
|
||||||
use crate::login::login::LoginServerState;
|
|
||||||
use crate::login::character::CharacterServerState;
|
|
||||||
//use crate::ship::ship::ShipServerState;
|
|
||||||
use crate::entity::gateway::entitygateway::EntityGateway;
|
|
||||||
|
|
||||||
use libpso::crypto::{PSOCipher, NullCipher, CipherError};
|
|
||||||
|
|
||||||
use async_std::channel;
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
pub fn patch_mainloop(patch_state: PatchServerState, patch_port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
|
|
||||||
let patch_state = Arc::new(Mutex::new(patch_state));
|
|
||||||
let client_mainloop = client_accept_mainloop(patch_state, patch_port);
|
|
||||||
Box::pin(client_mainloop)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn login_mainloop<EG: EntityGateway + Clone + 'static>(login_state: LoginServerState<EG>, login_port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
|
|
||||||
let login_state = Arc::new(Mutex::new(login_state));
|
|
||||||
let client_mainloop = client_accept_mainloop(login_state, login_port);
|
|
||||||
Box::pin(client_mainloop)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn character_mainloop<EG: EntityGateway + Clone + 'static>(character_state: CharacterServerState<EG>, character_port: u16, comm_port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
|
|
||||||
let character_state = Arc::new(Mutex::new(character_state));
|
|
||||||
let client_mainloop = client_accept_mainloop(character_state.clone(), character_port);
|
|
||||||
let ship_communication_mainloop = login_listen_mainloop(character_state, comm_port);
|
|
||||||
Box::pin(join_all(vec![client_mainloop, ship_communication_mainloop]).map(|_| ()))
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
pub fn ship_mainloop<EG: EntityGateway + Clone + 'static>(ship_state: ShipServerState<EG>, ship_port: u16, comm_ip: std::net::Ipv4Addr, comm_port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
|
|
||||||
let ship_state = Arc::new(Mutex::new(ship_state));
|
|
||||||
let client_mainloop = client_accept_mainloop(ship_state.clone(), ship_port);
|
|
||||||
let login_communication_mainloop = ship_connect_mainloop(ship_state, comm_ip, comm_port);
|
|
||||||
Box::pin(join_all(vec![client_mainloop, login_communication_mainloop]).map(|_| ()))
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
enum MessageReceiverError {
|
|
||||||
//InvalidSize,
|
|
||||||
InvalidPayload,
|
|
||||||
//NetworkError(std::io::Error),
|
|
||||||
Disconnected,
|
|
||||||
}
|
|
||||||
|
|
||||||
struct MessageReceiver {
|
|
||||||
socket: async_std::net::TcpStream,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MessageReceiver {
|
|
||||||
fn new(socket: async_std::net::TcpStream) -> MessageReceiver {
|
|
||||||
MessageReceiver {
|
|
||||||
socket,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn recv<R: serde::de::DeserializeOwned + std::fmt::Debug>(&mut self) -> Result<R, MessageReceiverError> {
|
|
||||||
let mut size_buf = [0u8; 4];
|
|
||||||
self.socket.read_exact(&mut size_buf).await.map_err(|_| MessageReceiverError::Disconnected)?;
|
|
||||||
let size = u32::from_le_bytes(size_buf) as usize;
|
|
||||||
|
|
||||||
let mut payload = vec![0u8; size];
|
|
||||||
self.socket.read_exact(&mut payload).await.map_err(|_| MessageReceiverError::Disconnected)?;
|
|
||||||
let payload = String::from_utf8(payload).map_err(|_| MessageReceiverError::InvalidPayload)?;
|
|
||||||
|
|
||||||
let msg = serde_json::from_str(&payload).map_err(|_| MessageReceiverError::InvalidPayload)?;
|
|
||||||
Ok(msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
enum ServerAction<S> {
|
|
||||||
NewClient(ClientId, channel::Sender<S>),
|
|
||||||
Packet(ClientId, S),
|
|
||||||
Disconnect(ClientId),
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
async fn recv_loop<STATE, S, R, C, E>(mut state: STATE,
|
|
||||||
socket: async_std::net::TcpStream,
|
|
||||||
client_id: ClientId,
|
|
||||||
cipher: C,
|
|
||||||
clients: Arc<RwLock<HashMap<ClientId, channel::Sender<S>>>>)
|
|
||||||
where
|
|
||||||
STATE: ServerState<SendPacket=S, RecvPacket=R, Cipher=C, PacketError=E> + Send,
|
|
||||||
S: SendServerPacket + Debug + Send,
|
|
||||||
R: RecvServerPacket + Debug + Send,
|
|
||||||
C: PSOCipher + Send,
|
|
||||||
E: std::fmt::Debug + Send,
|
|
||||||
{
|
|
||||||
let mut pkt_receiver = PacketReceiver::new(socket, cipher);
|
|
||||||
loop {
|
|
||||||
match pkt_receiver.recv_pkts::<R>().await {
|
|
||||||
Ok(pkts) => {
|
|
||||||
for pkt in pkts {
|
|
||||||
info!("[recv from {:?}] {:#?}", client_id, pkt);
|
|
||||||
match state.handle(client_id, pkt).await {
|
|
||||||
Ok(response) => {
|
|
||||||
for resp in response {
|
|
||||||
clients
|
|
||||||
.read()
|
|
||||||
.await
|
|
||||||
.get(&resp.0)
|
|
||||||
.unwrap()
|
|
||||||
.send(resp.1)
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
warn!("[client recv {:?}] error {:?} ", client_id, err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
match err {
|
|
||||||
NetworkError::ClientDisconnected => {
|
|
||||||
info!("[client recv {:?}] disconnected", client_id);
|
|
||||||
for pkt in state.on_disconnect(client_id).await.unwrap() {
|
|
||||||
clients
|
|
||||||
.read()
|
|
||||||
.await
|
|
||||||
.get(&pkt.0)
|
|
||||||
.unwrap()
|
|
||||||
.send(pkt.1)
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
clients
|
|
||||||
.write()
|
|
||||||
.await
|
|
||||||
.remove(&client_id);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
warn!("[client {:?} recv error] {:?}", client_id, err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
async fn send_pkt<S, C>(socket: &mut async_std::net::TcpStream,
|
|
||||||
cipher: &mut C,
|
|
||||||
pkt: &S)
|
|
||||||
-> Result<(), NetworkError>
|
|
||||||
where
|
|
||||||
S: SendServerPacket + std::fmt::Debug,
|
|
||||||
C: PSOCipher,
|
|
||||||
{
|
|
||||||
let buf = pkt.as_bytes();
|
|
||||||
trace!("[send buf] {:?}", buf);
|
|
||||||
let cbuf = cipher.encrypt(&buf)?;
|
|
||||||
socket.write_all(&cbuf).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn send_loop<S, C>(mut socket: async_std::net::TcpStream, client_id: ClientId, mut cipher: C, packet_queue: channel::Receiver<S>)
|
|
||||||
where
|
|
||||||
S: SendServerPacket + std::fmt::Debug,
|
|
||||||
C: PSOCipher,
|
|
||||||
{
|
|
||||||
loop {
|
|
||||||
let pkt = packet_queue.recv().await.unwrap();
|
|
||||||
if let Err(err) = send_pkt(&mut socket, &mut cipher, &pkt).await {
|
|
||||||
warn!("error sending pkt {:#?} to {:?} {:?}", pkt, client_id, err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
pub async fn server_multiplex<STATE, S, R, E>(state: STATE, packet_queue: channel::Receiver<ServerAction<S>>)
|
|
||||||
where
|
|
||||||
STATE: ServerState<SendPacket=S, RecvPacket=R, PacketError=E>,
|
|
||||||
S: SendServerPacket + std::fmt::Debug,
|
|
||||||
R: RecvServerPacket + std::fmt::Debug,
|
|
||||||
E: std::fmt::Debug,
|
|
||||||
{
|
|
||||||
let mut clients = HashMap::new();
|
|
||||||
loop {
|
|
||||||
let action = packet_queue.recv().await.unwrap();
|
|
||||||
|
|
||||||
match action {
|
|
||||||
ServerAction::NewClient(client_id, sender) => {
|
|
||||||
clients.insert(client_id, sender);
|
|
||||||
},
|
|
||||||
ServerAction::Packet(client_id, pkt) => {
|
|
||||||
if let Some(sender) = clients.get(&client_id) {
|
|
||||||
sender.send(pkt).await;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
ServerAction::Disconnect(client_id) => {
|
|
||||||
clients.remove(&client_id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
pub async fn run_server<STATE, S, R, C, E>(mut state: STATE, port: u16)
|
|
||||||
where
|
|
||||||
STATE: ServerState<SendPacket=S, RecvPacket=R, Cipher=C, PacketError=E> + Send + 'static,
|
|
||||||
S: SendServerPacket + std::fmt::Debug + Send + 'static,
|
|
||||||
R: RecvServerPacket + std::fmt::Debug + Send,
|
|
||||||
C: PSOCipher + Send + 'static,
|
|
||||||
E: std::fmt::Debug + Send,
|
|
||||||
{
|
|
||||||
let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap();
|
|
||||||
let mut id = 0;
|
|
||||||
//let (packet_sender, packet_receiver) = async_std::channel::unbounded();
|
|
||||||
|
|
||||||
let clients = Arc::new(RwLock::new(HashMap::new()));
|
|
||||||
|
|
||||||
//let cstate = state.clone();
|
|
||||||
/*
|
|
||||||
async_std::task::spawn(async move {
|
|
||||||
server_multiplex(cstate, packet_receiver).await
|
|
||||||
});
|
|
||||||
*/
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let (mut socket, addr) = listener.accept().await.unwrap();
|
|
||||||
id += 1;
|
|
||||||
|
|
||||||
let client_id = crate::common::serverstate::ClientId(id);
|
|
||||||
info!("new client {:?} {:?} {:?}", client_id, socket, addr);
|
|
||||||
|
|
||||||
let (client_tx, client_rx) = async_std::channel::unbounded();
|
|
||||||
//packet_sender.send(ServerAction::NewClient()).await;
|
|
||||||
|
|
||||||
clients
|
|
||||||
.write()
|
|
||||||
.await
|
|
||||||
.insert(client_id, client_tx.clone());
|
|
||||||
|
|
||||||
let mut cipher_in: Option<C> = None;
|
|
||||||
let mut cipher_out: Option<C> = None;
|
|
||||||
|
|
||||||
for action in state.on_connect(client_id).await.unwrap() {
|
|
||||||
match action {
|
|
||||||
OnConnect::Cipher(cin, cout) => {
|
|
||||||
cipher_in = Some(cin);
|
|
||||||
cipher_out = Some(cout);
|
|
||||||
},
|
|
||||||
OnConnect::Packet(pkt) => {
|
|
||||||
send_pkt(&mut socket, &mut NullCipher {}, &pkt).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let rstate = state.clone();
|
|
||||||
let rsocket = socket.clone();
|
|
||||||
let rclients = clients.clone();
|
|
||||||
async_std::task::spawn(async move {
|
|
||||||
/*
|
|
||||||
rstate;
|
|
||||||
rsocket;
|
|
||||||
client_id;
|
|
||||||
cipher_in.unwrap();
|
|
||||||
rclients;
|
|
||||||
*/
|
|
||||||
//client_tx.send(12).await
|
|
||||||
recv_loop(rstate, rsocket, client_id, cipher_in.unwrap(), rclients).await
|
|
||||||
//recv_loop2(rstate, rsocket, client_id, cipher_in.unwrap()).await
|
|
||||||
});
|
|
||||||
|
|
||||||
//let sstate = state.clone();
|
|
||||||
async_std::task::spawn(async move {
|
|
||||||
send_loop(socket, client_id, cipher_out.unwrap(), client_rx).await
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
pub async fn listen_interserver<STATE, S, R, C, E>(state: STATE, port: u16)
|
|
||||||
where
|
|
||||||
STATE: InterserverActor<SendMessage=S, RecvMessage=R, Error=E>,
|
|
||||||
S: serde::Serialize,
|
|
||||||
R: serde::de::DeserializeOwned,
|
|
||||||
{
|
|
||||||
let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap();
|
|
||||||
let mut id = 0;
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let (socket, addr) = listener.accept().await.unwrap();
|
|
||||||
info!("new interserver connection: {:?} {:?}", socket, addr);
|
|
||||||
|
|
||||||
id += 1;
|
|
||||||
let server_id = crate::common::interserver::ServerId(id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
pub async fn run_interserver_receiver<STATE, S, R, E>(state: STATE, ip: std::net::Ipv4Addr, port: u16)
|
|
||||||
where
|
|
||||||
STATE: InterserverActor<SendMessage=S, RecvMessage=R, Error=E>,
|
|
||||||
S: serde::Serialize,
|
|
||||||
R: serde::de::DeserializeOwned,
|
|
||||||
{
|
|
||||||
loop {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn run_interserver_sender<STATE, S, R, E>(state: STATE, to_send: channel::Receiver<S>)
|
|
||||||
where
|
|
||||||
STATE: InterserverActor<SendMessage=S, RecvMessage=R, Error=E>,
|
|
||||||
S: serde::Serialize,
|
|
||||||
R: serde::de::DeserializeOwned,
|
|
||||||
{
|
|
||||||
loop {
|
|
||||||
let msg = to_send.recv().await.unwrap();
|
|
||||||
|
|
||||||
let response = state.on_action(msg);
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
async fn interserver_recv_loop<STATE, S, R, E>(mut state: STATE, server_id: ServerId, socket: async_std::net::TcpStream, ships: Arc<RwLock<HashMap<ServerId, channel::Sender<S>>>>)
|
|
||||||
where
|
|
||||||
STATE: InterserverActor<SendMessage=S, RecvMessage=R, Error=E> + Send,
|
|
||||||
S: serde::Serialize + Debug + Send,
|
|
||||||
R: serde::de::DeserializeOwned + Debug + Send,
|
|
||||||
E: Debug + Send,
|
|
||||||
{
|
|
||||||
let mut msg_receiver = MessageReceiver::new(socket);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match msg_receiver.recv::<R>().await {
|
|
||||||
Ok(msg) => {
|
|
||||||
info!("[interserver recv {:?}] {:?}", server_id, msg);
|
|
||||||
match state.on_action(server_id, msg).await {
|
|
||||||
Ok(response) => {
|
|
||||||
for resp in response {
|
|
||||||
ships
|
|
||||||
.read()
|
|
||||||
.await
|
|
||||||
.get(&resp.0)
|
|
||||||
.unwrap()
|
|
||||||
.send(resp.1)
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
warn!("[interserver recv {:?}] error {:?}", server_id, err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
if let MessageReceiverError::Disconnected = err {
|
|
||||||
info!("[interserver recv {:?}] disconnected", server_id);
|
|
||||||
for (_, sender) in ships.read().await.iter() {
|
|
||||||
for pkt in state.on_disconnect(server_id).await {
|
|
||||||
ships
|
|
||||||
.read()
|
|
||||||
.await
|
|
||||||
.get(&pkt.0)
|
|
||||||
.unwrap()
|
|
||||||
.send(pkt.1)
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ships
|
|
||||||
.write()
|
|
||||||
.await
|
|
||||||
.remove(&server_id);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
info!("[interserver recv {:?}] error {:?}", server_id, err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn interserver_send_loop<S>(server_id: ServerId, mut socket: async_std::net::TcpStream, to_send: channel::Receiver<S>)
|
|
||||||
where
|
|
||||||
S: serde::Serialize + std::fmt::Debug,
|
|
||||||
{
|
|
||||||
loop {
|
|
||||||
let msg = to_send.recv().await.unwrap();
|
|
||||||
let payload = serde_json::to_string(&msg);
|
|
||||||
|
|
||||||
if let Ok(payload) = payload {
|
|
||||||
let len_bytes = u32::to_le_bytes(payload.len() as u32);
|
|
||||||
if let Err(err) = socket.write_all(&len_bytes).await {
|
|
||||||
warn!("[interserver send {:?}] failed: {:?}", server_id, err);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if let Err(err) = socket.write_all(payload.as_bytes()).await {
|
|
||||||
warn!("[interserver send {:?}] failed: {:?}", server_id, err);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn run_interserver_listen<STATE, S, R, E>(mut state: STATE, port: u16)
|
|
||||||
where
|
|
||||||
STATE: InterserverActor<SendMessage=S, RecvMessage=R, Error=E> + Send + 'static,
|
|
||||||
S: serde::Serialize + Debug + Send + 'static,
|
|
||||||
R: serde::de::DeserializeOwned + Debug + Send,
|
|
||||||
E: Debug + Send,
|
|
||||||
{
|
|
||||||
let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap();
|
|
||||||
let mut id = 0;
|
|
||||||
let ships = Arc::new(RwLock::new(HashMap::new()));
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let (socket, addr) = listener.accept().await.unwrap();
|
|
||||||
info!("[interserver listen] new server: {:?} {:?}", socket, addr);
|
|
||||||
|
|
||||||
id += 1;
|
|
||||||
let server_id = crate::common::interserver::ServerId(id);
|
|
||||||
let (client_tx, client_rx) = async_std::channel::unbounded();
|
|
||||||
|
|
||||||
//let sclient_tx = client_tx.clone();
|
|
||||||
/*
|
|
||||||
state.set_sender(server_id, Arc::new(Box::new(move |msg| {
|
|
||||||
let sclient_tx = sclient_tx.clone();
|
|
||||||
Box::new(async move {
|
|
||||||
sclient_tx.send(msg).await;
|
|
||||||
})})));
|
|
||||||
*/
|
|
||||||
state.set_sender(server_id, client_tx.clone());
|
|
||||||
|
|
||||||
ships
|
|
||||||
.write()
|
|
||||||
.await
|
|
||||||
.insert(server_id, client_tx.clone());
|
|
||||||
|
|
||||||
for msg in state.on_connect(server_id).await {
|
|
||||||
if let Some(ship_sender) = ships.read().await.get(&msg.0) {
|
|
||||||
ship_sender.send(msg.1).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let rstate = state.clone();
|
|
||||||
let rsocket = socket.clone();
|
|
||||||
let rships = ships.clone();
|
|
||||||
async_std::task::spawn(async move {
|
|
||||||
interserver_recv_loop(rstate, server_id, rsocket, rships).await;
|
|
||||||
});
|
|
||||||
async_std::task::spawn(async move {
|
|
||||||
interserver_send_loop(server_id, socket, client_rx).await;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn run_interserver_connect<STATE, S, R, E>(mut state: STATE, ip: std::net::Ipv4Addr, port: u16)
|
|
||||||
where
|
|
||||||
STATE: InterserverActor<SendMessage=S, RecvMessage=R, Error=E> + Send + 'static,
|
|
||||||
S: serde::Serialize + Debug + Send + 'static,
|
|
||||||
R: serde::de::DeserializeOwned + Debug + Send,
|
|
||||||
E: Debug + Send,
|
|
||||||
{
|
|
||||||
let mut id = 0;
|
|
||||||
|
|
||||||
loop {
|
|
||||||
info!("[interserver connect] trying to connect to server");
|
|
||||||
let socket = match async_std::net::TcpStream::connect((ip, port)).await {
|
|
||||||
Ok(socket) => socket,
|
|
||||||
Err(err) => {
|
|
||||||
info!("err trying to connect to loginserv {:?}", err);
|
|
||||||
async_std::task::sleep(std::time::Duration::from_secs(10)).await;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
id += 1;
|
|
||||||
let server_id = crate::common::interserver::ServerId(id);
|
|
||||||
info!("[interserver connect] found loginserv: {:?} {:?}", server_id, socket);
|
|
||||||
|
|
||||||
let (client_tx, client_rx) = async_std::channel::unbounded();
|
|
||||||
|
|
||||||
state.set_sender(server_id, client_tx.clone());
|
|
||||||
/*
|
|
||||||
let sclient_tx = client_tx.clone();
|
|
||||||
state.set_sender(server_id, Arc::new(Box::new(move |msg| {
|
|
||||||
let sclient_tx = sclient_tx.clone();
|
|
||||||
Box::new(async move {
|
|
||||||
sclient_tx.send(msg).await;
|
|
||||||
})})));
|
|
||||||
*/
|
|
||||||
|
|
||||||
let other_server = vec![(server_id, client_tx.clone())].into_iter().collect();
|
|
||||||
|
|
||||||
let rstate = state.clone();
|
|
||||||
let rsocket = socket.clone();
|
|
||||||
async_std::task::spawn(async move {
|
|
||||||
interserver_recv_loop(rstate, server_id, rsocket, Arc::new(RwLock::new(other_server))).await;
|
|
||||||
});
|
|
||||||
let ssocket = socket.clone();
|
|
||||||
async_std::task::spawn(async move {
|
|
||||||
interserver_send_loop(server_id, ssocket, client_rx).await;
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut buf = [0u8; 1];
|
|
||||||
loop {
|
|
||||||
let peek = socket.peek(&mut buf).await;
|
|
||||||
match peek {
|
|
||||||
Ok(len) if len == 0 => {
|
|
||||||
break
|
|
||||||
},
|
|
||||||
_ => {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user