Compare commits

...

2 Commits

6 changed files with 1250 additions and 2 deletions

View File

@@ -1,3 +1,66 @@
# DTU-RemtoeLCD # DTU-RemoteLCD - 远程显示通信工具
网络调试的远程显示LCD模拟软件 网络调试的远程显示 LCD 模拟软件。与 `HMI/RemoeDisp/RemoDispBus.c` 协议兼容的 Python 客户端,用于远程查看装置 LCD 显存并模拟按键。
## 协议说明
- **端口**: 7003
- **报文格式**:
- 工具 → 装置: `[0xAA][功能码][长度高][长度低][数据...][CRC]`
- 装置 → 工具: `[0xBB][功能码][长度高][长度低][数据...][CRC]`
- **CRC**: 数据区异或校验
- **功能码**: KEEPLIVE=0, INIT=1, KEY=2, LCDMEM=3
## 命令行工具 (remo_disp_client.py)
```bash
# 安装可选依赖(用于保存 PNG
pip install -r requirements.txt
# 获取初始化信息
python remo_disp_client.py 192.168.1.100 --init
# 拉取显存并保存为 screen.png
python remo_disp_client.py 192.168.1.100 --screen
# 发送按键 (U/D/L/R/ENT/ESC/F1/F2)
python remo_disp_client.py 192.168.1.100 --key ENT
# 发送 KEEPLIVE
python remo_disp_client.py 192.168.1.100 --keepalive
```
## GUI 查看器 (remo_disp_viewer.py)
```bash
python remo_disp_viewer.py 192.168.1.100
```
- 连接装置并实时显示 LCD 画面
- 点击按键模拟远程按键
- 点击「刷新」重新拉取显存
## Web 界面 (remo_disp_server.py + remo_disp_ui.html)
```bash
pip install flask
python remo_disp_server.py
```
启动后访问 http://localhost:8080 ,界面包含:
- 顶部菜单:连接、设置、退出、关于
- 左侧显示区:装置 LCD 画面(约 500ms 刷新)
- 右侧控制:方向键 + 确认、复归、返回
## 按键映射
| 键名 | 值 |
|------|------|
| 上 | 0x02 |
| 下 | 0x40 |
| 左 | 0x10 |
| 右 | 0x08 |
| 确认 | 0x20 |
| 取消 | 0x01 |
| F1 | 0x04 |
| F2 | 0x80 |

322
remo_disp_client.py Normal file
View File

