|
|
|
@ -2,11 +2,7 @@ package donut |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"bytes" |
|
|
|
|
"encoding/gob" |
|
|
|
|
"encoding/json" |
|
|
|
|
"errors" |
|
|
|
|
"github.com/minio-io/minio/pkg/encoding/erasure" |
|
|
|
|
"github.com/minio-io/minio/pkg/utils/split" |
|
|
|
|
"io" |
|
|
|
|
"io/ioutil" |
|
|
|
|
"log" |
|
|
|
@ -16,6 +12,12 @@ import ( |
|
|
|
|
"strconv" |
|
|
|
|
"strings" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
"encoding/gob" |
|
|
|
|
"encoding/json" |
|
|
|
|
|
|
|
|
|
"github.com/minio-io/minio/pkg/encoding/erasure" |
|
|
|
|
"github.com/minio-io/minio/pkg/utils/split" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
type donutDriver struct { |
|
|
|
@ -23,6 +25,7 @@ type donutDriver struct { |
|
|
|
|
nodes map[string]Node |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// NewDonutDriver - instantiate new donut driver
|
|
|
|
|
func NewDonutDriver(root string) Donut { |
|
|
|
|
nodes := make(map[string]Node) |
|
|
|
|
nodes["localhost"] = localDirectoryNode{root: root} |
|
|
|
@ -55,8 +58,8 @@ func (driver donutDriver) CreateBucket(bucketName string) error { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (driver donutDriver) ListBuckets() ([]string, error) { |
|
|
|
|
buckets := make([]string, 0) |
|
|
|
|
for bucket, _ := range driver.buckets { |
|
|
|
|
var buckets []string |
|
|
|
|
for bucket := range driver.buckets { |
|
|
|
|
buckets = append(buckets, bucket) |
|
|
|
|
} |
|
|
|
|
sort.Strings(buckets) |
|
|
|
@ -65,13 +68,13 @@ func (driver donutDriver) ListBuckets() ([]string, error) { |
|
|
|
|
|
|
|
|
|
func (driver donutDriver) GetObjectWriter(bucketName, objectName string) (ObjectWriter, error) { |
|
|
|
|
if bucket, ok := driver.buckets[bucketName]; ok == true { |
|
|
|
|
writers := make([]DonutWriter, 16) |
|
|
|
|
writers := make([]Writer, 16) |
|
|
|
|
nodes, err := bucket.GetNodes() |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
for i, nodeId := range nodes { |
|
|
|
|
if node, ok := driver.nodes[nodeId]; ok == true { |
|
|
|
|
for i, nodeID := range nodes { |
|
|
|
|
if node, ok := driver.nodes[nodeID]; ok == true { |
|
|
|
|
writer, _ := node.GetWriter(bucketName+":0:"+strconv.Itoa(i), objectName) |
|
|
|
|
writers[i] = writer |
|
|
|
|
} |
|
|
|
@ -90,16 +93,16 @@ func (driver donutDriver) GetObject(bucketName, objectName string) (io.ReadClose |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
var metadata map[string]string |
|
|
|
|
for i, nodeId := range nodes { |
|
|
|
|
if node, ok := driver.nodes[nodeId]; ok == true { |
|
|
|
|
bucketId := bucketName + ":0:" + strconv.Itoa(i) |
|
|
|
|
reader, err := node.GetReader(bucketId, objectName) |
|
|
|
|
for i, nodeID := range nodes { |
|
|
|
|
if node, ok := driver.nodes[nodeID]; ok == true { |
|
|
|
|
bucketID := bucketName + ":0:" + strconv.Itoa(i) |
|
|
|
|
reader, err := node.GetReader(bucketID, objectName) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
readers[i] = reader |
|
|
|
|
if metadata == nil { |
|
|
|
|
metadata, err = node.GetDonutMetadata(bucketId, objectName) |
|
|
|
|
metadata, err = node.GetDonutMetadata(bucketID, objectName) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
@ -151,14 +154,14 @@ func erasureReader(readers []io.ReadCloser, donutMetadata map[string]string, wri |
|
|
|
|
// erasure writer
|
|
|
|
|
|
|
|
|
|
type erasureWriter struct { |
|
|
|
|
writers []DonutWriter |
|
|
|
|
writers []Writer |
|
|
|
|
metadata map[string]string |
|
|
|
|
donutMetadata map[string]string // not exposed
|
|
|
|
|
erasureWriter *io.PipeWriter |
|
|
|
|
isClosed <-chan bool |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func newErasureWriter(writers []DonutWriter) ObjectWriter { |
|
|
|
|
func newErasureWriter(writers []Writer) ObjectWriter { |
|
|
|
|
r, w := io.Pipe() |
|
|
|
|
isClosed := make(chan bool) |
|
|
|
|
writer := erasureWriter{ |
|
|
|
@ -208,19 +211,19 @@ func erasureGoroutine(r *io.PipeReader, eWriter erasureWriter, isClosed chan<- b |
|
|
|
|
isClosed <- true |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self erasureWriter) Write(data []byte) (int, error) { |
|
|
|
|
io.Copy(self.erasureWriter, bytes.NewBuffer(data)) |
|
|
|
|
func (d erasureWriter) Write(data []byte) (int, error) { |
|
|
|
|
io.Copy(d.erasureWriter, bytes.NewBuffer(data)) |
|
|
|
|
return len(data), nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self erasureWriter) Close() error { |
|
|
|
|
self.erasureWriter.Close() |
|
|
|
|
<-self.isClosed |
|
|
|
|
func (d erasureWriter) Close() error { |
|
|
|
|
d.erasureWriter.Close() |
|
|
|
|
<-d.isClosed |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self erasureWriter) CloseWithError(err error) error { |
|
|
|
|
for _, writer := range self.writers { |
|
|
|
|
func (d erasureWriter) CloseWithError(err error) error { |
|
|
|
|
for _, writer := range d.writers { |
|
|
|
|
if writer != nil { |
|
|
|
|
writer.CloseWithError(err) |
|
|
|
|
} |
|
|
|
@ -228,19 +231,19 @@ func (self erasureWriter) CloseWithError(err error) error { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self erasureWriter) SetMetadata(metadata map[string]string) error { |
|
|
|
|
for k, _ := range self.metadata { |
|
|
|
|
delete(self.metadata, k) |
|
|
|
|
func (d erasureWriter) SetMetadata(metadata map[string]string) error { |
|
|
|
|
for k := range d.metadata { |
|
|
|
|
delete(d.metadata, k) |
|
|
|
|
} |
|
|
|
|
for k, v := range metadata { |
|
|
|
|
self.metadata[k] = v |
|
|
|
|
d.metadata[k] = v |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self erasureWriter) GetMetadata() (map[string]string, error) { |
|
|
|
|
func (d erasureWriter) GetMetadata() (map[string]string, error) { |
|
|
|
|
metadata := make(map[string]string) |
|
|
|
|
for k, v := range self.metadata { |
|
|
|
|
for k, v := range d.metadata { |
|
|
|
|
metadata[k] = v |
|
|
|
|
} |
|
|
|
|
return metadata, nil |
|
|
|
@ -250,12 +253,12 @@ type localDirectoryNode struct { |
|
|
|
|
root string |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self localDirectoryNode) GetBuckets() ([]string, error) { |
|
|
|
|
func (d localDirectoryNode) GetBuckets() ([]string, error) { |
|
|
|
|
return nil, errors.New("Not Implemented") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self localDirectoryNode) GetWriter(bucket, object string) (DonutWriter, error) { |
|
|
|
|
objectPath := path.Join(self.root, bucket, object) |
|
|
|
|
func (d localDirectoryNode) GetWriter(bucket, object string) (Writer, error) { |
|
|
|
|
objectPath := path.Join(d.root, bucket, object) |
|
|
|
|
err := os.MkdirAll(objectPath, 0700) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
@ -263,19 +266,19 @@ func (self localDirectoryNode) GetWriter(bucket, object string) (DonutWriter, er |
|
|
|
|
return newDonutFileWriter(objectPath) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self localDirectoryNode) GetReader(bucket, object string) (io.ReadCloser, error) { |
|
|
|
|
return os.Open(path.Join(self.root, bucket, object, "data")) |
|
|
|
|
func (d localDirectoryNode) GetReader(bucket, object string) (io.ReadCloser, error) { |
|
|
|
|
return os.Open(path.Join(d.root, bucket, object, "data")) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self localDirectoryNode) GetMetadata(bucket, object string) (map[string]string, error) { |
|
|
|
|
return self.getMetadata(bucket, object, "metadata.json") |
|
|
|
|
func (d localDirectoryNode) GetMetadata(bucket, object string) (map[string]string, error) { |
|
|
|
|
return d.getMetadata(bucket, object, "metadata.json") |
|
|
|
|
} |
|
|
|
|
func (self localDirectoryNode) GetDonutMetadata(bucket, object string) (map[string]string, error) { |
|
|
|
|
return self.getMetadata(bucket, object, "donutMetadata.json") |
|
|
|
|
func (d localDirectoryNode) GetDonutMetadata(bucket, object string) (map[string]string, error) { |
|
|
|
|
return d.getMetadata(bucket, object, "donutMetadata.json") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self localDirectoryNode) getMetadata(bucket, object, fileName string) (map[string]string, error) { |
|
|
|
|
file, err := os.Open(path.Join(self.root, bucket, object, fileName)) |
|
|
|
|
func (d localDirectoryNode) getMetadata(bucket, object, fileName string) (map[string]string, error) { |
|
|
|
|
file, err := os.Open(path.Join(d.root, bucket, object, fileName)) |
|
|
|
|
defer file.Close() |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
@ -289,7 +292,7 @@ func (self localDirectoryNode) getMetadata(bucket, object, fileName string) (map |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func newDonutFileWriter(objectDir string) (DonutWriter, error) { |
|
|
|
|
func newDonutFileWriter(objectDir string) (Writer, error) { |
|
|
|
|
dataFile, err := os.OpenFile(path.Join(objectDir, "data"), os.O_WRONLY|os.O_CREATE, 0600) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
@ -310,64 +313,64 @@ type donutFileWriter struct { |
|
|
|
|
err error |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self donutFileWriter) Write(data []byte) (int, error) { |
|
|
|
|
return self.file.Write(data) |
|
|
|
|
func (d donutFileWriter) Write(data []byte) (int, error) { |
|
|
|
|
return d.file.Write(data) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self donutFileWriter) Close() error { |
|
|
|
|
if self.err != nil { |
|
|
|
|
return self.err |
|
|
|
|
func (d donutFileWriter) Close() error { |
|
|
|
|
if d.err != nil { |
|
|
|
|
return d.err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
self.file.Close() |
|
|
|
|
d.file.Close() |
|
|
|
|
|
|
|
|
|
metadata, _ := json.Marshal(self.metadata) |
|
|
|
|
ioutil.WriteFile(path.Join(self.root, "metadata.json"), metadata, 0600) |
|
|
|
|
donutMetadata, _ := json.Marshal(self.donutMetadata) |
|
|
|
|
ioutil.WriteFile(path.Join(self.root, "donutMetadata.json"), donutMetadata, 0600) |
|
|
|
|
metadata, _ := json.Marshal(d.metadata) |
|
|
|
|
ioutil.WriteFile(path.Join(d.root, "metadata.json"), metadata, 0600) |
|
|
|
|
donutMetadata, _ := json.Marshal(d.donutMetadata) |
|
|
|
|
ioutil.WriteFile(path.Join(d.root, "donutMetadata.json"), donutMetadata, 0600) |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self donutFileWriter) CloseWithError(err error) error { |
|
|
|
|
if self.err != nil { |
|
|
|
|
self.err = err |
|
|
|
|
func (d donutFileWriter) CloseWithError(err error) error { |
|
|
|
|
if d.err != nil { |
|
|
|
|
d.err = err |
|
|
|
|
} |
|
|
|
|
self.file.Close() |
|
|
|
|
d.file.Close() |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self donutFileWriter) SetMetadata(metadata map[string]string) error { |
|
|
|
|
for k := range self.metadata { |
|
|
|
|
delete(self.metadata, k) |
|
|
|
|
func (d donutFileWriter) SetMetadata(metadata map[string]string) error { |
|
|
|
|
for k := range d.metadata { |
|
|
|
|
delete(d.metadata, k) |
|
|
|
|
} |
|
|
|
|
for k, v := range metadata { |
|
|
|
|
self.metadata[k] = v |
|
|
|
|
d.metadata[k] = v |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self donutFileWriter) GetMetadata() (map[string]string, error) { |
|
|
|
|
func (d donutFileWriter) GetMetadata() (map[string]string, error) { |
|
|
|
|
metadata := make(map[string]string) |
|
|
|
|
for k, v := range self.metadata { |
|
|
|
|
for k, v := range d.metadata { |
|
|
|
|
metadata[k] = v |
|
|
|
|
} |
|
|
|
|
return metadata, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self donutFileWriter) SetDonutMetadata(metadata map[string]string) error { |
|
|
|
|
for k := range self.donutMetadata { |
|
|
|
|
delete(self.donutMetadata, k) |
|
|
|
|
func (d donutFileWriter) SetDonutMetadata(metadata map[string]string) error { |
|
|
|
|
for k := range d.donutMetadata { |
|
|
|
|
delete(d.donutMetadata, k) |
|
|
|
|
} |
|
|
|
|
for k, v := range metadata { |
|
|
|
|
self.donutMetadata[k] = v |
|
|
|
|
d.donutMetadata[k] = v |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self donutFileWriter) GetDonutMetadata() (map[string]string, error) { |
|
|
|
|
func (d donutFileWriter) GetDonutMetadata() (map[string]string, error) { |
|
|
|
|
donutMetadata := make(map[string]string) |
|
|
|
|
for k, v := range self.donutMetadata { |
|
|
|
|
for k, v := range d.donutMetadata { |
|
|
|
|
donutMetadata[k] = v |
|
|
|
|
} |
|
|
|
|
return donutMetadata, nil |
|
|
|
|