364 lines
10 KiB
Go
364 lines
10 KiB
Go
|
|
package persistence_test
|
|||
|
|
|
|||
|
|
import (
|
|||
|
|
"context"
|
|||
|
|
"database/sql"
|
|||
|
|
"fmt"
|
|||
|
|
"testing"
|
|||
|
|
"time"
|
|||
|
|
|
|||
|
|
_ "github.com/lib/pq"
|
|||
|
|
"github.com/stretchr/testify/assert"
|
|||
|
|
"github.com/stretchr/testify/require"
|
|||
|
|
|
|||
|
|
"go.yandata.net/iod/iod/go-trustlog/api/adapter"
|
|||
|
|
"go.yandata.net/iod/iod/go-trustlog/api/logger"
|
|||
|
|
"go.yandata.net/iod/iod/go-trustlog/api/model"
|
|||
|
|
"go.yandata.net/iod/iod/go-trustlog/api/persistence"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
const (
|
|||
|
|
// PostgreSQL 连接配置
|
|||
|
|
postgresHost = "localhost"
|
|||
|
|
postgresPort = 5432
|
|||
|
|
postgresUser = "postgres"
|
|||
|
|
postgresPassword = "postgres"
|
|||
|
|
postgresDatabase = "trustlog"
|
|||
|
|
|
|||
|
|
// Pulsar 连接配置
|
|||
|
|
pulsarURL = "pulsar://localhost:6650"
|
|||
|
|
pulsarTopic = "trustlog-integration-test"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
// TestIntegration_PostgreSQL_Basic 测试基本的 PostgreSQL 持久化功能
|
|||
|
|
func TestIntegration_PostgreSQL_Basic(t *testing.T) {
|
|||
|
|
if testing.Short() {
|
|||
|
|
t.Skip("Skipping integration test in short mode")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
ctx := context.Background()
|
|||
|
|
log := logger.NewNopLogger()
|
|||
|
|
|
|||
|
|
// 创建数据库连接
|
|||
|
|
dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
|
|||
|
|
postgresHost, postgresPort, postgresUser, postgresPassword, postgresDatabase)
|
|||
|
|
|
|||
|
|
db, err := sql.Open("postgres", dsn)
|
|||
|
|
require.NoError(t, err, "Failed to connect to PostgreSQL")
|
|||
|
|
defer db.Close()
|
|||
|
|
|
|||
|
|
// 测试连接
|
|||
|
|
err = db.PingContext(ctx)
|
|||
|
|
require.NoError(t, err, "Failed to ping PostgreSQL")
|
|||
|
|
|
|||
|
|
// 创建持久化客户端(仅落库策略)
|
|||
|
|
config := persistence.PersistenceClientConfig{
|
|||
|
|
DBConfig: persistence.DBConfig{
|
|||
|
|
DSN: dsn,
|
|||
|
|
DriverName: "postgres",
|
|||
|
|
MaxOpenConns: 10,
|
|||
|
|
},
|
|||
|
|
PersistenceConfig: persistence.PersistenceConfig{
|
|||
|
|
Strategy: persistence.StrategyDBOnly,
|
|||
|
|
},
|
|||
|
|
Logger: log,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
client, err := persistence.NewPersistenceClient(ctx, config)
|
|||
|
|
require.NoError(t, err, "Failed to create persistence client")
|
|||
|
|
defer client.Close()
|
|||
|
|
|
|||
|
|
// 创建测试 Operation
|
|||
|
|
now := time.Now()
|
|||
|
|
clientIP := "192.168.1.100"
|
|||
|
|
serverIP := "10.0.0.1"
|
|||
|
|
|
|||
|
|
operation := &model.Operation{
|
|||
|
|
OpID: fmt.Sprintf("test-op-%d", now.Unix()),
|
|||
|
|
Timestamp: now,
|
|||
|
|
OpSource: model.OpSourceDOIP,
|
|||
|
|
OpType: model.OpTypeCreate,
|
|||
|
|
DoPrefix: "test",
|
|||
|
|
ClientIP: &clientIP,
|
|||
|
|
ServerIP: &serverIP,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 存储 Operation
|
|||
|
|
err = client.OperationPublish(ctx, operation)
|
|||
|
|
require.NoError(t, err, "Failed to publish operation")
|
|||
|
|
|
|||
|
|
// 等待一小段时间确保写入完成
|
|||
|
|
time.Sleep(100 * time.Millisecond)
|
|||
|
|
|
|||
|
|
// 从数据库查询验证
|
|||
|
|
var count int
|
|||
|
|
err = db.QueryRowContext(ctx, "SELECT COUNT(*) FROM operation WHERE op_id = $1", operation.OpID).Scan(&count)
|
|||
|
|
require.NoError(t, err, "Failed to query operation")
|
|||
|
|
assert.Equal(t, 1, count, "Operation should be stored in database")
|
|||
|
|
|
|||
|
|
// 验证 IP 字段
|
|||
|
|
var storedClientIP, storedServerIP sql.NullString
|
|||
|
|
err = db.QueryRowContext(ctx,
|
|||
|
|
"SELECT client_ip, server_ip FROM operation WHERE op_id = $1",
|
|||
|
|
operation.OpID).Scan(&storedClientIP, &storedServerIP)
|
|||
|
|
require.NoError(t, err, "Failed to query IP fields")
|
|||
|
|
|
|||
|
|
assert.True(t, storedClientIP.Valid, "ClientIP should be valid")
|
|||
|
|
assert.Equal(t, clientIP, storedClientIP.String, "ClientIP should match")
|
|||
|
|
assert.True(t, storedServerIP.Valid, "ServerIP should be valid")
|
|||
|
|
assert.Equal(t, serverIP, storedServerIP.String, "ServerIP should match")
|
|||
|
|
|
|||
|
|
t.Logf("✅ PostgreSQL basic test passed")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// TestIntegration_PostgreSQL_NullableIP 测试可空 IP 字段
|
|||
|
|
func TestIntegration_PostgreSQL_NullableIP(t *testing.T) {
|
|||
|
|
if testing.Short() {
|
|||
|
|
t.Skip("Skipping integration test in short mode")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
ctx := context.Background()
|
|||
|
|
log := logger.NewNopLogger()
|
|||
|
|
|
|||
|
|
dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
|
|||
|
|
postgresHost, postgresPort, postgresUser, postgresPassword, postgresDatabase)
|
|||
|
|
|
|||
|
|
db, err := sql.Open("postgres", dsn)
|
|||
|
|
require.NoError(t, err)
|
|||
|
|
defer db.Close()
|
|||
|
|
|
|||
|
|
config := persistence.PersistenceClientConfig{
|
|||
|
|
DBConfig: persistence.DBConfig{
|
|||
|
|
DSN: dsn,
|
|||
|
|
DriverName: "postgres",
|
|||
|
|
MaxOpenConns: 10,
|
|||
|
|
},
|
|||
|
|
PersistenceConfig: persistence.PersistenceConfig{
|
|||
|
|
Strategy: persistence.StrategyDBOnly,
|
|||
|
|
},
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
client, err := persistence.NewPersistenceClient(ctx, config)
|
|||
|
|
require.NoError(t, err)
|
|||
|
|
defer client.Close()
|
|||
|
|
|
|||
|
|
// 创建不带 IP 的 Operation
|
|||
|
|
now := time.Now()
|
|||
|
|
operation := &model.Operation{
|
|||
|
|
OpID: fmt.Sprintf("test-op-noip-%d", now.Unix()),
|
|||
|
|
Timestamp: now,
|
|||
|
|
OpSource: model.OpSourceDOIP,
|
|||
|
|
OpType: model.OpTypeUpdate,
|
|||
|
|
DoPrefix: "test",
|
|||
|
|
// ClientIP 和 ServerIP 为 nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
err = client.OperationPublish(ctx, operation)
|
|||
|
|
require.NoError(t, err)
|
|||
|
|
|
|||
|
|
time.Sleep(100 * time.Millisecond)
|
|||
|
|
|
|||
|
|
// 验证 IP 字段为 NULL
|
|||
|
|
var storedClientIP, storedServerIP sql.NullString
|
|||
|
|
err = db.QueryRowContext(ctx,
|
|||
|
|
"SELECT client_ip, server_ip FROM operation WHERE op_id = $1",
|
|||
|
|
operation.OpID).Scan(&storedClientIP, &storedServerIP)
|
|||
|
|
require.NoError(t, err)
|
|||
|
|
|
|||
|
|
assert.False(t, storedClientIP.Valid, "ClientIP should be NULL")
|
|||
|
|
assert.False(t, storedServerIP.Valid, "ServerIP should be NULL")
|
|||
|
|
|
|||
|
|
t.Logf("✅ PostgreSQL nullable IP test passed")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// TestIntegration_Pulsar_PostgreSQL 测试 Pulsar + PostgreSQL 集成
|
|||
|
|
func TestIntegration_Pulsar_PostgreSQL(t *testing.T) {
|
|||
|
|
if testing.Short() {
|
|||
|
|
t.Skip("Skipping integration test in short mode")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|||
|
|
defer cancel()
|
|||
|
|
|
|||
|
|
log := logger.NewNopLogger()
|
|||
|
|
|
|||
|
|
// PostgreSQL 配置
|
|||
|
|
dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
|
|||
|
|
postgresHost, postgresPort, postgresUser, postgresPassword, postgresDatabase)
|
|||
|
|
|
|||
|
|
db, err := sql.Open("postgres", dsn)
|
|||
|
|
require.NoError(t, err)
|
|||
|
|
defer db.Close()
|
|||
|
|
|
|||
|
|
err = db.PingContext(ctx)
|
|||
|
|
require.NoError(t, err)
|
|||
|
|
|
|||
|
|
// 创建 Pulsar publisher
|
|||
|
|
pulsarConfig := adapter.PublisherConfig{
|
|||
|
|
Logger: log,
|
|||
|
|
URL: pulsarURL,
|
|||
|
|
Topic: pulsarTopic,
|
|||
|
|
ProducerName: "integration-test-producer",
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
publisher, err := adapter.NewPublisher(ctx, pulsarConfig)
|
|||
|
|
require.NoError(t, err, "Failed to create Pulsar publisher")
|
|||
|
|
defer publisher.Close()
|
|||
|
|
|
|||
|
|
// 创建持久化客户端(DB + Trustlog 策略)
|
|||
|
|
config := persistence.PersistenceClientConfig{
|
|||
|
|
DBConfig: persistence.DBConfig{
|
|||
|
|
DSN: dsn,
|
|||
|
|
DriverName: "postgres",
|
|||
|
|
MaxOpenConns: 10,
|
|||
|
|
},
|
|||
|
|
PersistenceConfig: persistence.PersistenceConfig{
|
|||
|
|
Strategy: persistence.StrategyDBAndTrustlog,
|
|||
|
|
},
|
|||
|
|
EnableCursorWorker: true,
|
|||
|
|
CursorWorkerConfig: persistence.CursorWorkerConfig{
|
|||
|
|
ScanInterval: 5 * time.Second,
|
|||
|
|
BatchSize: 10,
|
|||
|
|
MaxRetries: 3,
|
|||
|
|
RetryInterval: 2 * time.Second,
|
|||
|
|
InitialBackoff: 1 * time.Second,
|
|||
|
|
MaxBackoff: 10 * time.Second,
|
|||
|
|
},
|
|||
|
|
EnableRetryWorker: true,
|
|||
|
|
RetryWorkerConfig: persistence.RetryWorkerConfig{
|
|||
|
|
CheckInterval: 5 * time.Second,
|
|||
|
|
BatchSize: 10,
|
|||
|
|
MaxRetries: 5,
|
|||
|
|
InitialDelay: 1 * time.Minute,
|
|||
|
|
MaxDelay: 1 * time.Hour,
|
|||
|
|
BackoffFactor: 2.0,
|
|||
|
|
},
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
client, err := persistence.NewPersistenceClient(ctx, config)
|
|||
|
|
require.NoError(t, err)
|
|||
|
|
defer client.Close()
|
|||
|
|
|
|||
|
|
// 设置 publisher
|
|||
|
|
client.SetPublisher(publisher)
|
|||
|
|
|
|||
|
|
// 创建测试 Operation
|
|||
|
|
now := time.Now()
|
|||
|
|
clientIP := "172.16.0.100"
|
|||
|
|
|
|||
|
|
operation := &model.Operation{
|
|||
|
|
OpID: fmt.Sprintf("pulsar-test-%d", now.Unix()),
|
|||
|
|
Timestamp: now,
|
|||
|
|
OpSource: model.OpSourceDOIP,
|
|||
|
|
OpType: model.OpTypeCreate,
|
|||
|
|
DoPrefix: "integration",
|
|||
|
|
ClientIP: &clientIP,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 发布 Operation(应该同时写入 DB 和 Pulsar)
|
|||
|
|
err = client.OperationPublish(ctx, operation)
|
|||
|
|
require.NoError(t, err)
|
|||
|
|
|
|||
|
|
// 等待异步处理
|
|||
|
|
time.Sleep(2 * time.Second)
|
|||
|
|
|
|||
|
|
// 验证 DB 中存在记录
|
|||
|
|
var count int
|
|||
|
|
err = db.QueryRowContext(ctx, "SELECT COUNT(*) FROM operation WHERE op_id = $1", operation.OpID).Scan(&count)
|
|||
|
|
require.NoError(t, err)
|
|||
|
|
assert.Equal(t, 1, count, "Operation should be in database")
|
|||
|
|
|
|||
|
|
// 验证 trustlog_status 初始为 NOT_TRUSTLOGGED
|
|||
|
|
var status string
|
|||
|
|
err = db.QueryRowContext(ctx, "SELECT trustlog_status FROM operation WHERE op_id = $1", operation.OpID).Scan(&status)
|
|||
|
|
require.NoError(t, err)
|
|||
|
|
assert.Equal(t, "NOT_TRUSTLOGGED", status, "Initial status should be NOT_TRUSTLOGGED")
|
|||
|
|
|
|||
|
|
t.Logf("✅ Pulsar + PostgreSQL integration test passed")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// TestIntegration_CursorWorker 测试 Cursor Worker 功能
|
|||
|
|
func TestIntegration_CursorWorker(t *testing.T) {
|
|||
|
|
if testing.Short() {
|
|||
|
|
t.Skip("Skipping integration test in short mode")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
|
|||
|
|
defer cancel()
|
|||
|
|
|
|||
|
|
log := logger.NewNopLogger()
|
|||
|
|
|
|||
|
|
dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
|
|||
|
|
postgresHost, postgresPort, postgresUser, postgresPassword, postgresDatabase)
|
|||
|
|
|
|||
|
|
db, err := sql.Open("postgres", dsn)
|
|||
|
|
require.NoError(t, err)
|
|||
|
|
defer db.Close()
|
|||
|
|
|
|||
|
|
// 创建带 Cursor Worker 的持久化客户端
|
|||
|
|
config := persistence.PersistenceClientConfig{
|
|||
|
|
DBConfig: persistence.DBConfig{
|
|||
|
|
DSN: dsn,
|
|||
|
|
DriverName: "postgres",
|
|||
|
|
MaxOpenConns: 10,
|
|||
|
|
},
|
|||
|
|
PersistenceConfig: persistence.PersistenceConfig{
|
|||
|
|
Strategy: persistence.StrategyDBAndTrustlog,
|
|||
|
|
},
|
|||
|
|
EnableCursorWorker: true,
|
|||
|
|
CursorWorkerConfig: persistence.CursorWorkerConfig{
|
|||
|
|
ScanInterval: 2 * time.Second, // 更频繁的扫描用于测试
|
|||
|
|
BatchSize: 10,
|
|||
|
|
MaxRetries: 3,
|
|||
|
|
RetryInterval: 1 * time.Second,
|
|||
|
|
InitialBackoff: 500 * time.Millisecond,
|
|||
|
|
MaxBackoff: 5 * time.Second,
|
|||
|
|
},
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
client, err := persistence.NewPersistenceClient(ctx, config)
|
|||
|
|
require.NoError(t, err)
|
|||
|
|
defer client.Close()
|
|||
|
|
|
|||
|
|
// 创建 mock publisher
|
|||
|
|
pulsarConfig := adapter.PublisherConfig{
|
|||
|
|
Logger: log,
|
|||
|
|
URL: pulsarURL,
|
|||
|
|
Topic: pulsarTopic + "-cursor",
|
|||
|
|
ProducerName: "cursor-test-producer",
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
publisher, err := adapter.NewPublisher(ctx, pulsarConfig)
|
|||
|
|
require.NoError(t, err)
|
|||
|
|
defer publisher.Close()
|
|||
|
|
|
|||
|
|
client.SetPublisher(publisher)
|
|||
|
|
|
|||
|
|
// 插入测试记录
|
|||
|
|
now := time.Now()
|
|||
|
|
operation := &model.Operation{
|
|||
|
|
OpID: fmt.Sprintf("cursor-test-%d", now.Unix()),
|
|||
|
|
Timestamp: now,
|
|||
|
|
OpSource: model.OpSourceDOIP,
|
|||
|
|
OpType: model.OpTypeCreate,
|
|||
|
|
DoPrefix: "cursor-test",
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
err = client.OperationPublish(ctx, operation)
|
|||
|
|
require.NoError(t, err)
|
|||
|
|
|
|||
|
|
// 等待 Cursor Worker 处理
|
|||
|
|
time.Sleep(10 * time.Second)
|
|||
|
|
|
|||
|
|
// 验证状态变化(可能已被处理)
|
|||
|
|
var status string
|
|||
|
|
err = db.QueryRowContext(ctx, "SELECT trustlog_status FROM operation WHERE op_id = $1", operation.OpID).Scan(&status)
|
|||
|
|
require.NoError(t, err)
|
|||
|
|
|
|||
|
|
t.Logf("Operation status after cursor worker: %s", status)
|
|||
|
|
// 注意:由于 Pulsar 可能不可用,状态可能仍是 NOT_TRUSTLOGGED 或进入 retry 表
|
|||
|
|
|
|||
|
|
t.Logf("✅ Cursor Worker test completed")
|
|||
|
|
}
|
|||
|
|
|