@@ -0,0 +1,322 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
远程显示通信工具 - 与 RemoDispBus 协议兼容的 Python 客户端
协议说明(与 HMI/RemoeDisp/RemoDispBus.c 一致):
- 工具 → 装置: [0xAA][功能码][长度高][长度低][数据...][CRC]
- 装置 → 工具: [0xBB][功能码][长度高][长度低][数据...][CRC]
- CRC: 数据区异或校验(不含 Tag、功能码、长度
- 端口: 7003
"""
import socket
import struct
import threading
import time
import argparse
from typing import Optional, Tuple, Callable
# 协议常量(与 RemoDispBus.h 一致)
TAG_CLIENT = 0xAA # 工具侧发送
TAG_DEVICE = 0xBB # 装置侧发送
PORT = 7003
# 功能码
CMD_KEEPLIVE = 0
CMD_INIT = 1
CMD_KEY = 2
CMD_LCDMEM = 3
CMD_ACK = 4
CMD_NCK = 5
CMD_IDLE = 0xFF
# 按键值(与 RemoDispBus.h 一致)
KEY_U = 0x02 # 上
KEY_D = 0x40 # 下
KEY_L = 0x10 # 左
KEY_R = 0x08 # 右
KEY_ENT = 0x20 # 确认
KEY_ESC = 0x01 # 取消
KEY_F1 = 0x04
KEY_F2 = 0x80
KEY_NONE = 0
def calc_crc(data: bytes) -> int:
"""异或校验,与 RDisp_CalCrc 一致"""
crc = 0
for b in data:
crc ^= b
return crc & 0xFF
def build_frame(cmd: int, data: bytes) -> bytes:
"""组帧:工具→装置 [0xAA][cmd][len_hi][len_lo][data][crc]"""
length = len(data)
header = bytes([TAG_CLIENT, cmd & 0xFF, (length >> 8) & 0xFF, length & 0xFF])
crc = calc_crc(data)
return header + data + bytes([crc])
def parse_frame(raw: bytes) -> Optional[Tuple[int, bytes]]:
"""解析装置回复:[0xBB][cmd][len_hi][len_lo][data][crc]"""
if len(raw) < 6:
return None
if raw[0] != TAG_DEVICE:
return None
cmd = raw[1]
length = (raw[2] << 8) | raw[3]
if len(raw) < 5 + length + 1:
return None
data = raw[4:4 + length]
crc_recv = raw[4 + length]
if calc_crc(data) != crc_recv:
return None
return (cmd, data)
class RemoDispClient:
"""远程显示 TCP 客户端"""
def __init__(self, host: str, port: int = PORT, timeout: float = 5.0):
self.host = host
self.port = port
self.timeout = timeout
self._sock: Optional[socket.socket] = None
self._init_info: Optional[dict] = None
def connect(self) -> bool:
"""连接装置"""
try:
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.settimeout(self.timeout)
self._sock.connect((self.host, self.port))
return True
except Exception as e:
print(f"连接失败: {e}")
return False
def disconnect(self):
"""断开连接"""
if self._sock:
try:
self._sock.close()
except Exception:
pass
self._sock = None
def _send(self, cmd: int, data: bytes = b"") -> bool:
"""发送报文"""
if not self._sock:
return False
frame = build_frame(cmd, data)
try:
self._sock.sendall(frame)
return True
except Exception as e:
print(f"发送失败: {e}")
return False
def _recv(self) -> Optional[Tuple[int, bytes]]:
"""接收并解析一帧"""
if not self._sock:
return None
try:
header = self._sock.recv(5)
if len(header) < 5:
return None
length = (header[2] << 8) | header[3]
rest = self._sock.recv(length + 1)
if len(rest) < length + 1:
return None
raw = header + rest
return parse_frame(raw)
except socket.timeout:
return None
except Exception as e:
print(f"接收失败: {e}")
return None
def request_init(self) -> Optional[dict]:
"""召唤初始化,解析 LCD 尺寸、显存大小、按键映射"""
if not self._send(CMD_INIT):
return None
result = self._recv()
if not result or result[0] != CMD_INIT:
return None
data = result[1]
if len(data) < 29:
return None
# 与 RDisp_Form_InitInf 格式一致
width = (data[0] << 8) | data[1]
height = (data[2] << 8) | data[3]
lcd_type = data[4]
mem_size = (data[5] << 24) | (data[6] << 16) | (data[7] << 8) | data[8]
rgb_red = struct.unpack(">I", data[9:13])[0]
rgb_green = struct.unpack(">I", data[13:17])[0]
rgb_blue = struct.unpack(">I", data[17:21])[0]
keys = list(data[21:29])
self._init_info = {
"width": width,
"height": height,
"lcd_type": lcd_type,
"mem_size": mem_size,
"rgb_red": rgb_red,
"rgb_green": rgb_green,
"rgb_blue": rgb_blue,
"keys": keys,
}
return self._init_info
def send_key(self, key_value: int) -> bool:
"""下发按键"""
return self._send(CMD_KEY, bytes([key_value & 0xFF]))
def request_lcdmem(self, start_pos: int = 0) -> Optional[Tuple[int, bytes]]:
"""召唤显存,从 start_pos 起。返回 (下一位置, 显存数据)"""
data = struct.pack(">I", start_pos)
if not self._send(CMD_LCDMEM, data):
return None
result = self._recv()
if not result or result[0] != CMD_LCDMEM:
return None
payload = result[1]
if len(payload) < 4:
return None
pos = struct.unpack(">I", payload[:4])[0]
mem_data = payload[4:]
return (pos + len(mem_data), mem_data)
def fetch_full_screen(self, on_progress: Optional[Callable[[int, int], None]] = None) -> Optional[bytes]:
"""拉取完整显存,支持断点续传"""
info = self._init_info or self.request_init()
if not info:
return None
mem_size = info["mem_size"]
buffer = bytearray(mem_size)
pos = 0
while pos < mem_size:
result = self.request_lcdmem(pos)
if not result:
return None
next_pos, chunk = result
buffer[pos : pos + len(chunk)] = chunk
pos = next_pos
if on_progress:
on_progress(pos, mem_size)
return bytes(buffer)
def send_keepalive(self) -> bool:
"""发送链路保持(装置端 KEEPLIVE 不回复,仅刷新计时器)"""
return self._send(CMD_KEEPLIVE)
def mono_to_image(data: bytes, width: int, height: int) -> "Optional[object]":
"""单色显存转 PIL Image1bit -> 黑白图)"""
try:
from PIL import Image
except ImportError:
return None
img = Image.new("1", (width, height))
pix = img.load()
for y in range(height):
for x in range(width):
byte_idx = (y * width + x) // 8
bit_idx = 7 - (x % 8)
if byte_idx < len(data):
pix[x, y] = 1 if (data[byte_idx] >> bit_idx) & 1 else 0
else:
pix[x, y] = 0
return img
def main():
parser = argparse.ArgumentParser(description="远程显示通信工具")
parser.add_argument("host", help="装置 IP 地址")
parser.add_argument("-p", "--port", type=int, default=PORT, help=f"端口 (默认 {PORT})")
parser.add_argument("--init", action="store_true", help="仅获取初始化信息")
parser.add_argument("--screen", action="store_true", help="拉取显存并保存为 screen.png")
parser.add_argument("--key", type=str, help="发送按键: U/D/L/R/ENT/ESC/F1/F2")
parser.add_argument("--keepalive", action="store_true", help="发送一次 KEEPLIVE")
args = parser.parse_args()
key_map = {
"U": KEY_U, "D": KEY_D, "L": KEY_L, "R": KEY_R,
"ENT": KEY_ENT, "ESC": KEY_ESC, "F1": KEY_F1, "F2": KEY_F2,
}
client = RemoDispClient(args.host, args.port)
if not client.connect():
return 1
try:
if args.init:
info = client.request_init()
if info:
print("初始化信息:")
print(f" LCD: {info['width']}x{info['height']}, 类型={info['lcd_type']}")
print(f" 显存大小: {info['mem_size']} 字节")
print(f" 按键映射: {info['keys']}")
else:
print("获取初始化失败")
return 1
elif args.key:
k = key_map.get(args.key.upper(), KEY_NONE)
if k == KEY_NONE:
print(f"未知按键: {args.key}")
return 1
if client.send_key(k):
print(f"已发送按键: {args.key}")
else:
print("发送按键失败")
return 1
elif args.keepalive:
if client.send_keepalive():
print("已发送 KEEPLIVE")
else:
print("发送失败")
return 1
elif args.screen:
def progress(cur, total):
print(f"\r拉取显存: {cur}/{total} ({100*cur//total}%)", end="")
data = client.fetch_full_screen(on_progress=progress)
if not data:
print("\n拉取显存失败")
return 1
info = client._init_info or {}
w = info.get("width", 160)
h = info.get("height", 160)
img = mono_to_image(data, w, h)
if img:
img.save("screen.png")
print(f"\n已保存 screen.png ({w}x{h})")
else:
with open("screen.raw", "wb") as f:
f.write(data)
print(f"\n已保存 screen.raw ({len(data)} 字节,需 PIL 才能转 PNG)")
else:
# 默认:获取初始化并拉取一帧显存
info = client.request_init()
if info:
print(f"LCD {info['width']}x{info['height']}, 显存 {info['mem_size']} 字节")
result = client.request_lcdmem(0)
if result:
next_pos, chunk = result
print(f"收到显存 {len(chunk)} 字节,下一位置 {next_pos}")
else:
print("召唤显存失败")
finally:
client.disconnect()
return 0
if __name__ == "__main__":
exit(main())

216
remo_disp_server.py Normal file
View File

@@ -0,0 +1,216 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
远程显示 Web 服务 - Flask 实现
用法: python remo_disp_server.py [装置IP]
启动后访问 http://localhost:8181
依赖: pip install flask
"""
import socket
import struct
import time
import sys
import os
import io
from typing import Optional, Tuple
from flask import Flask, request, jsonify, send_file, Response
# 协议常量
TAG_CLIENT, TAG_DEVICE = 0xAA, 0xBB
PORT = 7003
CMD_KEEPLIVE, CMD_INIT, CMD_KEY, CMD_LCDMEM = 0, 1, 2, 3
# 全局连接
_sock: Optional[socket.socket] = None
_init_info: Optional[dict] = None
def calc_crc(data: bytes) -> int:
crc = 0
for b in data:
crc ^= b
return crc & 0xFF
def build_frame(cmd: int, data: bytes) -> bytes:
length = len(data)
header = bytes([TAG_CLIENT, cmd & 0xFF, (length >> 8) & 0xFF, length & 0xFF])
return header + data + bytes([calc_crc(data)])
def parse_frame(raw: bytes) -> Optional[Tuple[int, bytes]]:
if len(raw) < 6 or raw[0] != TAG_DEVICE:
return None
length = (raw[2] << 8) | raw[3]
if len(raw) < 5 + length + 1:
return None
data = raw[4:4 + length]
if calc_crc(data) != raw[4 + length]:
return None
return (raw[1], data)
def connect(host: str, port: int = PORT) -> bool:
global _sock, _init_info
try:
if _sock:
_sock.close()
_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
_sock.settimeout(5.0)
_sock.connect((host, port))
_sock.settimeout(2.0)
_init_info = _request_init()
return True
except Exception:
return False
def _send(cmd: int, data: bytes = b"") -> bool:
if not _sock:
return False
try:
_sock.sendall(build_frame(cmd, data))
return True
except Exception:
return False
def _recv() -> Optional[Tuple[int, bytes]]:
if not _sock:
return None
try:
header = _sock.recv(5)
if len(header) < 5:
return None
length = (header[2] << 8) | header[3]
rest = _sock.recv(length + 1)
if len(rest) < length + 1:
return None
return parse_frame(header + rest)
except Exception:
return None
def _request_init() -> Optional[dict]:
if not _send(CMD_INIT):
return None
for _ in range(30):
result = _recv()
if result and result[0] == CMD_INIT:
data = result[1]
if len(data) >= 29:
return {
"width": (data[0] << 8) | data[1],
"height": (data[2] << 8) | data[3],
"mem_size": (data[5] << 24) | (data[6] << 16) | (data[7] << 8) | data[8],
}
time.sleep(0.05)
return None
def send_key(code: int) -> bool:
return _send(CMD_KEY, bytes([code & 0xFF]))
def fetch_screen() -> Optional[bytes]:
global _init_info
info = _init_info or _request_init()
if not info:
return None
mem_size = info["mem_size"]
buffer = bytearray(mem_size)
pos = 0
while pos < mem_size:
if not _send(CMD_LCDMEM, struct.pack(">I", pos)):
return None
for _ in range(40):
result = _recv()
if result and result[0] == CMD_LCDMEM:
payload = result[1]
if len(payload) >= 4:
start = struct.unpack(">I", payload[:4])[0]
chunk = payload[4:]
buffer[start : start + len(chunk)] = chunk
pos = start + len(chunk)
break
time.sleep(0.02)
else:
return None
return bytes(buffer)
def mono_to_png(data: bytes, width: int, height: int) -> Optional[bytes]:
try:
from PIL import Image
img = Image.new("1", (width, height))
pix = img.load()
for y in range(height):
for x in range(width):
bi = (y * width + x) // 8
bit = 7 - (x % 8)
pix[x, y] = 255 if (bi < len(data) and (data[bi] >> bit) & 1) else 0
buf = io.BytesIO()
img.save(buf, format="PNG")
return buf.getvalue()
except ImportError:
return None
app = Flask(__name__)
@app.route("/")
@app.route("/index.html")
def index():
return send_file(
os.path.join(os.path.dirname(__file__), "remo_disp_ui.html"),
mimetype="text/html; charset=utf-8",
)
@app.route("/connect")
def api_connect():
host = request.args.get("host", "").strip()
port = int(request.args.get("port", PORT))
ok = connect(host, port)
return jsonify(success=ok, error=None if ok else "connect failed")
@app.route("/key")
def api_key():
code = int(request.args.get("code", 0))
send_key(code)
return jsonify(ok=True)
@app.route("/screen")
def api_screen():
data = fetch_screen()
if not data:
return "", 500
info = _init_info or {}
w, h = info.get("width", 160), info.get("height", 160)
png = mono_to_png(data, w, h)
if png:
return Response(png, mimetype="image/png")
resp = Response(data, mimetype="application/octet-stream")
resp.headers["X-Width"] = str(w)
resp.headers["X-Height"] = str(h)
return resp
def main():
port = 8181
print(f"远程显示服务: http://localhost:{port}")
print("在浏览器中打开上述地址,点击「连接」输入装置 IP")
if len(sys.argv) >= 2:
connect(sys.argv[1])
print(f"已预连接: {sys.argv[1]}")
app.run(host="0.0.0.0", port=port, debug=False, threaded=True)
if __name__ == "__main__":
main()

