创建第一个版本的工程,基本逻辑和显示界面已经完成

This commit is contained in:
2026-03-02 15:47:47 +08:00
commit ba4431c01f
6 changed files with 1251 additions and 0 deletions

322
remo_disp_client.py Normal file
View File

@@ -0,0 +1,322 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
远程显示通信工具 - 与 RemoDispBus 协议兼容的 Python 客户端
协议说明(与 HMI/RemoeDisp/RemoDispBus.c 一致):
- 工具 → 装置: [0xAA][功能码][长度高][长度低][数据...][CRC]
- 装置 → 工具: [0xBB][功能码][长度高][长度低][数据...][CRC]
- CRC: 数据区异或校验(不含 Tag、功能码、长度
- 端口: 7003
"""
import socket
import struct
import threading
import time
import argparse
from typing import Optional, Tuple, Callable
# 协议常量(与 RemoDispBus.h 一致)
TAG_CLIENT = 0xAA # 工具侧发送
TAG_DEVICE = 0xBB # 装置侧发送
PORT = 7003
# 功能码
CMD_KEEPLIVE = 0
CMD_INIT = 1
CMD_KEY = 2
CMD_LCDMEM = 3
CMD_ACK = 4
CMD_NCK = 5
CMD_IDLE = 0xFF
# 按键值(与 RemoDispBus.h 一致)
KEY_U = 0x02 # 上
KEY_D = 0x40 # 下
KEY_L = 0x10 # 左
KEY_R = 0x08 # 右
KEY_ENT = 0x20 # 确认
KEY_ESC = 0x01 # 取消
KEY_F1 = 0x04
KEY_F2 = 0x80
KEY_NONE = 0
def calc_crc(data: bytes) -> int:
"""异或校验,与 RDisp_CalCrc 一致"""
crc = 0
for b in data:
crc ^= b
return crc & 0xFF
def build_frame(cmd: int, data: bytes) -> bytes:
"""组帧:工具→装置 [0xAA][cmd][len_hi][len_lo][data][crc]"""
length = len(data)
header = bytes([TAG_CLIENT, cmd & 0xFF, (length >> 8) & 0xFF, length & 0xFF])
crc = calc_crc(data)
return header + data + bytes([crc])
def parse_frame(raw: bytes) -> Optional[Tuple[int, bytes]]:
"""解析装置回复:[0xBB][cmd][len_hi][len_lo][data][crc]"""
if len(raw) < 6:
return None
if raw[0] != TAG_DEVICE:
return None
cmd = raw[1]
length = (raw[2] << 8) | raw[3]
if len(raw) < 5 + length + 1:
return None
data = raw[4:4 + length]
crc_recv = raw[4 + length]
if calc_crc(data) != crc_recv:
return None
return (cmd, data)
class RemoDispClient:
"""远程显示 TCP 客户端"""
def __init__(self, host: str, port: int = PORT, timeout: float = 5.0):
self.host = host
self.port = port
self.timeout = timeout
self._sock: Optional[socket.socket] = None
self._init_info: Optional[dict] = None
def connect(self) -> bool:
"""连接装置"""
try:
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.settimeout(self.timeout)
self._sock.connect((self.host, self.port))
return True
except Exception as e:
print(f"连接失败: {e}")
return False
def disconnect(self):
"""断开连接"""
if self._sock:
try:
self._sock.close()
except Exception:
pass
self._sock = None
def _send(self, cmd: int, data: bytes = b"") -> bool:
"""发送报文"""
if not self._sock:
return False
frame = build_frame(cmd, data)
try:
self._sock.sendall(frame)
return True
except Exception as e:
print(f"发送失败: {e}")
return False
def _recv(self) -> Optional[Tuple[int, bytes]]:
"""接收并解析一帧"""
if not self._sock:
return None
try:
header = self._sock.recv(5)
if len(header) < 5:
return None
length = (header[2] << 8) | header[3]
rest = self._sock.recv(length + 1)
if len(rest) < length + 1:
return None
raw = header + rest
return parse_frame(raw)
except socket.timeout:
return None
except Exception as e:
print(f"接收失败: {e}")
return None
def request_init(self) -> Optional[dict]:
"""召唤初始化,解析 LCD 尺寸、显存大小、按键映射"""
if not self._send(CMD_INIT):
return None
result = self._recv()
if not result or result[0] != CMD_INIT:
return None
data = result[1]
if len(data) < 29:
return None
# 与 RDisp_Form_InitInf 格式一致
width = (data[0] << 8) | data[1]
height = (data[2] << 8) | data[3]
lcd_type = data[4]
mem_size = (data[5] << 24) | (data[6] << 16) | (data[7] << 8) | data[8]
rgb_red = struct.unpack(">I", data[9:13])[0]
rgb_green = struct.unpack(">I", data[13:17])[0]
rgb_blue = struct.unpack(">I", data[17:21])[0]
keys = list(data[21:29])
self._init_info = {
"width": width,
"height": height,
"lcd_type": lcd_type,
"mem_size": mem_size,
"rgb_red": rgb_red,
"rgb_green": rgb_green,
"rgb_blue": rgb_blue,
"keys": keys,
}
return self._init_info
def send_key(self, key_value: int) -> bool:
"""下发按键"""
return self._send(CMD_KEY, bytes([key_value & 0xFF]))
def request_lcdmem(self, start_pos: int = 0) -> Optional[Tuple[int, bytes]]:
"""召唤显存,从 start_pos 起。返回 (下一位置, 显存数据)"""
data = struct.pack(">I", start_pos)
if not self._send(CMD_LCDMEM, data):
return None
result = self._recv()
if not result or result[0] != CMD_LCDMEM:
return None
payload = result[1]
if len(payload) < 4:
return None
pos = struct.unpack(">I", payload[:4])[0]
mem_data = payload[4:]
return (pos + len(mem_data), mem_data)
def fetch_full_screen(self, on_progress: Optional[Callable[[int, int], None]] = None) -> Optional[bytes]:
"""拉取完整显存,支持断点续传"""
info = self._init_info or self.request_init()
if not info:
return None
mem_size = info["mem_size"]
buffer = bytearray(mem_size)
pos = 0
while pos < mem_size:
result = self.request_lcdmem(pos)
if not result:
return None
next_pos, chunk = result
buffer[pos : pos + len(chunk)] = chunk
pos = next_pos
if on_progress:
on_progress(pos, mem_size)
return bytes(buffer)
def send_keepalive(self) -> bool:
"""发送链路保持(装置端 KEEPLIVE 不回复,仅刷新计时器)"""
return self._send(CMD_KEEPLIVE)
def mono_to_image(data: bytes, width: int, height: int) -> "Optional[object]":
"""单色显存转 PIL Image1bit -> 黑白图)"""
try:
from PIL import Image
except ImportError:
return None
img = Image.new("1", (width, height))
pix = img.load()
for y in range(height):
for x in range(width):
byte_idx = (y * width + x) // 8
bit_idx = 7 - (x % 8)
if byte_idx < len(data):
pix[x, y] = 1 if (data[byte_idx] >> bit_idx) & 1 else 0
else:
pix[x, y] = 0
return img
def main():
parser = argparse.ArgumentParser(description="远程显示通信工具")
parser.add_argument("host", help="装置 IP 地址")
parser.add_argument("-p", "--port", type=int, default=PORT, help=f"端口 (默认 {PORT})")
parser.add_argument("--init", action="store_true", help="仅获取初始化信息")
parser.add_argument("--screen", action="store_true", help="拉取显存并保存为 screen.png")
parser.add_argument("--key", type=str, help="发送按键: U/D/L/R/ENT/ESC/F1/F2")
parser.add_argument("--keepalive", action="store_true", help="发送一次 KEEPLIVE")
args = parser.parse_args()
key_map = {
"U": KEY_U, "D": KEY_D, "L": KEY_L, "R": KEY_R,
"ENT": KEY_ENT, "ESC": KEY_ESC, "F1": KEY_F1, "F2": KEY_F2,
}
client = RemoDispClient(args.host, args.port)
if not client.connect():
return 1
try:
if args.init:
info = client.request_init()
if info:
print("初始化信息:")
print(f" LCD: {info['width']}x{info['height']}, 类型={info['lcd_type']}")
print(f" 显存大小: {info['mem_size']} 字节")
print(f" 按键映射: {info['keys']}")
else:
print("获取初始化失败")
return 1
elif args.key:
k = key_map.get(args.key.upper(), KEY_NONE)
if k == KEY_NONE:
print(f"未知按键: {args.key}")
return 1
if client.send_key(k):
print(f"已发送按键: {args.key}")
else:
print("发送按键失败")
return 1
elif args.keepalive:
if client.send_keepalive():
print("已发送 KEEPLIVE")
else:
print("发送失败")
return 1
elif args.screen:
def progress(cur, total):
print(f"\r拉取显存: {cur}/{total} ({100*cur//total}%)", end="")
data = client.fetch_full_screen(on_progress=progress)
if not data:
print("\n拉取显存失败")
return 1
info = client._init_info or {}
w = info.get("width", 160)
h = info.get("height", 160)
img = mono_to_image(data, w, h)
if img:
img.save("screen.png")
print(f"\n已保存 screen.png ({w}x{h})")
else:
with open("screen.raw", "wb") as f:
f.write(data)
print(f"\n已保存 screen.raw ({len(data)} 字节,需 PIL 才能转 PNG)")
else:
# 默认:获取初始化并拉取一帧显存
info = client.request_init()
if info:
print(f"LCD {info['width']}x{info['height']}, 显存 {info['mem_size']} 字节")
result = client.request_lcdmem(0)
if result:
next_pos, chunk = result
print(f"收到显存 {len(chunk)} 字节,下一位置 {next_pos}")
else:
print("召唤显存失败")
finally:
client.disconnect()
return 0
if __name__ == "__main__":
exit(main())