Skip to content

Commit fd56e89

Browse files
committed
feat(trace): 异步化 Geo 查询以降低探测延迟
- 将 fetchIPData 改为通过 addWithGeoAsync 异步执行 - 新增 waitGeo 机制,在打印每个 TTL 前等待 Geo 数据就绪 - 为 geoWG.Wait 添加 30 秒总超时,防止无限阻塞 - 无论是否有错误都等待 geoWG 完成,避免返回后结果仍被修改 - 在 printer 中增加 pending 状态保护,避免空指针访问 - 导出 PendingGeoSource 常量供跨包使用 - 修改 Result.add 返回 (bool, int) 以追踪插入索引 此变更将 Geo/DNS 查询从主探测路径解耦,显著降低用户感知延迟, 同时通过 sync.Map 缓存和 singleflight 去重保持结果一致性。
1 parent b53f62c commit fd56e89

File tree

10 files changed

+217
-57
lines changed

10 files changed

+217
-57
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,3 +172,4 @@ NTrace-core
172172

173173
.gocache
174174
.gomodcache
175+
.cache

printer/basic.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ func PrintTraceRouteNav(ip net.IP, domain string, dataOrigin string, maxHops int
7777
}
7878

7979
func applyLangSetting(h *trace.Hop) {
80+
if h.Geo == nil || h.Geo.Source == trace.PendingGeoSource {
81+
return
82+
}
8083
if len(h.Geo.Country) <= 1 {
8184
// 打印 h.Geo
8285
if h.Geo.Whois != "" {

printer/realtime_printer_router.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func RealtimePrinterWithRouter(res *trace.Result, ttl int) {
8585
fmt.Fprintf(color.Output, " %s", color.New(color.FgHiGreen, color.Bold).Sprintf("%-16s", whoisFormat[0]))
8686
}
8787

88-
if res.Hops[ttl][i].Geo.Country == "" {
88+
if res.Hops[ttl][i].Geo.Country == "" && res.Hops[ttl][i].Geo.Source != trace.PendingGeoSource {
8989
res.Hops[ttl][i].Geo.Country = "LAN Address"
9090
}
9191

trace/icmp_ipv4.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ func (t *ICMPTracer) PrintFunc(ctx context.Context, cancel context.CancelCauseFu
7474
// 接收的时候检查一下是不是 3 跳都齐了
7575
if t.ttlComp(ttl + 1) {
7676
if t.RealtimePrinter != nil {
77+
t.res.waitGeo(ctx, ttl)
7778
t.RealtimePrinter(&t.res, ttl)
7879
}
7980
ttl++
@@ -192,10 +193,7 @@ func (t *ICMPTracer) addHopWithIndex(peer net.Addr, ttl, i int, rtt time.Duratio
192193
RTT: rtt,
193194
MPLS: mpls,
194195
}
195-
196-
_ = h.fetchIPData(t.Config) // 忽略错误,继续添加结果
197-
198-
t.res.add(h, i, t.NumMeasurements, t.MaxAttempts)
196+
t.res.addWithGeoAsync(h, i, t.NumMeasurements, t.MaxAttempts, t.Config)
199197
}
200198

201199
func (t *ICMPTracer) matchWorker(ctx context.Context) {
@@ -263,6 +261,7 @@ func (t *ICMPTracer) Execute() (res *Result, err error) {
263261
// 初始化 res.Hops 和 res.tailDone,并预分配到 MaxHops
264262
t.res.Hops = make([][]Hop, t.MaxHops)
265263
t.res.tailDone = make([]bool, t.MaxHops)
264+
t.res.setGeoWait(t.NumMeasurements)
266265

267266
// 解析并校验用户指定的 IPv4 源地址
268267
SrcAddr := net.ParseIP(t.SrcAddr).To4()
@@ -436,7 +435,7 @@ func (t *ICMPTracer) send(ctx context.Context, s *internal.ICMPSpec, ttl, i int)
436435
Error: errHopLimitTimeout,
437436
}
438437

439-
t.res.add(h, i, t.NumMeasurements, t.MaxAttempts)
438+
_, _ = t.res.add(h, i, t.NumMeasurements, t.MaxAttempts)
440439
t.dropSent(seq)
441440
}
442441
}(seq, ttl, i)

trace/icmp_ipv6.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ func (t *ICMPTracerv6) PrintFunc(ctx context.Context, cancel context.CancelCause
7474
// 接收的时候检查一下是不是 3 跳都齐了
7575
if t.ttlComp(ttl + 1) {
7676
if t.RealtimePrinter != nil {
77+
t.res.waitGeo(ctx, ttl)
7778
t.RealtimePrinter(&t.res, ttl)
7879
}
7980
ttl++
@@ -192,10 +193,7 @@ func (t *ICMPTracerv6) addHopWithIndex(peer net.Addr, ttl, i int, rtt time.Durat
192193
RTT: rtt,
193194
MPLS: mpls,
194195
}
195-
196-
_ = h.fetchIPData(t.Config) // 忽略错误,继续添加结果
197-
198-
t.res.add(h, i, t.NumMeasurements, t.MaxAttempts)
196+
t.res.addWithGeoAsync(h, i, t.NumMeasurements, t.MaxAttempts, t.Config)
199197
}
200198

201199
func (t *ICMPTracerv6) matchWorker(ctx context.Context) {
@@ -263,6 +261,7 @@ func (t *ICMPTracerv6) Execute() (res *Result, err error) {
263261
// 初始化 res.Hops 和 res.tailDone,并预分配到 MaxHops
264262
t.res.Hops = make([][]Hop, t.MaxHops)
265263
t.res.tailDone = make([]bool, t.MaxHops)
264+
t.res.setGeoWait(t.NumMeasurements)
266265

267266
// 解析并校验用户指定的 IPv6 源地址
268267
SrcAddr := net.ParseIP(t.SrcAddr)
@@ -439,7 +438,7 @@ func (t *ICMPTracerv6) send(ctx context.Context, s *internal.ICMPSpec, ttl, i in
439438
Error: errHopLimitTimeout,
440439
}
441440

442-
t.res.add(h, i, t.NumMeasurements, t.MaxAttempts)
441+
_, _ = t.res.add(h, i, t.NumMeasurements, t.MaxAttempts)
443442
t.dropSent(seq)
444443
}
445444
}(seq, ttl, i)

trace/tcp_ipv4.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ func (t *TCPTracer) PrintFunc(ctx context.Context, cancel context.CancelCauseFun
7676
// 接收的时候检查一下是不是 3 跳都齐了
7777
if t.ttlComp(ttl + 1) {
7878
if t.RealtimePrinter != nil {
79+
t.res.waitGeo(ctx, ttl)
7980
t.RealtimePrinter(&t.res, ttl)
8081
}
8182
ttl++
@@ -180,10 +181,7 @@ func (t *TCPTracer) addHopWithIndex(peer net.Addr, ttl, i int, rtt time.Duration
180181
RTT: rtt,
181182
MPLS: mpls,
182183
}
183-
184-
_ = h.fetchIPData(t.Config) // 忽略错误,继续添加结果
185-
186-
t.res.add(h, i, t.NumMeasurements, t.MaxAttempts)
184+
t.res.addWithGeoAsync(h, i, t.NumMeasurements, t.MaxAttempts, t.Config)
187185
}
188186

189187
func (t *TCPTracer) matchWorker(ctx context.Context) {
@@ -253,6 +251,7 @@ func (t *TCPTracer) Execute() (res *Result, err error) {
253251
// 初始化 res.Hops 和 res.tailDone,并预分配到 MaxHops
254252
t.res.Hops = make([][]Hop, t.MaxHops)
255253
t.res.tailDone = make([]bool, t.MaxHops)
254+
t.res.setGeoWait(t.NumMeasurements)
256255

257256
// 解析并校验用户指定的 IPv4 源地址
258257
SrcAddr := net.ParseIP(t.SrcAddr).To4()
@@ -475,7 +474,7 @@ func (t *TCPTracer) send(ctx context.Context, s *internal.TCPSpec, ttl, i int) e
475474
Error: errHopLimitTimeout,
476475
}
477476

478-
t.res.add(h, i, t.NumMeasurements, t.MaxAttempts)
477+
_, _ = t.res.add(h, i, t.NumMeasurements, t.MaxAttempts)
479478
t.dropSent(seq)
480479
}
481480
}(seq, ttl, i)

trace/tcp_ipv6.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ func (t *TCPTracerIPv6) PrintFunc(ctx context.Context, cancel context.CancelCaus
7676
// 接收的时候检查一下是不是 3 跳都齐了
7777
if t.ttlComp(ttl + 1) {
7878
if t.RealtimePrinter != nil {
79+
t.res.waitGeo(ctx, ttl)
7980
t.RealtimePrinter(&t.res, ttl)
8081
}
8182
ttl++
@@ -180,10 +181,7 @@ func (t *TCPTracerIPv6) addHopWithIndex(peer net.Addr, ttl, i int, rtt time.Dura
180181
RTT: rtt,
181182
MPLS: mpls,
182183
}
183-
184-
_ = h.fetchIPData(t.Config) // 忽略错误,继续添加结果
185-
186-
t.res.add(h, i, t.NumMeasurements, t.MaxAttempts)
184+
t.res.addWithGeoAsync(h, i, t.NumMeasurements, t.MaxAttempts, t.Config)
187185
}
188186

189187
func (t *TCPTracerIPv6) matchWorker(ctx context.Context) {
@@ -253,6 +251,7 @@ func (t *TCPTracerIPv6) Execute() (res *Result, err error) {
253251
// 初始化 res.Hops 和 res.tailDone,并预分配到 MaxHops
254252
t.res.Hops = make([][]Hop, t.MaxHops)
255253
t.res.tailDone = make([]bool, t.MaxHops)
254+
t.res.setGeoWait(t.NumMeasurements)
256255

257256
// 解析并校验用户指定的 IPv6 源地址
258257
SrcAddr := net.ParseIP(t.SrcAddr)
@@ -475,7 +474,7 @@ func (t *TCPTracerIPv6) send(ctx context.Context, s *internal.TCPSpec, ttl, i in
475474
Error: errHopLimitTimeout,
476475
}
477476

478-
t.res.add(h, i, t.NumMeasurements, t.MaxAttempts)
477+
_, _ = t.res.add(h, i, t.NumMeasurements, t.MaxAttempts)
479478
t.dropSent(seq)
480479
}
481480
}(seq, ttl, i)

0 commit comments

Comments
 (0)