From eaaf05a7cc358893650614770365da540615570f Mon Sep 17 00:00:00 2001 From: Nitish Tiwari Date: Thu, 10 Sep 2020 00:50:49 +0530 Subject: [PATCH] Add Kubernetes operator webook server as DNS target (#10404) This PR adds a DNS target that ensures to update an entry into Kubernetes operator when a bucket is created or deleted. See minio/operator#264 for details. Co-authored-by: Harshavardhana --- cmd/bucket-handlers.go | 58 ++++---- cmd/common-main.go | 9 ++ cmd/config/constants.go | 1 + cmd/config/etcd/dns/operator_dns.go | 200 ++++++++++++++++++++++++++++ cmd/object-handlers.go | 4 +- cmd/server-main.go | 2 + cmd/web-handlers.go | 4 +- pkg/env/env.go | 4 +- pkg/env/web_env.go | 34 ++--- pkg/env/web_env_test.go | 10 +- 10 files changed, 277 insertions(+), 49 deletions(-) create mode 100644 cmd/config/etcd/dns/operator_dns.go diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 2234a44e1..14f58b00d 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -72,7 +72,7 @@ func initFederatorBackend(buckets []BucketInfo, objLayer ObjectLayer) { // Get buckets in the DNS dnsBuckets, err := globalDNSConfig.List() - if err != nil && err != dns.ErrNoEntriesFound { + if err != nil && err != dns.ErrNoEntriesFound && err != dns.ErrNotImplemented { logger.LogIf(GlobalContext, err) return } @@ -80,33 +80,35 @@ func initFederatorBackend(buckets []BucketInfo, objLayer ObjectLayer) { bucketsSet := set.NewStringSet() bucketsToBeUpdated := set.NewStringSet() bucketsInConflict := set.NewStringSet() - for _, bucket := range buckets { - bucketsSet.Add(bucket.Name) - r, ok := dnsBuckets[bucket.Name] - if !ok { - bucketsToBeUpdated.Add(bucket.Name) - continue - } - if !globalDomainIPs.Intersection(set.CreateStringSet(getHostsSlice(r)...)).IsEmpty() { - if globalDomainIPs.Difference(set.CreateStringSet(getHostsSlice(r)...)).IsEmpty() { - // No difference in terms of domainIPs and nothing - // has changed so we don't change anything on the etcd. + if dnsBuckets != nil { + for _, bucket := range buckets { + bucketsSet.Add(bucket.Name) + r, ok := dnsBuckets[bucket.Name] + if !ok { + bucketsToBeUpdated.Add(bucket.Name) continue } - // if domain IPs intersect then it won't be an empty set. - // such an intersection means that bucket exists on etcd. - // but if we do see a difference with local domain IPs with - // hostSlice from etcd then we should update with newer - // domainIPs, we proceed to do that here. - bucketsToBeUpdated.Add(bucket.Name) - continue + if !globalDomainIPs.Intersection(set.CreateStringSet(getHostsSlice(r)...)).IsEmpty() { + if globalDomainIPs.Difference(set.CreateStringSet(getHostsSlice(r)...)).IsEmpty() { + // No difference in terms of domainIPs and nothing + // has changed so we don't change anything on the etcd. + continue + } + // if domain IPs intersect then it won't be an empty set. + // such an intersection means that bucket exists on etcd. + // but if we do see a difference with local domain IPs with + // hostSlice from etcd then we should update with newer + // domainIPs, we proceed to do that here. + bucketsToBeUpdated.Add(bucket.Name) + continue + } + // No IPs seem to intersect, this means that bucket exists but has + // different IP addresses perhaps from a different deployment. + // bucket names are globally unique in federation at a given + // path prefix, name collision is not allowed. We simply log + // an error and continue. + bucketsInConflict.Add(bucket.Name) } - // No IPs seem to intersect, this means that bucket exists but has - // different IP addresses perhaps from a different deployment. - // bucket names are globally unique in federation at a given - // path prefix, name collision is not allowed. We simply log - // an error and continue. - bucketsInConflict.Add(bucket.Name) } // Add/update buckets that are not registered with the DNS @@ -562,7 +564,9 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req if globalDNSConfig != nil { sr, err := globalDNSConfig.Get(bucket) if err != nil { - if err == dns.ErrNoEntriesFound { + // ErrNotImplemented indicates a DNS backend that doesn't need to check if bucket already + // exists elsewhere + if err == dns.ErrNoEntriesFound || err == dns.ErrNotImplemented { // Proceed to creating a bucket. if err = objectAPI.MakeBucketWithLocation(ctx, bucket, opts); err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) @@ -1000,7 +1004,7 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http. if globalDNSConfig != nil { if err := globalDNSConfig.Delete(bucket); err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to delete bucket DNS entry %w, please delete it manually using etcdctl", err)) + logger.LogIf(ctx, fmt.Errorf("Unable to delete bucket DNS entry %w, please delete it manually", err)) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } diff --git a/cmd/common-main.go b/cmd/common-main.go index d9bac566f..5639c5df4 100644 --- a/cmd/common-main.go +++ b/cmd/common-main.go @@ -33,6 +33,7 @@ import ( "github.com/minio/cli" "github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio/cmd/config" + "github.com/minio/minio/cmd/config/etcd/dns" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/certs" @@ -285,6 +286,14 @@ func handleCommonEnvVars() { os.Unsetenv(config.EnvAccessKeyOld) os.Unsetenv(config.EnvSecretKeyOld) } + + url, user, pwd, ok := env.LookupEnv(config.EnvDNSWebhook) + if ok { + globalDNSConfig, err = dns.NewOperatorDNS(url, user, pwd) + if err != nil { + logger.Fatal(err, "Unable to fetch the value of "+config.EnvDNSWebhook) + } + } } func logStartupMessage(msg string) { diff --git a/cmd/config/constants.go b/cmd/config/constants.go index d25952c1e..856e59a6c 100644 --- a/cmd/config/constants.go +++ b/cmd/config/constants.go @@ -33,6 +33,7 @@ const ( EnvPublicIPs = "MINIO_PUBLIC_IPS" EnvFSOSync = "MINIO_FS_OSYNC" EnvArgs = "MINIO_ARGS" + EnvDNSWebhook = "MINIO_DNS_WEBHOOK_ENDPOINT" EnvUpdate = "MINIO_UPDATE" diff --git a/cmd/config/etcd/dns/operator_dns.go b/cmd/config/etcd/dns/operator_dns.go new file mode 100644 index 000000000..0583c8f13 --- /dev/null +++ b/cmd/config/etcd/dns/operator_dns.go @@ -0,0 +1,200 @@ +/* + * MinIO Cloud Storage, (C) 2020 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dns + +import ( + "context" + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "net" + "net/http" + "net/url" + "time" + + "github.com/dgrijalva/jwt-go" + "github.com/minio/minio/cmd/config" +) + +var ( + defaultOperatorContextTimeout = 10 * time.Second + // ErrNotImplemented - Indicates no entries were found for the given key (directory) + ErrNotImplemented = errors.New("The method is not implemented") + globalRootCAs *x509.CertPool +) + +// RegisterGlobalCAs register the global root CAs +func RegisterGlobalCAs(CAs *x509.CertPool) { + globalRootCAs = CAs +} + +func (c *OperatorDNS) addAuthHeader(r *http.Request) (*http.Request, error) { + claims := &jwt.StandardClaims{ + ExpiresAt: int64(15 * time.Minute), + Issuer: c.Username, + Subject: config.EnvDNSWebhook, + } + + token := jwt.NewWithClaims(jwt.SigningMethodHS512, claims) + ss, err := token.SignedString([]byte(c.Password)) + if err != nil { + return r, err + } + + r.Header.Set("Authorization", "Bearer "+ss) + return r, nil +} + +func (c *OperatorDNS) endpoint(bucket string, delete bool) (string, error) { + u, err := url.Parse(c.Endpoint) + if err != nil { + return "", err + } + q := u.Query() + q.Add("bucket", bucket) + if delete { + q.Add("delete", "true") + } else { + q.Add("delete", "false") + } + u.RawQuery = q.Encode() + return u.String(), nil +} + +// Put - Adds DNS entries into operator webhook server +func (c *OperatorDNS) Put(bucket string) error { + ctx, cancel := context.WithTimeout(context.Background(), defaultOperatorContextTimeout) + defer cancel() + e, err := c.endpoint(bucket, false) + if err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, e, nil) + if err != nil { + return err + } + req, err = c.addAuthHeader(req) + if err != nil { + return err + } + resp, err := c.httpClient.Do(req) + if err != nil { + if err := c.Delete(bucket); err != nil { + return err + } + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("request to create the service for bucket %s, failed with status %s", bucket, resp.Status) + } + return nil +} + +// Delete - Removes DNS entries added in Put(). +func (c *OperatorDNS) Delete(bucket string) error { + ctx, cancel := context.WithTimeout(context.Background(), defaultOperatorContextTimeout) + defer cancel() + e, err := c.endpoint(bucket, true) + if err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, e, nil) + if err != nil { + return err + } + req, err = c.addAuthHeader(req) + if err != nil { + return err + } + resp, err := c.httpClient.Do(req) + if err != nil { + return err + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("request to delete the service for bucket %s, failed with status %s", bucket, resp.Status) + } + return nil +} + +// DeleteRecord - Removes a specific DNS entry +// No Op for Operator because operator deals on with bucket entries +func (c *OperatorDNS) DeleteRecord(record SrvRecord) error { + return ErrNotImplemented +} + +// Close closes the internal http client +func (c *OperatorDNS) Close() error { + c.httpClient.CloseIdleConnections() + return nil +} + +// List - Retrieves list of DNS entries for the domain. +// This is a No Op for Operator because, there is no intent to enforce global +// namespace at MinIO level with this DNS entry. The global namespace in +// enforced by the Kubernetes Operator +func (c *OperatorDNS) List() (srvRecords map[string][]SrvRecord, err error) { + return nil, ErrNotImplemented +} + +// Get - Retrieves DNS records for a bucket. +// This is a No Op for Operator because, there is no intent to enforce global +// namespace at MinIO level with this DNS entry. The global namespace in +// enforced by the Kubernetes Operator +func (c *OperatorDNS) Get(bucket string) (srvRecords []SrvRecord, err error) { + return nil, ErrNotImplemented +} + +// OperatorDNS - represents dns config for MinIO k8s operator. +type OperatorDNS struct { + httpClient *http.Client + Endpoint string + Username string + Password string +} + +// NewOperatorDNS - initialize a new K8S Operator DNS set/unset values. +func NewOperatorDNS(endpoint, user, pwd string) (Store, error) { + if endpoint == "" || user == "" || pwd == "" { + return nil, errors.New("invalid argument") + } + args := &OperatorDNS{ + Username: user, + Password: pwd, + Endpoint: endpoint, + httpClient: &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 3 * time.Second, + KeepAlive: 5 * time.Second, + }).DialContext, + ResponseHeaderTimeout: 3 * time.Second, + TLSHandshakeTimeout: 3 * time.Second, + ExpectContinueTimeout: 3 * time.Second, + TLSClientConfig: &tls.Config{ + RootCAs: globalRootCAs, + }, + // Go net/http automatically unzip if content-type is + // gzip disable this feature, as we are always interested + // in raw stream. + DisableCompression: true, + }, + }, + } + + return args, nil +} diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index d96076661..0ae990bf9 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -764,7 +764,7 @@ func getRemoteInstanceClientLongTimeout(r *http.Request, host string) (*miniogo. // when federation is enabled, ie when globalDNSConfig is non 'nil'. // // This function is similar to isRemoteCallRequired but specifically for COPY object API -// if destination and source are same we do not need to check for destnation bucket +// if destination and source are same we do not need to check for destination bucket // to exist locally. func isRemoteCopyRequired(ctx context.Context, srcBucket, dstBucket string, objAPI ObjectLayer) bool { if srcBucket == dstBucket { @@ -2713,7 +2713,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. if globalDNSConfig != nil { _, err := globalDNSConfig.Get(bucket) - if err != nil { + if err != nil && err != dns.ErrNotImplemented { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } diff --git a/cmd/server-main.go b/cmd/server-main.go index fc2f201d4..6d7980765 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -32,6 +32,7 @@ import ( "github.com/minio/cli" "github.com/minio/minio/cmd/config" + "github.com/minio/minio/cmd/config/etcd/dns" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/auth" @@ -129,6 +130,7 @@ func serverHandleCmdArgs(ctx *cli.Context) { // Register root CAs for remote ENVs env.RegisterGlobalCAs(globalRootCAs) + dns.RegisterGlobalCAs(globalRootCAs) globalMinioAddr = globalCLIContext.Addr diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index bcb344ae5..ae5ca118e 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -179,7 +179,7 @@ func (web *webAPIHandlers) MakeBucket(r *http.Request, args *MakeBucketArgs, rep if globalDNSConfig != nil { if _, err := globalDNSConfig.Get(args.BucketName); err != nil { - if err == dns.ErrNoEntriesFound { + if err == dns.ErrNoEntriesFound || err == dns.ErrNotImplemented { // Proceed to creating a bucket. if err = objectAPI.MakeBucketWithLocation(ctx, args.BucketName, opts); err != nil { return toJSONError(ctx, err) @@ -281,7 +281,7 @@ func (web *webAPIHandlers) DeleteBucket(r *http.Request, args *RemoveBucketArgs, if globalDNSConfig != nil { if err := globalDNSConfig.Delete(args.BucketName); err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to delete bucket DNS entry %w, please delete it manually using etcdctl", err)) + logger.LogIf(ctx, fmt.Errorf("Unable to delete bucket DNS entry %w, please delete it manually", err)) return toJSONError(ctx, err) } } diff --git a/pkg/env/env.go b/pkg/env/env.go index 6a875bbc5..820a4248a 100644 --- a/pkg/env/env.go +++ b/pkg/env/env.go @@ -45,7 +45,7 @@ func SetEnvOn() { // IsSet returns if the given env key is set. func IsSet(key string) bool { - _, ok := LookupEnv(key) + _, _, _, ok := LookupEnv(key) return ok } @@ -60,7 +60,7 @@ func Get(key, defaultValue string) string { if ok { return defaultValue } - if v, ok := LookupEnv(key); ok { + if v, _, _, ok := LookupEnv(key); ok { return v } return defaultValue diff --git a/pkg/env/web_env.go b/pkg/env/web_env.go index 2adcd41a6..7e0fd9032 100644 --- a/pkg/env/web_env.go +++ b/pkg/env/web_env.go @@ -72,10 +72,10 @@ func fetchHTTPConstituentParts(u *url.URL) (username string, password string, en return username, password, envURL, nil } -func getEnvValueFromHTTP(urlStr, envKey string) (string, error) { +func getEnvValueFromHTTP(urlStr, envKey string) (string, string, string, error) { u, err := url.Parse(urlStr) if err != nil { - return "", err + return "", "", "", err } switch u.Scheme { @@ -84,12 +84,12 @@ func getEnvValueFromHTTP(urlStr, envKey string) (string, error) { case webEnvSchemeSecure: u.Scheme = "https" default: - return "", errors.New("invalid arguments") + return "", "", "", errors.New("invalid arguments") } username, password, envURL, err := fetchHTTPConstituentParts(u) if err != nil { - return "", err + return "", "", "", err } ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) @@ -97,7 +97,7 @@ func getEnvValueFromHTTP(urlStr, envKey string) (string, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, envURL+"?key="+envKey, nil) if err != nil { - return "", err + return "", "", "", err } claims := &jwt.StandardClaims{ @@ -109,7 +109,7 @@ func getEnvValueFromHTTP(urlStr, envKey string) (string, error) { token := jwt.NewWithClaims(jwt.SigningMethodHS512, claims) ss, err := token.SignedString([]byte(password)) if err != nil { - return "", err + return "", "", "", err } req.Header.Set("Authorization", "Bearer "+ss) @@ -136,15 +136,15 @@ func getEnvValueFromHTTP(urlStr, envKey string) (string, error) { resp, err := clnt.Do(req) if err != nil { - return "", err + return "", "", "", err } envValueBytes, err := ioutil.ReadAll(resp.Body) if err != nil { - return "", err + return "", "", "", err } - return string(envValueBytes), nil + return string(envValueBytes), username, password, nil } // Environ returns a copy of strings representing the @@ -161,23 +161,27 @@ func Environ() []string { // // Additionally if the input is env://username:password@remote:port/ // to fetch ENV values for the env value from a remote server. -func LookupEnv(key string) (string, bool) { +// In this case, it also returns the credentials username and password +func LookupEnv(key string) (string, string, string, bool) { v, ok := os.LookupEnv(key) if ok && strings.HasPrefix(v, webEnvScheme) { // If env value starts with `env*://` // continue to parse and fetch from remote var err error - v, err = getEnvValueFromHTTP(strings.TrimSpace(v), key) + v, user, pwd, err := getEnvValueFromHTTP(strings.TrimSpace(v), key) if err != nil { - // fallback to cached value if-any. - return os.LookupEnv("_" + key) + env, eok := os.LookupEnv("_" + key) + if eok { + // fallback to cached value if-any. + return env, user, pwd, eok + } } // Set the ENV value to _env value, // this value is a fallback in-case of // server restarts when webhook server // is down. os.Setenv("_"+key, v) - return v, true + return v, user, pwd, true } - return v, ok + return v, "", "", ok } diff --git a/pkg/env/web_env_test.go b/pkg/env/web_env_test.go index 79903f83f..759dbe3ff 100644 --- a/pkg/env/web_env_test.go +++ b/pkg/env/web_env_test.go @@ -67,7 +67,7 @@ func TestWebEnv(t *testing.T) { t.Fatal(err) } - v, err := getEnvValueFromHTTP( + v, user, pwd, err := getEnvValueFromHTTP( fmt.Sprintf("env://minio:minio123@%s/webhook/v1/getenv/default/minio", u.Host), "MINIO_ARGS") @@ -78,4 +78,12 @@ func TestWebEnv(t *testing.T) { if v != "http://127.0.0.{1..4}:9000/data{1...4}" { t.Fatalf("Unexpected value %s", v) } + + if user != "minio" { + t.Fatalf("Unexpected value %s", v) + } + + if pwd != "minio123" { + t.Fatalf("Unexpected value %s", v) + } }