Compare commits

...

7 Commits

Author SHA1 Message Date
f7898adc5d 1. 修改了,远程服务端断开就卡死的bug,
2. 修改图像解析逻辑,使其支持DTU-HMI 的图像格式,不再支持原来的图像格式,不能显示原来设备发来的图像
2026-03-12 18:06:07 +08:00
fb2d4ffc00 update 2026-03-03 14:10:16 +08:00
b47962e5c8 增加日志功能,增加打包成exe功能,点击exe就可以使用 2026-03-03 14:07:54 +08:00
9f54a0cb2e 增加了异常信息打印,图像显示增加为原来的两倍大小 2026-03-03 10:24:39 +08:00
999443ebd5 update readme 2026-03-02 20:19:45 +08:00
391fae9f0e 更新 Readme 的内容,删除不必要的文件 2026-03-02 20:16:22 +08:00
7b540e1ed4 可以接收远程图像,并进行操作且测试没有问题 2026-03-02 20:03:25 +08:00
11 changed files with 838 additions and 862 deletions

6
.gitignore vendored
View File

@@ -11,3 +11,9 @@
*.app *.app
.snapshots/* .snapshots/*
# PyInstaller 打包输出
dist/
build/
*.spec
remo_disp.log

5
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,5 @@
{
"python-envs.defaultEnvManager": "ms-python.python:conda",
"python-envs.defaultPackageManager": "ms-python.python:conda",
"python-envs.pythonProjects": []
}

181
README.md
View File

@@ -1,66 +1,145 @@
# DTU-RemoteLCD - 远程显示通信工具 # DTU-RemoteLCD - 远程显示通信工具
网络调试的远程显示 LCD 模拟软件。与 `HMI/RemoeDisp/RemoDispBus.c` 协议兼容的 Python 客户端,用于远程查看装置 LCD 显存并模拟按键。 **RemoDispBus** 协议兼容的远程 LCD 显示与按键模拟工具。通过网络连接 DTU 装置,在 Web 浏览器中实时查看 LCD 画面并模拟按键操作,适用于调试、监控等场景
## 功能特性
- **实时屏幕显示**:将装置 LCD 显存以 1bpp 位图形式拉取并渲染为 PNG
- **按键模拟**:支持方向键、确认、返回、复归等按键
- **WebSocket 传输**:采用 Socket.IO服务端主动推送约 100ms 刷新
- **跨平台**:浏览器访问,无需安装客户端
## 环境要求
- Python 3.7+
- 与 DTU 装置网络互通(默认端口 7003
## 快速开始
### 1. 安装依赖
```bash
pip install -r requirements.txt
```
### 2. 启动服务
```bash
python remo_disp_server.py [装置IP]
```
启动后访问 **http://localhost:8181**
### 3. 使用说明
- 点击「连接」→ 输入装置 IP 和端口(默认 7003→ 连接
- 左侧显示装置 LCD 画面(约 100ms 刷新)
- 右侧使用方向键、确认、复归、返回进行按键操作
- 点击「断开」断开与装置的连接
**可选预连接**:启动时传入 IP 可预连接装置,例如:
```bash
python remo_disp_server.py 192.168.253.3
```
## 打包为 exeWindows
在项目目录下执行:
```bash
# 安装 PyInstaller 后打包
pip install pyinstaller
python -m PyInstaller remo_disp_server.spec --noconfirm
```
或直接双击运行 `build_exe.bat`
打包完成后可执行文件为 **dist\\remo_disp_server.exe**。双击运行,或在命令行:
```bash
dist\remo_disp_server.exe
dist\remo_disp_server.exe 192.168.253.3
```
运行后浏览器访问 http://localhost:8181。日志会输出到控制台同目录下会生成 `remo_disp.log`
## 项目结构
```
DTU-RemoteLCD/
├── remo_disp_server.py # Web 服务Flask + Socket.IO
├── remo_disp_server.spec # PyInstaller 打包配置
├── build_exe.bat # 一键打包脚本Windows
├── remo_disp_ui.html # Web 前端页面
├── requirements.txt # Python 依赖
├── static/ # 静态资源(可选)
│ └── favicon.ico # 网站图标
└── README.md
```
## 协议说明 ## 协议说明
- **端口**: 7003 | 项目 | 说明 |
- **报文格式**: |------|------|
- 工具 → 装置: `[0xAA][功能码][长度高][长度低][数据...][CRC]` | **端口** | 7003 |
- 装置 → 工具: `[0xBB][功能码][长度高][长度低][数据...][CRC]` | **工具 → 装置** | `[0xAA][功能码][长度高][长度低][数据...][CRC]` |
- **CRC**: 数据区异或校验 | **装置 → 工具** | `[0xBB][功能码][长度高][长度低][数据...][CRC]` |
- **功能码**: KEEPLIVE=0, INIT=1, KEY=2, LCDMEM=3 | **CRC** | 数据区逐字节异或,取低 8 位 |
## 命令行工具 (remo_disp_client.py) ```
刷新屏幕请求
```bash AA 03 00 04 00 00 00 00 00
# 安装可选依赖(用于保存 PNG
pip install -r requirements.txt
# 获取初始化信息
python remo_disp_client.py 192.168.1.100 --init
# 拉取显存并保存为 screen.png
python remo_disp_client.py 192.168.1.100 --screen
# 发送按键 (U/D/L/R/ENT/ESC/F1/F2)
python remo_disp_client.py 192.168.1.100 --key ENT
# 发送 KEEPLIVE
python remo_disp_client.py 192.168.1.100 --keepalive
``` ```
## GUI 查看器 (remo_disp_viewer.py)
```bash
python remo_disp_viewer.py 192.168.1.100
```
- 连接装置并实时显示 LCD 画面 **功能码**
- 点击按键模拟远程按键
- 点击「刷新」重新拉取显存
## Web 界面 (remo_disp_server.py + remo_disp_ui.html) | 码 | 名称 | 说明 |
|----|------|------|
```bash | 0 | KEEPLIVE | 保活 |
pip install flask | 1 | INIT | 初始化,获取屏幕宽高、显存大小 |
python remo_disp_server.py | 2 | KEY | 按键 |
``` | 3 | LCDMEM | 读取显存(屏幕画面) |
启动后访问 http://localhost:8080 ,界面包含:
- 顶部菜单:连接、设置、退出、关于
- 左侧显示区:装置 LCD 画面(约 500ms 刷新)
- 右侧控制:方向键 + 确认、复归、返回
## 按键映射 ## 按键映射
| 键名 | | | 键名 | 协议码 | 说明 |
|------|--------|------|
| U | 0x02 | 上 |
| D | 0x40 | 下 |
| L | 0x10 | 左 |
| R | 0x08 | 右 |
| ENT | 0x20 | 确认 |
| ESC | 0x01 | 返回/取消 |
| RESET | 0x04 | 复归 |
| F2 | 0x80 | F2 |
## 依赖说明
| 依赖 | 用途 |
|------|------| |------|------|
| 上 | 0x02 | | Flask | Web 框架 |
| 下 | 0x40 | | flask-socketio | WebSocket 支持 |
| 左 | 0x10 | | python-socketio | Socket.IO 服务端 |
| 右 | 0x08 | | Pillow | 1bpp 位图转 PNG |
| 确认 | 0x20 | | loguru | 结构化日志、错误追踪 |
| 取消 | 0x01 |
| F1 | 0x04 | ## 日志与调试
| F2 | 0x80 |
- 后端使用 `loguru` 输出日志,默认会打印到控制台,并写入当前目录下的 `remo_disp.log`
-`remo_disp_server.py` 顶部有 `LOG_LEVEL` 常量:
- `LOG_LEVEL = "INFO"`:只输出 INFO/WARNING/ERROR 级别日志(默认)。
- `LOG_LEVEL = "DEBUG"`:输出包括 DEBUG 在内的详细调试信息。
- exe 版本(`remo_disp_server.exe`)同样会在运行目录生成 `remo_disp.log`,方便线下排查问题。
## 配置说明
- **Web 服务端口**:默认 8181`remo_disp_server.py``main()` 中修改
- **装置端口**:默认 7003连接时可输入自定义端口
- **Favicon**:将 `favicon.ico` 放入 `static/` 目录即可显示网站图标
## 许可证
本项目仅供内部调试使用。

17
build_exe.bat Normal file
View File

@@ -0,0 +1,17 @@
@echo off
chcp 65001 >nul
echo 正在打包 DTU-RemoteLCD 为 exe ...
echo.
rem 使用当前环境中的 python 和 pip而不是系统默认 pip
python -m pip install pyinstaller -q
python -m PyInstaller remo_disp_server.spec --noconfirm
if %ERRORLEVEL% equ 0 (
echo.
echo 打包完成。可执行文件: dist\remo_disp_server.exe
echo 直接双击运行,或在命令行: dist\remo_disp_server.exe [装置IP]
) else (
echo 打包失败,请检查错误信息。
exit /b 1
)

View File

@@ -1,322 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
远程显示通信工具 - 与 RemoDispBus 协议兼容的 Python 客户端
协议说明(与 HMI/RemoeDisp/RemoDispBus.c 一致):
- 工具 → 装置: [0xAA][功能码][长度高][长度低][数据...][CRC]
- 装置 → 工具: [0xBB][功能码][长度高][长度低][数据...][CRC]
- CRC: 数据区异或校验(不含 Tag、功能码、长度
- 端口: 7003
"""
import socket
import struct
import threading
import time
import argparse
from typing import Optional, Tuple, Callable
# 协议常量(与 RemoDispBus.h 一致)
TAG_CLIENT = 0xAA # 工具侧发送
TAG_DEVICE = 0xBB # 装置侧发送
PORT = 7003
# 功能码
CMD_KEEPLIVE = 0
CMD_INIT = 1
CMD_KEY = 2
CMD_LCDMEM = 3
CMD_ACK = 4
CMD_NCK = 5
CMD_IDLE = 0xFF
# 按键值(与 RemoDispBus.h 一致)
KEY_U = 0x02 # 上
KEY_D = 0x40 # 下
KEY_L = 0x10 # 左
KEY_R = 0x08 # 右
KEY_ENT = 0x20 # 确认
KEY_ESC = 0x01 # 取消
KEY_F1 = 0x04
KEY_F2 = 0x80
KEY_NONE = 0
def calc_crc(data: bytes) -> int:
"""异或校验,与 RDisp_CalCrc 一致"""
crc = 0
for b in data:
crc ^= b
return crc & 0xFF
def build_frame(cmd: int, data: bytes) -> bytes:
"""组帧:工具→装置 [0xAA][cmd][len_hi][len_lo][data][crc]"""
length = len(data)
header = bytes([TAG_CLIENT, cmd & 0xFF, (length >> 8) & 0xFF, length & 0xFF])
crc = calc_crc(data)
return header + data + bytes([crc])
def parse_frame(raw: bytes) -> Optional[Tuple[int, bytes]]:
"""解析装置回复:[0xBB][cmd][len_hi][len_lo][data][crc]"""
if len(raw) < 6:
return None
if raw[0] != TAG_DEVICE:
return None
cmd = raw[1]
length = (raw[2] << 8) | raw[3]
if len(raw) < 5 + length + 1:
return None
data = raw[4:4 + length]
crc_recv = raw[4 + length]
if calc_crc(data) != crc_recv:
return None
return (cmd, data)
class RemoDispClient:
"""远程显示 TCP 客户端"""
def __init__(self, host: str, port: int = PORT, timeout: float = 5.0):
self.host = host
self.port = port
self.timeout = timeout
self._sock: Optional[socket.socket] = None
self._init_info: Optional[dict] = None
def connect(self) -> bool:
"""连接装置"""
try:
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.settimeout(self.timeout)
self._sock.connect((self.host, self.port))
return True
except Exception as e:
print(f"连接失败: {e}")
return False
def disconnect(self):
"""断开连接"""
if self._sock:
try:
self._sock.close()
except Exception:
pass
self._sock = None
def _send(self, cmd: int, data: bytes = b"") -> bool:
"""发送报文"""
if not self._sock:
return False
frame = build_frame(cmd, data)
try:
self._sock.sendall(frame)
return True
except Exception as e:
print(f"发送失败: {e}")
return False
def _recv(self) -> Optional[Tuple[int, bytes]]:
"""接收并解析一帧"""
if not self._sock:
return None
try:
header = self._sock.recv(5)
if len(header) < 5:
return None
length = (header[2] << 8) | header[3]
rest = self._sock.recv(length + 1)
if len(rest) < length + 1:
return None
raw = header + rest
return parse_frame(raw)
except socket.timeout:
return None
except Exception as e:
print(f"接收失败: {e}")
return None
def request_init(self) -> Optional[dict]:
"""召唤初始化,解析 LCD 尺寸、显存大小、按键映射"""
if not self._send(CMD_INIT):
return None
result = self._recv()
if not result or result[0] != CMD_INIT:
return None
data = result[1]
if len(data) < 29:
return None
# 与 RDisp_Form_InitInf 格式一致
width = (data[0] << 8) | data[1]
height = (data[2] << 8) | data[3]
lcd_type = data[4]
mem_size = (data[5] << 24) | (data[6] << 16) | (data[7] << 8) | data[8]
rgb_red = struct.unpack(">I", data[9:13])[0]
rgb_green = struct.unpack(">I", data[13:17])[0]
rgb_blue = struct.unpack(">I", data[17:21])[0]
keys = list(data[21:29])
self._init_info = {
"width": width,
"height": height,
"lcd_type": lcd_type,
"mem_size": mem_size,
"rgb_red": rgb_red,
"rgb_green": rgb_green,
"rgb_blue": rgb_blue,
"keys": keys,
}
return self._init_info
def send_key(self, key_value: int) -> bool:
"""下发按键"""
return self._send(CMD_KEY, bytes([key_value & 0xFF]))
def request_lcdmem(self, start_pos: int = 0) -> Optional[Tuple[int, bytes]]:
"""召唤显存,从 start_pos 起。返回 (下一位置, 显存数据)"""
data = struct.pack(">I", start_pos)
if not self._send(CMD_LCDMEM, data):
return None
result = self._recv()
if not result or result[0] != CMD_LCDMEM:
return None
payload = result[1]
if len(payload) < 4:
return None
pos = struct.unpack(">I", payload[:4])[0]
mem_data = payload[4:]
return (pos + len(mem_data), mem_data)
def fetch_full_screen(self, on_progress: Optional[Callable[[int, int], None]] = None) -> Optional[bytes]:
"""拉取完整显存,支持断点续传"""
info = self._init_info or self.request_init()
if not info:
return None
mem_size = info["mem_size"]
buffer = bytearray(mem_size)
pos = 0
while pos < mem_size:
result = self.request_lcdmem(pos)
if not result:
return None
next_pos, chunk = result
buffer[pos : pos + len(chunk)] = chunk
pos = next_pos
if on_progress:
on_progress(pos, mem_size)
return bytes(buffer)
def send_keepalive(self) -> bool:
"""发送链路保持(装置端 KEEPLIVE 不回复,仅刷新计时器)"""
return self._send(CMD_KEEPLIVE)
def mono_to_image(data: bytes, width: int, height: int) -> "Optional[object]":
"""单色显存转 PIL Image1bit -> 黑白图)"""
try:
from PIL import Image
except ImportError:
return None
img = Image.new("1", (width, height))
pix = img.load()
for y in range(height):
for x in range(width):
byte_idx = (y * width + x) // 8
bit_idx = 7 - (x % 8)
if byte_idx < len(data):
pix[x, y] = 1 if (data[byte_idx] >> bit_idx) & 1 else 0
else:
pix[x, y] = 0
return img
def main():
parser = argparse.ArgumentParser(description="远程显示通信工具")
parser.add_argument("host", help="装置 IP 地址")
parser.add_argument("-p", "--port", type=int, default=PORT, help=f"端口 (默认 {PORT})")
parser.add_argument("--init", action="store_true", help="仅获取初始化信息")
parser.add_argument("--screen", action="store_true", help="拉取显存并保存为 screen.png")
parser.add_argument("--key", type=str, help="发送按键: U/D/L/R/ENT/ESC/F1/F2")
parser.add_argument("--keepalive", action="store_true", help="发送一次 KEEPLIVE")
args = parser.parse_args()
key_map = {
"U": KEY_U, "D": KEY_D, "L": KEY_L, "R": KEY_R,
"ENT": KEY_ENT, "ESC": KEY_ESC, "F1": KEY_F1, "F2": KEY_F2,
}
client = RemoDispClient(args.host, args.port)
if not client.connect():
return 1
try:
if args.init:
info = client.request_init()
if info:
print("初始化信息:")
print(f" LCD: {info['width']}x{info['height']}, 类型={info['lcd_type']}")
print(f" 显存大小: {info['mem_size']} 字节")
print(f" 按键映射: {info['keys']}")
else:
print("获取初始化失败")
return 1
elif args.key:
k = key_map.get(args.key.upper(), KEY_NONE)
if k == KEY_NONE:
print(f"未知按键: {args.key}")
return 1
if client.send_key(k):
print(f"已发送按键: {args.key}")
else:
print("发送按键失败")
return 1
elif args.keepalive:
if client.send_keepalive():
print("已发送 KEEPLIVE")
else:
print("发送失败")
return 1
elif args.screen:
def progress(cur, total):
print(f"\r拉取显存: {cur}/{total} ({100*cur//total}%)", end="")
data = client.fetch_full_screen(on_progress=progress)
if not data:
print("\n拉取显存失败")
return 1
info = client._init_info or {}
w = info.get("width", 160)
h = info.get("height", 160)
img = mono_to_image(data, w, h)
if img:
img.save("screen.png")
print(f"\n已保存 screen.png ({w}x{h})")
else:
with open("screen.raw", "wb") as f:
f.write(data)
print(f"\n已保存 screen.raw ({len(data)} 字节,需 PIL 才能转 PNG)")
else:
# 默认:获取初始化并拉取一帧显存
info = client.request_init()
if info:
print(f"LCD {info['width']}x{info['height']}, 显存 {info['mem_size']} 字节")
result = client.request_lcdmem(0)
if result:
next_pos, chunk = result
print(f"收到显存 {len(chunk)} 字节,下一位置 {next_pos}")
else:
print("召唤显存失败")
finally:
client.disconnect()
return 0
if __name__ == "__main__":
exit(main())

View File

@@ -1,215 +1,503 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
远程显示 Web 服务 - Flask 实现 远程显示 Web 服务 - Flask + WebSocket 实现
通过 RemoDispBus 协议与 DTU 装置通信,将远程 LCD 画面以 Web 页面形式展示。
前端通过 WebSocket 连接,服务端主动推送屏幕数据,实现低延迟传输。
用法: python remo_disp_server.py [装置IP] 用法: python remo_disp_server.py [装置IP]
启动后访问 http://localhost:8181 启动后访问 http://localhost:8181
依赖: pip install flask 依赖: pip install flask flask-socketio pillow loguru
日志级别: 在代码中设置 LOG_LEVEL见下方"INFO" 不显示 DEBUG"DEBUG" 显示调试信息。
""" """
# base64将二进制 PNG 编码为字符串,便于通过 JSON/WebSocket 传输
import base64
# socket与 DTU 装置的 TCP 通信
import socket import socket
# struct将整数打包为二进制如起始地址 4 字节)
import struct import struct
import time # threading后台线程持续拉取屏幕并推送
import threading
# sys读取命令行参数装置 IP
import sys import sys
# os获取当前脚本目录、路径与文件检查
import os import os
# ioBytesIO 用于在内存中保存 PNG
import io import io
from typing import Optional, Tuple
from flask import Flask, request, jsonify, send_file, Response
# 协议常量 def _get_base_path() -> str:
"""打包成 exe 时资源在 sys._MEIPASS否则为脚本所在目录。"""
if getattr(sys, "frozen", False):
return sys._MEIPASS # type: ignore[attr-defined]
return os.path.dirname(os.path.abspath(__file__))
# typing类型注解
from typing import Optional, Tuple, Set
# loguru结构化日志记录错误与调试信息
from loguru import logger
# FlaskWeb 框架,提供 HTTP 路由
from flask import Flask, request, send_file
# SocketIOWebSocket 支持,实现实时双向通信
from flask_socketio import SocketIO, emit
# =============================================================================
# 协议常量RemoDispBus
# =============================================================================
# TAG_CLIENT 0xAA客户端发送帧的帧头标识
# TAG_DEVICE 0xBB设备回复帧的帧头标识
TAG_CLIENT, TAG_DEVICE = 0xAA, 0xBB TAG_CLIENT, TAG_DEVICE = 0xAA, 0xBB
# PORT 7003RemoDispBus 协议默认端口
PORT = 7003 PORT = 7003
# CMD_KEEPLIVE 0保活
# CMD_INIT 1初始化获取屏幕宽高、显存大小
# CMD_KEY 2按键
# CMD_LCDMEM 3读取显存屏幕画面
CMD_KEEPLIVE, CMD_INIT, CMD_KEY, CMD_LCDMEM = 0, 1, 2, 3 CMD_KEEPLIVE, CMD_INIT, CMD_KEY, CMD_LCDMEM = 0, 1, 2, 3
# 全局连接 # =============================================================================
_sock: Optional[socket.socket] = None # 日志级别(在代码中直接修改)
_init_info: Optional[dict] = None # =============================================================================
# "INFO":仅输出 INFO/WARNING/ERROR不显示 DEBUG
# "DEBUG":输出包括 DEBUG 在内的全部日志
LOG_LEVEL = "INFO"
# =============================================================================
# 全局状态
# =============================================================================
# _sock与 DTU 装置的 TCP socketNone 表示未连接
_sock: Optional[socket.socket] = None
# _screen_clients当前需要接收屏幕推送的 WebSocket 客户端 sid 集合
_screen_clients: Set[str] = set()
# _refresh_thread后台刷新线程持续拉取屏幕并推送
_refresh_thread: Optional[threading.Thread] = None
# _refresh_stopEventset 时刷新线程退出
_refresh_stop = threading.Event()
# =============================================================================
# 协议帧编解码
# =============================================================================
def calc_crc(data: bytes) -> int: def calc_crc(data: bytes) -> int:
crc = 0 """
对数据区逐字节异或,取低 8 位作为 CRC 校验码。
协议规定CRC = data[0] ^ data[1] ^ ... ^ data[n-1],取低 8 位。
"""
crc = 0 # 初始值为 0
for b in data: for b in data:
crc ^= b crc ^= b # 逐字节异或
return crc & 0xFF return crc & 0xFF # 取低 8 位作为最终 CRC
def build_frame(cmd: int, data: bytes) -> bytes: def build_frame(cmd: int, data: bytes) -> bytes:
length = len(data) """
构造一帧发送数据,格式: [TAG(1)][cmd(1)][len_hi(1)][len_lo(1)][data][crc(1)]
length 大端 2 字节crc 对 data 计算。
"""
length = len(data) # 数据区长度
# 构造帧头TAG_CLIENT + cmd + 长度高字节 + 长度低字节(大端序)
header = bytes([TAG_CLIENT, cmd & 0xFF, (length >> 8) & 0xFF, length & 0xFF]) header = bytes([TAG_CLIENT, cmd & 0xFF, (length >> 8) & 0xFF, length & 0xFF])
return header + data + bytes([calc_crc(data)]) return header + data + bytes([calc_crc(data)]) # 帧头 + 数据 + CRC
def parse_frame(raw: bytes) -> Optional[Tuple[int, bytes]]: def parse_frame(raw: bytes) -> Optional[Tuple[int, bytes]]:
if len(raw) < 6 or raw[0] != TAG_DEVICE: """
解析设备回复帧,返回 (cmd, data) 或 None。
帧格式: [TAG(1)][cmd(1)][len_hi(1)][len_lo(1)][data(length)][crc(1)]
最短合法帧长度为 5 字节(长度为 0 时: 1+1+2+0+1
"""
if len(raw) < 5 or raw[0] != TAG_DEVICE: # 至少 5 字节且帧头为 TAG_DEVICE
logger.error(
"[parse_frame] 帧太短或帧头不是 TAG_DEVICE(0xBB), len={len_}",
len_=len(raw),
)
return None return None
length = (raw[2] << 8) | raw[3] length = (raw[2] << 8) | raw[3] # 大端序解析数据区长度
if len(raw) < 5 + length + 1: if len(raw) < 4 + length + 1: # 检查是否收齐整帧头4字节+数据+CRC1字节
logger.error(
"[parse_frame] 数据不完整, 需要 {need} 字节, 实际 {actual} 字节",
need=4 + length + 1,
actual=len(raw),
)
return None return None
data = raw[4:4 + length] data = raw[4:4 + length] # 提取数据区
if calc_crc(data) != raw[4 + length]: if calc_crc(data) != raw[4 + length]: # CRC 校验失败
logger.error("[parse_frame] CRC 校验失败")
return None return None
return (raw[1], data) cmd = raw[1]
logger.debug("[parse_frame] 成功解析帧 cmd={cmd:#x}, length={length}", cmd=cmd, length=length)
return (cmd, data) # 返回 (命令码, 数据)
# =============================================================================
# 连接与收发
# =============================================================================
def connect(host: str, port: int = PORT) -> bool: def connect(host: str, port: int = PORT) -> bool:
global _sock, _init_info """
建立与 DTU 装置的 TCP 连接。
若有旧连接则先关闭;连接时超时 3 秒;连接成功后设为 2 秒。
"""
global _sock
try: try:
if _sock: if _sock:
_sock.close() _sock.close() # 关闭旧连接
_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) logger.info("[connect] 已关闭旧连接")
_sock.settimeout(5.0) _sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建 TCP socket
_sock.connect((host, port)) _sock.settimeout(3.0) # 连接阶段超时 3 秒
_sock.settimeout(2.0) _sock.connect((host, port)) # 连接目标主机和端口
_init_info = _request_init() _sock.settimeout(2.0) # 连接成功后收发超时 2 秒
logger.info("[connect] 连接 DTU 成功: {host}:{port}", host=host, port=port)
return True return True
except Exception: except Exception as e:
logger.exception("[connect] 连接失败: {host}:{port} - {err}", host=host, port=port, err=e)
return False return False
def _send(cmd: int, data: bytes = b"") -> bool: def _send(cmd: int, data: bytes = b"") -> bool:
"""发送一帧到设备"""
if not _sock: if not _sock:
logger.error("[_send] 未连接设备,无法发送 cmd={cmd:#x}", cmd=cmd)
return False return False
try: try:
_sock.sendall(build_frame(cmd, data)) frame = build_frame(cmd, data)
_sock.sendall(frame) # 构造帧并一次性发送
logger.debug("[_send] 已发送 cmd={cmd:#x}, data_len={length}", cmd=cmd, length=len(data))
return True return True
except Exception: except Exception as e:
logger.exception("[_send] 发送失败 cmd={cmd:#x} - {err}", cmd=cmd, err=e)
return False return False
def _recv() -> Optional[Tuple[int, bytes]]: def _recv() -> Optional[Tuple[int, bytes]]:
"""
从设备接收一帧。若首次接收长度不足,会再尝试接收最多 2 次(共 3 次),
拼齐整帧后再解析。单次 recv 最多 256KB。
"""
if not _sock: if not _sock:
logger.error("[_recv] 未连接设备,无法接收数据")
return None return None
try: try:
header = _sock.recv(5) buffer = b""
if len(header) < 5: max_attempts = 3 # 首次 + 最多再收 2 次
for attempt in range(max_attempts):
chunk = _sock.recv(256 * 1024)
if not chunk:
logger.debug("[_recv] 第 {n} 次 recv 收到空数据,连接可能已关闭", n=attempt + 1)
break
buffer += chunk
if len(buffer) < 5: # 至少需要 5 字节才能解析头
logger.debug("[_recv] 第 {n} 次 recv 后共 {len_} 字节,继续接收", n=attempt + 1, len_=len(buffer))
continue
data_len = (buffer[2] << 8) | buffer[3] # 数据区长度
need = 4 + data_len + 1 # 整帧长度
if len(buffer) >= need:
logger.debug("[_recv] 第 {n} 次 recv 后收齐整帧 len={len_}", n=attempt + 1, len_=need)
return parse_frame(buffer[:need])
logger.debug(
"[_recv] 第 {n} 次 recv 后共 {have} 字节,需要 {need} 字节,继续接收",
n=attempt + 1,
have=len(buffer),
need=need,
)
if len(buffer) < 5:
logger.error("[_recv] 接收数据太短, len={len_}", len_=len(buffer))
return None return None
length = (header[2] << 8) | header[3] need = 4 + ((buffer[2] << 8) | buffer[3]) + 1
rest = _sock.recv(length + 1) logger.error(
if len(rest) < length + 1: "[_recv] 尝试 {max_attempts} 次后数据仍不完整, 需要 {need} 字节, 实际 {actual} 字节",
return None max_attempts=max_attempts,
return parse_frame(header + rest) need=need,
except Exception: actual=len(buffer),
)
return None return None
except Exception as e:
logger.exception("[_recv] 接收异常 - {err}", err=e)
def _request_init() -> Optional[dict]:
if not _send(CMD_INIT):
return None return None
for _ in range(30):
result = _recv()
if result and result[0] == CMD_INIT:
data = result[1]
if len(data) >= 29:
return {
"width": (data[0] << 8) | data[1],
"height": (data[2] << 8) | data[3],
"mem_size": (data[5] << 24) | (data[6] << 16) | (data[7] << 8) | data[8],
}
time.sleep(0.05)
return None
def send_key(code: int) -> bool: def send_key(code: int) -> bool:
return _send(CMD_KEY, bytes([code & 0xFF])) """发送按键码到设备"""
return _send(CMD_KEY, bytes([code & 0xFF])) # 按键码单字节
def fetch_screen() -> Optional[bytes]: def fetch_screen() -> Optional[bytes]:
global _init_info """
info = _init_info or _request_init() 向设备请求当前屏幕显存,返回 1bpp 位图数据。
if not info: 发送 CMD_LCDMEM + 起始地址 0设备回复 payload 前 4 字节为地址,后续为位图,返回 payload[4:]。
"""
if not _sock:
logger.error("[fetch_screen] 未连接设备,无法读取屏幕")
return None return None
mem_size = info["mem_size"] if not _send(CMD_LCDMEM, struct.pack(">I", 0)): # 发送读取显存命令,起始地址 0大端 4 字节)
buffer = bytearray(mem_size) logger.error("[fetch_screen] 发送 CMD_LCDMEM 失败")
pos = 0 return None
while pos < mem_size: logger.debug("[fetch_screen] 发送 CMD_LCDMEM 成功")
if not _send(CMD_LCDMEM, struct.pack(">I", pos)): result = _recv() # 接收设备回复
return None if not result:
for _ in range(40): logger.error("[fetch_screen] 未收到任何回复result=None")
result = _recv() return None
if result and result[0] == CMD_LCDMEM: cmd, payload = result
payload = result[1] if cmd != CMD_LCDMEM:
if len(payload) >= 4: # 可能是按键 ACK、保活等其他短帧按键操作时较常见这里只做调试日志不视为错误
start = struct.unpack(">I", payload[:4])[0] logger.debug("[fetch_screen] 收到非 LCDMEM 帧 cmd={cmd:#x},忽略", cmd=cmd)
chunk = payload[4:] return None
buffer[start : start + len(chunk)] = chunk # 确认为 LCDMEM 回复
pos = start + len(chunk) if len(payload) >= 4: # 前 4 字节为地址,后面是位图
break logger.debug("[fetch_screen] 收到 LCDMEMpayload_len={len_}", len_=len(payload))
time.sleep(0.02) return payload[4:]
else: logger.debug("[fetch_screen] 收到 LCDMEMpayload_len={len_} (<4)", len_=len(payload))
return None return payload or None
return bytes(buffer)
def mono_to_png(data: bytes, width: int, height: int) -> Optional[bytes]: def mono_to_png(data: bytes, width: int, height: int) -> Optional[bytes]:
"""
将 1bpp 位图转为 PNG 字节流。
每字节 8 像素高位在前bi=(y*w+x)//8 为字节索引bit=7-(x%8) 为位索引1 为白 2550 为黑 0。
"""
try: try:
from PIL import Image from PIL import Image
img = Image.new("1", (width, height)) logger.debug(
pix = img.load() "[mono_to_png] 开始转换为 PNGwidth={width}, height={height}, data_len={len_}",
width=width,
height=height,
len_=len(data),
)
img = Image.new("1", (width, height)) # 创建 1bpp 黑白图像
pix = img.load() # 获取像素访问对象
'''
for y in range(height): for y in range(height):
for x in range(width): for x in range(width):
bi = (y * width + x) // 8 bi = (y * width + x) // 8 # 字节索引:每 8 像素一字节
bit = 7 - (x % 8) bit = 7 - (x % 8) # 位索引:高位在前
pix[x, y] = 255 if (bi < len(data) and (data[bi] >> bit) & 1) else 0 pix[x, y] = 0 if (bi < len(data) and (data[bi] >> bit) & 1) else 255 # 1→白 0→黑
buf = io.BytesIO() '''
img.save(buf, format="PNG") for y in range(height):
return buf.getvalue() for x in range(width):
pix[x, y] = data[y * width + x]
buf = io.BytesIO() # 内存缓冲区
img.save(buf, format="PNG") # 保存为 PNG 格式
png_bytes = buf.getvalue()
logger.debug("[mono_to_png] 转换 PNG 成功, png_len={len_}", len_=len(png_bytes))
return png_bytes # 返回 PNG 字节流
except ImportError: except ImportError:
logger.error("[mono_to_png] 缺少 PIL/Pillow, 请执行 pip install pillow")
return None
except Exception as e:
logger.exception("[mono_to_png] 转换 PNG 失败 - {err}", err=e)
return None return None
app = Flask(__name__) def disconnect_device():
"""关闭与设备的连接"""
global _sock
try:
if _sock:
logger.info("[disconnect_device] 正在关闭与 DTU 的连接")
_sock.close() # 关闭 socket
_sock = None # 清空引用
except Exception as e:
logger.exception("[disconnect_device] 关闭连接时异常 - {err}", err=e)
# =============================================================================
# 屏幕推送线程
# =============================================================================
def _screen_refresh_loop():
"""
后台线程:每约 100ms 拉取一帧屏幕,转 PNG 后 base64 推送给各 WebSocket 客户端。
若检测到远程服务端断开(连续多次拉取失败),则断开设备并停止重复请求。
_refresh_stop.wait(0.1) 既作间隔,也便于收到 set 时快速退出。
"""
global _screen_clients, _refresh_stop
logger.info("[_screen_refresh_loop] 刷新线程启动")
# 连续拉取失败次数,超过阈值则认为远程已断开
MAX_CONSECUTIVE_FAILURES = 5
consecutive_failures = 0
try:
while not _refresh_stop.is_set(): # 未被要求停止时持续循环
if _screen_clients and _sock: # 有客户端且已连接设备
data = fetch_screen() # 拉取屏幕位图
logger.debug("[fetch_screen] 拉取屏幕位图数据: {data}", data=data)
if data:
consecutive_failures = 0 # 成功则清零
# 当前协议未返回宽高,这里固定为 160x160可根据实际情况调整
w, h = 160, 160
png = mono_to_png(data, w, h) # 转为 PNG
if png:
b64 = base64.b64encode(png).decode("ascii") # base64 编码便于 JSON 传输
for sid in list(_screen_clients): # 复制列表避免迭代时修改
try:
socketio.emit(
"screen",
{"png": b64, "w": w, "h": h},
room=sid,
) # 推送给该客户端
except Exception as e:
logger.exception(
"[_screen_refresh_loop] 推送 screen 给 {sid} 失败 - {err}",
sid=sid,
err=e,
)
else:
# 拉取失败,累计连续失败次数
consecutive_failures += 1
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
logger.warning(
"[_screen_refresh_loop] 连续 {n} 次拉取失败,判定远程服务端已断开,停止请求并断开连接",
n=consecutive_failures,
)
disconnect_device()
# 通知所有订阅客户端:设备已断开
for sid in list(_screen_clients):
try:
socketio.emit("device_disconnected", {"reason": "remote_closed"}, room=sid)
except Exception as e:
logger.debug("[_screen_refresh_loop] 推送 device_disconnected 失败 sid={sid} - {err}", sid=sid, err=e)
consecutive_failures = 0
else:
consecutive_failures = 0 # 无客户端或未连接时清零,便于下次连接后重新计数
_refresh_stop.wait(timeout=0.1) # 等待 100ms若被 set 则立即返回
finally:
logger.info("[_screen_refresh_loop] 刷新线程退出")
# =============================================================================
# Flask + SocketIO
# =============================================================================
app = Flask(__name__) # 创建 Flask 应用
app.config["SECRET_KEY"] = "remo_disp" # 会话密钥
socketio = SocketIO(app, cors_allowed_origins="*") # 创建 SocketIO允许跨域
FAVICON_PATH = os.path.join(_get_base_path(), "static", "favicon.ico") # favicon 文件路径
@app.route("/favicon.ico")
def favicon():
"""提供网站图标"""
if os.path.exists(FAVICON_PATH):
return send_file(FAVICON_PATH, mimetype="image/x-icon")
return "", 404 # 不存在则返回 404
@app.route("/") @app.route("/")
@app.route("/index.html") @app.route("/index.html")
def index(): def index():
"""主页,返回前端 HTML"""
return send_file( return send_file(
os.path.join(os.path.dirname(__file__), "remo_disp_ui.html"), os.path.join(_get_base_path(), "remo_disp_ui.html"),
mimetype="text/html; charset=utf-8", mimetype="text/html; charset=utf-8",
) )
@app.route("/connect") @socketio.on("connect")
def api_connect(): def on_connect():
host = request.args.get("host", "").strip() """WebSocket 客户端连接时触发(可留空)"""
port = int(request.args.get("port", PORT)) logger.info("[on_connect] WebSocket 客户端已连接 sid={sid}", sid=request.sid)
ok = connect(host, port)
return jsonify(success=ok, error=None if ok else "connect failed")
@app.route("/key") @socketio.on("disconnect")
def api_key(): def on_disconnect():
code = int(request.args.get("code", 0)) """WebSocket 客户端断开(关标签页等)时,从 _screen_clients 移除,若无客户端则断开设备"""
send_key(code) global _screen_clients, _refresh_thread, _refresh_stop
return jsonify(ok=True) sid = request.sid # 获取断开客户端的 session id
logger.info("[on_disconnect] WebSocket 客户端断开 sid={sid}", sid=sid)
if sid in _screen_clients:
_screen_clients.discard(sid) # 从订阅列表移除
if not _screen_clients: # 若无其他客户端
_refresh_stop.set() # 通知刷新线程停止
disconnect_device() # 断开设备连接
@app.route("/screen") @socketio.on("connect_device")
def api_screen(): def on_connect_device(data):
data = fetch_screen() """前端请求连接设备:连接成功后加入 _screen_clients启动刷新线程回复 connect_result"""
if not data: global _screen_clients, _refresh_thread, _refresh_stop
return "", 500 host = (data.get("host") or "").strip() # 从 data 取 host去空格
info = _init_info or {} port = int(data.get("port") or PORT) # 从 data 取 port缺省 7003
w, h = info.get("width", 160), info.get("height", 160) logger.info("[on_connect_device] 请求连接 DTU: {host}:{port}", host=host, port=port)
png = mono_to_png(data, w, h) ok = connect(host, port) # 建立 TCP 连接
if png: if ok:
return Response(png, mimetype="image/png") _screen_clients.add(request.sid) # 将当前客户端加入屏幕订阅
resp = Response(data, mimetype="application/octet-stream") _refresh_stop.clear() # 清除停止标志,允许刷新线程运行
resp.headers["X-Width"] = str(w) if _refresh_thread is None or not _refresh_thread.is_alive(): # 刷新线程未运行
resp.headers["X-Height"] = str(h) _refresh_thread = threading.Thread(target=_screen_refresh_loop, daemon=True) # 创建后台线程
return resp _refresh_thread.start() # 启动线程
logger.info("[on_connect_device] 已启动屏幕刷新线程")
emit("connect_result", {"success": True}) # 回复连接成功
logger.info("[on_connect_device] 连接 DTU 成功,已加入订阅 sid={sid}", sid=request.sid)
else:
logger.error("[on_connect_device] 连接 DTU 失败: {host}:{port}", host=host, port=port)
emit("connect_result", {"success": False, "error": "connect failed"}) # 回复连接失败
@socketio.on("disconnect_device")
def on_disconnect_device():
"""前端请求断开设备:从 _screen_clients 移除,若无客户端则断开设备,回复 disconnect_result"""
global _screen_clients, _refresh_stop
sid = request.sid
logger.info("[on_disconnect_device] 前端请求断开设备 sid={sid}", sid=sid)
_screen_clients.discard(sid) # 从订阅列表移除
if not _screen_clients: # 若无其他客户端
_refresh_stop.set() # 通知刷新线程停止
disconnect_device() # 断开设备连接
emit("disconnect_result", {"ok": True}) # 回复断开成功
logger.info("[on_disconnect_device] 设备已断开(若为最后一个客户端)")
@socketio.on("key")
def on_key(data):
"""前端发送按键:从 data 取 code转发给设备"""
code = int(data.get("code", 0)) # 按键码,缺省 0
logger.debug("[on_key] 收到按键 code={code:#x}", code=code)
send_key(code) # 发送给 DTU 设备
# =============================================================================
# 启动
# =============================================================================
def _configure_log_level():
"""根据全局 LOG_LEVEL 配置日志级别(见文件顶部 LOG_LEVEL 常量)。"""
level = LOG_LEVEL.upper() if isinstance(LOG_LEVEL, str) else "INFO"
if level not in ("TRACE", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"):
level = "INFO"
logger.remove() # 移除默认的 stderr 输出
logger.add(sys.stderr, level=level, format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <level>{message}</level>")
logger.add(
"remo_disp.log",
rotation="1 week",
encoding="utf-8",
enqueue=True,
backtrace=True,
diagnose=False,
level=level,
)
return level
@logger.catch
def main(): def main():
port = 8181 """主入口:启动 Web 服务"""
print(f"远程显示服务: http://localhost:{port}") port = 8181 # HTTP 服务端口
print("在浏览器中打开上述地址,点击「连接」输入装置 IP") log_level = _configure_log_level()
logger.info("日志级别: {level}", level=log_level)
logger.info("远程显示服务: http://localhost:{port}", port=port)
logger.info("使用 WebSocket 传输,在浏览器中打开上述地址")
if len(sys.argv) >= 2: if len(sys.argv) >= 2:
connect(sys.argv[1]) host = sys.argv[1]
print(f"已预连接: {sys.argv[1]}") logger.info("[main] 尝试预连接 DTU: {host}", host=host)
app.run(host="0.0.0.0", port=port, debug=False, threaded=True) if connect(host): # 若命令行有 IP预连接
logger.info("[main] 已预连接: {host}", host=host)
else:
logger.error("[main] 预连接失败: {host}", host=host)
socketio.run(app, host="0.0.0.0", port=port, debug=True, allow_unsafe_werkzeug=True) # 监听所有网卡
if __name__ == "__main__": if __name__ == "__main__":

63
remo_disp_server.spec Normal file
View File

@@ -0,0 +1,63 @@
# -*- mode: python ; coding: utf-8 -*-
# PyInstaller 打包配置python -m PyInstaller remo_disp_server.spec
# 打包后运行 dist\remo_disp_server\remo_disp_server.exe [装置IP]
block_cipher = None
# 需要随 exe 一起打包的数据文件HTML、静态资源
added_files = [
('remo_disp_ui.html', '.'),
]
# 若存在 static 目录则打包favicon 等)
import os as _os
if _os.path.isdir('static'):
added_files.append(('static', 'static'))
a = Analysis(
['remo_disp_server.py'],
pathex=[],
binaries=[],
datas=added_files,
hiddenimports=[
'engineio.async_drivers.threading',
'flask_socketio',
'python_engineio',
'python_socketio',
'werkzeug',
'PIL',
'PIL._tkinter_finder',
'loguru',
],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='remo_disp_server',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)

View File

@@ -1,51 +1,83 @@
<!DOCTYPE html> <!DOCTYPE html>
<!-- 文档类型声明HTML5 -->
<html lang="zh-CN"> <html lang="zh-CN">
<!-- 根元素,语言设为简体中文 -->
<head> <head>
<!-- 头部:元数据与资源 -->
<meta charset="UTF-8"> <meta charset="UTF-8">
<!-- 字符编码UTF-8 -->
<meta name="viewport" content="width=device-width, initial-scale=1.0"> <meta name="viewport" content="width=device-width, initial-scale=1.0">
<!-- 视口:适配移动端,初始缩放 1 -->
<title>远程显示</title> <title>远程显示</title>
<!-- 页面标题 -->
<link rel="icon" type="image/x-icon" href="/favicon.ico">
<!-- 网站图标 -->
<script src="https://cdn.socket.io/4.7.2/socket.io.min.js"></script>
<!-- Socket.IO 客户端库,用于 WebSocket 通信 -->
<style> <style>
/* ========== 全局样式 ========== */
* { box-sizing: border-box; margin: 0; padding: 0; } * { box-sizing: border-box; margin: 0; padding: 0; }
/* 通配符:盒模型为 border-box去除默认边距 */
body { body {
font-family: "Microsoft YaHei", "PingFang SC", sans-serif; font-family: "Microsoft YaHei", "PingFang SC", sans-serif;
/* 字体:微软雅黑、苹方,无衬线回退 */
background: linear-gradient(135deg, #e8f4fc 0%, #f0f8ff 50%, #e6f2ff 100%); background: linear-gradient(135deg, #e8f4fc 0%, #f0f8ff 50%, #e6f2ff 100%);
/* 背景135 度渐变,浅蓝到淡蓝 */
min-height: 100vh; min-height: 100vh;
/* 最小高度为视口高度 */
padding: 16px; padding: 16px;
/* 内边距 */
} }
/* ========== 主容器 ========== */
.app { .app {
max-width: 900px; max-width: 900px;
/* 最大宽度限制 */
margin: 0 auto; margin: 0 auto;
/* 水平居中 */
background: #fff; background: #fff;
/* 白色背景 */
border-radius: 12px; border-radius: 12px;
/* 圆角 */
box-shadow: 0 4px 20px rgba(0,0,0,0.08); box-shadow: 0 4px 20px rgba(0,0,0,0.08);
/* 阴影 */
overflow: hidden; overflow: hidden;
/* 子元素超出则裁剪 */
} }
/* 顶部菜单栏 */ /* ========== 顶部菜单栏 ========== */
.menu-bar { .menu-bar {
display: flex; display: flex;
/* 弹性布局 */
align-items: center; align-items: center;
/* 垂直居中 */
justify-content: center; justify-content: center;
/* 水平居中 */
gap: 8px; gap: 8px;
/* 子元素间距 */
padding: 12px 16px; padding: 12px 16px;
background: #fff; background: #fff;
border-bottom: 1px solid #e8e8e8; border-bottom: 1px solid #e8e8e8;
/* 底部分割线 */
} }
.menu-item { .menu-item {
display: flex; display: flex;
flex-direction: column; flex-direction: column;
/* 纵向排列图标和文字 */
align-items: center; align-items: center;
gap: 4px; gap: 4px;
padding: 8px 14px; padding: 8px 14px;
border: none; border: none;
background: transparent; background: transparent;
cursor: pointer; cursor: pointer;
/* 鼠标指针为手型 */
border-radius: 8px; border-radius: 8px;
transition: all 0.2s; transition: all 0.2s;
/* 过渡动画 */
color: #333; color: #333;
font-size: 12px; font-size: 12px;
} }
.menu-item:hover { .menu-item:hover {
background: #f5f5f5; background: #f5f5f5;
/* 悬停时浅灰背景 */
} }
.menu-item .icon { .menu-item .icon {
width: 28px; width: 28px;
@@ -54,57 +86,77 @@
align-items: center; align-items: center;
justify-content: center; justify-content: center;
font-size: 18px; font-size: 18px;
/* 图标区域 */
} }
.menu-item.connect .icon { color: #22c55e; } .menu-item.connect .icon { color: #22c55e; }
/* 连接按钮图标:绿色 */
.menu-item.settings .icon { color: #3b82f6; } .menu-item.settings .icon { color: #3b82f6; }
/* 设置按钮图标:蓝色 */
.menu-item.exit .icon { color: #ef4444; } .menu-item.exit .icon { color: #ef4444; }
/* 断开按钮图标:红色 */
.menu-item.about .icon { color: #64748b; } .menu-item.about .icon { color: #64748b; }
/* 关于按钮图标:灰色 */
.menu-item.connected .icon { color: #22c55e; } .menu-item.connected .icon { color: #22c55e; }
.menu-item.connected { color: #22c55e; } .menu-item.connected { color: #22c55e; }
/* 主内容区 */ /* 已连接状态:绿色 */
/* ========== 主内容区 ========== */
.main { .main {
display: flex; display: flex;
padding: 16px; padding: 16px;
gap: 20px; gap: 20px;
/* 左右分栏间距 */
} }
/* 显示区域 */ /* ========== 显示区域 ========== */
.display-area { .display-area {
flex: 1; flex: 1;
/* 占据剩余空间 */
min-width: 0; min-width: 0;
/* 允许 flex 子项收缩 */
aspect-ratio: 1; aspect-ratio: 1;
/* 宽高比 1:1 */
max-height: 420px; max-height: 420px;
background: #1e3a5f; background: #1e3a5f;
/* 深蓝背景,模拟 LCD 边框 */
border-radius: 8px; border-radius: 8px;
display: flex; display: flex;
align-items: center; align-items: center;
justify-content: center; justify-content: center;
overflow: hidden; overflow: hidden;
box-shadow: inset 0 2px 8px rgba(0,0,0,0.2); box-shadow: inset 0 2px 8px rgba(0,0,0,0.2);
/* 内阴影 */
} }
.display-area canvas { .display-area canvas {
max-width: 100%; max-width: 100%;
max-height: 100%; max-height: 100%;
/* canvas 自适应容器 */
image-rendering: pixelated; image-rendering: pixelated;
image-rendering: crisp-edges; image-rendering: crisp-edges;
/* 像素风格渲染,避免模糊 */
} }
.display-placeholder { .display-placeholder {
color: rgba(255,255,255,0.5); color: rgba(255,255,255,0.5);
/* 半透明白色提示文字 */
font-size: 14px; font-size: 14px;
} }
/* 右侧控制面板 */ /* ========== 右侧控制面板 ========== */
.control-panel { .control-panel {
display: flex; display: flex;
flex-direction: column; flex-direction: column;
align-items: center; align-items: center;
justify-content: center;
/* 垂直方向居中,与左侧显示区对齐 */
gap: 16px; gap: 16px;
padding: 8px; padding: 8px;
align-self: stretch;
/* 与 .main 同高,便于内容居中 */
} }
/* D-pad */ /* ========== D-pad 方向键 ========== */
.dpad { .dpad {
display: grid; display: grid;
grid-template-columns: 50px 50px 50px; grid-template-columns: 50px 50px 50px;
grid-template-rows: 50px 50px 50px; grid-template-rows: 50px 50px 50px;
gap: 4px; gap: 4px;
/* 3x3 网格布局 */
} }
.dpad .btn { .dpad .btn {
width: 50px; width: 50px;
@@ -112,6 +164,7 @@
border: none; border: none;
border-radius: 8px; border-radius: 8px;
background: linear-gradient(180deg, #f8f9fa 0%, #e9ecef 100%); background: linear-gradient(180deg, #f8f9fa 0%, #e9ecef 100%);
/* 渐变背景 */
box-shadow: 0 2px 4px rgba(0,0,0,0.1); box-shadow: 0 2px 4px rgba(0,0,0,0.1);
cursor: pointer; cursor: pointer;
display: flex; display: flex;
@@ -125,17 +178,24 @@
background: linear-gradient(180deg, #fff 0%, #f1f3f5 100%); background: linear-gradient(180deg, #fff 0%, #f1f3f5 100%);
box-shadow: 0 3px 6px rgba(0,0,0,0.15); box-shadow: 0 3px 6px rgba(0,0,0,0.15);
transform: translateY(-1px); transform: translateY(-1px);
/* 悬停上浮效果 */
} }
.dpad .btn:active { .dpad .btn:active {
transform: translateY(0); transform: translateY(0);
box-shadow: 0 1px 2px rgba(0,0,0,0.1); box-shadow: 0 1px 2px rgba(0,0,0,0.1);
/* 按下时恢复 */
} }
.dpad .btn-up { grid-column: 2; grid-row: 1; } .dpad .btn-up { grid-column: 2; grid-row: 1; }
/* 上键:中上 */
.dpad .btn-down { grid-column: 2; grid-row: 3; } .dpad .btn-down { grid-column: 2; grid-row: 3; }
/* 下键:中下 */
.dpad .btn-left { grid-column: 1; grid-row: 2; } .dpad .btn-left { grid-column: 1; grid-row: 2; }
/* 左键:左中 */
.dpad .btn-right { grid-column: 3; grid-row: 2; } .dpad .btn-right { grid-column: 3; grid-row: 2; }
/* 右键:右中 */
.dpad .btn-ok { grid-column: 2; grid-row: 2; } .dpad .btn-ok { grid-column: 2; grid-row: 2; }
/* 动作按钮 */ /* 确认键:正中 */
/* ========== 动作按钮 ========== */
.action-btns { .action-btns {
display: flex; display: flex;
gap: 12px; gap: 12px;
@@ -160,17 +220,21 @@
transform: translateY(-1px); transform: translateY(-1px);
} }
.action-btn .icon { color: #22c55e; font-size: 16px; } .action-btn .icon { color: #22c55e; font-size: 16px; }
/* 连接设置弹窗 */ /* ========== 连接设置弹窗 ========== */
.modal { .modal {
display: none; display: none;
/* 默认隐藏 */
position: fixed; position: fixed;
inset: 0; inset: 0;
/* 全屏覆盖 */
background: rgba(0,0,0,0.4); background: rgba(0,0,0,0.4);
/* 半透明黑色遮罩 */
align-items: center; align-items: center;
justify-content: center; justify-content: center;
z-index: 100; z-index: 100;
} }
.modal.show { display: flex; } .modal.show { display: flex; }
/* 显示时用 flex 居中 */
.modal-content { .modal-content {
background: #fff; background: #fff;
padding: 24px; padding: 24px;
@@ -203,16 +267,21 @@
.modal-content .btn-primary { .modal-content .btn-primary {
background: #22c55e; background: #22c55e;
color: #fff; color: #fff;
/* 主按钮:绿色 */
} }
.modal-content .btn-secondary { .modal-content .btn-secondary {
background: #e5e7eb; background: #e5e7eb;
color: #333; color: #333;
/* 次要按钮:灰色 */
} }
</style> </style>
</head> </head>
<body> <body>
<!-- 页面主体 -->
<div class="app"> <div class="app">
<!-- 主应用容器 -->
<div class="menu-bar"> <div class="menu-bar">
<!-- 顶部菜单栏 -->
<button class="menu-item connect" id="btnConnect" title="连接"> <button class="menu-item connect" id="btnConnect" title="连接">
<span class="icon">📶</span> <span class="icon">📶</span>
<span>连接</span> <span>连接</span>
@@ -221,9 +290,9 @@
<span class="icon"></span> <span class="icon"></span>
<span>设置</span> <span>设置</span>
</button> </button>
<button class="menu-item exit" title="退出"> <button class="menu-item exit" title="断开">
<span class="icon"></span> <span class="icon"></span>
<span>退出</span> <span>断开</span>
</button> </button>
<button class="menu-item about" id="btnAbout" title="关于"> <button class="menu-item about" id="btnAbout" title="关于">
<span class="icon"></span> <span class="icon"></span>
@@ -231,12 +300,18 @@
</button> </button>
</div> </div>
<div class="main"> <div class="main">
<!-- 主内容区:显示 + 控制 -->
<div class="display-area"> <div class="display-area">
<!-- 屏幕显示区域 -->
<canvas id="screen" style="display:none;"></canvas> <canvas id="screen" style="display:none;"></canvas>
<!-- canvas 初始隐藏,有画面后显示 -->
<span class="display-placeholder" id="placeholder">点击「连接」连接装置</span> <span class="display-placeholder" id="placeholder">点击「连接」连接装置</span>
<!-- 未连接时的提示文字 -->
</div> </div>
<div class="control-panel"> <div class="control-panel">
<!-- 右侧控制面板 -->
<div class="dpad"> <div class="dpad">
<!-- 方向键 + 确认 -->
<button class="btn btn-up" data-key="U"></button> <button class="btn btn-up" data-key="U"></button>
<button class="btn btn-left" data-key="L"></button> <button class="btn btn-left" data-key="L"></button>
<button class="btn btn-ok" data-key="ENT"></button> <button class="btn btn-ok" data-key="ENT"></button>
@@ -244,6 +319,7 @@
<button class="btn btn-down" data-key="D"></button> <button class="btn btn-down" data-key="D"></button>
</div> </div>
<div class="action-btns"> <div class="action-btns">
<!-- 复归、返回按钮 -->
<button class="action-btn" data-key="RESET"> <button class="action-btn" data-key="RESET">
<span class="icon"></span> <span class="icon"></span>
<span>复归</span> <span>复归</span>
@@ -260,7 +336,7 @@
<div class="modal" id="connectModal"> <div class="modal" id="connectModal">
<div class="modal-content"> <div class="modal-content">
<h3>连接装置</h3> <h3>连接装置</h3>
<input type="text" id="hostInput" placeholder="装置 IP 地址" value="192.168.1.100"> <input type="text" id="hostInput" placeholder="装置 IP 地址" value="127.0.0.1">
<input type="number" id="portInput" placeholder="端口" value="7003"> <input type="number" id="portInput" placeholder="端口" value="7003">
<div class="btn-row"> <div class="btn-row">
<button class="btn-secondary" id="modalCancel">取消</button> <button class="btn-secondary" id="modalCancel">取消</button>
@@ -269,101 +345,137 @@
</div> </div>
</div> </div>
<script> <script>
/**
* 前端逻辑:通过 Socket.IO 与后端 WebSocket 通信。
* - connect_device: 连接装置,成功后后端持续推送 screen 事件
* - disconnect_device: 断开装置
* - key: 发送按键码
*/
// ========== 常量与 DOM 引用 ==========
// 按键名 -> RemoDispBus 协议码U/D/L/R 方向ENT 确认ESC 返回RESET 复归
const KEY_MAP = { U: 0x02, D: 0x40, L: 0x10, R: 0x08, ENT: 0x20, ESC: 0x01, RESET: 0x04 }; const KEY_MAP = { U: 0x02, D: 0x40, L: 0x10, R: 0x08, ENT: 0x20, ESC: 0x01, RESET: 0x04 };
let apiBase = '';
let refreshTimer = null;
const canvas = document.getElementById('screen'); const canvas = document.getElementById('screen');
// canvas 元素,用于绘制远程屏幕
const placeholder = document.getElementById('placeholder'); const placeholder = document.getElementById('placeholder');
// 占位提示文字
const connectBtn = document.getElementById('btnConnect'); const connectBtn = document.getElementById('btnConnect');
// 连接按钮
const connectModal = document.getElementById('connectModal'); const connectModal = document.getElementById('connectModal');
// 连接弹窗
const hostInput = document.getElementById('hostInput'); const hostInput = document.getElementById('hostInput');
// IP 输入框
const portInput = document.getElementById('portInput'); const portInput = document.getElementById('portInput');
// 端口输入框
const modalConnect = document.getElementById('modalConnect'); const modalConnect = document.getElementById('modalConnect');
// 弹窗内「连接」按钮
const modalCancel = document.getElementById('modalCancel'); const modalCancel = document.getElementById('modalCancel');
function showConnectModal() { // 弹窗内「取消」按钮
connectModal.classList.add('show');
} // ========== WebSocket 连接 ==========
function hideConnectModal() { const socket = io();
connectModal.classList.remove('show'); // 连接当前页面的 origin建立 Socket.IO 连接
}
connectBtn.onclick = showConnectModal; socket.on('connect', () => {});
modalCancel.onclick = hideConnectModal; // 连接成功时(可留空)
modalConnect.onclick = async () => { socket.on('connect_result', (data) => {
const host = hostInput.value.trim(); // 收到后端连接结果
const port = portInput.value || '7003'; if (data.success) {
try { hideConnectModal();
const r = await fetch(`/connect?host=${encodeURIComponent(host)}&port=${port}`); // 关闭弹窗
const ok = await r.json(); connectBtn.classList.add('connected');
if (ok.success) { // 标记为已连接状态
hideConnectModal(); connectBtn.querySelector('span:last-child').textContent = '已连接';
connectBtn.classList.add('connected'); // 更新按钮文字
connectBtn.querySelector('span:last-child').textContent = '已连接'; } else {
apiBase = 'ok'; alert('连接失败: ' + (data.error || '未知错误'));
startRefresh(); // 弹出错误提示
} else {
alert('连接失败: ' + (ok.error || '未知错误'));
}
} catch (e) {
alert('连接失败,请先启动: python remo_disp_server.py');
} }
};
async function sendKey(key) {
if (!connectBtn.classList.contains('connected')) return;
const code = KEY_MAP[key] ?? 0;
await fetch(`/key?code=${code}`);
}
async function fetchScreen() {
if (!connectBtn.classList.contains('connected')) return;
try {
const r = await fetch('/screen');
const ct = r.headers.get('Content-Type') || '';
const blob = await r.blob();
if (ct.includes('image/png')) {
const url = URL.createObjectURL(blob);
const img = new Image();
img.onload = () => {
canvas.width = img.width;
canvas.height = img.height;
const ctx = canvas.getContext('2d');
ctx.drawImage(img, 0, 0);
canvas.style.display = 'block';
placeholder.style.display = 'none';
URL.revokeObjectURL(url);
};
img.src = url;
} else {
const buf = await blob.arrayBuffer();
const data = new Uint8Array(buf);
const w = parseInt(r.headers.get('X-Width') || '160', 10);
const h = parseInt(r.headers.get('X-Height') || '160', 10);
canvas.width = w;
canvas.height = h;
const ctx = canvas.getContext('2d');
const id = ctx.createImageData(w, h);
for (let y = 0; y < h; y++)
for (let x = 0; x < w; x++) {
const bi = (y * w + x) >> 3, bit = 7 - (x & 7);
const v = (bi < data.length && (data[bi] >> bit) & 1) ? 255 : 0;
const i = (y * w + x) * 4;
id.data[i] = id.data[i+1] = id.data[i+2] = v;
id.data[i+3] = 255;
}
ctx.putImageData(id, 0, 0);
canvas.style.display = 'block';
placeholder.style.display = 'none';
}
} catch (e) {}
}
function startRefresh() {
fetchScreen();
if (refreshTimer) clearInterval(refreshTimer);
refreshTimer = setInterval(fetchScreen, 500);
}
document.querySelectorAll('.dpad .btn, .action-btn').forEach(btn => {
btn.onclick = () => sendKey(btn.dataset.key);
}); });
document.querySelector('.menu-item.exit').onclick = () => sendKey('ESC'); // 服务端推送屏幕data.png 为 base64 编码的 PNG解码后绘制到 canvas
document.getElementById('btnAbout').onclick = () => alert('远程显示工具\n与 RemoDispBus 协议兼容\n端口 7003'); socket.on('screen', (data) => {
if (!data.png) return;
// 无 base64 数据则跳过
const binary = atob(data.png);
// base64 解码为二进制字符串
const bytes = new Uint8Array(binary.length);
for (let i = 0; i < binary.length; i++) bytes[i] = binary.charCodeAt(i);
// 转为 Uint8Array
const blob = new Blob([bytes], { type: 'image/png' });
// 创建 PNG Blob
const url = URL.createObjectURL(blob);
// 创建临时 URL
const img = new Image();
img.onload = () => {
// 图片加载完成后放大一倍显示160x160 -> 320x320
const scale = 2;
canvas.width = img.width * scale;
canvas.height = img.height * scale;
const ctx = canvas.getContext('2d');
ctx.imageSmoothingEnabled = false;
// 关闭平滑,像素清晰放大
ctx.drawImage(img, 0, 0, img.width, img.height, 0, 0, canvas.width, canvas.height);
// 绘制到 canvas2 倍尺寸)
canvas.style.display = 'block';
placeholder.style.display = 'none';
// 显示 canvas隐藏占位符
URL.revokeObjectURL(url);
// 释放临时 URL
};
img.src = url;
// 触发加载
});
socket.on('disconnect_result', () => {});
// 收到断开结果(可留空)
function showConnectModal() { connectModal.classList.add('show'); }
// 显示连接弹窗
function hideConnectModal() { connectModal.classList.remove('show'); }
// 隐藏连接弹窗
connectBtn.onclick = showConnectModal;
// 点击连接按钮 -> 显示弹窗
modalCancel.onclick = hideConnectModal;
// 点击取消 -> 隐藏弹窗
modalConnect.onclick = () => {
// 点击弹窗内「连接」按钮
const host = hostInput.value.trim();
// 获取 IP去空格
const port = portInput.value || '7003';
// 获取端口,缺省 7003
socket.emit('connect_device', { host, port });
// 向后端发送连接请求
};
function sendKey(key) {
// 发送按键到后端
if (!connectBtn.classList.contains('connected')) return;
// 未连接则不发送
socket.emit('key', { code: KEY_MAP[key] ?? 0 });
// 根据按键名查协议码并发送
}
function doDisconnect() {
// 执行断开
socket.emit('disconnect_device');
// 通知后端断开设备
connectBtn.classList.remove('connected');
connectBtn.querySelector('span:last-child').textContent = '连接';
// 恢复按钮状态和文字
canvas.style.display = 'none';
placeholder.style.display = '';
placeholder.textContent = '点击「连接」连接装置';
// 隐藏 canvas显示占位符
}
document.querySelectorAll('.dpad .btn, .action-btn').forEach(btn => {
// 为所有方向键和动作按钮绑定点击
btn.onclick = () => sendKey(btn.dataset.key);
// 点击时发送对应按键码
});
document.querySelector('.menu-item.exit').onclick = doDisconnect;
// 断开按钮 -> 执行断开
document.getElementById('btnAbout').onclick = () => alert('阜阳师范大学物理与电子工程学院\nDTU 远程液晶屏幕通信工具');
// 关于按钮 -> 弹出说明
</script> </script>
</body> </body>
</html> </html>

View File

@@ -1,272 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
远程显示查看器 - 带 GUI 的实时显存查看与按键模拟
用法: python remo_disp_viewer.py <装置IP>
"""
import socket
import struct
import threading
import time
import sys
import tkinter as tk
from tkinter import ttk, messagebox
from typing import Optional, Tuple
# 协议常量
TAG_CLIENT = 0xAA
TAG_DEVICE = 0xBB
PORT = 7003
CMD_KEEPLIVE = 0
CMD_INIT = 1
CMD_KEY = 2
CMD_LCDMEM = 3
KEY_U, KEY_D, KEY_L, KEY_R = 0x02, 0x40, 0x10, 0x08
KEY_ENT, KEY_ESC, KEY_F1, KEY_F2 = 0x20, 0x01, 0x04, 0x80
def calc_crc(data: bytes) -> int:
crc = 0
for b in data:
crc ^= b
return crc & 0xFF
def build_frame(cmd: int, data: bytes) -> bytes:
length = len(data)
header = bytes([TAG_CLIENT, cmd & 0xFF, (length >> 8) & 0xFF, length & 0xFF])
return header + data + bytes([calc_crc(data)])
def parse_frame(raw: bytes) -> Optional[Tuple[int, bytes]]:
if len(raw) < 6 or raw[0] != TAG_DEVICE:
return None
length = (raw[2] << 8) | raw[3]
if len(raw) < 5 + length + 1:
return None
data = raw[4:4 + length]
if calc_crc(data) != raw[4 + length]:
return None
return (raw[1], data)
class RemoDispViewer:
def __init__(self, host: str, port: int = PORT):
self.host = host
self.port = port
self._sock: Optional[socket.socket] = None
self._init_info: Optional[dict] = None
self._running = False
self._refresh_thread: Optional[threading.Thread] = None
def connect(self) -> bool:
try:
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.settimeout(3.0)
self._sock.connect((self.host, self.port))
self._sock.settimeout(0.5)
return True
except Exception:
return False
def disconnect(self):
self._running = False
if self._sock:
try:
self._sock.close()
except Exception:
pass
self._sock = None
def _send(self, cmd: int, data: bytes = b"") -> bool:
if not self._sock:
return False
try:
self._sock.sendall(build_frame(cmd, data))
return True
except Exception:
return False
def _recv(self) -> Optional[Tuple[int, bytes]]:
if not self._sock:
return None
try:
header = self._sock.recv(5)
if len(header) < 5:
return None
length = (header[2] << 8) | header[3]
rest = self._sock.recv(length + 1)
if len(rest) < length + 1:
return None
return parse_frame(header + rest)
except (socket.timeout, BlockingIOError):
return None
except Exception:
return None
def request_init(self) -> Optional[dict]:
if not self._send(CMD_INIT):
return None
for _ in range(20):
result = self._recv()
if result and result[0] == CMD_INIT:
data = result[1]
if len(data) >= 29:
self._init_info = {
"width": (data[0] << 8) | data[1],
"height": (data[2] << 8) | data[3],
"mem_size": (data[5] << 24) | (data[6] << 16) | (data[7] << 8) | data[8],
}
return self._init_info
time.sleep(0.05)
return None
def send_key(self, key: int) -> bool:
return self._send(CMD_KEY, bytes([key]))
def fetch_screen(self) -> Optional[bytes]:
info = self._init_info
if not info:
info = self.request_init()
if not info:
return None
mem_size = info["mem_size"]
buffer = bytearray(mem_size)
pos = 0
while pos < mem_size:
if not self._send(CMD_LCDMEM, struct.pack(">I", pos)):
return None
for _ in range(30):
result = self._recv()
if result and result[0] == CMD_LCDMEM:
payload = result[1]
if len(payload) >= 4:
start = struct.unpack(">I", payload[:4])[0]
chunk = payload[4:]
buffer[start : start + len(chunk)] = chunk
pos = start + len(chunk)
break
time.sleep(0.02)
else:
return None
return bytes(buffer)
class ViewerApp:
def __init__(self, host: str):
self.host = host
self.client = RemoDispViewer(host)
self.root = tk.Tk()
self.root.title(f"远程显示 - {host}")
self.root.geometry("400x320")
self._canvas = None
self._photo = None
self._scale = 2
self._build_ui()
def _build_ui(self):
# 连接区
frm = ttk.Frame(self.root, padding=5)
frm.pack(fill=tk.X)
ttk.Label(frm, text=f"目标: {self.host}:7003").pack(side=tk.LEFT)
self._status = ttk.Label(frm, text="未连接")
self._status.pack(side=tk.RIGHT)
# 画布
self._canvas = tk.Canvas(self.root, width=320, height=320, bg="black")
self._canvas.pack(padx=5, pady=5)
# 按键区
btn_frm = ttk.Frame(self.root, padding=5)
btn_frm.pack(fill=tk.X)
keys = [
("", KEY_U), ("", KEY_D), ("", KEY_L), ("", KEY_R),
("确认", KEY_ENT), ("取消", KEY_ESC), ("F1", KEY_F1), ("F2", KEY_F2),
]
for label, key in keys:
ttk.Button(btn_frm, text=label, width=6,
command=lambda k=key: self._on_key(k)).pack(side=tk.LEFT, padx=2)
# 刷新
ttk.Button(btn_frm, text="刷新", command=self._refresh).pack(side=tk.LEFT, padx=2)
self.root.protocol("WM_DELETE_WINDOW", self._on_close)
def _connect(self) -> bool:
if self.client.connect():
if self.client.request_init():
self._status.config(text="已连接")
return True
self.client.disconnect()
self._status.config(text="连接失败")
return False
def _on_key(self, key: int):
if not self.client._sock:
if not self._connect():
messagebox.showerror("错误", "请先连接装置")
return
self.client.send_key(key)
def _refresh(self):
if not self.client._sock:
if not self._connect():
messagebox.showerror("错误", "连接失败")
return
data = self.client.fetch_screen()
if not data:
messagebox.showerror("错误", "拉取显存失败")
return
info = self.client._init_info
w, h = info["width"], info["height"]
# 单色转图像
try:
from PIL import Image, ImageTk
img = Image.new("1", (w, h))
pix = img.load()
for y in range(h):
for x in range(w):
bi = (y * w + x) // 8
bit = 7 - (x % 8)
pix[x, y] = 255 if (bi < len(data) and (data[bi] >> bit) & 1) else 0
img = img.resize((w * self._scale, h * self._scale), Image.NEAREST)
self._photo = ImageTk.PhotoImage(img)
self._canvas.delete("all")
self._canvas.create_image(0, 0, anchor=tk.NW, image=self._photo)
except ImportError:
# 无 PIL用简单矩形模拟
self._canvas.delete("all")
step = max(1, 320 // w)
for y in range(min(h, 160)):
for x in range(min(w, 160)):
bi = (y * w + x) // 8
bit = 7 - (x % 8)
if bi < len(data) and (data[bi] >> bit) & 1:
self._canvas.create_rectangle(
x * step, y * step, (x + 1) * step, (y + 1) * step,
fill="white", outline=""
)
def _on_close(self):
self.client.disconnect()
self.root.destroy()
def run(self):
self._connect()
self._refresh()
self.root.mainloop()
def main():
if len(sys.argv) < 2:
print("用法: python remo_disp_viewer.py <装置IP>")
sys.exit(1)
app = ViewerApp(sys.argv[1])
app.run()
if __name__ == "__main__":
main()

View File

@@ -1,6 +1,6 @@
# 远程显示工具依赖 # Dependencies for DTU-RemoteLCD web service
# 命令行工具 remo_disp_client.py 仅需 Python 标准库
# Web 服务 remo_disp_server.py 需要 Flask
# 保存 PNG 需要 Pillow可选
Flask>=2.0.0 Flask>=2.0.0
Pillow>=9.0.0 Pillow>=9.0.0
flask-socketio>=5.0.0
python-socketio>=5.0.0
loguru>=0.7.0

BIN
static/favicon.ico Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.9 MiB