From 1ffa983a9d8efac1194fbba2d8365e40b9db41aa Mon Sep 17 00:00:00 2001 From: Stephen N Date: Fri, 20 Mar 2020 18:10:27 +0000 Subject: [PATCH] added support for SASL/SCRAM on Kafka bucket notifications. (#9168) fixes #9167 --- cmd/config/notify/help.go | 6 +++ cmd/config/notify/parse.go | 9 ++++ docs/bucket/notifications/README.md | 32 ++++++------ go.mod | 1 + go.sum | 2 + pkg/event/target/kafka.go | 19 +++++-- pkg/event/target/kafka_scram_client.go | 70 ++++++++++++++++++++++++++ 7 files changed, 121 insertions(+), 18 deletions(-) create mode 100644 pkg/event/target/kafka_scram_client.go diff --git a/cmd/config/notify/help.go b/cmd/config/notify/help.go index 655bca24d..aea53162e 100644 --- a/cmd/config/notify/help.go +++ b/cmd/config/notify/help.go @@ -165,6 +165,12 @@ var ( Optional: true, Type: "string", }, + config.HelpKV{ + Key: target.KafkaSASLMechanism, + Description: "sasl authentication mechanism, default 'plain'", + Optional: true, + Type: "string", + }, config.HelpKV{ Key: target.KafkaTLSClientAuth, Description: "clientAuth determines the Kafka server's policy for TLS client auth", diff --git a/cmd/config/notify/parse.go b/cmd/config/notify/parse.go index 4e9ec96b5..ae456c5fa 100644 --- a/cmd/config/notify/parse.go +++ b/cmd/config/notify/parse.go @@ -352,6 +352,10 @@ var ( Key: target.KafkaSASLPassword, Value: "", }, + config.KV{ + Key: target.KafkaSASLMechanism, + Value: "plain", + }, config.KV{ Key: target.KafkaClientTLSCert, Value: "", @@ -507,9 +511,14 @@ func GetNotifyKafka(kafkaKVS map[string]config.KVS) (map[string]target.KafkaArgs if k != config.Default { saslPasswordEnv = saslPasswordEnv + config.Default + k } + saslMechanismEnv := target.EnvKafkaSASLMechanism + if k != config.Default { + saslMechanismEnv = saslMechanismEnv + config.Default + k + } kafkaArgs.SASL.Enable = env.Get(saslEnableEnv, kv.Get(target.KafkaSASL)) == config.EnableOn kafkaArgs.SASL.User = env.Get(saslUsernameEnv, kv.Get(target.KafkaSASLUsername)) kafkaArgs.SASL.Password = env.Get(saslPasswordEnv, kv.Get(target.KafkaSASLPassword)) + kafkaArgs.SASL.Mechanism = env.Get(saslMechanismEnv, kv.Get(target.KafkaSASLMechanism)) if err = kafkaArgs.Validate(); err != nil { return nil, err diff --git a/docs/bucket/notifications/README.md b/docs/bucket/notifications/README.md index 2a31998a8..154328b6e 100644 --- a/docs/bucket/notifications/README.md +++ b/docs/bucket/notifications/README.md @@ -1063,6 +1063,7 @@ brokers* (csv) comma separated list of Kafka broker addresses topic (string) Kafka topic used for bucket notifications sasl_username (string) username for SASL/PLAIN or SASL/SCRAM authentication sasl_password (string) password for SASL/PLAIN or SASL/SCRAM authentication +sasl_mechanism (string) sasl authentication mechanism, default 'PLAIN' tls_client_auth (string) clientAuth determines the Kafka server's policy for TLS client auth sasl (on|off) set to 'on' to enable SASL authentication tls (on|off) set to 'on' to enable TLS @@ -1081,21 +1082,22 @@ KEY: notify_kafka[:name] publish bucket notifications to Kafka endpoints ARGS: -MINIO_NOTIFY_KAFKA_ENABLE* (on|off) enable notify_kafka target, default is 'off' -MINIO_NOTIFY_KAFKA_BROKERS* (csv) comma separated list of Kafka broker addresses -MINIO_NOTIFY_KAFKA_TOPIC (string) Kafka topic used for bucket notifications -MINIO_NOTIFY_KAFKA_SASL_USERNAME (string) username for SASL/PLAIN or SASL/SCRAM authentication -MINIO_NOTIFY_KAFKA_SASL_PASSWORD (string) password for SASL/PLAIN or SASL/SCRAM authentication -MINIO_NOTIFY_KAFKA_TLS_CLIENT_AUTH (string) clientAuth determines the Kafka server's policy for TLS client auth -MINIO_NOTIFY_KAFKA_SASL (on|off) set to 'on' to enable SASL authentication -MINIO_NOTIFY_KAFKA_TLS (on|off) set to 'on' to enable TLS -MINIO_NOTIFY_KAFKA_TLS_SKIP_VERIFY (on|off) trust server TLS without verification, defaults to "on" (verify) -MINIO_NOTIFY_KAFKA_CLIENT_TLS_CERT (path) path to client certificate for mTLS auth -MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY (path) path to client key for mTLS auth -MINIO_NOTIFY_KAFKA_QUEUE_DIR (path) staging dir for undelivered messages e.g. '/home/events' -MINIO_NOTIFY_KAFKA_QUEUE_LIMIT (number) maximum limit for undelivered messages, defaults to '10000' -MINIO_NOTIFY_KAFKA_COMMENT (sentence) optionally add a comment to this setting -MINIO_NOTIFY_KAFKA_VERSION (string) specify the version of the Kafka cluster e.g. '2.2.0' +MINIO_NOTIFY_KAFKA_ENABLE* (on|off) enable notify_kafka target, default is 'off' +MINIO_NOTIFY_KAFKA_BROKERS* (csv) comma separated list of Kafka broker addresses +MINIO_NOTIFY_KAFKA_TOPIC (string) Kafka topic used for bucket notifications +MINIO_NOTIFY_KAFKA_SASL_USERNAME (string) username for SASL/PLAIN or SASL/SCRAM authentication +MINIO_NOTIFY_KAFKA_SASL_PASSWORD (string) password for SASL/PLAIN or SASL/SCRAM authentication +MINIO_NOTIFY_KAFKA_SASL_MECHANISM (plain*|sha256|sha512) sasl authentication mechanism, default 'plain' +MINIO_NOTIFY_KAFKA_TLS_CLIENT_AUTH (string) clientAuth determines the Kafka server's policy for TLS client auth +MINIO_NOTIFY_KAFKA_SASL (on|off) set to 'on' to enable SASL authentication +MINIO_NOTIFY_KAFKA_TLS (on|off) set to 'on' to enable TLS +MINIO_NOTIFY_KAFKA_TLS_SKIP_VERIFY (on|off) trust server TLS without verification, defaults to "on" (verify) +MINIO_NOTIFY_KAFKA_CLIENT_TLS_CERT (path) path to client certificate for mTLS auth +MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY (path) path to client key for mTLS auth +MINIO_NOTIFY_KAFKA_QUEUE_DIR (path) staging dir for undelivered messages e.g. '/home/events' +MINIO_NOTIFY_KAFKA_QUEUE_LIMIT (number) maximum limit for undelivered messages, defaults to '10000' +MINIO_NOTIFY_KAFKA_COMMENT (sentence) optionally add a comment to this setting +MINIO_NOTIFY_KAFKA_VERSION (string) specify the version of the Kafka cluster e.g. '2.2.0' ``` To update the configuration, use `mc admin config get` command to get the current configuration. diff --git a/go.mod b/go.mod index b92b6bc50..83ebdc405 100644 --- a/go.mod +++ b/go.mod @@ -106,6 +106,7 @@ require ( github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect github.com/ugorji/go v1.1.5-pre // indirect github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a + github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect go.etcd.io/bbolt v1.3.3 // indirect go.uber.org/atomic v1.3.2 diff --git a/go.sum b/go.sum index 1b6407e8f..aa476de93 100644 --- a/go.sum +++ b/go.sum @@ -429,7 +429,9 @@ github.com/ugorji/go/codec v1.1.5-pre h1:5YV9PsFAN+ndcCtTM7s60no7nY7eTG3LPtxhSwu github.com/ugorji/go/codec v1.1.5-pre/go.mod h1:tULtS6Gy1AE1yCENaw4Vb//HLH5njI2tfCQDUqRd8fI= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a h1:0R4NLDRDZX6JcmhJgXi5E4b8Wg84ihbmUKp/GvSPEzc= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= diff --git a/pkg/event/target/kafka.go b/pkg/event/target/kafka.go index e6fb55f38..bf09a1a90 100644 --- a/pkg/event/target/kafka.go +++ b/pkg/event/target/kafka.go @@ -46,6 +46,7 @@ const ( KafkaSASL = "sasl" KafkaSASLUsername = "sasl_username" KafkaSASLPassword = "sasl_password" + KafkaSASLMechanism = "sasl_mechanism" KafkaClientTLSCert = "client_tls_cert" KafkaClientTLSKey = "client_tls_key" KafkaVersion = "version" @@ -61,6 +62,7 @@ const ( EnvKafkaSASLEnable = "MINIO_NOTIFY_KAFKA_SASL" EnvKafkaSASLUsername = "MINIO_NOTIFY_KAFKA_SASL_USERNAME" EnvKafkaSASLPassword = "MINIO_NOTIFY_KAFKA_SASL_PASSWORD" + EnvKafkaSASLMechanism = "MINIO_NOTIFY_KAFKA_SASL_MECHANISM" EnvKafkaClientTLSCert = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_CERT" EnvKafkaClientTLSKey = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY" EnvKafkaVersion = "MINIO_NOTIFY_KAFKA_VERSION" @@ -83,9 +85,10 @@ type KafkaArgs struct { ClientTLSKey string `json:"clientTLSKey"` } `json:"tls"` SASL struct { - Enable bool `json:"enable"` - User string `json:"username"` - Password string `json:"password"` + Enable bool `json:"enable"` + User string `json:"username"` + Password string `json:"password"` + Mechanism string `json:"mechanism"` } `json:"sasl"` } @@ -255,6 +258,16 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnc config.Net.SASL.User = args.SASL.User config.Net.SASL.Password = args.SASL.Password + if args.SASL.Mechanism == "sha512" { + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: KafkaSHA512} } + config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512) + } else if args.SASL.Mechanism == "sha256" { + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: KafkaSHA256} } + config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256) + } else { + // default to PLAIN + config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext) + } config.Net.SASL.Enable = args.SASL.Enable tlsConfig, err := saramatls.NewConfig(args.TLS.ClientTLSCert, args.TLS.ClientTLSKey) diff --git a/pkg/event/target/kafka_scram_client.go b/pkg/event/target/kafka_scram_client.go new file mode 100644 index 000000000..efecec893 --- /dev/null +++ b/pkg/event/target/kafka_scram_client.go @@ -0,0 +1,70 @@ +/* + * MinIO Cloud Storage, (C) 2020 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package target + +import ( + "crypto/sha512" + "hash" + + "github.com/minio/sha256-simd" + "github.com/xdg/scram" +) + +// KafkaSHA256 is a function that returns a crypto/sha256 hasher and should be used +// to create Client objects configured for SHA-256 hashing. +var KafkaSHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() } + +// KafkaSHA512 is a function that returns a crypto/sha512 hasher and should be used +// to create Client objects configured for SHA-512 hashing. +var KafkaSHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() } + +// XDGSCRAMClient implements the client-side of an authentication +// conversation with a server. A new conversation must be created for +// each authentication attempt. +type XDGSCRAMClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + +// Begin constructs a SCRAM client component based on a given hash.Hash +// factory receiver. This constructor will normalize the username, password +// and authzID via the SASLprep algorithm, as recommended by RFC-5802. If +// SASLprep fails, the method returns an error. +func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) { + x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + x.ClientConversation = x.Client.NewConversation() + return nil +} + +// Step takes a string provided from a server (or just an empty string for the +// very first conversation step) and attempts to move the authentication +// conversation forward. It returns a string to be sent to the server or an +// error if the server message is invalid. Calling Step after a conversation +// completes is also an error. +func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) { + response, err = x.ClientConversation.Step(challenge) + return +} + +// Done returns true if the conversation is completed or has errored. +func (x *XDGSCRAMClient) Done() bool { + return x.ClientConversation.Done() +}