Removed `clientID` from NATS-Streaming Config (#6391)

clientID must be a unique `UUID` for each connections. Now, the
server generates it, rather considering the config.

Removing it as it is non-beneficial right now.

Fixes #6364
master
Praveen raj Mani 6 years ago committed by Nitish Tiwari
parent e7971b1d55
commit e7af31c2ff
  1. 3
      cmd/admin-handlers_test.go
  2. 4
      cmd/config-current.go
  3. 2
      cmd/config-current_test.go
  4. 40
      cmd/config-migrate.go
  5. 4
      cmd/config-migrate_test.go
  6. 44
      cmd/config-versions.go
  7. 3
      docs/config/config.sample.json
  8. 14
      pkg/event/target/nats.go
  9. 1
      pkg/madmin/examples/set-config.go

@ -38,7 +38,7 @@ import (
var (
configJSON = []byte(`{
"version": "32",
"version": "33",
"credential": {
"accessKey": "minio",
"secretKey": "minio123"
@ -152,7 +152,6 @@ var (
"streaming": {
"enable": false,
"clusterID": "",
"clientID": "",
"async": false,
"maxPubAcksInflight": 0
}

@ -43,9 +43,9 @@ import (
// 6. Make changes in config-current_test.go for any test change
// Config version
const serverConfigVersion = "32"
const serverConfigVersion = "33"
type serverConfig = serverConfigV32
type serverConfig = serverConfigV33
var (
// globalServerConfig server config.

@ -188,7 +188,7 @@ func TestValidateConfig(t *testing.T) {
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "amqp": { "1": { "enable": true, "url": "", "exchange": "", "routingKey": "", "exchangeType": "", "mandatory": false, "immediate": false, "durable": false, "internal": false, "noWait": false, "autoDeleted": false }}}}`, false},
// Test 12 - Test NATS
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "nats": { "1": { "enable": true, "address": "", "subject": "", "username": "", "password": "", "token": "", "secure": false, "pingInterval": 0, "streaming": { "enable": false, "clusterID": "", "clientID": "", "async": false, "maxPubAcksInflight": 0 } } }}}`, false},
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "nats": { "1": { "enable": true, "address": "", "subject": "", "username": "", "password": "", "token": "", "secure": false, "pingInterval": 0, "streaming": { "enable": false, "clusterID": "", "async": false, "maxPubAcksInflight": 0 } } }}}`, false},
// Test 13 - Test ElasticSearch
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "elasticsearch": { "1": { "enable": true, "url": "", "index": "" } }}}`, false},

@ -2455,7 +2455,7 @@ func migrateConfigToMinioSys(objAPI ObjectLayer) (err error) {
return saveServerConfig(context.Background(), objAPI, config)
}
// Migrates '.minio.sys/config.json' to v32.
// Migrates '.minio.sys/config.json' to v33.
func migrateMinioSysConfig(objAPI ObjectLayer) error {
configFile := path.Join(minioConfigPrefix, minioConfigFile)
@ -2483,7 +2483,10 @@ func migrateMinioSysConfig(objAPI ObjectLayer) error {
if err := migrateV30ToV31MinioSys(objAPI); err != nil {
return err
}
return migrateV31ToV32MinioSys(objAPI)
if err := migrateV31ToV32MinioSys(objAPI); err != nil {
return err
}
return migrateV32ToV33MinioSys(objAPI)
}
func checkConfigVersion(objAPI ObjectLayer, configFile string, version string) (bool, []byte, error) {
@ -2679,3 +2682,36 @@ func migrateV31ToV32MinioSys(objAPI ObjectLayer) error {
logger.Info(configMigrateMSGTemplate, configFile, "31", "32")
return nil
}
func migrateV32ToV33MinioSys(objAPI ObjectLayer) error {
configFile := path.Join(minioConfigPrefix, minioConfigFile)
ok, data, err := checkConfigVersion(objAPI, configFile, "32")
if err == errConfigNotFound {
return nil
} else if err != nil {
return fmt.Errorf("Unable to load config file. %v", err)
}
if !ok {
return nil
}
cfg := &serverConfigV33{}
if err = json.Unmarshal(data, cfg); err != nil {
return err
}
cfg.Version = "33"
data, err = json.Marshal(cfg)
if err != nil {
return err
}
if err = saveConfig(context.Background(), objAPI, configFile, data); err != nil {
return fmt.Errorf("Failed to migrate config from 32 to 33 . %v", err)
}
logger.Info(configMigrateMSGTemplate, configFile, "32", "33")
return nil
}

@ -159,8 +159,8 @@ func TestServerConfigMigrateInexistentConfig(t *testing.T) {
}
}
// Test if a config migration from v2 to v30 is successfully done
func TestServerConfigMigrateV2toV30(t *testing.T) {
// Test if a config migration from v2 to v33 is successfully done
func TestServerConfigMigrateV2toV33(t *testing.T) {
rootPath, err := ioutil.TempDir(globalTestTmpDir, "minio-")
if err != nil {
t.Fatal(err)

@ -892,3 +892,47 @@ type serverConfigV32 struct {
// Add new external policy enforcements here.
} `json:"policy"`
}
// serverConfigV33 is just like version '32', removes clientID from NATS.
type serverConfigV33 struct {
quick.Config `json:"-"` // ignore interfaces
Version string `json:"version"`
// S3 API configuration.
Credential auth.Credentials `json:"credential"`
Region string `json:"region"`
Worm BoolFlag `json:"worm"`
// Storage class configuration
StorageClass storageClassConfig `json:"storageclass"`
// Cache configuration
Cache CacheConfig `json:"cache"`
// KMS configuration
KMS crypto.KMSConfig `json:"kms"`
// Notification queue configuration.
Notify notifier `json:"notify"`
// Logger configuration
Logger loggerConfig `json:"logger"`
// Compression configuration
Compression compressionConfig `json:"compress"`
// OpenID configuration
OpenID struct {
// JWKS validator config.
JWKS validator.JWKSArgs `json:"jwks"`
} `json:"openid"`
// External policy enforcements.
Policy struct {
// OPA configuration.
OPA iampolicy.OpaArgs `json:"opa"`
// Add new external policy enforcements here.
} `json:"policy"`
}

@ -1,5 +1,5 @@
{
"version": "31",
"version": "33",
"credential": {
"accessKey": "36J9X8EZI4KEV1G7EHXA",
"secretKey": "ECk2uqOoNqvtJIMQ3WYugvmNPL_-zm3WcRqP5vUM",
@ -115,7 +115,6 @@
"streaming": {
"enable": false,
"clusterID": "",
"clientID": "",
"async": false,
"maxPubAcksInflight": 0
}

@ -40,7 +40,6 @@ type NATSArgs struct {
Streaming struct {
Enable bool `json:"enable"`
ClusterID string `json:"clusterID"`
ClientID string `json:"clientID"`
Async bool `json:"async"`
MaxPubAcksInflight int `json:"maxPubAcksInflight"`
} `json:"streaming"`
@ -64,9 +63,6 @@ func (n NATSArgs) Validate() error {
if n.Streaming.ClusterID == "" {
return errors.New("empty cluster id")
}
if n.Streaming.ClientID == "" {
return errors.New("empty client id")
}
}
return nil
@ -128,6 +124,7 @@ func (target *NATSTarget) Close() (err error) {
func NewNATSTarget(id string, args NATSArgs) (*NATSTarget, error) {
var natsConn *nats.Conn
var stanConn stan.Conn
var clientID string
var err error
if args.Streaming.Enable {
@ -137,12 +134,9 @@ func NewNATSTarget(id string, args NATSArgs) (*NATSTarget, error) {
}
addressURL := scheme + "://" + args.Username + ":" + args.Password + "@" + args.Address.String()
clientID := args.Streaming.ClientID
if clientID == "" {
clientID, err = getNewUUID()
if err != nil {
return nil, err
}
clientID, err = getNewUUID()
if err != nil {
return nil, err
}
connOpts := []stan.Option{stan.NatsURL(addressURL)}

@ -75,7 +75,6 @@ var configJSON = []byte(`{
"streaming": {
"enable": false,
"clusterID": "",
"clientID": "",
"async": false,
"maxPubAcksInflight": 0
}

Loading…
Cancel
Save