2025-12-07-杂谈-复杂异构系统监控与可视化项目设计

First Post:

Last Update:

Page View: loading...

复杂异构系统监控与可视化项目设计

问题背景:

现在需要设计一个系统,使用 py 的 fastapi 作为后端

目前有一个使用 Docker Compose 编排的复杂容器化环境,基于 SeedEmu(SEED Internet Emulator)框架构建。该框架专用于模拟大规模互联网基础设施,常用于网络安全研究、教育和实验,特别是区块链网络的安全性测试。
此 docker-compose.yml 定义了一个模拟的互联网环境,其中部署了一个完整的 Ethereum Proof-of-Stake (PoS) 区块链网络,分布在多个自治系统(Autonomous Systems, AS)中,并通过互联网交换点(Internet Exchange Points, IXP)和路由器实现互联。该环境的主要目的是模拟真实互联网拓扑下的区块链网络行为,支持研究区块链在复杂网络环境下的安全性、性能、攻击与防御(如 Eclipse 攻击、分区攻击、路由攻击等),并提供可视化监控、数据采集和分析工具。

主要组件与功能有:数据库与辅助服务,包括用于存储区块链监控数据的 postgresql(数据库名为 ethereum_monitor),作为缓存或消息队列的 redis,以及用于存储和分析网络拓扑、区块链节点关系等复杂关系数据的图数据库 neo4j。

Ethereum 区块链网络在 AS 101–112(共 12 个自治系统)中部署了大量 Ethereum PoS 节点。每个 AS 内部包含 3 个本地网络(inet0、inet1、inet2),每个网络内有 3 个 Ethereum 节点(共 9 个节点/AS)。总计约 108 个 Ethereum validator/miner 节点(节点 ID 从 2 到 108),加上一个 BootNode 和 BeaconSetup 节点。所有节点运行在自定义的链上(chain_id: 1337, chain_name: posCurrentEnhancedNet)。部分节点(如 AS101 的 host0)暴露了 JSON-RPC (8545)、WebSocket (8546) 和 Web 界面 (8000) 端口,便于外部交互。

网络路由基础设施基于 SeedEmu,AS 2 作为一个骨干/中转 AS,包含四个边界路由器(r51–r54),通过点对点链路(net_2_net_51_52 等)连接。IXP(互联网交换点)包括四个全球 IXP(ix51–ix54),每个有 Route Server(路由服务器),用于多边对等互联。AS 21–24 作为 IXP 的参与者(peering AS),每个连接一个 IXP。AS 101–112 每个 AS 有一个边界路由器连接到对应的 IXP(例如 AS101 连接 ix51),实现与外部互联网的连通。所有路由器运行真实路由协议(如 BGP),支持模拟路由攻击、劫持等。

可视化与监控工具包括运行 SeedEmu 的互联网拓扑可视化界面的 seedemu-internet-client,映射端口 8080,提供整个网络拓扑的图形化视图,以及运行 Ethereum 网络专用可视化界面的 seedemu-ether-client,映射端口 5000,用于查看区块链节点状态、同步情况、交易等。eth_node_cleaner 是自定义服务,暴露端口 8888,可能用于中央数据收集、节点状态清理或监控指标聚合,连接 PostgreSQL、Redis 和 Neo4j。

其他特性包括大量自定义网络(local 和 global 类型),精确分配 IP 地址段,节点标签丰富(org.seedsecuritylabs.seedemu.meta.*),便于 SeedEmu 工具识别和渲染,部分服务使用 privileged 模式和 cap_add: ALL,以支持模拟路由所需的网络权限。

现在需要实现一系列功能,提供基于 FastAPI 框架的 RESTful API 路由模块(topology_router),专用于提供区块链仿真环境(特别是结合 SeedEmu 和 Ethereum PoS 网络)的完整拓扑数据访问接口如下:GET /overview 获取整个仿真环境的拓扑概览信息(如节点总数、层级结构等);GET /statistics 获取拓扑统计数据(如节点、链路数量等汇总指标);GET /health 检查拓扑服务的健康状态,返回组件运行状况。

GET /ethereum 获取完整的以太坊网络拓扑数据(节点与 P2P 连接);GET /ethereum/nodes 获取所有以太坊节点列表,支持按层级过滤(execution 或 consensus);GET /ethereum/nodes/{node_id}获取指定以太坊节点(执行层或共识层)的详细信息;GET /ethereum/validators/{validator_id}根据验证者公钥获取单个验证者节点的详细状态和信息。