369
remo_disp_ui.html Normal file
View File

@@ -0,0 +1,369 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>远程显示</title>
<style>
* { box-sizing: border-box; margin: 0; padding: 0; }
body {
font-family: "Microsoft YaHei", "PingFang SC", sans-serif;
background: linear-gradient(135deg, #e8f4fc 0%, #f0f8ff 50%, #e6f2ff 100%);
min-height: 100vh;
padding: 16px;
}
.app {
max-width: 900px;
margin: 0 auto;
background: #fff;
border-radius: 12px;
box-shadow: 0 4px 20px rgba(0,0,0,0.08);
overflow: hidden;
}
/* 顶部菜单栏 */
.menu-bar {
display: flex;
align-items: center;
justify-content: center;
gap: 8px;
padding: 12px 16px;
background: #fff;
border-bottom: 1px solid #e8e8e8;
}
.menu-item {
display: flex;
flex-direction: column;
align-items: center;
gap: 4px;
padding: 8px 14px;
border: none;
background: transparent;
cursor: pointer;
border-radius: 8px;
transition: all 0.2s;
color: #333;
font-size: 12px;
}
.menu-item:hover {
background: #f5f5f5;
}
.menu-item .icon {
width: 28px;
height: 28px;
display: flex;
align-items: center;
justify-content: center;
font-size: 18px;
}
.menu-item.connect .icon { color: #22c55e; }
.menu-item.settings .icon { color: #3b82f6; }
.menu-item.exit .icon { color: #ef4444; }
.menu-item.about .icon { color: #64748b; }
.menu-item.connected .icon { color: #22c55e; }
.menu-item.connected { color: #22c55e; }
/* 主内容区 */
.main {
display: flex;
padding: 16px;
gap: 20px;
}
/* 显示区域 */
.display-area {
flex: 1;
min-width: 0;
aspect-ratio: 1;
max-height: 420px;
background: #1e3a5f;
border-radius: 8px;
display: flex;
align-items: center;
justify-content: center;
overflow: hidden;
box-shadow: inset 0 2px 8px rgba(0,0,0,0.2);
}
.display-area canvas {
max-width: 100%;
max-height: 100%;
image-rendering: pixelated;
image-rendering: crisp-edges;
}
.display-placeholder {
color: rgba(255,255,255,0.5);
font-size: 14px;
}
/* 右侧控制面板 */
.control-panel {
display: flex;
flex-direction: column;
align-items: center;
gap: 16px;
padding: 8px;
}
/* D-pad */
.dpad {
display: grid;
grid-template-columns: 50px 50px 50px;
grid-template-rows: 50px 50px 50px;
gap: 4px;
}
.dpad .btn {
width: 50px;
height: 50px;
border: none;
border-radius: 8px;
background: linear-gradient(180deg, #f8f9fa 0%, #e9ecef 100%);
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
cursor: pointer;
display: flex;
align-items: center;
justify-content: center;
font-size: 20px;
color: #22c55e;
transition: all 0.15s;
}
.dpad .btn:hover {
background: linear-gradient(180deg, #fff 0%, #f1f3f5 100%);
box-shadow: 0 3px 6px rgba(0,0,0,0.15);
transform: translateY(-1px);
}
.dpad .btn:active {
transform: translateY(0);
box-shadow: 0 1px 2px rgba(0,0,0,0.1);
}
.dpad .btn-up { grid-column: 2; grid-row: 1; }
.dpad .btn-down { grid-column: 2; grid-row: 3; }
.dpad .btn-left { grid-column: 1; grid-row: 2; }
.dpad .btn-right { grid-column: 3; grid-row: 2; }
.dpad .btn-ok { grid-column: 2; grid-row: 2; }
/* 动作按钮 */
.action-btns {
display: flex;
gap: 12px;
}
.action-btn {
display: flex;
align-items: center;
gap: 6px;
padding: 10px 18px;
border: none;
border-radius: 8px;
background: linear-gradient(180deg, #f8f9fa 0%, #e9ecef 100%);
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
cursor: pointer;
font-size: 14px;
color: #333;
transition: all 0.15s;
}
.action-btn:hover {
background: linear-gradient(180deg, #fff 0%, #f1f3f5 100%);
box-shadow: 0 3px 6px rgba(0,0,0,0.15);
transform: translateY(-1px);
}
.action-btn .icon { color: #22c55e; font-size: 16px; }
/* 连接设置弹窗 */
.modal {
display: none;
position: fixed;
inset: 0;
background: rgba(0,0,0,0.4);
align-items: center;
justify-content: center;
z-index: 100;
}
.modal.show { display: flex; }
.modal-content {
background: #fff;
padding: 24px;
border-radius: 12px;
box-shadow: 0 8px 32px rgba(0,0,0,0.2);
min-width: 320px;
}
.modal-content h3 { margin-bottom: 16px; font-size: 16px; }
.modal-content input {
width: 100%;
padding: 10px 12px;
border: 1px solid #ddd;
border-radius: 6px;
margin-bottom: 12px;
font-size: 14px;
}
.modal-content .btn-row {
display: flex;
gap: 8px;
justify-content: flex-end;
margin-top: 16px;
}
.modal-content button {
padding: 8px 16px;
border: none;
border-radius: 6px;
cursor: pointer;
font-size: 14px;
}
.modal-content .btn-primary {
background: #22c55e;
color: #fff;
}
.modal-content .btn-secondary {
background: #e5e7eb;
color: #333;
}
</style>
</head>
<body>
<div class="app">
<div class="menu-bar">
<button class="menu-item connect" id="btnConnect" title="连接">
<span class="icon">📶</span>
<span>连接</span>
</button>
<button class="menu-item settings" title="设置">
<span class="icon"></span>
<span>设置</span>
</button>
<button class="menu-item exit" title="退出">
<span class="icon"></span>
<span>退出</span>
</button>
<button class="menu-item about" id="btnAbout" title="关于">
<span class="icon"></span>
<span>关于</span>
</button>
</div>
<div class="main">
<div class="display-area">
<canvas id="screen" style="display:none;"></canvas>
<span class="display-placeholder" id="placeholder">点击「连接」连接装置</span>
</div>
<div class="control-panel">
<div class="dpad">
<button class="btn btn-up" data-key="U"></button>
<button class="btn btn-left" data-key="L"></button>
<button class="btn btn-ok" data-key="ENT"></button>
<button class="btn btn-right" data-key="R"></button>
<button class="btn btn-down" data-key="D"></button>
</div>
<div class="action-btns">
<button class="action-btn" data-key="RESET">
<span class="icon"></span>
<span>复归</span>
</button>
<button class="action-btn" data-key="ESC">
<span class="icon"></span>
<span>返回</span>
</button>
</div>
</div>
</div>
</div>
<!-- 连接弹窗 -->
<div class="modal" id="connectModal">
<div class="modal-content">
<h3>连接装置</h3>
<input type="text" id="hostInput" placeholder="装置 IP 地址" value="192.168.1.100">
<input type="number" id="portInput" placeholder="端口" value="7003">
<div class="btn-row">
<button class="btn-secondary" id="modalCancel">取消</button>
<button class="btn-primary" id="modalConnect">连接</button>
</div>
</div>
</div>
<script>
const KEY_MAP = { U: 0x02, D: 0x40, L: 0x10, R: 0x08, ENT: 0x20, ESC: 0x01, RESET: 0x04 };
let apiBase = '';
let refreshTimer = null;
const canvas = document.getElementById('screen');
const placeholder = document.getElementById('placeholder');
const connectBtn = document.getElementById('btnConnect');
const connectModal = document.getElementById('connectModal');
const hostInput = document.getElementById('hostInput');
const portInput = document.getElementById('portInput');
const modalConnect = document.getElementById('modalConnect');
const modalCancel = document.getElementById('modalCancel');
function showConnectModal() {
connectModal.classList.add('show');
}
function hideConnectModal() {
connectModal.classList.remove('show');
}
connectBtn.onclick = showConnectModal;
modalCancel.onclick = hideConnectModal;
modalConnect.onclick = async () => {
const host = hostInput.value.trim();
const port = portInput.value || '7003';
try {
const r = await fetch(`/connect?host=${encodeURIComponent(host)}&port=${port}`);
const ok = await r.json();
if (ok.success) {
hideConnectModal();
connectBtn.classList.add('connected');
connectBtn.querySelector('span:last-child').textContent = '已连接';
apiBase = 'ok';
startRefresh();
} else {
alert('连接失败: ' + (ok.error || '未知错误'));
}
} catch (e) {
alert('连接失败,请先启动: python remo_disp_server.py');
}
};
async function sendKey(key) {
if (!connectBtn.classList.contains('connected')) return;
const code = KEY_MAP[key] ?? 0;
await fetch(`/key?code=${code}`);
}
async function fetchScreen() {
if (!connectBtn.classList.contains('connected')) return;
try {
const r = await fetch('/screen');
const ct = r.headers.get('Content-Type') || '';
const blob = await r.blob();
if (ct.includes('image/png')) {
const url = URL.createObjectURL(blob);
const img = new Image();
img.onload = () => {
canvas.width = img.width;
canvas.height = img.height;
const ctx = canvas.getContext('2d');
ctx.drawImage(img, 0, 0);
canvas.style.display = 'block';
placeholder.style.display = 'none';
URL.revokeObjectURL(url);
};
img.src = url;
} else {
const buf = await blob.arrayBuffer();
const data = new Uint8Array(buf);
const w = parseInt(r.headers.get('X-Width') || '160', 10);
const h = parseInt(r.headers.get('X-Height') || '160', 10);
canvas.width = w;
canvas.height = h;
const ctx = canvas.getContext('2d');
const id = ctx.createImageData(w, h);
for (let y = 0; y < h; y++)
for (let x = 0; x < w; x++) {
const bi = (y * w + x) >> 3, bit = 7 - (x & 7);
const v = (bi < data.length && (data[bi] >> bit) & 1) ? 255 : 0;
const i = (y * w + x) * 4;
id.data[i] = id.data[i+1] = id.data[i+2] = v;
id.data[i+3] = 255;
}
ctx.putImageData(id, 0, 0);
canvas.style.display = 'block';
placeholder.style.display = 'none';
}
} catch (e) {}
}
function startRefresh() {
fetchScreen();
if (refreshTimer) clearInterval(refreshTimer);
refreshTimer = setInterval(fetchScreen, 500);
}
document.querySelectorAll('.dpad .btn, .action-btn').forEach(btn => {
btn.onclick = () => sendKey(btn.dataset.key);
});
document.querySelector('.menu-item.exit').onclick = () => sendKey('ESC');
document.getElementById('btnAbout').onclick = () => alert('远程显示工具\n与 RemoDispBus 协议兼容\n端口 7003');
</script>
</body>
</html>

272
remo_disp_viewer.py Normal file
View File

@@ -0,0 +1,272 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
远程显示查看器 - 带 GUI 的实时显存查看与按键模拟
用法: python remo_disp_viewer.py <装置IP>
"""
import socket
import struct
import threading
import time
import sys
import tkinter as tk
from tkinter import ttk, messagebox
from typing import Optional, Tuple
# 协议常量
TAG_CLIENT = 0xAA
TAG_DEVICE = 0xBB
PORT = 7003
CMD_KEEPLIVE = 0
CMD_INIT = 1
CMD_KEY = 2
CMD_LCDMEM = 3
KEY_U, KEY_D, KEY_L, KEY_R = 0x02, 0x40, 0x10, 0x08
KEY_ENT, KEY_ESC, KEY_F1, KEY_F2 = 0x20, 0x01, 0x04, 0x80
def calc_crc(data: bytes) -> int:
crc = 0
for b in data:
crc ^= b
return crc & 0xFF
def build_frame(cmd: int, data: bytes) -> bytes:
length = len(data)
header = bytes([TAG_CLIENT, cmd & 0xFF, (length >> 8) & 0xFF, length & 0xFF])
return header + data + bytes([calc_crc(data)])
def parse_frame(raw: bytes) -> Optional[Tuple[int, bytes]]:
if len(raw) < 6 or raw[0] != TAG_DEVICE:
return None
length = (raw[2] << 8) | raw[3]
if len(raw) < 5 + length + 1:
return None
data = raw[4:4 + length]
if calc_crc(data) != raw[4 + length]:
return None
return (raw[1], data)
class RemoDispViewer:
def __init__(self, host: str, port: int = PORT):
self.host = host
self.port = port
self._sock: Optional[socket.socket] = None
self._init_info: Optional[dict] = None
self._running = False
self._refresh_thread: Optional[threading.Thread] = None
def connect(self) -> bool:
try:
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.settimeout(3.0)
self._sock.connect((self.host, self.port))
self._sock.settimeout(0.5)
return True
except Exception:
return False
def disconnect(self):
self._running = False
if self._sock:
try:
self._sock.close()
except Exception:
pass
self._sock = None
def _send(self, cmd: int, data: bytes = b"") -> bool:
if not self._sock:
return False
try:
self._sock.sendall(build_frame(cmd, data))
return True
except Exception:
return False
def _recv(self) -> Optional[Tuple[int, bytes]]:
if not self._sock:
return None
try:
header = self._sock.recv(5)
if len(header) < 5:
return None
length = (header[2] << 8) | header[3]
rest = self._sock.recv(length + 1)
if len(rest) < length + 1:
return None
return parse_frame(header + rest)
except (socket.timeout, BlockingIOError):
return None
except Exception:
return None
def request_init(self) -> Optional[dict]:
if not self._send(CMD_INIT):
return None
for _ in range(20):
result = self._recv()
if result and result[0] == CMD_INIT:
data = result[1]
if len(data) >= 29:
self._init_info = {
"width": (data[0] << 8) | data[1],
"height": (data[2] << 8) | data[3],
"mem_size": (data[5] << 24) | (data[6] << 16) | (data[7] << 8) | data[8],
}
return self._init_info
time.sleep(0.05)
return None
def send_key(self, key: int) -> bool:
return self._send(CMD_KEY, bytes([key]))
def fetch_screen(self) -> Optional[bytes]:
info = self._init_info
if not info:
info = self.request_init()
if not info:
return None
mem_size = info["mem_size"]
buffer = bytearray(mem_size)
pos = 0
while pos < mem_size:
if not self._send(CMD_LCDMEM, struct.pack(">I", pos)):
return None
for _ in range(30):
result = self._recv()
if result and result[0] == CMD_LCDMEM:
payload = result[1]
if len(payload) >= 4:
start = struct.unpack(">I", payload[:4])[0]
chunk = payload[4:]
buffer[start : start + len(chunk)] = chunk
pos = start + len(chunk)
break
time.sleep(0.02)
else:
return None
return bytes(buffer)
class ViewerApp:
def __init__(self, host: str):
self.host = host
self.client = RemoDispViewer(host)
self.root = tk.Tk()
self.root.title(f"远程显示 - {host}")
self.root.geometry("400x320")
self._canvas = None
self._photo = None
self._scale = 2
self._build_ui()
def _build_ui(self):
# 连接区
frm = ttk.Frame(self.root, padding=5)
frm.pack(fill=tk.X)
ttk.Label(frm, text=f"目标: {self.host}:7003").pack(side=tk.LEFT)
self._status = ttk.Label(frm, text="未连接")
self._status.pack(side=tk.RIGHT)
# 画布
self._canvas = tk.Canvas(self.root, width=320, height=320, bg="black")
self._canvas.pack(padx=5, pady=5)
# 按键区
btn_frm = ttk.Frame(self.root, padding=5)
btn_frm.pack(fill=tk.X)
keys = [
("", KEY_U), ("", KEY_D), ("", KEY_L), ("", KEY_R),
("确认", KEY_ENT), ("取消", KEY_ESC), ("F1", KEY_F1), ("F2", KEY_F2),
]
for label, key in keys:
ttk.Button(btn_frm, text=label, width=6,
command=lambda k=key: self._on_key(k)).pack(side=tk.LEFT, padx=2)
# 刷新
ttk.Button(btn_frm, text="刷新", command=self._refresh).pack(side=tk.LEFT, padx=2)
self.root.protocol("WM_DELETE_WINDOW", self._on_close)
def _connect(self) -> bool:
if self.client.connect():
if self.client.request_init():
self._status.config(text="已连接")
return True
self.client.disconnect()
self._status.config(text="连接失败")
return False
def _on_key(self, key: int):
if not self.client._sock:
if not self._connect():
messagebox.showerror("错误", "请先连接装置")
return
self.client.send_key(key)
def _refresh(self):
if not self.client._sock:
if not self._connect():
messagebox.showerror("错误", "连接失败")
return
data = self.client.fetch_screen()
if not data:
messagebox.showerror("错误", "拉取显存失败")
return
info = self.client._init_info
w, h = info["width"], info["height"]
# 单色转图像
try:
from PIL import Image, ImageTk
img = Image.new("1", (w, h))
pix = img.load()
for y in range(h):
for x in range(w):
bi = (y * w + x) // 8
bit = 7 - (x % 8)
pix[x, y] = 255 if (bi < len(data) and (data[bi] >> bit) & 1) else 0
img = img.resize((w * self._scale, h * self._scale), Image.NEAREST)
self._photo = ImageTk.PhotoImage(img)
self._canvas.delete("all")
self._canvas.create_image(0, 0, anchor=tk.NW, image=self._photo)
except ImportError:
# 无 PIL用简单矩形模拟
self._canvas.delete("all")
step = max(1, 320 // w)
for y in range(min(h, 160)):
for x in range(min(w, 160)):
bi = (y * w + x) // 8
bit = 7 - (x % 8)
if bi < len(data) and (data[bi] >> bit) & 1:
self._canvas.create_rectangle(
x * step, y * step, (x + 1) * step, (y + 1) * step,
fill="white", outline=""
)
def _on_close(self):
self.client.disconnect()
self.root.destroy()
def run(self):
self._connect()
self._refresh()
self.root.mainloop()
def main():
if len(sys.argv) < 2:
print("用法: python remo_disp_viewer.py <装置IP>")
sys.exit(1)
app = ViewerApp(sys.argv[1])
app.run()
if __name__ == "__main__":
main()

6
requirements.txt Normal file
View File

@@ -0,0 +1,6 @@
# 远程显示工具依赖
# 命令行工具 remo_disp_client.py 仅需 Python 标准库
# Web 服务 remo_disp_server.py 需要 Flask
# 保存 PNG 需要 Pillow可选
Flask>=2.0.0
Pillow>=9.0.0