diff --git a/titles/ongeki/read.py b/titles/ongeki/read.py index 594689e..4797dcb 100644 --- a/titles/ongeki/read.py +++ b/titles/ongeki/read.py @@ -219,16 +219,16 @@ class OngekiReader(BaseReader): with open(datacfg_file, encoding="utf-8") as f: troot = ET.fromstring(f.read()) - if troot.find("DataConfig/version") is None: + if troot.find("version") is None: self.logger.warning(f"{directory}/DataConfig.xml contains no Version section, opt info will not be read") return None - ver_maj = troot.find("DataConfig/version/major") - ver_min = troot.find("DataConfig/version/minor") - ver_rel = troot.find("DataConfig/version/release") - cm_maj = troot.find("DataConfig/cardMakerVersion/major") - cm_min = troot.find("DataConfig/cardMakerVersion/minor") - cm_rel = troot.find("DataConfig/cardMakerVersion/release") + ver_maj = troot.find("version/major") + ver_min = troot.find("version/minor") + ver_rel = troot.find("version/release") + cm_maj = troot.find("cardMakerVersion/major") + cm_min = troot.find("cardMakerVersion/minor") + cm_rel = troot.find("cardMakerVersion/release") if ver_maj is None: # Probably not worth checking that the other sections exist self.logger.warning(f"{datacfg_file} contains no major item in the Version section, opt info will not be read") @@ -246,7 +246,7 @@ class OngekiReader(BaseReader): opt_id = await self.data.static.get_opt_by_version_folder(self.version, opt_folder) if not opt_id: - opt_id = await self.data.static.put_opt(self.version, opt_folder, int(ver_rel.text), int(cm_rel.text) if cm_rel else None) + opt_id = await self.data.static.put_opt(self.version, opt_folder, int(ver_rel.text), int(cm_rel.text) if cm_rel is not None else None) if not opt_id: self.logger.error(f"Failed to put opt folder info for {opt_folder}") return None @@ -254,6 +254,6 @@ class OngekiReader(BaseReader): opt_id = opt_id['id'] self.logger.info( - f"Opt folder {opt_folder} (Database ID {opt_id}) contains v{ver_maj.text}.{ver_min.text}.{ver_rel.text} (cm v{cm_maj.text if cm_maj else 'None'}.{cm_min.text if cm_min else 'None'}.{cm_rel.text if cm_rel else 'None'})" + f"Opt folder {opt_folder} (Database ID {opt_id}) contains v{ver_maj.text}.{ver_min.text}.{ver_rel.text} (cm v{cm_maj.text if cm_maj is not None else 'None'}.{cm_min.text if cm_min is not None else 'None'}.{cm_rel.text if cm_rel is not None else 'None'})" ) return opt_id diff --git a/titles/ongeki/schema/static.py b/titles/ongeki/schema/static.py index a784f67..bf4af07 100644 --- a/titles/ongeki/schema/static.py +++ b/titles/ongeki/schema/static.py @@ -6,6 +6,7 @@ from sqlalchemy.sql import func, select from sqlalchemy.engine import Row from sqlalchemy.dialects.mysql import insert from sqlalchemy.sql.functions import coalesce +from datetime import datetime from core.data.schema import BaseData, metadata from core.data.schema.arcade import machine @@ -57,7 +58,6 @@ events = Table( mysql_charset="utf8mb4", ) - music = Table( "ongeki_static_music", metadata, @@ -536,3 +536,121 @@ class OngekiStaticData(BaseData): if result is None: return None return result.fetchall() + + async def put_opt(self, version: int, folder: str, sequence: int, cm_seq: int = None) -> Optional[int]: + sql = insert(opts).values(version=version, name=folder, sequence=sequence, cmReleaseVer=cm_seq) + + conflict = sql.on_duplicate_key_update(sequence=sequence, whenRead=datetime.now()) + + result = await self.execute(conflict) + if result is None: + self.logger.warning(f"Failed to insert opt! version {version} folder {folder} sequence {sequence}") + return None + return result.lastrowid + + async def get_opt_by_version_folder(self, version: int, folder: str) -> Optional[Row]: + result = await self.execute(opts.select(and_( + opts.c.version == version, + opts.c.name == folder, + ))) + + if result is None: + return None + return result.fetchone() + + async def get_opt_by_version_sequence(self, version: int, sequence: str) -> Optional[Row]: + result = await self.execute(opts.select(and_( + opts.c.version == version, + opts.c.sequence == sequence, + ))) + + if result is None: + return None + return result.fetchone() + + async def get_opts_by_version(self, version: int) -> Optional[List[Row]]: + result = await self.execute(opts.select(opts.c.version == version)) + + if result is None: + return None + return result.fetchall() + + async def get_opts_enabled_by_version(self, version: int) -> Optional[List[Row]]: + result = await self.execute(opts.select(and_( + opts.c.version == version, + opts.c.isEnable == True, + ))) + + if result is None: + return None + return result.fetchall() + + async def get_latest_enabled_opt_by_version(self, version: int) -> Optional[Row]: + result = await self.execute( + opts.select(and_( + opts.c.version == version, + opts.c.isEnable == True, + )).order_by(opts.c.sequence.desc()) + ) + + if result is None: + return None + return result.fetchone() + + async def get_opts(self) -> Optional[List[Row]]: + result = await self.execute(opts.select()) + + if result is None: + return None + return result.fetchall() + + async def get_opts(self) -> Optional[List[Row]]: + result = await self.execute(opts.select()) + + if result is None: + return None + return result.fetchall() + + async def set_opt_enabled(self, opt_id: int, enabled: bool) -> bool: + result = await self.execute(opts.update(opts.c.id == opt_id).values(isEnable=enabled)) + + if result is None: + self.logger.error(f"Failed to set opt enabled status to {enabled} for opt {opt_id}") + return False + return True + + async def cm_put_opt(self, version: int, folder: str, sequence: int, geki_ver: int, geki_seq: int, mai_ver: int, mai_seq: int) -> Optional[int]: + sql = insert(cm_opts).values( + version=version, + name=folder, + sequence=sequence, + gekiVersion=geki_ver, + gekiReleaseVer=geki_seq, + maiSequence=mai_ver, + maiReleaseVer=mai_seq, + ) + + conflict = sql.on_duplicate_key_update( + sequence=sequence, + gekiVersion=geki_ver, + gekiReleaseVer=geki_seq, + maiSequence=mai_ver, + maiReleaseVer=mai_seq, + whenRead=datetime.now() + ) + + result = await self.execute(conflict) + if result is None: + self.logger.warning(f"Failed to insert opt! version {version} folder {folder} sequence {sequence}") + return None + return result.lastrowid + + async def cm_get_opt_by_version_folder(self, version: int, folder: str) -> Optional[Row]: + result = await self.execute(cm_opts.select(and_( + opts.c.version == version, + opts.c.name == folder, + ))) + + if result is None: + return None + return result.fetchone()