GET /physical 获取纯物理拓扑结构(不包含容器运行时监控数据);GET /physical/devices 获取物理设备列表(路由器、主机等),支持按设备类型过滤;GET /physical/links 获取物理链路(网络连接)列表,支持按连接类型过滤;GET /physical/networks 获取所有物理网络的配置信息(网络 ID、名称、子网、网关等)。

GET /contract 获取智能合约相关的拓扑视图(合约部署、调用关系等);GET /contract/statistics 获取合约层面的统计信息(如合约数量、调用频率等)。

GET /transaction 获取交易拓扑数据,支持通过时间范围(start_time 和 end_time)过滤;GET /transaction/statistics 获取交易层面的统计信息;GET /transaction/address/{address}/analysis 分析特定以太坊地址的资金/交易流向(流入流出关系图)。

GET /layer/{layer}根据指定拓扑层(枚举值,如 ethereum、physical 等)获取对应层级的完整拓扑数据;POST /combined 支持组合多个拓扑层(如以太坊层+物理层)生成统一的拓扑视图,可指定渲染格式。

POST /render 接收任意拓扑数据和渲染请求(格式如 cytoscape、graphviz 等),返回经过布局算法处理的可直接用于前端可视化的数据。

GET /nodes/{node_id}获取任意节点(跨层级)的详细信息,支持指定层级;GET /analysis/{layer}对指定层级进行网络分析(如连通性、中心性、社区检测等指标)。

POST /cache/clear 清空服务内部所有缓存(用于强制刷新数据);GET /debug/info 获取详细的调试信息,包括服务组件状态、各处理器缓存大小、支持的层级与渲染格式等,便于开发与运维排查。


针对该环境和需求的系统设计方案

需要设计一个 FastAPI 后端,它充当一个”中间层(Middleware)”或”聚合层(Aggregator)”,将底层分散的基础设施(Docker)、网络拓扑(SeedEmu/Neo4j)和应用状态(Ethereum/PostgreSQL)整合成统一的 API 暴露给前端

1. 系统架构概览

由于涉及 140+ 容器和多种数据源,系统的核心挑战在于数据聚合和性能优化。建议采用分层架构,包括前端/可视化大屏与 FastAPI 网关的交互,以及 FastAPI 后端应用内的 Redis 缓存层、业务逻辑层 TopologyService,和各种适配器(Docker Adapter、Neo4j Adapter、Ethereum Adapter Web3.py、DB Adapter SQLAlchemy)的协作,最终与基础设施 Docker Compose 中的 Docker Socket、Neo4j 图数据库、JSON-RPC AS101 Host0 和 PostgreSQL ethereum_monitor 进行交互。

2. 核心模块设计

我们需要将代码组织为清晰的模块,以应对你列出的 9 大类接口。

2.1 数据模型层 (Models/Schemas)

使用 Pydantic 定义统一的拓扑数据结构,这是所有接口返回的基础。网络拓扑系统采用了分层架构设计,主要包括以下几个层级:API 层 - topology_api.py 提供 RESTful 接口,服务层 - topology_service.py 协调各种拓扑功能,数据层 - real_topology_service.py 处理真实数据获取,专门处理器层 - 处理合约、交易等特定类型的拓扑,管理与渲染层 - 负责生命周期管理和数据渲染。

用户通过 API 请求拓扑数据(如 /topology/ethereum),API 调用 TopologyService 的 get_ethereum_topology() 方法,TopologyService 委托给 _real_data_service(即 RealTopologyService),RealTopologyService 从 Neo4j 数据库获取真实的以太坊 P2P 网络拓扑数据,数据经过处理和格式化后返回给用户。

对于以太坊拓扑,从 Neo4j 数据库查询执行层和共识层节点及其连接关系,查询验证者节点并与共识节点关联,将原始数据转换为前端友好的拓扑格式,通过 Docker 客户端获取容器信息,建立 IP 地址与容器名称的映射。对于物理拓扑,通过 Docker 客户端获取所有容器的详细信息,根据容器名称识别设备类型(路由器、主机等),根据容器连接的网络建立设备间连接关系,使用共享网络原则确定设备连接。

在 topology_interfaces.py 中定义了核心抽象类:TopologyNode 作为拓扑节点基类,包含 id(节点唯一标识)、name(节点名称)、node_type(节点类型,执行层、共识层、验证者等)、ip_address(IP 地址)、layer(所属层级)、status(状态)、metadata(元数据)等属性;TopologyLink 作为拓扑连接基类,包含 source(源节点 ID)、target(目标节点 ID)、link_type(连接类型)、layer(所属层级)、direction(连接方向)、metadata(元数据)等属性。

