use std::time::Duration;
use std::pin::Pin;
use futures::future::Future;
use log::{info, warn};
use async_std::sync::{Arc, Mutex};
use async_std::io::prelude::{ReadExt, WriteExt};
use std::collections::HashMap;
use serde::Serialize;
use serde::de::DeserializeOwned;

use crate::common::interserver::{ServerId, InterserverActor};

use crate::login::character::CharacterServerState;
use crate::ship::ship::ShipServerState;
use crate::entity::gateway::entitygateway::EntityGateway;

/// Errors that `MessageReceiver::recv` can report while reading one framed message.
#[derive(Debug)]
enum MessageReceiverError {
    //InvalidSize,
    /// The frame body was read in full but could not be decoded into the requested message type.
    InvalidPayload,
    //NetworkError(std::io::Error),
    /// Reading from the socket failed; every read error is reported as this variant.
    Disconnected,
}

/// Reads length-prefixed JSON messages from a TCP stream.
///
/// Each frame is a 4-byte little-endian length followed by that many bytes of JSON.
struct MessageReceiver {
    socket: async_std::net::TcpStream,
}

impl MessageReceiver {
    /// Wraps `socket` so framed messages can be read from it.
    fn new(socket: async_std::net::TcpStream) -> MessageReceiver {
        MessageReceiver { socket }
    }

    /// Reads the next frame and deserializes its body as `R`.
    ///
    /// # Errors
    ///
    /// * `MessageReceiverError::Disconnected` if either the length prefix or the body
    ///   cannot be read completely from the socket.
    /// * `MessageReceiverError::InvalidPayload` if the body is not valid JSON for `R`.
    ///   The whole body has been consumed at that point, so the stream stays aligned on
    ///   frame boundaries and the caller may keep reading.
    async fn recv<R: DeserializeOwned + std::fmt::Debug + Send>(&mut self) -> Result<R, MessageReceiverError> {
        let mut size_buf = [0u8; 4];
        self.socket.read_exact(&mut size_buf).await.map_err(|_| MessageReceiverError::Disconnected)?;
        let size = u32::from_le_bytes(size_buf) as usize;

        // NOTE(review): the size comes straight off the wire and is not bounded, so a peer
        // can request an allocation of up to 4 GiB. Confirm peers are trusted or add a cap.
        let mut payload = vec![0u8; size];
        self.socket.read_exact(&mut payload).await.map_err(|_| MessageReceiverError::Disconnected)?;

        // Parsing from bytes validates UTF-8 as part of JSON decoding, so a separate
        // conversion to `String` is not needed; both failure modes are `InvalidPayload`.
        serde_json::from_slice(&payload).map_err(|_| MessageReceiverError::InvalidPayload)
    }
}

/// Events delivered to the state loop by the per-connection receive tasks.
///
/// `S` is the type of messages sent to peers, `R` the type of messages received from them.
#[derive(Debug)]
enum InterserverInputAction<S, R> {
    /// A peer connected; carries its id and the channel used to queue messages for it.
    NewConnection(ServerId, async_std::sync::Sender<S>),
    /// A message was received from the given peer.
    Message(ServerId, R),
    /// The connection to the given peer was lost.
    Disconnect(ServerId),
}

/// Spawns the task that applies every interserver event to the shared actor state.
///
/// The spawned task is detached; this function returns as soon as it has been started.
/// For each event received on `action_receiver` the task locks `state`, invokes the
/// matching `InterserverActor` callback and forwards every `(server, message)` pair it
/// returns to that server's outgoing channel. Messages addressed to a server that is not
/// currently registered are dropped.
///
/// The task keeps running across peer disconnects so that later connections (and
/// reconnects) are still served. It ends only when every sender for `action_receiver`
/// has been dropped.
async fn interserver_state_loop<A, S, R>(state: Arc<Mutex<A>>, action_receiver: async_std::sync::Receiver<InterserverInputAction<S, R>>)
where
    A: InterserverActor<SendMessage=S, RecvMessage=R, Error=()> + Send + 'static,
    S: Serialize + Send + 'static,
    R: DeserializeOwned + Send + 'static,
{
    async_std::task::spawn(async move {
        // Outgoing channel of every currently connected peer, keyed by its id.
        let mut ships = HashMap::new();

        loop {
            info!("interserver loop");
            let action = match action_receiver.recv().await {
                Ok(action) => action,
                Err(err) => {
                    // No producer is left, so no further event can ever arrive.
                    warn!("[interserver loop] action channel closed: {:?}", err);
                    break;
                }
            };
            let mut state = state.lock().await;

            match action {
                InterserverInputAction::NewConnection(server_id, ship_action_sender) => {
                    ships.insert(server_id, ship_action_sender);
                    for (server, action) in state.on_connect(server_id).await {
                        if let Some(sender) = ships.get(&server) {
                            sender.send(action).await;
                        }
                    }
                },
                InterserverInputAction::Message(server_id, message) => {
                    match state.action(server_id, message).await {
                        Ok(actions) => {
                            for (server, action) in actions {
                                if let Some(sender) = ships.get(&server) {
                                    sender.send(action).await;
                                }
                            }
                        },
                        Err(err) => {
                            warn!("[server {:?} state handler error] {:?}", server_id, err);
                        }
                    }
                },
                InterserverInputAction::Disconnect(server_id) => {
                    // Forget the peer before dispatching so nothing is queued for a
                    // connection that no longer exists.
                    ships.remove(&server_id);
                    for (server, action) in state.on_disconnect(server_id).await {
                        if let Some(sender) = ships.get(&server) {
                            sender.send(action).await;
                        }
                    }
                }
            }
        }
    });
}

