from PIL import Image import math import os import queue import sys import time import yaml import serial import win32api import win32con import win32gui import threading import copy # 编辑好的图片路径 IMAGE_PATH = "./image/image_monitor.png" # 串口号 COM_PORT = "COM33" # 比特率 COM_BAUDRATE = 9600 # 检测区域的像素值范围 AREA_SCOPE = 50 # 检测区域圆上点的数量 AREA_POINT_NUM = 8 # 触摸屏幕大小 (单位:像素) MONITOR_SIZE = [1600, 2560] # 是否开启屏幕反转 REVERSE_MONITOR = False # touch_thread 是否启用sleep, 默认开启, 如果程序 CPU 占用较高则开启, 如果滑动时延迟极大请关闭 TOUCH_THREAD_SLEEP_MODE = False # 每次 sleep 的延迟, 单位: 微秒, 默认 10 微秒 TOUCH_THREAD_SLEEP_DELAY = 10 # 窗口图标路径 icon_path = './image/favicon.ico' exp_list = [ ["A1", "A2", "A3", "A4", "A5", ], ["A6", "A7", "A8", "B1", "B2", ], ["B3", "B4", "B5", "B6", "B7", ], ["B8", "C1", "C2", "D1", "D2", ], ["D3", "D4", "D5", "D6", "D7", ], ["D8", "E1", "E2", "E3", "E4", ], ["E5", "E6", "E7", "E8", ], ] exp_image_dict = {'41-65-93': 'A1', '87-152-13': 'A2', '213-109-81': 'A3', '23-222-55': 'A4', '69-203-71': 'A5', '147-253-55': 'A6', '77-19-35': 'A7', '159-109-79': 'A8', '87-217-111': 'B1', '149-95-154': 'B2', '97-233-9': 'B3', '159-27-222': 'B4', '152-173-186': 'B5', '192-185-149': 'B6', '158-45-23': 'B7', '197-158-219': 'B8', '127-144-79': 'C1', '242-41-155': 'C2', '69-67-213': 'D1', '105-25-130': 'D2', '17-39-170': 'D3', '97-103-203': 'D4', '113-25-77': 'D5', '21-21-140': 'D6', '155-179-166': 'D7', '55-181-134': 'D8', '61-33-27': 'E1', '51-91-95': 'E2', '143-227-63': 'E3', '216-67-226': 'E4', '202-181-245': 'E5', '99-11-183': 'E6', '75-119-224': 'E7', '182-19-85': 'E8'} class SerialManager: p1Serial = serial.Serial(COM_PORT, COM_BAUDRATE) settingPacket = bytearray([40, 0, 0, 0, 0, 41]) startUp = False recvData = "" def __init__(self): self.touchQueue = queue.Queue() self.data_lock = threading.Lock() self.touchThread = threading.Thread(target=self.touch_thread) self.writeThread = threading.Thread(target=self.write_thread) self.now_touch_data = b'' self.now_touch_keys = [] self.ping_touch_thread() def start(self): print(f"开始监听 {COM_PORT} 串口...") self.touchThread.start() self.writeThread.start() def ping_touch_thread(self): self.touchQueue.put([self.build_touch_package(exp_list), []]) def touch_thread(self): while True: start_time = time.perf_counter() if self.p1Serial.is_open: self.read_data(self.p1Serial) if not self.touchQueue.empty(): # print("touchQueue 不为空,开始执行") s_temp = self.touchQueue.get() self.update_touch(s_temp) # 延迟防止消耗 CPU 时间过长 if TOUCH_THREAD_SLEEP_MODE: microsecond_sleep(TOUCH_THREAD_SLEEP_DELAY) # print("单次执行时间:", (time.perf_counter() - start_time) * 1e3, "毫秒") def write_thread(self): while True: # 延迟匹配波特率 time.sleep(0.0075) # 9600 # time.sleep(0.002) # 115200 if not self.startUp: # print("当前没有启动") continue # print(self.now_touch_data) with self.data_lock: self.send_touch(self.p1Serial, self.now_touch_data) def destroy(self): self.touchThread.join() self.p1Serial.close() def read_data(self, ser): if ser.in_waiting == 6: self.recvData = # print(self.recvData) self.touch_setup(ser, self.recvData) def touch_setup(self, ser, data): byte_data = ord(data[3]) if byte_data in [76, 69]: self.startUp = False elif byte_data in [114, 107]: for i in range(1, 5): self.settingPacket[i] = ord(data[i]) ser.write(self.settingPacket) elif byte_data == 65: self.startUp = True print("已连接到游戏") def send_touch(self, ser, data): ser.write(data) # def build_touch_package(self, sl): # sum_list = [0, 0, 0, 0, 0, 0, 0] # for i in range(len(sl)): # for j in range(len(sl[i])): # if sl[i][j] == 1: # sum_list[i] += (2 ** j) # s = "28 " # for i in sum_list: # s += hex(i)[2:].zfill(2).upper() + " " # s += "29" # # print(s) # return bytes.fromhex(s) def build_touch_package(self, sl): sum_list = [sum(2 ** j for j, val in enumerate(row) if val == 1) for row in sl] hex_list = [hex(i)[2:].zfill(2).upper() for i in sum_list] s = "28 " + " ".join(hex_list) + " 29" # print(s) return bytes.fromhex(s) def update_touch(self, s_temp): # if not self.startUp: # print("当前没有启动") # return with self.data_lock: self.now_touch_data = s_temp[0] self.send_touch(self.p1Serial, s_temp[0]) self.now_touch_keys = s_temp[1] print("Touch Keys:", s_temp[1]) # else: # self.send_touch(self.p2Serial, s_temp[0]) def change_touch(self, sl, touch_keys): self.touchQueue.put([self.build_touch_package(sl), touch_keys]) def restart_script(): python = sys.executable script = os.path.abspath(sys.argv[0]) os.execv(python, [python, script]) def microsecond_sleep(sleep_time): # time.sleep(sleep_time / 1000000) end_time = time.perf_counter() + (sleep_time - 1.0) / 1e6 # 1.0是时间补偿,需要根据自己PC的性能去实测 while time.perf_counter() < end_time: pass def get_colors_in_area(x, y): colors = set() # 使用集合来存储颜色值,以避免重复 num_points = AREA_POINT_NUM # 要获取的点的数量 angle_increment = 360.0 / num_points # 角度增量 cos_values = [math.cos(math.radians(i * angle_increment)) for i in range(num_points)] sin_values = [math.sin(math.radians(i * angle_increment)) for i in range(num_points)] # 处理中心点 if 0 <= x < exp_image_width and 0 <= y < exp_image_height: colors.add(get_color_name(exp_image.getpixel((x, y)))) # 处理圆上的点 for i in range(num_points): dx = int(AREA_SCOPE * cos_values[i]) dy = int(AREA_SCOPE * sin_values[i]) px = x + dx py = y + dy if 0 <= px < exp_image_width and 0 <= py < exp_image_height: colors.add(get_color_name(exp_image.getpixel((px, py)))) return list(colors) def get_color_name(pixel): return str(pixel[0]) + "-" + str(pixel[1]) + "-" + str(pixel[2]) # def convert(touch_data): # copy_exp_list = copy.deepcopy(exp_list) # touch_data_values = list(touch_data.values()) # touch_keys = set() # touched = 0 # for i in touch_data_values: # touched += 1 # x = i["x"] # y = i["y"] # for rgb_str in get_colors_in_area(x, y): # if not rgb_str in exp_image_dict: # continue # touch_keys.add(exp_image_dict[rgb_str]) # # print("Touched:", touched) # # print("Touch Keys:", touch_keys) # touch_keys_list = list(touch_keys) # for i in range(len(copy_exp_list)): # for j in range(len(copy_exp_list[i])): # if copy_exp_list[i][j] in touch_keys_list: # copy_exp_list[i][j] = 1 # else: # copy_exp_list[i][j] = 0 # # print(copy_exp_list) # serial_manager.change_touch(copy_exp_list, touch_keys_list) def convert(touch_data): copy_exp_list = copy.deepcopy(exp_list) touch_keys = set() for i in touch_data.values(): colors = set(get_colors_in_area(i["x"], i["y"])) touch_keys.update(exp_image_dict[rgb_str] for rgb_str in colors if rgb_str in exp_image_dict) touch_keys_list = list(touch_keys) copy_exp_list = [[1 if item in touch_keys_list else 0 for item in sublist] for sublist in copy_exp_list] serial_manager.change_touch(copy_exp_list, touch_keys_list) def getevent(): # 存储多点触控数据的列表 touch_data = {} pygame.init() icon = pygame.image.load(icon_path) pygame.display.set_icon(icon) screen_width, screen_height = MONITOR_SIZE screen = pygame.display.set_mode((screen_width, screen_height)) pygame.display.set_caption("maimai-windows-touch-panel") fuchsia = (128, 128, 128) hwnd = pygame.display.get_wm_info()["window"] win32gui.SetWindowLong(hwnd, win32con.GWL_EXSTYLE, win32gui.GetWindowLong(hwnd, win32con.GWL_EXSTYLE) | win32con.WS_EX_LAYERED) win32gui.SetLayeredWindowAttributes(hwnd, win32api.RGB(*fuchsia), 1, win32con.LWA_ALPHA) screen.fill(fuchsia) # 使用透明背景 clock = pygame.time.Clock() while True: # start_time = time.perf_counter() clock.tick(120) for event in pygame.event.get(): if event.type == pygame.QUIT: break elif event.type == pygame.FINGERDOWN or event.type == pygame.FINGERUP or event.type == pygame.FINGERMOTION: touch_id = event.finger_id touch_x, touch_y = event.x * screen_width, event.y * screen_height if event.type == pygame.FINGERDOWN or event.type == pygame.FINGERMOTION: touch_data[str(touch_id)] = {} if not REVERSE_MONITOR: touch_data[str(touch_id)]["x"] = touch_x touch_data[str(touch_id)]["y"] = touch_y else: touch_data[str(touch_id)]["x"] = MONITOR_SIZE[0] - touch_x touch_data[str(touch_id)]["y"] = MONITOR_SIZE[1] - touch_y elif event.type == pygame.FINGERUP: touch_data.pop(str(touch_id)) convert(touch_data) # print("单次执行时间:", (time.perf_counter() - start_time) * 1e3, "毫秒") exp_image = exp_image_width, exp_image_height = exp_image.size if __name__ == "__main__": os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "1" import pygame yaml_file_path = 'config.yaml' if len(sys.argv) > 1: yaml_file_path = sys.argv[1] if os.path.isfile(yaml_file_path): print("使用配置文件:", yaml_file_path) with open(yaml_file_path, 'r', encoding='utf-8') as file: c = yaml.safe_load(file) IMAGE_PATH = c["IMAGE_PATH"] COM_PORT = c["COM_PORT"] COM_BAUDRATE = c["COM_BAUDRATE"] AREA_SCOPE = c["AREA_SCOPE"] AREA_POINT_NUM = c["AREA_POINT_NUM"] MONITOR_SIZE = c["MONITOR_SIZE"] REVERSE_MONITOR = c["REVERSE_MONITOR"] TOUCH_THREAD_SLEEP_MODE = c["TOUCH_THREAD_SLEEP_MODE"] TOUCH_THREAD_SLEEP_DELAY = c["TOUCH_THREAD_SLEEP_DELAY"] exp_image_dict = c["exp_image_dict"] else: print("未找到配置文件, 使用默认配置") print(('已' if REVERSE_MONITOR else '未') + "开启屏幕反转") serial_manager = SerialManager() serial_manager.start() threading.Thread(target=getevent).start() while True: input_str = input().strip() if len(input_str) == 0: continue if input_str == 'start': serial_manager.startUp = True print("已连接到游戏") elif input_str == 'reverse': REVERSE_MONITOR = not REVERSE_MONITOR print("已" + ('开启' if REVERSE_MONITOR else '关闭') + "屏幕反转") elif input_str == 'restart': restart_script() else: print("未知的输入")