不同类型的拓扑节点包括以太坊节点(执行层节点、共识层节点、验证者节点)和物理节点(路由器、主机、交换机)。拓扑数据最终以 nodes(节点列表,每个节点包含 id、name、type、ip_address、status 等属性)、links(连接列表,每个连接包含 source、target、type 等属性)、元数据(时间戳、数据源、统计信息等)的格式组织。

这是与底层交互的关键。InfrastructureAdapter (Docker & SeedEmu) 作用是获取物理拓扑,实现方式是使用 docker Python 库读取容器列表,解析 com.docker.compose.service 和 org.seedsecuritylabs.seedemu.meta. 标签来识别节点角色(AS、路由器、主机);对应接口是 /physical/

GraphDBAdapter (Neo4j) 作用是获取网络静态拓扑和关联关系,实现方式是使用 neo4j Python 驱动,SeedEmu 通常会将生成的拓扑导入 Neo4j,查询 Cypher 语句来获取节点间的连接;对应接口是 /overview, /analysis/{layer}。

BlockchainAdapter (Web3.py & Postgres) 作用是获取链上实时状态,实现方式是使用 web3.py (AsyncHTTPProvider) 连接开放 RPC 端口的节点(如 AS101 的 8545)获取 Block Height, Peer Count,以及连接 PostgreSQL (ethereummonitor) 查询交易历史、合约调用统计;对应接口是 /ethereum/, /contract/_, /transaction/*。

2.3 业务服务层 (Services)

这是实现 RealTopologyService 的地方,负责组装数据。TopologyService 能够根据请求的 layer (physical, ethereum) 调用不同的 Adapter,实现 /combined 接口,将 Docker 的运行状态(Up/Down)注入到 Neo4j 查出的静态拓扑中,并将 Ethereum 节点的逻辑 ID(Validator ID)映射到物理容器 IP。非常重要的是使用 Redis 缓存完整的拓扑 JSON,并设置后台定时任务(FastAPI lifespan 或 APScheduler)每 5-10 秒刷新一次缓存。

AnalysisService 将拓扑数据加载到 Python 的 networkx 库中,计算中心性(Centrality)、最短路径(用于分析攻击传播)、社区发现等算法;对应接口是 /analysis/*。

3. 具体接口实现策略

针对你提供的文档,以下是具体实现建议:

物理层 (Physical Layer) 的挑战是如何知道哪个容器连接哪个。方案是 SeedEmu 通常会在生成容器时将连接信息写入 Neo4j 或生成的 metadata 文件。优先从 Neo4j 读取链路关系,从 Docker API 读取节点存活状态(Status: Running/Exited)。

以太坊层 (Ethereum Layer) 的挑战是 108 个节点,如何获取所有节点状态。方案是使用信标链数据 (Consensus),连接 Beacon Node API (如果环境中有) 获取验证者状态 (Active/Slashed);使用 P2P 拓扑,使用 admin_peers RPC 方法(需要节点开启该 API)查询节点的连接对象。由于无法轮询所有 108 个节点,可以只轮询几个关键 Bootnode 和 AS 网关节点,构建局部图;使用 Postgres 补充,从数据库中读取已知的节点列表和 Validator ID 映射。

交易与合约 (Transaction & Contract) 的挑战是实时分析交易流向。方案是不要直接扫描链,而是利用环境中的 postgresql (ethereum_monitor)。SeedEmu 的监控器应该已经将区块和交易写入库中。接口 /transaction/address/{address}/analysis 直接执行 SQL 聚合查询(Group by to/from),返回资金流向图。

可视化渲染 (Rendering) 的接口是 POST /render,逻辑是前端可能只需传递原始数据,后端使用 networkx 计算布局(如 ForceAtlas2 或层级布局),计算出每个节点的 (x, y) 坐标,返回给前端直接绘制。这能减轻前端浏览器处理 140+ 节点布局的压力。

4. 代码结构示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# app/routers/topology.py

from fastapi import APIRouter, Depends, HTTPException
from app.services.topology_service import TopologyService
from app.schemas.topology import TopologyGraph

router = APIRouter(prefix="/topology", tags=["Topology"])

@router.get("/overview")
async def get_overview(service: TopologyService = Depends(get_topology_service)):
# 从缓存获取,如果无则计算
return await service.get_system_overview()

@router.get("/ethereum/nodes/{node_id}")
async def get_eth_node_detail(node_id: str, service: TopologyService = Depends()):
# 1. 查 Neo4j 获取节点基础信息
# 2. 查 Docker 确认容器是否在线
# 3. 查 Web3/DB 获取链上余额和状态
data = await service.get_node_composite_info(node_id, layer="ethereum")
if not data:
raise HTTPException(status_code=404, detail="Node not found")
return data

@router.post("/combined")
async def get_combined_view(layers: list[str], service: TopologyService = Depends()):
# 融合物理层和区块链层
# 例如:展示 区块链P2P连线 覆盖在 物理AS拓扑 上
return await service.generate_combined_topology(layers)

5. 关键技术难点与解决方案

Docker 网络访问方面,FastAPI 容器需要挂载 /var/run/docker.sock 才能查询兄弟容器的状态。docker-compose.yml 配置中需要添加 volumes: - /var/run/docker.sock:/var/run/docker.sock。

跨容器数据库连接方面,连接 Neo4j 和 Postgres 时,Host 应使用 Docker Compose 服务名(如 neo4j, postgresql),而不是 localhost。

性能瓶颈方面,问题在于/ethereum 接口如果实时调用 100+ 次 RPC 会超时。解决方法是使用”后台采集 + 前端读取”模式,编写一个后台 Task (使用 asyncio 或 Celery) 每 10 秒扫描一次网络状态存入 Redis。API 接口只读 Redis。

IP 地址管理方面,SeedEmu 的自定义网络(inet0, net_2_net_51_52)非常复杂。在返回 /physical 拓扑时,务必解析 Docker inspect 结果中的 NetworkSettings,将特定网段 IP 映射给前端,否则前端无法理解路由关系。

6. 总结建议

设计这个系统时,请遵循以下步骤:首先打通 Neo4j,它是拓扑的真理来源(Source of Truth),确保能查询出 AS、Router 和 Host 的关系;其次实现缓存机制,复杂的 Docker 集群状态获取很慢,不要让 API 直接穿透到 Docker Daemon;定义清晰的 ID 映射,最大的坑在于关联 “Docker Container Name” (e.g., as101_host0) 和 “Ethereum Node ID” (e.g., Validator-3),需要在服务启动时建立这个映射表;按层开发,先做 Physical(最简单,只读 Docker),再做 Ethereum(读 DB),最后做高级的 Contract/Transaction 分析。

实现”实时更新”并”高亮变化”的核心在于结合 FastAPI 的 WebSocket 推送机制与 D3.js 的 General Update Pattern(通用更新模式)。这里有三个关键点:后端推送,只推送最新的全量/增量状态;前端数据绑定,D3 必须通过 Key Function 识别哪些是新节点、哪些是旧节点,而不是每次清空画布重绘;视觉反馈,利用 CSS 动画或 D3 Transition 让变化的节点产生”闪烁”或”颜色渐变”。

1. 整体架构设计

后端 (FastAPI) 运行一个后台任务(Background Task),每隔几秒扫描一次 Docker/Ethereum 状态,通过 WebSocket 广播给前端;前端 (D3.js) 维护一个长连接,收到数据后,执行 updateGraph(newData)。

2. 后端:FastAPI WebSocket 实现

我们需要一个 ConnectionManager 来管理前端连接,并推送拓扑数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# app/routers/ws_topology.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from app.services.topology_service import TopologyService
import asyncio
import json

router = APIRouter()

class ConnectionManager:
def __init__(self):
self.active_connections: list[WebSocket] = []

async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)

def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)

async def broadcast(self, message: dict):
for connection in self.active_connections:
await connection.send_json(message)

manager = ConnectionManager()

# 模拟后台数据推送任务
async def topology_broadcaster(service: TopologyService):
"""
这个函数需要在 main.py 的 @app.on_event("startup") 中启动
"""
while True:
# 获取最新拓扑(包含 Docker 状态 + Geth 连接)
topology_data = await service.get_full_topology_snapshot()

# 广播数据
if manager.active_connections:
await manager.broadcast(topology_data)

# 每 5 秒推送一次,避免前端渲染压力过大
await asyncio.sleep(5)

@router.websocket("/ws/topology")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
# 保持连接活跃,也可以接收前端的控制指令(比如点击了某个节点)
data = await websocket.receive_text()
except WebSocketDisconnect:
manager.disconnect(websocket)

3. 前端:D3.js 实时更新与高亮逻辑

这是最关键的部分。不要清空 SVG!使用 D3 的 Enter (新增), Update (更新), Exit (删除) 模式。

核心策略包括 ID 绑定,告诉 D3 如何通过 ID(如 enode_id 或 container_name)区分节点,而不是数组索引;平滑模拟,数据更新时,不要将 alpha 重置为 1(会导致整个图剧烈爆炸),而是重置为 0.3 左右(轻微调整);样式 Diff,比对新旧数据,如果状态变化(如 IP 变了,Peer 数变了),添加 CSS 类名触发动画。