feature: added nsq as broker for events (#6740)

master
Matthias Schneider 6 years ago committed by kannappanr
parent bf414068a3
commit 71c66464c1
  1. 15
      cmd/admin-handlers_test.go
  2. 39
      cmd/config-current.go
  3. 23
      cmd/config-current_test.go
  4. 70
      cmd/config-migrate.go
  5. 91
      cmd/config-versions.go
  6. 73
      docs/bucket/notifications/README.md
  7. 11
      docs/config/config.sample.json
  8. 116
      pkg/event/target/nsq.go
  9. 157
      pkg/event/target/nsq_test.go
  10. 256
      vendor/github.com/nsqio/go-nsq/ChangeLog.md
  11. 14
      vendor/github.com/nsqio/go-nsq/Gopkg.lock
  12. 26
      vendor/github.com/nsqio/go-nsq/Gopkg.toml
  13. 17
      vendor/github.com/nsqio/go-nsq/LICENSE
  14. 19
      vendor/github.com/nsqio/go-nsq/README.md
  15. 180
      vendor/github.com/nsqio/go-nsq/UPGRADING.md
  16. 91
      vendor/github.com/nsqio/go-nsq/api_request.go
  17. 221
      vendor/github.com/nsqio/go-nsq/command.go
  18. 671
      vendor/github.com/nsqio/go-nsq/config.go
  19. 31
      vendor/github.com/nsqio/go-nsq/config_flag.go
  20. 734
      vendor/github.com/nsqio/go-nsq/conn.go
  21. 1178
      vendor/github.com/nsqio/go-nsq/consumer.go
  22. 138
      vendor/github.com/nsqio/go-nsq/delegates.go
  23. 44
      vendor/github.com/nsqio/go-nsq/errors.go
  24. 164
      vendor/github.com/nsqio/go-nsq/message.go
  25. 393
      vendor/github.com/nsqio/go-nsq/producer.go
  26. 96
      vendor/github.com/nsqio/go-nsq/protocol.go
  27. 8
      vendor/github.com/nsqio/go-nsq/states.go
  28. 39
      vendor/github.com/nsqio/go-nsq/test.sh
  29. 8
      vendor/github.com/nsqio/go-nsq/version.go
  30. 6
      vendor/vendor.json

@ -38,7 +38,7 @@ import (
var (
configJSON = []byte(`{
"version": "31",
"version": "32",
"credential": {
"accessKey": "minio",
"secretKey": "minio123"
@ -157,6 +157,17 @@ var (
"maxPubAcksInflight": 0
}
}
},
"nsq": {
"1": {
"enable": false,
"nsqdAddress": "",
"topic": "",
"tls": {
"enable": false,
"skipVerify": false
}
}
},
"postgresql": {
"1": {
@ -746,7 +757,7 @@ func TestSetConfigHandler(t *testing.T) {
rec := httptest.NewRecorder()
adminTestBed.router.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
t.Errorf("Expected to succeed but failed with %d", rec.Code)
t.Errorf("Expected to succeed but failed with %d, body: %s", rec.Code, rec.Body)
}
// Check that a very large config file returns an error.

@ -43,9 +43,9 @@ import (
// 6. Make changes in config-current_test.go for any test change
// Config version
const serverConfigVersion = "31"
const serverConfigVersion = "32"
type serverConfig = serverConfigV31
type serverConfig = serverConfigV32
var (
// globalServerConfig server config.
@ -210,6 +210,12 @@ func (s *serverConfig) Validate() error {
}
}
for _, v := range s.Notify.NSQ {
if err := v.Validate(); err != nil {
return fmt.Errorf("nsq: %s", err)
}
}
for _, v := range s.Notify.PostgreSQL {
if err := v.Validate(); err != nil {
return fmt.Errorf("postgreSQL: %s", err)
@ -358,6 +364,17 @@ func (s *serverConfig) TestNotificationTargets() error {
t.Close()
}
for k, v := range s.Notify.NSQ {
if !v.Enable {
continue
}
t, err := target.NewNSQTarget(k, v)
if err != nil {
return fmt.Errorf("nsq(%s): %s", k, err.Error())
}
t.Close()
}
for k, v := range s.Notify.PostgreSQL {
if !v.Enable {
continue
@ -405,6 +422,8 @@ func (s *serverConfig) ConfigDiff(t *serverConfig) string {
return "AMQP Notification configuration differs"
case !reflect.DeepEqual(s.Notify.NATS, t.Notify.NATS):
return "NATS Notification configuration differs"
case !reflect.DeepEqual(s.Notify.NSQ, t.Notify.NSQ):
return "NSQ Notification configuration differs"
case !reflect.DeepEqual(s.Notify.Elasticsearch, t.Notify.Elasticsearch):
return "ElasticSearch Notification configuration differs"
case !reflect.DeepEqual(s.Notify.Redis, t.Notify.Redis):
@ -470,6 +489,8 @@ func newServerConfig() *serverConfig {
srvCfg.Notify.Redis["1"] = target.RedisArgs{}
srvCfg.Notify.NATS = make(map[string]target.NATSArgs)
srvCfg.Notify.NATS["1"] = target.NATSArgs{}
srvCfg.Notify.NSQ = make(map[string]target.NSQArgs)
srvCfg.Notify.NSQ["1"] = target.NSQArgs{}
srvCfg.Notify.PostgreSQL = make(map[string]target.PostgreSQLArgs)
srvCfg.Notify.PostgreSQL["1"] = target.PostgreSQLArgs{}
srvCfg.Notify.MySQL = make(map[string]target.MySQLArgs)
@ -705,6 +726,20 @@ func getNotificationTargets(config *serverConfig) *event.TargetList {
}
}
for id, args := range config.Notify.NSQ {
if args.Enable {
newTarget, err := target.NewNSQTarget(id, args)
if err != nil {
logger.LogIf(context.Background(), err)
continue
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
continue
}
}
}
for id, args := range config.Notify.PostgreSQL {
if args.Enable {
newTarget, err := target.NewPostgreSQLTarget(id, args)

@ -234,6 +234,9 @@ func TestValidateConfig(t *testing.T) {
// Test 27 - Test MQTT
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "mqtt": { "1": { "enable": true, "broker": "", "topic": "", "qos": 0, "clientId": "", "username": "", "password": ""}}}}`, false},
// Test 28 - Test NSQ
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "nsq": { "1": { "enable": true, "nsqdAddress": "", "topic": ""} }}}`, false},
}
for i, testCase := range testCases {
@ -293,48 +296,54 @@ func TestConfigDiff(t *testing.T) {
"NATS Notification configuration differs",
},
// 7
{
&serverConfig{Notify: notifier{NSQ: map[string]target.NSQArgs{"1": {Enable: true}}}},
&serverConfig{Notify: notifier{NSQ: map[string]target.NSQArgs{"1": {Enable: false}}}},
"NSQ Notification configuration differs",
},
// 8
{
&serverConfig{Notify: notifier{Elasticsearch: map[string]target.ElasticsearchArgs{"1": {Enable: true}}}},
&serverConfig{Notify: notifier{Elasticsearch: map[string]target.ElasticsearchArgs{"1": {Enable: false}}}},
"ElasticSearch Notification configuration differs",
},
// 8
// 9
{
&serverConfig{Notify: notifier{Redis: map[string]target.RedisArgs{"1": {Enable: true}}}},
&serverConfig{Notify: notifier{Redis: map[string]target.RedisArgs{"1": {Enable: false}}}},
"Redis Notification configuration differs",
},
// 9
// 10
{
&serverConfig{Notify: notifier{PostgreSQL: map[string]target.PostgreSQLArgs{"1": {Enable: true}}}},
&serverConfig{Notify: notifier{PostgreSQL: map[string]target.PostgreSQLArgs{"1": {Enable: false}}}},
"PostgreSQL Notification configuration differs",
},
// 10
// 11
{
&serverConfig{Notify: notifier{Kafka: map[string]target.KafkaArgs{"1": {Enable: true}}}},
&serverConfig{Notify: notifier{Kafka: map[string]target.KafkaArgs{"1": {Enable: false}}}},
"Kafka Notification configuration differs",
},
// 11
// 12
{
&serverConfig{Notify: notifier{Webhook: map[string]target.WebhookArgs{"1": {Enable: true}}}},
&serverConfig{Notify: notifier{Webhook: map[string]target.WebhookArgs{"1": {Enable: false}}}},
"Webhook Notification configuration differs",
},
// 12
// 13
{
&serverConfig{Notify: notifier{MySQL: map[string]target.MySQLArgs{"1": {Enable: true}}}},
&serverConfig{Notify: notifier{MySQL: map[string]target.MySQLArgs{"1": {Enable: false}}}},
"MySQL Notification configuration differs",
},
// 13
// 14
{
&serverConfig{Notify: notifier{MQTT: map[string]target.MQTTArgs{"1": {Enable: true}}}},
&serverConfig{Notify: notifier{MQTT: map[string]target.MQTTArgs{"1": {Enable: false}}}},
"MQTT Notification configuration differs",
},
// 14
// 15
{
&serverConfig{Logger: loggerConfig{
Console: loggerConsole{Enabled: true},

@ -919,7 +919,7 @@ func migrateV12ToV13() error {
// Copy over fields from V12 into V13 config struct
srvConfig := &serverConfigV13{
Logger: &loggerV7{},
Notify: &notifier{},
Notify: &notifierV3{},
}
srvConfig.Version = "13"
srvConfig.Credential = cv12.Credential
@ -999,7 +999,7 @@ func migrateV13ToV14() error {
// Copy over fields from V13 into V14 config struct
srvConfig := &serverConfigV14{
Logger: &loggerV7{},
Notify: &notifier{},
Notify: &notifierV3{},
}
srvConfig.Version = "14"
srvConfig.Credential = cv13.Credential
@ -1084,7 +1084,7 @@ func migrateV14ToV15() error {
// Copy over fields from V14 into V15 config struct
srvConfig := &serverConfigV15{
Logger: &loggerV7{},
Notify: &notifier{},
Notify: &notifierV3{},
}
srvConfig.Version = "15"
srvConfig.Credential = cv14.Credential
@ -1174,7 +1174,7 @@ func migrateV15ToV16() error {
// Copy over fields from V15 into V16 config struct
srvConfig := &serverConfigV16{
Logger: &loggers{},
Notify: &notifier{},
Notify: &notifierV3{},
}
srvConfig.Version = "16"
srvConfig.Credential = cv15.Credential
@ -1264,7 +1264,7 @@ func migrateV16ToV17() error {
// Copy over fields from V16 into V17 config struct
srvConfig := &serverConfigV17{
Logger: &loggers{},
Notify: &notifier{},
Notify: &notifierV3{},
}
srvConfig.Version = "17"
srvConfig.Credential = cv16.Credential
@ -1385,7 +1385,7 @@ func migrateV17ToV18() error {
// Copy over fields from V17 into V18 config struct
srvConfig := &serverConfigV17{
Logger: &loggers{},
Notify: &notifier{},
Notify: &notifierV3{},
}
srvConfig.Version = "18"
srvConfig.Credential = cv17.Credential
@ -1487,7 +1487,7 @@ func migrateV18ToV19() error {
// Copy over fields from V18 into V19 config struct
srvConfig := &serverConfigV18{
Logger: &loggers{},
Notify: &notifier{},
Notify: &notifierV3{},
}
srvConfig.Version = "19"
srvConfig.Credential = cv18.Credential
@ -1593,7 +1593,7 @@ func migrateV19ToV20() error {
// Copy over fields from V19 into V20 config struct
srvConfig := &serverConfigV20{
Logger: &loggers{},
Notify: &notifier{},
Notify: &notifierV3{},
}
srvConfig.Version = "20"
srvConfig.Credential = cv19.Credential
@ -1697,7 +1697,7 @@ func migrateV20ToV21() error {
// Copy over fields from V20 into V21 config struct
srvConfig := &serverConfigV21{
Notify: &notifier{},
Notify: &notifierV3{},
}
srvConfig.Version = "21"
srvConfig.Credential = cv20.Credential
@ -1801,7 +1801,7 @@ func migrateV21ToV22() error {
// Copy over fields from V21 into V22 config struct
srvConfig := &serverConfigV22{
Notify: notifier{},
Notify: notifierV3{},
}
srvConfig.Version = "22"
srvConfig.Credential = cv21.Credential
@ -1905,7 +1905,7 @@ func migrateV22ToV23() error {
// Copy over fields from V22 into V23 config struct
srvConfig := &serverConfigV23{
Notify: notifier{},
Notify: notifierV3{},
}
srvConfig.Version = "23"
srvConfig.Credential = cv22.Credential
@ -2018,7 +2018,7 @@ func migrateV23ToV24() error {
// Copy over fields from V23 into V24 config struct
srvConfig := &serverConfigV24{
Notify: notifier{},
Notify: notifierV3{},
}
srvConfig.Version = "24"
srvConfig.Credential = cv23.Credential
@ -2131,7 +2131,7 @@ func migrateV24ToV25() error {
// Copy over fields from V24 into V25 config struct
srvConfig := &serverConfigV25{
Notify: notifier{},
Notify: notifierV3{},
}
srvConfig.Version = "25"
srvConfig.Credential = cv24.Credential
@ -2249,7 +2249,7 @@ func migrateV25ToV26() error {
// Copy over fields from V25 into V26 config struct
srvConfig := &serverConfigV26{
Notify: notifier{},
Notify: notifierV3{},
}
srvConfig.Version = "26"
srvConfig.Credential = cv25.Credential
@ -2413,7 +2413,7 @@ func migrateV27ToV28() error {
return nil
}
// Migrates '.minio.sys/config.json' to v31.
// Migrates '.minio.sys/config.json' to v32.
func migrateMinioSysConfig(objAPI ObjectLayer) error {
if err := migrateV27ToV28MinioSys(objAPI); err != nil {
return err
@ -2424,7 +2424,10 @@ func migrateMinioSysConfig(objAPI ObjectLayer) error {
if err := migrateV29ToV30MinioSys(objAPI); err != nil {
return err
}
return migrateV30ToV31MinioSys(objAPI)
if err := migrateV30ToV31MinioSys(objAPI); err != nil {
return err
}
return migrateV31ToV32MinioSys(objAPI)
}
func checkConfigVersion(objAPI ObjectLayer, configFile string, version string) (bool, []byte, error) {
@ -2585,3 +2588,38 @@ func migrateV30ToV31MinioSys(objAPI ObjectLayer) error {
logger.Info(configMigrateMSGTemplate, configFile, "30", "31")
return nil
}
func migrateV31ToV32MinioSys(objAPI ObjectLayer) error {
configFile := path.Join(minioConfigPrefix, minioConfigFile)
ok, data, err := checkConfigVersion(objAPI, configFile, "31")
if err == errConfigNotFound {
return nil
} else if err != nil {
return fmt.Errorf("Unable to load config file. %v", err)
}
if !ok {
return nil
}
cfg := &serverConfigV32{}
if err = json.Unmarshal(data, cfg); err != nil {
return err
}
cfg.Version = "32"
cfg.Notify.NSQ = make(map[string]target.NSQArgs)
cfg.Notify.NSQ["1"] = target.NSQArgs{}
data, err = json.Marshal(cfg)
if err != nil {
return err
}
if err = saveConfig(context.Background(), objAPI, configFile, data); err != nil {
return fmt.Errorf("Failed to migrate config from ‘31’ to ‘32’. %v", err)
}
logger.Info(configMigrateMSGTemplate, configFile, "31", "32")
return nil
}

@ -373,7 +373,7 @@ type serverConfigV12 struct {
Notify notifierV2 `json:"notify"`
}
type notifier struct {
type notifierV3 struct {
AMQP map[string]target.AMQPArgs `json:"amqp"`
Elasticsearch map[string]target.ElasticsearchArgs `json:"elasticsearch"`
Kafka map[string]target.KafkaArgs `json:"kafka"`
@ -398,7 +398,7 @@ type serverConfigV13 struct {
Logger *loggerV7 `json:"logger"`
// Notification queue configuration.
Notify *notifier `json:"notify"`
Notify *notifierV3 `json:"notify"`
}
// serverConfigV14 server configuration version '14' which is like
@ -415,7 +415,7 @@ type serverConfigV14 struct {
Logger *loggerV7 `json:"logger"`
// Notification queue configuration.
Notify *notifier `json:"notify"`
Notify *notifierV3 `json:"notify"`
}
// serverConfigV15 server configuration version '15' which is like
@ -432,7 +432,7 @@ type serverConfigV15 struct {
Logger *loggerV7 `json:"logger"`
// Notification queue configuration.
Notify *notifier `json:"notify"`
Notify *notifierV3 `json:"notify"`
}
// FileLogger is introduced to workaround the dependency about logrus
@ -470,7 +470,7 @@ type serverConfigV16 struct {
Logger *loggers `json:"logger"`
// Notification queue configuration.
Notify *notifier `json:"notify"`
Notify *notifierV3 `json:"notify"`
}
// serverConfigV17 server configuration version '17' which is like
@ -489,7 +489,7 @@ type serverConfigV17 struct {
Logger *loggers `json:"logger"`
// Notification queue configuration.
Notify *notifier `json:"notify"`
Notify *notifierV3 `json:"notify"`
}
// serverConfigV18 server configuration version '18' which is like
@ -508,7 +508,7 @@ type serverConfigV18 struct {
Logger *loggers `json:"logger"`
// Notification queue configuration.
Notify *notifier `json:"notify"`
Notify *notifierV3 `json:"notify"`
}
// serverConfigV19 server configuration version '19' which is like
@ -526,7 +526,7 @@ type serverConfigV19 struct {
Logger *loggers `json:"logger"`
// Notification queue configuration.
Notify *notifier `json:"notify"`
Notify *notifierV3 `json:"notify"`
}
// serverConfigV20 server configuration version '20' which is like
@ -545,7 +545,7 @@ type serverConfigV20 struct {
Logger *loggers `json:"logger"`
// Notification queue configuration.
Notify *notifier `json:"notify"`
Notify *notifierV3 `json:"notify"`
}
// serverConfigV21 is just like version '20' without logger field
@ -560,7 +560,7 @@ type serverConfigV21 struct {
Domain string `json:"domain"`
// Notification queue configuration.
Notify *notifier `json:"notify"`
Notify *notifierV3 `json:"notify"`
}
// serverConfigV22 is just like version '21' with added support
@ -581,7 +581,7 @@ type serverConfigV22 struct {
StorageClass storageClassConfig `json:"storageclass"`
// Notification queue configuration.
Notify notifier `json:"notify"`
Notify notifierV3 `json:"notify"`
}
// serverConfigV23 is just like version '22' with addition of cache field.
@ -604,7 +604,7 @@ type serverConfigV23 struct {
Cache CacheConfig `json:"cache"`
// Notification queue configuration.
Notify notifier `json:"notify"`
Notify notifierV3 `json:"notify"`
}
// serverConfigV24 is just like version '23', we had to revert
@ -628,7 +628,7 @@ type serverConfigV24 struct {
Cache CacheConfig `json:"cache"`
// Notification queue configuration.
Notify notifier `json:"notify"`
Notify notifierV3 `json:"notify"`
}
// serverConfigV25 is just like version '24', stores additionally
@ -655,7 +655,7 @@ type serverConfigV25 struct {
Cache CacheConfig `json:"cache"`
// Notification queue configuration.
Notify notifier `json:"notify"`
Notify notifierV3 `json:"notify"`
}
// serverConfigV26 is just like version '25', stores additionally
@ -679,7 +679,7 @@ type serverConfigV26 struct {
Cache CacheConfig `json:"cache"`
// Notification queue configuration.
Notify notifier `json:"notify"`
Notify notifierV3 `json:"notify"`
}
type loggerConsole struct {
@ -720,7 +720,7 @@ type serverConfigV27 struct {
Cache CacheConfig `json:"cache"`
// Notification queue configuration.
Notify notifier `json:"notify"`
Notify notifierV3 `json:"notify"`
// Logger configuration
Logger loggerConfig `json:"logger"`
@ -751,7 +751,7 @@ type serverConfigV28 struct {
KMS crypto.KMSConfig `json:"kms"`
// Notification queue configuration.
Notify notifier `json:"notify"`
Notify notifierV3 `json:"notify"`
// Logger configuration
Logger loggerConfig `json:"logger"`
@ -787,7 +787,7 @@ type serverConfigV30 struct {
KMS crypto.KMSConfig `json:"kms"`
// Notification queue configuration.
Notify notifier `json:"notify"`
Notify notifierV3 `json:"notify"`
// Logger configuration
Logger loggerConfig `json:"logger"`
@ -814,6 +814,61 @@ type serverConfigV31 struct {
// KMS configuration
KMS crypto.KMSConfig `json:"kms"`
// Notification queue configuration.
Notify notifierV3 `json:"notify"`
// Logger configuration
Logger loggerConfig `json:"logger"`
// Compression configuration
Compression compressionConfig `json:"compress"`
// OpenID configuration
OpenID struct {
// JWKS validator config.
JWKS validator.JWKSArgs `json:"jwks"`
} `json:"openid"`
// External policy enforcements.
Policy struct {
// OPA configuration.
OPA iampolicy.OpaArgs `json:"opa"`
// Add new external policy enforcements here.
} `json:"policy"`
}
type notifier struct {
AMQP map[string]target.AMQPArgs `json:"amqp"`
Elasticsearch map[string]target.ElasticsearchArgs `json:"elasticsearch"`
Kafka map[string]target.KafkaArgs `json:"kafka"`
MQTT map[string]target.MQTTArgs `json:"mqtt"`
MySQL map[string]target.MySQLArgs `json:"mysql"`
NATS map[string]target.NATSArgs `json:"nats"`
NSQ map[string]target.NSQArgs `json:"nsq"`
PostgreSQL map[string]target.PostgreSQLArgs `json:"postgresql"`
Redis map[string]target.RedisArgs `json:"redis"`
Webhook map[string]target.WebhookArgs `json:"webhook"`
}
// serverConfigV32 is just like version '31' with added nsq notifer.
type serverConfigV32 struct {
Version string `json:"version"`
// S3 API configuration.
Credential auth.Credentials `json:"credential"`
Region string `json:"region"`
Worm BoolFlag `json:"worm"`
// Storage class configuration
StorageClass storageClassConfig `json:"storageclass"`
// Cache configuration
Cache CacheConfig `json:"cache"`
// KMS configuration
KMS crypto.KMSConfig `json:"kms"`
// Notification queue configuration.
Notify notifier `json:"notify"`

@ -17,6 +17,7 @@ Bucket events can be published to the following targets:
| [`AMQP`](#AMQP) | [`Redis`](#Redis) | [`MySQL`](#MySQL) |
| [`MQTT`](#MQTT) | [`NATS`](#NATS) | [`Apache Kafka`](#apache-kafka) |
| [`Elasticsearch`](#Elasticsearch) | [`PostgreSQL`](#PostgreSQL) | [`Webhooks`](#webhooks) |
| [`NSQ`](#NSQ) | | |
## Prerequisites
@ -1007,5 +1008,77 @@ mc ls myminio/images-thumbnail
[2017-02-08 11:39:40 IST] 992B images-thumbnail.jpg
```
<a name="NSQ"></a>
## Publish Minio events to NSQ
Install an NSQ Daemon from [here](https://nsq.io/). Or use the following Docker
command for starting an nsq daemon:
```
docker run --rm -p 4150-4151:4150-4151 nsqio/nsq /nsqd
```
### Step 1: Add NSQ endpoint to Minio
The Minio server configuration file is stored on the backend in json format. The NSQ configuration is located in the `nsq` key under the `notify` top-level key. Create a configuration key-value pair here for your NSQ instance. The key is a name for your NSQ endpoint, and the value is a collection of key-value parameters.
An example configuration for NSQ is shown below:
```json
"nsq": {
"1": {
"enable": true,
"nsqdAddress": "127.0.0.1:4150",
"topic": "minio",
"tls": {
"enable": false,
"skipVerify": true
}
}
}
```
To update the configuration, use `mc admin config get` command to get the current configuration file for the minio deployment in json format, and save it locally.
```sh
$ mc admin config get myminio/ > /tmp/myconfig
```
After updating the NSQ configuration in /tmp/myconfig , use `mc admin config set` command to update the configuration for the deployment.Restart the Minio server to put the changes into effect. The server will print a line like `SQS ARNs: arn:minio:sqs::1:nsq` at start-up if there were no errors.
```sh
$ mc admin config set myminio < /tmp/myconfig
```
Note that, you can add as many NSQ daemon endpoint configurations as needed by providing an identifier (like "1" in the example above) for the NSQ instance and an object of per-server configuration parameters.
### Step 2: Enable bucket notification using Minio client
We will enable bucket event notification to trigger whenever a JPEG image is uploaded or deleted ``images`` bucket on ``myminio`` server. Here ARN value is ``arn:minio:sqs::1:nsq``.
```
mc mb myminio/images
mc events add myminio/images arn:minio:sqs::1:nsq --suffix .jpg
mc events list myminio/images
arn:minio:sqs::1:nsq s3:ObjectCreated:*,s3:ObjectRemoved:* Filter: suffix=”.jpg”
```
### Step 3: Test on NSQ
The simplest test is to download `nsq_tail` from [nsq github](https://github.com/nsqio/nsq/releases)
```
./nsq_tail -nsqd-tcp-address 127.0.0.1:4150 -topic minio
```
Open another terminal and upload a JPEG image into ``images`` bucket.
```
mc cp gopher.jpg myminio/images
```
You should receive the following event notification via NSQ once the upload completes.
```
{"EventName":"s3:ObjectCreated:Put","Key":"images/gopher.jpg","Records":[{"eventVersion":"2.0","eventSource":"minio:s3","awsRegion":"","eventTime":"2018-10-31T09:31:11Z","eventName":"s3:ObjectCreated:Put","userIdentity":{"principalId":"21EJ9HYV110O8NVX2VMS"},"requestParameters":{"sourceIPAddress":"10.1.1.1"},"responseElements":{"x-amz-request-id":"1562A792DAA53426","x-minio-origin-endpoint":"http://10.0.3.1:9000"},"s3":{"s3SchemaVersion":"1.0","configurationId":"Config","bucket":{"name":"images","ownerIdentity":{"principalId":"21EJ9HYV110O8NVX2VMS"},"arn":"arn:aws:s3:::images"},"object":{"key":"gopher.jpg","size":162023,"eTag":"5337769ffa594e742408ad3f30713cd7","contentType":"image/jpeg","userMetadata":{"content-type":"image/jpeg"},"versionId":"1","sequencer":"1562A792DAA53426"}},"source":{"host":"","port":"","userAgent":"Minio (linux; amd64) minio-go/v6.0.8 mc/DEVELOPMENT.GOGET"}}]}
```
*NOTE* If you are running [distributed Minio](https://docs.minio.io/docs/distributed-minio-quickstart-guide), modify ``~/.minio/config.json`` on all the nodes with your bucket event notification backend configuration.

@ -121,6 +121,17 @@
}
}
},
"nsq": {
"1": {
"enable": false,
"nsqdAddress": "",
"topic": "",
"tls": {
"enable": false,
"skipVerify": true
}
}
},
"postgresql": {
"1": {
"enable": false,

@ -0,0 +1,116 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import (
"crypto/tls"
"encoding/json"
"errors"
"net/url"
"github.com/nsqio/go-nsq"
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
)
// NSQArgs - NSQ target arguments.
type NSQArgs struct {
Enable bool `json:"enable"`
NSQDAddress xnet.Host `json:"nsqdAddress"`
Topic string `json:"topic"`
TLS struct {
Enable bool `json:"enable"`
SkipVerify bool `json:"skipVerify"`
} `json:"tls"`
}
// Validate NSQArgs fields
func (n NSQArgs) Validate() error {
if !n.Enable {
return nil
}
if n.NSQDAddress.IsEmpty() {
return errors.New("empty nsqdAddress")
}
if n.Topic == "" {
return errors.New("empty topic")
}
return nil
}
// NSQTarget - NSQ target.
type NSQTarget struct {
id event.TargetID
args NSQArgs
producer *nsq.Producer
}
// ID - returns target ID.
func (target *NSQTarget) ID() event.TargetID {
return target.id
}
// Send - sends event to NSQD.
func (target *NSQTarget) Send(eventData event.Event) (err error) {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
key := eventData.S3.Bucket.Name + "/" + objectName
data, err := json.Marshal(event.Log{eventData.EventName, key, []event.Event{eventData}})
if err != nil {
return err
}
err = target.producer.Publish(target.args.Topic, data)
return err
}
// Close - closes underneath connections to NSQD server.
func (target *NSQTarget) Close() (err error) {
// this blocks until complete:
target.producer.Stop()
return nil
}
// NewNSQTarget - creates new NSQ target.
func NewNSQTarget(id string, args NSQArgs) (*NSQTarget, error) {
config := nsq.NewConfig()
if args.TLS.Enable {
config.TlsV1 = true
config.TlsConfig = &tls.Config{
InsecureSkipVerify: args.TLS.SkipVerify,
}
}
producer, err := nsq.NewProducer(args.NSQDAddress.String(), config)
if err != nil {
return nil, err
}
return &NSQTarget{
id: event.TargetID{id, "nsq"},
args: args,
producer: producer,
}, nil
}

@ -0,0 +1,157 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import (
"reflect"
"testing"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/net"
xnet "github.com/minio/minio/pkg/net"
"github.com/nsqio/go-nsq"
)
func TestNewNSQTarget(t *testing.T) {
type args struct {
id string
args NSQArgs
}
tests := []struct {
name string
args args
want *NSQTarget
wantErr bool
}{
{
name: "test1",
args: args{
id: "id",
args: NSQArgs{
Enable: true,
Topic: "",
TLS: struct {
Enable bool `json:"enable"`
SkipVerify bool `json:"skipVerify"`
}{true, true},
},
},
want: &NSQTarget{
id: event.TargetID{ID: "id", Name: "nsq"},
args: NSQArgs{
Enable: true,
NSQDAddress: net.Host{},
Topic: "",
TLS: struct {
Enable bool `json:"enable"`
SkipVerify bool `json:"skipVerify"`
}{true, true},
},
producer: &nsq.Producer{},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewNSQTarget(tt.args.id, tt.args.args)
// dirty hack, otherwhise cannot compare the pointers:
tt.want.producer = got.producer
if (err != nil) != tt.wantErr {
t.Errorf("NewNSQTarget() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("NewNSQTarget() = %v, want %v", got, tt.want)
}
})
}
}
func TestNSQArgs_Validate(t *testing.T) {
type fields struct {
Enable bool
NSQDAddress xnet.Host
Topic string
TLS struct {
Enable bool
SkipVerify bool
}
}
tests := []struct {
name string
fields fields
wantErr bool
}{
{
name: "test1_missing_topic",
fields: fields{
Enable: true,
NSQDAddress: xnet.Host{
Name: "127.0.0.1",
Port: 4150,
IsPortSet: true,
},
Topic: "",
},
wantErr: true,
},
{
name: "test2_disabled",
fields: fields{
Enable: false,
NSQDAddress: xnet.Host{},
Topic: "topic",
},
wantErr: false,
},
{
name: "test3_OK",
fields: fields{
Enable: true,
NSQDAddress: xnet.Host{
Name: "127.0.0.1",
Port: 4150,
IsPortSet: true,
},
Topic: "topic",
},
wantErr: false,
},
{
name: "test4_emptynsqdaddr",
fields: fields{
Enable: true,
NSQDAddress: xnet.Host{},
Topic: "topic",
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
n := NSQArgs{
Enable: tt.fields.Enable,
NSQDAddress: tt.fields.NSQDAddress,
Topic: tt.fields.Topic,
}
if err := n.Validate(); (err != nil) != tt.wantErr {
t.Errorf("NSQArgs.Validate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

@ -0,0 +1,256 @@
## go-nsq Change Log
### 1.0.7 - 2017-08-04
**Upgrading from 1.0.6**: There are no backward incompatible changes.
* #97/#209 - consumer: retry nsqlookupd queries
* #179/#208 - consumer: redistribute RDY when connections are active
* #184/#201 - producer: fix misleading Stop() EOF (thanks @mengskysama)
* #203 - switch to golang/snappy (addressing potential snappy related deadlocks)
* #202 - consumer: fix backoff logging
### 1.0.6 - 2016-06-04
**Upgrading from 1.0.5**: There are no backward incompatible changes.
* #175 - consumer: reduce garbage generation in DecodeMessage (thanks @Dieterbe)
* #162 - producer: support `DeferredPublish` (thanks @DanielHeckrath)
### 1.0.5 - 2015-09-19
**Upgrading from 1.0.4**: There are no backward incompatible changes.
* #156 - consumer: prevent data race on RNG
* #155 - config: support `flag.Value` interface
* #147/#150 - consumer: fix application of `max_backoff_duration` (thanks @judwhite)
* #138 - fix lint, vet, fmt issues
* #137 - remove `go-simplejson` dependency
### 1.0.4 - 2015-04-07
**Upgrading from 1.0.3**: There are no backward incompatible changes.
* #133 - fix `ErrNotConnected` race during `Producer` connection (thanks @jeddenlea)
* #132 - fix `RDY` redistribution after backoff with no connections
* #128 - fix backoff stall when using `RequeueWithoutBackoff`
* #127 - fix handling of connection closing when resuming after backoff (thanks @jnewmano)
* #126 - allow `BackoffStrategy` to be set via flag (thanks @twmb)
* #125 - add pluggable consumer `BackoffStrategy`; add full-jitter strategy (thanks @hden)
* #124 - add `DialTimeout` and `LocalAddr` config (thanks @yashkin)
* #119 - add `Producer.Ping()` method (thanks @zulily)
* #122 - refactor log level string handling
* #120 - fix `Message` data races on `responded`
* #114 - fix lookupd jitter having no effect (thanks @judwhite)
### 1.0.3 - 2015-02-07
**Upgrading from 1.0.2**: There are no backward incompatible changes.
* #104 - fix reconnect address bug (thanks @ryanslade)
* #106 - fix backoff reconnect deadlock (thanks @ryanslade)
* #107 - fix out-of-bounds error when removing nsqlookupd addresses (thanks @andreas)
* #108 - fix potential logger race conditions (thanks @judwhite)
* #111 - fix resolved address error in reconnect loop (thanks @twmb)
### 1.0.2 - 2015-01-21
**Upgrading from 1.0.1**: There are no backward incompatible changes.
* #102 - TLS min/max config defaults (thanks @twmb)
* #99 - fix `Consumer.Stop()` race and `Producer.Stop()` deadlock (thanks @tylertreat)
* #92 - expose `Message.NSQDAddress`
* #95 - cleanup panic during `Consumer.Stop()` if handlers are deadlocked
* #98 - add `tls-min-version` option (thanks @twmb)
* #93 - expose a way to get `Consumer` runtime stats (thanks @dcarney)
* #94 - allow `#ephemeral` topic names (thanks @jamesgroat)
### 1.0.1 - 2014-11-09
**Upgrading from 1.0.0**: There are no backward incompatible changes functionally, however this
release no longer compiles with Go `1.0.x`.
* #89 - don't spam connection teardown cleanup messages
* #91 - add consumer `DisconnectFrom*`
* #87 - allow `heartbeat_interval` and `output_buffer_timeout` to be disabled
* #86 - pluggable `nsqlookupd` behaviors
* #83 - send `RDY` before `FIN`/`REQ` (forwards compatibility with nsqio/nsq#404)
* #82 - fix panic when conn isn't assigned
* #75/#76 - minor config related bug fixes
* #75/#77/#78 - add `tls-cert` and `tls-key` config options
### 1.0.0 - 2014-08-11
**Upgrading from 0.3.7**: The public API was significantly refactored and is not backwards
compatible, please read [UPGRADING](UPGRADING.md).
* #58 - support `IDENTIFY` `msg_timeout`
* #54 - per-connection TLS config and set `ServerName`
* #49 - add common connect helpers
* #43/#63 - more flexible `nsqlookupd` URL specification
* #35 - `AUTH` support
* #41/#62 - use package private RNG
* #36 - support 64 character topic/channel names
* #30/#38/#39/#42/#45/#46/#48/#51/#52/#65/#70 - refactor public API (see [UPGRADING](UPGRADING.md))
### 0.3.7 - 2014-05-25
**Upgrading from 0.3.6**: There are no backward incompatible changes. **THIS IS THE LAST STABLE
RELEASE PROVIDING THIS API**. Future releases will be based on the api in #30 and **will not be
backwards compatible!**
This is a bug fix release relating to the refactoring done in `0.3.6`.
* #32 - fix potential panic for race condition when # conns == 0
* #33/#34 - more granular connection locking
### 0.3.6 - 2014-04-29
**Upgrading from 0.3.5**: There are no backward incompatible changes.
This release includes a significant internal refactoring, designed
to better encapsulate responsibility, see #19.
Specifically:
* make `Conn` public
* move transport responsibilities into `Conn` from `Reader`/`Writer`
* supply callbacks for hooking into `Conn` events
As part of the refactoring, a few additional clean exit related
issues were resolved:
* wait group now includes all exit related goroutines
* ensure that readLoop exits before exiting cleanup
* always check messagesInFlight at readLoop exit
* close underlying connection last
### 0.3.5 - 2014-04-05
**Upgrading from 0.3.4**: There are no backward incompatible changes.
This release includes a few new features such as support for channel
sampling and sending along a user agent string (which is now displayed
in `nsqadmin`).
Also, a critical bug fix for potential deadlocks (thanks @kjk
for reporting and help testing).
New Features/Improvements:
* #27 - reader logs disambiguate topic/channel
* #22 - channel sampling
* #23 - user agent
Bug Fixes:
* #24 - fix racey reader IDENTIFY buffering
* #29 - fix recursive RLock deadlocks
### 0.3.4 - 2013-11-19
**Upgrading from 0.3.3**: There are no backward incompatible changes.
This is a bug fix release, notably potential deadlocks in `Message.Requeue()` and `Message.Touch()`
as well as a potential busy loop cleaning up closed connections with in-flight messages.
New Features/Improvements:
* #14 - add `Reader.Configure()`
* #18 - return an exported error when an `nsqlookupd` address is already configured
Bug Fixes:
* #15 - dont let `handleError()` loop if already connected
* #17 - resolve potential deadlocks on `Message` responders
* #16 - eliminate busy loop when draining `finishedMessages`
### 0.3.3 - 2013-10-21
**Upgrading from 0.3.2**: This release requires NSQ binary version `0.2.23+` for compression
support.
This release contains significant `Reader` refactoring of the RDY handling code paths. The
motivation is documented in #1 however the commits in #8 identify individual changes. Additionally,
we eliminated deadlocks during connection cleanup in `Writer`.
As a result, both user-facing APIs should now be considerably more robust and stable. Additionally,
`Reader` should behave better when backing off.
New Features/Improvements:
* #9 - ability to ignore publish responses in `Writer`
* #12 - `Requeue()` method on `Message`
* #6 - `Touch()` method on `Message`
* #4 - snappy/deflate feature negotiation
Bug Fixes:
* #8 - `Reader` RDY handling refactoring (race conditions, deadlocks, consolidation)
* #13 - fix `Writer` deadlocks
* #10 - stop accessing simplejson internals
* #5 - fix `max-in-flight` race condition
### 0.3.2 - 2013-08-26
**Upgrading from 0.3.1**: This release requires NSQ binary version `0.2.22+` for TLS support.
New Features/Improvements:
* #227 - TLS feature negotiation
* #164/#202/#255 - add `Writer`
* #186 - `MaxBackoffDuration` of `0` disables backoff
* #175 - support for `nsqd` config option `--max-rdy-count`
* #169 - auto-reconnect to hard-coded `nsqd`
Bug Fixes:
* #254/#256/#257 - new connection RDY starvation
* #250 - `nsqlookupd` polling improvements
* #243 - limit `IsStarved()` to connections w/ in-flight messages
* #169 - use last RDY count for `IsStarved()`; redistribute RDY state
* #204 - fix early termination blocking
* #177 - support `broadcast_address`
* #161 - connection pool goroutine safety
### 0.3.1 - 2013-02-07
**Upgrading from 0.3.0**: This release requires NSQ binary version `0.2.17+` for `TOUCH` support.
* #119 - add TOUCH command
* #133 - improved handling of errors/magic
* #127 - send IDENTIFY (missed in #90)
* #16 - add backoff to Reader
### 0.3.0 - 2013-01-07
**Upgrading from 0.2.4**: There are no backward incompatible changes to applications
written against the public `nsq.Reader` API.
However, there *are* a few backward incompatible changes to the API for applications that
directly use other public methods, or properties of a few NSQ data types:
`nsq.Message` IDs are now a type `nsq.MessageID` (a `[16]byte` array). The signatures of
`nsq.Finish()` and `nsq.Requeue()` reflect this change.
`nsq.SendCommand()` and `nsq.Frame()` were removed in favor of `nsq.SendFramedResponse()`.
`nsq.Subscribe()` no longer accepts `shortId` and `longId`. If upgrading your consumers
before upgrading your `nsqd` binaries to `0.2.16-rc.1` they will not be able to send the
optional custom identifiers.
* #90 performance optimizations
* #81 reader performance improvements / MPUB support
### 0.2.4 - 2012-10-15
* #69 added IsStarved() to reader API
### 0.2.3 - 2012-10-11
* #64 timeouts on reader queries to lookupd
* #54 fix crash issue with reader cleaning up from unexpectedly closed nsqd connections
### 0.2.2 - 2012-10-09
* Initial public release

@ -0,0 +1,14 @@
# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
[[projects]]
name = "github.com/golang/snappy"
packages = ["."]
revision = "d9eb7a3d35ec988b8585d4a0068e462c27d28380"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "eeb528388b932e0b4fd05a24243101a7d731edd504d0c247c620f9d949a29318"
solver-name = "gps-cdcl"
solver-version = 1

@ -0,0 +1,26 @@
# Gopkg.toml example
#
# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md
# for detailed Gopkg.toml documentation.
#
# required = ["github.com/user/thing/cmd/thing"]
# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
#
# [[constraint]]
# name = "github.com/user/project"
# version = "1.0.0"
#
# [[constraint]]
# name = "github.com/user/project2"
# branch = "dev"
# source = "github.com/myfork/project2"
#
# [[override]]
# name = "github.com/x/y"
# version = "2.4.0"
[[constraint]]
name = "github.com/golang/snappy"
revision = "d9eb7a3d35ec988b8585d4a0068e462c27d28380"

@ -0,0 +1,17 @@
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

@ -0,0 +1,19 @@
## go-nsq
[![Build Status](https://secure.travis-ci.org/nsqio/go-nsq.svg?branch=master)][travis] [![GoDoc](https://godoc.org/github.com/nsqio/go-nsq?status.svg)](https://godoc.org/github.com/nsqio/go-nsq) [![GitHub release](https://img.shields.io/github/release/nsqio/go-nsq.svg)](https://github.com/nsqio/go-nsq/releases/latest)
The official Go package for [NSQ][nsq].
### Docs
See [godoc][nsq_gopkgdoc] and the [main repo apps][apps] directory for examples of clients built
using this package.
### Tests
Tests are run via `./test.sh` (which requires `nsqd` and `nsqlookupd` to be installed).
[nsq]: https://github.com/nsqio/nsq
[nsq_gopkgdoc]: http://godoc.org/github.com/nsqio/go-nsq
[apps]: https://github.com/nsqio/nsq/tree/master/apps
[travis]: http://travis-ci.org/nsqio/go-nsq

@ -0,0 +1,180 @@
This outlines the backwards incompatible changes that were made to the public API after the
`v0.3.7` stable release, and and how to migrate existing legacy codebases.
#### Background
The original `go-nsq` codebase is some of our earliest Go code, and one of our first attempts at a
public Go library.
We've learned a lot over the last 2 years and we wanted `go-nsq` to reflect the experiences we've
had working with the library as well as the general Go conventions and best practices we picked up
along the way.
The diff can be seen via: https://github.com/nsqio/go-nsq/compare/v0.3.7...HEAD
The bulk of the refactoring came via: https://github.com/nsqio/go-nsq/pull/30
#### Naming
Previously, the high-level types we exposed were named `nsq.Reader` and `nsq.Writer`. These
reflected internal naming conventions we had used at bitly for some time but conflated semantics
with what a typical Go developer would expect (they obviously did not implement `io.Reader` and
`io.Writer`).
We renamed these types to `nsq.Consumer` and `nsq.Producer`, which more effectively communicate
their purpose and is consistent with the NSQ documentation.
#### Configuration
In the previous API there were inconsistent and confusing ways to configure your clients.
Now, configuration is performed *before* creating an `nsq.Consumer` or `nsq.Producer` by creating
an `nsq.Config` struct. The only valid way to do this is via `nsq.NewConfig` (i.e. using a struct
literal will panic due to invalid internal state).
The `nsq.Config` struct has exported variables that can be set directly in a type-safe manner. You
can also call `cfg.Validate()` to check that the values are correct and within range.
`nsq.Config` also exposes a convenient helper method `Set(k string, v interface{})` that can set
options by *coercing* the supplied `interface{}` value.
This is incredibly convenient if you're reading options from a config file or in a serialized
format that does not exactly match the native types.
It is both flexible and forgiving.
#### Improving the nsq.Handler interface
`go-nsq` attempts to make writing the common use case consumer incredibly easy.
You specify a type that implements the `nsq.Handler` interface, the interface method is called per
message, and the return value of said method indicates to the library what the response to `nsqd`
should be (`FIN` or `REQ`), all the while managing flow control and backoff.
However, more advanced use cases require the ability to respond to a message *later*
("asynchronously", if you will). Our original API provided a *second* message handler interface
called `nsq.AsyncHandler`.
Unfortunately, it was never obvious from the name alone (or even the documentation) how to properly
use this form. The API was needlessly complex, involving the garbage creation of wrapping structs
to track state and respond to messages.
We originally had the same problem in `pynsq`, our Python client library, and we were able to
resolve the tension and expose an API that was robust and supported all use cases.
The new `go-nsq` message handler interface exposes only `nsq.Handler`, and its `HandleMessage`
method remains identical (specifically, `nsq.AsyncHandler` has been removed).
Additionally, the API to configure handlers has been improved to provide better first-class support
for common operations. We've added `AddConcurrentHandlers` (for quickly spawning multiple handler
goroutines).
For the most common use case, where you want `go-nsq` to respond to messages on your behalf, there
are no changes required! In fact, we've made it even easier to implement the `nsq.Handler`
interface for simple functions by providing the `nsq.HandlerFunc` type (in the spirit of the Go
standard library's `http.HandlerFunc`):
```go
r, err := nsq.NewConsumer("test_topic", "test_channel", nsq.NewConfig())
if err != nil {
log.Fatalf(err.Error())
}
r.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
return doSomeWork(m)
})
err := r.ConnectToNSQD(nsqdAddr)
if err != nil {
log.Fatalf(err.Error())
}
<-r.StopChan
```
In the new API, we've made the `nsq.Message` struct more robust, giving it the ability to proxy
responses. If you want to usurp control of the message from `go-nsq`, you simply call
`msg.DisableAutoResponse()`.
This is effectively the same as if you had used `nsq.AsyncHandler`, only you don't need to manage
`nsq.FinishedMessage` structs or implement a separate interface. Instead you just keep/pass
references to the `nsq.Message` itself, and when you're ready to respond you call `msg.Finish()`,
`msg.Requeue(<duration>)` or `msg.Touch(<duration>)`. Additionally, this means you can make this
decision on a *per-message* basis rather than for the lifetime of the handler.
Here is an example:
```go
type myHandler struct {}
func (h *myHandler) HandleMessage(m *nsq.Message) error {
m.DisableAutoResponse()
workerChan <- m
return nil
}
go func() {
for m := range workerChan {
err := doSomeWork(m)
if err != nil {
m.Requeue(-1)
continue
}
m.Finish()
}
}()
cfg := nsq.NewConfig()
cfg.MaxInFlight = 1000
r, err := nsq.NewConsumer("test_topic", "test_channel", cfg)
if err != nil {
log.Fatalf(err.Error())
}
r.AddConcurrentHandlers(&myHandler{}, 20)
err := r.ConnectToNSQD(nsqdAddr)
if err != nil {
log.Fatalf(err.Error())
}
<-r.StopChan
```
#### Requeue without backoff
As a side effect of the message handler restructuring above, it is now trivial to respond to a
message without triggering a backoff state in `nsq.Consumer` (which was not possible in the
previous API).
The `nsq.Message` type now has a `msg.RequeueWithoutBackoff()` method for this purpose.
#### Producer Error Handling
Previously, `Writer` (now `Producer`) returned a triplicate of `frameType`, `responseBody`, and
`error` from calls to `*Publish`.
This required the caller to check both `error` and `frameType` to confirm success. `Producer`
publish methods now return only `error`.
#### Logging
One of the challenges library implementors face is how to provide feedback via logging, while
exposing an interface that follows the standard library and still provides a means to control and
configure the output.
In the new API, we've provided a method on `Consumer` and `Producer` called `SetLogger` that takes
an interface compatible with the Go standard library `log.Logger` (which can be instantiated via
`log.NewLogger`) and a traditional log level integer `nsq.LogLevel{Debug,Info,Warning,Error}`:
Output(maxdepth int, s string) error
This gives the user the flexibility to control the format, destination, and verbosity while still
conforming to standard library logging conventions.
#### Misc.
Un-exported `NewDeadlineTransport` and `ApiRequest`, which never should have been exported in the
first place.
`nsq.Message` serialization switched away from `binary.{Read,Write}` for performance and
`nsq.Message` now implements the `io.WriterTo` interface.

@ -0,0 +1,91 @@
package nsq
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"time"
)
type deadlinedConn struct {
Timeout time.Duration
net.Conn
}
func (c *deadlinedConn) Read(b []byte) (n int, err error) {
c.Conn.SetReadDeadline(time.Now().Add(c.Timeout))
return c.Conn.Read(b)
}
func (c *deadlinedConn) Write(b []byte) (n int, err error) {
c.Conn.SetWriteDeadline(time.Now().Add(c.Timeout))
return c.Conn.Write(b)
}
func newDeadlineTransport(timeout time.Duration) *http.Transport {
transport := &http.Transport{
DisableKeepAlives: true,
Dial: func(netw, addr string) (net.Conn, error) {
c, err := net.DialTimeout(netw, addr, timeout)
if err != nil {
return nil, err
}
return &deadlinedConn{timeout, c}, nil
},
}
return transport
}
type wrappedResp struct {
Status string `json:"status_txt"`
StatusCode int `json:"status_code"`
Data interface{} `json:"data"`
}
// stores the result in the value pointed to by ret(must be a pointer)
func apiRequestNegotiateV1(method string, endpoint string, body io.Reader, ret interface{}) error {
httpclient := &http.Client{Transport: newDeadlineTransport(2 * time.Second)}
req, err := http.NewRequest(method, endpoint, body)
if err != nil {
return err
}
req.Header.Add("Accept", "application/vnd.nsq; version=1.0")
resp, err := httpclient.Do(req)
if err != nil {
return err
}
respBody, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return err
}
if resp.StatusCode != 200 {
return fmt.Errorf("got response %s %q", resp.Status, respBody)
}
if len(respBody) == 0 {
respBody = []byte("{}")
}
if resp.Header.Get("X-NSQ-Content-Type") == "nsq; version=1.0" {
return json.Unmarshal(respBody, ret)
}
wResp := &wrappedResp{
Data: ret,
}
if err = json.Unmarshal(respBody, wResp); err != nil {
return err
}
// wResp.StatusCode here is equal to resp.StatusCode, so ignore it
return nil
}

@ -0,0 +1,221 @@
package nsq
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"strconv"
"time"
)
var byteSpace = []byte(" ")
var byteNewLine = []byte("\n")
// Command represents a command from a client to an NSQ daemon
type Command struct {
Name []byte
Params [][]byte
Body []byte
}
// String returns the name and parameters of the Command
func (c *Command) String() string {
if len(c.Params) > 0 {
return fmt.Sprintf("%s %s", c.Name, string(bytes.Join(c.Params, byteSpace)))
}
return string(c.Name)
}
// WriteTo implements the WriterTo interface and
// serializes the Command to the supplied Writer.
//
// It is suggested that the target Writer is buffered
// to avoid performing many system calls.
func (c *Command) WriteTo(w io.Writer) (int64, error) {
var total int64
var buf [4]byte
n, err := w.Write(c.Name)
total += int64(n)
if err != nil {
return total, err
}
for _, param := range c.Params {
n, err := w.Write(byteSpace)
total += int64(n)
if err != nil {
return total, err
}
n, err = w.Write(param)
total += int64(n)
if err != nil {
return total, err
}
}
n, err = w.Write(byteNewLine)
total += int64(n)
if err != nil {
return total, err
}
if c.Body != nil {
bufs := buf[:]
binary.BigEndian.PutUint32(bufs, uint32(len(c.Body)))
n, err := w.Write(bufs)
total += int64(n)
if err != nil {
return total, err
}
n, err = w.Write(c.Body)
total += int64(n)
if err != nil {
return total, err
}
}
return total, nil
}
// Identify creates a new Command to provide information about the client. After connecting,
// it is generally the first message sent.
//
// The supplied map is marshaled into JSON to provide some flexibility
// for this command to evolve over time.
//
// See http://nsq.io/clients/tcp_protocol_spec.html#identify for information
// on the supported options
func Identify(js map[string]interface{}) (*Command, error) {
body, err := json.Marshal(js)
if err != nil {
return nil, err
}
return &Command{[]byte("IDENTIFY"), nil, body}, nil
}
// Auth sends credentials for authentication
//
// After `Identify`, this is usually the first message sent, if auth is used.
func Auth(secret string) (*Command, error) {
return &Command{[]byte("AUTH"), nil, []byte(secret)}, nil
}
// Register creates a new Command to add a topic/channel for the connected nsqd
func Register(topic string, channel string) *Command {
params := [][]byte{[]byte(topic)}
if len(channel) > 0 {
params = append(params, []byte(channel))
}
return &Command{[]byte("REGISTER"), params, nil}
}
// UnRegister creates a new Command to remove a topic/channel for the connected nsqd
func UnRegister(topic string, channel string) *Command {
params := [][]byte{[]byte(topic)}
if len(channel) > 0 {
params = append(params, []byte(channel))
}
return &Command{[]byte("UNREGISTER"), params, nil}
}
// Ping creates a new Command to keep-alive the state of all the
// announced topic/channels for a given client
func Ping() *Command {
return &Command{[]byte("PING"), nil, nil}
}
// Publish creates a new Command to write a message to a given topic
func Publish(topic string, body []byte) *Command {
var params = [][]byte{[]byte(topic)}
return &Command{[]byte("PUB"), params, body}
}
// DeferredPublish creates a new Command to write a message to a given topic
// where the message will queue at the channel level until the timeout expires
func DeferredPublish(topic string, delay time.Duration, body []byte) *Command {
var params = [][]byte{[]byte(topic), []byte(strconv.Itoa(int(delay / time.Millisecond)))}
return &Command{[]byte("DPUB"), params, body}
}
// MultiPublish creates a new Command to write more than one message to a given topic
// (useful for high-throughput situations to avoid roundtrips and saturate the pipe)
func MultiPublish(topic string, bodies [][]byte) (*Command, error) {
var params = [][]byte{[]byte(topic)}
num := uint32(len(bodies))
bodySize := 4
for _, b := range bodies {
bodySize += len(b) + 4
}
body := make([]byte, 0, bodySize)
buf := bytes.NewBuffer(body)
err := binary.Write(buf, binary.BigEndian, &num)
if err != nil {
return nil, err
}
for _, b := range bodies {
err = binary.Write(buf, binary.BigEndian, int32(len(b)))
if err != nil {
return nil, err
}
_, err = buf.Write(b)
if err != nil {
return nil, err
}
}
return &Command{[]byte("MPUB"), params, buf.Bytes()}, nil
}
// Subscribe creates a new Command to subscribe to the given topic/channel
func Subscribe(topic string, channel string) *Command {
var params = [][]byte{[]byte(topic), []byte(channel)}
return &Command{[]byte("SUB"), params, nil}
}
// Ready creates a new Command to specify
// the number of messages a client is willing to receive
func Ready(count int) *Command {
var params = [][]byte{[]byte(strconv.Itoa(count))}
return &Command{[]byte("RDY"), params, nil}
}
// Finish creates a new Command to indiciate that
// a given message (by id) has been processed successfully
func Finish(id MessageID) *Command {
var params = [][]byte{id[:]}
return &Command{[]byte("FIN"), params, nil}
}
// Requeue creates a new Command to indicate that
// a given message (by id) should be requeued after the given delay
// NOTE: a delay of 0 indicates immediate requeue
func Requeue(id MessageID, delay time.Duration) *Command {
var params = [][]byte{id[:], []byte(strconv.Itoa(int(delay / time.Millisecond)))}
return &Command{[]byte("REQ"), params, nil}
}
// Touch creates a new Command to reset the timeout for
// a given message (by id)
func Touch(id MessageID) *Command {
var params = [][]byte{id[:]}
return &Command{[]byte("TOUCH"), params, nil}
}
// StartClose creates a new Command to indicate that the
// client would like to start a close cycle. nsqd will no longer
// send messages to a client in this state and the client is expected
// finish pending messages and close the connection
func StartClose() *Command {
return &Command{[]byte("CLS"), nil, nil}
}
// Nop creates a new Command that has no effect server side.
// Commonly used to respond to heartbeats
func Nop() *Command {
return &Command{[]byte("NOP"), nil, nil}
}

@ -0,0 +1,671 @@
package nsq
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"log"
"math"
"math/rand"
"net"
"os"
"reflect"
"strconv"
"strings"
"sync"
"time"
"unsafe"
)
// Define handlers for setting config defaults, and setting config values from command line arguments or config files
type configHandler interface {
HandlesOption(c *Config, option string) bool
Set(c *Config, option string, value interface{}) error
Validate(c *Config) error
}
type defaultsHandler interface {
SetDefaults(c *Config) error
}
// BackoffStrategy defines a strategy for calculating the duration of time
// a consumer should backoff for a given attempt
type BackoffStrategy interface {
Calculate(attempt int) time.Duration
}
// ExponentialStrategy implements an exponential backoff strategy (default)
type ExponentialStrategy struct {
cfg *Config
}
// Calculate returns a duration of time: 2 ^ attempt
func (s *ExponentialStrategy) Calculate(attempt int) time.Duration {
backoffDuration := s.cfg.BackoffMultiplier *
time.Duration(math.Pow(2, float64(attempt)))
return backoffDuration
}
func (s *ExponentialStrategy) setConfig(cfg *Config) {
s.cfg = cfg
}
// FullJitterStrategy implements http://www.awsarchitectureblog.com/2015/03/backoff.html
type FullJitterStrategy struct {
cfg *Config
rngOnce sync.Once
rng *rand.Rand
}
// Calculate returns a random duration of time [0, 2 ^ attempt]
func (s *FullJitterStrategy) Calculate(attempt int) time.Duration {
// lazily initialize the RNG
s.rngOnce.Do(func() {
if s.rng != nil {
return
}
s.rng = rand.New(rand.NewSource(time.Now().UnixNano()))
})
backoffDuration := s.cfg.BackoffMultiplier *
time.Duration(math.Pow(2, float64(attempt)))
return time.Duration(s.rng.Intn(int(backoffDuration)))
}
func (s *FullJitterStrategy) setConfig(cfg *Config) {
s.cfg = cfg
}
// Config is a struct of NSQ options
//
// The only valid way to create a Config is via NewConfig, using a struct literal will panic.
// After Config is passed into a high-level type (like Consumer, Producer, etc.) the values are no
// longer mutable (they are copied).
//
// Use Set(option string, value interface{}) as an alternate way to set parameters
type Config struct {
initialized bool
// used to Initialize, Validate
configHandlers []configHandler
DialTimeout time.Duration `opt:"dial_timeout" default:"1s"`
// Deadlines for network reads and writes
ReadTimeout time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s"`
WriteTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m" default:"1s"`
// LocalAddr is the local address to use when dialing an nsqd.
// If empty, a local address is automatically chosen.
LocalAddr net.Addr `opt:"local_addr"`
// Duration between polling lookupd for new producers, and fractional jitter to add to
// the lookupd pool loop. this helps evenly distribute requests even if multiple consumers
// restart at the same time
//
// NOTE: when not using nsqlookupd, LookupdPollInterval represents the duration of time between
// reconnection attempts
LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s"`
LookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"`
// Maximum duration when REQueueing (for doubling of deferred requeue)
MaxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"`
DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"`
// Backoff strategy, defaults to exponential backoff. Overwrite this to define alternative backoff algrithms.
BackoffStrategy BackoffStrategy `opt:"backoff_strategy" default:"exponential"`
// Maximum amount of time to backoff when processing fails 0 == no backoff
MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"`
// Unit of time for calculating consumer backoff
BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"`
// Maximum number of times this consumer will attempt to process a message before giving up
MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`
// Duration to wait for a message from an nsqd when in a state where RDY
// counts are re-distributed (e.g. max_in_flight < num_producers)
LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"`
// Duration to wait until redistributing RDY for an nsqd regardless of LowRdyIdleTimeout
LowRdyTimeout time.Duration `opt:"low_rdy_timeout" min:"1s" max:"5m" default:"30s"`
// Duration between redistributing max-in-flight to connections
RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s"`
// Identifiers sent to nsqd representing this client
// UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>")
ClientID string `opt:"client_id"` // (defaults: short hostname)
Hostname string `opt:"hostname"`
UserAgent string `opt:"user_agent"`
// Duration of time between heartbeats. This must be less than ReadTimeout
HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s"`
// Integer percentage to sample the channel (requires nsqd 0.2.25+)
SampleRate int32 `opt:"sample_rate" min:"0" max:"99"`
// To set TLS config, use the following options:
//
// tls_v1 - Bool enable TLS negotiation
// tls_root_ca_file - String path to file containing root CA
// tls_insecure_skip_verify - Bool indicates whether this client should verify server certificates
// tls_cert - String path to file containing public key for certificate
// tls_key - String path to file containing private key for certificate
// tls_min_version - String indicating the minimum version of tls acceptable ('ssl3.0', 'tls1.0', 'tls1.1', 'tls1.2')
//
TlsV1 bool `opt:"tls_v1"`
TlsConfig *tls.Config `opt:"tls_config"`
// Compression Settings
Deflate bool `opt:"deflate"`
DeflateLevel int `opt:"deflate_level" min:"1" max:"9" default:"6"`
Snappy bool `opt:"snappy"`
// Size of the buffer (in bytes) used by nsqd for buffering writes to this connection
OutputBufferSize int64 `opt:"output_buffer_size" default:"16384"`
// Timeout used by nsqd before flushing buffered writes (set to 0 to disable).
//
// WARNING: configuring clients with an extremely low
// (< 25ms) output_buffer_timeout has a significant effect
// on nsqd CPU usage (particularly with > 50 clients connected).
OutputBufferTimeout time.Duration `opt:"output_buffer_timeout" default:"250ms"`
// Maximum number of messages to allow in flight (concurrency knob)
MaxInFlight int `opt:"max_in_flight" min:"0" default:"1"`
// The server-side message timeout for messages delivered to this client
MsgTimeout time.Duration `opt:"msg_timeout" min:"0"`
// secret for nsqd authentication (requires nsqd 0.2.29+)
AuthSecret string `opt:"auth_secret"`
}
// NewConfig returns a new default nsq configuration.
//
// This must be used to initialize Config structs. Values can be set directly, or through Config.Set()
func NewConfig() *Config {
c := &Config{
configHandlers: []configHandler{&structTagsConfig{}, &tlsConfig{}},
initialized: true,
}
if err := c.setDefaults(); err != nil {
panic(err.Error())
}
return c
}
// Set takes an option as a string and a value as an interface and
// attempts to set the appropriate configuration option.
//
// It attempts to coerce the value into the right format depending on the named
// option and the underlying type of the value passed in.
//
// Calls to Set() that take a time.Duration as an argument can be input as:
//
// "1000ms" (a string parsed by time.ParseDuration())
// 1000 (an integer interpreted as milliseconds)
// 1000*time.Millisecond (a literal time.Duration value)
//
// Calls to Set() that take bool can be input as:
//
// "true" (a string parsed by strconv.ParseBool())
// true (a boolean)
// 1 (an int where 1 == true and 0 == false)
//
// It returns an error for an invalid option or value.
func (c *Config) Set(option string, value interface{}) error {
c.assertInitialized()
option = strings.Replace(option, "-", "_", -1)
for _, h := range c.configHandlers {
if h.HandlesOption(c, option) {
return h.Set(c, option, value)
}
}
return fmt.Errorf("invalid option %s", option)
}
func (c *Config) assertInitialized() {
if !c.initialized {
panic("Config{} must be created with NewConfig()")
}
}
// Validate checks that all values are within specified min/max ranges
func (c *Config) Validate() error {
c.assertInitialized()
for _, h := range c.configHandlers {
if err := h.Validate(c); err != nil {
return err
}
}
return nil
}
func (c *Config) setDefaults() error {
for _, h := range c.configHandlers {
hh, ok := h.(defaultsHandler)
if ok {
if err := hh.SetDefaults(c); err != nil {
return err
}
}
}
return nil
}
type structTagsConfig struct{}
// Handle options that are listed in StructTags
func (h *structTagsConfig) HandlesOption(c *Config, option string) bool {
val := reflect.ValueOf(c).Elem()
typ := val.Type()
for i := 0; i < typ.NumField(); i++ {
field := typ.Field(i)
opt := field.Tag.Get("opt")
if opt == option {
return true
}
}
return false
}
// Set values based on parameters in StructTags
func (h *structTagsConfig) Set(c *Config, option string, value interface{}) error {
val := reflect.ValueOf(c).Elem()
typ := val.Type()
for i := 0; i < typ.NumField(); i++ {
field := typ.Field(i)
opt := field.Tag.Get("opt")
if option != opt {
continue
}
min := field.Tag.Get("min")
max := field.Tag.Get("max")
fieldVal := val.FieldByName(field.Name)
dest := unsafeValueOf(fieldVal)
coercedVal, err := coerce(value, field.Type)
if err != nil {
return fmt.Errorf("failed to coerce option %s (%v) - %s",
option, value, err)
}
if min != "" {
coercedMinVal, _ := coerce(min, field.Type)
if valueCompare(coercedVal, coercedMinVal) == -1 {
return fmt.Errorf("invalid %s ! %v < %v",
option, coercedVal.Interface(), coercedMinVal.Interface())
}
}
if max != "" {
coercedMaxVal, _ := coerce(max, field.Type)
if valueCompare(coercedVal, coercedMaxVal) == 1 {
return fmt.Errorf("invalid %s ! %v > %v",
option, coercedVal.Interface(), coercedMaxVal.Interface())
}
}
if coercedVal.Type().String() == "nsq.BackoffStrategy" {
v := coercedVal.Interface().(BackoffStrategy)
if v, ok := v.(interface {
setConfig(*Config)
}); ok {
v.setConfig(c)
}
}
dest.Set(coercedVal)
return nil
}
return fmt.Errorf("unknown option %s", option)
}
func (h *structTagsConfig) SetDefaults(c *Config) error {
val := reflect.ValueOf(c).Elem()
typ := val.Type()
for i := 0; i < typ.NumField(); i++ {
field := typ.Field(i)
opt := field.Tag.Get("opt")
defaultVal := field.Tag.Get("default")
if defaultVal == "" || opt == "" {
continue
}
if err := c.Set(opt, defaultVal); err != nil {
return err
}
}
hostname, err := os.Hostname()
if err != nil {
log.Fatalf("ERROR: unable to get hostname %s", err.Error())
}
c.ClientID = strings.Split(hostname, ".")[0]
c.Hostname = hostname
c.UserAgent = fmt.Sprintf("go-nsq/%s", VERSION)
return nil
}
func (h *structTagsConfig) Validate(c *Config) error {
val := reflect.ValueOf(c).Elem()
typ := val.Type()
for i := 0; i < typ.NumField(); i++ {
field := typ.Field(i)
min := field.Tag.Get("min")
max := field.Tag.Get("max")
if min == "" && max == "" {
continue
}
value := val.FieldByName(field.Name)
if min != "" {
coercedMinVal, _ := coerce(min, field.Type)
if valueCompare(value, coercedMinVal) == -1 {
return fmt.Errorf("invalid %s ! %v < %v",
field.Name, value.Interface(), coercedMinVal.Interface())
}
}
if max != "" {
coercedMaxVal, _ := coerce(max, field.Type)
if valueCompare(value, coercedMaxVal) == 1 {
return fmt.Errorf("invalid %s ! %v > %v",
field.Name, value.Interface(), coercedMaxVal.Interface())
}
}
}
if c.HeartbeatInterval > c.ReadTimeout {
return fmt.Errorf("HeartbeatInterval %v must be less than ReadTimeout %v", c.HeartbeatInterval, c.ReadTimeout)
}
return nil
}
// Parsing for higher order TLS settings
type tlsConfig struct {
certFile string
keyFile string
}
func (t *tlsConfig) HandlesOption(c *Config, option string) bool {
switch option {
case "tls_root_ca_file", "tls_insecure_skip_verify", "tls_cert", "tls_key", "tls_min_version":
return true
}
return false
}
func (t *tlsConfig) Set(c *Config, option string, value interface{}) error {
if c.TlsConfig == nil {
c.TlsConfig = &tls.Config{
MinVersion: tls.VersionTLS10,
MaxVersion: tls.VersionTLS12, // enable TLS_FALLBACK_SCSV prior to Go 1.5: https://go-review.googlesource.com/#/c/1776/
}
}
val := reflect.ValueOf(c.TlsConfig).Elem()
switch option {
case "tls_cert", "tls_key":
if option == "tls_cert" {
t.certFile = value.(string)
} else {
t.keyFile = value.(string)
}
if t.certFile != "" && t.keyFile != "" && len(c.TlsConfig.Certificates) == 0 {
cert, err := tls.LoadX509KeyPair(t.certFile, t.keyFile)
if err != nil {
return err
}
c.TlsConfig.Certificates = []tls.Certificate{cert}
}
return nil
case "tls_root_ca_file":
filename, ok := value.(string)
if !ok {
return fmt.Errorf("ERROR: %v is not a string", value)
}
tlsCertPool := x509.NewCertPool()
caCertFile, err := ioutil.ReadFile(filename)
if err != nil {
return fmt.Errorf("ERROR: failed to read custom Certificate Authority file %s", err)
}
if !tlsCertPool.AppendCertsFromPEM(caCertFile) {
return fmt.Errorf("ERROR: failed to append certificates from Certificate Authority file")
}
c.TlsConfig.RootCAs = tlsCertPool
return nil
case "tls_insecure_skip_verify":
fieldVal := val.FieldByName("InsecureSkipVerify")
dest := unsafeValueOf(fieldVal)
coercedVal, err := coerce(value, fieldVal.Type())
if err != nil {
return fmt.Errorf("failed to coerce option %s (%v) - %s",
option, value, err)
}
dest.Set(coercedVal)
return nil
case "tls_min_version":
version, ok := value.(string)
if !ok {
return fmt.Errorf("ERROR: %v is not a string", value)
}
switch version {
case "ssl3.0":
c.TlsConfig.MinVersion = tls.VersionSSL30
case "tls1.0":
c.TlsConfig.MinVersion = tls.VersionTLS10
case "tls1.1":
c.TlsConfig.MinVersion = tls.VersionTLS11
case "tls1.2":
c.TlsConfig.MinVersion = tls.VersionTLS12
default:
return fmt.Errorf("ERROR: %v is not a tls version", value)
}
return nil
}
return fmt.Errorf("unknown option %s", option)
}
func (t *tlsConfig) Validate(c *Config) error {
return nil
}
// because Config contains private structs we can't use reflect.Value
// directly, instead we need to "unsafely" address the variable
func unsafeValueOf(val reflect.Value) reflect.Value {
uptr := unsafe.Pointer(val.UnsafeAddr())
return reflect.NewAt(val.Type(), uptr).Elem()
}
func valueCompare(v1 reflect.Value, v2 reflect.Value) int {
switch v1.Type().String() {
case "int", "int16", "int32", "int64":
if v1.Int() > v2.Int() {
return 1
} else if v1.Int() < v2.Int() {
return -1
}
return 0
case "uint", "uint16", "uint32", "uint64":
if v1.Uint() > v2.Uint() {
return 1
} else if v1.Uint() < v2.Uint() {
return -1
}
return 0
case "float32", "float64":
if v1.Float() > v2.Float() {
return 1
} else if v1.Float() < v2.Float() {
return -1
}
return 0
case "time.Duration":
if v1.Interface().(time.Duration) > v2.Interface().(time.Duration) {
return 1
} else if v1.Interface().(time.Duration) < v2.Interface().(time.Duration) {
return -1
}
return 0
}
panic("impossible")
}
func coerce(v interface{}, typ reflect.Type) (reflect.Value, error) {
var err error
if typ.Kind() == reflect.Ptr {
return reflect.ValueOf(v), nil
}
switch typ.String() {
case "string":
v, err = coerceString(v)
case "int", "int16", "int32", "int64":
v, err = coerceInt64(v)
case "uint", "uint16", "uint32", "uint64":
v, err = coerceUint64(v)
case "float32", "float64":
v, err = coerceFloat64(v)
case "bool":
v, err = coerceBool(v)
case "time.Duration":
v, err = coerceDuration(v)
case "net.Addr":
v, err = coerceAddr(v)
case "nsq.BackoffStrategy":
v, err = coerceBackoffStrategy(v)
default:
v = nil
err = fmt.Errorf("invalid type %s", typ.String())
}
return valueTypeCoerce(v, typ), err
}
func valueTypeCoerce(v interface{}, typ reflect.Type) reflect.Value {
val := reflect.ValueOf(v)
if reflect.TypeOf(v) == typ {
return val
}
tval := reflect.New(typ).Elem()
switch typ.String() {
case "int", "int16", "int32", "int64":
tval.SetInt(val.Int())
case "uint", "uint16", "uint32", "uint64":
tval.SetUint(val.Uint())
case "float32", "float64":
tval.SetFloat(val.Float())
default:
tval.Set(val)
}
return tval
}
func coerceString(v interface{}) (string, error) {
switch v := v.(type) {
case string:
return v, nil
case int, int16, int32, int64, uint, uint16, uint32, uint64:
return fmt.Sprintf("%d", v), nil
case float32, float64:
return fmt.Sprintf("%f", v), nil
}
return fmt.Sprintf("%s", v), nil
}
func coerceDuration(v interface{}) (time.Duration, error) {
switch v := v.(type) {
case string:
return time.ParseDuration(v)
case int, int16, int32, int64:
// treat like ms
return time.Duration(reflect.ValueOf(v).Int()) * time.Millisecond, nil
case uint, uint16, uint32, uint64:
// treat like ms
return time.Duration(reflect.ValueOf(v).Uint()) * time.Millisecond, nil
case time.Duration:
return v, nil
}
return 0, errors.New("invalid value type")
}
func coerceAddr(v interface{}) (net.Addr, error) {
switch v := v.(type) {
case string:
return net.ResolveTCPAddr("tcp", v)
case net.Addr:
return v, nil
}
return nil, errors.New("invalid value type")
}
func coerceBackoffStrategy(v interface{}) (BackoffStrategy, error) {
switch v := v.(type) {
case string:
switch v {
case "", "exponential":
return &ExponentialStrategy{}, nil
case "full_jitter":
return &FullJitterStrategy{}, nil
}
case BackoffStrategy:
return v, nil
}
return nil, errors.New("invalid value type")
}
func coerceBool(v interface{}) (bool, error) {
switch v := v.(type) {
case bool:
return v, nil
case string:
return strconv.ParseBool(v)
case int, int16, int32, int64:
return reflect.ValueOf(v).Int() != 0, nil
case uint, uint16, uint32, uint64:
return reflect.ValueOf(v).Uint() != 0, nil
}
return false, errors.New("invalid value type")
}
func coerceFloat64(v interface{}) (float64, error) {
switch v := v.(type) {
case string:
return strconv.ParseFloat(v, 64)
case int, int16, int32, int64:
return float64(reflect.ValueOf(v).Int()), nil
case uint, uint16, uint32, uint64:
return float64(reflect.ValueOf(v).Uint()), nil
case float32:
return float64(v), nil
case float64:
return v, nil
}
return 0, errors.New("invalid value type")
}
func coerceInt64(v interface{}) (int64, error) {
switch v := v.(type) {
case string:
return strconv.ParseInt(v, 10, 64)
case int, int16, int32, int64:
return reflect.ValueOf(v).Int(), nil
case uint, uint16, uint32, uint64:
return int64(reflect.ValueOf(v).Uint()), nil
}
return 0, errors.New("invalid value type")
}
func coerceUint64(v interface{}) (uint64, error) {
switch v := v.(type) {
case string:
return strconv.ParseUint(v, 10, 64)
case int, int16, int32, int64:
return uint64(reflect.ValueOf(v).Int()), nil
case uint, uint16, uint32, uint64:
return reflect.ValueOf(v).Uint(), nil
}
return 0, errors.New("invalid value type")
}

@ -0,0 +1,31 @@
package nsq
import (
"strings"
)
// ConfigFlag wraps a Config and implements the flag.Value interface
type ConfigFlag struct {
Config *Config
}
// Set takes a comma separated value and follows the rules in Config.Set
// using the first field as the option key, and the second (if present) as the value
func (c *ConfigFlag) Set(opt string) (err error) {
parts := strings.SplitN(opt, ",", 2)
key := parts[0]
switch len(parts) {
case 1:
// default options specified without a value to boolean true
err = c.Config.Set(key, true)
case 2:
err = c.Config.Set(key, parts[1])
}
return
}
// String implements the flag.Value interface
func (c *ConfigFlag) String() string {
return ""
}

@ -0,0 +1,734 @@
package nsq
import (
"bufio"
"bytes"
"compress/flate"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/golang/snappy"
)
// IdentifyResponse represents the metadata
// returned from an IDENTIFY command to nsqd
type IdentifyResponse struct {
MaxRdyCount int64 `json:"max_rdy_count"`
TLSv1 bool `json:"tls_v1"`
Deflate bool `json:"deflate"`
Snappy bool `json:"snappy"`
AuthRequired bool `json:"auth_required"`
}
// AuthResponse represents the metadata
// returned from an AUTH command to nsqd
type AuthResponse struct {
Identity string `json:"identity"`
IdentityUrl string `json:"identity_url"`
PermissionCount int64 `json:"permission_count"`
}
type msgResponse struct {
msg *Message
cmd *Command
success bool
backoff bool
}
// Conn represents a connection to nsqd
//
// Conn exposes a set of callbacks for the
// various events that occur on a connection
type Conn struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
messagesInFlight int64
maxRdyCount int64
rdyCount int64
lastRdyCount int64
lastRdyTimestamp int64
lastMsgTimestamp int64
mtx sync.Mutex
config *Config
conn *net.TCPConn
tlsConn *tls.Conn
addr string
delegate ConnDelegate
logger logger
logLvl LogLevel
logFmt string
logGuard sync.RWMutex
r io.Reader
w io.Writer
cmdChan chan *Command
msgResponseChan chan *msgResponse
exitChan chan int
drainReady chan int
closeFlag int32
stopper sync.Once
wg sync.WaitGroup
readLoopRunning int32
}
// NewConn returns a new Conn instance
func NewConn(addr string, config *Config, delegate ConnDelegate) *Conn {
if !config.initialized {
panic("Config must be created with NewConfig()")
}
return &Conn{
addr: addr,
config: config,
delegate: delegate,
maxRdyCount: 2500,
lastMsgTimestamp: time.Now().UnixNano(),
cmdChan: make(chan *Command),
msgResponseChan: make(chan *msgResponse),
exitChan: make(chan int),
drainReady: make(chan int),
}
}
// SetLogger assigns the logger to use as well as a level.
//
// The format parameter is expected to be a printf compatible string with
// a single %s argument. This is useful if you want to provide additional
// context to the log messages that the connection will print, the default
// is '(%s)'.
//
// The logger parameter is an interface that requires the following
// method to be implemented (such as the the stdlib log.Logger):
//
// Output(calldepth int, s string)
//
func (c *Conn) SetLogger(l logger, lvl LogLevel, format string) {
c.logGuard.Lock()
defer c.logGuard.Unlock()
c.logger = l
c.logLvl = lvl
c.logFmt = format
if c.logFmt == "" {
c.logFmt = "(%s)"
}
}
func (c *Conn) getLogger() (logger, LogLevel, string) {
c.logGuard.RLock()
defer c.logGuard.RUnlock()
return c.logger, c.logLvl, c.logFmt
}
// Connect dials and bootstraps the nsqd connection
// (including IDENTIFY) and returns the IdentifyResponse
func (c *Conn) Connect() (*IdentifyResponse, error) {
dialer := &net.Dialer{
LocalAddr: c.config.LocalAddr,
Timeout: c.config.DialTimeout,
}
conn, err := dialer.Dial("tcp", c.addr)
if err != nil {
return nil, err
}
c.conn = conn.(*net.TCPConn)
c.r = conn
c.w = conn
_, err = c.Write(MagicV2)
if err != nil {
c.Close()
return nil, fmt.Errorf("[%s] failed to write magic - %s", c.addr, err)
}
resp, err := c.identify()
if err != nil {
return nil, err
}
if resp != nil && resp.AuthRequired {
if c.config.AuthSecret == "" {
c.log(LogLevelError, "Auth Required")
return nil, errors.New("Auth Required")
}
err := c.auth(c.config.AuthSecret)
if err != nil {
c.log(LogLevelError, "Auth Failed %s", err)
return nil, err
}
}
c.wg.Add(2)
atomic.StoreInt32(&c.readLoopRunning, 1)
go c.readLoop()
go c.writeLoop()
return resp, nil
}
// Close idempotently initiates connection close
func (c *Conn) Close() error {
atomic.StoreInt32(&c.closeFlag, 1)
if c.conn != nil && atomic.LoadInt64(&c.messagesInFlight) == 0 {
return c.conn.CloseRead()
}
return nil
}
// IsClosing indicates whether or not the
// connection is currently in the processing of
// gracefully closing
func (c *Conn) IsClosing() bool {
return atomic.LoadInt32(&c.closeFlag) == 1
}
// RDY returns the current RDY count
func (c *Conn) RDY() int64 {
return atomic.LoadInt64(&c.rdyCount)
}
// LastRDY returns the previously set RDY count
func (c *Conn) LastRDY() int64 {
return atomic.LoadInt64(&c.lastRdyCount)
}
// SetRDY stores the specified RDY count
func (c *Conn) SetRDY(rdy int64) {
atomic.StoreInt64(&c.rdyCount, rdy)
atomic.StoreInt64(&c.lastRdyCount, rdy)
if rdy > 0 {
atomic.StoreInt64(&c.lastRdyTimestamp, time.Now().UnixNano())
}
}
// MaxRDY returns the nsqd negotiated maximum
// RDY count that it will accept for this connection
func (c *Conn) MaxRDY() int64 {
return c.maxRdyCount
}
func (c *Conn) LastRdyTime() time.Time {
return time.Unix(0, atomic.LoadInt64(&c.lastRdyTimestamp))
}
// LastMessageTime returns a time.Time representing
// the time at which the last message was received
func (c *Conn) LastMessageTime() time.Time {
return time.Unix(0, atomic.LoadInt64(&c.lastMsgTimestamp))
}
// RemoteAddr returns the configured destination nsqd address
func (c *Conn) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}
// String returns the fully-qualified address
func (c *Conn) String() string {
return c.addr
}
// Read performs a deadlined read on the underlying TCP connection
func (c *Conn) Read(p []byte) (int, error) {
c.conn.SetReadDeadline(time.Now().Add(c.config.ReadTimeout))
return c.r.Read(p)
}
// Write performs a deadlined write on the underlying TCP connection
func (c *Conn) Write(p []byte) (int, error) {
c.conn.SetWriteDeadline(time.Now().Add(c.config.WriteTimeout))
return c.w.Write(p)
}
// WriteCommand is a goroutine safe method to write a Command
// to this connection, and flush.
func (c *Conn) WriteCommand(cmd *Command) error {
c.mtx.Lock()
_, err := cmd.WriteTo(c)
if err != nil {
goto exit
}
err = c.Flush()
exit:
c.mtx.Unlock()
if err != nil {
c.log(LogLevelError, "IO error - %s", err)
c.delegate.OnIOError(c, err)
}
return err
}
type flusher interface {
Flush() error
}
// Flush writes all buffered data to the underlying TCP connection
func (c *Conn) Flush() error {
if f, ok := c.w.(flusher); ok {
return f.Flush()
}
return nil
}
func (c *Conn) identify() (*IdentifyResponse, error) {
ci := make(map[string]interface{})
ci["client_id"] = c.config.ClientID
ci["hostname"] = c.config.Hostname
ci["user_agent"] = c.config.UserAgent
ci["short_id"] = c.config.ClientID // deprecated
ci["long_id"] = c.config.Hostname // deprecated
ci["tls_v1"] = c.config.TlsV1
ci["deflate"] = c.config.Deflate
ci["deflate_level"] = c.config.DeflateLevel
ci["snappy"] = c.config.Snappy
ci["feature_negotiation"] = true
if c.config.HeartbeatInterval == -1 {
ci["heartbeat_interval"] = -1
} else {
ci["heartbeat_interval"] = int64(c.config.HeartbeatInterval / time.Millisecond)
}
ci["sample_rate"] = c.config.SampleRate
ci["output_buffer_size"] = c.config.OutputBufferSize
if c.config.OutputBufferTimeout == -1 {
ci["output_buffer_timeout"] = -1
} else {
ci["output_buffer_timeout"] = int64(c.config.OutputBufferTimeout / time.Millisecond)
}
ci["msg_timeout"] = int64(c.config.MsgTimeout / time.Millisecond)
cmd, err := Identify(ci)
if err != nil {
return nil, ErrIdentify{err.Error()}
}
err = c.WriteCommand(cmd)
if err != nil {
return nil, ErrIdentify{err.Error()}
}
frameType, data, err := ReadUnpackedResponse(c)
if err != nil {
return nil, ErrIdentify{err.Error()}
}
if frameType == FrameTypeError {
return nil, ErrIdentify{string(data)}
}
// check to see if the server was able to respond w/ capabilities
// i.e. it was a JSON response
if data[0] != '{' {
return nil, nil
}
resp := &IdentifyResponse{}
err = json.Unmarshal(data, resp)
if err != nil {
return nil, ErrIdentify{err.Error()}
}
c.log(LogLevelDebug, "IDENTIFY response: %+v", resp)
c.maxRdyCount = resp.MaxRdyCount
if resp.TLSv1 {
c.log(LogLevelInfo, "upgrading to TLS")
err := c.upgradeTLS(c.config.TlsConfig)
if err != nil {
return nil, ErrIdentify{err.Error()}
}
}
if resp.Deflate {
c.log(LogLevelInfo, "upgrading to Deflate")
err := c.upgradeDeflate(c.config.DeflateLevel)
if err != nil {
return nil, ErrIdentify{err.Error()}
}
}
if resp.Snappy {
c.log(LogLevelInfo, "upgrading to Snappy")
err := c.upgradeSnappy()
if err != nil {
return nil, ErrIdentify{err.Error()}
}
}
// now that connection is bootstrapped, enable read buffering
// (and write buffering if it's not already capable of Flush())
c.r = bufio.NewReader(c.r)
if _, ok := c.w.(flusher); !ok {
c.w = bufio.NewWriter(c.w)
}
return resp, nil
}
func (c *Conn) upgradeTLS(tlsConf *tls.Config) error {
// create a local copy of the config to set ServerName for this connection
var conf tls.Config
if tlsConf != nil {
conf = *tlsConf
}
host, _, err := net.SplitHostPort(c.addr)
if err != nil {
return err
}
conf.ServerName = host
c.tlsConn = tls.Client(c.conn, &conf)
err = c.tlsConn.Handshake()
if err != nil {
return err
}
c.r = c.tlsConn
c.w = c.tlsConn
frameType, data, err := ReadUnpackedResponse(c)
if err != nil {
return err
}
if frameType != FrameTypeResponse || !bytes.Equal(data, []byte("OK")) {
return errors.New("invalid response from TLS upgrade")
}
return nil
}
func (c *Conn) upgradeDeflate(level int) error {
conn := net.Conn(c.conn)
if c.tlsConn != nil {
conn = c.tlsConn
}
fw, _ := flate.NewWriter(conn, level)
c.r = flate.NewReader(conn)
c.w = fw
frameType, data, err := ReadUnpackedResponse(c)
if err != nil {
return err
}
if frameType != FrameTypeResponse || !bytes.Equal(data, []byte("OK")) {
return errors.New("invalid response from Deflate upgrade")
}
return nil
}
func (c *Conn) upgradeSnappy() error {
conn := net.Conn(c.conn)
if c.tlsConn != nil {
conn = c.tlsConn
}
c.r = snappy.NewReader(conn)
c.w = snappy.NewWriter(conn)
frameType, data, err := ReadUnpackedResponse(c)
if err != nil {
return err
}
if frameType != FrameTypeResponse || !bytes.Equal(data, []byte("OK")) {
return errors.New("invalid response from Snappy upgrade")
}
return nil
}
func (c *Conn) auth(secret string) error {
cmd, err := Auth(secret)
if err != nil {
return err
}
err = c.WriteCommand(cmd)
if err != nil {
return err
}
frameType, data, err := ReadUnpackedResponse(c)
if err != nil {
return err
}
if frameType == FrameTypeError {
return errors.New("Error authenticating " + string(data))
}
resp := &AuthResponse{}
err = json.Unmarshal(data, resp)
if err != nil {
return err
}
c.log(LogLevelInfo, "Auth accepted. Identity: %q %s Permissions: %d",
resp.Identity, resp.IdentityUrl, resp.PermissionCount)
return nil
}
func (c *Conn) readLoop() {
delegate := &connMessageDelegate{c}
for {
if atomic.LoadInt32(&c.closeFlag) == 1 {
goto exit
}
frameType, data, err := ReadUnpackedResponse(c)
if err != nil {
if err == io.EOF && atomic.LoadInt32(&c.closeFlag) == 1 {
goto exit
}
if !strings.Contains(err.Error(), "use of closed network connection") {
c.log(LogLevelError, "IO error - %s", err)
c.delegate.OnIOError(c, err)
}
goto exit
}
if frameType == FrameTypeResponse && bytes.Equal(data, []byte("_heartbeat_")) {
c.log(LogLevelDebug, "heartbeat received")
c.delegate.OnHeartbeat(c)
err := c.WriteCommand(Nop())
if err != nil {
c.log(LogLevelError, "IO error - %s", err)
c.delegate.OnIOError(c, err)
goto exit
}
continue
}
switch frameType {
case FrameTypeResponse:
c.delegate.OnResponse(c, data)
case FrameTypeMessage:
msg, err := DecodeMessage(data)
if err != nil {
c.log(LogLevelError, "IO error - %s", err)
c.delegate.OnIOError(c, err)
goto exit
}
msg.Delegate = delegate
msg.NSQDAddress = c.String()
atomic.AddInt64(&c.rdyCount, -1)
atomic.AddInt64(&c.messagesInFlight, 1)
atomic.StoreInt64(&c.lastMsgTimestamp, time.Now().UnixNano())
c.delegate.OnMessage(c, msg)
case FrameTypeError:
c.log(LogLevelError, "protocol error - %s", data)
c.delegate.OnError(c, data)
default:
c.log(LogLevelError, "IO error - %s", err)
c.delegate.OnIOError(c, fmt.Errorf("unknown frame type %d", frameType))
}
}
exit:
atomic.StoreInt32(&c.readLoopRunning, 0)
// start the connection close
messagesInFlight := atomic.LoadInt64(&c.messagesInFlight)
if messagesInFlight == 0 {
// if we exited readLoop with no messages in flight
// we need to explicitly trigger the close because
// writeLoop won't
c.close()
} else {
c.log(LogLevelWarning, "delaying close, %d outstanding messages", messagesInFlight)
}
c.wg.Done()
c.log(LogLevelInfo, "readLoop exiting")
}
func (c *Conn) writeLoop() {
for {
select {
case <-c.exitChan:
c.log(LogLevelInfo, "breaking out of writeLoop")
// Indicate drainReady because we will not pull any more off msgResponseChan
close(c.drainReady)
goto exit
case cmd := <-c.cmdChan:
err := c.WriteCommand(cmd)
if err != nil {
c.log(LogLevelError, "error sending command %s - %s", cmd, err)
c.close()
continue
}
case resp := <-c.msgResponseChan:
// Decrement this here so it is correct even if we can't respond to nsqd
msgsInFlight := atomic.AddInt64(&c.messagesInFlight, -1)
if resp.success {
c.log(LogLevelDebug, "FIN %s", resp.msg.ID)
c.delegate.OnMessageFinished(c, resp.msg)
c.delegate.OnResume(c)
} else {
c.log(LogLevelDebug, "REQ %s", resp.msg.ID)
c.delegate.OnMessageRequeued(c, resp.msg)
if resp.backoff {
c.delegate.OnBackoff(c)
} else {
c.delegate.OnContinue(c)
}
}
err := c.WriteCommand(resp.cmd)
if err != nil {
c.log(LogLevelError, "error sending command %s - %s", resp.cmd, err)
c.close()
continue
}
if msgsInFlight == 0 &&
atomic.LoadInt32(&c.closeFlag) == 1 {
c.close()
continue
}
}
}
exit:
c.wg.Done()
c.log(LogLevelInfo, "writeLoop exiting")
}
func (c *Conn) close() {
// a "clean" connection close is orchestrated as follows:
//
// 1. CLOSE cmd sent to nsqd
// 2. CLOSE_WAIT response received from nsqd
// 3. set c.closeFlag
// 4. readLoop() exits
// a. if messages-in-flight > 0 delay close()
// i. writeLoop() continues receiving on c.msgResponseChan chan
// x. when messages-in-flight == 0 call close()
// b. else call close() immediately
// 5. c.exitChan close
// a. writeLoop() exits
// i. c.drainReady close
// 6a. launch cleanup() goroutine (we're racing with intraprocess
// routed messages, see comments below)
// a. wait on c.drainReady
// b. loop and receive on c.msgResponseChan chan
// until messages-in-flight == 0
// i. ensure that readLoop has exited
// 6b. launch waitForCleanup() goroutine
// b. wait on waitgroup (covers readLoop() and writeLoop()
// and cleanup goroutine)
// c. underlying TCP connection close
// d. trigger Delegate OnClose()
//
c.stopper.Do(func() {
c.log(LogLevelInfo, "beginning close")
close(c.exitChan)
c.conn.CloseRead()
c.wg.Add(1)
go c.cleanup()
go c.waitForCleanup()
})
}
func (c *Conn) cleanup() {
<-c.drainReady
ticker := time.NewTicker(100 * time.Millisecond)
lastWarning := time.Now()
// writeLoop has exited, drain any remaining in flight messages
for {
// we're racing with readLoop which potentially has a message
// for handling so infinitely loop until messagesInFlight == 0
// and readLoop has exited
var msgsInFlight int64
select {
case <-c.msgResponseChan:
msgsInFlight = atomic.AddInt64(&c.messagesInFlight, -1)
case <-ticker.C:
msgsInFlight = atomic.LoadInt64(&c.messagesInFlight)
}
if msgsInFlight > 0 {
if time.Now().Sub(lastWarning) > time.Second {
c.log(LogLevelWarning, "draining... waiting for %d messages in flight", msgsInFlight)
lastWarning = time.Now()
}
continue
}
// until the readLoop has exited we cannot be sure that there
// still won't be a race
if atomic.LoadInt32(&c.readLoopRunning) == 1 {
if time.Now().Sub(lastWarning) > time.Second {
c.log(LogLevelWarning, "draining... readLoop still running")
lastWarning = time.Now()
}
continue
}
goto exit
}
exit:
ticker.Stop()
c.wg.Done()
c.log(LogLevelInfo, "finished draining, cleanup exiting")
}
func (c *Conn) waitForCleanup() {
// this blocks until readLoop and writeLoop
// (and cleanup goroutine above) have exited
c.wg.Wait()
c.conn.CloseWrite()
c.log(LogLevelInfo, "clean close complete")
c.delegate.OnClose(c)
}
func (c *Conn) onMessageFinish(m *Message) {
c.msgResponseChan <- &msgResponse{msg: m, cmd: Finish(m.ID), success: true}
}
func (c *Conn) onMessageRequeue(m *Message, delay time.Duration, backoff bool) {
if delay == -1 {
// linear delay
delay = c.config.DefaultRequeueDelay * time.Duration(m.Attempts)
// bound the requeueDelay to configured max
if delay > c.config.MaxRequeueDelay {
delay = c.config.MaxRequeueDelay
}
}
c.msgResponseChan <- &msgResponse{msg: m, cmd: Requeue(m.ID, delay), success: false, backoff: backoff}
}
func (c *Conn) onMessageTouch(m *Message) {
select {
case c.cmdChan <- Touch(m.ID):
case <-c.exitChan:
}
}
func (c *Conn) log(lvl LogLevel, line string, args ...interface{}) {
logger, logLvl, logFmt := c.getLogger()
if logger == nil {
return
}
if logLvl > lvl {
return
}
logger.Output(2, fmt.Sprintf("%-4s %s %s", lvl,
fmt.Sprintf(logFmt, c.String()),
fmt.Sprintf(line, args...)))
}

File diff suppressed because it is too large Load Diff

@ -0,0 +1,138 @@
package nsq
import "time"
type logger interface {
Output(calldepth int, s string) error
}
// LogLevel specifies the severity of a given log message
type LogLevel int
// Log levels
const (
LogLevelDebug LogLevel = iota
LogLevelInfo
LogLevelWarning
LogLevelError
)
// String returns the string form for a given LogLevel
func (lvl LogLevel) String() string {
switch lvl {
case LogLevelInfo:
return "INF"
case LogLevelWarning:
return "WRN"
case LogLevelError:
return "ERR"
}
return "DBG"
}
// MessageDelegate is an interface of methods that are used as
// callbacks in Message
type MessageDelegate interface {
// OnFinish is called when the Finish() method
// is triggered on the Message
OnFinish(*Message)
// OnRequeue is called when the Requeue() method
// is triggered on the Message
OnRequeue(m *Message, delay time.Duration, backoff bool)
// OnTouch is called when the Touch() method
// is triggered on the Message
OnTouch(*Message)
}
type connMessageDelegate struct {
c *Conn
}
func (d *connMessageDelegate) OnFinish(m *Message) { d.c.onMessageFinish(m) }
func (d *connMessageDelegate) OnRequeue(m *Message, t time.Duration, b bool) {
d.c.onMessageRequeue(m, t, b)
}
func (d *connMessageDelegate) OnTouch(m *Message) { d.c.onMessageTouch(m) }
// ConnDelegate is an interface of methods that are used as
// callbacks in Conn
type ConnDelegate interface {
// OnResponse is called when the connection
// receives a FrameTypeResponse from nsqd
OnResponse(*Conn, []byte)
// OnError is called when the connection
// receives a FrameTypeError from nsqd
OnError(*Conn, []byte)
// OnMessage is called when the connection
// receives a FrameTypeMessage from nsqd
OnMessage(*Conn, *Message)
// OnMessageFinished is called when the connection
// handles a FIN command from a message handler
OnMessageFinished(*Conn, *Message)
// OnMessageRequeued is called when the connection
// handles a REQ command from a message handler
OnMessageRequeued(*Conn, *Message)
// OnBackoff is called when the connection triggers a backoff state
OnBackoff(*Conn)
// OnContinue is called when the connection finishes a message without adjusting backoff state
OnContinue(*Conn)
// OnResume is called when the connection triggers a resume state
OnResume(*Conn)
// OnIOError is called when the connection experiences
// a low-level TCP transport error
OnIOError(*Conn, error)
// OnHeartbeat is called when the connection
// receives a heartbeat from nsqd
OnHeartbeat(*Conn)
// OnClose is called when the connection
// closes, after all cleanup
OnClose(*Conn)
}
// keeps the exported Consumer struct clean of the exported methods
// required to implement the ConnDelegate interface
type consumerConnDelegate struct {
r *Consumer
}
func (d *consumerConnDelegate) OnResponse(c *Conn, data []byte) { d.r.onConnResponse(c, data) }
func (d *consumerConnDelegate) OnError(c *Conn, data []byte) { d.r.onConnError(c, data) }
func (d *consumerConnDelegate) OnMessage(c *Conn, m *Message) { d.r.onConnMessage(c, m) }
func (d *consumerConnDelegate) OnMessageFinished(c *Conn, m *Message) { d.r.onConnMessageFinished(c, m) }
func (d *consumerConnDelegate) OnMessageRequeued(c *Conn, m *Message) { d.r.onConnMessageRequeued(c, m) }
func (d *consumerConnDelegate) OnBackoff(c *Conn) { d.r.onConnBackoff(c) }
func (d *consumerConnDelegate) OnContinue(c *Conn) { d.r.onConnContinue(c) }
func (d *consumerConnDelegate) OnResume(c *Conn) { d.r.onConnResume(c) }
func (d *consumerConnDelegate) OnIOError(c *Conn, err error) { d.r.onConnIOError(c, err) }
func (d *consumerConnDelegate) OnHeartbeat(c *Conn) { d.r.onConnHeartbeat(c) }
func (d *consumerConnDelegate) OnClose(c *Conn) { d.r.onConnClose(c) }
// keeps the exported Producer struct clean of the exported methods
// required to implement the ConnDelegate interface
type producerConnDelegate struct {
w *Producer
}
func (d *producerConnDelegate) OnResponse(c *Conn, data []byte) { d.w.onConnResponse(c, data) }
func (d *producerConnDelegate) OnError(c *Conn, data []byte) { d.w.onConnError(c, data) }
func (d *producerConnDelegate) OnMessage(c *Conn, m *Message) {}
func (d *producerConnDelegate) OnMessageFinished(c *Conn, m *Message) {}
func (d *producerConnDelegate) OnMessageRequeued(c *Conn, m *Message) {}
func (d *producerConnDelegate) OnBackoff(c *Conn) {}
func (d *producerConnDelegate) OnContinue(c *Conn) {}
func (d *producerConnDelegate) OnResume(c *Conn) {}
func (d *producerConnDelegate) OnIOError(c *Conn, err error) { d.w.onConnIOError(c, err) }
func (d *producerConnDelegate) OnHeartbeat(c *Conn) { d.w.onConnHeartbeat(c) }
func (d *producerConnDelegate) OnClose(c *Conn) { d.w.onConnClose(c) }

@ -0,0 +1,44 @@
package nsq
import (
"errors"
"fmt"
)
// ErrNotConnected is returned when a publish command is made
// against a Producer that is not connected
var ErrNotConnected = errors.New("not connected")
// ErrStopped is returned when a publish command is
// made against a Producer that has been stopped
var ErrStopped = errors.New("stopped")
// ErrClosing is returned when a connection is closing
var ErrClosing = errors.New("closing")
// ErrAlreadyConnected is returned from ConnectToNSQD when already connected
var ErrAlreadyConnected = errors.New("already connected")
// ErrOverMaxInFlight is returned from Consumer if over max-in-flight
var ErrOverMaxInFlight = errors.New("over configure max-inflight")
// ErrIdentify is returned from Conn as part of the IDENTIFY handshake
type ErrIdentify struct {
Reason string
}
// Error returns a stringified error
func (e ErrIdentify) Error() string {
return fmt.Sprintf("failed to IDENTIFY - %s", e.Reason)
}
// ErrProtocol is returned from Producer when encountering
// an NSQ protocol level error
type ErrProtocol struct {
Reason string
}
// Error returns a stringified error
func (e ErrProtocol) Error() string {
return e.Reason
}

@ -0,0 +1,164 @@
package nsq
import (
"encoding/binary"
"errors"
"io"
"sync/atomic"
"time"
)
// The number of bytes for a Message.ID
const MsgIDLength = 16
// MessageID is the ASCII encoded hexadecimal message ID
type MessageID [MsgIDLength]byte
// Message is the fundamental data type containing
// the id, body, and metadata
type Message struct {
ID MessageID
Body []byte
Timestamp int64
Attempts uint16
NSQDAddress string
Delegate MessageDelegate
autoResponseDisabled int32
responded int32
}
// NewMessage creates a Message, initializes some metadata,
// and returns a pointer
func NewMessage(id MessageID, body []byte) *Message {
return &Message{
ID: id,
Body: body,
Timestamp: time.Now().UnixNano(),
}
}
// DisableAutoResponse disables the automatic response that
// would normally be sent when a handler.HandleMessage
// returns (FIN/REQ based on the error value returned).
//
// This is useful if you want to batch, buffer, or asynchronously
// respond to messages.
func (m *Message) DisableAutoResponse() {
atomic.StoreInt32(&m.autoResponseDisabled, 1)
}
// IsAutoResponseDisabled indicates whether or not this message
// will be responded to automatically
func (m *Message) IsAutoResponseDisabled() bool {
return atomic.LoadInt32(&m.autoResponseDisabled) == 1
}
// HasResponded indicates whether or not this message has been responded to
func (m *Message) HasResponded() bool {
return atomic.LoadInt32(&m.responded) == 1
}
// Finish sends a FIN command to the nsqd which
// sent this message
func (m *Message) Finish() {
if !atomic.CompareAndSwapInt32(&m.responded, 0, 1) {
return
}
m.Delegate.OnFinish(m)
}
// Touch sends a TOUCH command to the nsqd which
// sent this message
func (m *Message) Touch() {
if m.HasResponded() {
return
}
m.Delegate.OnTouch(m)
}
// Requeue sends a REQ command to the nsqd which
// sent this message, using the supplied delay.
//
// A delay of -1 will automatically calculate
// based on the number of attempts and the
// configured default_requeue_delay
func (m *Message) Requeue(delay time.Duration) {
m.doRequeue(delay, true)
}
// RequeueWithoutBackoff sends a REQ command to the nsqd which
// sent this message, using the supplied delay.
//
// Notably, using this method to respond does not trigger a backoff
// event on the configured Delegate.
func (m *Message) RequeueWithoutBackoff(delay time.Duration) {
m.doRequeue(delay, false)
}
func (m *Message) doRequeue(delay time.Duration, backoff bool) {
if !atomic.CompareAndSwapInt32(&m.responded, 0, 1) {
return
}
m.Delegate.OnRequeue(m, delay, backoff)
}
// WriteTo implements the WriterTo interface and serializes
// the message into the supplied producer.
//
// It is suggested that the target Writer is buffered to
// avoid performing many system calls.
func (m *Message) WriteTo(w io.Writer) (int64, error) {
var buf [10]byte
var total int64
binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp))
binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts))
n, err := w.Write(buf[:])
total += int64(n)
if err != nil {
return total, err
}
n, err = w.Write(m.ID[:])
total += int64(n)
if err != nil {
return total, err
}
n, err = w.Write(m.Body)
total += int64(n)
if err != nil {
return total, err
}
return total, nil
}
// DecodeMessage deserializes data (as []byte) and creates a new Message
// message format:
// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
// | (int64) || || (hex string encoded in ASCII) || (binary)
// | 8-byte || || 16-byte || N-byte
// ------------------------------------------------------------------------------------------...
// nanosecond timestamp ^^ message ID message body
// (uint16)
// 2-byte
// attempts
func DecodeMessage(b []byte) (*Message, error) {
var msg Message
if len(b) < 10+MsgIDLength {
return nil, errors.New("not enough data to decode valid message")
}
msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8]))
msg.Attempts = binary.BigEndian.Uint16(b[8:10])
copy(msg.ID[:], b[10:10+MsgIDLength])
msg.Body = b[10+MsgIDLength:]
return &msg, nil
}

