@ -20,23 +20,56 @@ func SetNotifyKafka(s config.Config, kName string, cfg target.KafkaArgs) error {
}
s [ config . NotifyKafkaSubSys ] [ kName ] = config . KVS {
config . State : config . StateOn ,
target . KafkaBrokers : func ( ) string {
var brokers [ ] string
for _ , broker := range cfg . Brokers {
brokers = append ( brokers , broker . String ( ) )
}
return strings . Join ( brokers , config . ValueSeparator )
} ( ) ,
target . KafkaTopic : cfg . Topic ,
target . KafkaQueueDir : cfg . QueueDir ,
target . KafkaQueueLimit : strconv . Itoa ( int ( cfg . QueueLimit ) ) ,
target . KafkaTLS : config . FormatBool ( cfg . TLS . Enable ) ,
target . KafkaTLSSkipVerify : config . FormatBool ( cfg . TLS . SkipVerify ) ,
target . KafkaTLSClientAuth : strconv . Itoa ( int ( cfg . TLS . ClientAuth ) ) ,
target . KafkaSASL : config . FormatBool ( cfg . SASL . Enable ) ,
target . KafkaSASLUsername : cfg . SASL . User ,
target . KafkaSASLPassword : cfg . SASL . Password ,
config . KV {
Key : config . State ,
Value : config . StateOn ,
} ,
config . KV {
Key : target . KafkaBrokers ,
Value : func ( ) string {
var brokers [ ] string
for _ , broker := range cfg . Brokers {
brokers = append ( brokers , broker . String ( ) )
}
return strings . Join ( brokers , config . ValueSeparator )
} ( ) ,
} ,
config . KV {
Key : target . KafkaTopic ,
Value : cfg . Topic ,
} ,
config . KV {
Key : target . KafkaQueueDir ,
Value : cfg . QueueDir ,
} ,
config . KV {
Key : target . KafkaQueueLimit ,
Value : strconv . Itoa ( int ( cfg . QueueLimit ) ) ,
} ,
config . KV {
Key : target . KafkaTLS ,
Value : config . FormatBool ( cfg . TLS . Enable ) ,
} ,
config . KV {
Key : target . KafkaTLSSkipVerify ,
Value : config . FormatBool ( cfg . TLS . SkipVerify ) ,
} ,
config . KV {
Key : target . KafkaTLSClientAuth ,
Value : strconv . Itoa ( int ( cfg . TLS . ClientAuth ) ) ,
} ,
config . KV {
Key : target . KafkaSASL ,
Value : config . FormatBool ( cfg . SASL . Enable ) ,
} ,
config . KV {
Key : target . KafkaSASLUsername ,
Value : cfg . SASL . User ,
} ,
config . KV {
Key : target . KafkaSASLPassword ,
Value : cfg . SASL . Password ,
} ,
}
return nil
}
@ -52,19 +85,58 @@ func SetNotifyAMQP(s config.Config, amqpName string, cfg target.AMQPArgs) error
}
s [ config . NotifyAMQPSubSys ] [ amqpName ] = config . KVS {
config . State : config . StateOn ,
target . AmqpURL : cfg . URL . String ( ) ,
target . AmqpExchange : cfg . Exchange ,
target . AmqpRoutingKey : cfg . RoutingKey ,
target . AmqpExchangeType : cfg . ExchangeType ,
target . AmqpDeliveryMode : strconv . Itoa ( int ( cfg . DeliveryMode ) ) ,
target . AmqpMandatory : config . FormatBool ( cfg . Mandatory ) ,
target . AmqpInternal : config . FormatBool ( cfg . Immediate ) ,
target . AmqpDurable : config . FormatBool ( cfg . Durable ) ,
target . AmqpNoWait : config . FormatBool ( cfg . NoWait ) ,
target . AmqpAutoDeleted : config . FormatBool ( cfg . AutoDeleted ) ,
target . AmqpQueueDir : cfg . QueueDir ,
target . AmqpQueueLimit : strconv . Itoa ( int ( cfg . QueueLimit ) ) ,
config . KV {
Key : config . State ,
Value : config . StateOn ,
} ,
config . KV {
Key : target . AmqpURL ,
Value : cfg . URL . String ( ) ,
} ,
config . KV {
Key : target . AmqpExchange ,
Value : cfg . Exchange ,
} ,
config . KV {
Key : target . AmqpRoutingKey ,
Value : cfg . RoutingKey ,
} ,
config . KV {
Key : target . AmqpExchangeType ,
Value : cfg . ExchangeType ,
} ,
config . KV {
Key : target . AmqpDeliveryMode ,
Value : strconv . Itoa ( int ( cfg . DeliveryMode ) ) ,
} ,
config . KV {
Key : target . AmqpMandatory ,
Value : config . FormatBool ( cfg . Mandatory ) ,
} ,
config . KV {
Key : target . AmqpInternal ,
Value : config . FormatBool ( cfg . Immediate ) ,
} ,
config . KV {
Key : target . AmqpDurable ,
Value : config . FormatBool ( cfg . Durable ) ,
} ,
config . KV {
Key : target . AmqpNoWait ,
Value : config . FormatBool ( cfg . NoWait ) ,
} ,
config . KV {
Key : target . AmqpAutoDeleted ,
Value : config . FormatBool ( cfg . AutoDeleted ) ,
} ,
config . KV {
Key : target . AmqpQueueDir ,
Value : cfg . QueueDir ,
} ,
config . KV {
Key : target . AmqpQueueLimit ,
Value : strconv . Itoa ( int ( cfg . QueueLimit ) ) ,
} ,
}
return nil
@ -81,12 +153,30 @@ func SetNotifyES(s config.Config, esName string, cfg target.ElasticsearchArgs) e
}
s [ config . NotifyESSubSys ] [ esName ] = config . KVS {
config . State : config . StateOn ,
target . ElasticFormat : cfg . Format ,
target . ElasticURL : cfg . URL . String ( ) ,
target . ElasticIndex : cfg . Index ,
target . ElasticQueueDir : cfg . QueueDir ,
target . ElasticQueueLimit : strconv . Itoa ( int ( cfg . QueueLimit ) ) ,
config . KV {
Key : config . State ,
Value : config . StateOn ,
} ,
config . KV {
Key : target . ElasticFormat ,
Value : cfg . Format ,
} ,
config . KV {
Key : target . ElasticURL ,
Value : cfg . URL . String ( ) ,
} ,
config . KV {
Key : target . ElasticIndex ,
Value : cfg . Index ,
} ,
config . KV {
Key : target . ElasticQueueDir ,
Value : cfg . QueueDir ,
} ,
config . KV {
Key : target . ElasticQueueLimit ,
Value : strconv . Itoa ( int ( cfg . QueueLimit ) ) ,
} ,
}
return nil
@ -103,13 +193,34 @@ func SetNotifyRedis(s config.Config, redisName string, cfg target.RedisArgs) err
}
s [ config . NotifyRedisSubSys ] [ redisName ] = config . KVS {
config . State : config . StateOn ,
target . RedisFormat : cfg . Format ,
target . RedisAddress : cfg . Addr . String ( ) ,
target . RedisPassword : cfg . Password ,
target . RedisKey : cfg . Key ,
target . RedisQueueDir : cfg . QueueDir ,
target . RedisQueueLimit : strconv . Itoa ( int ( cfg . QueueLimit ) ) ,
config . KV {
Key : config . State ,
Value : config . StateOn ,
} ,
config . KV {
Key : target . RedisFormat ,
Value : cfg . Format ,
} ,
config . KV {
Key : target . RedisAddress ,
Value : cfg . Addr . String ( ) ,
} ,
config . KV {
Key : target . RedisPassword ,
Value : cfg . Password ,
} ,
config . KV {
Key : target . RedisKey ,
Value : cfg . Key ,
} ,
config . KV {
Key : target . RedisQueueDir ,
Value : cfg . QueueDir ,
} ,
config . KV {
Key : target . RedisQueueLimit ,
Value : strconv . Itoa ( int ( cfg . QueueLimit ) ) ,
} ,
}
return nil
@ -126,11 +237,26 @@ func SetNotifyWebhook(s config.Config, whName string, cfg target.WebhookArgs) er
}
s [ config . NotifyWebhookSubSys ] [ whName ] = config . KVS {
config . State : config . StateOn ,
target . WebhookEndpoint : cfg . Endpoint . String ( ) ,
target . WebhookAuthToken : cfg . AuthToken ,
target . WebhookQueueDir : cfg . QueueDir ,
target . WebhookQueueLimit : strconv . Itoa ( int ( cfg . QueueLimit ) ) ,
config . KV {
Key : config . State ,
Value : config . StateOn ,
} ,
config . KV {
Key : target . WebhookEndpoint ,
Value : cfg . Endpoint . String ( ) ,
} ,
config . KV {
Key : target . WebhookAuthToken ,
Value : cfg . AuthToken ,
} ,
config . KV {
Key : target . WebhookQueueDir ,
Value : cfg . QueueDir ,
} ,
config . KV {
Key : target . WebhookQueueLimit ,
Value : strconv . Itoa ( int ( cfg . QueueLimit ) ) ,
} ,
}
return nil
@ -147,17 +273,50 @@ func SetNotifyPostgres(s config.Config, psqName string, cfg target.PostgreSQLArg
}
s [ config . NotifyPostgresSubSys ] [ psqName ] = config . KVS {
config . State : config . StateOn ,
target . PostgresFormat : cfg . Format ,
target . PostgresConnectionString : cfg . ConnectionString ,
target . PostgresTable : cfg . Table ,
target . PostgresHost : cfg . Host . String ( ) ,
target . PostgresPort : cfg . Port ,
target . PostgresUsername : cfg . User ,
target . PostgresPassword : cfg . Password ,
target . PostgresDatabase : cfg . Database ,
target . PostgresQueueDir : cfg . QueueDir ,
target . PostgresQueueLimit : strconv . Itoa ( int ( cfg . QueueLimit ) ) ,
config . KV {
Key : config . State ,
Value : config . StateOn ,
} ,
config . KV {
Key : target . PostgresFormat ,
Value : cfg . Format ,
} ,
config . KV {
Key : target . PostgresConnectionString ,
Value : cfg . ConnectionString ,
} ,
config . KV {
Key : target . PostgresTable ,
Value : cfg . Table ,
} ,
config . KV {
Key : target . PostgresHost ,
Value : cfg . Host . String ( ) ,
} ,
config . KV {
Key : target . PostgresPort ,
Value : cfg . Port ,
} ,
config . KV {
Key : target . PostgresUsername ,
Value : cfg . User ,
} ,
config . KV {
Key : target . PostgresPassword ,
Value : cfg . Password ,
} ,
config . KV {
Key : target . PostgresDatabase ,
Value : cfg . Database ,
} ,
config . KV {
Key : target . PostgresQueueDir ,
Value : cfg . QueueDir ,
} ,
config . KV {
Key : target . PostgresQueueLimit ,
Value : strconv . Itoa ( int ( cfg . QueueLimit ) ) ,
} ,
}
return nil
@ -174,13 +333,34 @@ func SetNotifyNSQ(s config.Config, nsqName string, cfg target.NSQArgs) error {
}
s [ config . NotifyNSQSubSys ] [ nsqName ] = config . KVS {
config . State : config . StateOn ,
target . NSQAddress : cfg . NSQDAddress . String ( ) ,
target . NSQTopic : cfg . Topic ,
target . NSQTLS : config . FormatBool ( cfg . TLS . Enable ) ,
target . NSQTLSSkipVerify : config . FormatBool ( cfg . TLS . SkipVerify ) ,
target . NSQQueueDir : cfg . QueueDir ,
target . NSQQueueLimit : strconv . Itoa ( int ( cfg . QueueLimit ) ) ,
config . KV {
Key : config . State ,
Value : config . StateOn ,
} ,
config . KV {
Key : target . NSQAddress ,
Value : cfg . NSQDAddress . String ( ) ,
} ,
config . KV {
Key : target . NSQTopic ,
Value : cfg . Topic ,
} ,
config . KV {
Key : target . NSQTLS ,
Value : config . FormatBool ( cfg . TLS . Enable ) ,
} ,
config . KV {
Key : target . NSQTLSSkipVerify ,
Value : config . FormatBool ( cfg . TLS . SkipVerify ) ,
} ,
config . KV {
Key : target . NSQQueueDir ,
Value : cfg . QueueDir ,
} ,
config . KV {
Key : target . NSQQueueLimit ,
Value : strconv . Itoa ( int ( cfg . QueueLimit ) ) ,
} ,
}
return nil
@ -197,28 +377,79 @@ func SetNotifyNATS(s config.Config, natsName string, cfg target.NATSArgs) error
}
s [ config . NotifyNATSSubSys ] [ natsName ] = config . KVS {
config . State : config . StateOn ,
target . NATSAddress : cfg . Address . String ( ) ,
target . NATSSubject : cfg . Subject ,
target . NATSUsername : cfg . Username ,
target . NATSPassword : cfg . Password ,
target . NATSToken : cfg . Token ,
target . NATSCertAuthority : cfg . CertAuthority ,
target . NATSClientCert : cfg . ClientCert ,
target . NATSClientKey : cfg . ClientKey ,
target . NATSSecure : config . FormatBool ( cfg . Secure ) ,
target . NATSPingInterval : strconv . FormatInt ( cfg . PingInterval , 10 ) ,
target . NATSQueueDir : cfg . QueueDir ,
target . NATSQueueLimit : strconv . Itoa ( int ( cfg . QueueLimit ) ) ,
target . NATSStreaming : func ( ) string {
if cfg . Streaming . Enable {
return config . StateOn
}
return config . StateOff
} ( ) ,
target . NATSStreamingClusterID : cfg . Streaming . ClusterID ,
target . NATSStreamingAsync : config . FormatBool ( cfg . Streaming . Async ) ,
target . NATSStreamingMaxPubAcksInFlight : strconv . Itoa ( cfg . Streaming . MaxPubAcksInflight ) ,
config . KV {
Key : config . State ,
Value : config . StateOn ,
} ,
config . KV {
Key : target . NATSAddress ,
Value : cfg . Address . String ( ) ,
} ,
config . KV {
Key : target . NATSSubject ,
Value : cfg . Subject ,
} ,
config . KV {
Key : target . NATSUsername ,
Value : cfg . Username ,
} ,
config . KV {
Key : target . NATSPassword ,
Value : cfg . Password ,
} ,
config . KV {
Key : target . NATSToken ,
Value : cfg . Token ,
} ,
config . KV {
Key : target . NATSCertAuthority ,
Value : cfg . CertAuthority ,
} ,
config . KV {
Key : target . NATSClientCert ,
Value : cfg . ClientCert ,
} ,
config . KV {
Key : target . NATSClientKey ,
Value : cfg . ClientKey ,
} ,
config . KV {
Key : target . NATSSecure ,
Value : config . FormatBool ( cfg . Secure ) ,
} ,
config . KV {
Key : target . NATSPingInterval ,
Value : strconv . FormatInt ( cfg . PingInterval , 10 ) ,
} ,
config . KV {
Key : target . NATSQueueDir ,
Value : cfg . QueueDir ,
} ,
config . KV {
Key : target . NATSQueueLimit ,
Value : strconv . Itoa ( int ( cfg . QueueLimit ) ) ,
} ,
config . KV {
Key : target . NATSStreaming ,
Value : func ( ) string {
if cfg . Streaming . Enable {
return config . StateOn
}
return config . StateOff
} ( ) ,
} ,
config . KV {
Key : target . NATSStreamingClusterID ,
Value : cfg . Streaming . ClusterID ,
} ,
config . KV {
Key : target . NATSStreamingAsync ,
Value : config . FormatBool ( cfg . Streaming . Async ) ,
} ,
config . KV {
Key : target . NATSStreamingMaxPubAcksInFlight ,
Value : strconv . Itoa ( cfg . Streaming . MaxPubAcksInflight ) ,
} ,
}
return nil
@ -235,17 +466,50 @@ func SetNotifyMySQL(s config.Config, sqlName string, cfg target.MySQLArgs) error
}
s [ config . NotifyMySQLSubSys ] [ sqlName ] = config . KVS {
config . State : config . StateOn ,
target . MySQLFormat : cfg . Format ,
target . MySQLDSNString : cfg . DSN ,
target . MySQLTable : cfg . Table ,
target . MySQLHost : cfg . Host . String ( ) ,
target . MySQLPort : cfg . Port ,
target . MySQLUsername : cfg . User ,
target . MySQLPassword : cfg . Password ,
target . MySQLDatabase : cfg . Database ,
target . MySQLQueueDir : cfg . QueueDir ,
target . MySQLQueueLimit : strconv . Itoa ( int ( cfg . QueueLimit ) ) ,
config . KV {
Key : config . State ,
Value : config . StateOn ,
} ,
config . KV {
Key : target . MySQLFormat ,
Value : cfg . Format ,
} ,
config . KV {
Key : target . MySQLDSNString ,
Value : cfg . DSN ,
} ,
config . KV {
Key : target . MySQLTable ,
Value : cfg . Table ,
} ,
config . KV {
Key : target . MySQLHost ,
Value : cfg . Host . String ( ) ,
} ,
config . KV {
Key : target . MySQLPort ,
Value : cfg . Port ,
} ,
config . KV {
Key : target . MySQLUsername ,
Value : cfg . User ,
} ,
config . KV {
Key : target . MySQLPassword ,
Value : cfg . Password ,
} ,
config . KV {
Key : target . MySQLDatabase ,
Value : cfg . Database ,
} ,
config . KV {
Key : target . MySQLQueueDir ,
Value : cfg . QueueDir ,
} ,
config . KV {
Key : target . MySQLQueueLimit ,
Value : strconv . Itoa ( int ( cfg . QueueLimit ) ) ,
} ,
}
return nil
@ -262,16 +526,46 @@ func SetNotifyMQTT(s config.Config, mqttName string, cfg target.MQTTArgs) error
}
s [ config . NotifyMQTTSubSys ] [ mqttName ] = config . KVS {
config . State : config . StateOn ,
target . MqttBroker : cfg . Broker . String ( ) ,
target . MqttTopic : cfg . Topic ,
target . MqttQoS : fmt . Sprintf ( "%d" , cfg . QoS ) ,
target . MqttUsername : cfg . User ,
target . MqttPassword : cfg . Password ,
target . MqttReconnectInterval : cfg . MaxReconnectInterval . String ( ) ,
target . MqttKeepAliveInterval : cfg . KeepAlive . String ( ) ,
target . MqttQueueDir : cfg . QueueDir ,
target . MqttQueueLimit : strconv . Itoa ( int ( cfg . QueueLimit ) ) ,
config . KV {
Key : config . State ,
Value : config . StateOn ,
} ,
config . KV {
Key : target . MqttBroker ,
Value : cfg . Broker . String ( ) ,
} ,
config . KV {
Key : target . MqttTopic ,
Value : cfg . Topic ,
} ,
config . KV {
Key : target . MqttQoS ,
Value : fmt . Sprintf ( "%d" , cfg . QoS ) ,
} ,
config . KV {
Key : target . MqttUsername ,
Value : cfg . User ,
} ,
config . KV {
Key : target . MqttPassword ,
Value : cfg . Password ,
} ,
config . KV {
Key : target . MqttReconnectInterval ,
Value : cfg . MaxReconnectInterval . String ( ) ,
} ,
config . KV {
Key : target . MqttKeepAliveInterval ,
Value : cfg . KeepAlive . String ( ) ,
} ,
config . KV {
Key : target . MqttQueueDir ,
Value : cfg . QueueDir ,
} ,
config . KV {
Key : target . MqttQueueLimit ,
Value : strconv . Itoa ( int ( cfg . QueueLimit ) ) ,
} ,
}
return nil