311 lines
7.8 KiB
Go
311 lines
7.8 KiB
Go
|
|
package adapter
|
|||
|
|
|
|||
|
|
import (
|
|||
|
|
"context"
|
|||
|
|
"errors"
|
|||
|
|
"fmt"
|
|||
|
|
"math/rand"
|
|||
|
|
"net"
|
|||
|
|
"sync"
|
|||
|
|
|
|||
|
|
"github.com/ThreeDotsLabs/watermill/message"
|
|||
|
|
|
|||
|
|
"go.yandata.net/iod/iod/trustlog-sdk/api/logger"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
// 订阅者配置常量.
|
|||
|
|
const (
|
|||
|
|
defaultOutputChannelSize = 100
|
|||
|
|
minOutputChannelSize = 10
|
|||
|
|
maxOutputChannelSize = 10000
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
// 预定义错误.
|
|||
|
|
var (
|
|||
|
|
ErrListenAddrRequired = errors.New("listen address is required")
|
|||
|
|
ErrSubscriberClosed = errors.New("subscriber is closed")
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
// TCPSubscriberConfig TCP 订阅者配置
|
|||
|
|
type TCPSubscriberConfig struct {
|
|||
|
|
// ListenAddr 监听地址,格式: "host:port"
|
|||
|
|
ListenAddr string
|
|||
|
|
|
|||
|
|
// OutputChannelSize 输出 channel 的缓冲大小
|
|||
|
|
// 较小的值(如 10-50):更快的背压传递,但可能降低吞吐量
|
|||
|
|
// 较大的值(如 500-1000):更高的吞吐量,但背压传递较慢
|
|||
|
|
// 默认值:100(平衡吞吐量和背压)
|
|||
|
|
OutputChannelSize int
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// TCPSubscriber 实现基于 TCP 的 watermill Subscriber
|
|||
|
|
type TCPSubscriber struct {
|
|||
|
|
config TCPSubscriberConfig
|
|||
|
|
logger logger.Logger
|
|||
|
|
listener net.Listener
|
|||
|
|
|
|||
|
|
subsLock sync.RWMutex
|
|||
|
|
subs map[string][]chan *message.Message // topic -> channels
|
|||
|
|
|
|||
|
|
closed bool
|
|||
|
|
closedMu sync.RWMutex
|
|||
|
|
closeChan chan struct{}
|
|||
|
|
|
|||
|
|
// 连接管理
|
|||
|
|
connMu sync.Mutex
|
|||
|
|
conns []net.Conn
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// NewTCPSubscriber 创建一个新的 TCP Subscriber.
|
|||
|
|
func NewTCPSubscriber(config TCPSubscriberConfig, logger logger.Logger) (*TCPSubscriber, error) {
|
|||
|
|
if config.ListenAddr == "" {
|
|||
|
|
return nil, ErrListenAddrRequired
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 验证和设置 channel 大小
|
|||
|
|
channelSize := config.OutputChannelSize
|
|||
|
|
if channelSize <= 0 {
|
|||
|
|
channelSize = defaultOutputChannelSize
|
|||
|
|
}
|
|||
|
|
if channelSize < minOutputChannelSize {
|
|||
|
|
channelSize = minOutputChannelSize
|
|||
|
|
logger.WarnContext(context.Background(), "OutputChannelSize too small, using minimum",
|
|||
|
|
"configured", config.OutputChannelSize, "actual", minOutputChannelSize)
|
|||
|
|
}
|
|||
|
|
if channelSize > maxOutputChannelSize {
|
|||
|
|
channelSize = maxOutputChannelSize
|
|||
|
|
logger.WarnContext(context.Background(), "OutputChannelSize too large, using maximum",
|
|||
|
|
"configured", config.OutputChannelSize, "actual", maxOutputChannelSize)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
listener, err := net.Listen("tcp", config.ListenAddr)
|
|||
|
|
if err != nil {
|
|||
|
|
return nil, fmt.Errorf("failed to listen on %s: %w", config.ListenAddr, err)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 更新配置中的实际 channel 大小
|
|||
|
|
config.OutputChannelSize = channelSize
|
|||
|
|
|
|||
|
|
s := &TCPSubscriber{
|
|||
|
|
config: config,
|
|||
|
|
logger: logger,
|
|||
|
|
listener: listener,
|
|||
|
|
subs: make(map[string][]chan *message.Message),
|
|||
|
|
closeChan: make(chan struct{}),
|
|||
|
|
conns: make([]net.Conn, 0),
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 启动接受连接的协程
|
|||
|
|
go s.acceptConnections()
|
|||
|
|
|
|||
|
|
logger.InfoContext(context.Background(), "TCP Subscriber listening",
|
|||
|
|
"addr", config.ListenAddr,
|
|||
|
|
"channel_size", channelSize)
|
|||
|
|
|
|||
|
|
return s, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// acceptConnections 接受客户端连接
|
|||
|
|
func (s *TCPSubscriber) acceptConnections() {
|
|||
|
|
ctx := context.Background()
|
|||
|
|
|
|||
|
|
for {
|
|||
|
|
select {
|
|||
|
|
case <-s.closeChan:
|
|||
|
|
s.logger.InfoContext(ctx, "Stopping connection acceptor")
|
|||
|
|
return
|
|||
|
|
default:
|
|||
|
|
conn, err := s.listener.Accept()
|
|||
|
|
if err != nil {
|
|||
|
|
s.closedMu.RLock()
|
|||
|
|
closed := s.closed
|
|||
|
|
s.closedMu.RUnlock()
|
|||
|
|
|
|||
|
|
if closed {
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
s.logger.ErrorContext(ctx, "Failed to accept connection", "error", err)
|
|||
|
|
continue
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
s.logger.InfoContext(ctx, "Accepted new connection", "remote", conn.RemoteAddr().String())
|
|||
|
|
|
|||
|
|
// 保存连接
|
|||
|
|
s.connMu.Lock()
|
|||
|
|
s.conns = append(s.conns, conn)
|
|||
|
|
s.connMu.Unlock()
|
|||
|
|
|
|||
|
|
// 为每个连接启动处理协程
|
|||
|
|
go s.handleConnection(conn)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// handleConnection 处理单个客户端连接
|
|||
|
|
func (s *TCPSubscriber) handleConnection(conn net.Conn) {
|
|||
|
|
ctx := context.Background()
|
|||
|
|
defer func() {
|
|||
|
|
conn.Close()
|
|||
|
|
s.logger.InfoContext(ctx, "Connection closed", "remote", conn.RemoteAddr().String())
|
|||
|
|
}()
|
|||
|
|
|
|||
|
|
for {
|
|||
|
|
select {
|
|||
|
|
case <-s.closeChan:
|
|||
|
|
return
|
|||
|
|
default:
|
|||
|
|
// 读取消息
|
|||
|
|
tcpMsg, err := DecodeTCPMessage(conn)
|
|||
|
|
if err != nil {
|
|||
|
|
s.closedMu.RLock()
|
|||
|
|
closed := s.closed
|
|||
|
|
s.closedMu.RUnlock()
|
|||
|
|
|
|||
|
|
if closed {
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
s.logger.ErrorContext(ctx, "Failed to decode message", "error", err)
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if tcpMsg.Type != MessageTypeData {
|
|||
|
|
s.logger.WarnContext(ctx, "Unexpected message type", "type", tcpMsg.Type)
|
|||
|
|
continue
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 处理消息
|
|||
|
|
s.handleMessage(ctx, conn, tcpMsg)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// handleMessage 处理消息(发送即成功模式,无需 ACK/NACK)
|
|||
|
|
func (s *TCPSubscriber) handleMessage(ctx context.Context, conn net.Conn, tcpMsg *TCPMessage) {
|
|||
|
|
s.logger.DebugContext(ctx, "Received message", "uuid", tcpMsg.UUID, "topic", tcpMsg.Topic)
|
|||
|
|
|
|||
|
|
// 获取该 topic 的订阅者
|
|||
|
|
s.subsLock.RLock()
|
|||
|
|
channels, found := s.subs[tcpMsg.Topic]
|
|||
|
|
s.subsLock.RUnlock()
|
|||
|
|
|
|||
|
|
if !found || len(channels) == 0 {
|
|||
|
|
s.logger.WarnContext(ctx, "No subscribers for topic", "topic", tcpMsg.Topic)
|
|||
|
|
// 不再发送 NACK,直接丢弃消息
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 创建 watermill 消息
|
|||
|
|
msg := message.NewMessage(tcpMsg.UUID, tcpMsg.Payload)
|
|||
|
|
|
|||
|
|
// 使用随机策略选择订阅者(无锁,性能更好)
|
|||
|
|
randomIndex := rand.Intn(len(channels))
|
|||
|
|
outputChan := channels[randomIndex]
|
|||
|
|
|
|||
|
|
// 记录 channel 使用情况,便于监控背压
|
|||
|
|
channelLen := len(outputChan)
|
|||
|
|
channelCap := cap(outputChan)
|
|||
|
|
usage := float64(channelLen) / float64(channelCap) * 100
|
|||
|
|
|
|||
|
|
s.logger.DebugContext(ctx, "Dispatching message via random selection",
|
|||
|
|
"uuid", tcpMsg.UUID,
|
|||
|
|
"subscriber_index", randomIndex,
|
|||
|
|
"total_subscribers", len(channels),
|
|||
|
|
"channel_usage", fmt.Sprintf("%.1f%% (%d/%d)", usage, channelLen, channelCap))
|
|||
|
|
|
|||
|
|
// 阻塞式发送:当 channel 满时会阻塞,从而触发 TCP 背压
|
|||
|
|
// 这会导致:
|
|||
|
|
// 1. 当前 goroutine 阻塞
|
|||
|
|
// 2. TCP 读取停止
|
|||
|
|
// 3. TCP 接收窗口填满
|
|||
|
|
// 4. 发送端收到零窗口通知
|
|||
|
|
// 5. 发送端停止发送
|
|||
|
|
select {
|
|||
|
|
case outputChan <- msg:
|
|||
|
|
s.logger.DebugContext(ctx, "Message sent to subscriber", "uuid", tcpMsg.UUID, "index", randomIndex)
|
|||
|
|
// 发送即成功:立即 Ack 消息,不等待处理结果
|
|||
|
|
msg.Ack()
|
|||
|
|
case <-s.closeChan:
|
|||
|
|
s.logger.DebugContext(ctx, "Subscriber closed, message discarded", "uuid", tcpMsg.UUID)
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 不再等待消息被 ACK 或 NACK,也不发送 ACK/NACK 回执
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// sendAck 方法已移除
|
|||
|
|
// 采用发送即成功模式,不再发送 ACK/NACK 回执以提高性能
|
|||
|
|
|
|||
|
|
// Subscribe 订阅指定 topic 的消息.
|
|||
|
|
func (s *TCPSubscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
|
|||
|
|
s.closedMu.RLock()
|
|||
|
|
if s.closed {
|
|||
|
|
s.closedMu.RUnlock()
|
|||
|
|
return nil, ErrSubscriberClosed
|
|||
|
|
}
|
|||
|
|
s.closedMu.RUnlock()
|
|||
|
|
|
|||
|
|
// 使用配置的 channel 大小
|
|||
|
|
channelSize := s.config.OutputChannelSize
|
|||
|
|
if channelSize <= 0 {
|
|||
|
|
channelSize = defaultOutputChannelSize
|
|||
|
|
}
|
|||
|
|
output := make(chan *message.Message, channelSize)
|
|||
|
|
|
|||
|
|
s.subsLock.Lock()
|
|||
|
|
if s.subs[topic] == nil {
|
|||
|
|
s.subs[topic] = make([]chan *message.Message, 0)
|
|||
|
|
}
|
|||
|
|
s.subs[topic] = append(s.subs[topic], output)
|
|||
|
|
subscriberCount := len(s.subs[topic])
|
|||
|
|
s.subsLock.Unlock()
|
|||
|
|
|
|||
|
|
s.logger.InfoContext(ctx, "Subscribed to topic",
|
|||
|
|
"topic", topic,
|
|||
|
|
"subscriber_count", subscriberCount,
|
|||
|
|
"channel_size", channelSize)
|
|||
|
|
|
|||
|
|
return output, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Close 关闭订阅者
|
|||
|
|
func (s *TCPSubscriber) Close() error {
|
|||
|
|
s.closedMu.Lock()
|
|||
|
|
if s.closed {
|
|||
|
|
s.closedMu.Unlock()
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
s.closed = true
|
|||
|
|
s.closedMu.Unlock()
|
|||
|
|
|
|||
|
|
close(s.closeChan)
|
|||
|
|
|
|||
|
|
// 关闭监听器
|
|||
|
|
if s.listener != nil {
|
|||
|
|
if err := s.listener.Close(); err != nil {
|
|||
|
|
s.logger.ErrorContext(context.Background(), "Failed to close listener", "error", err)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 关闭所有连接
|
|||
|
|
s.connMu.Lock()
|
|||
|
|
for _, conn := range s.conns {
|
|||
|
|
conn.Close()
|
|||
|
|
}
|
|||
|
|
s.connMu.Unlock()
|
|||
|
|
|
|||
|
|
// 关闭所有订阅通道
|
|||
|
|
s.subsLock.Lock()
|
|||
|
|
for topic, channels := range s.subs {
|
|||
|
|
for _, ch := range channels {
|
|||
|
|
close(ch)
|
|||
|
|
}
|
|||
|
|
delete(s.subs, topic)
|
|||
|
|
}
|
|||
|
|
s.subsLock.Unlock()
|
|||
|
|
|
|||
|
|
s.logger.InfoContext(context.Background(), "TCP Subscriber closed")
|
|||
|
|
return nil
|
|||
|
|
}
|