Ignore "connection reset" errors while initializing the targets with queue store enabled (#8185)

Fixes #8178
master
Praveen raj Mani 5 years ago committed by Harshavardhana
parent 259a5d825b
commit 428836d4e1
  1. 2
      pkg/event/target/amqp.go
  2. 2
      pkg/event/target/mysql.go
  3. 2
      pkg/event/target/nsq.go
  4. 2
      pkg/event/target/postgresql.go
  5. 2
      pkg/event/target/redis.go
  6. 10
      pkg/event/target/store.go

@ -231,7 +231,7 @@ func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}) (*AMQPTarge
conn, err = amqp.Dial(args.URL.String()) conn, err = amqp.Dial(args.URL.String())
if err != nil { if err != nil {
if store == nil || !IsConnRefusedErr(err) { if store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
return nil, err return nil, err
} }
} }

@ -351,7 +351,7 @@ func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}) (*MySQLTa
err = target.db.Ping() err = target.db.Ping()
if err != nil { if err != nil {
if target.store == nil || !IsConnRefusedErr(err) { if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
return nil, err return nil, err
} }
} else { } else {

@ -182,7 +182,7 @@ func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}) (*NSQTarget,
if err := target.producer.Ping(); err != nil { if err := target.producer.Ping(); err != nil {
// To treat "connection refused" errors as errNotConnected. // To treat "connection refused" errors as errNotConnected.
if target.store == nil || !IsConnRefusedErr(err) { if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
return nil, err return nil, err
} }
} }

@ -358,7 +358,7 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{})
err = target.db.Ping() err = target.db.Ping()
if err != nil { if err != nil {
if target.store == nil || !IsConnRefusedErr(err) { if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
return nil, err return nil, err
} }
} else { } else {

@ -276,7 +276,7 @@ func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}) (*RedisTa
_, pingErr := conn.Do("PING") _, pingErr := conn.Do("PING")
if pingErr != nil { if pingErr != nil {
if target.store == nil || !IsConnRefusedErr(pingErr) { if target.store == nil || !(IsConnRefusedErr(pingErr) || IsConnResetErr(pingErr)) {
return nil, pingErr return nil, pingErr
} }
} else { } else {

@ -93,8 +93,12 @@ func IsConnRefusedErr(err error) bool {
return false return false
} }
// isConnResetErr - Checks for connection reset errors. // IsConnResetErr - Checks for connection reset errors.
func isConnResetErr(err error) bool { func IsConnResetErr(err error) bool {
if strings.Contains(err.Error(), "connection reset by peer") {
return true
}
// incase if error message is wrapped.
if opErr, ok := err.(*net.OpError); ok { if opErr, ok := err.(*net.OpError); ok {
if syscallErr, ok := opErr.Err.(*os.SyscallError); ok { if syscallErr, ok := opErr.Err.(*os.SyscallError); ok {
if syscallErr.Err == syscall.ECONNRESET { if syscallErr.Err == syscall.ECONNRESET {
@ -117,7 +121,7 @@ func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan str
break break
} }
if err != errNotConnected && !isConnResetErr(err) { if err != errNotConnected && !IsConnResetErr(err) {
panic(fmt.Errorf("target.Send() failed with '%v'", err)) panic(fmt.Errorf("target.Send() failed with '%v'", err))
} }

Loading…
Cancel
Save