commit ba4431c01f54c3148e2696d7d1d941fbcf2bbe57 Author: Wanderingss <1624155937@qq.com> Date: Mon Mar 2 15:47:47 2026 +0800 创建第一个版本的工程,基本逻辑和显示界面已经完成 diff --git a/README.md b/README.md new file mode 100644 index 0000000..28f1ac3 --- /dev/null +++ b/README.md @@ -0,0 +1,66 @@ +# 远程显示通信工具 + +与 `HMI/RemoeDisp/RemoDispBus.c` 协议兼容的 Python 客户端,用于远程查看装置 LCD 显存并模拟按键。 + +## 协议说明 + +- **端口**: 7003 +- **报文格式**: + - 工具 → 装置: `[0xAA][功能码][长度高][长度低][数据...][CRC]` + - 装置 → 工具: `[0xBB][功能码][长度高][长度低][数据...][CRC]` +- **CRC**: 数据区异或校验 +- **功能码**: KEEPLIVE=0, INIT=1, KEY=2, LCDMEM=3 + +## 命令行工具 (remo_disp_client.py) + +```bash +# 安装可选依赖(用于保存 PNG) +pip install -r requirements.txt + +# 获取初始化信息 +python remo_disp_client.py 192.168.1.100 --init + +# 拉取显存并保存为 screen.png +python remo_disp_client.py 192.168.1.100 --screen + +# 发送按键 (U/D/L/R/ENT/ESC/F1/F2) +python remo_disp_client.py 192.168.1.100 --key ENT + +# 发送 KEEPLIVE +python remo_disp_client.py 192.168.1.100 --keepalive +``` + +## GUI 查看器 (remo_disp_viewer.py) + +```bash +python remo_disp_viewer.py 192.168.1.100 +``` + +- 连接装置并实时显示 LCD 画面 +- 点击按键模拟远程按键 +- 点击「刷新」重新拉取显存 + +## Web 界面 (remo_disp_server.py + remo_disp_ui.html) + +```bash +pip install flask +python remo_disp_server.py +``` + +启动后访问 http://localhost:8080 ,界面包含: +- 顶部菜单:连接、设置、退出、关于 +- 左侧显示区:装置 LCD 画面(约 500ms 刷新) +- 右侧控制:方向键 + 确认、复归、返回 + +## 按键映射 + +| 键名 | 值 | +|------|------| +| 上 | 0x02 | +| 下 | 0x40 | +| 左 | 0x10 | +| 右 | 0x08 | +| 确认 | 0x20 | +| 取消 | 0x01 | +| F1 | 0x04 | +| F2 | 0x80 | diff --git a/remo_disp_client.py b/remo_disp_client.py new file mode 100644 index 0000000..ae8e3a0 --- /dev/null +++ b/remo_disp_client.py @@ -0,0 +1,322 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +远程显示通信工具 - 与 RemoDispBus 协议兼容的 Python 客户端 + +协议说明(与 HMI/RemoeDisp/RemoDispBus.c 一致): +- 工具 → 装置: [0xAA][功能码][长度高][长度低][数据...][CRC] +- 装置 → 工具: [0xBB][功能码][长度高][长度低][数据...][CRC] +- CRC: 数据区异或校验(不含 Tag、功能码、长度) +- 端口: 7003 +""" + +import socket +import struct +import threading +import time +import argparse +from typing import Optional, Tuple, Callable + +# 协议常量(与 RemoDispBus.h 一致) +TAG_CLIENT = 0xAA # 工具侧发送 +TAG_DEVICE = 0xBB # 装置侧发送 +PORT = 7003 + +# 功能码 +CMD_KEEPLIVE = 0 +CMD_INIT = 1 +CMD_KEY = 2 +CMD_LCDMEM = 3 +CMD_ACK = 4 +CMD_NCK = 5 +CMD_IDLE = 0xFF + +# 按键值(与 RemoDispBus.h 一致) +KEY_U = 0x02 # 上 +KEY_D = 0x40 # 下 +KEY_L = 0x10 # 左 +KEY_R = 0x08 # 右 +KEY_ENT = 0x20 # 确认 +KEY_ESC = 0x01 # 取消 +KEY_F1 = 0x04 +KEY_F2 = 0x80 +KEY_NONE = 0 + + +def calc_crc(data: bytes) -> int: + """异或校验,与 RDisp_CalCrc 一致""" + crc = 0 + for b in data: + crc ^= b + return crc & 0xFF + + +def build_frame(cmd: int, data: bytes) -> bytes: + """组帧:工具→装置 [0xAA][cmd][len_hi][len_lo][data][crc]""" + length = len(data) + header = bytes([TAG_CLIENT, cmd & 0xFF, (length >> 8) & 0xFF, length & 0xFF]) + crc = calc_crc(data) + return header + data + bytes([crc]) + + +def parse_frame(raw: bytes) -> Optional[Tuple[int, bytes]]: + """解析装置回复:[0xBB][cmd][len_hi][len_lo][data][crc]""" + if len(raw) < 6: + return None + if raw[0] != TAG_DEVICE: + return None + cmd = raw[1] + length = (raw[2] << 8) | raw[3] + if len(raw) < 5 + length + 1: + return None + data = raw[4:4 + length] + crc_recv = raw[4 + length] + if calc_crc(data) != crc_recv: + return None + return (cmd, data) + + +class RemoDispClient: + """远程显示 TCP 客户端""" + + def __init__(self, host: str, port: int = PORT, timeout: float = 5.0): + self.host = host + self.port = port + self.timeout = timeout + self._sock: Optional[socket.socket] = None + self._init_info: Optional[dict] = None + + def connect(self) -> bool: + """连接装置""" + try: + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sock.settimeout(self.timeout) + self._sock.connect((self.host, self.port)) + return True + except Exception as e: + print(f"连接失败: {e}") + return False + + def disconnect(self): + """断开连接""" + if self._sock: + try: + self._sock.close() + except Exception: + pass + self._sock = None + + def _send(self, cmd: int, data: bytes = b"") -> bool: + """发送报文""" + if not self._sock: + return False + frame = build_frame(cmd, data) + try: + self._sock.sendall(frame) + return True + except Exception as e: + print(f"发送失败: {e}") + return False + + def _recv(self) -> Optional[Tuple[int, bytes]]: + """接收并解析一帧""" + if not self._sock: + return None + try: + header = self._sock.recv(5) + if len(header) < 5: + return None + length = (header[2] << 8) | header[3] + rest = self._sock.recv(length + 1) + if len(rest) < length + 1: + return None + raw = header + rest + return parse_frame(raw) + except socket.timeout: + return None + except Exception as e: + print(f"接收失败: {e}") + return None + + def request_init(self) -> Optional[dict]: + """召唤初始化,解析 LCD 尺寸、显存大小、按键映射""" + if not self._send(CMD_INIT): + return None + result = self._recv() + if not result or result[0] != CMD_INIT: + return None + data = result[1] + if len(data) < 29: + return None + # 与 RDisp_Form_InitInf 格式一致 + width = (data[0] << 8) | data[1] + height = (data[2] << 8) | data[3] + lcd_type = data[4] + mem_size = (data[5] << 24) | (data[6] << 16) | (data[7] << 8) | data[8] + rgb_red = struct.unpack(">I", data[9:13])[0] + rgb_green = struct.unpack(">I", data[13:17])[0] + rgb_blue = struct.unpack(">I", data[17:21])[0] + keys = list(data[21:29]) + self._init_info = { + "width": width, + "height": height, + "lcd_type": lcd_type, + "mem_size": mem_size, + "rgb_red": rgb_red, + "rgb_green": rgb_green, + "rgb_blue": rgb_blue, + "keys": keys, + } + return self._init_info + + def send_key(self, key_value: int) -> bool: + """下发按键""" + return self._send(CMD_KEY, bytes([key_value & 0xFF])) + + def request_lcdmem(self, start_pos: int = 0) -> Optional[Tuple[int, bytes]]: + """召唤显存,从 start_pos 起。返回 (下一位置, 显存数据)""" + data = struct.pack(">I", start_pos) + if not self._send(CMD_LCDMEM, data): + return None + result = self._recv() + if not result or result[0] != CMD_LCDMEM: + return None + payload = result[1] + if len(payload) < 4: + return None + pos = struct.unpack(">I", payload[:4])[0] + mem_data = payload[4:] + return (pos + len(mem_data), mem_data) + + def fetch_full_screen(self, on_progress: Optional[Callable[[int, int], None]] = None) -> Optional[bytes]: + """拉取完整显存,支持断点续传""" + info = self._init_info or self.request_init() + if not info: + return None + mem_size = info["mem_size"] + buffer = bytearray(mem_size) + pos = 0 + while pos < mem_size: + result = self.request_lcdmem(pos) + if not result: + return None + next_pos, chunk = result + buffer[pos : pos + len(chunk)] = chunk + pos = next_pos + if on_progress: + on_progress(pos, mem_size) + return bytes(buffer) + + def send_keepalive(self) -> bool: + """发送链路保持(装置端 KEEPLIVE 不回复,仅刷新计时器)""" + return self._send(CMD_KEEPLIVE) + + +def mono_to_image(data: bytes, width: int, height: int) -> "Optional[object]": + """单色显存转 PIL Image(1bit -> 黑白图)""" + try: + from PIL import Image + except ImportError: + return None + img = Image.new("1", (width, height)) + pix = img.load() + for y in range(height): + for x in range(width): + byte_idx = (y * width + x) // 8 + bit_idx = 7 - (x % 8) + if byte_idx < len(data): + pix[x, y] = 1 if (data[byte_idx] >> bit_idx) & 1 else 0 + else: + pix[x, y] = 0 + return img + + +def main(): + parser = argparse.ArgumentParser(description="远程显示通信工具") + parser.add_argument("host", help="装置 IP 地址") + parser.add_argument("-p", "--port", type=int, default=PORT, help=f"端口 (默认 {PORT})") + parser.add_argument("--init", action="store_true", help="仅获取初始化信息") + parser.add_argument("--screen", action="store_true", help="拉取显存并保存为 screen.png") + parser.add_argument("--key", type=str, help="发送按键: U/D/L/R/ENT/ESC/F1/F2") + parser.add_argument("--keepalive", action="store_true", help="发送一次 KEEPLIVE") + args = parser.parse_args() + + key_map = { + "U": KEY_U, "D": KEY_D, "L": KEY_L, "R": KEY_R, + "ENT": KEY_ENT, "ESC": KEY_ESC, "F1": KEY_F1, "F2": KEY_F2, + } + + client = RemoDispClient(args.host, args.port) + if not client.connect(): + return 1 + + try: + if args.init: + info = client.request_init() + if info: + print("初始化信息:") + print(f" LCD: {info['width']}x{info['height']}, 类型={info['lcd_type']}") + print(f" 显存大小: {info['mem_size']} 字节") + print(f" 按键映射: {info['keys']}") + else: + print("获取初始化失败") + return 1 + + elif args.key: + k = key_map.get(args.key.upper(), KEY_NONE) + if k == KEY_NONE: + print(f"未知按键: {args.key}") + return 1 + if client.send_key(k): + print(f"已发送按键: {args.key}") + else: + print("发送按键失败") + return 1 + + elif args.keepalive: + if client.send_keepalive(): + print("已发送 KEEPLIVE") + else: + print("发送失败") + return 1 + + elif args.screen: + def progress(cur, total): + print(f"\r拉取显存: {cur}/{total} ({100*cur//total}%)", end="") + + data = client.fetch_full_screen(on_progress=progress) + if not data: + print("\n拉取显存失败") + return 1 + info = client._init_info or {} + w = info.get("width", 160) + h = info.get("height", 160) + img = mono_to_image(data, w, h) + if img: + img.save("screen.png") + print(f"\n已保存 screen.png ({w}x{h})") + else: + with open("screen.raw", "wb") as f: + f.write(data) + print(f"\n已保存 screen.raw ({len(data)} 字节,需 PIL 才能转 PNG)") + + else: + # 默认:获取初始化并拉取一帧显存 + info = client.request_init() + if info: + print(f"LCD {info['width']}x{info['height']}, 显存 {info['mem_size']} 字节") + result = client.request_lcdmem(0) + if result: + next_pos, chunk = result + print(f"收到显存 {len(chunk)} 字节,下一位置 {next_pos}") + else: + print("召唤显存失败") + + finally: + client.disconnect() + + return 0 + + +if __name__ == "__main__": + exit(main()) diff --git a/remo_disp_server.py b/remo_disp_server.py new file mode 100644 index 0000000..ae56f06 --- /dev/null +++ b/remo_disp_server.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +远程显示 Web 服务 - Flask 实现 + +用法: python remo_disp_server.py [装置IP] +启动后访问 http://localhost:8181 +依赖: pip install flask +""" + +import socket +import struct +import time +import sys +import os +import io +from typing import Optional, Tuple + +from flask import Flask, request, jsonify, send_file, Response + +# 协议常量 +TAG_CLIENT, TAG_DEVICE = 0xAA, 0xBB +PORT = 7003 +CMD_KEEPLIVE, CMD_INIT, CMD_KEY, CMD_LCDMEM = 0, 1, 2, 3 + +# 全局连接 +_sock: Optional[socket.socket] = None +_init_info: Optional[dict] = None + + +def calc_crc(data: bytes) -> int: + crc = 0 + for b in data: + crc ^= b + return crc & 0xFF + + +def build_frame(cmd: int, data: bytes) -> bytes: + length = len(data) + header = bytes([TAG_CLIENT, cmd & 0xFF, (length >> 8) & 0xFF, length & 0xFF]) + return header + data + bytes([calc_crc(data)]) + + +def parse_frame(raw: bytes) -> Optional[Tuple[int, bytes]]: + if len(raw) < 6 or raw[0] != TAG_DEVICE: + return None + length = (raw[2] << 8) | raw[3] + if len(raw) < 5 + length + 1: + return None + data = raw[4:4 + length] + if calc_crc(data) != raw[4 + length]: + return None + return (raw[1], data) + + +def connect(host: str, port: int = PORT) -> bool: + global _sock, _init_info + try: + if _sock: + _sock.close() + _sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + _sock.settimeout(5.0) + _sock.connect((host, port)) + _sock.settimeout(2.0) + _init_info = _request_init() + return True + except Exception: + return False + + +def _send(cmd: int, data: bytes = b"") -> bool: + if not _sock: + return False + try: + _sock.sendall(build_frame(cmd, data)) + return True + except Exception: + return False + + +def _recv() -> Optional[Tuple[int, bytes]]: + if not _sock: + return None + try: + header = _sock.recv(5) + if len(header) < 5: + return None + length = (header[2] << 8) | header[3] + rest = _sock.recv(length + 1) + if len(rest) < length + 1: + return None + return parse_frame(header + rest) + except Exception: + return None + + +def _request_init() -> Optional[dict]: + if not _send(CMD_INIT): + return None + for _ in range(30): + result = _recv() + if result and result[0] == CMD_INIT: + data = result[1] + if len(data) >= 29: + return { + "width": (data[0] << 8) | data[1], + "height": (data[2] << 8) | data[3], + "mem_size": (data[5] << 24) | (data[6] << 16) | (data[7] << 8) | data[8], + } + time.sleep(0.05) + return None + + +def send_key(code: int) -> bool: + return _send(CMD_KEY, bytes([code & 0xFF])) + + +def fetch_screen() -> Optional[bytes]: + global _init_info + info = _init_info or _request_init() + if not info: + return None + mem_size = info["mem_size"] + buffer = bytearray(mem_size) + pos = 0 + while pos < mem_size: + if not _send(CMD_LCDMEM, struct.pack(">I", pos)): + return None + for _ in range(40): + result = _recv() + if result and result[0] == CMD_LCDMEM: + payload = result[1] + if len(payload) >= 4: + start = struct.unpack(">I", payload[:4])[0] + chunk = payload[4:] + buffer[start : start + len(chunk)] = chunk + pos = start + len(chunk) + break + time.sleep(0.02) + else: + return None + return bytes(buffer) + + +def mono_to_png(data: bytes, width: int, height: int) -> Optional[bytes]: + try: + from PIL import Image + img = Image.new("1", (width, height)) + pix = img.load() + for y in range(height): + for x in range(width): + bi = (y * width + x) // 8 + bit = 7 - (x % 8) + pix[x, y] = 255 if (bi < len(data) and (data[bi] >> bit) & 1) else 0 + buf = io.BytesIO() + img.save(buf, format="PNG") + return buf.getvalue() + except ImportError: + return None + + +app = Flask(__name__) + + +@app.route("/") +@app.route("/index.html") +def index(): + return send_file( + os.path.join(os.path.dirname(__file__), "remo_disp_ui.html"), + mimetype="text/html; charset=utf-8", + ) + + +@app.route("/connect") +def api_connect(): + host = request.args.get("host", "").strip() + port = int(request.args.get("port", PORT)) + ok = connect(host, port) + return jsonify(success=ok, error=None if ok else "connect failed") + + +@app.route("/key") +def api_key(): + code = int(request.args.get("code", 0)) + send_key(code) + return jsonify(ok=True) + + +@app.route("/screen") +def api_screen(): + data = fetch_screen() + if not data: + return "", 500 + info = _init_info or {} + w, h = info.get("width", 160), info.get("height", 160) + png = mono_to_png(data, w, h) + if png: + return Response(png, mimetype="image/png") + resp = Response(data, mimetype="application/octet-stream") + resp.headers["X-Width"] = str(w) + resp.headers["X-Height"] = str(h) + return resp + + +def main(): + port = 8181 + print(f"远程显示服务: http://localhost:{port}") + print("在浏览器中打开上述地址,点击「连接」输入装置 IP") + if len(sys.argv) >= 2: + connect(sys.argv[1]) + print(f"已预连接: {sys.argv[1]}") + app.run(host="0.0.0.0", port=port, debug=False, threaded=True) + + +if __name__ == "__main__": + main() diff --git a/remo_disp_ui.html b/remo_disp_ui.html new file mode 100644 index 0000000..6f1a8d0 --- /dev/null +++ b/remo_disp_ui.html @@ -0,0 +1,369 @@ + + +
+ + +