@ -0,0 +1,393 @@
package nsq
import (
"fmt"
"log"
"os"
"sync"
"sync/atomic"
"time"
)
type producerConn interface {
String() string
SetLogger(logger, LogLevel, string)
Connect() (*IdentifyResponse, error)
Close() error
WriteCommand(*Command) error
}
// Producer is a high-level type to publish to NSQ.
//
// A Producer instance is 1:1 with a destination `nsqd`
// and will lazily connect to that instance (and re-connect)
// when Publish commands are executed.
type Producer struct {
id int64
addr string
conn producerConn
config Config
logger logger
logLvl LogLevel
logGuard sync.RWMutex
responseChan chan []byte
errorChan chan []byte
closeChan chan int
transactionChan chan *ProducerTransaction
transactions []*ProducerTransaction
state int32
concurrentProducers int32
stopFlag int32
exitChan chan int
wg sync.WaitGroup
guard sync.Mutex
}
// ProducerTransaction is returned by the async publish methods
// to retrieve metadata about the command after the
// response is received.
type ProducerTransaction struct {
cmd *Command
doneChan chan *ProducerTransaction
Error error // the error (or nil) of the publish command
Args []interface{} // the slice of variadic arguments passed to PublishAsync or MultiPublishAsync
}
func (t *ProducerTransaction) finish() {
if t.doneChan != nil {
t.doneChan <- t
}
}
// NewProducer returns an instance of Producer for the specified address
//
// The only valid way to create a Config is via NewConfig, using a struct literal will panic.
// After Config is passed into NewProducer the values are no longer mutable (they are copied).
func NewProducer(addr string, config *Config) (*Producer, error) {
config.assertInitialized()
err := config.Validate()
if err != nil {
return nil, err
}
p := &Producer{
id: atomic.AddInt64(&instCount, 1),
addr: addr,
config: *config,
logger: log.New(os.Stderr, "", log.Flags()),
logLvl: LogLevelInfo,
transactionChan: make(chan *ProducerTransaction),
exitChan: make(chan int),
responseChan: make(chan []byte),
errorChan: make(chan []byte),
}
return p, nil
}
// Ping causes the Producer to connect to it's configured nsqd (if not already
// connected) and send a `Nop` command, returning any error that might occur.
//
// This method can be used to verify that a newly-created Producer instance is
// configured correctly, rather than relying on the lazy "connect on Publish"
// behavior of a Producer.
func (w *Producer) Ping() error {
if atomic.LoadInt32(&w.state) != StateConnected {
err := w.connect()
if err != nil {
return err
}
}
return w.conn.WriteCommand(Nop())
}
// SetLogger assigns the logger to use as well as a level
//
// The logger parameter is an interface that requires the following
// method to be implemented (such as the the stdlib log.Logger):
//
// Output(calldepth int, s string)
//
func (w *Producer) SetLogger(l logger, lvl LogLevel) {
w.logGuard.Lock()
defer w.logGuard.Unlock()
w.logger = l
w.logLvl = lvl
}
func (w *Producer) getLogger() (logger, LogLevel) {
w.logGuard.RLock()
defer w.logGuard.RUnlock()
return w.logger, w.logLvl
}
// String returns the address of the Producer
func (w *Producer) String() string {
return w.addr
}
// Stop initiates a graceful stop of the Producer (permanent)
//
// NOTE: this blocks until completion
func (w *Producer) Stop() {
w.guard.Lock()
if !atomic.CompareAndSwapInt32(&w.stopFlag, 0, 1) {
w.guard.Unlock()
return
}
w.log(LogLevelInfo, "stopping")
close(w.exitChan)
w.close()
w.guard.Unlock()
w.wg.Wait()
}
// PublishAsync publishes a message body to the specified topic
// but does not wait for the response from `nsqd`.
//
// When the Producer eventually receives the response from `nsqd`,
// the supplied `doneChan` (if specified)
// will receive a `ProducerTransaction` instance with the supplied variadic arguments
// and the response error if present
func (w *Producer) PublishAsync(topic string, body []byte, doneChan chan *ProducerTransaction,
args ...interface{}) error {
return w.sendCommandAsync(Publish(topic, body), doneChan, args)
}
// MultiPublishAsync publishes a slice of message bodies to the specified topic
// but does not wait for the response from `nsqd`.
//
// When the Producer eventually receives the response from `nsqd`,
// the supplied `doneChan` (if specified)
// will receive a `ProducerTransaction` instance with the supplied variadic arguments
// and the response error if present
func (w *Producer) MultiPublishAsync(topic string, body [][]byte, doneChan chan *ProducerTransaction,
args ...interface{}) error {
cmd, err := MultiPublish(topic, body)
if err != nil {
return err
}
return w.sendCommandAsync(cmd, doneChan, args)
}
// Publish synchronously publishes a message body to the specified topic, returning
// an error if publish failed
func (w *Producer) Publish(topic string, body []byte) error {
return w.sendCommand(Publish(topic, body))
}
// MultiPublish synchronously publishes a slice of message bodies to the specified topic, returning
// an error if publish failed
func (w *Producer) MultiPublish(topic string, body [][]byte) error {
cmd, err := MultiPublish(topic, body)
if err != nil {
return err
}
return w.sendCommand(cmd)
}
// DeferredPublish synchronously publishes a message body to the specified topic
// where the message will queue at the channel level until the timeout expires, returning
// an error if publish failed
func (w *Producer) DeferredPublish(topic string, delay time.Duration, body []byte) error {
return w.sendCommand(DeferredPublish(topic, delay, body))
}
// DeferredPublishAsync publishes a message body to the specified topic
// where the message will queue at the channel level until the timeout expires
// but does not wait for the response from `nsqd`.
//
// When the Producer eventually receives the response from `nsqd`,
// the supplied `doneChan` (if specified)
// will receive a `ProducerTransaction` instance with the supplied variadic arguments
// and the response error if present
func (w *Producer) DeferredPublishAsync(topic string, delay time.Duration, body []byte,
doneChan chan *ProducerTransaction, args ...interface{}) error {
return w.sendCommandAsync(DeferredPublish(topic, delay, body), doneChan, args)
}
func (w *Producer) sendCommand(cmd *Command) error {
doneChan := make(chan *ProducerTransaction)
err := w.sendCommandAsync(cmd, doneChan, nil)
if err != nil {
close(doneChan)
return err
}
t := <-doneChan
return t.Error
}
func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
args []interface{}) error {
// keep track of how many outstanding producers we're dealing with
// in order to later ensure that we clean them all up...
atomic.AddInt32(&w.concurrentProducers, 1)
defer atomic.AddInt32(&w.concurrentProducers, -1)
if atomic.LoadInt32(&w.state) != StateConnected {
err := w.connect()
if err != nil {
return err
}
}
t := &ProducerTransaction{
cmd: cmd,
doneChan: doneChan,
Args: args,
}
select {
case w.transactionChan <- t:
case <-w.exitChan:
return ErrStopped
}
return nil
}
func (w *Producer) connect() error {
w.guard.Lock()
defer w.guard.Unlock()
if atomic.LoadInt32(&w.stopFlag) == 1 {
return ErrStopped
}
switch state := atomic.LoadInt32(&w.state); state {
case StateInit:
case StateConnected:
return nil
default:
return ErrNotConnected
}
w.log(LogLevelInfo, "(%s) connecting to nsqd", w.addr)
logger, logLvl := w.getLogger()
w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
w.conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d (%%s)", w.id))
_, err := w.conn.Connect()
if err != nil {
w.conn.Close()
w.log(LogLevelError, "(%s) error connecting to nsqd - %s", w.addr, err)
return err
}
atomic.StoreInt32(&w.state, StateConnected)
w.closeChan = make(chan int)
w.wg.Add(1)
go w.router()
return nil
}
func (w *Producer) close() {
if !atomic.CompareAndSwapInt32(&w.state, StateConnected, StateDisconnected) {
return
}
w.conn.Close()
go func() {
// we need to handle this in a goroutine so we don't
// block the caller from making progress
w.wg.Wait()
atomic.StoreInt32(&w.state, StateInit)
}()
}
func (w *Producer) router() {
for {
select {
case t := <-w.transactionChan:
w.transactions = append(w.transactions, t)
err := w.conn.WriteCommand(t.cmd)
if err != nil {
w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
w.close()
}
case data := <-w.responseChan:
w.popTransaction(FrameTypeResponse, data)
case data := <-w.errorChan:
w.popTransaction(FrameTypeError, data)
case <-w.closeChan:
goto exit
case <-w.exitChan:
goto exit
}
}
exit:
w.transactionCleanup()
w.wg.Done()
w.log(LogLevelInfo, "exiting router")
}
func (w *Producer) popTransaction(frameType int32, data []byte) {
t := w.transactions[0]
w.transactions = w.transactions[1:]
if frameType == FrameTypeError {
t.Error = ErrProtocol{string(data)}
}
t.finish()
}
func (w *Producer) transactionCleanup() {
// clean up transactions we can easily account for
for _, t := range w.transactions {
t.Error = ErrNotConnected
t.finish()
}
w.transactions = w.transactions[:0]
// spin and free up any writes that might have raced
// with the cleanup process (blocked on writing
// to transactionChan)
for {
select {
case t := <-w.transactionChan:
t.Error = ErrNotConnected
t.finish()
default:
// keep spinning until there are 0 concurrent producers
if atomic.LoadInt32(&w.concurrentProducers) == 0 {
return
}
// give the runtime a chance to schedule other racing goroutines
time.Sleep(5 * time.Millisecond)
}
}
}
func (w *Producer) log(lvl LogLevel, line string, args ...interface{}) {
logger, logLvl := w.getLogger()
if logger == nil {
return
}
if logLvl > lvl {
return
}
logger.Output(2, fmt.Sprintf("%-4s %3d %s", lvl, w.id, fmt.Sprintf(line, args...)))
}
func (w *Producer) onConnResponse(c *Conn, data []byte) { w.responseChan <- data }
func (w *Producer) onConnError(c *Conn, data []byte) { w.errorChan <- data }
func (w *Producer) onConnHeartbeat(c *Conn) {}
func (w *Producer) onConnIOError(c *Conn, err error) { w.close() }
func (w *Producer) onConnClose(c *Conn) {
w.guard.Lock()
defer w.guard.Unlock()
close(w.closeChan)
}

