From 3c62587e7e63860a3702b386e7eec81325f1055a Mon Sep 17 00:00:00 2001 From: jake Date: Tue, 19 Apr 2022 23:20:01 -0600 Subject: [PATCH] add transactions! --- src/entity/gateway/entitygateway.rs | 32 ++++- src/entity/gateway/inmemory.rs | 113 ++++++++++++++++- src/entity/gateway/mod.rs | 2 +- src/entity/gateway/postgres/postgres.rs | 162 +++++++++++++++++++++++- 4 files changed, 304 insertions(+), 5 deletions(-) diff --git a/src/entity/gateway/entitygateway.rs b/src/entity/gateway/entitygateway.rs index dbdc401..4ec2c74 100644 --- a/src/entity/gateway/entitygateway.rs +++ b/src/entity/gateway/entitygateway.rs @@ -1,4 +1,6 @@ +use std::convert::From; use thiserror::Error; +use futures::Future; use crate::entity::account::*; use crate::entity::character::*; @@ -15,7 +17,35 @@ pub enum GatewayError { } #[async_trait::async_trait] -pub trait EntityGateway: Send + Sync + Clone { +pub trait EntityGatewayTransaction: Send + Sync { + fn gateway<'a>(&'a mut self) -> &'a mut dyn EntityGateway { + unimplemented!() + } + + async fn commit(self: Box) -> Result<(), GatewayError> { + unimplemented!() + } +} + + +#[async_trait::async_trait] +pub trait EntityGateway: Send + Sync { + async fn transaction(&'static mut self) -> Result, GatewayError> + { + unimplemented!(); + } + + async fn with_transaction(&'static mut self, _func: F) -> Result + where + Fut: Future, R), E>> + Send, + F: FnOnce(Box) -> Fut + Send, + R: Send, + E: From, + Self: Sized + { + unimplemented!(); + } + async fn create_user(&mut self, _user: NewUserAccountEntity) -> Result { unimplemented!() } diff --git a/src/entity/gateway/inmemory.rs b/src/entity/gateway/inmemory.rs index 4c38815..aa9762d 100644 --- a/src/entity/gateway/inmemory.rs +++ b/src/entity/gateway/inmemory.rs @@ -1,13 +1,43 @@ use std::collections::BTreeMap; use std::convert::TryInto; +use futures::Future; use crate::entity::account::*; use crate::entity::character::*; -use crate::entity::gateway::{EntityGateway, GatewayError}; +use crate::entity::gateway::{EntityGateway, EntityGatewayTransaction, GatewayError}; use crate::entity::item::*; use std::sync::{Arc, Mutex}; + +pub struct InMemoryGatewayTransaction { + working_gateway: InMemoryGateway, + original_gateway: &'static mut InMemoryGateway, +} + +#[async_trait::async_trait] +impl EntityGatewayTransaction for InMemoryGatewayTransaction { + fn gateway<'b>(&'b mut self) -> &'b mut dyn EntityGateway { + &mut self.working_gateway + } + + async fn commit(self: Box) -> Result<(), GatewayError> { + self.original_gateway.users = self.working_gateway.users.clone(); + self.original_gateway.user_settings = self.working_gateway.user_settings.clone(); + self.original_gateway.characters = self.working_gateway.characters.clone(); + self.original_gateway.character_meseta = self.working_gateway.character_meseta.clone(); + self.original_gateway.bank_meseta = self.working_gateway.bank_meseta.clone(); + self.original_gateway.items = self.working_gateway.items.clone(); + self.original_gateway.inventories = self.working_gateway.inventories.clone(); + self.original_gateway.banks = self.working_gateway.banks.clone(); + self.original_gateway.equips = self.working_gateway.equips.clone(); + self.original_gateway.mag_modifiers = self.working_gateway.mag_modifiers.clone(); + self.original_gateway.weapon_modifiers = self.working_gateway.weapon_modifiers.clone(); + + Ok(()) + } +} + #[derive(Clone)] pub struct InMemoryGateway { users: Arc>>, @@ -99,6 +129,87 @@ impl InMemoryGateway { #[async_trait::async_trait] impl EntityGateway for InMemoryGateway { + async fn transaction(&'static mut self) -> Result, GatewayError> + { + let working_gateway = { + let users = self.users.lock().unwrap().clone(); + let user_settings = self.user_settings.lock().unwrap().clone(); + let characters = self.characters.lock().unwrap().clone(); + let character_meseta = self.character_meseta.lock().unwrap().clone(); + let bank_meseta = self.bank_meseta.lock().unwrap().clone(); + let items = self.items.lock().unwrap().clone(); + let inventories = self.inventories.lock().unwrap().clone(); + let banks = self.banks.lock().unwrap().clone(); + let equips = self.equips.lock().unwrap().clone(); + let mag_modifiers = self.mag_modifiers.lock().unwrap().clone(); + let weapon_modifiers = self.weapon_modifiers.lock().unwrap().clone(); + + InMemoryGateway { + users: Arc::new(Mutex::new(users)), + user_settings: Arc::new(Mutex::new(user_settings)), + characters: Arc::new(Mutex::new(characters)), + character_meseta: Arc::new(Mutex::new(character_meseta)), + bank_meseta: Arc::new(Mutex::new(bank_meseta)), + items: Arc::new(Mutex::new(items)), + inventories: Arc::new(Mutex::new(inventories)), + banks: Arc::new(Mutex::new(banks)), + equips: Arc::new(Mutex::new(equips)), + mag_modifiers: Arc::new(Mutex::new(mag_modifiers)), + weapon_modifiers: Arc::new(Mutex::new(weapon_modifiers)), + } + }; + + Ok(Box::new(InMemoryGatewayTransaction { + working_gateway, + original_gateway: self, + })) + } + + + async fn with_transaction(&'static mut self, func: F) -> Result + where + Fut: Future, R), E>> + Send, + F: FnOnce(Box) -> Fut + Send, + R: Send, + E: From, + { + let users = self.users.lock().unwrap().clone(); + let user_settings = self.user_settings.lock().unwrap().clone(); + let characters = self.characters.lock().unwrap().clone(); + let character_meseta = self.character_meseta.lock().unwrap().clone(); + let bank_meseta = self.bank_meseta.lock().unwrap().clone(); + let items = self.items.lock().unwrap().clone(); + let inventories = self.inventories.lock().unwrap().clone(); + let banks = self.banks.lock().unwrap().clone(); + let equips = self.equips.lock().unwrap().clone(); + let mag_modifiers = self.mag_modifiers.lock().unwrap().clone(); + let weapon_modifiers = self.weapon_modifiers.lock().unwrap().clone(); + + let working_gateway = InMemoryGateway { + users: Arc::new(Mutex::new(users)), + user_settings: Arc::new(Mutex::new(user_settings)), + characters: Arc::new(Mutex::new(characters)), + character_meseta: Arc::new(Mutex::new(character_meseta)), + bank_meseta: Arc::new(Mutex::new(bank_meseta)), + items: Arc::new(Mutex::new(items)), + inventories: Arc::new(Mutex::new(inventories)), + banks: Arc::new(Mutex::new(banks)), + equips: Arc::new(Mutex::new(equips)), + mag_modifiers: Arc::new(Mutex::new(mag_modifiers)), + weapon_modifiers: Arc::new(Mutex::new(weapon_modifiers)), + }; + + let transaction = Box::new(InMemoryGatewayTransaction { + working_gateway, + original_gateway: self, + }); + + let (mut transaction, result) = func(transaction).await?; + + transaction.commit().await?; + Ok(result) + } + async fn create_user(&mut self, user: NewUserAccountEntity) -> Result { let mut users = self.users.lock().unwrap(); let id = users diff --git a/src/entity/gateway/mod.rs b/src/entity/gateway/mod.rs index d819ef5..4ce029d 100644 --- a/src/entity/gateway/mod.rs +++ b/src/entity/gateway/mod.rs @@ -2,6 +2,6 @@ pub mod entitygateway; pub mod inmemory; pub mod postgres; -pub use entitygateway::{EntityGateway, GatewayError}; +pub use entitygateway::{EntityGateway, EntityGatewayTransaction, GatewayError}; pub use inmemory::InMemoryGateway; pub use self::postgres::PostgresGateway; diff --git a/src/entity/gateway/postgres/postgres.rs b/src/entity/gateway/postgres/postgres.rs index 10961a5..b87dfad 100644 --- a/src/entity/gateway/postgres/postgres.rs +++ b/src/entity/gateway/postgres/postgres.rs @@ -1,14 +1,17 @@ +use async_std::sync::{Arc, Mutex}; use std::convert::{From, TryFrom, Into}; -use futures::TryStreamExt; +use futures::{Future, TryStreamExt}; use async_std::stream::StreamExt; use libpso::character::guildcard; use crate::entity::account::*; use crate::entity::character::*; -use crate::entity::gateway::{EntityGateway, GatewayError}; +use crate::entity::gateway::{EntityGateway, EntityGatewayTransaction, GatewayError}; use crate::entity::item::*; use super::models::*; use sqlx::postgres::PgPoolOptions; +use sqlx::Connection; + mod embedded { use refinery::embed_migrations; @@ -16,6 +19,24 @@ mod embedded { } +pub struct PostgresTransaction<'t> { + pgtransaction: sqlx::Transaction<'t, sqlx::Postgres>, +} + + +#[async_trait::async_trait] +impl<'t> EntityGatewayTransaction for PostgresTransaction<'t> { + fn gateway<'b>(&'b mut self) -> &'b mut dyn EntityGateway { + self + } + + async fn commit(self: Box) -> Result<(), GatewayError> { + self.pgtransaction.commit().await?; + Ok(()) + } +} + + #[derive(Clone)] pub struct PostgresGateway { pool: sqlx::Pool, @@ -539,6 +560,28 @@ async fn get_bank_meseta(conn: &mut sqlx::PgConnection, char_id: &CharacterEntit #[async_trait::async_trait] impl EntityGateway for PostgresGateway { + async fn transaction(&'static mut self) -> Result, GatewayError> + { + Ok(Box::new(PostgresTransaction { + pgtransaction: self.pool.begin().await?, + })) + } + + async fn with_transaction(&'static mut self, func: F) -> Result + where + Fut: Future, R), E>> + Send, + F: FnOnce(Box) -> Fut + Send, + R: Send, + E: From, + { + let mut transaction = Box::new(PostgresTransaction { + pgtransaction: self.pool.begin().await.map_err(|_| ()).unwrap() + }); + let (mut transaction, result) = func(transaction).await.map_err(|_| ()).unwrap(); + transaction.commit().await.map_err(|_| ()).unwrap(); + Ok(result) + } + async fn create_user(&mut self, user: NewUserAccountEntity) -> Result { create_user(&mut *self.pool.acquire().await?, user).await } @@ -653,3 +696,118 @@ impl EntityGateway for PostgresGateway { } +#[async_trait::async_trait] +impl<'c> EntityGateway for PostgresTransaction<'c> { + async fn create_user(&mut self, user: NewUserAccountEntity) -> Result { + create_user(&mut *self.pgtransaction, user).await + } + + async fn get_user_by_id(&mut self, id: UserAccountId) -> Result { + get_user_by_id(&mut *self.pgtransaction, id).await + } + + async fn get_user_by_name(&mut self, username: String) -> Result { + get_user_by_name(&mut *self.pgtransaction, username).await + } + + async fn save_user(&mut self, user: &UserAccountEntity) -> Result<(), GatewayError> { + save_user(&mut *self.pgtransaction, user).await + } + + async fn create_user_settings(&mut self, settings: NewUserSettingsEntity) -> Result { + create_user_settings(&mut *self.pgtransaction, settings).await + } + + async fn get_user_settings_by_user(&mut self, user: &UserAccountEntity) -> Result { + get_user_settings_by_user(&mut *self.pgtransaction, user).await + } + + async fn save_user_settings(&mut self, settings: &UserSettingsEntity) -> Result<(), GatewayError> { + save_user_settings(&mut *self.pgtransaction, settings).await + } + + async fn create_character(&mut self, char: NewCharacterEntity) -> Result { + create_character(&mut *self.pgtransaction, char).await + } + + async fn get_characters_by_user(&mut self, user: &UserAccountEntity) -> Result<[Option; 4], GatewayError> { + get_characters_by_user(&mut *self.pgtransaction, user).await + } + + async fn save_character(&mut self, char: &CharacterEntity) -> Result<(), GatewayError> { + save_character(&mut *self.pgtransaction, char).await + } + + async fn get_guild_card_data_by_user(&mut self, user: &UserAccountEntity) -> Result { + Ok(GuildCardDataEntity { + id: GuildCardDataId(0), + user_id: user.id, + guildcard: guildcard::GuildCardData::default(), + }) + } + + async fn create_item(&mut self, item: NewItemEntity) -> Result { + create_item(&mut *self.pgtransaction, item).await + } + + async fn add_item_note(&mut self, item_id: &ItemEntityId, item_note: ItemNote) -> Result<(), GatewayError> { + add_item_note(&mut *self.pgtransaction, item_id, item_note).await + } + + async fn feed_mag(&mut self, mag_item_id: &ItemEntityId, tool_item_id: &ItemEntityId) -> Result<(), GatewayError> { + feed_mag(&mut *self.pgtransaction, mag_item_id, tool_item_id).await + } + + async fn change_mag_owner(&mut self, mag_item_id: &ItemEntityId, character: &CharacterEntity) -> Result<(), GatewayError> { + change_mag_owner(&mut *self.pgtransaction, mag_item_id, character).await + } + + async fn use_mag_cell(&mut self, mag_item_id: &ItemEntityId, mag_cell_id: &ItemEntityId) -> Result<(), GatewayError> { + use_mag_cell(&mut *self.pgtransaction, mag_item_id, mag_cell_id).await + } + + async fn add_weapon_modifier(&mut self, item_id: &ItemEntityId, modifier: weapon::WeaponModifier) -> Result<(), GatewayError> { + add_weapon_modifier(&mut *self.pgtransaction, item_id, modifier).await + } + + async fn get_character_inventory(&mut self, char_id: &CharacterEntityId) -> Result { + get_character_inventory(&mut *self.pgtransaction, char_id).await + } + + async fn get_character_bank(&mut self, char_id: &CharacterEntityId, bank_name: BankName) -> Result { + get_character_bank(&mut *self.pgtransaction, char_id, bank_name).await + } + + async fn set_character_inventory(&mut self, char_id: &CharacterEntityId, inventory: &InventoryEntity) -> Result<(), GatewayError> { + set_character_inventory(&mut *self.pgtransaction, char_id, inventory).await + } + + async fn set_character_bank(&mut self, char_id: &CharacterEntityId, bank: &BankEntity, bank_name: BankName) -> Result<(), GatewayError> { + set_character_bank(&mut *self.pgtransaction, char_id, bank, bank_name).await + } + + async fn get_character_equips(&mut self, char_id: &CharacterEntityId) -> Result { + get_character_equips(&mut *self.pgtransaction, char_id).await + } + + async fn set_character_equips(&mut self, char_id: &CharacterEntityId, equips: &EquippedEntity) -> Result<(), GatewayError> { + set_character_equips(&mut *self.pgtransaction, char_id, equips).await + } + + async fn set_character_meseta(&mut self, char_id: &CharacterEntityId, meseta: Meseta) -> Result<(), GatewayError> { + set_character_meseta(&mut *self.pgtransaction, char_id, meseta).await + } + + async fn get_character_meseta(&mut self, char_id: &CharacterEntityId) -> Result { + get_character_meseta(&mut *self.pgtransaction, char_id).await + } + + async fn set_bank_meseta(&mut self, char_id: &CharacterEntityId, bank: BankName, meseta: Meseta) -> Result<(), GatewayError> { + set_bank_meseta(&mut *self.pgtransaction, char_id, bank, meseta).await + } + + async fn get_bank_meseta(&mut self, char_id: &CharacterEntityId, bank: BankName) -> Result { + get_bank_meseta(&mut *self.pgtransaction, char_id, bank).await + } +} +