@ -20,7 +20,11 @@ import (
"crypto/tls"
"encoding/json"
"errors"
"net"
"net/url"
"os"
"path/filepath"
"syscall"
"github.com/nsqio/go-nsq"
@ -37,6 +41,8 @@ type NSQArgs struct {
Enable bool ` json:"enable" `
SkipVerify bool ` json:"skipVerify" `
} ` json:"tls" `
QueueDir string ` json:"queueDir" `
QueueLimit uint64 ` json:"queueLimit" `
}
// Validate NSQArgs fields
@ -52,6 +58,14 @@ func (n NSQArgs) Validate() error {
if n . Topic == "" {
return errors . New ( "empty topic" )
}
if n . QueueDir != "" {
if ! filepath . IsAbs ( n . QueueDir ) {
return errors . New ( "queueDir path should be absolute" )
}
}
if n . QueueLimit > 10000 {
return errors . New ( "queueLimit should not exceed 10000" )
}
return nil
}
@ -61,6 +75,7 @@ type NSQTarget struct {
id event . TargetID
args NSQArgs
producer * nsq . Producer
store Store
}
// ID - returns target ID.
@ -68,12 +83,37 @@ func (target *NSQTarget) ID() event.TargetID {
return target . id
}
// Save - Sends event directly without persisting .
// Save - saves the events to the store which will be replayed when the nsq connection is active .
func ( target * NSQTarget ) Save ( eventData event . Event ) error {
if target . store != nil {
return target . store . Put ( eventData )
}
if err := target . producer . Ping ( ) ; err != nil {
// To treat "connection refused" errors as errNotConnected.
if isConnRefusedErr ( err ) {
return errNotConnected
}
return err
}
return target . send ( eventData )
}
func ( target * NSQTarget ) send ( eventData event . Event ) ( err error ) {
// isConnRefusedErr - To check fot "connection refused" error.
func isConnRefusedErr ( err error ) bool {
if opErr , ok := err . ( * net . OpError ) ; ok {
if sysErr , ok := opErr . Err . ( * os . SyscallError ) ; ok {
if errno , ok := sysErr . Err . ( syscall . Errno ) ; ok {
if errno == syscall . ECONNREFUSED {
return true
}
}
}
}
return false
}
// send - sends an event to the NSQ.
func ( target * NSQTarget ) send ( eventData event . Event ) error {
objectName , err := url . QueryUnescape ( eventData . S3 . Object . Key )
if err != nil {
return err
@ -85,14 +125,36 @@ func (target *NSQTarget) send(eventData event.Event) (err error) {
return err
}
err = target . producer . Publish ( target . args . Topic , data )
return err
return target . producer . Publish ( target . args . Topic , data )
}
// Send - interface compatible method does no-op .
// Send - reads an event from store and sends it to NSQ .
func ( target * NSQTarget ) Send ( eventKey string ) error {
return nil
if err := target . producer . Ping ( ) ; err != nil {
// To treat "connection refused" errors as errNotConnected.
if isConnRefusedErr ( err ) {
return errNotConnected
}
return err
}
eventData , eErr := target . store . Get ( eventKey )
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully.
if os . IsNotExist ( eErr ) {
return nil
}
return eErr
}
if err := target . send ( eventData ) ; err != nil {
return err
}
// Delete the event from store.
return target . store . Del ( eventKey )
}
// Close - closes underneath connections to NSQD server.
@ -103,7 +165,7 @@ func (target *NSQTarget) Close() (err error) {
}
// NewNSQTarget - creates new NSQ target.
func NewNSQTarget ( id string , args NSQArgs ) ( * NSQTarget , error ) {
func NewNSQTarget ( id string , args NSQArgs , doneCh <- chan struct { } ) ( * NSQTarget , error ) {
config := nsq . NewConfig ( )
if args . TLS . Enable {
config . TlsV1 = true
@ -111,15 +173,42 @@ func NewNSQTarget(id string, args NSQArgs) (*NSQTarget, error) {
InsecureSkipVerify : args . TLS . SkipVerify ,
}
}
producer , err := nsq . NewProducer ( args . NSQDAddress . String ( ) , config )
var store Store
if args . QueueDir != "" {
queueDir := filepath . Join ( args . QueueDir , storePrefix + "-nsq-" + id )
store = NewQueueStore ( queueDir , args . QueueLimit )
if oErr := store . Open ( ) ; oErr != nil {
return nil , oErr
}
}
producer , err := nsq . NewProducer ( args . NSQDAddress . String ( ) , config )
if err != nil {
return nil , err
}
return & NSQTarget {
target := & NSQTarget {
id : event . TargetID { ID : id , Name : "nsq" } ,
args : args ,
producer : producer ,
} , nil
store : store ,
}
if err := target . producer . Ping ( ) ; err != nil {
// To treat "connection refused" errors as errNotConnected.
if target . store == nil || ! isConnRefusedErr ( err ) {
return nil , err
}
}
if target . store != nil {
// Replays the events from the store.
eventKeyCh := replayEvents ( target . store , doneCh )
// Start replaying events from the store.
go sendEvents ( target , eventKeyCh , doneCh )
}
return target , nil
}