@ -0,0 +1,96 @@
package nsq
import (
"encoding/binary"
"errors"
"io"
"regexp"
)
// MagicV1 is the initial identifier sent when connecting for V1 clients
var MagicV1 = []byte(" V1")
// MagicV2 is the initial identifier sent when connecting for V2 clients
var MagicV2 = []byte(" V2")
// frame types
const (
FrameTypeResponse int32 = 0
FrameTypeError int32 = 1
FrameTypeMessage int32 = 2
)
var validTopicChannelNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+(#ephemeral)?$`)
// IsValidTopicName checks a topic name for correctness
func IsValidTopicName(name string) bool {
return isValidName(name)
}
// IsValidChannelName checks a channel name for correctness
func IsValidChannelName(name string) bool {
return isValidName(name)
}
func isValidName(name string) bool {
if len(name) > 64 || len(name) < 1 {
return false
}
return validTopicChannelNameRegex.MatchString(name)
}
// ReadResponse is a client-side utility function to read from the supplied Reader
// according to the NSQ protocol spec:
//
// [x][x][x][x][x][x][x][x]...
// | (int32) || (binary)
// | 4-byte || N-byte
// ------------------------...
// size data
func ReadResponse(r io.Reader) ([]byte, error) {
var msgSize int32
// message size
err := binary.Read(r, binary.BigEndian, &msgSize)
if err != nil {
return nil, err
}
// message binary data
buf := make([]byte, msgSize)
_, err = io.ReadFull(r, buf)
if err != nil {
return nil, err
}
return buf, nil
}
// UnpackResponse is a client-side utility function that unpacks serialized data
// according to NSQ protocol spec:
//
// [x][x][x][x][x][x][x][x]...
// | (int32) || (binary)
// | 4-byte || N-byte
// ------------------------...
// frame ID data
//
// Returns a triplicate of: frame type, data ([]byte), error
func UnpackResponse(response []byte) (int32, []byte, error) {
if len(response) < 4 {
return -1, nil, errors.New("length of response is too small")
}
return int32(binary.BigEndian.Uint32(response)), response[4:], nil
}
// ReadUnpackedResponse reads and parses data from the underlying
// TCP connection according to the NSQ TCP protocol spec and
// returns the frameType, data or error
func ReadUnpackedResponse(r io.Reader) (int32, []byte, error) {
resp, err := ReadResponse(r)
if err != nil {
return -1, nil, err
}
return UnpackResponse(resp)
}

@ -0,0 +1,8 @@
package nsq
// states
const (
StateInit = iota
StateDisconnected
StateConnected
)

@ -0,0 +1,39 @@
#!/bin/bash
set -e
# a helper script to run tests
if ! which nsqd >/dev/null; then
echo "missing nsqd binary" && exit 1
fi
if ! which nsqlookupd >/dev/null; then
echo "missing nsqlookupd binary" && exit 1
fi
# run nsqlookupd
LOOKUP_LOGFILE=$(mktemp -t nsqlookupd.XXXXXXX)
echo "starting nsqlookupd"
echo " logging to $LOOKUP_LOGFILE"
nsqlookupd >$LOOKUP_LOGFILE 2>&1 &
LOOKUPD_PID=$!
# run nsqd configured to use our lookupd above
rm -f *.dat
NSQD_LOGFILE=$(mktemp -t nsqlookupd.XXXXXXX)
echo "starting nsqd --data-path=/tmp --lookupd-tcp-address=127.0.0.1:4160 --tls-cert=./test/server.pem --tls-key=./test/server.key --tls-root-ca-file=./test/ca.pem"
echo " logging to $NSQD_LOGFILE"
nsqd --data-path=/tmp --lookupd-tcp-address=127.0.0.1:4160 --tls-cert=./test/server.pem --tls-key=./test/server.key --tls-root-ca-file=./test/ca.pem >$NSQD_LOGFILE 2>&1 &
NSQD_PID=$!
sleep 0.3
cleanup() {
echo "killing nsqd PID $NSQD_PID"
kill -s TERM $NSQD_PID || cat $NSQD_LOGFILE
echo "killing nsqlookupd PID $LOOKUPD_PID"
kill -s TERM $LOOKUPD_PID || cat $LOOKUP_LOGFILE
}
trap cleanup INT TERM EXIT
go test -v -timeout 60s

@ -0,0 +1,8 @@
// Package nsq is the official Go package for NSQ (http://nsq.io/)
//
// It provides high-level Consumer and Producer types as well as low-level
// functions to communicate over the NSQ protocol
package nsq
// VERSION
const VERSION = "1.0.7"

@ -782,6 +782,12 @@
"revision": "289cccf02c178dc782430d534e3c1f5b72af807f",
"revisionTime": "2016-09-27T04:49:45Z"
},
{
"checksumSHA1": "MZdppx6laedD1LcVomiZ/Sfa6rE=",
"path": "github.com/nsqio/go-nsq",
"revision": "0527e80f3ba5ecff59ad8d07db607677b5dc056a",
"revisionTime": "2018-10-28T19:52:56Z"
},
{
"checksumSHA1": "xCv4GBFyw07vZkVtKF/XrUnkHRk=",
"path": "github.com/pkg/errors",

Loading…
Cancel
Save