From 41653774fcc217fac0d3f04647051f19c71e16e9 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sun, 8 Mar 2015 16:58:36 -0700 Subject: [PATCH] Simplify fs codebase, split them into separate files and more commenting --- pkg/storage/donut/erasure/erasure.go | 1 + pkg/storage/donut/erasure/erasure1/erasure.go | 3 +- .../donut/fragment/fragment1/fragment.go | 4 +- pkg/storage/fs/fs.go | 501 ------------------ pkg/storage/fs/fs_bucket.go | 133 +++++ pkg/storage/fs/fs_common.go | 89 ++++ pkg/storage/fs/fs_filter.go | 94 ++++ pkg/storage/fs/fs_object.go | 214 ++++++++ pkg/storage/fs/fs_policy.go | 111 ++++ pkg/storage/storage.go | 49 ++ 10 files changed, 695 insertions(+), 504 deletions(-) create mode 100644 pkg/storage/fs/fs_bucket.go create mode 100644 pkg/storage/fs/fs_common.go create mode 100644 pkg/storage/fs/fs_filter.go create mode 100644 pkg/storage/fs/fs_object.go create mode 100644 pkg/storage/fs/fs_policy.go diff --git a/pkg/storage/donut/erasure/erasure.go b/pkg/storage/donut/erasure/erasure.go index 5a2d7d0e4..ee96be4f3 100644 --- a/pkg/storage/donut/erasure/erasure.go +++ b/pkg/storage/donut/erasure/erasure.go @@ -7,6 +7,7 @@ import ( "github.com/minio-io/minio/pkg/storage/donut/erasure/erasure1" ) +// EncoderTechnique - encoder matrix type type EncoderTechnique int const ( diff --git a/pkg/storage/donut/erasure/erasure1/erasure.go b/pkg/storage/donut/erasure/erasure1/erasure.go index c911bed1f..137441714 100644 --- a/pkg/storage/donut/erasure/erasure1/erasure.go +++ b/pkg/storage/donut/erasure/erasure1/erasure.go @@ -25,6 +25,7 @@ import ( "strconv" ) +// Version - format version const ( Version = uint32(1) ) @@ -102,7 +103,7 @@ func Write(target io.Writer, key string, part uint8, length uint32, k, m uint8, return nil } -// Read an erasure block +// ReadHeader - read an erasure block func ReadHeader(reader io.Reader) (dataHeader DataHeader, err error) { versionArray := make([]byte, 4) if err := binary.Read(reader, binary.LittleEndian, versionArray); 
err != nil { diff --git a/pkg/storage/donut/fragment/fragment1/fragment.go b/pkg/storage/donut/fragment/fragment1/fragment.go index 10bbfb916..4481dba7c 100644 --- a/pkg/storage/donut/fragment/fragment1/fragment.go +++ b/pkg/storage/donut/fragment/fragment1/fragment.go @@ -146,7 +146,7 @@ func Write(target io.Writer, reader io.Reader, length uint64) error { return nil } -// Reads a donut fragment +// Read - reads a donut fragment func Read(reader io.Reader) (io.Reader, error) { header, err := ReadHeader(reader) if err != nil { @@ -185,7 +185,7 @@ func Read(reader io.Reader) (io.Reader, error) { return bytes.NewBuffer(data), nil } -// Reads the header of a donut +// ReadHeader - reads the header of a donut func ReadHeader(reader io.Reader) (header DonutFrameHeader, err error) { headerSlice := make([]byte, 32) headerLength, err := reader.Read(headerSlice) diff --git a/pkg/storage/fs/fs.go b/pkg/storage/fs/fs.go index eb529d87a..70f156ca7 100644 --- a/pkg/storage/fs/fs.go +++ b/pkg/storage/fs/fs.go @@ -17,36 +17,10 @@ package fs import ( - "bufio" - "bytes" - "crypto/md5" - "encoding/gob" - "encoding/hex" - "encoding/json" - "io" - "io/ioutil" "os" - "path" - "path/filepath" - "sort" - "strings" "sync" - - mstorage "github.com/minio-io/minio/pkg/storage" ) -// Storage - fs local variables -type Storage struct { - root string - lock *sync.Mutex -} - -// Metadata - carries metadata about object -type Metadata struct { - Md5sum []byte - ContentType string -} - // Start filesystem channel func Start(root string) (chan<- string, <-chan error, *Storage) { ctrlChannel := make(chan string) @@ -63,478 +37,3 @@ func start(ctrlChannel <-chan string, errorChannel chan<- error, s *Storage) { errorChannel <- err close(errorChannel) } - -func appendUniq(slice []string, i string) []string { - for _, ele := range slice { - if ele == i { - return slice - } - } - return append(slice, i) -} - -/// Bucket Operations - -// ListBuckets - Get service -func (storage *Storage) ListBuckets() 
([]mstorage.BucketMetadata, error) { - files, err := ioutil.ReadDir(storage.root) - if err != nil { - return []mstorage.BucketMetadata{}, mstorage.EmbedError("bucket", "", err) - } - - var metadataList []mstorage.BucketMetadata - for _, file := range files { - // Skip policy files - if strings.HasSuffix(file.Name(), "_policy.json") { - continue - } - if !file.IsDir() { - return []mstorage.BucketMetadata{}, mstorage.BackendCorrupted{Path: storage.root} - } - metadata := mstorage.BucketMetadata{ - Name: file.Name(), - Created: file.ModTime(), // TODO - provide real created time - } - metadataList = append(metadataList, metadata) - } - return metadataList, nil -} - -// StoreBucket - PUT Bucket -func (storage *Storage) StoreBucket(bucket string) error { - storage.lock.Lock() - defer storage.lock.Unlock() - - // verify bucket path legal - if mstorage.IsValidBucket(bucket) == false { - return mstorage.BucketNameInvalid{Bucket: bucket} - } - - // get bucket path - bucketDir := path.Join(storage.root, bucket) - - // check if bucket exists - if _, err := os.Stat(bucketDir); err == nil { - return mstorage.BucketExists{ - Bucket: bucket, - } - } - - // make bucket - err := os.Mkdir(bucketDir, 0700) - if err != nil { - return mstorage.EmbedError(bucket, "", err) - } - return nil -} - -// GetBucketPolicy - GET bucket policy -func (storage *Storage) GetBucketPolicy(bucket string) (mstorage.BucketPolicy, error) { - storage.lock.Lock() - defer storage.lock.Unlock() - - var p mstorage.BucketPolicy - // verify bucket path legal - if mstorage.IsValidBucket(bucket) == false { - return mstorage.BucketPolicy{}, mstorage.BucketNameInvalid{Bucket: bucket} - } - - // get bucket path - bucketDir := path.Join(storage.root, bucket) - // check if bucket exists - if _, err := os.Stat(bucketDir); err != nil { - return mstorage.BucketPolicy{}, mstorage.BucketNotFound{Bucket: bucket} - } - - // get policy path - bucketPolicy := path.Join(storage.root, bucket+"_mstoragejson") - filestat, err := 
os.Stat(bucketPolicy) - - if os.IsNotExist(err) { - return mstorage.BucketPolicy{}, mstorage.BucketPolicyNotFound{Bucket: bucket} - } - - if filestat.IsDir() { - return mstorage.BucketPolicy{}, mstorage.BackendCorrupted{Path: bucketPolicy} - } - - file, err := os.OpenFile(bucketPolicy, os.O_RDONLY, 0666) - defer file.Close() - if err != nil { - return mstorage.BucketPolicy{}, mstorage.EmbedError(bucket, "", err) - } - encoder := json.NewDecoder(file) - err = encoder.Decode(&p) - if err != nil { - return mstorage.BucketPolicy{}, mstorage.EmbedError(bucket, "", err) - } - - return p, nil - -} - -// StoreBucketPolicy - PUT bucket policy -func (storage *Storage) StoreBucketPolicy(bucket string, p mstorage.BucketPolicy) error { - storage.lock.Lock() - defer storage.lock.Unlock() - - // verify bucket path legal - if mstorage.IsValidBucket(bucket) == false { - return mstorage.BucketNameInvalid{Bucket: bucket} - } - - // get bucket path - bucketDir := path.Join(storage.root, bucket) - // check if bucket exists - if _, err := os.Stat(bucketDir); err != nil { - return mstorage.BucketNotFound{ - Bucket: bucket, - } - } - - // get policy path - bucketPolicy := path.Join(storage.root, bucket+"_policy.json") - filestat, ret := os.Stat(bucketPolicy) - if !os.IsNotExist(ret) { - if filestat.IsDir() { - return mstorage.BackendCorrupted{Path: bucketPolicy} - } - } - - file, err := os.OpenFile(bucketPolicy, os.O_WRONLY|os.O_CREATE, 0600) - defer file.Close() - if err != nil { - return mstorage.EmbedError(bucket, "", err) - } - encoder := json.NewEncoder(file) - err = encoder.Encode(p) - if err != nil { - return mstorage.EmbedError(bucket, "", err) - } - return nil -} - -/// Object Operations - -// CopyObjectToWriter - GET object -func (storage *Storage) CopyObjectToWriter(w io.Writer, bucket string, object string) (int64, error) { - // validate bucket - if mstorage.IsValidBucket(bucket) == false { - return 0, mstorage.BucketNameInvalid{Bucket: bucket} - } - - // validate object - if 
mstorage.IsValidObject(object) == false { - return 0, mstorage.ObjectNameInvalid{Bucket: bucket, Object: object} - } - - objectPath := path.Join(storage.root, bucket, object) - - filestat, err := os.Stat(objectPath) - switch err := err.(type) { - case nil: - { - if filestat.IsDir() { - return 0, mstorage.ObjectNotFound{Bucket: bucket, Object: object} - } - } - default: - { - if os.IsNotExist(err) { - return 0, mstorage.ObjectNotFound{Bucket: bucket, Object: object} - } - return 0, mstorage.EmbedError(bucket, object, err) - } - } - file, err := os.Open(objectPath) - defer file.Close() - if err != nil { - return 0, mstorage.EmbedError(bucket, object, err) - } - - count, err := io.Copy(w, file) - if err != nil { - return count, mstorage.EmbedError(bucket, object, err) - } - return count, nil -} - -// GetObjectMetadata - HEAD object -func (storage *Storage) GetObjectMetadata(bucket, object, prefix string) (mstorage.ObjectMetadata, error) { - if mstorage.IsValidBucket(bucket) == false { - return mstorage.ObjectMetadata{}, mstorage.BucketNameInvalid{Bucket: bucket} - } - - if mstorage.IsValidObject(object) == false { - return mstorage.ObjectMetadata{}, mstorage.ObjectNameInvalid{Bucket: bucket, Object: bucket} - } - - // Do not use path.Join() since path.Join strips off any object names with '/', use them as is - // in a static manner so that we can send a proper 'ObjectNotFound' reply back upon os.Stat() - objectPath := storage.root + "/" + bucket + "/" + object - stat, err := os.Stat(objectPath) - if os.IsNotExist(err) { - return mstorage.ObjectMetadata{}, mstorage.ObjectNotFound{Bucket: bucket, Object: object} - } - - _, err = os.Stat(objectPath + "$metadata") - if os.IsNotExist(err) { - return mstorage.ObjectMetadata{}, mstorage.ObjectNotFound{Bucket: bucket, Object: object} - } - - file, err := os.Open(objectPath + "$metadata") - defer file.Close() - if err != nil { - return mstorage.ObjectMetadata{}, mstorage.EmbedError(bucket, object, err) - } - - var 
deserializedMetadata Metadata - decoder := gob.NewDecoder(file) - err = decoder.Decode(&deserializedMetadata) - if err != nil { - return mstorage.ObjectMetadata{}, mstorage.EmbedError(bucket, object, err) - } - - contentType := "application/octet-stream" - if deserializedMetadata.ContentType != "" { - contentType = deserializedMetadata.ContentType - } - contentType = strings.TrimSpace(contentType) - - etag := bucket + "#" + path.Base(object) - if len(deserializedMetadata.Md5sum) != 0 { - etag = hex.EncodeToString(deserializedMetadata.Md5sum) - } - trimmedObject := strings.TrimPrefix(object, prefix) - metadata := mstorage.ObjectMetadata{ - Bucket: bucket, - Key: trimmedObject, - Created: stat.ModTime(), - Size: stat.Size(), - ETag: etag, - ContentType: contentType, - } - - return metadata, nil -} - -type bucketDir struct { - files map[string]os.FileInfo - root string -} - -func (p *bucketDir) getAllFiles(object string, fl os.FileInfo, err error) error { - if err != nil { - return err - } - if fl.Mode().IsRegular() { - if strings.HasSuffix(object, "$metadata") { - return nil - } - _p := strings.Split(object, p.root+"/") - if len(_p) > 1 { - p.files[_p[1]] = fl - } - } - return nil -} - -func delimiter(object, delimiter string) string { - readBuffer := bytes.NewBufferString(object) - reader := bufio.NewReader(readBuffer) - stringReader := strings.NewReader(delimiter) - delimited, _ := stringReader.ReadByte() - delimitedStr, _ := reader.ReadString(delimited) - return delimitedStr -} - -type byObjectKey []mstorage.ObjectMetadata - -// Len -func (b byObjectKey) Len() int { return len(b) } - -// Swap -func (b byObjectKey) Swap(i, j int) { b[i], b[j] = b[j], b[i] } - -// Less -func (b byObjectKey) Less(i, j int) bool { return b[i].Key < b[j].Key } - -// ListObjects - GET bucket (list objects) -func (storage *Storage) ListObjects(bucket string, resources mstorage.BucketResourcesMetadata) ([]mstorage.ObjectMetadata, mstorage.BucketResourcesMetadata, error) { - p := 
bucketDir{} - p.files = make(map[string]os.FileInfo) - - if mstorage.IsValidBucket(bucket) == false { - return []mstorage.ObjectMetadata{}, resources, mstorage.BucketNameInvalid{Bucket: bucket} - } - if resources.Prefix != "" && mstorage.IsValidObject(resources.Prefix) == false { - return []mstorage.ObjectMetadata{}, resources, mstorage.ObjectNameInvalid{Bucket: bucket, Object: resources.Prefix} - } - - rootPrefix := path.Join(storage.root, bucket) - // check bucket exists - if _, err := os.Stat(rootPrefix); os.IsNotExist(err) { - return []mstorage.ObjectMetadata{}, resources, mstorage.BucketNotFound{Bucket: bucket} - } - - p.root = rootPrefix - err := filepath.Walk(rootPrefix, p.getAllFiles) - if err != nil { - return []mstorage.ObjectMetadata{}, resources, mstorage.EmbedError(bucket, "", err) - } - - var metadataList []mstorage.ObjectMetadata - for name, file := range p.files { - if len(metadataList) >= resources.Maxkeys { - resources.IsTruncated = true - goto ret - } - // TODO handle resources.Marker - switch true { - // Delimiter present and Prefix is absent - case resources.Delimiter != "" && resources.Prefix == "": - delimitedName := delimiter(name, resources.Delimiter) - switch true { - case delimitedName == "": - // Do not strip prefix object output - metadata, err := storage.GetObjectMetadata(bucket, name, "") - if err != nil { - return []mstorage.ObjectMetadata{}, resources, mstorage.EmbedError(bucket, "", err) - } - metadataList = append(metadataList, metadata) - case delimitedName == file.Name(): - // Do not strip prefix object output - metadata, err := storage.GetObjectMetadata(bucket, name, "") - if err != nil { - return []mstorage.ObjectMetadata{}, resources, mstorage.EmbedError(bucket, "", err) - } - metadataList = append(metadataList, metadata) - case delimitedName != "": - resources.CommonPrefixes = appendUniq(resources.CommonPrefixes, delimitedName) - } - // Both delimiter and Prefix is present - case resources.Delimiter != "" && resources.Prefix 
!= "" && strings.HasPrefix(name, resources.Prefix): - trimmedName := strings.TrimPrefix(name, resources.Prefix) - delimitedName := delimiter(trimmedName, resources.Delimiter) - switch true { - case name == resources.Prefix: - // Use resources.Prefix to filter out delimited files - metadata, err := storage.GetObjectMetadata(bucket, name, resources.Prefix) - if err != nil { - return []mstorage.ObjectMetadata{}, resources, mstorage.EmbedError(bucket, "", err) - } - metadataList = append(metadataList, metadata) - case delimitedName == file.Name(): - // Use resources.Prefix to filter out delimited files - metadata, err := storage.GetObjectMetadata(bucket, name, resources.Prefix) - if err != nil { - return []mstorage.ObjectMetadata{}, resources, mstorage.EmbedError(bucket, "", err) - } - metadataList = append(metadataList, metadata) - case delimitedName != "": - if delimitedName == resources.Delimiter { - resources.CommonPrefixes = appendUniq(resources.CommonPrefixes, resources.Prefix+delimitedName) - } else { - resources.CommonPrefixes = appendUniq(resources.CommonPrefixes, delimitedName) - } - } - // Delimiter is absent and only Prefix is present - case resources.Delimiter == "" && resources.Prefix != "" && strings.HasPrefix(name, resources.Prefix): - // Do not strip prefix object output - metadata, err := storage.GetObjectMetadata(bucket, name, "") - if err != nil { - return []mstorage.ObjectMetadata{}, resources, mstorage.EmbedError(bucket, "", err) - } - metadataList = append(metadataList, metadata) - case resources.Prefix == "" && resources.Delimiter == "": - metadata, err := storage.GetObjectMetadata(bucket, name, "") - if err != nil { - return []mstorage.ObjectMetadata{}, resources, mstorage.EmbedError(bucket, "", err) - } - metadataList = append(metadataList, metadata) - } - } -ret: - sort.Sort(byObjectKey(metadataList)) - return metadataList, resources, nil -} - -// StoreObject - PUT object -func (storage *Storage) StoreObject(bucket, key, contentType string, 
data io.Reader) error { - // TODO Commits should stage then move instead of writing directly - storage.lock.Lock() - defer storage.lock.Unlock() - - // check bucket name valid - if mstorage.IsValidBucket(bucket) == false { - return mstorage.BucketNameInvalid{Bucket: bucket} - } - - // check bucket exists - if _, err := os.Stat(path.Join(storage.root, bucket)); os.IsNotExist(err) { - return mstorage.BucketNotFound{Bucket: bucket} - } - - // verify object path legal - if mstorage.IsValidObject(key) == false { - return mstorage.ObjectNameInvalid{Bucket: bucket, Object: key} - } - - // verify content type - if contentType == "" { - contentType = "application/octet-stream" - } - contentType = strings.TrimSpace(contentType) - - // get object path - objectPath := path.Join(storage.root, bucket, key) - objectDir := path.Dir(objectPath) - if _, err := os.Stat(objectDir); os.IsNotExist(err) { - err = os.MkdirAll(objectDir, 0700) - if err != nil { - return mstorage.EmbedError(bucket, key, err) - } - } - - // check if object exists - if _, err := os.Stat(objectPath); !os.IsNotExist(err) { - return mstorage.ObjectExists{ - Bucket: bucket, - Object: key, - } - } - - // write object - file, err := os.OpenFile(objectPath, os.O_WRONLY|os.O_CREATE, 0600) - defer file.Close() - if err != nil { - return mstorage.EmbedError(bucket, key, err) - } - - h := md5.New() - mw := io.MultiWriter(file, h) - - _, err = io.Copy(mw, data) - if err != nil { - return mstorage.EmbedError(bucket, key, err) - } - - // - file, err = os.OpenFile(objectPath+"$metadata", os.O_WRONLY|os.O_CREATE, 0600) - defer file.Close() - if err != nil { - return mstorage.EmbedError(bucket, key, err) - } - - // serialize metadata to gob - encoder := gob.NewEncoder(file) - err = encoder.Encode(&Metadata{ - ContentType: contentType, - Md5sum: h.Sum(nil), - }) - if err != nil { - return mstorage.EmbedError(bucket, key, err) - } - - return nil -} diff --git a/pkg/storage/fs/fs_bucket.go b/pkg/storage/fs/fs_bucket.go new file 
mode 100644 index 000000000..fbd870355 --- /dev/null +++ b/pkg/storage/fs/fs_bucket.go @@ -0,0 +1,133 @@ +/* + * Mini Object Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fs + +import ( + "os" + "path" + "sort" + "strings" + + "io/ioutil" + "path/filepath" + + mstorage "github.com/minio-io/minio/pkg/storage" +) + +/// Bucket Operations + +// ListBuckets - Get service +func (storage *Storage) ListBuckets() ([]mstorage.BucketMetadata, error) { + files, err := ioutil.ReadDir(storage.root) + if err != nil { + return []mstorage.BucketMetadata{}, mstorage.EmbedError("bucket", "", err) + } + + var metadataList []mstorage.BucketMetadata + for _, file := range files { + // Skip policy files + if strings.HasSuffix(file.Name(), "_policy.json") { + continue + } + if !file.IsDir() { + return []mstorage.BucketMetadata{}, mstorage.BackendCorrupted{Path: storage.root} + } + metadata := mstorage.BucketMetadata{ + Name: file.Name(), + Created: file.ModTime(), // TODO - provide real created time + } + metadataList = append(metadataList, metadata) + } + return metadataList, nil +} + +// StoreBucket - PUT Bucket +func (storage *Storage) StoreBucket(bucket string) error { + storage.lock.Lock() + defer storage.lock.Unlock() + + // verify bucket path legal + if mstorage.IsValidBucket(bucket) == false { + return mstorage.BucketNameInvalid{Bucket: bucket} + } + + // get bucket path + bucketDir := path.Join(storage.root, bucket) + + // 
check if bucket exists + if _, err := os.Stat(bucketDir); err == nil { + return mstorage.BucketExists{ + Bucket: bucket, + } + } + + // make bucket + err := os.Mkdir(bucketDir, 0700) + if err != nil { + return mstorage.EmbedError(bucket, "", err) + } + return nil +} + +// ListObjects - GET bucket (list objects) +func (storage *Storage) ListObjects(bucket string, resources mstorage.BucketResourcesMetadata) ([]mstorage.ObjectMetadata, mstorage.BucketResourcesMetadata, error) { + p := bucketDir{} + p.files = make(map[string]os.FileInfo) + + if mstorage.IsValidBucket(bucket) == false { + return []mstorage.ObjectMetadata{}, resources, mstorage.BucketNameInvalid{Bucket: bucket} + } + if resources.Prefix != "" && mstorage.IsValidObject(resources.Prefix) == false { + return []mstorage.ObjectMetadata{}, resources, mstorage.ObjectNameInvalid{Bucket: bucket, Object: resources.Prefix} + } + + rootPrefix := path.Join(storage.root, bucket) + // check bucket exists + if _, err := os.Stat(rootPrefix); os.IsNotExist(err) { + return []mstorage.ObjectMetadata{}, resources, mstorage.BucketNotFound{Bucket: bucket} + } + + p.root = rootPrefix + err := filepath.Walk(rootPrefix, p.getAllFiles) + if err != nil { + return []mstorage.ObjectMetadata{}, resources, mstorage.EmbedError(bucket, "", err) + } + + var metadataList []mstorage.ObjectMetadata + var metadata mstorage.ObjectMetadata + + // Populate filtering mode + resources.Mode = mstorage.GetMode(resources) + + for name, file := range p.files { + if len(metadataList) >= resources.Maxkeys { + resources.IsTruncated = true + goto ret + } + metadata, resources, err = storage.filter(bucket, name, file, resources) + if err != nil { + return []mstorage.ObjectMetadata{}, resources, mstorage.EmbedError(bucket, "", err) + } + if metadata.Bucket != "" { + metadataList = append(metadataList, metadata) + } + } + +ret: + sort.Sort(byObjectKey(metadataList)) + return metadataList, resources, nil +} diff --git a/pkg/storage/fs/fs_common.go 
b/pkg/storage/fs/fs_common.go new file mode 100644 index 000000000..627ab7d6e --- /dev/null +++ b/pkg/storage/fs/fs_common.go @@ -0,0 +1,89 @@ +/* + * Mini Object Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fs + +import ( + "bufio" + "bytes" + "os" + "strings" + "sync" + + mstorage "github.com/minio-io/minio/pkg/storage" +) + +// Storage - fs local variables +type Storage struct { + root string + lock *sync.Mutex +} + +// Metadata - carries metadata about object +type Metadata struct { + Md5sum []byte + ContentType string +} + +func appendUniq(slice []string, i string) []string { + for _, ele := range slice { + if ele == i { + return slice + } + } + return append(slice, i) +} + +type bucketDir struct { + files map[string]os.FileInfo + root string +} + +func (p *bucketDir) getAllFiles(object string, fl os.FileInfo, err error) error { + if err != nil { + return err + } + if fl.Mode().IsRegular() { + if strings.HasSuffix(object, "$metadata") { + return nil + } + _p := strings.Split(object, p.root+"/") + if len(_p) > 1 { + p.files[_p[1]] = fl + } + } + return nil +} + +func delimiter(object, delimiter string) string { + readBuffer := bytes.NewBufferString(object) + reader := bufio.NewReader(readBuffer) + stringReader := strings.NewReader(delimiter) + delimited, _ := stringReader.ReadByte() + delimitedStr, _ := reader.ReadString(delimited) + return delimitedStr +} + +type byObjectKey []mstorage.ObjectMetadata + 
+// Len +func (b byObjectKey) Len() int { return len(b) } + +// Swap +func (b byObjectKey) Swap(i, j int) { b[i], b[j] = b[j], b[i] } + +// Less +func (b byObjectKey) Less(i, j int) bool { return b[i].Key < b[j].Key } diff --git a/pkg/storage/fs/fs_filter.go b/pkg/storage/fs/fs_filter.go new file mode 100644 index 000000000..4a56b2bad --- /dev/null +++ b/pkg/storage/fs/fs_filter.go @@ -0,0 +1,94 @@ +/* + * Mini Object Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package fs + +import ( + "os" + "strings" + + mstorage "github.com/minio-io/minio/pkg/storage" +) + +// TODO handle resources.Marker +func (storage *Storage) filter(bucket, name string, file os.FileInfo, resources mstorage.BucketResourcesMetadata) (mstorage.ObjectMetadata, mstorage.BucketResourcesMetadata, error) { + var err error + var metadata mstorage.ObjectMetadata + + switch true { + // Both delimiter and Prefix is present + case resources.IsDelimiterPrefixSet(): + if strings.HasPrefix(name, resources.Prefix) { + trimmedName := strings.TrimPrefix(name, resources.Prefix) + delimitedName := delimiter(trimmedName, resources.Delimiter) + switch true { + case name == resources.Prefix: + // Use resources.Prefix to filter out delimited files + metadata, err = storage.GetObjectMetadata(bucket, name, resources.Prefix) + if err != nil { + return mstorage.ObjectMetadata{}, resources, mstorage.EmbedError(bucket, "", err) + } + case delimitedName == file.Name(): + // Use resources.Prefix to filter out delimited files + metadata, err = storage.GetObjectMetadata(bucket, name, resources.Prefix) + if err != nil { + return mstorage.ObjectMetadata{}, resources, mstorage.EmbedError(bucket, "", err) + } + case delimitedName != "": + if delimitedName == resources.Delimiter { + resources.CommonPrefixes = appendUniq(resources.CommonPrefixes, resources.Prefix+delimitedName) + } else { + resources.CommonPrefixes = appendUniq(resources.CommonPrefixes, delimitedName) + } + } + } + // Delimiter present and Prefix is absent + case resources.IsDelimiterSet(): + delimitedName := delimiter(name, resources.Delimiter) + switch true { + case delimitedName == "": + // Do not strip prefix object output + metadata, err = storage.GetObjectMetadata(bucket, name, "") + if err != nil { + return mstorage.ObjectMetadata{}, resources, mstorage.EmbedError(bucket, "", err) + } + case delimitedName == file.Name(): + // Do not strip prefix object output + metadata, err = 
storage.GetObjectMetadata(bucket, name, "") + if err != nil { + return mstorage.ObjectMetadata{}, resources, mstorage.EmbedError(bucket, "", err) + } + case delimitedName != "": + resources.CommonPrefixes = appendUniq(resources.CommonPrefixes, delimitedName) + } + // Delimiter is absent and only Prefix is present + case resources.IsPrefixSet(): + if strings.HasPrefix(name, resources.Prefix) { + // Do not strip prefix object output + metadata, err = storage.GetObjectMetadata(bucket, name, "") + if err != nil { + return mstorage.ObjectMetadata{}, resources, mstorage.EmbedError(bucket, "", err) + } + } + case resources.IsDefault(): + metadata, err = storage.GetObjectMetadata(bucket, name, "") + if err != nil { + return mstorage.ObjectMetadata{}, resources, mstorage.EmbedError(bucket, "", err) + } + } + + return metadata, resources, nil +} diff --git a/pkg/storage/fs/fs_object.go b/pkg/storage/fs/fs_object.go new file mode 100644 index 000000000..cfb0a6ced --- /dev/null +++ b/pkg/storage/fs/fs_object.go @@ -0,0 +1,214 @@ +/* + * Mini Object Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package fs
+
+import (
+	"io"
+	"os"
+	"path"
+	"strings"
+
+	"crypto/md5"
+	"encoding/gob"
+	"encoding/hex"
+
+	mstorage "github.com/minio-io/minio/pkg/storage"
+)
+
+/// Object Operations
+
+// CopyObjectToWriter - GET object. Streams the object's bytes into w and
+// returns the number of bytes copied.
+func (storage *Storage) CopyObjectToWriter(w io.Writer, bucket string, object string) (int64, error) {
+	// validate bucket
+	if !mstorage.IsValidBucket(bucket) {
+		return 0, mstorage.BucketNameInvalid{Bucket: bucket}
+	}
+
+	// validate object
+	if !mstorage.IsValidObject(object) {
+		return 0, mstorage.ObjectNameInvalid{Bucket: bucket, Object: object}
+	}
+
+	objectPath := path.Join(storage.root, bucket, object)
+
+	filestat, err := os.Stat(objectPath)
+	switch {
+	case err == nil:
+		// a directory at the object path is not a valid object
+		if filestat.IsDir() {
+			return 0, mstorage.ObjectNotFound{Bucket: bucket, Object: object}
+		}
+	case os.IsNotExist(err):
+		return 0, mstorage.ObjectNotFound{Bucket: bucket, Object: object}
+	default:
+		return 0, mstorage.EmbedError(bucket, object, err)
+	}
+
+	file, err := os.Open(objectPath)
+	if err != nil {
+		// check the error before deferring Close on a nil file
+		return 0, mstorage.EmbedError(bucket, object, err)
+	}
+	defer file.Close()
+
+	count, err := io.Copy(w, file)
+	if err != nil {
+		return count, mstorage.EmbedError(bucket, object, err)
+	}
+	return count, nil
+}
+
+// GetObjectMetadata - HEAD object. Loads the companion "$metadata" gob file
+// and returns the object's metadata; prefix is stripped from the returned key.
+func (storage *Storage) GetObjectMetadata(bucket, object, prefix string) (mstorage.ObjectMetadata, error) {
+	if !mstorage.IsValidBucket(bucket) {
+		return mstorage.ObjectMetadata{}, mstorage.BucketNameInvalid{Bucket: bucket}
+	}
+
+	if !mstorage.IsValidObject(object) {
+		// report the offending object name, not the bucket name
+		return mstorage.ObjectMetadata{}, mstorage.ObjectNameInvalid{Bucket: bucket, Object: object}
+	}
+
+	// Do not use path.Join() since path.Join strips off any object names with '/', use them as is
+	// in a static manner so that we can send a proper 'ObjectNotFound' reply back upon os.Stat()
+	objectPath := storage.root + "/" + bucket + "/" + object
+	stat, err := os.Stat(objectPath)
+	if os.IsNotExist(err) {
+		return mstorage.ObjectMetadata{}, mstorage.ObjectNotFound{Bucket: bucket, Object: object}
+	}
+	if err != nil {
+		// any other stat failure would leave stat nil when used below
+		return mstorage.ObjectMetadata{}, mstorage.EmbedError(bucket, object, err)
+	}
+
+	_, err = os.Stat(objectPath + "$metadata")
+	if os.IsNotExist(err) {
+		return mstorage.ObjectMetadata{}, mstorage.ObjectNotFound{Bucket: bucket, Object: object}
+	}
+
+	file, err := os.Open(objectPath + "$metadata")
+	if err != nil {
+		return mstorage.ObjectMetadata{}, mstorage.EmbedError(bucket, object, err)
+	}
+	defer file.Close()
+
+	var deserializedMetadata Metadata
+	decoder := gob.NewDecoder(file)
+	if err = decoder.Decode(&deserializedMetadata); err != nil {
+		return mstorage.ObjectMetadata{}, mstorage.EmbedError(bucket, object, err)
+	}
+
+	// default content type when none was stored
+	contentType := "application/octet-stream"
+	if deserializedMetadata.ContentType != "" {
+		contentType = deserializedMetadata.ContentType
+	}
+	contentType = strings.TrimSpace(contentType)
+
+	// prefer the stored md5sum as ETag, fall back to a synthetic one
+	etag := bucket + "#" + path.Base(object)
+	if len(deserializedMetadata.Md5sum) != 0 {
+		etag = hex.EncodeToString(deserializedMetadata.Md5sum)
+	}
+	trimmedObject := strings.TrimPrefix(object, prefix)
+	metadata := mstorage.ObjectMetadata{
+		Bucket:      bucket,
+		Key:         trimmedObject,
+		Created:     stat.ModTime(),
+		Size:        stat.Size(),
+		ETag:        etag,
+		ContentType: contentType,
+	}
+
+	return metadata, nil
+}
+
+// StoreObject - PUT object. Writes the object data plus a companion
+// "$metadata" gob file carrying content-type and md5sum. Overwriting an
+// existing object is rejected with ObjectExists.
+func (storage *Storage) StoreObject(bucket, key, contentType string, data io.Reader) error {
+	// TODO Commits should stage then move instead of writing directly
+	storage.lock.Lock()
+	defer storage.lock.Unlock()
+
+	// check bucket name valid
+	if !mstorage.IsValidBucket(bucket) {
+		return mstorage.BucketNameInvalid{Bucket: bucket}
+	}
+
+	// check bucket exists
+	if _, err := os.Stat(path.Join(storage.root, bucket)); os.IsNotExist(err) {
+		return mstorage.BucketNotFound{Bucket: bucket}
+	}
+
+	// verify object path legal
+	if !mstorage.IsValidObject(key) {
+		return mstorage.ObjectNameInvalid{Bucket: bucket, Object: key}
+	}
+
+	// verify content type, default to a binary stream
+	if contentType == "" {
+		contentType = "application/octet-stream"
+	}
+	contentType = strings.TrimSpace(contentType)
+
+	// get object path, creating any missing parent directories
+	objectPath := path.Join(storage.root, bucket, key)
+	objectDir := path.Dir(objectPath)
+	if _, err := os.Stat(objectDir); os.IsNotExist(err) {
+		if err = os.MkdirAll(objectDir, 0700); err != nil {
+			return mstorage.EmbedError(bucket, key, err)
+		}
+	}
+
+	// check if object exists
+	if _, err := os.Stat(objectPath); !os.IsNotExist(err) {
+		return mstorage.ObjectExists{
+			Bucket: bucket,
+			Object: key,
+		}
+	}
+
+	// write object, teeing into md5 for the stored checksum
+	file, err := os.OpenFile(objectPath, os.O_WRONLY|os.O_CREATE, 0600)
+	if err != nil {
+		return mstorage.EmbedError(bucket, key, err)
+	}
+	defer file.Close()
+
+	h := md5.New()
+	mw := io.MultiWriter(file, h)
+
+	if _, err = io.Copy(mw, data); err != nil {
+		return mstorage.EmbedError(bucket, key, err)
+	}
+
+	// write the companion metadata file next to the object
+	mfile, err := os.OpenFile(objectPath+"$metadata", os.O_WRONLY|os.O_CREATE, 0600)
+	if err != nil {
+		return mstorage.EmbedError(bucket, key, err)
+	}
+	defer mfile.Close()
+
+	// serialize metadata to gob
+	encoder := gob.NewEncoder(mfile)
+	err = encoder.Encode(&Metadata{
+		ContentType: contentType,
+		Md5sum:      h.Sum(nil),
+	})
+	if err != nil {
+		return mstorage.EmbedError(bucket, key, err)
+	}
+
+	return nil
+}
diff --git a/pkg/storage/fs/fs_policy.go b/pkg/storage/fs/fs_policy.go
new file mode 100644
index 000000000..c1f6a345b
--- /dev/null
+++ b/pkg/storage/fs/fs_policy.go
@@ -0,0 +1,111 @@
+/*
+ * Mini Object Storage, (C) 2015 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package fs
+
+import (
+	"os"
+	"path"
+
+	"encoding/json"
+
+	mstorage "github.com/minio-io/minio/pkg/storage"
+)
+
+// GetBucketPolicy - GET bucket policy. Reads back the policy JSON that
+// StoreBucketPolicy persisted for this bucket.
+func (storage *Storage) GetBucketPolicy(bucket string) (mstorage.BucketPolicy, error) {
+	storage.lock.Lock()
+	defer storage.lock.Unlock()
+
+	var p mstorage.BucketPolicy
+	// verify bucket path legal
+	if !mstorage.IsValidBucket(bucket) {
+		return mstorage.BucketPolicy{}, mstorage.BucketNameInvalid{Bucket: bucket}
+	}
+
+	// get bucket path
+	bucketDir := path.Join(storage.root, bucket)
+	// check if bucket exists
+	if _, err := os.Stat(bucketDir); err != nil {
+		return mstorage.BucketPolicy{}, mstorage.BucketNotFound{Bucket: bucket}
+	}
+
+	// get policy path - must use the same name StoreBucketPolicy writes,
+	// otherwise a stored policy can never be read back
+	bucketPolicy := path.Join(storage.root, bucket+"_policy.json")
+	filestat, err := os.Stat(bucketPolicy)
+	if os.IsNotExist(err) {
+		return mstorage.BucketPolicy{}, mstorage.BucketPolicyNotFound{Bucket: bucket}
+	}
+	if err != nil {
+		// guard against dereferencing a nil filestat below
+		return mstorage.BucketPolicy{}, mstorage.EmbedError(bucket, "", err)
+	}
+	if filestat.IsDir() {
+		return mstorage.BucketPolicy{}, mstorage.BackendCorrupted{Path: bucketPolicy}
+	}
+
+	file, err := os.OpenFile(bucketPolicy, os.O_RDONLY, 0666)
+	if err != nil {
+		return mstorage.BucketPolicy{}, mstorage.EmbedError(bucket, "", err)
+	}
+	defer file.Close()
+
+	// deserialize policy from json
+	decoder := json.NewDecoder(file)
+	if err = decoder.Decode(&p); err != nil {
+		return mstorage.BucketPolicy{}, mstorage.EmbedError(bucket, "", err)
+	}
+	return p, nil
+}
+
+// StoreBucketPolicy - PUT bucket policy. Serializes the policy as JSON
+// into "<bucket>_policy.json" under the storage root.
+func (storage *Storage) StoreBucketPolicy(bucket string, p mstorage.BucketPolicy) error {
+	storage.lock.Lock()
+	defer storage.lock.Unlock()
+
+	// verify bucket path legal
+	if !mstorage.IsValidBucket(bucket) {
+		return mstorage.BucketNameInvalid{Bucket: bucket}
+	}
+
+	// get bucket path
+	bucketDir := path.Join(storage.root, bucket)
+	// check if bucket exists
+	if _, err := os.Stat(bucketDir); err != nil {
+		return mstorage.BucketNotFound{
+			Bucket: bucket,
+		}
+	}
+
+	// get policy path; a directory sitting at this name means the
+	// backend is corrupted (only consult filestat when Stat succeeded)
+	bucketPolicy := path.Join(storage.root, bucket+"_policy.json")
+	if filestat, err := os.Stat(bucketPolicy); err == nil && filestat.IsDir() {
+		return mstorage.BackendCorrupted{Path: bucketPolicy}
+	}
+
+	file, err := os.OpenFile(bucketPolicy, os.O_WRONLY|os.O_CREATE, 0600)
+	if err != nil {
+		return mstorage.EmbedError(bucket, "", err)
+	}
+	defer file.Close()
+
+	// serialize policy to json
+	encoder := json.NewEncoder(file)
+	if err := encoder.Encode(p); err != nil {
+		return mstorage.EmbedError(bucket, "", err)
+	}
+	return nil
+}
diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go
index b5607ae06..29333eea3 100644
--- a/pkg/storage/storage.go
+++ b/pkg/storage/storage.go
@@ -55,6 +55,17 @@ type ObjectMetadata struct {
 	Size int64
 }
 
+// FilterMode - listing filter type derived from delimiter/prefix queries
+type FilterMode int
+
+// FilterMode list
+const (
+	DelimiterPrefixMode FilterMode = iota
+	DelimiterMode
+	PrefixMode
+	DefaultMode
+)
+
 // BucketResourcesMetadata - various types of bucket resources
 type BucketResourcesMetadata struct {
 	Prefix string
@@ -63,6 +74,7 @@ type BucketResourcesMetadata struct {
 	Delimiter string
 	IsTruncated bool
 	CommonPrefixes []string
+	Mode FilterMode
 
 	Policy bool
 	// TODO
@@ -70,6 +82,43 @@ type BucketResourcesMetadata struct {
 	Notification string
 }
 
+// GetMode - Populate filter mode from the delimiter/prefix query values
+func GetMode(resources BucketResourcesMetadata) FilterMode {
+	var f FilterMode
+	switch {
+	case resources.Delimiter != "" && resources.Prefix != "":
+		f = DelimiterPrefixMode
+	case resources.Delimiter != "" && resources.Prefix == "":
+		f = DelimiterMode
+	case resources.Delimiter == "" && resources.Prefix != "":
+		f = PrefixMode
+	case resources.Delimiter == "" && resources.Prefix == "":
+		f = DefaultMode
+	}
+
+	return f
+}
+
+// IsDelimiterPrefixSet Delimiter and Prefix set
+func (b BucketResourcesMetadata) IsDelimiterPrefixSet() bool {
+	return b.Mode == DelimiterPrefixMode
+}
+
+// IsDelimiterSet Delimiter set
+func (b BucketResourcesMetadata) IsDelimiterSet() bool {
+	return b.Mode == DelimiterMode
+}
+
+// IsPrefixSet Prefix set
+func (b BucketResourcesMetadata) IsPrefixSet() bool {
+	return b.Mode == PrefixMode
+}
+
+// IsDefault No query values
+func (b BucketResourcesMetadata) IsDefault() bool {
+	return b.Mode == DefaultMode
+}
+
 // IsValidBucket - verify bucket name in accordance with
 // - http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingBucket.html
 func IsValidBucket(bucket string) bool {