/// Spawns the detached task that reads framed messages from one peer connection.
///
/// The task first announces the connection to the state loop, handing over
/// `output_loop_sender` so replies can be routed back to this peer. It then forwards every
/// decoded message as `InterserverInputAction::Message`. A `Disconnected` read error is
/// reported as `InterserverInputAction::Disconnect` and ends the task; any other receive
/// error is logged and reading continues.
async fn login_recv_loop<S, R>(server_id: ServerId,
                               socket: async_std::net::TcpStream,
                               state_loop_sender: async_std::sync::Sender<InterserverInputAction<S, R>>,
                               output_loop_sender: async_std::sync::Sender<S>)
where
    S: Serialize + std::fmt::Debug + Send + 'static,
    R: DeserializeOwned + std::fmt::Debug + Send + 'static,
{
    async_std::task::spawn(async move {
        let announce = InterserverInputAction::NewConnection(server_id, output_loop_sender);
        state_loop_sender.send(announce).await;
        let mut msg_receiver = MessageReceiver::new(socket);

        loop {
            info!("login recv loop");
            let received = msg_receiver.recv().await;
            match received {
                Ok(msg) => {
                    info!("[login recv loop msg] {:?}", msg);
                    state_loop_sender.send(InterserverInputAction::Message(server_id, msg)).await;
                },
                Err(MessageReceiverError::Disconnected) => {
                    info!("[login recv loop disconnect] {:?}", server_id);
                    state_loop_sender.send(InterserverInputAction::Disconnect(server_id)).await;
                    break;
                },
                Err(other) => {
                    info!("[login recv loop err] {:?}", other);
                },
            }
        }
    });
}

async fn interserver_send_loop<S>(server_id: ServerId,
                                  mut socket: async_std::net::TcpStream,
                                  output_loop_receiver: async_std::sync::Receiver<S>)
where
    S: Serialize + std::fmt::Debug + Send + 'static,
{
    async_std::task::spawn(async move {
        loop {
            info!("login send loop");
            match output_loop_receiver.recv().await {
                Ok(msg) => {
                    let payload = serde_json::to_string(&msg);
                    if let Ok(payload) = payload {
                        let len_bytes = u32::to_le_bytes(payload.len() as u32);

                        match socket.write_all(&len_bytes).await {
                            Ok(_) => {},
                            Err(err) => warn!("send failed: {:?}", err),
                        }
                        match socket.write_all(&payload.as_bytes()).await {
                            Ok(_) => {},
                            Err(err) => warn!("send failed: {:?}", err),
                        }
                    }
                },
                Err(err) => {
                    warn!("error in send_loop: {:?}, {:?}", server_id, err)
                }
            }
        }
    });
}



pub fn login_listen_mainloop<EG: EntityGateway + 'static>(state: Arc<Mutex<CharacterServerState<EG>>>, port: u16) ->  Pin<Box<dyn Future<Output = ()>>> {
    Box::pin(async_std::task::spawn(async move {
        let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap();
        let mut id = 0;

        let (server_state_sender, server_state_receiver) = async_std::sync::channel(1024);
        interserver_state_loop(state, server_state_receiver).await;

        loop {
            let (socket, addr) = listener.accept().await.unwrap();
            info!("new ship server: {:?} {:?}", socket, addr);

            id += 1;
            let server_id = crate::common::interserver::ServerId(id);

            let (client_sender, client_receiver) = async_std::sync::channel(64);

            login_recv_loop(server_id, socket.clone(), server_state_sender.clone(), client_sender).await;
            interserver_send_loop(server_id, socket.clone(), client_receiver).await;
        }
    }))
}

pub fn ship_connect_mainloop<EG: EntityGateway + 'static>(state: Arc<Mutex<ShipServerState<EG>>>, ip: std::net::Ipv4Addr, port: u16) ->  Pin<Box<dyn Future<Output = ()>>> {
    Box::pin(async_std::task::spawn(async move {
        let mut id = 0;

        let (server_state_sender, server_state_receiver) = async_std::sync::channel(1024);
        interserver_state_loop(state, server_state_receiver).await;

        loop {
            let socket = async_std::net::TcpStream::connect((ip, port)).await.unwrap();
            id += 1;
            let server_id = crate::common::interserver::ServerId(id);
            let (client_sender, client_receiver) = async_std::sync::channel(64);

            info!("ship connected to login: {:?}", socket);
            login_recv_loop(server_id, socket.clone(), server_state_sender.clone(), client_sender).await;
            interserver_send_loop(server_id, socket.clone(), client_receiver).await;
            loop {
                if let Err(_) = socket.peer_addr() {
                    info!("ship connected to login: {:?}", socket);
                    break;
                }
                async_std::task::sleep(Duration::from_secs(10)).await;
            }
        }
    }))
}