diff --git a/README.md b/README.md index b4ea83c..474ecaa 100644 --- a/README.md +++ b/README.md @@ -1,23 +1,49 @@ # DTU-RemoteLCD - 远程显示通信工具 -网络调试的远程显示 LCD 模拟软件。与 `HMI/RemoeDisp/RemoDispBus.c` 协议兼容的 Python 客户端,用于远程查看装置 LCD 显存并模拟按键。 +与 **RemoDispBus** 协议兼容的远程 LCD 显示与按键模拟工具。通过网络连接 DTU 装置,实时查看 LCD 显存画面并模拟按键操作,适用于调试、监控等场景。 -## 协议说明 +## 功能特性 -- **端口**: 7003 -- **报文格式**: - - 工具 → 装置: `[0xAA][功能码][长度高][长度低][数据...][CRC]` - - 装置 → 工具: `[0xBB][功能码][长度高][长度低][数据...][CRC]` -- **CRC**: 数据区异或校验 -- **功能码**: KEEPLIVE=0, INIT=1, KEY=2, LCDMEM=3 +- **实时屏幕显示**:将装置 LCD 显存以 1bpp 位图形式拉取并渲染 +- **按键模拟**:支持方向键、确认、返回、复归等按键 +- **多种使用方式**:命令行、GUI、Web 浏览器 +- **WebSocket 传输**:Web 版采用 Socket.IO,服务端主动推送,低延迟 -## 命令行工具 (remo_disp_client.py) +## 环境要求 + +- Python 3.7+ +- 与 DTU 装置网络互通(默认端口 7003) + +## 快速开始 + +### 1. 安装依赖 ```bash -# 安装可选依赖(用于保存 PNG) pip install -r requirements.txt +``` -# 获取初始化信息 +### 2. Web 界面(推荐) + +```bash +python remo_disp_server.py [装置IP] +``` + +启动后访问 **http://localhost:8181**,在页面中: +- 点击「连接」→ 输入装置 IP 和端口(默认 7003)→ 连接 +- 左侧显示装置 LCD 画面(约 100ms 刷新) +- 右侧使用方向键、确认、复归、返回进行按键操作 +- 点击「断开」断开与装置的连接 + +**可选预连接**:启动时传入 IP 可预连接装置,例如: + +```bash +python remo_disp_server.py 192.168.253.3 +``` + +### 3. 命令行工具 + +```bash +# 获取初始化信息(屏幕宽高、显存大小) python remo_disp_client.py 192.168.1.100 --init # 拉取显存并保存为 screen.png @@ -28,9 +54,12 @@ python remo_disp_client.py 192.168.1.100 --key ENT # 发送 KEEPLIVE python remo_disp_client.py 192.168.1.100 --keepalive + +# 指定端口 +python remo_disp_client.py 192.168.1.100 -p 7003 --screen ``` -## GUI 查看器 (remo_disp_viewer.py) +### 4. GUI 查看器 ```bash python remo_disp_viewer.py 192.168.1.100 @@ -40,27 +69,70 @@ python remo_disp_viewer.py 192.168.1.100 - 点击按键模拟远程按键 - 点击「刷新」重新拉取显存 -## Web 界面 (remo_disp_server.py + remo_disp_ui.html) +## 项目结构 -```bash -pip install flask -python remo_disp_server.py +``` +DTU-RemoteLCD/ +├── remo_disp_server.py # Web 服务(Flask + Socket.IO) +├── remo_disp_ui.html # Web 前端页面 +├── remo_disp_client.py # 命令行客户端 +├── remo_disp_viewer.py # Tkinter GUI 查看器 +├── requirements.txt # Python 依赖 +├── static/ # 静态资源(可选) +│ └── favicon.ico # 网站图标 +└── README.md ``` -启动后访问 http://localhost:8080 ,界面包含: -- 顶部菜单:连接、设置、退出、关于 -- 左侧显示区:装置 LCD 画面(约 500ms 刷新) -- 右侧控制:方向键 + 确认、复归、返回 +## 协议说明(RemoDispBus) + +与 `HMI/RemoeDisp/RemoDispBus.c` 兼容: + +| 项目 | 说明 | +|------|------| +| **端口** | 7003 | +| **工具 → 装置** | `[0xAA][功能码][长度高][长度低][数据...][CRC]` | +| **装置 → 工具** | `[0xBB][功能码][长度高][长度低][数据...][CRC]` | +| **CRC** | 数据区逐字节异或,取低 8 位 | + +**功能码**: + +| 码 | 名称 | 说明 | +|----|------|------| +| 0 | KEEPLIVE | 保活 | +| 1 | INIT | 初始化,获取屏幕宽高、显存大小 | +| 2 | KEY | 按键 | +| 3 | LCDMEM | 读取显存(屏幕画面) | ## 按键映射 -| 键名 | 值 | +| 键名 | 协议码 | 说明 | +|------|--------|------| +| U | 0x02 | 上 | +| D | 0x40 | 下 | +| L | 0x10 | 左 | +| R | 0x08 | 右 | +| ENT | 0x20 | 确认 | +| ESC | 0x01 | 返回/取消 | +| F1/RESET | 0x04 | 复归 | +| F2 | 0x80 | F2 | + +## 依赖说明 + +| 依赖 | 用途 | |------|------| -| 上 | 0x02 | -| 下 | 0x40 | -| 左 | 0x10 | -| 右 | 0x08 | -| 确认 | 0x20 | -| 取消 | 0x01 | -| F1 | 0x04 | -| F2 | 0x80 | +| Flask | Web 框架 | +| flask-socketio | WebSocket 支持 | +| python-socketio | Socket.IO 服务端 | +| Pillow | 1bpp 位图转 PNG | + +`remo_disp_client.py` 仅需 Python 标准库;`remo_disp_viewer.py` 使用 Tkinter(Python 自带)。 + +## 配置说明 + +- **Web 服务端口**:默认 8181,在 `remo_disp_server.py` 的 `main()` 中修改 +- **装置端口**:默认 7003,连接时可输入自定义端口 +- **Favicon**:将 `favicon.ico` 放入 `static/` 目录即可显示网站图标 + +## 许可证 + +本项目仅供内部调试使用。 diff --git a/remo_disp_client.py b/remo_disp_client.py deleted file mode 100644 index ae8e3a0..0000000 --- a/remo_disp_client.py +++ /dev/null @@ -1,322 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -远程显示通信工具 - 与 RemoDispBus 协议兼容的 Python 客户端 - -协议说明(与 HMI/RemoeDisp/RemoDispBus.c 一致): -- 工具 → 装置: [0xAA][功能码][长度高][长度低][数据...][CRC] -- 装置 → 工具: [0xBB][功能码][长度高][长度低][数据...][CRC] -- CRC: 数据区异或校验(不含 Tag、功能码、长度) -- 端口: 7003 -""" - -import socket -import struct -import threading -import time -import argparse -from typing import Optional, Tuple, Callable - -# 协议常量(与 RemoDispBus.h 一致) -TAG_CLIENT = 0xAA # 工具侧发送 -TAG_DEVICE = 0xBB # 装置侧发送 -PORT = 7003 - -# 功能码 -CMD_KEEPLIVE = 0 -CMD_INIT = 1 -CMD_KEY = 2 -CMD_LCDMEM = 3 -CMD_ACK = 4 -CMD_NCK = 5 -CMD_IDLE = 0xFF - -# 按键值(与 RemoDispBus.h 一致) -KEY_U = 0x02 # 上 -KEY_D = 0x40 # 下 -KEY_L = 0x10 # 左 -KEY_R = 0x08 # 右 -KEY_ENT = 0x20 # 确认 -KEY_ESC = 0x01 # 取消 -KEY_F1 = 0x04 -KEY_F2 = 0x80 -KEY_NONE = 0 - - -def calc_crc(data: bytes) -> int: - """异或校验,与 RDisp_CalCrc 一致""" - crc = 0 - for b in data: - crc ^= b - return crc & 0xFF - - -def build_frame(cmd: int, data: bytes) -> bytes: - """组帧:工具→装置 [0xAA][cmd][len_hi][len_lo][data][crc]""" - length = len(data) - header = bytes([TAG_CLIENT, cmd & 0xFF, (length >> 8) & 0xFF, length & 0xFF]) - crc = calc_crc(data) - return header + data + bytes([crc]) - - -def parse_frame(raw: bytes) -> Optional[Tuple[int, bytes]]: - """解析装置回复:[0xBB][cmd][len_hi][len_lo][data][crc]""" - if len(raw) < 6: - return None - if raw[0] != TAG_DEVICE: - return None - cmd = raw[1] - length = (raw[2] << 8) | raw[3] - if len(raw) < 5 + length + 1: - return None - data = raw[4:4 + length] - crc_recv = raw[4 + length] - if calc_crc(data) != crc_recv: - return None - return (cmd, data) - - -class RemoDispClient: - """远程显示 TCP 客户端""" - - def __init__(self, host: str, port: int = PORT, timeout: float = 5.0): - self.host = host - self.port = port - self.timeout = timeout - self._sock: Optional[socket.socket] = None - self._init_info: Optional[dict] = None - - def connect(self) -> bool: - """连接装置""" - try: - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.settimeout(self.timeout) - self._sock.connect((self.host, self.port)) - return True - except Exception as e: - print(f"连接失败: {e}") - return False - - def disconnect(self): - """断开连接""" - if self._sock: - try: - self._sock.close() - except Exception: - pass - self._sock = None - - def _send(self, cmd: int, data: bytes = b"") -> bool: - """发送报文""" - if not self._sock: - return False - frame = build_frame(cmd, data) - try: - self._sock.sendall(frame) - return True - except Exception as e: - print(f"发送失败: {e}") - return False - - def _recv(self) -> Optional[Tuple[int, bytes]]: - """接收并解析一帧""" - if not self._sock: - return None - try: - header = self._sock.recv(5) - if len(header) < 5: - return None - length = (header[2] << 8) | header[3] - rest = self._sock.recv(length + 1) - if len(rest) < length + 1: - return None - raw = header + rest - return parse_frame(raw) - except socket.timeout: - return None - except Exception as e: - print(f"接收失败: {e}") - return None - - def request_init(self) -> Optional[dict]: - """召唤初始化,解析 LCD 尺寸、显存大小、按键映射""" - if not self._send(CMD_INIT): - return None - result = self._recv() - if not result or result[0] != CMD_INIT: - return None - data = result[1] - if len(data) < 29: - return None - # 与 RDisp_Form_InitInf 格式一致 - width = (data[0] << 8) | data[1] - height = (data[2] << 8) | data[3] - lcd_type = data[4] - mem_size = (data[5] << 24) | (data[6] << 16) | (data[7] << 8) | data[8] - rgb_red = struct.unpack(">I", data[9:13])[0] - rgb_green = struct.unpack(">I", data[13:17])[0] - rgb_blue = struct.unpack(">I", data[17:21])[0] - keys = list(data[21:29]) - self._init_info = { - "width": width, - "height": height, - "lcd_type": lcd_type, - "mem_size": mem_size, - "rgb_red": rgb_red, - "rgb_green": rgb_green, - "rgb_blue": rgb_blue, - "keys": keys, - } - return self._init_info - - def send_key(self, key_value: int) -> bool: - """下发按键""" - return self._send(CMD_KEY, bytes([key_value & 0xFF])) - - def request_lcdmem(self, start_pos: int = 0) -> Optional[Tuple[int, bytes]]: - """召唤显存,从 start_pos 起。返回 (下一位置, 显存数据)""" - data = struct.pack(">I", start_pos) - if not self._send(CMD_LCDMEM, data): - return None - result = self._recv() - if not result or result[0] != CMD_LCDMEM: - return None - payload = result[1] - if len(payload) < 4: - return None - pos = struct.unpack(">I", payload[:4])[0] - mem_data = payload[4:] - return (pos + len(mem_data), mem_data) - - def fetch_full_screen(self, on_progress: Optional[Callable[[int, int], None]] = None) -> Optional[bytes]: - """拉取完整显存,支持断点续传""" - info = self._init_info or self.request_init() - if not info: - return None - mem_size = info["mem_size"] - buffer = bytearray(mem_size) - pos = 0 - while pos < mem_size: - result = self.request_lcdmem(pos) - if not result: - return None - next_pos, chunk = result - buffer[pos : pos + len(chunk)] = chunk - pos = next_pos - if on_progress: - on_progress(pos, mem_size) - return bytes(buffer) - - def send_keepalive(self) -> bool: - """发送链路保持(装置端 KEEPLIVE 不回复,仅刷新计时器)""" - return self._send(CMD_KEEPLIVE) - - -def mono_to_image(data: bytes, width: int, height: int) -> "Optional[object]": - """单色显存转 PIL Image(1bit -> 黑白图)""" - try: - from PIL import Image - except ImportError: - return None - img = Image.new("1", (width, height)) - pix = img.load() - for y in range(height): - for x in range(width): - byte_idx = (y * width + x) // 8 - bit_idx = 7 - (x % 8) - if byte_idx < len(data): - pix[x, y] = 1 if (data[byte_idx] >> bit_idx) & 1 else 0 - else: - pix[x, y] = 0 - return img - - -def main(): - parser = argparse.ArgumentParser(description="远程显示通信工具") - parser.add_argument("host", help="装置 IP 地址") - parser.add_argument("-p", "--port", type=int, default=PORT, help=f"端口 (默认 {PORT})") - parser.add_argument("--init", action="store_true", help="仅获取初始化信息") - parser.add_argument("--screen", action="store_true", help="拉取显存并保存为 screen.png") - parser.add_argument("--key", type=str, help="发送按键: U/D/L/R/ENT/ESC/F1/F2") - parser.add_argument("--keepalive", action="store_true", help="发送一次 KEEPLIVE") - args = parser.parse_args() - - key_map = { - "U": KEY_U, "D": KEY_D, "L": KEY_L, "R": KEY_R, - "ENT": KEY_ENT, "ESC": KEY_ESC, "F1": KEY_F1, "F2": KEY_F2, - } - - client = RemoDispClient(args.host, args.port) - if not client.connect(): - return 1 - - try: - if args.init: - info = client.request_init() - if info: - print("初始化信息:") - print(f" LCD: {info['width']}x{info['height']}, 类型={info['lcd_type']}") - print(f" 显存大小: {info['mem_size']} 字节") - print(f" 按键映射: {info['keys']}") - else: - print("获取初始化失败") - return 1 - - elif args.key: - k = key_map.get(args.key.upper(), KEY_NONE) - if k == KEY_NONE: - print(f"未知按键: {args.key}") - return 1 - if client.send_key(k): - print(f"已发送按键: {args.key}") - else: - print("发送按键失败") - return 1 - - elif args.keepalive: - if client.send_keepalive(): - print("已发送 KEEPLIVE") - else: - print("发送失败") - return 1 - - elif args.screen: - def progress(cur, total): - print(f"\r拉取显存: {cur}/{total} ({100*cur//total}%)", end="") - - data = client.fetch_full_screen(on_progress=progress) - if not data: - print("\n拉取显存失败") - return 1 - info = client._init_info or {} - w = info.get("width", 160) - h = info.get("height", 160) - img = mono_to_image(data, w, h) - if img: - img.save("screen.png") - print(f"\n已保存 screen.png ({w}x{h})") - else: - with open("screen.raw", "wb") as f: - f.write(data) - print(f"\n已保存 screen.raw ({len(data)} 字节,需 PIL 才能转 PNG)") - - else: - # 默认:获取初始化并拉取一帧显存 - info = client.request_init() - if info: - print(f"LCD {info['width']}x{info['height']}, 显存 {info['mem_size']} 字节") - result = client.request_lcdmem(0) - if result: - next_pos, chunk = result - print(f"收到显存 {len(chunk)} 字节,下一位置 {next_pos}") - else: - print("召唤显存失败") - - finally: - client.disconnect() - - return 0 - - -if __name__ == "__main__": - exit(main()) diff --git a/remo_disp_viewer.py b/remo_disp_viewer.py deleted file mode 100644 index 8ac9efa..0000000 --- a/remo_disp_viewer.py +++ /dev/null @@ -1,272 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -远程显示查看器 - 带 GUI 的实时显存查看与按键模拟 - -用法: python remo_disp_viewer.py <装置IP> -""" - -import socket -import struct -import threading -import time -import sys -import tkinter as tk -from tkinter import ttk, messagebox -from typing import Optional, Tuple - -# 协议常量 -TAG_CLIENT = 0xAA -TAG_DEVICE = 0xBB -PORT = 7003 -CMD_KEEPLIVE = 0 -CMD_INIT = 1 -CMD_KEY = 2 -CMD_LCDMEM = 3 - -KEY_U, KEY_D, KEY_L, KEY_R = 0x02, 0x40, 0x10, 0x08 -KEY_ENT, KEY_ESC, KEY_F1, KEY_F2 = 0x20, 0x01, 0x04, 0x80 - - -def calc_crc(data: bytes) -> int: - crc = 0 - for b in data: - crc ^= b - return crc & 0xFF - - -def build_frame(cmd: int, data: bytes) -> bytes: - length = len(data) - header = bytes([TAG_CLIENT, cmd & 0xFF, (length >> 8) & 0xFF, length & 0xFF]) - return header + data + bytes([calc_crc(data)]) - - -def parse_frame(raw: bytes) -> Optional[Tuple[int, bytes]]: - if len(raw) < 6 or raw[0] != TAG_DEVICE: - return None - length = (raw[2] << 8) | raw[3] - if len(raw) < 5 + length + 1: - return None - data = raw[4:4 + length] - if calc_crc(data) != raw[4 + length]: - return None - return (raw[1], data) - - -class RemoDispViewer: - def __init__(self, host: str, port: int = PORT): - self.host = host - self.port = port - self._sock: Optional[socket.socket] = None - self._init_info: Optional[dict] = None - self._running = False - self._refresh_thread: Optional[threading.Thread] = None - - def connect(self) -> bool: - try: - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.settimeout(3.0) - self._sock.connect((self.host, self.port)) - self._sock.settimeout(0.5) - return True - except Exception: - return False - - def disconnect(self): - self._running = False - if self._sock: - try: - self._sock.close() - except Exception: - pass - self._sock = None - - def _send(self, cmd: int, data: bytes = b"") -> bool: - if not self._sock: - return False - try: - self._sock.sendall(build_frame(cmd, data)) - return True - except Exception: - return False - - def _recv(self) -> Optional[Tuple[int, bytes]]: - if not self._sock: - return None - try: - header = self._sock.recv(5) - if len(header) < 5: - return None - length = (header[2] << 8) | header[3] - rest = self._sock.recv(length + 1) - if len(rest) < length + 1: - return None - return parse_frame(header + rest) - except (socket.timeout, BlockingIOError): - return None - except Exception: - return None - - def request_init(self) -> Optional[dict]: - if not self._send(CMD_INIT): - return None - for _ in range(20): - result = self._recv() - if result and result[0] == CMD_INIT: - data = result[1] - if len(data) >= 29: - self._init_info = { - "width": (data[0] << 8) | data[1], - "height": (data[2] << 8) | data[3], - "mem_size": (data[5] << 24) | (data[6] << 16) | (data[7] << 8) | data[8], - } - return self._init_info - time.sleep(0.05) - return None - - def send_key(self, key: int) -> bool: - return self._send(CMD_KEY, bytes([key])) - - def fetch_screen(self) -> Optional[bytes]: - info = self._init_info - if not info: - info = self.request_init() - if not info: - return None - mem_size = info["mem_size"] - buffer = bytearray(mem_size) - pos = 0 - while pos < mem_size: - if not self._send(CMD_LCDMEM, struct.pack(">I", pos)): - return None - for _ in range(30): - result = self._recv() - if result and result[0] == CMD_LCDMEM: - payload = result[1] - if len(payload) >= 4: - start = struct.unpack(">I", payload[:4])[0] - chunk = payload[4:] - buffer[start : start + len(chunk)] = chunk - pos = start + len(chunk) - break - time.sleep(0.02) - else: - return None - return bytes(buffer) - - -class ViewerApp: - def __init__(self, host: str): - self.host = host - self.client = RemoDispViewer(host) - self.root = tk.Tk() - self.root.title(f"远程显示 - {host}") - self.root.geometry("400x320") - self._canvas = None - self._photo = None - self._scale = 2 - self._build_ui() - - def _build_ui(self): - # 连接区 - frm = ttk.Frame(self.root, padding=5) - frm.pack(fill=tk.X) - ttk.Label(frm, text=f"目标: {self.host}:7003").pack(side=tk.LEFT) - self._status = ttk.Label(frm, text="未连接") - self._status.pack(side=tk.RIGHT) - - # 画布 - self._canvas = tk.Canvas(self.root, width=320, height=320, bg="black") - self._canvas.pack(padx=5, pady=5) - - # 按键区 - btn_frm = ttk.Frame(self.root, padding=5) - btn_frm.pack(fill=tk.X) - keys = [ - ("↑", KEY_U), ("↓", KEY_D), ("←", KEY_L), ("→", KEY_R), - ("确认", KEY_ENT), ("取消", KEY_ESC), ("F1", KEY_F1), ("F2", KEY_F2), - ] - for label, key in keys: - ttk.Button(btn_frm, text=label, width=6, - command=lambda k=key: self._on_key(k)).pack(side=tk.LEFT, padx=2) - - # 刷新 - ttk.Button(btn_frm, text="刷新", command=self._refresh).pack(side=tk.LEFT, padx=2) - - self.root.protocol("WM_DELETE_WINDOW", self._on_close) - - def _connect(self) -> bool: - if self.client.connect(): - if self.client.request_init(): - self._status.config(text="已连接") - return True - self.client.disconnect() - self._status.config(text="连接失败") - return False - - def _on_key(self, key: int): - if not self.client._sock: - if not self._connect(): - messagebox.showerror("错误", "请先连接装置") - return - self.client.send_key(key) - - def _refresh(self): - if not self.client._sock: - if not self._connect(): - messagebox.showerror("错误", "连接失败") - return - data = self.client.fetch_screen() - if not data: - messagebox.showerror("错误", "拉取显存失败") - return - info = self.client._init_info - w, h = info["width"], info["height"] - # 单色转图像 - try: - from PIL import Image, ImageTk - img = Image.new("1", (w, h)) - pix = img.load() - for y in range(h): - for x in range(w): - bi = (y * w + x) // 8 - bit = 7 - (x % 8) - pix[x, y] = 255 if (bi < len(data) and (data[bi] >> bit) & 1) else 0 - img = img.resize((w * self._scale, h * self._scale), Image.NEAREST) - self._photo = ImageTk.PhotoImage(img) - self._canvas.delete("all") - self._canvas.create_image(0, 0, anchor=tk.NW, image=self._photo) - except ImportError: - # 无 PIL:用简单矩形模拟 - self._canvas.delete("all") - step = max(1, 320 // w) - for y in range(min(h, 160)): - for x in range(min(w, 160)): - bi = (y * w + x) // 8 - bit = 7 - (x % 8) - if bi < len(data) and (data[bi] >> bit) & 1: - self._canvas.create_rectangle( - x * step, y * step, (x + 1) * step, (y + 1) * step, - fill="white", outline="" - ) - - def _on_close(self): - self.client.disconnect() - self.root.destroy() - - def run(self): - self._connect() - self._refresh() - self.root.mainloop() - - -def main(): - if len(sys.argv) < 2: - print("用法: python remo_disp_viewer.py <装置IP>") - sys.exit(1) - app = ViewerApp(sys.argv[1]) - app.run() - - -if __name__ == "__main__": - main()