#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 远程显示查看器 - 带 GUI 的实时显存查看与按键模拟 用法: python remo_disp_viewer.py <装置IP> """ import socket import struct import threading import time import sys import tkinter as tk from tkinter import ttk, messagebox from typing import Optional, Tuple # 协议常量 TAG_CLIENT = 0xAA TAG_DEVICE = 0xBB PORT = 7003 CMD_KEEPLIVE = 0 CMD_INIT = 1 CMD_KEY = 2 CMD_LCDMEM = 3 KEY_U, KEY_D, KEY_L, KEY_R = 0x02, 0x40, 0x10, 0x08 KEY_ENT, KEY_ESC, KEY_F1, KEY_F2 = 0x20, 0x01, 0x04, 0x80 def calc_crc(data: bytes) -> int: crc = 0 for b in data: crc ^= b return crc & 0xFF def build_frame(cmd: int, data: bytes) -> bytes: length = len(data) header = bytes([TAG_CLIENT, cmd & 0xFF, (length >> 8) & 0xFF, length & 0xFF]) return header + data + bytes([calc_crc(data)]) def parse_frame(raw: bytes) -> Optional[Tuple[int, bytes]]: if len(raw) < 6 or raw[0] != TAG_DEVICE: return None length = (raw[2] << 8) | raw[3] if len(raw) < 5 + length + 1: return None data = raw[4:4 + length] if calc_crc(data) != raw[4 + length]: return None return (raw[1], data) class RemoDispViewer: def __init__(self, host: str, port: int = PORT): self.host = host self.port = port self._sock: Optional[socket.socket] = None self._init_info: Optional[dict] = None self._running = False self._refresh_thread: Optional[threading.Thread] = None def connect(self) -> bool: try: self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.settimeout(3.0) self._sock.connect((self.host, self.port)) self._sock.settimeout(0.5) return True except Exception: return False def disconnect(self): self._running = False if self._sock: try: self._sock.close() except Exception: pass self._sock = None def _send(self, cmd: int, data: bytes = b"") -> bool: if not self._sock: return False try: self._sock.sendall(build_frame(cmd, data)) return True except Exception: return False def _recv(self) -> Optional[Tuple[int, bytes]]: if not self._sock: return None try: header = self._sock.recv(5) if len(header) < 5: return None length = (header[2] << 8) | header[3] rest = self._sock.recv(length + 1) if len(rest) < length + 1: return None return parse_frame(header + rest) except (socket.timeout, BlockingIOError): return None except Exception: return None def request_init(self) -> Optional[dict]: if not self._send(CMD_INIT): return None for _ in range(20): result = self._recv() if result and result[0] == CMD_INIT: data = result[1] if len(data) >= 29: self._init_info = { "width": (data[0] << 8) | data[1], "height": (data[2] << 8) | data[3], "mem_size": (data[5] << 24) | (data[6] << 16) | (data[7] << 8) | data[8], } return self._init_info time.sleep(0.05) return None def send_key(self, key: int) -> bool: return self._send(CMD_KEY, bytes([key])) def fetch_screen(self) -> Optional[bytes]: info = self._init_info if not info: info = self.request_init() if not info: return None mem_size = info["mem_size"] buffer = bytearray(mem_size) pos = 0 while pos < mem_size: if not self._send(CMD_LCDMEM, struct.pack(">I", pos)): return None for _ in range(30): result = self._recv() if result and result[0] == CMD_LCDMEM: payload = result[1] if len(payload) >= 4: start = struct.unpack(">I", payload[:4])[0] chunk = payload[4:] buffer[start : start + len(chunk)] = chunk pos = start + len(chunk) break time.sleep(0.02) else: return None return bytes(buffer) class ViewerApp: def __init__(self, host: str): self.host = host self.client = RemoDispViewer(host) self.root = tk.Tk() self.root.title(f"远程显示 - {host}") self.root.geometry("400x320") self._canvas = None self._photo = None self._scale = 2 self._build_ui() def _build_ui(self): # 连接区 frm = ttk.Frame(self.root, padding=5) frm.pack(fill=tk.X) ttk.Label(frm, text=f"目标: {self.host}:7003").pack(side=tk.LEFT) self._status = ttk.Label(frm, text="未连接") self._status.pack(side=tk.RIGHT) # 画布 self._canvas = tk.Canvas(self.root, width=320, height=320, bg="black") self._canvas.pack(padx=5, pady=5) # 按键区 btn_frm = ttk.Frame(self.root, padding=5) btn_frm.pack(fill=tk.X) keys = [ ("↑", KEY_U), ("↓", KEY_D), ("←", KEY_L), ("→", KEY_R), ("确认", KEY_ENT), ("取消", KEY_ESC), ("F1", KEY_F1), ("F2", KEY_F2), ] for label, key in keys: ttk.Button(btn_frm, text=label, width=6, command=lambda k=key: self._on_key(k)).pack(side=tk.LEFT, padx=2) # 刷新 ttk.Button(btn_frm, text="刷新", command=self._refresh).pack(side=tk.LEFT, padx=2) self.root.protocol("WM_DELETE_WINDOW", self._on_close) def _connect(self) -> bool: if self.client.connect(): if self.client.request_init(): self._status.config(text="已连接") return True self.client.disconnect() self._status.config(text="连接失败") return False def _on_key(self, key: int): if not self.client._sock: if not self._connect(): messagebox.showerror("错误", "请先连接装置") return self.client.send_key(key) def _refresh(self): if not self.client._sock: if not self._connect(): messagebox.showerror("错误", "连接失败") return data = self.client.fetch_screen() if not data: messagebox.showerror("错误", "拉取显存失败") return info = self.client._init_info w, h = info["width"], info["height"] # 单色转图像 try: from PIL import Image, ImageTk img = Image.new("1", (w, h)) pix = img.load() for y in range(h): for x in range(w): bi = (y * w + x) // 8 bit = 7 - (x % 8) pix[x, y] = 255 if (bi < len(data) and (data[bi] >> bit) & 1) else 0 img = img.resize((w * self._scale, h * self._scale), Image.NEAREST) self._photo = ImageTk.PhotoImage(img) self._canvas.delete("all") self._canvas.create_image(0, 0, anchor=tk.NW, image=self._photo) except ImportError: # 无 PIL:用简单矩形模拟 self._canvas.delete("all") step = max(1, 320 // w) for y in range(min(h, 160)): for x in range(min(w, 160)): bi = (y * w + x) // 8 bit = 7 - (x % 8) if bi < len(data) and (data[bi] >> bit) & 1: self._canvas.create_rectangle( x * step, y * step, (x + 1) * step, (y + 1) * step, fill="white", outline="" ) def _on_close(self): self.client.disconnect() self.root.destroy() def run(self): self._connect() self._refresh() self.root.mainloop() def main(): if len(sys.argv) < 2: print("用法: python remo_disp_viewer.py <装置IP>") sys.exit(1) app = ViewerApp(sys.argv[1]) app.run() if __name__ == "__main__": main()