Compare commits

...

10 Commits

Author SHA1 Message Date
c2726db8a1 logrus: add force_colors config field to enable colored log output 2025-07-17 03:18:31 +08:00
Bobo
47c72651db feat: database. 2025-06-29 18:36:43 +08:00
Bobo
f267c19c73 feat: database. 2025-06-29 14:17:03 +08:00
Bobo
8c017a34e0 feat: database. 2025-06-29 11:12:16 +08:00
Bobo
ac6f0d1987 feat: database. 2025-06-29 09:41:13 +08:00
Bobo
29a8782662 feat: database. 2025-06-29 09:29:47 +08:00
Bobo
d0e55cf372 feat: api. 2025-06-29 08:50:08 +08:00
Bobo
45d364280b feat: mongodb. 2025-06-26 21:53:55 +08:00
Bobo
fcd2a5ee43 feat: influxdb. 2025-06-26 18:07:24 +08:00
Bobo
989f5da01f feat: influxdb. 2025-06-26 16:58:49 +08:00
34 changed files with 3545 additions and 288 deletions

View File

@@ -433,21 +433,22 @@ func (x *Data_Redis) GetEnableMetrics() bool {
type Data_MongoDB struct {
state protoimpl.MessageState `protogen:"open.v1"`
Uri string `protobuf:"bytes,1,opt,name=uri,proto3" json:"uri,omitempty"`
Username *string `protobuf:"bytes,2,opt,name=username,proto3,oneof" json:"username,omitempty"`
Password *string `protobuf:"bytes,3,opt,name=password,proto3,oneof" json:"password,omitempty"`
AuthMechanism *string `protobuf:"bytes,4,opt,name=auth_mechanism,json=authMechanism,proto3,oneof" json:"auth_mechanism,omitempty"` // 认证机制SCRAM-SHA-1、SCRAM-SHA-256、MONGODB-X509、GSSAPI、PLAIN
AuthMechanismProperties map[string]string `protobuf:"bytes,5,rep,name=auth_mechanism_properties,json=authMechanismProperties,proto3" json:"auth_mechanism_properties,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` // 认证机制属性
AuthSource *string `protobuf:"bytes,6,opt,name=auth_source,json=authSource,proto3,oneof" json:"auth_source,omitempty"` // 认证源admin、$external等
ConnectTimeout *durationpb.Duration `protobuf:"bytes,50,opt,name=connect_timeout,json=connectTimeout,proto3" json:"connect_timeout,omitempty"` // 连接超时时间
HeartbeatInterval *durationpb.Duration `protobuf:"bytes,51,opt,name=heartbeat_interval,json=heartbeatInterval,proto3" json:"heartbeat_interval,omitempty"` // 心跳间隔
LocalThreshold *durationpb.Duration `protobuf:"bytes,52,opt,name=local_threshold,json=localThreshold,proto3" json:"local_threshold,omitempty"` // 本地延迟阈值
MaxConnIdleTime *durationpb.Duration `protobuf:"bytes,53,opt,name=max_conn_idle_time,json=maxConnIdleTime,proto3" json:"max_conn_idle_time,omitempty"` // 最大连接空闲时间
MaxStaleness *durationpb.Duration `protobuf:"bytes,54,opt,name=max_staleness,json=maxStaleness,proto3" json:"max_staleness,omitempty"` // 最大陈旧时间
ServerSelectionTimeout *durationpb.Duration `protobuf:"bytes,55,opt,name=server_selection_timeout,json=serverSelectionTimeout,proto3" json:"server_selection_timeout,omitempty"` // 服务器选择超时时间
SocketTimeout *durationpb.Duration `protobuf:"bytes,56,opt,name=socket_timeout,json=socketTimeout,proto3" json:"socket_timeout,omitempty"` // 套接字超时时间
Timeout *durationpb.Duration `protobuf:"bytes,57,opt,name=timeout,proto3" json:"timeout,omitempty"` // 超时时间
EnableTracing bool `protobuf:"varint,100,opt,name=enable_tracing,json=enableTracing,proto3" json:"enable_tracing,omitempty"` // 打开链路追踪
EnableMetrics bool `protobuf:"varint,101,opt,name=enable_metrics,json=enableMetrics,proto3" json:"enable_metrics,omitempty"` // 打开性能度量
Database *string `protobuf:"bytes,2,opt,name=database,proto3,oneof" json:"database,omitempty"`
Username *string `protobuf:"bytes,10,opt,name=username,proto3,oneof" json:"username,omitempty"`
Password *string `protobuf:"bytes,11,opt,name=password,proto3,oneof" json:"password,omitempty"`
AuthMechanism *string `protobuf:"bytes,20,opt,name=auth_mechanism,json=authMechanism,proto3,oneof" json:"auth_mechanism,omitempty"` // 认证机制SCRAM-SHA-1、SCRAM-SHA-256、MONGODB-X509、GSSAPI、PLAIN
AuthMechanismProperties map[string]string `protobuf:"bytes,21,rep,name=auth_mechanism_properties,json=authMechanismProperties,proto3" json:"auth_mechanism_properties,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` // 认证机制属性
AuthSource *string `protobuf:"bytes,22,opt,name=auth_source,json=authSource,proto3,oneof" json:"auth_source,omitempty"` // 认证源admin、$external等
ConnectTimeout *durationpb.Duration `protobuf:"bytes,50,opt,name=connect_timeout,json=connectTimeout,proto3" json:"connect_timeout,omitempty"` // 连接超时时间
HeartbeatInterval *durationpb.Duration `protobuf:"bytes,51,opt,name=heartbeat_interval,json=heartbeatInterval,proto3" json:"heartbeat_interval,omitempty"` // 心跳间隔
LocalThreshold *durationpb.Duration `protobuf:"bytes,52,opt,name=local_threshold,json=localThreshold,proto3" json:"local_threshold,omitempty"` // 本地延迟阈值
MaxConnIdleTime *durationpb.Duration `protobuf:"bytes,53,opt,name=max_conn_idle_time,json=maxConnIdleTime,proto3" json:"max_conn_idle_time,omitempty"` // 最大连接空闲时间
MaxStaleness *durationpb.Duration `protobuf:"bytes,54,opt,name=max_staleness,json=maxStaleness,proto3" json:"max_staleness,omitempty"` // 最大陈旧时间
ServerSelectionTimeout *durationpb.Duration `protobuf:"bytes,55,opt,name=server_selection_timeout,json=serverSelectionTimeout,proto3" json:"server_selection_timeout,omitempty"` // 服务器选择超时时间
SocketTimeout *durationpb.Duration `protobuf:"bytes,56,opt,name=socket_timeout,json=socketTimeout,proto3" json:"socket_timeout,omitempty"` // 套接字超时时间
Timeout *durationpb.Duration `protobuf:"bytes,57,opt,name=timeout,proto3" json:"timeout,omitempty"` // 总超时时间
EnableTracing bool `protobuf:"varint,100,opt,name=enable_tracing,json=enableTracing,proto3" json:"enable_tracing,omitempty"` // 打开链路追踪
EnableMetrics bool `protobuf:"varint,101,opt,name=enable_metrics,json=enableMetrics,proto3" json:"enable_metrics,omitempty"` // 打开性能度量
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -489,6 +490,13 @@ func (x *Data_MongoDB) GetUri() string {
return ""
}
func (x *Data_MongoDB) GetDatabase() string {
if x != nil && x.Database != nil {
return *x.Database
}
return ""
}
func (x *Data_MongoDB) GetUsername() string {
if x != nil && x.Username != nil {
return *x.Username
@@ -596,30 +604,30 @@ func (x *Data_MongoDB) GetEnableMetrics() bool {
// ClickHouse
type Data_ClickHouse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Addresses []string `protobuf:"bytes,1,rep,name=addresses,proto3" json:"addresses,omitempty"` // 对端网络地址
Database string `protobuf:"bytes,2,opt,name=database,proto3" json:"database,omitempty"` // 数据库名
Username string `protobuf:"bytes,3,opt,name=username,proto3" json:"username,omitempty"` // 用户名
Password string `protobuf:"bytes,4,opt,name=password,proto3" json:"password,omitempty"` // 密码
Debug bool `protobuf:"varint,5,opt,name=debug,proto3" json:"debug,omitempty"` // 调试开关
Protocol string `protobuf:"bytes,6,opt,name=protocol,proto3" json:"protocol,omitempty"` // 协议http、https、tcp、native
Tls *TLS `protobuf:"bytes,7,opt,name=tls,proto3" json:"tls,omitempty"` // TLS配置
CompressionMethod string `protobuf:"bytes,10,opt,name=compression_method,json=compressionMethod,proto3" json:"compression_method,omitempty"` // 压缩方法lz4、zstd、none
ConnOpenStrategy string `protobuf:"bytes,11,opt,name=conn_open_strategy,json=connOpenStrategy,proto3" json:"conn_open_strategy,omitempty"` // 连接打开策略default、lazy、always
DialTimeout *durationpb.Duration `protobuf:"bytes,20,opt,name=dial_timeout,json=dialTimeout,proto3" json:"dial_timeout,omitempty"`
ConnMaxLifeTime *durationpb.Duration `protobuf:"bytes,21,opt,name=conn_max_life_time,json=connMaxLifeTime,proto3" json:"conn_max_life_time,omitempty"`
ConnectionMaxLifetime *durationpb.Duration `protobuf:"bytes,22,opt,name=connection_max_lifetime,json=connectionMaxLifetime,proto3" json:"connection_max_lifetime,omitempty"` // 连接可重用的最大时间长度
MaxExecutionTime int32 `protobuf:"varint,23,opt,name=max_execution_time,json=maxExecutionTime,proto3" json:"max_execution_time,omitempty"` // 最大执行时间(秒)
MaxOpenConns int32 `protobuf:"varint,30,opt,name=max_open_conns,json=maxOpenConns,proto3" json:"max_open_conns,omitempty"` // 连接池最大打开连接数
MaxIdleConns int32 `protobuf:"varint,31,opt,name=max_idle_conns,json=maxIdleConns,proto3" json:"max_idle_conns,omitempty"` // 连接池最大空闲连接数
MaxIdleConnections int32 `protobuf:"varint,32,opt,name=max_idle_connections,json=maxIdleConnections,proto3" json:"max_idle_connections,omitempty"` // 连接池最大空闲连接数
MaxOpenConnections int32 `protobuf:"varint,33,opt,name=max_open_connections,json=maxOpenConnections,proto3" json:"max_open_connections,omitempty"` // 连接池最大打开连接数
BlockBufferSize int32 `protobuf:"varint,40,opt,name=block_buffer_size,json=blockBufferSize,proto3" json:"block_buffer_size,omitempty"` // 数据块缓冲区大小
MaxCompressionBuffer int32 `protobuf:"varint,41,opt,name=max_compression_buffer,json=maxCompressionBuffer,proto3" json:"max_compression_buffer,omitempty"` // 最大压缩缓冲区大小
EnableTracing bool `protobuf:"varint,100,opt,name=enable_tracing,json=enableTracing,proto3" json:"enable_tracing,omitempty"` // 打开链路追踪
EnableMetrics bool `protobuf:"varint,101,opt,name=enable_metrics,json=enableMetrics,proto3" json:"enable_metrics,omitempty"` // 打开性能度量
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
state protoimpl.MessageState `protogen:"open.v1"`
Addresses []string `protobuf:"bytes,1,rep,name=addresses,proto3" json:"addresses,omitempty"` // 对端网络地址
Database *string `protobuf:"bytes,2,opt,name=database,proto3,oneof" json:"database,omitempty"` // 数据库名
Username *string `protobuf:"bytes,3,opt,name=username,proto3,oneof" json:"username,omitempty"` // 用户名
Password *string `protobuf:"bytes,4,opt,name=password,proto3,oneof" json:"password,omitempty"` // 密码
Debug *bool `protobuf:"varint,5,opt,name=debug,proto3,oneof" json:"debug,omitempty"` // 调试开关
Scheme *string `protobuf:"bytes,6,opt,name=scheme,proto3,oneof" json:"scheme,omitempty"` // 协议http、https、native
Tls *TLS `protobuf:"bytes,7,opt,name=tls,proto3,oneof" json:"tls,omitempty"` // TLS配置
BlockBufferSize *int32 `protobuf:"varint,8,opt,name=block_buffer_size,json=blockBufferSize,proto3,oneof" json:"block_buffer_size,omitempty"` // 数据块缓冲区大小
CompressionMethod *string `protobuf:"bytes,10,opt,name=compression_method,json=compressionMethod,proto3,oneof" json:"compression_method,omitempty"` // 压缩方法zstd、lz4、lz4hc、gzip、deflate、br、none
CompressionLevel *int32 `protobuf:"varint,11,opt,name=compression_level,json=compressionLevel,proto3,oneof" json:"compression_level,omitempty"` // 压缩级别0-9
MaxCompressionBuffer *int32 `protobuf:"varint,12,opt,name=max_compression_buffer,json=maxCompressionBuffer,proto3,oneof" json:"max_compression_buffer,omitempty"` // 最大压缩缓冲区大小
ConnectionOpenStrategy *string `protobuf:"bytes,20,opt,name=connection_open_strategy,json=connectionOpenStrategy,proto3,oneof" json:"connection_open_strategy,omitempty"` // 连接打开策略in_order、round_robin、random
DialTimeout *durationpb.Duration `protobuf:"bytes,30,opt,name=dial_timeout,json=dialTimeout,proto3,oneof" json:"dial_timeout,omitempty"` // 连接超时时间
ReadTimeout *durationpb.Duration `protobuf:"bytes,31,opt,name=read_timeout,json=readTimeout,proto3,oneof" json:"read_timeout,omitempty"` // 读取超时时间
ConnMaxLifetime *durationpb.Duration `protobuf:"bytes,32,opt,name=conn_max_lifetime,json=connMaxLifetime,proto3,oneof" json:"conn_max_lifetime,omitempty"` // 连接可重用的最大时间长度
MaxIdleConns *int32 `protobuf:"varint,40,opt,name=max_idle_conns,json=maxIdleConns,proto3,oneof" json:"max_idle_conns,omitempty"` // 连接池最大空闲连接数
MaxOpenConns *int32 `protobuf:"varint,41,opt,name=max_open_conns,json=maxOpenConns,proto3,oneof" json:"max_open_conns,omitempty"` // 连接池最大打开连接数
Dsn *string `protobuf:"bytes,50,opt,name=dsn,proto3,oneof" json:"dsn,omitempty"` // 数据源名称DSN字符串
HttpProxy *string `protobuf:"bytes,60,opt,name=http_proxy,json=httpProxy,proto3,oneof" json:"http_proxy,omitempty"` // HTTP代理地址
EnableTracing *bool `protobuf:"varint,100,opt,name=enable_tracing,json=enableTracing,proto3,oneof" json:"enable_tracing,omitempty"` // 打开链路追踪
EnableMetrics *bool `protobuf:"varint,101,opt,name=enable_metrics,json=enableMetrics,proto3,oneof" json:"enable_metrics,omitempty"` // 打开性能度量
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *Data_ClickHouse) Reset() {
@@ -660,36 +668,36 @@ func (x *Data_ClickHouse) GetAddresses() []string {
}
func (x *Data_ClickHouse) GetDatabase() string {
if x != nil {
return x.Database
if x != nil && x.Database != nil {
return *x.Database
}
return ""
}
func (x *Data_ClickHouse) GetUsername() string {
if x != nil {
return x.Username
if x != nil && x.Username != nil {
return *x.Username
}
return ""
}
func (x *Data_ClickHouse) GetPassword() string {
if x != nil {
return x.Password
if x != nil && x.Password != nil {
return *x.Password
}
return ""
}
func (x *Data_ClickHouse) GetDebug() bool {
if x != nil {
return x.Debug
if x != nil && x.Debug != nil {
return *x.Debug
}
return false
}
func (x *Data_ClickHouse) GetProtocol() string {
if x != nil {
return x.Protocol
func (x *Data_ClickHouse) GetScheme() string {
if x != nil && x.Scheme != nil {
return *x.Scheme
}
return ""
}
@@ -701,16 +709,37 @@ func (x *Data_ClickHouse) GetTls() *TLS {
return nil
}
func (x *Data_ClickHouse) GetBlockBufferSize() int32 {
if x != nil && x.BlockBufferSize != nil {
return *x.BlockBufferSize
}
return 0
}
func (x *Data_ClickHouse) GetCompressionMethod() string {
if x != nil {
return x.CompressionMethod
if x != nil && x.CompressionMethod != nil {
return *x.CompressionMethod
}
return ""
}
func (x *Data_ClickHouse) GetConnOpenStrategy() string {
if x != nil {
return x.ConnOpenStrategy
func (x *Data_ClickHouse) GetCompressionLevel() int32 {
if x != nil && x.CompressionLevel != nil {
return *x.CompressionLevel
}
return 0
}
func (x *Data_ClickHouse) GetMaxCompressionBuffer() int32 {
if x != nil && x.MaxCompressionBuffer != nil {
return *x.MaxCompressionBuffer
}
return 0
}
func (x *Data_ClickHouse) GetConnectionOpenStrategy() string {
if x != nil && x.ConnectionOpenStrategy != nil {
return *x.ConnectionOpenStrategy
}
return ""
}
@@ -722,79 +751,58 @@ func (x *Data_ClickHouse) GetDialTimeout() *durationpb.Duration {
return nil
}
func (x *Data_ClickHouse) GetConnMaxLifeTime() *durationpb.Duration {
func (x *Data_ClickHouse) GetReadTimeout() *durationpb.Duration {
if x != nil {
return x.ConnMaxLifeTime
return x.ReadTimeout
}
return nil
}
func (x *Data_ClickHouse) GetConnectionMaxLifetime() *durationpb.Duration {
func (x *Data_ClickHouse) GetConnMaxLifetime() *durationpb.Duration {
if x != nil {
return x.ConnectionMaxLifetime
return x.ConnMaxLifetime
}
return nil
}
func (x *Data_ClickHouse) GetMaxExecutionTime() int32 {
if x != nil {
return x.MaxExecutionTime
func (x *Data_ClickHouse) GetMaxIdleConns() int32 {
if x != nil && x.MaxIdleConns != nil {
return *x.MaxIdleConns
}
return 0
}
func (x *Data_ClickHouse) GetMaxOpenConns() int32 {
if x != nil {
return x.MaxOpenConns
if x != nil && x.MaxOpenConns != nil {
return *x.MaxOpenConns
}
return 0
}
func (x *Data_ClickHouse) GetMaxIdleConns() int32 {
if x != nil {
return x.MaxIdleConns
func (x *Data_ClickHouse) GetDsn() string {
if x != nil && x.Dsn != nil {
return *x.Dsn
}
return 0
return ""
}
func (x *Data_ClickHouse) GetMaxIdleConnections() int32 {
if x != nil {
return x.MaxIdleConnections
func (x *Data_ClickHouse) GetHttpProxy() string {
if x != nil && x.HttpProxy != nil {
return *x.HttpProxy
}
return 0
}
func (x *Data_ClickHouse) GetMaxOpenConnections() int32 {
if x != nil {
return x.MaxOpenConnections
}
return 0
}
func (x *Data_ClickHouse) GetBlockBufferSize() int32 {
if x != nil {
return x.BlockBufferSize
}
return 0
}
func (x *Data_ClickHouse) GetMaxCompressionBuffer() int32 {
if x != nil {
return x.MaxCompressionBuffer
}
return 0
return ""
}
func (x *Data_ClickHouse) GetEnableTracing() bool {
if x != nil {
return x.EnableTracing
if x != nil && x.EnableTracing != nil {
return *x.EnableTracing
}
return false
}
func (x *Data_ClickHouse) GetEnableMetrics() bool {
if x != nil {
return x.EnableMetrics
if x != nil && x.EnableMetrics != nil {
return *x.EnableMetrics
}
return false
}
@@ -1835,7 +1843,7 @@ var File_conf_v1_kratos_conf_data_proto protoreflect.FileDescriptor
const file_conf_v1_kratos_conf_data_proto_rawDesc = "" +
"\n" +
"\x1econf/v1/kratos_conf_data.proto\x12\x04conf\x1a\x1egoogle/protobuf/duration.proto\x1a\x1dconf/v1/kratos_conf_tls.proto\"\x995\n" +
"\x1econf/v1/kratos_conf_data.proto\x12\x04conf\x1a\x1egoogle/protobuf/duration.proto\x1a\x1dconf/v1/kratos_conf_tls.proto\"\xc38\n" +
"\x04Data\x124\n" +
"\bdatabase\x18\x01 \x01(\v2\x13.conf.Data.DatabaseH\x00R\bdatabase\x88\x01\x01\x12+\n" +
"\x05redis\x18\n" +
@@ -1886,14 +1894,16 @@ const file_conf_v1_kratos_conf_data_proto_rawDesc = "" +
"\fread_timeout\x183 \x01(\v2\x19.google.protobuf.DurationR\vreadTimeout\x12>\n" +
"\rwrite_timeout\x184 \x01(\v2\x19.google.protobuf.DurationR\fwriteTimeout\x12%\n" +
"\x0eenable_tracing\x18d \x01(\bR\renableTracing\x12&\n" +
"\x0eenable_metrics\x18\xe9\a \x01(\bR\renableMetrics\x1a\x99\b\n" +
"\x0eenable_metrics\x18\xe9\a \x01(\bR\renableMetrics\x1a\xc7\b\n" +
"\aMongoDB\x12\x10\n" +
"\x03uri\x18\x01 \x01(\tR\x03uri\x12\x1f\n" +
"\busername\x18\x02 \x01(\tH\x00R\busername\x88\x01\x01\x12\x1f\n" +
"\bpassword\x18\x03 \x01(\tH\x01R\bpassword\x88\x01\x01\x12*\n" +
"\x0eauth_mechanism\x18\x04 \x01(\tH\x02R\rauthMechanism\x88\x01\x01\x12k\n" +
"\x19auth_mechanism_properties\x18\x05 \x03(\v2/.conf.Data.MongoDB.AuthMechanismPropertiesEntryR\x17authMechanismProperties\x12$\n" +
"\vauth_source\x18\x06 \x01(\tH\x03R\n" +
"\bdatabase\x18\x02 \x01(\tH\x00R\bdatabase\x88\x01\x01\x12\x1f\n" +
"\busername\x18\n" +
" \x01(\tH\x01R\busername\x88\x01\x01\x12\x1f\n" +
"\bpassword\x18\v \x01(\tH\x02R\bpassword\x88\x01\x01\x12*\n" +
"\x0eauth_mechanism\x18\x14 \x01(\tH\x03R\rauthMechanism\x88\x01\x01\x12k\n" +
"\x19auth_mechanism_properties\x18\x15 \x03(\v2/.conf.Data.MongoDB.AuthMechanismPropertiesEntryR\x17authMechanismProperties\x12$\n" +
"\vauth_source\x18\x16 \x01(\tH\x04R\n" +
"authSource\x88\x01\x01\x12B\n" +
"\x0fconnect_timeout\x182 \x01(\v2\x19.google.protobuf.DurationR\x0econnectTimeout\x12H\n" +
"\x12heartbeat_interval\x183 \x01(\v2\x19.google.protobuf.DurationR\x11heartbeatInterval\x12B\n" +
@@ -1908,34 +1918,58 @@ const file_conf_v1_kratos_conf_data_proto_rawDesc = "" +
"\x1cAuthMechanismPropertiesEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
"\x05value\x18\x02 \x01(\tR\x05value:\x028\x01B\v\n" +
"\t_databaseB\v\n" +
"\t_usernameB\v\n" +
"\t_passwordB\x11\n" +
"\x0f_auth_mechanismB\x0e\n" +
"\f_auth_source\x1a\x91\a\n" +
"\f_auth_source\x1a\x8d\n" +
"\n" +
"\n" +
"ClickHouse\x12\x1c\n" +
"\taddresses\x18\x01 \x03(\tR\taddresses\x12\x1a\n" +
"\bdatabase\x18\x02 \x01(\tR\bdatabase\x12\x1a\n" +
"\busername\x18\x03 \x01(\tR\busername\x12\x1a\n" +
"\bpassword\x18\x04 \x01(\tR\bpassword\x12\x14\n" +
"\x05debug\x18\x05 \x01(\bR\x05debug\x12\x1a\n" +
"\bprotocol\x18\x06 \x01(\tR\bprotocol\x12\x1b\n" +
"\x03tls\x18\a \x01(\v2\t.conf.TLSR\x03tls\x12-\n" +
"\taddresses\x18\x01 \x03(\tR\taddresses\x12\x1f\n" +
"\bdatabase\x18\x02 \x01(\tH\x00R\bdatabase\x88\x01\x01\x12\x1f\n" +
"\busername\x18\x03 \x01(\tH\x01R\busername\x88\x01\x01\x12\x1f\n" +
"\bpassword\x18\x04 \x01(\tH\x02R\bpassword\x88\x01\x01\x12\x19\n" +
"\x05debug\x18\x05 \x01(\bH\x03R\x05debug\x88\x01\x01\x12\x1b\n" +
"\x06scheme\x18\x06 \x01(\tH\x04R\x06scheme\x88\x01\x01\x12 \n" +
"\x03tls\x18\a \x01(\v2\t.conf.TLSH\x05R\x03tls\x88\x01\x01\x12/\n" +
"\x11block_buffer_size\x18\b \x01(\x05H\x06R\x0fblockBufferSize\x88\x01\x01\x122\n" +
"\x12compression_method\x18\n" +
" \x01(\tR\x11compressionMethod\x12,\n" +
"\x12conn_open_strategy\x18\v \x01(\tR\x10connOpenStrategy\x12<\n" +
"\fdial_timeout\x18\x14 \x01(\v2\x19.google.protobuf.DurationR\vdialTimeout\x12F\n" +
"\x12conn_max_life_time\x18\x15 \x01(\v2\x19.google.protobuf.DurationR\x0fconnMaxLifeTime\x12Q\n" +
"\x17connection_max_lifetime\x18\x16 \x01(\v2\x19.google.protobuf.DurationR\x15connectionMaxLifetime\x12,\n" +
"\x12max_execution_time\x18\x17 \x01(\x05R\x10maxExecutionTime\x12$\n" +
"\x0emax_open_conns\x18\x1e \x01(\x05R\fmaxOpenConns\x12$\n" +
"\x0emax_idle_conns\x18\x1f \x01(\x05R\fmaxIdleConns\x120\n" +
"\x14max_idle_connections\x18 \x01(\x05R\x12maxIdleConnections\x120\n" +
"\x14max_open_connections\x18! \x01(\x05R\x12maxOpenConnections\x12*\n" +
"\x11block_buffer_size\x18( \x01(\x05R\x0fblockBufferSize\x124\n" +
"\x16max_compression_buffer\x18) \x01(\x05R\x14maxCompressionBuffer\x12%\n" +
"\x0eenable_tracing\x18d \x01(\bR\renableTracing\x12%\n" +
"\x0eenable_metrics\x18e \x01(\bR\renableMetrics\x1a\xe5\x02\n" +
" \x01(\tH\aR\x11compressionMethod\x88\x01\x01\x120\n" +
"\x11compression_level\x18\v \x01(\x05H\bR\x10compressionLevel\x88\x01\x01\x129\n" +
"\x16max_compression_buffer\x18\f \x01(\x05H\tR\x14maxCompressionBuffer\x88\x01\x01\x12=\n" +
"\x18connection_open_strategy\x18\x14 \x01(\tH\n" +
"R\x16connectionOpenStrategy\x88\x01\x01\x12A\n" +
"\fdial_timeout\x18\x1e \x01(\v2\x19.google.protobuf.DurationH\vR\vdialTimeout\x88\x01\x01\x12A\n" +
"\fread_timeout\x18\x1f \x01(\v2\x19.google.protobuf.DurationH\fR\vreadTimeout\x88\x01\x01\x12J\n" +
"\x11conn_max_lifetime\x18 \x01(\v2\x19.google.protobuf.DurationH\rR\x0fconnMaxLifetime\x88\x01\x01\x12)\n" +
"\x0emax_idle_conns\x18( \x01(\x05H\x0eR\fmaxIdleConns\x88\x01\x01\x12)\n" +
"\x0emax_open_conns\x18) \x01(\x05H\x0fR\fmaxOpenConns\x88\x01\x01\x12\x15\n" +
"\x03dsn\x182 \x01(\tH\x10R\x03dsn\x88\x01\x01\x12\"\n" +
"\n" +
"http_proxy\x18< \x01(\tH\x11R\thttpProxy\x88\x01\x01\x12*\n" +
"\x0eenable_tracing\x18d \x01(\bH\x12R\renableTracing\x88\x01\x01\x12*\n" +
"\x0eenable_metrics\x18e \x01(\bH\x13R\renableMetrics\x88\x01\x01B\v\n" +
"\t_databaseB\v\n" +
"\t_usernameB\v\n" +
"\t_passwordB\b\n" +
"\x06_debugB\t\n" +
"\a_schemeB\x06\n" +
"\x04_tlsB\x14\n" +
"\x12_block_buffer_sizeB\x15\n" +
"\x13_compression_methodB\x14\n" +
"\x12_compression_levelB\x19\n" +
"\x17_max_compression_bufferB\x1b\n" +
"\x19_connection_open_strategyB\x0f\n" +
"\r_dial_timeoutB\x0f\n" +
"\r_read_timeoutB\x14\n" +
"\x12_conn_max_lifetimeB\x11\n" +
"\x0f_max_idle_connsB\x11\n" +
"\x0f_max_open_connsB\x06\n" +
"\x04_dsnB\r\n" +
"\v_http_proxyB\x11\n" +
"\x0f_enable_tracingB\x11\n" +
"\x0f_enable_metrics\x1a\xe5\x02\n" +
"\bInfluxDB\x12\x12\n" +
"\x04host\x18\x01 \x01(\tR\x04host\x12\x14\n" +
"\x05token\x18\x02 \x01(\tR\x05token\x12\x1f\n" +
@@ -2118,8 +2152,8 @@ var file_conf_v1_kratos_conf_data_proto_depIdxs = []int32{
19, // 28: conf.Data.MongoDB.timeout:type_name -> google.protobuf.Duration
20, // 29: conf.Data.ClickHouse.tls:type_name -> conf.TLS
19, // 30: conf.Data.ClickHouse.dial_timeout:type_name -> google.protobuf.Duration
19, // 31: conf.Data.ClickHouse.conn_max_life_time:type_name -> google.protobuf.Duration
19, // 32: conf.Data.ClickHouse.connection_max_lifetime:type_name -> google.protobuf.Duration
19, // 31: conf.Data.ClickHouse.read_timeout:type_name -> google.protobuf.Duration
19, // 32: conf.Data.ClickHouse.conn_max_lifetime:type_name -> google.protobuf.Duration
19, // 33: conf.Data.InfluxDB.timeout:type_name -> google.protobuf.Duration
19, // 34: conf.Data.InfluxDB.idle_connection_timeout:type_name -> google.protobuf.Duration
19, // 35: conf.Data.ElasticSearch.discover_nodes_interval:type_name -> google.protobuf.Duration
@@ -2146,6 +2180,7 @@ func file_conf_v1_kratos_conf_data_proto_init() {
file_conf_v1_kratos_conf_data_proto_msgTypes[0].OneofWrappers = []any{}
file_conf_v1_kratos_conf_data_proto_msgTypes[1].OneofWrappers = []any{}
file_conf_v1_kratos_conf_data_proto_msgTypes[3].OneofWrappers = []any{}
file_conf_v1_kratos_conf_data_proto_msgTypes[4].OneofWrappers = []any{}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{

View File

@@ -191,6 +191,7 @@ type Logger_Logrus struct {
TimestampFormat string `protobuf:"bytes,3,opt,name=timestamp_format,json=timestampFormat,proto3" json:"timestamp_format,omitempty"` // 定义时间戳格式,例如:"2006-01-02 15:04:05"
DisableColors bool `protobuf:"varint,4,opt,name=disable_colors,json=disableColors,proto3" json:"disable_colors,omitempty"` // 不需要彩色日志
DisableTimestamp bool `protobuf:"varint,5,opt,name=disable_timestamp,json=disableTimestamp,proto3" json:"disable_timestamp,omitempty"` // 不需要时间戳
ForceColors bool `protobuf:"varint,6,opt,name=force_colors,json=forceColors,proto3" json:"force_colors,omitempty"` // 是否开启彩色日志
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -260,6 +261,13 @@ func (x *Logger_Logrus) GetDisableTimestamp() bool {
return false
}
func (x *Logger_Logrus) GetForceColors() bool {
if x != nil {
return x.ForceColors
}
return false
}
// Fluent
type Logger_Fluent struct {
state protoimpl.MessageState `protogen:"open.v1"`
@@ -447,7 +455,7 @@ var File_conf_v1_kratos_conf_logger_proto protoreflect.FileDescriptor
const file_conf_v1_kratos_conf_logger_proto_rawDesc = "" +
"\n" +
" conf/v1/kratos_conf_logger.proto\x12\x04conf\"\xc4\a\n" +
" conf/v1/kratos_conf_logger.proto\x12\x04conf\"\xe7\a\n" +
"\x06Logger\x12\x12\n" +
"\x04type\x18\x01 \x01(\tR\x04type\x12'\n" +
"\x03zap\x18\x02 \x01(\v2\x10.conf.Logger.ZapH\x00R\x03zap\x88\x01\x01\x120\n" +
@@ -461,13 +469,14 @@ const file_conf_v1_kratos_conf_logger_proto_rawDesc = "" +
"\bmax_size\x18\x03 \x01(\x05R\amaxSize\x12\x17\n" +
"\amax_age\x18\x04 \x01(\x05R\x06maxAge\x12\x1f\n" +
"\vmax_backups\x18\x05 \x01(\x05R\n" +
"maxBackups\x1a\xbb\x01\n" +
"maxBackups\x1a\xde\x01\n" +
"\x06Logrus\x12\x14\n" +
"\x05level\x18\x01 \x01(\tR\x05level\x12\x1c\n" +
"\tformatter\x18\x02 \x01(\tR\tformatter\x12)\n" +
"\x10timestamp_format\x18\x03 \x01(\tR\x0ftimestampFormat\x12%\n" +
"\x0edisable_colors\x18\x04 \x01(\bR\rdisableColors\x12+\n" +
"\x11disable_timestamp\x18\x05 \x01(\bR\x10disableTimestamp\x1a$\n" +
"\x11disable_timestamp\x18\x05 \x01(\bR\x10disableTimestamp\x12!\n" +
"\fforce_colors\x18\x06 \x01(\bR\vforceColors\x1a$\n" +
"\x06Fluent\x12\x1a\n" +
"\bendpoint\x18\x01 \x01(\tR\bendpoint\x1a\x82\x01\n" +
"\x06Aliyun\x12\x1a\n" +

View File

@@ -48,12 +48,14 @@ message Data {
message MongoDB {
string uri = 1;
optional string username = 2;
optional string password = 3;
optional string database = 2;
optional string auth_mechanism = 4; // Authentication mechanism: SCRAM-SHA-1, SCRAM-SHA-256, MONGODB-X509, GSSAPI, PLAIN
map<string, string> auth_mechanism_properties = 5; // Authentication mechanism properties
optional string auth_source = 6; // Authentication source: admin, $external, etc.
optional string username = 10;
optional string password = 11;
optional string auth_mechanism = 20; // Authentication mechanism: SCRAM-SHA-1, SCRAM-SHA-256, MONGODB-X509, GSSAPI, PLAIN
map<string, string> auth_mechanism_properties = 21; // Authentication mechanism properties
optional string auth_source = 22; // Authentication source: admin, $external, etc.
google.protobuf.Duration connect_timeout = 50; // Connection timeout
google.protobuf.Duration heartbeat_interval = 51; // Heartbeat interval
@@ -72,33 +74,36 @@ message Data {
message ClickHouse {
repeated string addresses = 1; // Peer network addresses
string database = 2; // Database name
string username = 3; // Username
string password = 4; // Password
optional string database = 2; // Database name
optional string username = 3; // Username
optional string password = 4; // Password
bool debug = 5; // Debug switch
string protocol = 6; // Protocol: http, https, tcp, native
optional bool debug = 5; // Debug switch
optional string scheme = 6; // Scheme: http, https, native
TLS tls = 7; // TLS configuration
optional TLS tls = 7; // TLS configuration
string compression_method = 10; // Compression method: lz4, zstd, none
string conn_open_strategy = 11; // Connection open strategy: default, lazy, always
optional int32 block_buffer_size = 8; // Block buffer size
google.protobuf.Duration dial_timeout = 20;
google.protobuf.Duration conn_max_life_time = 21;
google.protobuf.Duration connection_max_lifetime = 22; // Maximum amount of time a connection may be reused
int32 max_execution_time = 23; // Maximum execution time (seconds)
optional string compression_method = 10; // Compression method: zstd, lz4, lz4hc, gzip, deflate, br, none
optional int32 compression_level = 11; // Compression level: 0-9
optional int32 max_compression_buffer = 12; // Maximum compression buffer size
int32 max_open_conns = 30; // Maximum number of open connections in the pool
int32 max_idle_conns = 31; // Maximum number of idle connections in the pool
int32 max_idle_connections = 32; // Maximum number of idle connections in the pool
int32 max_open_connections = 33; // Maximum number of open connections in the pool
optional string connection_open_strategy = 20; // Connection open strategy: in_order, round_robin, random
int32 block_buffer_size = 40; // Block buffer size
int32 max_compression_buffer = 41; // Maximum compression buffer size
optional google.protobuf.Duration dial_timeout = 30; // Dial timeout
optional google.protobuf.Duration read_timeout = 31; // Read timeout
optional google.protobuf.Duration conn_max_lifetime = 32; // Maximum amount of time a connection may be reused
bool enable_tracing = 100; // Enable tracing
bool enable_metrics = 101; // Enable metrics
optional int32 max_idle_conns = 40; // Maximum number of idle connections in the pool
optional int32 max_open_conns = 41; // Maximum number of open connections in the pool
optional string dsn = 50; // Data source name (DSN) string
optional string http_proxy = 60; // HTTP proxy address
optional bool enable_tracing = 100; // Enable tracing
optional bool enable_metrics = 101; // Enable metrics
}
// InfluxDB

View File

@@ -22,6 +22,7 @@ message Logger {
string timestamp_format = 3; // Timestamp format, e.g. "2006-01-02 15:04:05"
bool disable_colors = 4; // Disable colored log output
bool disable_timestamp = 5; // Disable timestamps
bool force_colors = 6; // Force colored log output
}
// Fluent
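
For reference, a minimal sketch (not part of the diff) of how the new field surfaces in the generated Go config type; the surrounding values are purely illustrative:

```go
// Sketch only: illustrative values for the generated Logger_Logrus config.
logrusCfg := &conf.Logger_Logrus{
	Level:           "debug",
	Formatter:       "text",
	TimestampFormat: "2006-01-02 15:04:05",
	ForceColors:     true, // new field added by this change set
}
_ = logrusCfg
```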

View File

@@ -0,0 +1,18 @@
# ClickHouse
## Docker Deployment
```bash
docker pull bitnami/clickhouse:latest
docker run -itd \
--name clickhouse-server \
--network=app-tier \
-p 8123:8123 \
-p 9000:9000 \
-p 9004:9004 \
-e ALLOW_EMPTY_PASSWORD=no \
-e CLICKHOUSE_ADMIN_USER=default \
-e CLICKHOUSE_ADMIN_PASSWORD=123456 \
bitnami/clickhouse:latest
```
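
As a companion to the container above, a minimal sketch of pointing the new ClickHouse client at it. The clickhouse package import path and the small `ptr` helper are assumptions; the credentials mirror the environment variables passed to `docker run`:

```go
package main

import (
	"context"

	"github.com/go-kratos/kratos/v2/log"

	conf "github.com/tx7do/kratos-bootstrap/api/gen/go/conf/v1"
	"github.com/tx7do/kratos-bootstrap/database/clickhouse" // assumed import path
)

// ptr is a tiny helper; the optional config fields are generated as pointers.
func ptr[T any](v T) *T { return &v }

func main() {
	cli, err := clickhouse.NewClient(log.DefaultLogger, &conf.Bootstrap{
		Data: &conf.Data{
			Clickhouse: &conf.Data_ClickHouse{
				Addresses: []string{"localhost:9000"}, // native-protocol port published above
				Username:  ptr("default"),
				Password:  ptr("123456"),
			},
		},
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	if err := cli.CheckConnection(context.Background()); err != nil {
		panic(err)
	}
}
```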

View File

@@ -0,0 +1,199 @@
package clickhouse
import (
"context"
"errors"
"fmt"
"reflect"
"strings"
"sync"
clickhouseV2 "github.com/ClickHouse/clickhouse-go/v2"
driverV2 "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)
// BatchInserter is a batch inserter for ClickHouse
type BatchInserter struct {
conn clickhouseV2.Conn
tableName string
columns []string
batchSize int
rows []interface{}
insertStmt string
mu sync.Mutex
ctx context.Context
cancel context.CancelFunc
}
// NewBatchInserter creates a new batch inserter
func NewBatchInserter(
ctx context.Context,
conn clickhouseV2.Conn,
tableName string,
batchSize int,
columns []string,
) (*BatchInserter, error) {
if batchSize <= 0 {
batchSize = 1000 // default batch size
}
if len(columns) == 0 {
return nil, errors.New("column names must be specified")
}
// build the INSERT statement
placeholders := make([]string, len(columns))
for i := range placeholders {
placeholders[i] = "?"
}
insertStmt := fmt.Sprintf(
"INSERT INTO %s (%s) VALUES (%s)",
tableName,
strings.Join(columns, ", "),
strings.Join(placeholders, ", "),
)
ctx, cancel := context.WithCancel(ctx)
return &BatchInserter{
conn: conn,
tableName: tableName,
columns: columns,
batchSize: batchSize,
rows: make([]interface{}, 0, batchSize),
insertStmt: insertStmt,
ctx: ctx,
cancel: cancel,
}, nil
}
// Add appends a data row
func (bi *BatchInserter) Add(row interface{}) error {
bi.mu.Lock()
defer bi.mu.Unlock()
// check whether the context has been cancelled
if bi.ctx.Err() != nil {
return bi.ctx.Err()
}
bi.rows = append(bi.rows, row)
// flush automatically once the batch size is reached
if len(bi.rows) >= bi.batchSize {
return bi.flush()
}
return nil
}
// Flush forces the current batch to be submitted
func (bi *BatchInserter) Flush() error {
bi.mu.Lock()
defer bi.mu.Unlock()
return bi.flush()
}
// Close shuts down the inserter and submits any remaining rows
func (bi *BatchInserter) Close() error {
defer bi.cancel()
bi.mu.Lock()
defer bi.mu.Unlock()
return bi.flush()
}
// flush is the internal submit method
func (bi *BatchInserter) flush() error {
if len(bi.rows) == 0 {
return nil
}
// prepare the batch
batch, err := bi.conn.PrepareBatch(bi.ctx, bi.insertStmt)
if err != nil {
return ErrBatchPrepareFailed
}
// append all rows
for _, row := range bi.rows {
// use reflection to extract the field values
if err = appendStructToBatch(batch, row, bi.columns); err != nil {
return ErrBatchAppendFailed
}
}
// send the batch
if err = batch.Send(); err != nil {
return ErrBatchSendFailed
}
// clear the buffered rows
bi.rows = bi.rows[:0]
return nil
}
// appendStructToBatch appends a struct's fields to the batch using reflection
func appendStructToBatch(batch driverV2.Batch, obj interface{}, columns []string) error {
v := reflect.ValueOf(obj)
// if it is a pointer, dereference it
if v.Kind() == reflect.Ptr {
if v.IsNil() {
return errors.New("nil pointer")
}
v = v.Elem()
}
// must be a struct
if v.Kind() != reflect.Struct {
return fmt.Errorf("expected a struct type, got %v", v.Kind())
}
// get the struct type
t := v.Type()
// prepare the argument values
values := make([]interface{}, len(columns))
// map column names to struct fields
for i, col := range columns {
// look for a matching field
found := false
for j := 0; j < v.NumField(); j++ {
field := t.Field(j)
// check the ch tag
if tag := field.Tag.Get("ch"); strings.TrimSpace(tag) == col {
values[i] = v.Field(j).Interface()
found = true
break
}
// check the json tag
jsonTags := strings.Split(field.Tag.Get("json"), ",")
if len(jsonTags) > 0 && strings.TrimSpace(jsonTags[0]) == col {
values[i] = v.Field(j).Interface()
found = true
break
}
// check the field name
if field.Name == col {
values[i] = v.Field(j).Interface()
found = true
break
}
}
if !found {
return fmt.Errorf("no struct field found for column %s", col)
}
}
// append the values to the batch
return batch.Append(values...)
}
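
A short usage sketch of the inserter above, assuming the `Candle` struct from the test file further down and an established `clickhouseV2.Conn`:

```go
// loadCandles streams rows into the "candles" table in batches of 500 (sketch only).
func loadCandles(ctx context.Context, conn clickhouseV2.Conn, candles []*Candle) error {
	bi, err := NewBatchInserter(ctx, conn, "candles", 500,
		[]string{"timestamp", "symbol", "open", "high", "low", "close", "volume"})
	if err != nil {
		return err
	}
	defer bi.Close() // Close also flushes any remaining buffered rows

	for _, c := range candles {
		if err := bi.Add(c); err != nil { // Add auto-flushes once 500 rows are buffered
			return err
		}
	}
	return bi.Flush()
}
```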

View File

@@ -1,55 +1,632 @@
package clickhouse
import (
"context"
"crypto/tls"
"database/sql"
"fmt"
"net/url"
"reflect"
"strings"
"github.com/ClickHouse/clickhouse-go/v2"
clickhouseV2 "github.com/ClickHouse/clickhouse-go/v2"
driverV2 "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/go-kratos/kratos/v2/log"
conf "github.com/tx7do/kratos-bootstrap/api/gen/go/conf/v1"
"github.com/tx7do/kratos-bootstrap/utils"
)
func NewClickHouseClient(cfg *conf.Bootstrap, l *log.Helper) clickhouse.Conn {
type Creator func() any
var compressionMap = map[string]clickhouseV2.CompressionMethod{
"none": clickhouseV2.CompressionNone,
"zstd": clickhouseV2.CompressionZSTD,
"lz4": clickhouseV2.CompressionLZ4,
"lz4hc": clickhouseV2.CompressionLZ4HC,
"gzip": clickhouseV2.CompressionGZIP,
"deflate": clickhouseV2.CompressionDeflate,
"br": clickhouseV2.CompressionBrotli,
}
type Client struct {
log *log.Helper
conn clickhouseV2.Conn
db *sql.DB
}
func NewClient(logger log.Logger, cfg *conf.Bootstrap) (*Client, error) {
c := &Client{
log: log.NewHelper(log.With(logger, "module", "clickhouse-client")),
}
if err := c.createClickHouseClient(cfg); err != nil {
return nil, err
}
return c, nil
}
// createClickHouseClient creates the ClickHouse client
func (c *Client) createClickHouseClient(cfg *conf.Bootstrap) error {
if cfg.Data == nil || cfg.Data.Clickhouse == nil {
l.Warn("ClickHouse config is nil")
return nil
}
options := &clickhouse.Options{
Addr: []string{cfg.Data.Clickhouse.Address},
Auth: clickhouse.Auth{
Database: cfg.Data.Clickhouse.Database,
Username: cfg.Data.Clickhouse.Username,
Password: cfg.Data.Clickhouse.Password,
},
Debug: cfg.Data.Clickhouse.Debug,
DialTimeout: cfg.Data.Clickhouse.DialTimeout.AsDuration(),
MaxOpenConns: int(cfg.Data.Clickhouse.MaxOpenConns),
MaxIdleConns: int(cfg.Data.Clickhouse.MaxIdleConns),
ConnMaxLifetime: cfg.Data.Clickhouse.ConnMaxLifeTime.AsDuration(),
opts := &clickhouseV2.Options{}
if cfg.Data.Clickhouse.Dsn != nil {
tmp, err := clickhouseV2.ParseDSN(cfg.Data.Clickhouse.GetDsn())
if err != nil {
c.log.Errorf("failed to parse clickhouse DSN: %v", err)
return ErrInvalidDSN
}
opts = tmp
}
if cfg.Data.Clickhouse.Addresses != nil {
opts.Addr = cfg.Data.Clickhouse.GetAddresses()
}
if cfg.Data.Clickhouse.Database != nil ||
cfg.Data.Clickhouse.Username != nil ||
cfg.Data.Clickhouse.Password != nil {
opts.Auth = clickhouseV2.Auth{}
if cfg.Data.Clickhouse.Database != nil {
opts.Auth.Database = cfg.Data.Clickhouse.GetDatabase()
}
if cfg.Data.Clickhouse.Username != nil {
opts.Auth.Username = cfg.Data.Clickhouse.GetUsername()
}
if cfg.Data.Clickhouse.Password != nil {
opts.Auth.Password = cfg.Data.Clickhouse.GetPassword()
}
}
if cfg.Data.Clickhouse.Debug != nil {
opts.Debug = cfg.Data.Clickhouse.GetDebug()
}
if cfg.Data.Clickhouse.MaxOpenConns != nil {
opts.MaxOpenConns = int(cfg.Data.Clickhouse.GetMaxOpenConns())
}
if cfg.Data.Clickhouse.MaxIdleConns != nil {
opts.MaxIdleConns = int(cfg.Data.Clickhouse.GetMaxIdleConns())
}
// configure TLS
if cfg.Data.Clickhouse.Tls != nil {
var tlsCfg *tls.Config
var err error
if tlsCfg, err = utils.LoadServerTlsConfig(cfg.Data.Clickhouse.Tls); err != nil {
if tlsCfg, err = utils.LoadServerTlsConfig(cfg.Server.Grpc.Tls); err != nil {
panic(err)
}
if tlsCfg != nil {
options.TLS = tlsCfg
opts.TLS = tlsCfg
}
}
conn, err := clickhouse.Open(options)
if err != nil {
l.Fatalf("failed opening connection to clickhouse: %v", err)
return nil
if cfg.Data.Clickhouse.CompressionMethod != nil || cfg.Data.Clickhouse.CompressionLevel != nil {
opts.Compression = &clickhouseV2.Compression{}
if cfg.Data.Clickhouse.GetCompressionMethod() != "" {
opts.Compression.Method = compressionMap[cfg.Data.Clickhouse.GetCompressionMethod()]
}
if cfg.Data.Clickhouse.CompressionLevel != nil {
opts.Compression.Level = int(cfg.Data.Clickhouse.GetCompressionLevel())
}
}
if cfg.Data.Clickhouse.MaxCompressionBuffer != nil {
opts.MaxCompressionBuffer = int(cfg.Data.Clickhouse.GetMaxCompressionBuffer())
}
return conn
if cfg.Data.Clickhouse.DialTimeout != nil {
opts.DialTimeout = cfg.Data.Clickhouse.GetDialTimeout().AsDuration()
}
if cfg.Data.Clickhouse.ReadTimeout != nil {
opts.ReadTimeout = cfg.Data.Clickhouse.GetReadTimeout().AsDuration()
}
if cfg.Data.Clickhouse.ConnMaxLifetime != nil {
opts.ConnMaxLifetime = cfg.Data.Clickhouse.GetConnMaxLifetime().AsDuration()
}
if cfg.Data.Clickhouse.HttpProxy != nil {
proxyURL, err := url.Parse(cfg.Data.Clickhouse.GetHttpProxy())
if err != nil {
c.log.Errorf("failed to parse HTTP proxy URL: %v", err)
return ErrInvalidProxyURL
}
opts.HTTPProxyURL = proxyURL
}
if cfg.Data.Clickhouse.ConnectionOpenStrategy != nil {
strategy := clickhouseV2.ConnOpenInOrder
switch cfg.Data.Clickhouse.GetConnectionOpenStrategy() {
case "in_order":
strategy = clickhouseV2.ConnOpenInOrder
case "round_robin":
strategy = clickhouseV2.ConnOpenRoundRobin
case "random":
strategy = clickhouseV2.ConnOpenRandom
}
opts.ConnOpenStrategy = strategy
}
if cfg.Data.Clickhouse.Scheme != nil {
switch cfg.Data.Clickhouse.GetScheme() {
case "http":
opts.Protocol = clickhouseV2.HTTP
case "https":
opts.Protocol = clickhouseV2.HTTP
default:
opts.Protocol = clickhouseV2.Native
}
}
if cfg.Data.Clickhouse.BlockBufferSize != nil {
opts.BlockBufferSize = uint8(cfg.Data.Clickhouse.GetBlockBufferSize())
}
// open the ClickHouse connection
conn, err := clickhouseV2.Open(opts)
if err != nil {
c.log.Errorf("failed to create clickhouse client: %v", err)
return ErrConnectionFailed
}
c.conn = conn
return nil
}
// Close closes the ClickHouse client connection
func (c *Client) Close() {
if c.conn == nil {
c.log.Warn("clickhouse client is already closed or not initialized")
return
}
if err := c.conn.Close(); err != nil {
c.log.Errorf("failed to close clickhouse client: %v", err)
} else {
c.log.Info("clickhouse client closed successfully")
}
}
// GetServerVersion returns the ClickHouse server version
func (c *Client) GetServerVersion() string {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ""
}
version, err := c.conn.ServerVersion()
if err != nil {
c.log.Errorf("failed to get server version: %v", err)
return ""
} else {
c.log.Infof("ClickHouse server version: %s", version)
return version.String()
}
}
// CheckConnection verifies that the ClickHouse connection is healthy
func (c *Client) CheckConnection(ctx context.Context) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
if err := c.conn.Ping(ctx); err != nil {
c.log.Errorf("ping failed: %v", err)
return ErrPingFailed
}
c.log.Info("clickhouse client connection is healthy")
return nil
}
// Query executes a query and appends the scanned rows to results
func (c *Client) Query(ctx context.Context, creator Creator, results *[]any, query string, args ...any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
if creator == nil {
c.log.Error("creator function cannot be nil")
return ErrCreatorFunctionNil
}
rows, err := c.conn.Query(ctx, query, args...)
if err != nil {
c.log.Errorf("query failed: %v", err)
return ErrQueryExecutionFailed
}
defer func(rows driverV2.Rows) {
if err = rows.Close(); err != nil {
c.log.Errorf("failed to close rows: %v", err)
}
}(rows)
for rows.Next() {
row := creator()
if err = rows.ScanStruct(row); err != nil {
c.log.Errorf("failed to scan row: %v", err)
return ErrRowScanFailed
}
*results = append(*results, row)
}
// check for any remaining iteration error
if rows.Err() != nil {
c.log.Errorf("Rows iteration error: %v", rows.Err())
return ErrRowsIterationError
}
return nil
}
// QueryRow executes a query and scans a single row
func (c *Client) QueryRow(ctx context.Context, dest any, query string, args ...any) error {
row := c.conn.QueryRow(ctx, query, args...)
if row == nil {
c.log.Error("query row returned nil")
return ErrRowNotFound
}
if err := row.ScanStruct(dest); err != nil {
c.log.Errorf("")
return ErrRowScanFailed
}
return nil
}
// Select wraps the driver's SELECT helper
func (c *Client) Select(ctx context.Context, dest any, query string, args ...any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
err := c.conn.Select(ctx, dest, query, args...)
if err != nil {
c.log.Errorf("select failed: %v", err)
return ErrQueryExecutionFailed
}
return nil
}
// Exec executes a non-query statement
func (c *Client) Exec(ctx context.Context, query string, args ...any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
if err := c.conn.Exec(ctx, query, args...); err != nil {
c.log.Errorf("exec failed: %v", err)
return ErrExecutionFailed
}
return nil
}
func (c *Client) prepareInsertData(data any) (string, string, []any, error) {
val := reflect.ValueOf(data)
if val.Kind() != reflect.Ptr || val.IsNil() {
return "", "", nil, fmt.Errorf("data must be a non-nil pointer")
}
val = val.Elem()
typ := val.Type()
columns := make([]string, 0, typ.NumField())
placeholders := make([]string, 0, typ.NumField())
values := structToValueArray(data)
for i := 0; i < typ.NumField(); i++ {
field := typ.Field(i)
// prefer the `ch` tag, then the `json` tag, and finally the field name
columnName := field.Tag.Get("ch")
if columnName == "" {
jsonTag := field.Tag.Get("json")
if jsonTag != "" {
tags := strings.Split(jsonTag, ",") // only take the part before the comma
if len(tags) > 0 {
columnName = tags[0]
}
}
}
if columnName == "" {
columnName = field.Name
}
//columnName = strings.TrimSpace(columnName)
columns = append(columns, columnName)
placeholders = append(placeholders, "?")
}
return strings.Join(columns, ", "), strings.Join(placeholders, ", "), values, nil
}
// Insert inserts a single row into the given table
func (c *Client) Insert(ctx context.Context, tableName string, in any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
columns, placeholders, values, err := c.prepareInsertData(in)
if err != nil {
c.log.Errorf("prepare insert in failed: %v", err)
return ErrPrepareInsertDataFailed
}
// build the SQL statement
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
tableName,
columns,
placeholders,
)
// execute the insert
if err = c.conn.Exec(ctx, query, values...); err != nil {
c.log.Errorf("insert failed: %v", err)
return ErrInsertFailed
}
return nil
}
func (c *Client) InsertMany(ctx context.Context, tableName string, data []any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
if len(data) == 0 {
c.log.Error("data slice is empty")
return ErrInvalidColumnData
}
var columns string
var placeholders []string
var values []any
for _, item := range data {
itemColumns, itemPlaceholders, itemValues, err := c.prepareInsertData(item)
if err != nil {
c.log.Errorf("prepare insert data failed: %v", err)
return ErrPrepareInsertDataFailed
}
if columns == "" {
columns = itemColumns
} else if columns != itemColumns {
c.log.Error("data items have inconsistent columns")
return ErrInvalidColumnData
}
placeholders = append(placeholders, fmt.Sprintf("(%s)", itemPlaceholders))
values = append(values, itemValues...)
}
// build the SQL statement
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
tableName,
columns,
strings.Join(placeholders, ", "),
)
// execute the insert
if err := c.conn.Exec(ctx, query, values...); err != nil {
c.log.Errorf("insert many failed: %v", err)
return ErrInsertFailed
}
return nil
}
// AsyncInsert inserts a single row asynchronously
func (c *Client) AsyncInsert(ctx context.Context, tableName string, data any, wait bool) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
// prepare the insert data
columns, placeholders, values, err := c.prepareInsertData(data)
if err != nil {
c.log.Errorf("prepare insert data failed: %v", err)
return ErrPrepareInsertDataFailed
}
// build the SQL statement
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
tableName,
columns,
placeholders,
)
// execute the async insert
if err = c.asyncInsert(ctx, query, wait, values...); err != nil {
c.log.Errorf("async insert failed: %v", err)
return ErrAsyncInsertFailed
}
return nil
}
// asyncInsert performs the underlying async insert
func (c *Client) asyncInsert(ctx context.Context, query string, wait bool, args ...any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
if err := c.conn.AsyncInsert(ctx, query, wait, args...); err != nil {
c.log.Errorf("async insert failed: %v", err)
return ErrAsyncInsertFailed
}
return nil
}
// AsyncInsertMany inserts multiple rows asynchronously
func (c *Client) AsyncInsertMany(ctx context.Context, tableName string, data []any, wait bool) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
if len(data) == 0 {
c.log.Error("data slice is empty")
return ErrInvalidColumnData
}
// prepare the column names and placeholders for the insert
var columns string
var placeholders []string
var values []any
for _, item := range data {
itemColumns, itemPlaceholders, itemValues, err := c.prepareInsertData(item)
if err != nil {
c.log.Errorf("prepare insert data failed: %v", err)
return ErrPrepareInsertDataFailed
}
if columns == "" {
columns = itemColumns
} else if columns != itemColumns {
c.log.Error("data items have inconsistent columns")
return ErrInvalidColumnData
}
placeholders = append(placeholders, fmt.Sprintf("(%s)", itemPlaceholders))
values = append(values, itemValues...)
}
// build the SQL statement
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES %s",
tableName,
columns,
strings.Join(placeholders, ", "),
)
// execute the async insert
if err := c.asyncInsert(ctx, query, wait, values...); err != nil {
c.log.Errorf("batch insert failed: %v", err)
return err
}
return nil
}
// BatchInsert inserts multiple rows using a prepared batch
func (c *Client) BatchInsert(ctx context.Context, tableName string, data []any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
if len(data) == 0 {
c.log.Error("data slice is empty")
return ErrInvalidColumnData
}
// prepare the column names and placeholders for the insert
var columns string
var values [][]any
for _, item := range data {
itemColumns, _, itemValues, err := c.prepareInsertData(item)
if err != nil {
c.log.Errorf("prepare insert data failed: %v", err)
return ErrPrepareInsertDataFailed
}
if columns == "" {
columns = itemColumns
} else if columns != itemColumns {
c.log.Error("data items have inconsistent columns")
return ErrInvalidColumnData
}
values = append(values, itemValues)
}
// build the SQL statement
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES", tableName, columns)
// execute the batch insert via batchExec
if err := c.batchExec(ctx, query, values); err != nil {
c.log.Errorf("batch insert failed: %v", err)
return ErrBatchInsertFailed
}
return nil
}
// batchExec executes a batch operation
func (c *Client) batchExec(ctx context.Context, query string, data [][]any) error {
batch, err := c.conn.PrepareBatch(ctx, query)
if err != nil {
c.log.Errorf("failed to prepare batch: %v", err)
return ErrBatchPrepareFailed
}
for _, row := range data {
if err = batch.Append(row...); err != nil {
c.log.Errorf("failed to append batch data: %v", err)
return ErrBatchAppendFailed
}
}
if err = batch.Send(); err != nil {
c.log.Errorf("failed to send batch: %v", err)
return ErrBatchSendFailed
}
return nil
}
// BatchStructs batch-inserts struct values via AppendStruct
func (c *Client) BatchStructs(ctx context.Context, query string, data []any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
// prepare the batch insert
batch, err := c.conn.PrepareBatch(ctx, query)
if err != nil {
c.log.Errorf("failed to prepare batch: %v", err)
return ErrBatchPrepareFailed
}
// iterate over the data and append it to the batch
for _, row := range data {
if err := batch.AppendStruct(row); err != nil {
c.log.Errorf("failed to append batch struct data: %v", err)
return ErrBatchAppendFailed
}
}
// send the batch
if err = batch.Send(); err != nil {
c.log.Errorf("failed to send batch: %v", err)
return ErrBatchSendFailed
}
return nil
}

View File

@@ -0,0 +1,500 @@
package clickhouse
import (
"context"
"testing"
"time"
"github.com/go-kratos/kratos/v2/log"
"github.com/stretchr/testify/assert"
conf "github.com/tx7do/kratos-bootstrap/api/gen/go/conf/v1"
)
type Candle struct {
Timestamp *time.Time `json:"timestamp" ch:"timestamp"`
Symbol *string `json:"symbol" ch:"symbol"`
Open *float64 `json:"open" ch:"open"`
High *float64 `json:"high" ch:"high"`
Low *float64 `json:"low" ch:"low"`
Close *float64 `json:"close" ch:"close"`
Volume *float64 `json:"volume" ch:"volume"`
}
func createTestClient() *Client {
cli, _ := NewClient(
log.DefaultLogger,
&conf.Bootstrap{
Data: &conf.Data{
Clickhouse: &conf.Data_ClickHouse{
Addresses: []string{"localhost:9000"},
Database: Ptr("finances"),
Username: Ptr("default"),
Password: Ptr("*Abcd123456"),
},
},
},
)
return cli
}
func createCandlesTable(client *Client) {
// 创建表的 SQL 语句
createTableQuery := `
CREATE TABLE IF NOT EXISTS candles (
timestamp DateTime64(3),
symbol String,
open Float64,
high Float64,
low Float64,
close Float64,
volume Float64
) ENGINE = MergeTree()
ORDER BY timestamp
`
err := client.Exec(context.Background(), createTableQuery)
if err != nil {
log.Errorf("Failed to create candles table: %v", err)
return
}
}
func TestNewClient(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
// 测试 CheckConnection
err := client.CheckConnection(context.Background())
assert.NoError(t, err, "CheckConnection 应该成功执行")
// 测试 GetServerVersion
version := client.GetServerVersion()
assert.NotEmpty(t, version, "GetServerVersion 应该返回非空值")
createCandlesTable(client)
}
func TestInsertCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// 测试数据
candle := &Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("AAPL"),
Open: Ptr(100.5),
High: Ptr(105.0),
Low: Ptr(99.5),
Close: Ptr(102.0),
Volume: Ptr(1500.0),
}
// 插入数据
err := client.Insert(context.Background(), "candles", candle)
assert.NoError(t, err, "InsertCandlesTable 应该成功执行")
}
func TestInsertManyCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// 测试数据
data := []any{
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("AAPL"),
Open: Ptr(100.5),
High: Ptr(105.0),
Low: Ptr(99.5),
Close: Ptr(102.0),
Volume: Ptr(1500.0),
},
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("GOOG"),
Open: Ptr(200.5),
High: Ptr(205.0),
Low: Ptr(199.5),
Close: Ptr(202.0),
Volume: Ptr(2500.0),
},
}
// 插入数据
err := client.InsertMany(context.Background(), "candles", data)
assert.NoError(t, err, "InsertManyCandlesTable 应该成功执行")
}
func TestAsyncInsertCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// 测试数据
candle := &Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("BTC/USD"),
Open: Ptr(30000.0),
High: Ptr(31000.0),
Low: Ptr(29000.0),
Close: Ptr(30500.0),
Volume: Ptr(500.0),
}
// 异步插入数据
err := client.AsyncInsert(context.Background(), "candles", candle, true)
assert.NoError(t, err, "AsyncInsert 方法应该成功执行")
// 验证插入结果
query := `
SELECT timestamp, symbol, open, high, low, close, volume
FROM candles
WHERE symbol = ?
`
var result Candle
err = client.QueryRow(context.Background(), &result, query, "BTC/USD")
assert.NoError(t, err, "QueryRow 应该成功执行")
assert.Equal(t, "BTC/USD", *result.Symbol, "symbol 列值应该为 BTC/USD")
assert.Equal(t, 30500.0, *result.Close, "close 列值应该为 30500.0")
assert.Equal(t, 500.0, *result.Volume, "volume 列值应该为 500.0")
}
func TestAsyncInsertManyCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// 测试数据
data := []any{
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("AAPL"),
Open: Ptr(100.5),
High: Ptr(105.0),
Low: Ptr(99.5),
Close: Ptr(102.0),
Volume: Ptr(1500.0),
},
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("GOOG"),
Open: Ptr(200.5),
High: Ptr(205.0),
Low: Ptr(199.5),
Close: Ptr(202.0),
Volume: Ptr(2500.0),
},
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("MSFT"),
Open: Ptr(300.5),
High: Ptr(305.0),
Low: Ptr(299.5),
Close: Ptr(302.0),
Volume: Ptr(3500.0),
},
}
// 批量插入数据
err := client.AsyncInsertMany(context.Background(), "candles", data, true)
assert.NoError(t, err, "AsyncInsertMany 方法应该成功执行")
// 验证插入结果
query := `
SELECT timestamp, symbol, open, high, low, close, volume
FROM candles
`
var results []Candle
err = client.Select(context.Background(), &results, query)
assert.NoError(t, err, "查询数据应该成功执行")
}
func TestInternalBatchExecCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// 插入数据的 SQL 语句
insertQuery := `
INSERT INTO candles (timestamp, symbol, open, high, low, close, volume)
VALUES (?, ?, ?, ?, ?, ?, ?)
`
// 测试数据
data := [][]interface{}{
{"2023-10-01 12:00:00", "AAPL", 100.5, 105.0, 99.5, 102.0, 1500.0},
{"2023-10-01 12:01:00", "GOOG", 200.5, 205.0, 199.5, 202.0, 2500.0},
{"2023-10-01 12:02:00", "MSFT", 300.5, 305.0, 299.5, 302.0, 3500.0},
}
// 批量插入数据
err := client.batchExec(context.Background(), insertQuery, data)
assert.NoError(t, err, "batchExec 应该成功执行")
}
func TestBatchInsertCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// 测试数据
data := []any{
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("AAPL"),
Open: Ptr(100.5),
High: Ptr(105.0),
Low: Ptr(99.5),
Close: Ptr(102.0),
Volume: Ptr(1500.0),
},
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("GOOG"),
Open: Ptr(200.5),
High: Ptr(205.0),
Low: Ptr(199.5),
Close: Ptr(202.0),
Volume: Ptr(2500.0),
},
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("MSFT"),
Open: Ptr(300.5),
High: Ptr(305.0),
Low: Ptr(299.5),
Close: Ptr(302.0),
Volume: Ptr(3500.0),
},
}
// 批量插入数据
err := client.BatchInsert(context.Background(), "candles", data)
assert.NoError(t, err, "BatchInsert 方法应该成功执行")
// 验证插入结果
query := `
SELECT timestamp, symbol, open, high, low, close, volume
FROM candles
`
var results []Candle
err = client.Select(context.Background(), &results, query)
assert.NoError(t, err, "查询数据应该成功执行")
}
func TestBatchStructsCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// 插入数据的 SQL 语句
insertQuery := `
INSERT INTO candles (timestamp, symbol, open, high, low, close, volume)
VALUES (?, ?, ?, ?, ?, ?, ?)
`
// 测试数据
data := []any{
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("AAPL"),
Open: Ptr(100.5),
High: Ptr(105.0),
Low: Ptr(99.5),
Close: Ptr(102.0),
Volume: Ptr(1500.0),
},
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("GOOG"),
Open: Ptr(200.5),
High: Ptr(205.0),
Low: Ptr(199.5),
Close: Ptr(202.0),
Volume: Ptr(2500.0),
},
}
// 批量插入数据
err := client.BatchStructs(context.Background(), insertQuery, data)
assert.NoError(t, err, "BatchStructsCandlesTable 应该成功执行")
}
func TestInternalAsyncInsertIntoCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// 插入数据的 SQL 语句
insertQuery := `
INSERT INTO candles (timestamp, symbol, open, high, low, close, volume)
VALUES (?, ?, ?, ?, ?, ?, ?)
`
// 测试数据
err := client.asyncInsert(context.Background(), insertQuery, true,
"2023-10-01 12:00:00", "AAPL", 100.5, 105.0, 99.5, 102.0, 1500.0)
assert.NoError(t, err, "InsertIntoCandlesTable 应该成功执行")
}
func TestQueryCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// 查询数据的 SQL 语句
query := `
SELECT timestamp, symbol, open, high, low, close, volume
FROM candles
`
// 定义结果集
var results []any
// 执行查询
err := client.Query(context.Background(), func() interface{} { return &Candle{} }, &results, query)
assert.NoError(t, err, "QueryCandlesTable 应该成功执行")
assert.NotEmpty(t, results, "QueryCandlesTable 应该返回结果")
for _, result := range results {
candle, ok := result.(*Candle)
assert.True(t, ok, "结果应该是 Candle 类型")
assert.NotNil(t, candle.Timestamp, "Timestamp 列不应该为 nil")
assert.NotNil(t, candle.Symbol, "Symbol 列不应该为 nil")
assert.NotNil(t, candle.Open, "Open 列不应该为 nil")
assert.NotNil(t, candle.High, "High 列不应该为 nil")
assert.NotNil(t, candle.Low, "Low 列不应该为 nil")
assert.NotNil(t, candle.Close, "Close 列不应该为 nil")
assert.NotNil(t, candle.Volume, "Volume 列不应该为 nil")
t.Logf("[%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
candle.Timestamp.String(),
*candle.Symbol,
*candle.Open, *candle.High, *candle.Low, *candle.Close, *candle.Volume,
)
}
}
func TestSelectCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// 查询数据的 SQL 语句
query := `
SELECT timestamp, symbol, open, high, low, close, volume
FROM candles
`
// Result set
var results []Candle
// Run the query
err := client.Select(context.Background(), &results, query)
assert.NoError(t, err, "Select should succeed")
assert.NotEmpty(t, results, "Select should return at least one row")
for _, result := range results {
assert.NotNil(t, result.Timestamp, "Timestamp should not be nil")
assert.NotNil(t, result.Symbol, "Symbol should not be nil")
assert.NotNil(t, result.Open, "Open should not be nil")
assert.NotNil(t, result.High, "High should not be nil")
assert.NotNil(t, result.Low, "Low should not be nil")
assert.NotNil(t, result.Close, "Close should not be nil")
assert.NotNil(t, result.Volume, "Volume should not be nil")
t.Logf("[%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
result.Timestamp.String(),
*result.Symbol,
*result.Open, *result.High, *result.Low, *result.Close, *result.Volume,
)
}
}
func TestQueryRow(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// Insert test data
insertQuery := `
INSERT INTO candles (timestamp, symbol, open, high, low, close, volume)
VALUES (?, ?, ?, ?, ?, ?, ?)
`
err := client.asyncInsert(context.Background(), insertQuery, true,
"2023-10-01 12:00:00", "AAPL", 100.5, 105.0, 99.5, 102.0, 1500.0)
assert.NoError(t, err, "数据插入失败")
// 查询单行数据
query := `
SELECT timestamp, symbol, open, high, low, close, volume
FROM candles
WHERE symbol = ?
`
var result Candle
err = client.QueryRow(context.Background(), &result, query, "AAPL")
assert.NoError(t, err, "QueryRow 应该成功执行")
assert.Equal(t, "AAPL", *result.Symbol, "symbol 列值应该为 AAPL")
assert.Equal(t, 100.5, *result.Open, "open 列值应该为 100.5")
assert.Equal(t, 1500.0, *result.Volume, "volume 列值应该为 1500.0")
t.Logf("QueryRow Result: [%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
result.Timestamp.String(),
*result.Symbol,
*result.Open, *result.High, *result.Low, *result.Close, *result.Volume,
)
}
func TestDropCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
// SQL statement for dropping the table
dropTableQuery := `DROP TABLE IF EXISTS candles`
// Drop the table
err := client.Exec(context.Background(), dropTableQuery)
assert.NoError(t, err, "dropping the candles table should succeed")
}
func TestAggregateCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// Aggregation query
query := `
SELECT symbol,
MAX(high) AS max_high,
MIN(low) AS min_low,
AVG(close) AS avg_close,
SUM(volume) AS total_volume
FROM candles
GROUP BY symbol
`
// Result set
var results []struct {
Symbol string `ch:"symbol"`
MaxHigh float64 `ch:"max_high"`
MinLow float64 `ch:"min_low"`
AvgClose float64 `ch:"avg_close"`
TotalVolume float64 `ch:"total_volume"`
}
// Run the query
err := client.Select(context.Background(), &results, query)
assert.NoError(t, err, "the aggregation query should succeed")
assert.NotEmpty(t, results, "the aggregation query should return at least one row")
}
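The tests above rely on a `Candle` type that is defined elsewhere in the package. A minimal sketch of what such a struct could look like, assuming the client maps columns through `ch` tags the same way the aggregate-result struct does; the tag names and pointer fields here are an illustration, not the package's actual definition:

```go
package clickhouse

import "time"

// Candle is a hypothetical OHLCV row; pointer fields let NULL columns map to nil.
type Candle struct {
	Timestamp *time.Time `ch:"timestamp"`
	Symbol    *string    `ch:"symbol"`
	Open      *float64   `ch:"open"`
	High      *float64   `ch:"high"`
	Low       *float64   `ch:"low"`
	Close     *float64   `ch:"close"`
	Volume    *float64   `ch:"volume"`
}
```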

View File

@@ -0,0 +1,89 @@
package clickhouse
import "github.com/go-kratos/kratos/v2/errors"
var (
// ErrInvalidColumnName is returned when an invalid column name is used.
ErrInvalidColumnName = errors.InternalServer("INVALID_COLUMN_NAME", "invalid column name")
// ErrInvalidTableName is returned when an invalid table name is used.
ErrInvalidTableName = errors.InternalServer("INVALID_TABLE_NAME", "invalid table name")
// ErrInvalidCondition is returned when an invalid condition is used in a query.
ErrInvalidCondition = errors.InternalServer("INVALID_CONDITION", "invalid condition in query")
// ErrQueryExecutionFailed is returned when a query execution fails.
ErrQueryExecutionFailed = errors.InternalServer("QUERY_EXECUTION_FAILED", "query execution failed")
// ErrExecutionFailed is returned when a general execution fails.
ErrExecutionFailed = errors.InternalServer("EXECUTION_FAILED", "execution failed")
// ErrAsyncInsertFailed is returned when an asynchronous insert operation fails.
ErrAsyncInsertFailed = errors.InternalServer("ASYNC_INSERT_FAILED", "async insert operation failed")
// ErrRowScanFailed is returned when scanning rows from a query result fails.
ErrRowScanFailed = errors.InternalServer("ROW_SCAN_FAILED", "row scan failed")
// ErrRowsIterationError is returned when there is an error iterating over rows.
ErrRowsIterationError = errors.InternalServer("ROWS_ITERATION_ERROR", "rows iteration error")
// ErrRowNotFound is returned when a specific row is not found in the result set.
ErrRowNotFound = errors.InternalServer("ROW_NOT_FOUND", "row not found")
// ErrConnectionFailed is returned when the connection to ClickHouse fails.
ErrConnectionFailed = errors.InternalServer("CONNECTION_FAILED", "failed to connect to ClickHouse")
// ErrDatabaseNotFound is returned when the specified database is not found.
ErrDatabaseNotFound = errors.InternalServer("DATABASE_NOT_FOUND", "specified database not found")
// ErrTableNotFound is returned when the specified table is not found.
ErrTableNotFound = errors.InternalServer("TABLE_NOT_FOUND", "specified table not found")
// ErrInsertFailed is returned when an insert operation fails.
ErrInsertFailed = errors.InternalServer("INSERT_FAILED", "insert operation failed")
// ErrUpdateFailed is returned when an update operation fails.
ErrUpdateFailed = errors.InternalServer("UPDATE_FAILED", "update operation failed")
// ErrDeleteFailed is returned when a delete operation fails.
ErrDeleteFailed = errors.InternalServer("DELETE_FAILED", "delete operation failed")
// ErrTransactionFailed is returned when a transaction fails.
ErrTransactionFailed = errors.InternalServer("TRANSACTION_FAILED", "transaction failed")
// ErrClientNotInitialized is returned when the ClickHouse client is not initialized.
ErrClientNotInitialized = errors.InternalServer("CLIENT_NOT_INITIALIZED", "clickhouse client not initialized")
// ErrGetServerVersionFailed is returned when getting the server version fails.
ErrGetServerVersionFailed = errors.InternalServer("GET_SERVER_VERSION_FAILED", "failed to get server version")
// ErrPingFailed is returned when a ping to the ClickHouse server fails.
ErrPingFailed = errors.InternalServer("PING_FAILED", "ping to ClickHouse server failed")
// ErrCreatorFunctionNil is returned when the creator function is nil.
ErrCreatorFunctionNil = errors.InternalServer("CREATOR_FUNCTION_NIL", "creator function cannot be nil")
// ErrBatchPrepareFailed is returned when a batch prepare operation fails.
ErrBatchPrepareFailed = errors.InternalServer("BATCH_PREPARE_FAILED", "batch prepare operation failed")
// ErrBatchSendFailed is returned when a batch send operation fails.
ErrBatchSendFailed = errors.InternalServer("BATCH_SEND_FAILED", "batch send operation failed")
// ErrBatchAppendFailed is returned when appending to a batch fails.
ErrBatchAppendFailed = errors.InternalServer("BATCH_APPEND_FAILED", "batch append operation failed")
// ErrBatchInsertFailed is returned when a batch insert operation fails.
ErrBatchInsertFailed = errors.InternalServer("BATCH_INSERT_FAILED", "batch insert operation failed")
// ErrInvalidDSN is returned when the data source name (DSN) is invalid.
ErrInvalidDSN = errors.InternalServer("INVALID_DSN", "invalid data source name")
// ErrInvalidProxyURL is returned when the proxy URL is invalid.
ErrInvalidProxyURL = errors.InternalServer("INVALID_PROXY_URL", "invalid proxy URL")
// ErrPrepareInsertDataFailed is returned when preparing insert data fails.
ErrPrepareInsertDataFailed = errors.InternalServer("PREPARE_INSERT_DATA_FAILED", "failed to prepare insert data")
// ErrInvalidColumnData is returned when the column data type is invalid.
ErrInvalidColumnData = errors.InternalServer("INVALID_COLUMN_DATA", "invalid column data type")
)
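Because these sentinel errors are built with the kratos errors package, callers can match them by code and reason instead of comparing strings. A rough usage sketch, assuming the `Client.QueryRow` and `Candle` shown in the tests above:

```go
package clickhouse

import (
	"context"

	"github.com/go-kratos/kratos/v2/errors"
)

// findCandle is a hypothetical helper showing how a caller might treat
// "row not found" as a soft miss while surfacing every other failure.
func findCandle(ctx context.Context, client *Client, symbol string) (*Candle, error) {
	var result Candle
	query := `SELECT timestamp, symbol, open, high, low, close, volume FROM candles WHERE symbol = ?`
	err := client.QueryRow(ctx, &result, query, symbol)
	switch {
	case err == nil:
		return &result, nil
	case errors.Is(err, ErrRowNotFound):
		return nil, nil // no matching row is not treated as a hard failure here
	default:
		return nil, err // connection, scan or execution problems
	}
}
```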

View File

@@ -7,27 +7,32 @@ toolchain go1.23.3
replace github.com/tx7do/kratos-bootstrap/api => ../../api
require (
github.com/ClickHouse/clickhouse-go/v2 v2.35.0
github.com/ClickHouse/clickhouse-go/v2 v2.37.2
github.com/go-kratos/kratos/v2 v2.8.4
github.com/tx7do/kratos-bootstrap/api v0.0.21
github.com/stretchr/testify v1.10.0
github.com/tx7do/kratos-bootstrap/api v0.0.27
github.com/tx7do/kratos-bootstrap/utils v0.1.3
)
require (
github.com/ClickHouse/ch-go v0.66.0 // indirect
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/ClickHouse/ch-go v0.66.1 // indirect
github.com/andybalholm/brotli v1.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.7.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/paulmach/orb v0.11.1 // indirect
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.13.1 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
go.opentelemetry.io/otel v1.36.0 // indirect
go.opentelemetry.io/otel/trace v1.36.0 // indirect
go.opentelemetry.io/otel v1.37.0 // indirect
go.opentelemetry.io/otel/trace v1.37.0 // indirect
golang.org/x/sys v0.33.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect
google.golang.org/grpc v1.73.0 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

View File

@@ -1,9 +1,9 @@
github.com/ClickHouse/ch-go v0.66.0 h1:hLslxxAVb2PHpbHr4n0d6aP8CEIpUYGMVT1Yj/Q5Img=
github.com/ClickHouse/ch-go v0.66.0/go.mod h1:noiHWyLMJAZ5wYuq3R/K0TcRhrNA8h7o1AqHX0klEhM=
github.com/ClickHouse/clickhouse-go/v2 v2.35.0 h1:ZMLZqxu+NiW55f4JS32kzyEbMb7CthGn3ziCcULOvSE=
github.com/ClickHouse/clickhouse-go/v2 v2.35.0/go.mod h1:O2FFT/rugdpGEW2VKyEGyMUWyQU0ahmenY9/emxLPxs=
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
github.com/ClickHouse/ch-go v0.66.1 h1:LQHFslfVYZsISOY0dnOYOXGkOUvpv376CCm8g7W74A4=
github.com/ClickHouse/ch-go v0.66.1/go.mod h1:NEYcg3aOFv2EmTJfo4m2WF7sHB/YFbLUuIWv9iq76xY=
github.com/ClickHouse/clickhouse-go/v2 v2.37.2 h1:wRLNKoynvHQEN4znnVHNLaYnrqVc9sGJmGYg+GGCfto=
github.com/ClickHouse/clickhouse-go/v2 v2.37.2/go.mod h1:pH2zrBGp5Y438DMwAxXMm1neSXPPjSI7tD4MURVULw8=
github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ=
github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -15,6 +15,8 @@ github.com/go-kratos/kratos/v2 v2.8.4 h1:eIJLE9Qq9WSoKx+Buy2uPyrahtF/lPh+Xf4MTpx
github.com/go-kratos/kratos/v2 v2.8.4/go.mod h1:mq62W2101a5uYyRxe+7IdWubu7gZCGYqSNKwGFiiRcw=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
@@ -65,10 +67,10 @@ github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7Jul
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g=
go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg=
go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E=
go.opentelemetry.io/otel/trace v1.36.0 h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w=
go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA=
go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ=
go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I=
go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4=
go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@@ -80,12 +82,14 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw=
golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ=
golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -99,6 +103,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
@@ -107,6 +113,10 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 h1:fc6jSaCT0vBduLYZHYrBBNY4dsWuvgyff9noRNDdBeE=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
google.golang.org/grpc v1.73.0 h1:VIWSmpI2MegBtTuFt5/JWy2oXxtjJ/e89Z70ImfD2ok=
google.golang.org/grpc v1.73.0/go.mod h1:50sbHOUqWoCQGI8V2HQLJM0B+LMlIUjNSZmow7EVBQc=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=

View File

@@ -0,0 +1,246 @@
package clickhouse
import (
"fmt"
"regexp"
"strings"
"github.com/go-kratos/kratos/v2/log"
)
type QueryBuilder struct {
table string
columns []string
distinct bool
conditions []string
prewhere []string
orderBy []string
groupBy []string
having []string
joins []string
with []string
union []string
limit int
offset int
params []interface{} // collected positional parameters
useIndex string // index hint
cacheResult bool // whether to cache the query result
debug bool // whether debug logging is enabled
log *log.Helper
}
// NewQueryBuilder creates a new QueryBuilder instance
func NewQueryBuilder(table string, log *log.Helper) *QueryBuilder {
return &QueryBuilder{
log: log,
table: table,
params: []interface{}{},
}
}
// EnableDebug turns on debug logging
func (qb *QueryBuilder) EnableDebug() *QueryBuilder {
qb.debug = true
return qb
}
// logDebug prints a debug message when debug mode is enabled
func (qb *QueryBuilder) logDebug(message string) {
if qb.debug {
qb.log.Debug("[QueryBuilder Debug]:", message)
}
}
// Select 设置查询的列
func (qb *QueryBuilder) Select(columns ...string) *QueryBuilder {
for _, column := range columns {
if !isValidIdentifier(column) {
panic("Invalid column name")
}
}
qb.columns = columns
return qb
}
// Distinct 设置 DISTINCT 查询
func (qb *QueryBuilder) Distinct() *QueryBuilder {
qb.distinct = true
return qb
}
// Where 添加查询条件并支持参数化
func (qb *QueryBuilder) Where(condition string, args ...interface{}) *QueryBuilder {
if !isValidCondition(condition) {
panic("Invalid condition")
}
qb.conditions = append(qb.conditions, condition)
qb.params = append(qb.params, args...)
return qb
}
// OrderBy 设置排序条件
func (qb *QueryBuilder) OrderBy(order string) *QueryBuilder {
qb.orderBy = append(qb.orderBy, order)
return qb
}
// GroupBy 设置分组条件
func (qb *QueryBuilder) GroupBy(columns ...string) *QueryBuilder {
qb.groupBy = append(qb.groupBy, columns...)
return qb
}
// Having 添加分组后的过滤条件并支持参数化
func (qb *QueryBuilder) Having(condition string, args ...interface{}) *QueryBuilder {
qb.having = append(qb.having, condition)
qb.params = append(qb.params, args...)
return qb
}
// Join 添加 JOIN 操作
func (qb *QueryBuilder) Join(joinType, table, onCondition string) *QueryBuilder {
join := fmt.Sprintf("%s JOIN %s ON %s", joinType, table, onCondition)
qb.joins = append(qb.joins, join)
return qb
}
// With 添加 WITH 子句
func (qb *QueryBuilder) With(expression string) *QueryBuilder {
qb.with = append(qb.with, expression)
return qb
}
// Union appends a UNION clause; the keyword is stored with the query so Build can emit it verbatim
func (qb *QueryBuilder) Union(query string) *QueryBuilder {
qb.union = append(qb.union, fmt.Sprintf("UNION %s", query))
return qb
}
// Limit 设置查询结果的限制数量
func (qb *QueryBuilder) Limit(limit int) *QueryBuilder {
qb.limit = limit
return qb
}
// Offset 设置查询结果的偏移量
func (qb *QueryBuilder) Offset(offset int) *QueryBuilder {
qb.offset = offset
return qb
}
// UseIndex 设置索引提示
func (qb *QueryBuilder) UseIndex(index string) *QueryBuilder {
qb.useIndex = index
return qb
}
// CacheResult 启用查询结果缓存
func (qb *QueryBuilder) CacheResult() *QueryBuilder {
qb.cacheResult = true
return qb
}
// ArrayJoin 添加 ARRAY JOIN 子句
func (qb *QueryBuilder) ArrayJoin(expression string) *QueryBuilder {
qb.joins = append(qb.joins, fmt.Sprintf("ARRAY JOIN %s", expression))
return qb
}
// Final 添加 FINAL 修饰符
func (qb *QueryBuilder) Final() *QueryBuilder {
qb.table = fmt.Sprintf("%s FINAL", qb.table)
return qb
}
// Sample 添加 SAMPLE 子句
func (qb *QueryBuilder) Sample(sampleRate float64) *QueryBuilder {
qb.table = fmt.Sprintf("%s SAMPLE %f", qb.table, sampleRate)
return qb
}
// LimitBy 添加 LIMIT BY 子句
func (qb *QueryBuilder) LimitBy(limit int, columns ...string) *QueryBuilder {
qb.limit = limit
qb.orderBy = append(qb.orderBy, fmt.Sprintf("LIMIT BY %d (%s)", limit, strings.Join(columns, ", ")))
return qb
}
// PreWhere adds a ClickHouse PREWHERE condition with parameter support
func (qb *QueryBuilder) PreWhere(condition string, args ...interface{}) *QueryBuilder {
if !isValidCondition(condition) {
panic("Invalid condition")
}
qb.prewhere = append(qb.prewhere, condition)
qb.params = append(args, qb.params...)
return qb
}
// Format 添加 FORMAT 子句
func (qb *QueryBuilder) Format(format string) *QueryBuilder {
qb.union = append(qb.union, fmt.Sprintf("FORMAT %s", format))
return qb
}
// Build assembles the final SQL statement and returns it together with the collected parameters
func (qb *QueryBuilder) Build() (string, []interface{}) {
query := ""
if qb.cacheResult {
query += "/* CACHE */ "
}
if len(qb.with) > 0 {
query += fmt.Sprintf("WITH %s ", strings.Join(qb.with, ", "))
}
query += "SELECT "
if qb.distinct {
query += "DISTINCT "
}
query += qb.buildColumns()
query += fmt.Sprintf(" FROM %s", qb.table)
if qb.useIndex != "" {
query += fmt.Sprintf(" USE INDEX (%s)", qb.useIndex)
}
if len(qb.joins) > 0 {
query += " " + strings.Join(qb.joins, " ")
}
if len(qb.prewhere) > 0 {
query += fmt.Sprintf(" PREWHERE %s", strings.Join(qb.prewhere, " AND "))
}
if len(qb.conditions) > 0 {
query += fmt.Sprintf(" WHERE %s", strings.Join(qb.conditions, " AND "))
}
if len(qb.groupBy) > 0 {
query += fmt.Sprintf(" GROUP BY %s", strings.Join(qb.groupBy, ", "))
}
if len(qb.having) > 0 {
query += fmt.Sprintf(" HAVING %s", strings.Join(qb.having, " AND "))
}
if len(qb.orderBy) > 0 {
query += fmt.Sprintf(" ORDER BY %s", strings.Join(qb.orderBy, ", "))
}
if qb.limit > 0 {
query += fmt.Sprintf(" LIMIT %d", qb.limit)
}
if qb.offset > 0 {
query += fmt.Sprintf(" OFFSET %d", qb.offset)
}
if len(qb.union) > 0 {
query += " " + strings.Join(qb.union, " ")
}
return query, qb.params
}
func (qb *QueryBuilder) buildColumns() string {
if len(qb.columns) == 0 {
return "*"
}
return strings.Join(qb.columns, ", ")
}
// isValidIdentifier reports whether a table or column name is safe to use
func isValidIdentifier(identifier string) bool {
// only letters, digits and underscores, and the first character must not be a digit
matched, _ := regexp.MatchString(`^[a-zA-Z_][a-zA-Z0-9_]*$`, identifier)
return matched
}
// isValidCondition reports whether a condition string is free of obviously dangerous characters
func isValidCondition(condition string) bool {
// a simple check for statement separators and SQL comment markers
return !strings.Contains(condition, ";") && !strings.Contains(condition, "--")
}
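As a quick illustration of how the builder composes a statement, the following sketch builds a filtered query and hands the result to the client's Select method. The `Client`, logger wiring and `Candle` result type are assumptions taken from the tests in this changeset, and Select is assumed to accept positional arguments the same way QueryRow does there:

```go
package clickhouse

import (
	"context"

	"github.com/go-kratos/kratos/v2/log"
)

// selectRecentAAPL is a hypothetical example of combining QueryBuilder with Client.Select.
func selectRecentAAPL(ctx context.Context, client *Client, logger *log.Helper) ([]Candle, error) {
	qb := NewQueryBuilder("candles", logger).
		Select("timestamp", "symbol", "close", "volume").
		Where("symbol = ?", "AAPL").
		OrderBy("timestamp DESC").
		Limit(100)

	query, params := qb.Build()

	var results []Candle
	// The parameters are assumed to be bound positionally, in the order Build returns them.
	if err := client.Select(ctx, &results, query, params...); err != nil {
		return nil, err
	}
	return results, nil
}
```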

View File

@@ -0,0 +1,120 @@
package clickhouse
import (
"testing"
"github.com/go-kratos/kratos/v2/log"
"github.com/stretchr/testify/assert"
)
func TestQueryBuilder(t *testing.T) {
logger := log.NewHelper(log.DefaultLogger)
qb := NewQueryBuilder("test_table", logger)
// 测试 Select 方法
qb.Select("id", "name")
query, params := qb.Build()
assert.Contains(t, query, "SELECT id, name FROM test_table")
// 测试 Distinct 方法
qb.Distinct()
query, _ = qb.Build()
assert.Contains(t, query, "SELECT DISTINCT id, name FROM test_table")
// 测试 Where 方法
qb.Where("id > ?", 10).Where("name = ?", "example")
query, params = qb.Build()
assert.Contains(t, query, "WHERE id > ? AND name = ?")
assert.Equal(t, []interface{}{10, "example"}, params)
// 测试 OrderBy 方法
qb.OrderBy("name ASC")
query, _ = qb.Build()
assert.Contains(t, query, "ORDER BY name ASC")
// 测试 GroupBy 方法
qb.GroupBy("category")
query, _ = qb.Build()
assert.Contains(t, query, "GROUP BY category")
// 测试 Having 方法
qb.Having("COUNT(id) > ?", 5)
query, params = qb.Build()
assert.Contains(t, query, "HAVING COUNT(id) > ?")
assert.Equal(t, []interface{}{10, "example", 5}, params)
// 测试 Join 方法
qb.Join("INNER", "other_table", "test_table.id = other_table.id")
query, _ = qb.Build()
assert.Contains(t, query, "INNER JOIN other_table ON test_table.id = other_table.id")
// 测试 With 方法
qb.With("temp AS (SELECT id FROM another_table WHERE status = 'active')")
query, _ = qb.Build()
assert.Contains(t, query, "WITH temp AS (SELECT id FROM another_table WHERE status = 'active')")
// 测试 Union 方法
qb.Union("SELECT id FROM another_table")
query, _ = qb.Build()
assert.Contains(t, query, "UNION SELECT id FROM another_table")
// 测试 Limit 和 Offset 方法
qb.Limit(10).Offset(20)
query, _ = qb.Build()
assert.Contains(t, query, "LIMIT 10 OFFSET 20")
// 测试 UseIndex 方法
qb.UseIndex("idx_name")
query, _ = qb.Build()
assert.Contains(t, query, "USE INDEX (idx_name)")
// 测试 CacheResult 方法
qb.CacheResult()
query, _ = qb.Build()
assert.Contains(t, query, "/* CACHE */")
// 测试 EnableDebug 方法
qb.EnableDebug()
assert.True(t, qb.debug)
// 测试 ArrayJoin 方法
qb.ArrayJoin("array_column")
query, _ = qb.Build()
assert.Contains(t, query, "ARRAY JOIN array_column")
// 测试 Final 方法
qb.Final()
query, _ = qb.Build()
assert.Contains(t, query, "test_table FINAL")
// 测试 Sample 方法
qb.Sample(0.1)
query, _ = qb.Build()
assert.Contains(t, query, "test_table SAMPLE 0.100000")
// 测试 LimitBy 方法
qb.LimitBy(5, "name")
query, _ = qb.Build()
assert.Contains(t, query, "LIMIT BY 5 (name)")
// Test the PreWhere method
qb.PreWhere("status = ?", "active")
query, params = qb.Build()
assert.Contains(t, query, "PREWHERE status = ?")
assert.Equal(t, []interface{}{"active", 10, "example", 5}, params)
// 测试 Format 方法
qb.Format("JSON")
query, _ = qb.Build()
assert.Contains(t, query, "FORMAT JSON")
// Edge case: an empty column name
assert.Panics(t, func() {
qb.Select("")
}, "an invalid column name should panic")
// Edge case: an unsafe condition
assert.Panics(t, func() {
qb.Where("id = 1; DROP TABLE test_table")
}, "an invalid condition should panic")
}

View File

@@ -0,0 +1,146 @@
package clickhouse
import (
"database/sql"
"reflect"
"time"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
const (
timeFormat = "2006-01-02 15:04:05.000000000"
)
func structToValueArray(input any) []any {
// dereference pointer inputs
val := reflect.ValueOf(input)
if val.Kind() == reflect.Ptr {
val = val.Elem()
}
// only structs are supported
if val.Kind() != reflect.Struct {
return nil
}
var values []any
for i := 0; i < val.NumField(); i++ {
value := val.Field(i).Interface()
switch v := value.(type) {
case *sql.NullString:
if v.Valid {
values = append(values, v.String)
} else {
values = append(values, nil)
}
case *sql.NullInt64:
if v.Valid {
values = append(values, v.Int64)
} else {
values = append(values, nil)
}
case *sql.NullFloat64:
if v.Valid {
values = append(values, v.Float64)
} else {
values = append(values, nil)
}
case *sql.NullBool:
if v.Valid {
values = append(values, v.Bool)
} else {
values = append(values, nil)
}
case *sql.NullTime:
if v != nil && v.Valid {
values = append(values, v.Time.Format(timeFormat))
} else {
values = append(values, nil)
}
case *time.Time:
if v != nil {
values = append(values, v.Format(timeFormat))
} else {
values = append(values, nil)
}
case time.Time:
// handle time.Time values
if !v.IsZero() {
values = append(values, v.Format(timeFormat))
} else {
values = append(values, nil) // insert NULL for the zero time
}
case timestamppb.Timestamp:
// handle timestamppb.Timestamp values
if v.IsValid() {
values = append(values, v.AsTime().Format(timeFormat))
} else {
values = append(values, nil) // insert NULL for an invalid timestamp
}
case *timestamppb.Timestamp:
// handle *timestamppb.Timestamp values
if v != nil && v.IsValid() {
values = append(values, v.AsTime().Format(timeFormat))
} else {
values = append(values, nil) // insert NULL for a nil or invalid timestamp
}
case durationpb.Duration:
// handle durationpb.Duration values
if v.AsDuration() != 0 {
values = append(values, v.AsDuration().String())
} else {
values = append(values, nil) // insert NULL for a zero duration
}
case *durationpb.Duration:
// handle *durationpb.Duration values
if v != nil && v.AsDuration() != 0 {
values = append(values, v.AsDuration().String())
} else {
values = append(values, nil) // insert NULL for a nil or zero duration
}
case []any:
// handle slice values
if len(v) > 0 {
for _, item := range v {
if item == nil {
values = append(values, nil)
} else {
values = append(values, item)
}
}
} else {
values = append(values, nil) // insert NULL for an empty slice
}
case [][]any:
// handle two-dimensional slice values
if len(v) > 0 {
for _, item := range v {
if len(item) > 0 {
values = append(values, item)
} else {
values = append(values, nil) // insert NULL for an empty sub-slice
}
}
} else {
values = append(values, nil) // insert NULL for an empty two-dimensional slice
}
default:
values = append(values, v)
}
}
return values
}
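structToValueArray flattens one struct into the positional values an INSERT expects, which is presumably what BatchStructs builds on. A small sketch of feeding its output into the client; the asyncInsert call and Candle type are taken from the tests above, and asyncInsert is assumed to be variadic over interface{} values:

```go
package clickhouse

import "context"

// insertCandleAsValues is a hypothetical helper: convert a struct into the
// positional values of a parameterised INSERT and hand them to asyncInsert.
func insertCandleAsValues(ctx context.Context, client *Client, c *Candle) error {
	insertQuery := `
	INSERT INTO candles (timestamp, symbol, open, high, low, close, volume)
	VALUES (?, ?, ?, ?, ?, ?, ?)
	`
	values := structToValueArray(c) // one entry per struct field, nil for NULL
	return client.asyncInsert(ctx, insertQuery, true, values...)
}
```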

View File

@@ -0,0 +1,83 @@
package clickhouse
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// Ptr returns a pointer to the provided value.
func Ptr[T any](v T) *T {
return &v
}
func TestStructToValueArrayWithCandle(t *testing.T) {
now := time.Now()
candle := Candle{
Timestamp: Ptr(now),
Symbol: Ptr("AAPL"),
Open: Ptr(100.5),
High: Ptr(105.0),
Low: Ptr(99.5),
Close: Ptr(102.0),
Volume: Ptr(1500.0),
}
values := structToValueArray(candle)
assert.NotNil(t, values, "Values should not be nil")
assert.Len(t, values, 7, "Expected 7 fields in the Candle struct")
assert.Equal(t, now.Format(timeFormat), values[0].(string), "Timestamp should match")
assert.Equal(t, *candle.Symbol, *(values[1].(*string)), "Symbol should match")
assert.Equal(t, *candle.Open, *values[2].(*float64), "Open price should match")
assert.Equal(t, *candle.High, *values[3].(*float64), "High price should match")
assert.Equal(t, *candle.Low, *values[4].(*float64), "Low price should match")
assert.Equal(t, *candle.Close, *values[5].(*float64), "Close price should match")
assert.Equal(t, *candle.Volume, *values[6].(*float64), "Volume should match")
t.Logf("QueryRow Result: [%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
values[0],
*(values[1].(*string)),
*values[2].(*float64),
*values[3].(*float64),
*values[4].(*float64),
*values[5].(*float64),
*values[6].(*float64),
)
}
func TestStructToValueArrayWithCandlePtr(t *testing.T) {
now := time.Now()
candle := &Candle{
Timestamp: Ptr(now),
Symbol: Ptr("AAPL"),
Open: Ptr(100.5),
High: Ptr(105.0),
Low: Ptr(99.5),
Close: Ptr(102.0),
Volume: Ptr(1500.0),
}
values := structToValueArray(candle)
assert.NotNil(t, values, "Values should not be nil")
assert.Len(t, values, 7, "Expected 7 fields in the Candle struct")
assert.Equal(t, now.Format(timeFormat), values[0].(string), "Timestamp should match")
assert.Equal(t, *candle.Symbol, *(values[1].(*string)), "Symbol should match")
assert.Equal(t, *candle.Open, *values[2].(*float64), "Open price should match")
assert.Equal(t, *candle.High, *values[3].(*float64), "High price should match")
assert.Equal(t, *candle.Low, *values[4].(*float64), "Low price should match")
assert.Equal(t, *candle.Close, *values[5].(*float64), "Close price should match")
assert.Equal(t, *candle.Volume, *values[6].(*float64), "Volume should match")
t.Logf("QueryRow Result: [%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
values[0],
*(values[1].(*string)),
*values[2].(*float64),
*values[3].(*float64),
*values[4].(*float64),
*values[5].(*float64),
*values[6].(*float64),
)
}

View File

@@ -17,39 +17,16 @@
## Docker Deployment
### Pull the image
```bash
docker pull bitnami/elasticsearch:latest
```
### Start the container
```bash
docker run -itd \
--name elasticsearch \
-p 9200:9200 \
-p 9300:9300 \
-e ELASTICSEARCH_USERNAME=elastic \
-e ELASTICSEARCH_PASSWORD=elastic \
-e xpack.security.enabled=true \
-e discovery.type=single-node \
-e http.cors.enabled=true \
-e http.cors.allow-origin=http://localhost:13580,http://127.0.0.1:13580 \
-e http.cors.allow-headers=X-Requested-With,X-Auth-Token,Content-Type,Content-Length,Authorization \
-e http.cors.allow-credentials=true \
-e ELASTICSEARCH_NODE_NAME=elasticsearch-node-1 \
-e ELASTICSEARCH_CLUSTER_NAME=elasticsearch-cluster \
bitnami/elasticsearch:latest
```
Install the management UI (dejavu):
```bash
docker pull appbaseio/dejavu:latest
docker run -itd \
--name dejavu-test \
-p 13580:1358 \
appbaseio/dejavu:latest
```
Open the management UI at <http://localhost:13580/>.
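To confirm the container is reachable from Go, a minimal check against the node info endpoint could look like the sketch below. The address and credentials match the docker run command above; the client calls are standard go-elasticsearch usage, not code from this repository:

```go
package main

import (
	"log"

	elasticsearch "github.com/elastic/go-elasticsearch/v9"
)

func main() {
	es, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{"http://localhost:9200"},
		Username:  "elastic",
		Password:  "elastic",
	})
	if err != nil {
		log.Fatalf("create client: %v", err)
	}

	res, err := es.Info() // GET / returns the cluster name and version
	if err != nil {
		log.Fatalf("ping node: %v", err)
	}
	defer res.Body.Close()
	log.Println(res.Status())
}
```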

View File

@@ -6,8 +6,6 @@ import (
"encoding/json"
"io"
"github.com/go-kratos/kratos/v2/encoding"
_ "github.com/go-kratos/kratos/v2/encoding/json"
"github.com/go-kratos/kratos/v2/log"
elasticsearchV9 "github.com/elastic/go-elasticsearch/v9"
@@ -18,15 +16,13 @@ import (
)
type Client struct {
cli *elasticsearchV9.Client
log *log.Helper
codec encoding.Codec
cli *elasticsearchV9.Client
log *log.Helper
}
func NewClient(logger log.Logger, cfg *conf.Bootstrap) (*Client, error) {
c := &Client{
log: log.NewHelper(log.With(logger, "module", "elasticsearch-client")),
codec: encoding.GetCodec("json"),
log: log.NewHelper(log.With(logger, "module", "elasticsearch-client")),
}
if err := c.createESClient(cfg); err != nil {

View File

@@ -10,7 +10,7 @@ require (
github.com/elastic/go-elasticsearch/v9 v9.0.0
github.com/go-kratos/kratos/v2 v2.8.4
github.com/stretchr/testify v1.10.0
github.com/tx7do/kratos-bootstrap/api v0.0.25
github.com/tx7do/kratos-bootstrap/api v0.0.27
)
require (
@@ -19,12 +19,13 @@ require (
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/gnostic v0.7.0 // indirect
github.com/google/gnostic-models v0.6.9 // indirect
github.com/google/gnostic-models v0.7.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel v1.37.0 // indirect
go.opentelemetry.io/otel/metric v1.37.0 // indirect
go.opentelemetry.io/otel/trace v1.37.0 // indirect
go.yaml.in/yaml/v3 v3.0.3 // indirect
golang.org/x/net v0.40.0 // indirect
golang.org/x/sync v0.14.0 // indirect
golang.org/x/sys v0.33.0 // indirect

View File

@@ -722,8 +722,8 @@ github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6
github.com/google/gnostic v0.7.0 h1:d7EpuFp8vVdML+y0JJJYiKeOLjKTdH/GvVkLOBWqJpw=
github.com/google/gnostic v0.7.0/go.mod h1:IAcUyMl6vtC95f60EZ8oXyqTsOersP6HbwjeG7EyDPM=
github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49/go.mod h1:BkkQ4L1KS1xMt2aWSPStnn55ChGC0DPOn2FQYj+f25M=
github.com/google/gnostic-models v0.6.9 h1:MU/8wDLif2qCXZmzncUQ/BOfxWfthHi63KqpoNbWqVw=
github.com/google/gnostic-models v0.6.9/go.mod h1:CiWsm0s6BSQd1hRn8/QmxqB6BesYcbSZxsz9b0KuDBw=
github.com/google/gnostic-models v0.7.0 h1:qwTtogB15McXDaNqTZdzPJRHvaVJlAl+HVQnLmJEJxo=
github.com/google/gnostic-models v0.7.0/go.mod h1:whL5G0m6dmc5cPxKc5bdKdEN3UjI7OUGxBlw57miDrQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
@@ -891,6 +891,8 @@ go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mx
go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.yaml.in/yaml/v3 v3.0.3 h1:bXOww4E/J3f66rav3pX3m8w6jDE4knZjGOw8b5Y6iNE=
go.yaml.in/yaml/v3 v3.0.3/go.mod h1:tBHosrYAkRZjRAOREWbDnBXUf08JOwYq++0QNwQiWzI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=

View File

@@ -3,11 +3,8 @@ package influxdb
import (
"context"
"github.com/go-kratos/kratos/v2/encoding"
_ "github.com/go-kratos/kratos/v2/encoding/json"
"github.com/go-kratos/kratos/v2/log"
"github.com/InfluxCommunity/influxdb3-go/v2/influxdb3"
"github.com/go-kratos/kratos/v2/log"
conf "github.com/tx7do/kratos-bootstrap/api/gen/go/conf/v1"
)
@@ -15,14 +12,12 @@ import (
type Client struct {
cli *influxdb3.Client
log *log.Helper
codec encoding.Codec
log *log.Helper
}
func NewClient(logger log.Logger, cfg *conf.Bootstrap) (*Client, error) {
c := &Client{
log: log.NewHelper(log.With(logger, "module", "influxdb-client")),
codec: encoding.GetCodec("json"),
log: log.NewHelper(log.With(logger, "module", "influxdb-client")),
}
if err := c.createInfluxdbClient(cfg); err != nil {
@@ -87,6 +82,31 @@ func (c *Client) Query(ctx context.Context, query string) (*influxdb3.QueryItera
return result, nil
}
func (c *Client) QueryWithParams(
ctx context.Context,
table string,
filters map[string]interface{},
operators map[string]string,
fields []string,
) (*influxdb3.QueryIterator, error) {
if c.cli == nil {
return nil, ErrInfluxDBClientNotInitialized
}
query := BuildQueryWithParams(table, filters, operators, fields)
result, err := c.cli.Query(
ctx,
query,
influxdb3.WithQueryType(influxdb3.InfluxQL),
)
if err != nil {
c.log.Errorf("failed to query data: %v", err)
return nil, ErrInfluxDBQueryFailed
}
return result, nil
}
// Insert writes a single data point
func (c *Client) Insert(ctx context.Context, point *influxdb3.Point) error {
if c.cli == nil {

View File

@@ -11,7 +11,7 @@ require (
github.com/go-kratos/kratos/v2 v2.8.4
github.com/stretchr/testify v1.10.0
github.com/tx7do/go-utils v1.1.29
github.com/tx7do/kratos-bootstrap/api v0.0.25
github.com/tx7do/kratos-bootstrap/api v0.0.27
google.golang.org/protobuf v1.36.6
)

database/influxdb/utils.go Normal file (208 lines)
View File

@@ -0,0 +1,208 @@
package influxdb
import (
"fmt"
"strconv"
"strings"
"time"
"github.com/InfluxCommunity/influxdb3-go/v2/influxdb3"
"google.golang.org/protobuf/types/known/timestamppb"
)
func BuildQuery(
table string,
filters map[string]interface{},
operators map[string]string,
fields []string,
) (string, []interface{}) {
var queryBuilder strings.Builder
args := make([]interface{}, 0)
// build the SELECT clause
queryBuilder.WriteString("SELECT ")
if len(fields) > 0 {
queryBuilder.WriteString(strings.Join(fields, ", "))
} else {
queryBuilder.WriteString("*")
}
queryBuilder.WriteString(fmt.Sprintf(" FROM %s", table))
// build the WHERE clause
if len(filters) > 0 {
queryBuilder.WriteString(" WHERE ")
var conditions []string
var operator string
for key, value := range filters {
operator = "=" // default operator
if op, exists := operators[key]; exists {
operator = op
}
conditions = append(conditions, fmt.Sprintf("%s %s ?", key, operator))
args = append(args, value)
}
queryBuilder.WriteString(strings.Join(conditions, " AND "))
}
return queryBuilder.String(), args
}
func GetPointTag(point *influxdb3.Point, name string) *string {
if point == nil {
return nil
}
tagValue, ok := point.GetTag(name)
if !ok || tagValue == "" {
return nil
}
return &tagValue
}
func GetBoolPointTag(point *influxdb3.Point, name string) *bool {
if point == nil {
return nil
}
tagValue, ok := point.GetTag(name)
if !ok || tagValue == "" {
return nil
}
value := tagValue == "true"
return &value
}
func GetUint32PointTag(point *influxdb3.Point, name string) *uint32 {
if point == nil {
return nil
}
tagValue, ok := point.GetTag(name)
if !ok || tagValue == "" {
return nil
}
value, err := strconv.ParseUint(tagValue, 10, 32)
if err != nil {
return nil
}
value32 := uint32(value)
return &value32
}
func GetUint64PointTag(point *influxdb3.Point, name string) *uint64 {
if point == nil {
return nil
}
tagValue, ok := point.GetTag(name)
if !ok || tagValue == "" {
return nil
}
value, err := strconv.ParseUint(tagValue, 10, 64)
if err != nil {
return nil
}
return &value
}
func GetEnumPointTag[T ~int32](point *influxdb3.Point, name string, valueMap map[string]int32) *T {
if point == nil {
return nil
}
tagValue, ok := point.GetTag(name)
if !ok || tagValue == "" {
return nil
}
enumValue, exists := valueMap[tagValue]
if !exists {
return nil
}
enumType := T(enumValue)
return &enumType
}
func GetTimestampField(point *influxdb3.Point, name string) *timestamppb.Timestamp {
if point == nil {
return nil
}
value := point.GetField(name)
if value == nil {
return nil
}
if timestamp, ok := value.(*timestamppb.Timestamp); ok {
return timestamp
}
if timeValue, ok := value.(time.Time); ok {
return timestamppb.New(timeValue)
}
return nil
}
func GetUint32Field(point *influxdb3.Point, name string) *uint32 {
if point == nil {
return nil
}
value := point.GetUIntegerField(name)
if value == nil {
return nil
}
uint32Value := uint32(*value)
if uint32Value == 0 {
return nil
}
return &uint32Value
}
func BoolToString(value *bool) string {
if value == nil {
return "false"
}
if *value {
return "true"
}
return "false"
}
func Uint64ToString(value *uint64) string {
if value == nil {
return "0"
}
return fmt.Sprintf("%d", *value)
}
func BuildQueryWithParams(
table string,
filters map[string]interface{},
operators map[string]string,
fields []string,
) string {
var queryBuilder strings.Builder
// build the SELECT clause
queryBuilder.WriteString("SELECT ")
if len(fields) > 0 {
queryBuilder.WriteString(strings.Join(fields, ", "))
} else {
queryBuilder.WriteString("*")
}
queryBuilder.WriteString(fmt.Sprintf(" FROM %s", table))
// build the WHERE clause
if len(filters) > 0 {
var operator string
queryBuilder.WriteString(" WHERE ")
var conditions []string
for key, value := range filters {
operator = "=" // default operator
if op, exists := operators[key]; exists {
operator = op
}
conditions = append(conditions, fmt.Sprintf("%s %s %v", key, operator, value))
}
queryBuilder.WriteString(strings.Join(conditions, " AND "))
}
return queryBuilder.String()
}
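BuildQuery returns a parameterised statement plus its arguments, while BuildQueryWithParams interpolates the values directly, so string literals must already be quoted and expressions are passed through unchanged, as the tests below show. A short sketch of driving the client's QueryWithParams with such inputs; the table name, field keys and filter values are illustrative:

```go
package influxdb

import (
	"context"

	"github.com/InfluxCommunity/influxdb3-go/v2/influxdb3"
)

// latestAAPLCandles is a hypothetical example: string values are quoted
// because BuildQueryWithParams writes them into the query verbatim.
func latestAAPLCandles(ctx context.Context, client *Client) (*influxdb3.QueryIterator, error) {
	filters := map[string]interface{}{
		"s":    "'AAPL'",
		"time": "now() - interval '15 minutes'",
	}
	operators := map[string]string{"time": ">="} // the default operator is "="
	fields := []string{"s", "o", "h", "l", "c", "v"}
	return client.QueryWithParams(ctx, "candles", filters, operators, fields)
}
```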

View File

@@ -0,0 +1,153 @@
package influxdb
import (
"reflect"
"testing"
)
func TestBuildQuery(t *testing.T) {
tests := []struct {
name string
table string
filters map[string]interface{}
operators map[string]string
fields []string
expectedQuery string
expectedArgs []interface{}
}{
{
name: "Basic query with filters and fields",
table: "candles",
filters: map[string]interface{}{"s": "AAPL", "o": 150.0},
fields: []string{"s", "o", "h", "l", "c", "v"},
expectedQuery: "SELECT s, o, h, l, c, v FROM candles WHERE s = ? AND o = ?",
expectedArgs: []interface{}{"AAPL", 150.0},
},
{
name: "Query with no filters",
table: "candles",
filters: map[string]interface{}{},
fields: []string{"s", "o", "h"},
expectedQuery: "SELECT s, o, h FROM candles",
expectedArgs: []interface{}{},
},
{
name: "Query with no fields",
table: "candles",
filters: map[string]interface{}{"s": "AAPL"},
fields: []string{},
expectedQuery: "SELECT * FROM candles WHERE s = ?",
expectedArgs: []interface{}{"AAPL"},
},
{
name: "Empty table name",
table: "",
filters: map[string]interface{}{"s": "AAPL"},
fields: []string{"s", "o"},
expectedQuery: "SELECT s, o FROM WHERE s = ?",
expectedArgs: []interface{}{"AAPL"},
},
{
name: "Special characters in filters",
table: "candles",
filters: map[string]interface{}{"name": "O'Reilly"},
fields: []string{"name"},
expectedQuery: "SELECT name FROM candles WHERE name = ?",
expectedArgs: []interface{}{"O'Reilly"},
},
{
name: "Query with interval filters",
table: "candles",
filters: map[string]interface{}{"time": "now() - interval '15 minutes'"},
fields: []string{"*"},
operators: map[string]string{"time": ">="},
expectedQuery: "SELECT * FROM candles WHERE time >= ?",
expectedArgs: []interface{}{"now() - interval '15 minutes'"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
query, args := BuildQuery(tt.table, tt.filters, tt.operators, tt.fields)
if query != tt.expectedQuery {
t.Errorf("expected query %s, got %s", tt.expectedQuery, query)
}
if !reflect.DeepEqual(args, tt.expectedArgs) {
t.Errorf("expected args %v, got %v", tt.expectedArgs, args)
}
})
}
}
func TestBuildQueryWithParams(t *testing.T) {
tests := []struct {
name string
table string
filters map[string]interface{}
operators map[string]string
fields []string
expectedQuery string
}{
{
name: "Basic query with filters and fields",
table: "candles",
filters: map[string]interface{}{"s": "'AAPL'", "o": 150.0},
operators: map[string]string{"o": ">"},
fields: []string{"s", "o", "h", "l", "c", "v"},
expectedQuery: "SELECT s, o, h, l, c, v FROM candles WHERE s = 'AAPL' AND o > 150",
},
{
name: "Query with no filters",
table: "candles",
filters: map[string]interface{}{},
operators: map[string]string{},
fields: []string{"s", "o", "h"},
expectedQuery: "SELECT s, o, h FROM candles",
},
{
name: "Query with no fields",
table: "candles",
filters: map[string]interface{}{"s": "'AAPL'"},
operators: map[string]string{},
fields: []string{},
expectedQuery: "SELECT * FROM candles WHERE s = 'AAPL'",
},
{
name: "Empty table name",
table: "",
filters: map[string]interface{}{"s": "'AAPL'"},
operators: map[string]string{},
fields: []string{"s", "o"},
expectedQuery: "SELECT s, o FROM WHERE s = 'AAPL'",
},
{
name: "Special characters in filters",
table: "candles",
filters: map[string]interface{}{"name": "'O'Reilly'"},
operators: map[string]string{},
fields: []string{"name"},
expectedQuery: "SELECT name FROM candles WHERE name = 'O'Reilly'",
},
{
name: "Query with interval filters",
table: "candles",
filters: map[string]interface{}{"time": "now() - interval '15 minutes'"},
operators: map[string]string{"time": ">="},
fields: []string{"*"},
expectedQuery: "SELECT * FROM candles WHERE time >= now() - interval '15 minutes'",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
query := BuildQueryWithParams(tt.table, tt.filters, tt.operators, tt.fields)
if query != tt.expectedQuery {
t.Errorf("expected query %s, got %s", tt.expectedQuery, query)
}
//t.Log(query)
})
}
}

View File

@@ -0,0 +1,51 @@
# MongoDB
## Concept Comparison
| MongoDB term | RDBMS term |
|-------------|-------------|
| database | database |
| collection | table |
| document | row |
| field | column |
| index | index |
| primary key | primary key |
## Docker Deployment
Pull the images:
```bash
docker pull bitnami/mongodb:latest
docker pull bitnami/mongodb-exporter:latest
```
Run with authentication:
```bash
docker run -itd \
--name mongodb-server \
-p 27017:27017 \
-e MONGODB_ROOT_USER=root \
-e MONGODB_ROOT_PASSWORD=123456 \
-e MONGODB_USERNAME=test \
-e MONGODB_PASSWORD=123456 \
-e MONGODB_DATABASE=finances \
bitnami/mongodb:latest
```
Run without a password:
```bash
docker run -itd \
--name mongodb-server \
-p 27017:27017 \
-e ALLOW_EMPTY_PASSWORD=yes \
bitnami/mongodb:latest
```
Two things to note:
1. If you mount a data volume, change the ownership of the local path to UID 1001 (`sudo chown -R 1001:1001 data/db`), otherwise the container fails with:
`mkdir: cannot create directory /bitnami/mongodb: Permission denied`
2. Starting with MongoDB 5.0, some machines fail to start with `Illegal instruction` because the CPU lacks the **AVX instruction set**; the only workaround is to run an older MongoDB version.

View File

@@ -2,35 +2,172 @@ package mongodb
import (
"context"
"fmt"
"time"
"github.com/go-kratos/kratos/v2/log"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
mongoV2 "go.mongodb.org/mongo-driver/v2/mongo"
optionsV2 "go.mongodb.org/mongo-driver/v2/mongo/options"
conf "github.com/tx7do/kratos-bootstrap/api/gen/go/conf/v1"
)
// NewMongoClient creates a MongoDB client
func NewMongoClient(ctx context.Context, cfg *conf.Bootstrap, l *log.Helper) *mongo.Client {
if cfg.Data == nil || cfg.Data.Mongodb == nil {
l.Warn("Mongodb config is nil")
return nil
}
type Client struct {
log *log.Helper
var opts []*options.ClientOptions
uri := fmt.Sprintf("mongodb://%s:%s@%s",
cfg.Data.Mongodb.Username, cfg.Data.Mongodb.Password, cfg.Data.Mongodb.Address,
)
opts = append(opts, options.Client().ApplyURI(uri))
cli, err := mongo.Connect(ctx, opts...)
if err != nil {
l.Fatalf("failed opening connection to mongodb: %v", err)
return nil
}
return cli
cli *mongoV2.Client
database string
timeout time.Duration
}
func NewClient(logger log.Logger, cfg *conf.Bootstrap) (*Client, error) {
c := &Client{
log: log.NewHelper(log.With(logger, "module", "mongodb-client")),
}
if err := c.createMongodbClient(cfg); err != nil {
return nil, err
}
return c, nil
}
// createMongodbClient creates the MongoDB client
func (c *Client) createMongodbClient(cfg *conf.Bootstrap) error {
if cfg.Data == nil || cfg.Data.Mongodb == nil {
return nil
}
var opts []*optionsV2.ClientOptions
if cfg.Data.Mongodb.GetUri() != "" {
opts = append(opts, optionsV2.Client().ApplyURI(cfg.Data.Mongodb.GetUri()))
}
if cfg.Data.Mongodb.GetUsername() != "" && cfg.Data.Mongodb.GetPassword() != "" {
credential := optionsV2.Credential{
Username: cfg.Data.Mongodb.GetUsername(),
Password: cfg.Data.Mongodb.GetPassword(),
}
if cfg.Data.Mongodb.GetPassword() != "" {
credential.PasswordSet = true
}
opts = append(opts, optionsV2.Client().SetAuth(credential))
}
if cfg.Data.Mongodb.ConnectTimeout != nil {
opts = append(opts, optionsV2.Client().SetConnectTimeout(cfg.Data.Mongodb.GetConnectTimeout().AsDuration()))
}
if cfg.Data.Mongodb.ServerSelectionTimeout != nil {
opts = append(opts, optionsV2.Client().SetServerSelectionTimeout(cfg.Data.Mongodb.GetServerSelectionTimeout().AsDuration()))
}
if cfg.Data.Mongodb.Timeout != nil {
opts = append(opts, optionsV2.Client().SetTimeout(cfg.Data.Mongodb.GetTimeout().AsDuration()))
}
opts = append(opts, optionsV2.Client().SetBSONOptions(&optionsV2.BSONOptions{
UseJSONStructTags: true, // use json struct tags for field names
}))
cli, err := mongoV2.Connect(opts...)
if err != nil {
c.log.Errorf("failed to create mongodb client: %v", err)
return err
}
c.database = cfg.Data.Mongodb.GetDatabase()
if cfg.Data.Mongodb.GetTimeout() != nil {
c.timeout = cfg.Data.Mongodb.GetTimeout().AsDuration()
} else {
c.timeout = 10 * time.Second // default timeout
}
c.cli = cli
return nil
}
// Close disconnects the MongoDB client
func (c *Client) Close() {
if c.cli == nil {
c.log.Warn("mongodb client is already closed or not initialized")
return
}
if err := c.cli.Disconnect(context.Background()); err != nil {
c.log.Errorf("failed to disconnect mongodb client: %v", err)
} else {
c.log.Info("mongodb client disconnected successfully")
}
}
// CheckConnect pings MongoDB to verify the connection
func (c *Client) CheckConnect() {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()
if err := c.cli.Ping(ctx, nil); err != nil {
c.log.Errorf("failed to ping mongodb: %v", err)
} else {
c.log.Info("mongodb client is connected")
}
}
// InsertOne inserts a single document
func (c *Client) InsertOne(ctx context.Context, collection string, document interface{}) (*mongoV2.InsertOneResult, error) {
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
return c.cli.Database(c.database).Collection(collection).InsertOne(ctx, document)
}
// InsertMany inserts multiple documents
func (c *Client) InsertMany(ctx context.Context, collection string, documents []interface{}) (*mongoV2.InsertManyResult, error) {
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
return c.cli.Database(c.database).Collection(collection).InsertMany(ctx, documents)
}
// FindOne finds a single document and decodes it into result
func (c *Client) FindOne(ctx context.Context, collection string, filter interface{}, result interface{}) error {
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
return c.cli.Database(c.database).Collection(collection).FindOne(ctx, filter).Decode(result)
}
// Find finds multiple documents and decodes them into results
func (c *Client) Find(ctx context.Context, collection string, filter interface{}, results interface{}) error {
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
cursor, err := c.cli.Database(c.database).Collection(collection).Find(ctx, filter)
if err != nil {
c.log.Errorf("failed to find documents in collection %s: %v", collection, err)
return err
}
defer func(cursor *mongoV2.Cursor, ctx context.Context) {
if err = cursor.Close(ctx); err != nil {
c.log.Errorf("failed to close cursor: %v", err)
}
}(cursor, ctx)
return cursor.All(ctx, results)
}
// UpdateOne updates a single document
func (c *Client) UpdateOne(ctx context.Context, collection string, filter, update interface{}) (*mongoV2.UpdateResult, error) {
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
return c.cli.Database(c.database).Collection(collection).UpdateOne(ctx, filter, update)
}
// DeleteOne deletes a single document
func (c *Client) DeleteOne(ctx context.Context, collection string, filter interface{}) (*mongoV2.DeleteResult, error) {
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
return c.cli.Database(c.database).Collection(collection).DeleteOne(ctx, filter)
}
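The wrapper methods take arbitrary filter and document values, which in practice are usually bson.M maps or tagged structs; since the client sets UseJSONStructTags, json tags drive the field names. A hypothetical round trip, assuming the v2 driver's bson package and the "candles" collection used in the tests:

```go
package mongodb

import (
	"context"

	"go.mongodb.org/mongo-driver/v2/bson"
)

// upsertAndLoadCandle is an illustrative sketch, not part of the package API.
func upsertAndLoadCandle(ctx context.Context, c *Client) error {
	// Insert one document; json-style field names ("s", "o", ...) are used directly here.
	if _, err := c.InsertOne(ctx, "candles", map[string]interface{}{"s": "AAPL", "o": 100.5}); err != nil {
		return err
	}

	// Update it: $set the close price on the document whose symbol is AAPL.
	filter := bson.M{"s": "AAPL"}
	update := bson.M{"$set": bson.M{"c": 102.0}}
	if _, err := c.UpdateOne(ctx, "candles", filter, update); err != nil {
		return err
	}

	// Read it back into a generic map.
	var doc bson.M
	return c.FindOne(ctx, "candles", filter, &doc)
}
```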

View File

@@ -0,0 +1,66 @@
package mongodb
import (
"context"
"testing"
"time"
"github.com/go-kratos/kratos/v2/log"
"github.com/stretchr/testify/assert"
"github.com/tx7do/go-utils/trans"
conf "github.com/tx7do/kratos-bootstrap/api/gen/go/conf/v1"
"google.golang.org/protobuf/types/known/timestamppb"
)
type Candle struct {
Symbol *string `json:"s"`
Open *float64 `json:"o"`
High *float64 `json:"h"`
Low *float64 `json:"l"`
Close *float64 `json:"c"`
Volume *float64 `json:"v"`
StartTime *timestamppb.Timestamp `json:"st"`
EndTime *timestamppb.Timestamp `json:"et"`
}
func createTestClient() *Client {
cli, _ := NewClient(
log.DefaultLogger,
&conf.Bootstrap{
Data: &conf.Data{
Mongodb: &conf.Data_MongoDB{
Uri: "mongodb://root:123456@127.0.0.1:27017/?compressors=snappy,zlib,zstd",
Database: trans.Ptr("finances"),
},
},
},
)
return cli
}
func TestNewClient(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
client.CheckConnect()
}
func TestInsertOne(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
ctx := context.Background()
candle := Candle{
StartTime: timestamppb.New(time.Now()),
Symbol: trans.Ptr("AAPL"),
Open: trans.Ptr(1.0),
High: trans.Ptr(2.0),
Low: trans.Ptr(3.0),
Close: trans.Ptr(4.0),
Volume: trans.Ptr(1000.0),
}
_, err := client.InsertOne(ctx, "candles", candle)
assert.NoError(t, err)
}

database/mongodb/consts.go Normal file (110 lines)
View File

@@ -0,0 +1,110 @@
package mongodb
const (
// 比较操作符
OperatorEq = "$eq" // 等于
OperatorNe = "$ne" // 不等于
OperatorGt = "$gt" // 大于
OperatorGte = "$gte" // 大于等于
OperatorLt = "$lt" // 小于
OperatorLte = "$lte" // 小于等于
// 逻辑操作符
OperatorAnd = "$and" // 与
OperatorOr = "$or" // 或
OperatorNot = "$not" // 非
OperatorNor = "$nor" // 非或
// 元素操作符
OperatorExists = "$exists" // 是否存在
OperatorType = "$type" // 类型
// 评估操作符
OperatorExpr = "$expr" // 表达式
OperatorJsonSchema = "$jsonSchema" // JSON Schema 验证
OperatorMod = "$mod" // 取模
OperatorRegex = "$regex" // 正则表达式
OperatorText = "$text" // 文本搜索
OperatorWhere = "$where" // JavaScript 表达式
OperatorSearch = "$search" // 文本搜索
// 数组操作符
OperatorAll = "$all" // 匹配所有
OperatorElemMatch = "$elemMatch" // 匹配数组中的元素
OperatorSize = "$size" // 数组大小
// 集合操作符
OperatorIn = "$in" // 包含
OperatorNin = "$nin" // 不包含
// 更新操作符
OperatorSet = "$set" // 设置字段值
OperatorUnset = "$unset" // 删除字段
OperatorInc = "$inc" // 增加值
OperatorMul = "$mul" // 乘法
OperatorRename = "$rename" // 重命名字段
OperatorCurrentDate = "$currentDate" // 设置当前日期
OperatorAddToSet = "$addToSet" // 添加到集合
OperatorPop = "$pop" // 删除数组中的元素
OperatorPull = "$pull" // 删除匹配的数组元素
OperatorPush = "$push" // 添加数组元素
OperatorEach = "$each" // 批量添加数组元素
OperatorSlice = "$slice" // 截取数组
OperatorSort = "$sort" // 排序数组
OperatorPosition = "$position" // 指定数组位置
// 聚合操作符
OperatorGroup = "$group" // 分组
OperatorMatch = "$match" // 匹配
OperatorProject = "$project" // 投影
OperatorSortAgg = "$sort" // 排序
OperatorLimit = "$limit" // 限制
OperatorSkip = "$skip" // 跳过
OperatorUnwind = "$unwind" // 拆分数组
OperatorLookup = "$lookup" // 关联查询
OperatorAddFields = "$addFields" // 添加字段
OperatorReplaceRoot = "$replaceRoot" // 替换根字段
OperatorCount = "$count" // 计数
OperatorFacet = "$facet" // 多面查询
OperatorBucket = "$bucket" // 分桶
OperatorBucketAuto = "$bucketAuto" // 自动分桶
OperatorIndexStats = "$indexStats" // 索引统计
OperatorOut = "$out" // 输出
OperatorMerge = "$merge" // 合并
OperatorSum = "$sum" // 求和
OperatorAvg = "$avg" // 平均值
OperatorMin = "$min" // 最小值
OperatorMax = "$max" // 最大值
OperatorFirst = "$first" // 第一个值
OperatorLast = "$last" // 最后一个值
OperatorStdDevPop = "$stdDevPop" // 总体标准差
OperatorStdDevSamp = "$stdDevSamp" // 样本标准差
// 类型转换操作符
OperatorToLong = "$toLong" // 转换为 long 类型
OperatorToDouble = "$toDouble" // 转换为 double 类型
OperatorToDecimal = "$toDecimal" // 转换为 decimal 类型
OperatorToString = "$toString" // 转换为 string 类型
OperatorToDate = "$toDate" // 转换为 date 类型
OperatorToInt = "$toInt" // 转换为 int 类型
// 地理空间操作符
OperatorNear = "$near" // 查询距离某点最近的文档
OperatorNearSphere = "$nearSphere" // 查询距离某点最近的文档(球面距离)
OperatorGeoWithin = "$geoWithin" // 地理范围查询
OperatorGeoIntersects = "$geoIntersects" // 地理相交查询
OperatorGeometry = "$geometry" // 几何图形
OperatorMaxDistance = "$maxDistance" // 最大距离
OperatorMinDistance = "$minDistance" // 最小距离
)
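These constants mirror MongoDB's operator names so filters and update documents can be composed without raw string literals. A small sketch using a few of them; the bson.M maps and the Candle-style field names are illustrative, not part of the package:

```go
package mongodb

import "go.mongodb.org/mongo-driver/v2/bson"

// highVolumeFilter is a hypothetical example of composing a query filter
// and an update document from the operator constants above.
func highVolumeFilter() (filter bson.M, update bson.M) {
	// volume >= 1000 AND symbol IN ("AAPL", "MSFT")
	filter = bson.M{
		OperatorAnd: []bson.M{
			{"v": bson.M{OperatorGte: 1000.0}},
			{"s": bson.M{OperatorIn: []string{"AAPL", "MSFT"}}},
		},
	}
	// $set a flag and $inc a counter on matching documents
	update = bson.M{
		OperatorSet: bson.M{"flagged": true},
		OperatorInc: bson.M{"hits": 1},
	}
	return filter, update
}
```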

View File

@@ -0,0 +1 @@
package mongodb

View File

@@ -8,20 +8,26 @@ replace github.com/tx7do/kratos-bootstrap/api => ../../api
require (
github.com/go-kratos/kratos/v2 v2.8.4
github.com/tx7do/kratos-bootstrap/api v0.0.21
go.mongodb.org/mongo-driver v1.17.3
github.com/stretchr/testify v1.10.0
github.com/tx7do/go-utils v1.1.29
github.com/tx7do/kratos-bootstrap/api v0.0.27
go.mongodb.org/mongo-driver/v2 v2.2.2
google.golang.org/protobuf v1.36.6
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/snappy v1.0.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
golang.org/x/crypto v0.38.0 // indirect
golang.org/x/sync v0.14.0 // indirect
golang.org/x/text v0.25.0 // indirect
google.golang.org/protobuf v1.36.6 // indirect
golang.org/x/crypto v0.39.0 // indirect
golang.org/x/sync v0.15.0 // indirect
golang.org/x/text v0.26.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

View File

@@ -1,3 +1,4 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-kratos/kratos/v2 v2.8.4 h1:eIJLE9Qq9WSoKx+Buy2uPyrahtF/lPh+Xf4MTpxhmjs=
@@ -6,10 +7,22 @@ github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs=
github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE=
github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tx7do/go-utils v1.1.29 h1:kO1JDMVX++ZY4+aXGk3pOtDz5WBPDA3LxhIWkzXkvH8=
github.com/tx7do/go-utils v1.1.29/go.mod h1:bmt7c85QmHURtd7h6QOu7k0QKOJTwjJ+cFP29nljdSw=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
@@ -19,20 +32,20 @@ github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gi
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM=
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.mongodb.org/mongo-driver v1.17.3 h1:TQyXhnsWfWtgAhMtOgtYHMTkZIfBTpMTsMnd9ZBeHxQ=
go.mongodb.org/mongo-driver v1.17.3/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ=
go.mongodb.org/mongo-driver/v2 v2.2.2 h1:9cYuS3fl1Xhqwpfazso10V7BHQD58kCgtzhfAmJYz9c=
go.mongodb.org/mongo-driver/v2 v2.2.2/go.mod h1:qQkDMhCGWl3FN509DfdPd4GRBLU/41zqF/k8eTRceps=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8=
golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw=
golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM=
golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ=
golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -44,11 +57,16 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4=
golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA=
golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

229
database/mongodb/query.go Normal file
View File

@@ -0,0 +1,229 @@
package mongodb
import (
bsonV2 "go.mongodb.org/mongo-driver/v2/bson"
optionsV2 "go.mongodb.org/mongo-driver/v2/mongo/options"
)
type QueryBuilder struct {
filter bsonV2.M
opts *optionsV2.FindOptions
pipeline []bsonV2.M
}
func NewQuery() *QueryBuilder {
return &QueryBuilder{
filter: bsonV2.M{},
opts: &optionsV2.FindOptions{},
}
}
// SetFilter sets the query filter.
func (qb *QueryBuilder) SetFilter(filter bsonV2.M) *QueryBuilder {
qb.filter = filter
return qb
}
// SetOr sets a logical OR over multiple conditions.
func (qb *QueryBuilder) SetOr(conditions []bsonV2.M) *QueryBuilder {
qb.filter[OperatorOr] = conditions
return qb
}
// SetAnd sets a logical AND over multiple conditions.
func (qb *QueryBuilder) SetAnd(conditions []bsonV2.M) *QueryBuilder {
qb.filter[OperatorAnd] = conditions
return qb
}
// SetNotEqual sets a not-equal ($ne) condition on a field.
func (qb *QueryBuilder) SetNotEqual(field string, value interface{}) *QueryBuilder {
qb.filter[field] = bsonV2.M{OperatorNe: value}
return qb
}
// SetGreaterThan sets a greater-than ($gt) condition on a field.
func (qb *QueryBuilder) SetGreaterThan(field string, value interface{}) *QueryBuilder {
qb.filter[field] = bsonV2.M{OperatorGt: value}
return qb
}
// SetLessThan sets a less-than ($lt) condition on a field.
func (qb *QueryBuilder) SetLessThan(field string, value interface{}) *QueryBuilder {
qb.filter[field] = bsonV2.M{OperatorLt: value}
return qb
}
// SetExists sets an existence ($exists) condition on a field.
func (qb *QueryBuilder) SetExists(field string, exists bool) *QueryBuilder {
qb.filter[field] = bsonV2.M{OperatorExists: exists}
return qb
}
// SetType sets a type ($type) condition on a field.
func (qb *QueryBuilder) SetType(field string, typeValue interface{}) *QueryBuilder {
qb.filter[field] = bsonV2.M{OperatorType: typeValue}
return qb
}
// SetBetween sets an inclusive range ($gte/$lte) condition on a field.
func (qb *QueryBuilder) SetBetween(field string, start, end interface{}) *QueryBuilder {
qb.filter[field] = bsonV2.M{
OperatorGte: start,
OperatorLte: end,
}
return qb
}
// SetIn sets an inclusion ($in) condition on a field.
func (qb *QueryBuilder) SetIn(field string, values []interface{}) *QueryBuilder {
qb.filter[field] = bsonV2.M{OperatorIn: values}
return qb
}
// SetNotIn sets an exclusion ($nin) condition on a field.
func (qb *QueryBuilder) SetNotIn(field string, values []interface{}) *QueryBuilder {
qb.filter[field] = bsonV2.M{OperatorNin: values}
return qb
}
// SetElemMatch sets an element-match ($elemMatch) condition on an array field.
func (qb *QueryBuilder) SetElemMatch(field string, match bsonV2.M) *QueryBuilder {
qb.filter[field] = bsonV2.M{OperatorElemMatch: match}
return qb
}
// SetAll requires an array field to contain all of the given values ($all).
func (qb *QueryBuilder) SetAll(field string, values []interface{}) *QueryBuilder {
qb.filter[field] = bsonV2.M{OperatorAll: values}
return qb
}
// SetSize sets a size ($size) condition on an array field.
func (qb *QueryBuilder) SetSize(field string, size int) *QueryBuilder {
qb.filter[field] = bsonV2.M{OperatorSize: size}
return qb
}
// SetCurrentDate sets the field to the current date ($currentDate).
func (qb *QueryBuilder) SetCurrentDate(field string) *QueryBuilder {
qb.filter[field] = bsonV2.M{OperatorCurrentDate: true}
return qb
}
// SetTextSearch sets a full-text search ($text) condition.
func (qb *QueryBuilder) SetTextSearch(search string) *QueryBuilder {
qb.filter[OperatorText] = bsonV2.M{OperatorSearch: search}
return qb
}
// SetMod sets a modulo ($mod) condition on a field.
func (qb *QueryBuilder) SetMod(field string, divisor, remainder int) *QueryBuilder {
qb.filter[field] = bsonV2.M{OperatorMod: []int{divisor, remainder}}
return qb
}
// SetRegex sets a regular-expression ($regex) condition on a field.
func (qb *QueryBuilder) SetRegex(field string, pattern string) *QueryBuilder {
qb.filter[field] = bsonV2.M{OperatorRegex: pattern}
return qb
}
// SetGeoWithin sets a geospatial $geoWithin condition on a field.
// The geometry is wrapped in a $geometry document, as MongoDB expects and as the tests assert.
func (qb *QueryBuilder) SetGeoWithin(field string, geometry bsonV2.M) *QueryBuilder {
qb.filter[field] = bsonV2.M{OperatorGeoWithin: bsonV2.M{OperatorGeometry: geometry}}
return qb
}
// SetGeoIntersects sets a geospatial $geoIntersects condition on a field.
// The geometry is wrapped in a $geometry document, as MongoDB expects and as the tests assert.
func (qb *QueryBuilder) SetGeoIntersects(field string, geometry bsonV2.M) *QueryBuilder {
qb.filter[field] = bsonV2.M{OperatorGeoIntersects: bsonV2.M{OperatorGeometry: geometry}}
return qb
}
// SetNear sets a proximity ($near) condition on a field.
func (qb *QueryBuilder) SetNear(field string, point bsonV2.M, maxDistance, minDistance float64) *QueryBuilder {
qb.filter[field] = bsonV2.M{
OperatorNear: bsonV2.M{
OperatorGeometry: point,
OperatorMaxDistance: maxDistance,
OperatorMinDistance: minDistance,
},
}
return qb
}
// SetNearSphere sets a spherical proximity ($nearSphere) condition on a field.
func (qb *QueryBuilder) SetNearSphere(field string, point bsonV2.M, maxDistance, minDistance float64) *QueryBuilder {
qb.filter[field] = bsonV2.M{
OperatorNearSphere: bsonV2.M{
OperatorGeometry: point,
OperatorMaxDistance: maxDistance,
OperatorMinDistance: minDistance,
},
}
return qb
}
// SetLimit limits the number of documents returned.
func (qb *QueryBuilder) SetLimit(limit int64) *QueryBuilder {
if qb.opts == nil {
qb.opts = &optionsV2.FindOptions{}
}
qb.opts.Limit = &limit
return qb
}
// SetSort sets the sort order of the results.
func (qb *QueryBuilder) SetSort(sort bsonV2.D) *QueryBuilder {
if qb.opts == nil {
qb.opts = &optionsV2.FindOptions{}
}
qb.opts.Sort = sort
return qb
}
// SetSortWithPriority sets the sort order, with field priority given by the slice order.
func (qb *QueryBuilder) SetSortWithPriority(sortFields []bsonV2.E) *QueryBuilder {
if qb.opts == nil {
qb.opts = &optionsV2.FindOptions{}
}
qb.opts.Sort = bsonV2.D(sortFields)
return qb
}
// SetProjection sets the field projection of the results.
func (qb *QueryBuilder) SetProjection(projection bsonV2.M) *QueryBuilder {
qb.opts.Projection = projection
return qb
}
// SetSkip sets the number of documents to skip.
func (qb *QueryBuilder) SetSkip(skip int64) *QueryBuilder {
qb.opts.Skip = &skip
return qb
}
// SetPage enables pagination; page starts at 1 and size is the number of documents per page.
func (qb *QueryBuilder) SetPage(page, size int64) *QueryBuilder {
offset := (page - 1) * size
qb.opts.Skip = &offset
qb.opts.Limit = &size
return qb
}
// AddStage appends an aggregation stage to the pipeline.
func (qb *QueryBuilder) AddStage(stage bsonV2.M) *QueryBuilder {
qb.pipeline = append(qb.pipeline, stage)
return qb
}
// BuildPipeline returns the final aggregation pipeline.
func (qb *QueryBuilder) BuildPipeline() []bsonV2.M {
return qb.pipeline
}
// Build returns the final filter and find options.
func (qb *QueryBuilder) Build() (bsonV2.M, *optionsV2.FindOptions) {
return qb.filter, qb.opts
}
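
The builder only assembles the filter, find options, and pipeline; running them is left to the caller. A hedged sketch of wiring Build into a Find call with the v2 driver follows: the collection, context, and field names are assumptions, and since v2's Find expects an options builder rather than a plain FindOptions struct, the values returned by Build are copied onto one here.

package mongodb

import (
	"context"

	bsonV2 "go.mongodb.org/mongo-driver/v2/bson"
	"go.mongodb.org/mongo-driver/v2/mongo"
	optionsV2 "go.mongodb.org/mongo-driver/v2/mongo/options"
)

// findActiveAdults is a usage sketch only; it is not part of the package.
func findActiveAdults(ctx context.Context, coll *mongo.Collection) (*mongo.Cursor, error) {
	filter, opts := NewQuery().
		SetFilter(bsonV2.M{"status": "active"}).
		SetGreaterThan("age", 18).
		SetSort(bsonV2.D{{Key: "name", Value: 1}}).
		SetPage(1, 20).
		Build()

	// Map the plain FindOptions onto a v2 options builder before calling Find.
	findOpts := optionsV2.Find()
	if opts.Limit != nil {
		findOpts.SetLimit(*opts.Limit)
	}
	if opts.Skip != nil {
		findOpts.SetSkip(*opts.Skip)
	}
	if opts.Sort != nil {
		findOpts.SetSort(opts.Sort)
	}
	if opts.Projection != nil {
		findOpts.SetProjection(opts.Projection)
	}

	return coll.Find(ctx, filter, findOpts)
}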

View File

@@ -0,0 +1,238 @@
package mongodb
import (
"testing"
"github.com/stretchr/testify/assert"
bsonV2 "go.mongodb.org/mongo-driver/v2/bson"
)
func TestQueryBuilder(t *testing.T) {
// Create a QueryBuilder instance
qb := NewQuery()
// Test SetFilter
filter := bsonV2.M{"name": "test"}
qb.SetFilter(filter)
assert.Equal(t, filter, qb.filter)
// Test SetLimit
limit := int64(10)
qb.SetLimit(limit)
assert.NotNil(t, qb.opts.Limit)
assert.Equal(t, &limit, qb.opts.Limit)
// Test SetSort
sort := bsonV2.D{{Key: "name", Value: 1}}
qb.SetSort(sort)
assert.NotNil(t, qb.opts.Sort)
assert.Equal(t, sort, qb.opts.Sort)
// Test Build
finalFilter, finalOpts := qb.Build()
assert.Equal(t, filter, finalFilter)
assert.Equal(t, qb.opts, finalOpts)
}
func TestQueryBuilderMethods(t *testing.T) {
qb := NewQuery()
// Test SetFilter
filter := bsonV2.M{"name": "test"}
qb.SetFilter(filter)
assert.Equal(t, filter, qb.filter)
// Test SetNotEqual
qb.SetNotEqual("status", "inactive")
assert.Equal(t, bsonV2.M{OperatorNe: "inactive"}, qb.filter["status"])
// Test SetGreaterThan
qb.SetGreaterThan("age", 18)
assert.Equal(t, bsonV2.M{OperatorGt: 18}, qb.filter["age"])
// Test SetLessThan
qb.SetLessThan("age", 30)
assert.Equal(t, bsonV2.M{OperatorLt: 30}, qb.filter["age"])
// Test SetExists
qb.SetExists("email", true)
assert.Equal(t, bsonV2.M{OperatorExists: true}, qb.filter["email"])
// Test SetType
qb.SetType("age", "int")
assert.Equal(t, bsonV2.M{OperatorType: "int"}, qb.filter["age"])
// Test SetBetween
qb.SetBetween("price", 10, 100)
assert.Equal(t, bsonV2.M{OperatorGte: 10, OperatorLte: 100}, qb.filter["price"])
// Test SetOr
orConditions := []bsonV2.M{
{"status": "active"},
{"status": "pending"},
}
qb.SetOr(orConditions)
assert.Equal(t, orConditions, qb.filter[OperatorOr])
// Test SetAnd
andConditions := []bsonV2.M{
{"age": bsonV2.M{OperatorGt: 18}},
{"status": "active"},
}
qb.SetAnd(andConditions)
assert.Equal(t, andConditions, qb.filter[OperatorAnd])
// Test SetLimit
limit := int64(10)
qb.SetLimit(limit)
assert.NotNil(t, qb.opts.Limit)
assert.Equal(t, &limit, qb.opts.Limit)
// Test SetSort
sort := bsonV2.D{{Key: "name", Value: 1}}
qb.SetSort(sort)
assert.NotNil(t, qb.opts.Sort)
assert.Equal(t, sort, qb.opts.Sort)
// Test SetSortWithPriority
sortWithPriority := []bsonV2.E{{Key: "priority", Value: -1}, {Key: "name", Value: 1}}
qb.SetSortWithPriority(sortWithPriority)
assert.Equal(t, bsonV2.D(sortWithPriority), qb.opts.Sort)
// Test SetProjection
projection := bsonV2.M{"name": 1, "age": 1}
qb.SetProjection(projection)
assert.Equal(t, projection, qb.opts.Projection)
// Test SetSkip
skip := int64(5)
qb.SetSkip(skip)
assert.NotNil(t, qb.opts.Skip)
assert.Equal(t, &skip, qb.opts.Skip)
// Test SetPage
page, size := int64(2), int64(10)
qb.SetPage(page, size)
assert.Equal(t, &size, qb.opts.Limit)
assert.Equal(t, int64(10), *qb.opts.Limit)
assert.Equal(t, int64(10), *qb.opts.Skip)
// Test SetRegex
qb.SetRegex("name", "^test")
assert.Equal(t, bsonV2.M{OperatorRegex: "^test"}, qb.filter["name"])
// Test SetIn
qb.SetIn("tags", []interface{}{"tag1", "tag2"})
assert.Equal(t, bsonV2.M{OperatorIn: []interface{}{"tag1", "tag2"}}, qb.filter["tags"])
// Test Build
finalFilter, finalOpts := qb.Build()
assert.Equal(t, qb.filter, finalFilter)
assert.Equal(t, qb.opts, finalOpts)
}
func TestSetGeoWithin(t *testing.T) {
qb := NewQuery()
field := "location"
geometry := bsonV2.M{"type": "Polygon", "coordinates": []interface{}{
[]interface{}{
[]float64{40.0, -70.0},
[]float64{41.0, -70.0},
[]float64{41.0, -71.0},
[]float64{40.0, -71.0},
[]float64{40.0, -70.0},
},
}}
qb.SetGeoWithin(field, geometry)
expected := bsonV2.M{
OperatorGeoWithin: bsonV2.M{
OperatorGeometry: geometry,
},
}
assert.Equal(t, expected, qb.filter[field])
}
func TestSetGeoIntersects(t *testing.T) {
qb := NewQuery()
field := "location"
geometry := bsonV2.M{"type": "LineString", "coordinates": [][]float64{
{40.0, -70.0},
{41.0, -71.0},
}}
qb.SetGeoIntersects(field, geometry)
expected := bsonV2.M{
OperatorGeoIntersects: bsonV2.M{
OperatorGeometry: geometry,
},
}
assert.Equal(t, expected, qb.filter[field])
}
func TestSetNear(t *testing.T) {
qb := NewQuery()
field := "location"
point := bsonV2.M{"type": "Point", "coordinates": []float64{40.7128, -74.0060}}
maxDistance := 500.0
minDistance := 50.0
qb.SetNear(field, point, maxDistance, minDistance)
expected := bsonV2.M{
OperatorNear: bsonV2.M{
OperatorGeometry: point,
OperatorMaxDistance: maxDistance,
OperatorMinDistance: minDistance,
},
}
assert.Equal(t, expected, qb.filter[field])
}
func TestSetNearSphere(t *testing.T) {
qb := NewQuery()
field := "location"
point := bsonV2.M{"type": "Point", "coordinates": []float64{40.7128, -74.0060}}
maxDistance := 1000.0
minDistance := 100.0
qb.SetNearSphere(field, point, maxDistance, minDistance)
expected := bsonV2.M{
OperatorNearSphere: bsonV2.M{
OperatorGeometry: point,
OperatorMaxDistance: maxDistance,
OperatorMinDistance: minDistance,
},
}
assert.Equal(t, expected, qb.filter[field])
}
func TestQueryBuilderPipeline(t *testing.T) {
// Create a QueryBuilder instance
qb := NewQuery()
// Add aggregation stages
matchStage := bsonV2.M{OperatorMatch: bsonV2.M{"status": "active"}}
groupStage := bsonV2.M{OperatorGroup: bsonV2.M{"_id": "$category", "count": bsonV2.M{OperatorSum: 1}}}
sortStage := bsonV2.M{OperatorSortAgg: bsonV2.M{"count": -1}}
qb.AddStage(matchStage).AddStage(groupStage).AddStage(sortStage)
// Build the pipeline
pipeline := qb.BuildPipeline()
// Verify the pipeline
expectedPipeline := []bsonV2.M{matchStage, groupStage, sortStage}
assert.Equal(t, expectedPipeline, pipeline)
}
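
Executing a pipeline like the one asserted above would typically go through Collection.Aggregate. A hedged sketch under the same assumptions as the Find example after query.go (coll and ctx are supplied by the caller):

package mongodb

import (
	"context"

	bsonV2 "go.mongodb.org/mongo-driver/v2/bson"
	"go.mongodb.org/mongo-driver/v2/mongo"
)

// runCategoryCounts is a usage sketch only; it mirrors the pipeline from the test above.
func runCategoryCounts(ctx context.Context, coll *mongo.Collection) ([]bsonV2.M, error) {
	pipeline := NewQuery().
		AddStage(bsonV2.M{OperatorMatch: bsonV2.M{"status": "active"}}).
		AddStage(bsonV2.M{OperatorGroup: bsonV2.M{"_id": "$category", "count": bsonV2.M{OperatorSum: 1}}}).
		AddStage(bsonV2.M{OperatorSortAgg: bsonV2.M{"count": -1}}).
		BuildPipeline()

	cursor, err := coll.Aggregate(ctx, pipeline)
	if err != nil {
		return nil, err
	}
	defer cursor.Close(ctx)

	var results []bsonV2.M
	if err := cursor.All(ctx, &results); err != nil {
		return nil, err
	}
	return results, nil
}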

View File

@@ -26,6 +26,7 @@ func NewLogger(cfg *conf.Logger) log.Logger {
fallthrough
case "text":
loggerFormatter = &logrus.TextFormatter{
ForceColors: cfg.Logrus.ForceColors,
DisableColors: cfg.Logrus.DisableColors,
DisableTimestamp: cfg.Logrus.DisableTimestamp,
TimestampFormat: cfg.Logrus.TimestampFormat,

10
tag.bat
View File

@@ -1,4 +1,4 @@
git tag api/v0.0.25 --force
git tag api/v0.0.27 --force
git tag utils/v0.1.4 --force
@@ -10,11 +10,11 @@ git tag tracer/v0.0.10 --force
git tag database/ent/v0.0.10 --force
git tag database/gorm/v0.0.10 --force
git tag database/mongodb/v0.0.10 --force
git tag database/influxdb/v0.0.11 --force
git tag database/mongodb/v0.0.12 --force
git tag database/influxdb/v0.0.12 --force
git tag database/clickhouse/v0.0.14 --force
git tag database/elasticsearch/v0.0.12 --force
git tag database/cassandra/v0.0.10 --force
git tag database/clickhouse/v0.0.10 --force
git tag database/elasticsearch/v0.0.1 --force
git tag registry/v0.1.0 --force
git tag registry/consul/v0.1.0 --force