@ -17,13 +17,18 @@
package target
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
"path/filepath"
"strings"
"time"
"github.com/gomodule/redigo/redis"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
)
@ -35,6 +40,8 @@ type RedisArgs struct {
Addr xnet . Host ` json:"address" `
Password string ` json:"password" `
Key string ` json:"key" `
QueueDir string ` json:"queueDir" `
QueueLimit uint64 ` json:"queueLimit" `
}
// Validate RedisArgs fields
@ -54,6 +61,35 @@ func (r RedisArgs) Validate() error {
return fmt . Errorf ( "empty key" )
}
if r . QueueDir != "" {
if ! filepath . IsAbs ( r . QueueDir ) {
return errors . New ( "queueDir path should be absolute" )
}
}
if r . QueueLimit > 10000 {
return errors . New ( "queueLimit should not exceed 10000" )
}
return nil
}
func ( r RedisArgs ) validateFormat ( c redis . Conn ) error {
typeAvailable , err := redis . String ( c . Do ( "TYPE" , r . Key ) )
if err != nil {
return err
}
if typeAvailable != "none" {
expectedType := "hash"
if r . Format == event . AccessFormat {
expectedType = "list"
}
if typeAvailable != expectedType {
return fmt . Errorf ( "expected type %v does not match with available type %v" , expectedType , typeAvailable )
}
}
return nil
}
@ -62,6 +98,8 @@ type RedisTarget struct {
id event . TargetID
args RedisArgs
pool * redis . Pool
store Store
firstPing bool
}
// ID - returns target ID.
@ -69,16 +107,32 @@ func (target *RedisTarget) ID() event.TargetID {
return target . id
}
// Save - Sends event directly without persisting .
// Save - saves the events to the store if questore is configured, which will be replayed when the redis connection is active .
func ( target * RedisTarget ) Save ( eventData event . Event ) error {
if target . store != nil {
return target . store . Put ( eventData )
}
conn := target . pool . Get ( )
defer func ( ) {
cErr := conn . Close ( )
logger . LogOnceIf ( context . Background ( ) , cErr , target . ID ( ) )
} ( )
_ , pingErr := conn . Do ( "PING" )
if pingErr != nil {
if IsConnRefusedErr ( pingErr ) {
return errNotConnected
}
return pingErr
}
return target . send ( eventData )
}
// send - sends an event to the redis.
func ( target * RedisTarget ) send ( eventData event . Event ) error {
conn := target . pool . Get ( )
defer func ( ) {
// FIXME: log returned error. ignore time being.
_ = conn . Close ( )
cErr := conn . Close ( )
logger . LogOnceIf ( context . Background ( ) , cErr , target . ID ( ) )
} ( )
if target . args . Format == event . NamespaceFormat {
@ -98,24 +152,68 @@ func (target *RedisTarget) send(eventData event.Event) error {
_ , err = conn . Do ( "HSET" , target . args . Key , key , data )
}
if err != nil {
return err
}
}
if target . args . Format == event . AccessFormat {
data , err := json . Marshal ( [ ] interface { } { eventData . EventTime , [ ] event . Event { eventData } } )
if err != nil {
return err
}
_ , err = conn . Do ( "RPUSH" , target . args . Key , data )
if _ , err : = conn . Do ( "RPUSH" , target . args . Key , data ) ; err != nil {
return err
}
}
return nil
}
// Send - interface compatible method does no-op .
// Send - reads an event from store and sends it to redis .
func ( target * RedisTarget ) Send ( eventKey string ) error {
conn := target . pool . Get ( )
defer func ( ) {
cErr := conn . Close ( )
logger . LogOnceIf ( context . Background ( ) , cErr , target . ID ( ) )
} ( )
_ , pingErr := conn . Do ( "PING" )
if pingErr != nil {
if IsConnRefusedErr ( pingErr ) {
return errNotConnected
}
return pingErr
}
if ! target . firstPing {
if err := target . args . validateFormat ( conn ) ; err != nil {
if IsConnRefusedErr ( err ) {
return errNotConnected
}
return err
}
target . firstPing = true
}
eventData , eErr := target . store . Get ( eventKey )
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and would've been already been sent successfully.
if os . IsNotExist ( eErr ) {
return nil
}
return eErr
}
if err := target . send ( eventData ) ; err != nil {
if IsConnRefusedErr ( err ) {
return errNotConnected
}
return err
}
// Delete the event from store.
return target . store . Del ( eventKey )
}
// Close - does nothing and available for interface compatibility.
@ -124,7 +222,7 @@ func (target *RedisTarget) Close() error {
}
// NewRedisTarget - creates new Redis target.
func NewRedisTarget ( id string , args RedisArgs ) ( * RedisTarget , error ) {
func NewRedisTarget ( id string , args RedisArgs , doneCh <- chan struct { } ) ( * RedisTarget , error ) {
pool := & redis . Pool {
MaxIdle : 3 ,
IdleTimeout : 2 * 60 * time . Second ,
@ -139,8 +237,9 @@ func NewRedisTarget(id string, args RedisArgs) (*RedisTarget, error) {
}
if _ , err = conn . Do ( "AUTH" , args . Password ) ; err != nil {
// FIXME: log returned error. ignore time being.
_ = conn . Close ( )
cErr := conn . Close ( )
targetID := event . TargetID { ID : id , Name : "redis" }
logger . LogOnceIf ( context . Background ( ) , cErr , targetID . String ( ) )
return nil , err
}
@ -152,35 +251,47 @@ func NewRedisTarget(id string, args RedisArgs) (*RedisTarget, error) {
} ,
}
conn := pool . Get ( )
var store Store
if args . QueueDir != "" {
queueDir := filepath . Join ( args . QueueDir , storePrefix + "-redis-" + id )
store = NewQueueStore ( queueDir , args . QueueLimit )
if oErr := store . Open ( ) ; oErr != nil {
return nil , oErr
}
}
target := & RedisTarget {
id : event . TargetID { ID : id , Name : "redis" } ,
args : args ,
pool : pool ,
store : store ,
}
conn := target . pool . Get ( )
defer func ( ) {
// FIXME: log returned error. ignore time being.
_ = conn . Close ( )
cErr := conn . Close ( )
logger . LogOnceIf ( context . Background ( ) , cErr , target . ID ( ) )
} ( )
if _ , err := conn . Do ( "PING" ) ; err != nil {
return nil , err
_ , pingErr := conn . Do ( "PING" )
if pingErr != nil {
if target . store == nil || ! IsConnRefusedErr ( pingErr ) {
return nil , pingErr
}
typeAvailable , err := redis . String ( conn . Do ( "TYPE" , args . Key ) )
if err != nil {
} else {
if err := target . args . validateFormat ( conn ) ; err != nil {
return nil , err
}
if typeAvailable != "none" {
expectedType := "hash"
if args . Format == event . AccessFormat {
expectedType = "list"
target . firstPing = true
}
if typeAvailable != expectedType {
return nil , fmt . Errorf ( "expected type %v does not match with available type %v" , expectedType , typeAvailable )
}
if target . store != nil {
// Replays the events from the store.
eventKeyCh := replayEvents ( target . store , doneCh )
// Start replaying events from the store.
go sendEvents ( target , eventKeyCh , doneCh )
}
return & RedisTarget {
id : event . TargetID { ID : id , Name : "redis" } ,
args : args ,
pool : pool ,
} , nil
return target , nil
}