Move remote disk StorageAPI abstraction from RPC to REST (#6464)

master
Krishna Srinivas 6 years ago committed by kannappanr
parent 670f9788e3
commit 81bee93b8d
  1. 2
      cmd/object-api-common.go
  2. 108
      cmd/rest/client.go
  3. 2
      cmd/routers.go
  4. 343
      cmd/storage-rest-client.go
  5. 52
      cmd/storage-rest-common.go
  6. 353
      cmd/storage-rest-server.go
  7. 112
      cmd/storage-rest_test.go
  8. 338
      cmd/storage-rpc-client.go
  9. 233
      cmd/storage-rpc-server.go
  10. 10
      cmd/utils.go
  11. 22
      cmd/utils_test.go
  12. 3
      cmd/xl-sets.go

@ -100,7 +100,7 @@ func newStorageAPI(endpoint Endpoint) (storage StorageAPI, err error) {
return newPosix(endpoint.Path)
}
return newStorageRPC(endpoint), nil
return newStorageRESTClient(endpoint), nil
}
// Cleanup a directory recursively.

@ -0,0 +1,108 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rest
import (
"context"
"crypto/tls"
"errors"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"time"
xhttp "github.com/minio/minio/cmd/http"
)
// DefaultRESTTimeout - default RPC timeout is one minute.
const DefaultRESTTimeout = 1 * time.Minute
// Client - http based RPC client.
type Client struct {
httpClient *http.Client
url *url.URL
newAuthToken func() string
}
// Call - make a REST call.
func (c *Client) Call(method string, values url.Values, body io.Reader) (reply io.ReadCloser, err error) {
req, err := http.NewRequest(http.MethodPost, c.url.String()+"/"+method+"?"+values.Encode(), body)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+c.newAuthToken())
req.Header.Set("X-Minio-Time", time.Now().UTC().Format(time.RFC3339))
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
// Limit the ReadAll(), just in case, because of a bug, the server responds with large data.
r := io.LimitReader(resp.Body, 1024)
b, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
return nil, errors.New(string(b))
}
return resp.Body, nil
}
func newCustomDialContext(timeout time.Duration) func(ctx context.Context, network, addr string) (net.Conn, error) {
return func(ctx context.Context, network, addr string) (net.Conn, error) {
dialer := &net.Dialer{
Timeout: timeout,
KeepAlive: timeout,
DualStack: true,
}
conn, err := dialer.DialContext(ctx, network, addr)
if err != nil {
return nil, err
}
return xhttp.NewTimeoutConn(conn, timeout, timeout), nil
}
}
// NewClient - returns new RPC client.
func NewClient(url *url.URL, tlsConfig *tls.Config, timeout time.Duration, newAuthToken func() string) *Client {
return &Client{
httpClient: &http.Client{
// Transport is exactly same as Go default in https://golang.org/pkg/net/http/#RoundTripper
// except custom DialContext and TLSClientConfig.
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: newCustomDialContext(timeout),
MaxIdleConnsPerHost: 4096,
MaxIdleConns: 4096,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
TLSClientConfig: tlsConfig,
DisableCompression: true,
},
},
url: url,
newAuthToken: newAuthToken,
}
}

@ -36,7 +36,7 @@ func newCacheObjectsFn() CacheObjectLayer {
// Composed function registering routers for only distributed XL setup.
func registerDistXLRouters(router *mux.Router, endpoints EndpointList) {
// Register storage rpc router only if its a distributed setup.
registerStorageRPCRouters(router, endpoints)
registerStorageRESTHandlers(router, endpoints)
// Register distributed namespace lock.
registerDistNSLockRouter(router)

@ -0,0 +1,343 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"bytes"
"crypto/tls"
"io"
"io/ioutil"
"net"
"net/url"
"path"
"strconv"
"encoding/gob"
"encoding/hex"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/cmd/rest"
xnet "github.com/minio/minio/pkg/net"
)
func isNetworkDisconnectError(err error) bool {
if err == nil {
return false
}
if uerr, isURLError := err.(*url.Error); isURLError {
if uerr.Timeout() {
return true
}
err = uerr.Err
}
_, isNetOpError := err.(*net.OpError)
return isNetOpError
}
// Converts rpc.ServerError to underlying error. This function is
// written so that the storageAPI errors are consistent across network
// disks as well.
func toStorageErr(err error) error {
if err == nil {
return nil
}
if isNetworkDisconnectError(err) {
return errDiskNotFound
}
switch err.Error() {
case io.EOF.Error():
return io.EOF
case io.ErrUnexpectedEOF.Error():
return io.ErrUnexpectedEOF
case errUnexpected.Error():
return errUnexpected
case errDiskFull.Error():
return errDiskFull
case errVolumeNotFound.Error():
return errVolumeNotFound
case errVolumeExists.Error():
return errVolumeExists
case errFileNotFound.Error():
return errFileNotFound
case errFileNameTooLong.Error():
return errFileNameTooLong
case errFileAccessDenied.Error():
return errFileAccessDenied
case errIsNotRegular.Error():
return errIsNotRegular
case errVolumeNotEmpty.Error():
return errVolumeNotEmpty
case errVolumeAccessDenied.Error():
return errVolumeAccessDenied
case errCorruptedFormat.Error():
return errCorruptedFormat
case errUnformattedDisk.Error():
return errUnformattedDisk
case errInvalidAccessKeyID.Error():
return errInvalidAccessKeyID
case errAuthentication.Error():
return errAuthentication
case errRPCAPIVersionUnsupported.Error():
return errRPCAPIVersionUnsupported
case errServerTimeMismatch.Error():
return errServerTimeMismatch
}
return err
}
// Abstracts a remote disk.
type storageRESTClient struct {
endpoint Endpoint
restClient *rest.Client
connected bool
lastError error
}
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is makred disconnected
// permanently. The only way to restore the storage connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
// after verifying format.json
func (client *storageRESTClient) call(method string, values url.Values, body io.Reader) (respBody io.ReadCloser, err error) {
if !client.connected {
return nil, errDiskNotFound
}
respBody, err = client.restClient.Call(method, values, body)
if err == nil {
return respBody, nil
}
client.lastError = err
if isNetworkDisconnectError(err) {
client.connected = false
}
return nil, toStorageErr(err)
}
// Stringer provides a canonicalized representation of network device.
func (client *storageRESTClient) String() string {
return client.endpoint.String()
}
// IsOnline - returns whether RPC client failed to connect or not.
func (client *storageRESTClient) IsOnline() bool {
return client.connected
}
// LastError - returns the network error if any.
func (client *storageRESTClient) LastError() error {
return client.lastError
}
// DiskInfo - fetch disk information for a remote disk.
func (client *storageRESTClient) DiskInfo() (info DiskInfo, err error) {
respBody, err := client.call(storageRESTMethodDiskInfo, nil, nil)
if err != nil {
return
}
defer CloseResponse(respBody)
err = gob.NewDecoder(respBody).Decode(&info)
return info, err
}
// MakeVol - create a volume on a remote disk.
func (client *storageRESTClient) MakeVol(volume string) (err error) {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
respBody, err := client.call(storageRESTMethodMakeVol, values, nil)
defer CloseResponse(respBody)
return err
}
// ListVols - List all volumes on a remote disk.
func (client *storageRESTClient) ListVols() (volinfo []VolInfo, err error) {
respBody, err := client.call(storageRESTMethodListVols, nil, nil)
if err != nil {
return
}
defer CloseResponse(respBody)
err = gob.NewDecoder(respBody).Decode(&volinfo)
return volinfo, err
}
// StatVol - get volume info over the network.
func (client *storageRESTClient) StatVol(volume string) (volInfo VolInfo, err error) {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
respBody, err := client.call(storageRESTMethodStatVol, values, nil)
if err != nil {
return
}
defer CloseResponse(respBody)
err = gob.NewDecoder(respBody).Decode(&volInfo)
return volInfo, err
}
// DeleteVol - Deletes a volume over the network.
func (client *storageRESTClient) DeleteVol(volume string) (err error) {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
respBody, err := client.call(storageRESTMethodDeleteVol, values, nil)
defer CloseResponse(respBody)
return err
}
// PrepareFile - to fallocate() disk space for a file.
func (client *storageRESTClient) PrepareFile(volume, path string, length int64) error {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path)
values.Set(storageRESTLength, strconv.Itoa(int(length)))
respBody, err := client.call(storageRESTMethodPrepareFile, values, nil)
defer CloseResponse(respBody)
return err
}
// AppendFile - append to a file.
func (client *storageRESTClient) AppendFile(volume, path string, buffer []byte) error {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path)
reader := bytes.NewBuffer(buffer)
respBody, err := client.call(storageRESTMethodAppendFile, values, reader)
defer CloseResponse(respBody)
return err
}
// StatFile - stat a file.
func (client *storageRESTClient) StatFile(volume, path string) (info FileInfo, err error) {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path)
respBody, err := client.call(storageRESTMethodStatFile, values, nil)
if err != nil {
return info, err
}
defer CloseResponse(respBody)
err = gob.NewDecoder(respBody).Decode(&info)
return info, err
}
// ReadAll - reads all contents of a file.
func (client *storageRESTClient) ReadAll(volume, path string) ([]byte, error) {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path)
respBody, err := client.call(storageRESTMethodReadAll, values, nil)
if err != nil {
return nil, err
}
defer CloseResponse(respBody)
return ioutil.ReadAll(respBody)
}
// ReadFile - reads section of a file.
func (client *storageRESTClient) ReadFile(volume, path string, offset int64, buffer []byte, verifier *BitrotVerifier) (int64, error) {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path)
values.Set(storageRESTOffset, strconv.Itoa(int(offset)))
values.Set(storageRESTLength, strconv.Itoa(len(buffer)))
if verifier != nil {
values.Set(storageRESTBitrotAlgo, verifier.algorithm.String())
values.Set(storageRESTBitrotHash, hex.EncodeToString(verifier.sum))
} else {
values.Set(storageRESTBitrotAlgo, "")
values.Set(storageRESTBitrotHash, "")
}
respBody, err := client.call(storageRESTMethodReadFile, values, nil)
if err != nil {
return 0, err
}
defer CloseResponse(respBody)
n, err := io.ReadFull(respBody, buffer)
return int64(n), err
}
// ListDir - lists a directory.
func (client *storageRESTClient) ListDir(volume, dirPath string, count int) (entries []string, err error) {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
values.Set(storageRESTDirPath, dirPath)
values.Set(storageRESTCount, strconv.Itoa(count))
respBody, err := client.call(storageRESTMethodListDir, values, nil)
if err != nil {
return nil, err
}
defer CloseResponse(respBody)
err = gob.NewDecoder(respBody).Decode(&entries)
return entries, err
}
// DeleteFile - deletes a file.
func (client *storageRESTClient) DeleteFile(volume, path string) error {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path)
respBody, err := client.call(storageRESTMethodDeleteFile, values, nil)
defer CloseResponse(respBody)
return err
}
// RenameFile - renames a file.
func (client *storageRESTClient) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) {
values := make(url.Values)
values.Set(storageRESTSrcVolume, srcVolume)
values.Set(storageRESTSrcPath, srcPath)
values.Set(storageRESTDstVolume, dstVolume)
values.Set(storageRESTDstPath, dstPath)
respBody, err := client.call(storageRESTMethodRenameFile, values, nil)
defer CloseResponse(respBody)
return err
}
// Close - marks the client as closed.
func (client *storageRESTClient) Close() error {
client.connected = false
return nil
}
// Returns a storage rest client.
func newStorageRESTClient(endpoint Endpoint) *storageRESTClient {
host, err := xnet.ParseHost(endpoint.Host)
logger.FatalIf(err, "Unable to parse storage Host")
scheme := "http"
if globalIsSSL {
scheme = "https"
}
serverURL := &url.URL{
Scheme: scheme,
Host: endpoint.Host,
Path: path.Join(storageRESTPath, endpoint.Path),
}
var tlsConfig *tls.Config
if globalIsSSL {
tlsConfig = &tls.Config{
ServerName: host.Name,
RootCAs: globalRootCAs,
}
}
restClient := rest.NewClient(serverURL, tlsConfig, rest.DefaultRESTTimeout, newAuthToken)
return &storageRESTClient{endpoint: endpoint, restClient: restClient, connected: true}
}

@ -0,0 +1,52 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
const storageRESTVersion = "v1"
const storageRESTPath = minioReservedBucketPath + "/storage/" + storageRESTVersion + "/"
const (
storageRESTMethodDiskInfo = "diskinfo"
storageRESTMethodMakeVol = "makevol"
storageRESTMethodStatVol = "statvol"
storageRESTMethodDeleteVol = "deletevol"
storageRESTMethodListVols = "listvols"
storageRESTMethodPrepareFile = "preparefile"
storageRESTMethodAppendFile = "appendfile"
storageRESTMethodStatFile = "statfile"
storageRESTMethodReadAll = "readall"
storageRESTMethodReadFile = "readfile"
storageRESTMethodListDir = "listdir"
storageRESTMethodDeleteFile = "deletefile"
storageRESTMethodRenameFile = "renamefile"
)
const (
storageRESTVolume = "volume"
storageRESTDirPath = "dir-path"
storageRESTFilePath = "file-path"
storageRESTSrcVolume = "source-volume"
storageRESTSrcPath = "source-path"
storageRESTDstVolume = "destination-volume"
storageRESTDstPath = "destination-path"
storageRESTOffset = "offset"
storageRESTLength = "length"
storageRESTCount = "count"
storageRESTBitrotAlgo = "bitrot-algo"
storageRESTBitrotHash = "bitrot-hash"
)

@ -0,0 +1,353 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"fmt"
"io"
"path"
"strconv"
"net/http"
"encoding/gob"
"encoding/hex"
"time"
"github.com/gorilla/mux"
"github.com/minio/minio/cmd/logger"
)
// To abstract a disk over network.
type storageRESTServer struct {
storage *posix
}
func (s *storageRESTServer) writeErrorResponse(w http.ResponseWriter, err error) {
w.WriteHeader(http.StatusForbidden)
w.Write([]byte(err.Error()))
}
// IsValid - To authenticate and verify the time difference.
func (s *storageRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool {
requestTimeStr := r.Header.Get("X-Minio-Time")
requestTime, err := time.Parse(time.RFC3339, requestTimeStr)
if err != nil {
s.writeErrorResponse(w, err)
return false
}
utcNow := UTCNow()
delta := requestTime.Sub(utcNow)
if delta < 0 {
delta = delta * -1
}
if delta > DefaultSkewTime {
s.writeErrorResponse(w, fmt.Errorf("client time %v is too apart with server time %v", requestTime, utcNow))
return false
}
return true
}
// DiskInfoHandler - returns disk info.
func (s *storageRESTServer) DiskInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
info, err := s.storage.DiskInfo()
if err != nil {
s.writeErrorResponse(w, err)
return
}
defer w.(http.Flusher).Flush()
gob.NewEncoder(w).Encode(info)
}
// MakeVolHandler - make a volume.
func (s *storageRESTServer) MakeVolHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
err := s.storage.MakeVol(volume)
if err != nil {
s.writeErrorResponse(w, err)
}
}
// ListVolsHandler - list volumes.
func (s *storageRESTServer) ListVolsHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
infos, err := s.storage.ListVols()
if err != nil {
s.writeErrorResponse(w, err)
return
}
defer w.(http.Flusher).Flush()
gob.NewEncoder(w).Encode(&infos)
}
// StatVolHandler - stat a volume.
func (s *storageRESTServer) StatVolHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
info, err := s.storage.StatVol(volume)
if err != nil {
s.writeErrorResponse(w, err)
return
}
defer w.(http.Flusher).Flush()
gob.NewEncoder(w).Encode(info)
}
// DeleteVolumeHandler - delete a volume.
func (s *storageRESTServer) DeleteVolHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
err := s.storage.DeleteVol(volume)
if err != nil {
s.writeErrorResponse(w, err)
}
}
// PrepareFileHandler - fallocate() space for a file.
func (s *storageRESTServer) PrepareFileHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
filePath := vars[storageRESTFilePath]
fileSizeStr := vars[storageRESTLength]
fileSize, err := strconv.Atoi(fileSizeStr)
if err != nil {
s.writeErrorResponse(w, err)
return
}
err = s.storage.PrepareFile(volume, filePath, int64(fileSize))
if err != nil {
s.writeErrorResponse(w, err)
}
}
// AppendFileHandler - append to a file.
func (s *storageRESTServer) AppendFileHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
filePath := vars[storageRESTFilePath]
if r.ContentLength < 0 {
s.writeErrorResponse(w, errInvalidArgument)
return
}
buf := make([]byte, r.ContentLength)
_, err := io.ReadFull(r.Body, buf)
if err != nil {
s.writeErrorResponse(w, err)
return
}
err = s.storage.AppendFile(volume, filePath, buf)
if err != nil {
s.writeErrorResponse(w, err)
}
}
// StatFileHandler - stat a file.
func (s *storageRESTServer) StatFileHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
filePath := vars[storageRESTFilePath]
info, err := s.storage.StatFile(volume, filePath)
if err != nil {
s.writeErrorResponse(w, err)
return
}
defer w.(http.Flusher).Flush()
gob.NewEncoder(w).Encode(info)
}
// ReadAllHandler - read all the contents of a file.
func (s *storageRESTServer) ReadAllHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
filePath := vars[storageRESTFilePath]
buf, err := s.storage.ReadAll(volume, filePath)
if err != nil {
s.writeErrorResponse(w, err)
return
}
w.Header().Set("Content-Length", strconv.Itoa(len(buf)))
w.Write(buf)
}
// ReadFileHandler - read section of a file.
func (s *storageRESTServer) ReadFileHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
filePath := vars[storageRESTFilePath]
offset, err := strconv.Atoi(vars[storageRESTOffset])
if err != nil {
s.writeErrorResponse(w, err)
return
}
length, err := strconv.Atoi(vars[storageRESTLength])
if err != nil {
s.writeErrorResponse(w, err)
return
}
if offset < 0 || length < 0 {
s.writeErrorResponse(w, errInvalidArgument)
return
}
var verifier *BitrotVerifier
if vars[storageRESTBitrotAlgo] != "" {
hashStr := vars[storageRESTBitrotHash]
var hash []byte
hash, err = hex.DecodeString(hashStr)
if err != nil {
s.writeErrorResponse(w, err)
return
}
verifier = NewBitrotVerifier(BitrotAlgorithmFromString(vars[storageRESTBitrotAlgo]), hash)
}
buf := make([]byte, length)
_, err = s.storage.ReadFile(volume, filePath, int64(offset), buf, verifier)
if err != nil {
s.writeErrorResponse(w, err)
return
}
w.Header().Set("Content-Length", strconv.Itoa(len(buf)))
w.Write(buf)
}
// ListDirHandler - list a directory.
func (s *storageRESTServer) ListDirHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
dirPath := vars[storageRESTDirPath]
count, err := strconv.Atoi(vars[storageRESTCount])
if err != nil {
s.writeErrorResponse(w, err)
return
}
entries, err := s.storage.ListDir(volume, dirPath, count)
if err != nil {
s.writeErrorResponse(w, err)
return
}
defer w.(http.Flusher).Flush()
gob.NewEncoder(w).Encode(&entries)
}
// DeleteFileHandler - delete a file.
func (s *storageRESTServer) DeleteFileHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
filePath := vars[storageRESTFilePath]
err := s.storage.DeleteFile(volume, filePath)
if err != nil {
s.writeErrorResponse(w, err)
}
}
// RenameFileHandler - rename a file.
func (s *storageRESTServer) RenameFileHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
vars := mux.Vars(r)
srcVolume := vars[storageRESTSrcVolume]
srcFilePath := vars[storageRESTSrcPath]
dstVolume := vars[storageRESTDstVolume]
dstFilePath := vars[storageRESTDstPath]
err := s.storage.RenameFile(srcVolume, srcFilePath, dstVolume, dstFilePath)
if err != nil {
s.writeErrorResponse(w, err)
}
}
// registerStorageRPCRouter - register storage rpc router.
func registerStorageRESTHandlers(router *mux.Router, endpoints EndpointList) {
for _, endpoint := range endpoints {
if !endpoint.IsLocal {
continue
}
storage, err := newPosix(endpoint.Path)
if err != nil {
logger.Fatal(uiErrUnableToWriteInBackend(err), "Unable to initialize posix backend")
}
server := &storageRESTServer{storage}
subrouter := router.PathPrefix(path.Join(storageRESTPath, endpoint.Path)).Subrouter()
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodDiskInfo).HandlerFunc(httpTraceHdrs(server.DiskInfoHandler))
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodMakeVol).HandlerFunc(httpTraceHdrs(server.MakeVolHandler)).Queries(restQueries(storageRESTVolume)...)
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodStatVol).HandlerFunc(httpTraceHdrs(server.StatVolHandler)).Queries(restQueries(storageRESTVolume)...)
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodDeleteVol).HandlerFunc(httpTraceHdrs(server.DeleteVolHandler)).Queries(restQueries(storageRESTVolume)...)
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodListVols).HandlerFunc(httpTraceHdrs(server.ListVolsHandler))
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodPrepareFile).HandlerFunc(httpTraceHdrs(server.PrepareFileHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTLength)...)
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodAppendFile).HandlerFunc(httpTraceHdrs(server.AppendFileHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodStatFile).HandlerFunc(httpTraceHdrs(server.StatFileHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodReadAll).HandlerFunc(httpTraceHdrs(server.ReadAllHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodReadFile).HandlerFunc(httpTraceHdrs(server.ReadFileHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTOffset, storageRESTLength, storageRESTBitrotAlgo, storageRESTBitrotHash)...)
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodListDir).HandlerFunc(httpTraceHdrs(server.ListDirHandler)).
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTCount)...)
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodDeleteFile).HandlerFunc(httpTraceHdrs(server.DeleteFileHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodRenameFile).HandlerFunc(httpTraceHdrs(server.RenameFileHandler)).
Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDstVolume, storageRESTDstPath)...)
}
}

@ -18,18 +18,18 @@ package cmd
import (
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"reflect"
"testing"
"github.com/gorilla/mux"
xnet "github.com/minio/minio/pkg/net"
)
///////////////////////////////////////////////////////////////////////////////
//
// Storage RPC server, storageRPCReceiver and StorageRPCClient are
// Storage REST server, storageRESTReceiver and StorageRESTClient are
// inter-dependent, below test functions are sufficient to test all of them.
//
///////////////////////////////////////////////////////////////////////////////
@ -520,182 +520,174 @@ func testStorageAPIRenameFile(t *testing.T, storage StorageAPI) {
}
}
func newStorageRPCHTTPServerClient(t *testing.T) (*httptest.Server, *StorageRPCClient, *serverConfig, string) {
endpointPath, err := ioutil.TempDir("", ".TestStorageRPC.")
func newStorageRESTHTTPServerClient(t *testing.T) (*httptest.Server, *storageRESTClient, *serverConfig, string) {
endpointPath, err := ioutil.TempDir("", ".TestStorageREST.")
if err != nil {
t.Fatalf("unexpected error %v", err)
}
rpcServer, err := NewStorageRPCServer(endpointPath)
if err != nil {
t.Fatalf("unexpected error %v", err)
}
httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
rpcServer.ServeHTTP(w, r)
}))
router := mux.NewRouter()
httpServer := httptest.NewServer(router)
url, err := xnet.ParseURL(httpServer.URL)
if err != nil {
t.Fatalf("unexpected error %v", err)
}
url.Path = endpointPath
host, err := xnet.ParseHost(url.Host)
endpoint, err := NewEndpoint(url.String())
if err != nil {
t.Fatalf("unexpected error %v", err)
t.Fatalf("NewEndpoint failed %v", endpoint)
}
registerStorageRESTHandlers(router, EndpointList{endpoint})
restClient := newStorageRESTClient(endpoint)
prevGlobalServerConfig := globalServerConfig
globalServerConfig = newServerConfig()
rpcClient, err := NewStorageRPCClient(host, endpointPath)
if err != nil {
t.Fatalf("unexpected error %v", err)
}
rpcClient.connected = true
return httpServer, rpcClient, prevGlobalServerConfig, endpointPath
return httpServer, restClient, prevGlobalServerConfig, endpointPath
}
func TestStorageRPCClientDiskInfo(t *testing.T) {
httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t)
func TestStorageRESTClientDiskInfo(t *testing.T) {
httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t)
defer httpServer.Close()
defer func() {
globalServerConfig = prevGlobalServerConfig
}()
defer os.RemoveAll(endpointPath)
testStorageAPIDiskInfo(t, rpcClient)
testStorageAPIDiskInfo(t, restClient)
}
func TestStorageRPCClientMakeVol(t *testing.T) {
httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t)
func TestStorageRESTClientMakeVol(t *testing.T) {
httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t)
defer httpServer.Close()
defer func() {
globalServerConfig = prevGlobalServerConfig
}()
defer os.RemoveAll(endpointPath)
testStorageAPIMakeVol(t, rpcClient)
testStorageAPIMakeVol(t, restClient)
}
func TestStorageRPCClientListVols(t *testing.T) {
httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t)
func TestStorageRESTClientListVols(t *testing.T) {
httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t)
defer httpServer.Close()
defer func() {
globalServerConfig = prevGlobalServerConfig
}()
defer os.RemoveAll(endpointPath)
testStorageAPIListVols(t, rpcClient)
testStorageAPIListVols(t, restClient)
}
func TestStorageRPCClientStatVol(t *testing.T) {
httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t)
func TestStorageRESTClientStatVol(t *testing.T) {
httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t)
defer httpServer.Close()
defer func() {
globalServerConfig = prevGlobalServerConfig
}()
defer os.RemoveAll(endpointPath)
testStorageAPIStatVol(t, rpcClient)
testStorageAPIStatVol(t, restClient)
}
func TestStorageRPCClientDeleteVol(t *testing.T) {
httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t)
func TestStorageRESTClientDeleteVol(t *testing.T) {
httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t)
defer httpServer.Close()
defer func() {
globalServerConfig = prevGlobalServerConfig
}()
defer os.RemoveAll(endpointPath)
testStorageAPIDeleteVol(t, rpcClient)
testStorageAPIDeleteVol(t, restClient)
}
func TestStorageRPCClientStatFile(t *testing.T) {
httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t)
func TestStorageRESTClientStatFile(t *testing.T) {
httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t)
defer httpServer.Close()
defer func() {
globalServerConfig = prevGlobalServerConfig
}()
defer os.RemoveAll(endpointPath)
testStorageAPIStatFile(t, rpcClient)
testStorageAPIStatFile(t, restClient)
}
func TestStorageRPCClientListDir(t *testing.T) {
httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t)
func TestStorageRESTClientListDir(t *testing.T) {
httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t)
defer httpServer.Close()
defer func() {
globalServerConfig = prevGlobalServerConfig
}()
defer os.RemoveAll(endpointPath)
testStorageAPIListDir(t, rpcClient)
testStorageAPIListDir(t, restClient)
}
func TestStorageRPCClientReadAll(t *testing.T) {
httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t)
func TestStorageRESTClientReadAll(t *testing.T) {
httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t)
defer httpServer.Close()
defer func() {
globalServerConfig = prevGlobalServerConfig
}()
defer os.RemoveAll(endpointPath)
testStorageAPIReadAll(t, rpcClient)
testStorageAPIReadAll(t, restClient)
}
func TestStorageRPCClientReadFile(t *testing.T) {
httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t)
func TestStorageRESTClientReadFile(t *testing.T) {
httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t)
defer httpServer.Close()
defer func() {
globalServerConfig = prevGlobalServerConfig
}()
defer os.RemoveAll(endpointPath)
testStorageAPIReadFile(t, rpcClient)
testStorageAPIReadFile(t, restClient)
}
func TestStorageRPCClientPrepareFile(t *testing.T) {
httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t)
func TestStorageRESTClientPrepareFile(t *testing.T) {
httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t)
defer httpServer.Close()
defer func() {
globalServerConfig = prevGlobalServerConfig
}()
defer os.RemoveAll(endpointPath)
testStorageAPIPrepareFile(t, rpcClient)
testStorageAPIPrepareFile(t, restClient)
}
func TestStorageRPCClientAppendFile(t *testing.T) {
httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t)
func TestStorageRESTClientAppendFile(t *testing.T) {
httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t)
defer httpServer.Close()
defer func() {
globalServerConfig = prevGlobalServerConfig
}()
defer os.RemoveAll(endpointPath)
testStorageAPIAppendFile(t, rpcClient)
testStorageAPIAppendFile(t, restClient)
}
func TestStorageRPCClientDeleteFile(t *testing.T) {
httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t)
func TestStorageRESTClientDeleteFile(t *testing.T) {
httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t)
defer httpServer.Close()
defer func() {
globalServerConfig = prevGlobalServerConfig
}()
defer os.RemoveAll(endpointPath)
testStorageAPIDeleteFile(t, rpcClient)
testStorageAPIDeleteFile(t, restClient)
}
func TestStorageRPCClientRenameFile(t *testing.T) {
httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t)
func TestStorageRESTClientRenameFile(t *testing.T) {
httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t)
defer httpServer.Close()
defer func() {
globalServerConfig = prevGlobalServerConfig
}()
defer os.RemoveAll(endpointPath)
testStorageAPIRenameFile(t, rpcClient)
testStorageAPIRenameFile(t, restClient)
}

@ -1,338 +0,0 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"bytes"
"crypto/tls"
"io"
"net"
"net/url"
"path"
"strings"
"github.com/minio/minio/cmd/logger"
xnet "github.com/minio/minio/pkg/net"
)
func isNetworkDisconnectError(err error) bool {
if err == nil {
return false
}
if uerr, isURLError := err.(*url.Error); isURLError {
if uerr.Timeout() {
return true
}
err = uerr.Err
}
_, isNetOpError := err.(*net.OpError)
return isNetOpError
}
// Converts rpc.ServerError to underlying error. This function is
// written so that the storageAPI errors are consistent across network
// disks as well.
func toStorageErr(err error) error {
if err == nil {
return nil
}
if isNetworkDisconnectError(err) {
return errDiskNotFound
}
switch err.Error() {
case io.EOF.Error():
return io.EOF
case io.ErrUnexpectedEOF.Error():
return io.ErrUnexpectedEOF
case errUnexpected.Error():
return errUnexpected
case errDiskFull.Error():
return errDiskFull
case errVolumeNotFound.Error():
return errVolumeNotFound
case errVolumeExists.Error():
return errVolumeExists
case errFileNotFound.Error():
return errFileNotFound
case errFileNameTooLong.Error():
return errFileNameTooLong
case errFileAccessDenied.Error():
return errFileAccessDenied
case errIsNotRegular.Error():
return errIsNotRegular
case errVolumeNotEmpty.Error():
return errVolumeNotEmpty
case errVolumeAccessDenied.Error():
return errVolumeAccessDenied
case errCorruptedFormat.Error():
return errCorruptedFormat
case errUnformattedDisk.Error():
return errUnformattedDisk
case errInvalidAccessKeyID.Error():
return errInvalidAccessKeyID
case errAuthentication.Error():
return errAuthentication
case errRPCAPIVersionUnsupported.Error():
return errRPCAPIVersionUnsupported
case errServerTimeMismatch.Error():
return errServerTimeMismatch
}
return err
}
// StorageRPCClient - storage RPC client.
type StorageRPCClient struct {
*RPCClient
connected bool
// Plain error of the last RPC call
lastRPCError error
}
// Stringer provides a canonicalized representation of network device.
func (client *StorageRPCClient) String() string {
url := client.ServiceURL()
// Remove the storage RPC path prefix, internal paths are meaningless. why?
url.Path = strings.TrimPrefix(url.Path, storageServicePath)
return url.String()
}
// LastError - returns the last RPC call result, nil or error if any
func (client *StorageRPCClient) LastError() error {
return client.lastRPCError
}
// Close - closes underneath RPC client.
func (client *StorageRPCClient) Close() error {
client.connected = false
return toStorageErr(client.RPCClient.Close())
}
// IsOnline - returns whether RPC client failed to connect or not.
func (client *StorageRPCClient) IsOnline() bool {
return client.connected
}
func (client *StorageRPCClient) connect() {
err := client.Call(storageServiceName+".Connect", &AuthArgs{}, &VoidReply{})
client.lastRPCError = err
client.connected = err == nil
}
func (client *StorageRPCClient) call(handler string, args interface {
SetAuthArgs(args AuthArgs)
}, reply interface{}) error {
if !client.connected {
return errDiskNotFound
}
err := client.Call(handler, args, reply)
client.lastRPCError = err
if err == nil {
return nil
}
if isNetworkDisconnectError(err) {
client.connected = false
}
return toStorageErr(err)
}
// DiskInfo - fetch disk information for a remote disk.
func (client *StorageRPCClient) DiskInfo() (info DiskInfo, err error) {
err = client.call(storageServiceName+".DiskInfo", &AuthArgs{}, &info)
return info, err
}
// MakeVol - create a volume on a remote disk.
func (client *StorageRPCClient) MakeVol(volume string) (err error) {
return client.call(storageServiceName+".MakeVol", &VolArgs{Vol: volume}, &VoidReply{})
}
// ListVols - List all volumes on a remote disk.
func (client *StorageRPCClient) ListVols() ([]VolInfo, error) {
var reply []VolInfo
err := client.call(storageServiceName+".ListVols", &AuthArgs{}, &reply)
return reply, err
}
// StatVol - get volume info over the network.
func (client *StorageRPCClient) StatVol(volume string) (volInfo VolInfo, err error) {
err = client.call(storageServiceName+".StatVol", &VolArgs{Vol: volume}, &volInfo)
return volInfo, err
}
// DeleteVol - Deletes a volume over the network.
func (client *StorageRPCClient) DeleteVol(volume string) (err error) {
return client.call(storageServiceName+".DeleteVol", &VolArgs{Vol: volume}, &VoidReply{})
}
// File operations.
// PrepareFile - calls PrepareFile RPC.
func (client *StorageRPCClient) PrepareFile(volume, path string, length int64) (err error) {
args := PrepareFileArgs{
Vol: volume,
Path: path,
Size: length,
}
reply := VoidReply{}
return client.call(storageServiceName+".PrepareFile", &args, &reply)
}
// AppendFile - append file writes buffer to a remote network path.
func (client *StorageRPCClient) AppendFile(volume, path string, buffer []byte) (err error) {
args := AppendFileArgs{
Vol: volume,
Path: path,
Buffer: buffer,
}
reply := VoidReply{}
return client.call(storageServiceName+".AppendFile", &args, &reply)
}
// StatFile - get latest Stat information for a file at path.
func (client *StorageRPCClient) StatFile(volume, path string) (fileInfo FileInfo, err error) {
err = client.call(storageServiceName+".StatFile", &StatFileArgs{Vol: volume, Path: path}, &fileInfo)
return fileInfo, err
}
// ReadAll - reads entire contents of the file at path until EOF, returns the
// contents in a byte slice. Returns buf == nil if err != nil.
// This API is meant to be used on files which have small memory footprint, do
// not use this on large files as it would cause server to crash.
func (client *StorageRPCClient) ReadAll(volume, path string) (buf []byte, err error) {
err = client.call(storageServiceName+".ReadAll", &ReadAllArgs{Vol: volume, Path: path}, &buf)
return buf, err
}
// ReadFile - reads a file at remote path and fills the buffer.
func (client *StorageRPCClient) ReadFile(volume string, path string, offset int64, buffer []byte, verifier *BitrotVerifier) (m int64, err error) {
// Recover from any panic and return error.
defer func() {
if r := recover(); r != nil {
err = bytes.ErrTooLarge
}
}()
args := ReadFileArgs{
Vol: volume,
Path: path,
Offset: offset,
Length: int64(len(buffer)),
Verified: verifier == nil, // Marked accordingly if verifier is set or not.
}
if verifier != nil {
args.Algo = verifier.algorithm
args.ExpectedHash = verifier.sum
}
var reply []byte
err = client.call(storageServiceName+".ReadFile", &args, &reply)
// Copy reply to buffer.
copy(buffer, reply)
// Return length of result, err if any.
return int64(len(reply)), err
}
// ListDir - list all entries at prefix.
func (client *StorageRPCClient) ListDir(volume, path string, count int) (entries []string, err error) {
err = client.call(storageServiceName+".ListDir", &ListDirArgs{Vol: volume, Path: path, Count: count}, &entries)
return entries, err
}
// DeleteFile - Delete a file at path.
func (client *StorageRPCClient) DeleteFile(volume, path string) (err error) {
args := DeleteFileArgs{
Vol: volume,
Path: path,
}
reply := VoidReply{}
return client.call(storageServiceName+".DeleteFile", &args, &reply)
}
// RenameFile - rename a remote file from source to destination.
func (client *StorageRPCClient) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) {
args := RenameFileArgs{
SrcVol: srcVolume,
SrcPath: srcPath,
DstVol: dstVolume,
DstPath: dstPath,
}
reply := VoidReply{}
return client.call(storageServiceName+".RenameFile", &args, &reply)
}
// NewStorageRPCClient - returns new storage RPC client.
func NewStorageRPCClient(host *xnet.Host, endpointPath string) (*StorageRPCClient, error) {
scheme := "http"
if globalIsSSL {
scheme = "https"
}
serviceURL := &xnet.URL{
Scheme: scheme,
Host: host.String(),
Path: path.Join(storageServicePath, endpointPath),
}
var tlsConfig *tls.Config
if globalIsSSL {
tlsConfig = &tls.Config{
ServerName: host.Name,
RootCAs: globalRootCAs,
}
}
rpcClient, err := NewRPCClient(
RPCClientArgs{
NewAuthTokenFunc: newAuthToken,
RPCVersion: globalRPCAPIVersion,
ServiceName: storageServiceName,
ServiceURL: serviceURL,
TLSConfig: tlsConfig,
},
)
if err != nil {
return nil, err
}
return &StorageRPCClient{RPCClient: rpcClient}, nil
}
// Initialize new storage rpc client.
func newStorageRPC(endpoint Endpoint) *StorageRPCClient {
host, err := xnet.ParseHost(endpoint.Host)
logger.FatalIf(err, "Unable to parse storage RPC Host")
rpcClient, err := NewStorageRPCClient(host, endpoint.Path)
logger.FatalIf(err, "Unable to initialize storage RPC client")
// Attempt first try connection and save error if any.
rpcClient.connect()
return rpcClient
}

@ -1,233 +0,0 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"io"
"path"
"github.com/gorilla/mux"
"github.com/minio/minio/cmd/logger"
xrpc "github.com/minio/minio/cmd/rpc"
)
const storageServiceName = "Storage"
const storageServiceSubPath = "/storage"
var storageServicePath = path.Join(minioReservedBucketPath, storageServiceSubPath)
// storageRPCReceiver - Storage RPC receiver for storage RPC server
type storageRPCReceiver struct {
local *posix
}
// VolArgs - generic volume args.
type VolArgs struct {
AuthArgs
Vol string
}
/// Storage operations handlers.
// Connect - authenticates remote connection.
func (receiver *storageRPCReceiver) Connect(args *AuthArgs, reply *VoidReply) (err error) {
return args.Authenticate()
}
// DiskInfo - disk info handler is rpc wrapper for DiskInfo operation.
func (receiver *storageRPCReceiver) DiskInfo(args *AuthArgs, reply *DiskInfo) (err error) {
*reply, err = receiver.local.DiskInfo()
return err
}
/// Volume operations handlers.
// MakeVol - make vol handler is rpc wrapper for MakeVol operation.
func (receiver *storageRPCReceiver) MakeVol(args *VolArgs, reply *VoidReply) error {
return receiver.local.MakeVol(args.Vol)
}
// ListVols - list vols handler is rpc wrapper for ListVols operation.
func (receiver *storageRPCReceiver) ListVols(args *AuthArgs, reply *[]VolInfo) (err error) {
*reply, err = receiver.local.ListVols()
return err
}
// StatVol - stat vol handler is a rpc wrapper for StatVol operation.
func (receiver *storageRPCReceiver) StatVol(args *VolArgs, reply *VolInfo) (err error) {
*reply, err = receiver.local.StatVol(args.Vol)
return err
}
// DeleteVol - delete vol handler is a rpc wrapper for
// DeleteVol operation.
func (receiver *storageRPCReceiver) DeleteVol(args *VolArgs, reply *VoidReply) error {
return receiver.local.DeleteVol(args.Vol)
}
/// File operations
// StatFileArgs represents stat file RPC arguments.
type StatFileArgs struct {
AuthArgs
Vol string
Path string
}
// StatFile - stat file handler is rpc wrapper to stat file.
func (receiver *storageRPCReceiver) StatFile(args *StatFileArgs, reply *FileInfo) (err error) {
*reply, err = receiver.local.StatFile(args.Vol, args.Path)
return err
}
// ListDirArgs represents list contents RPC arguments.
type ListDirArgs struct {
AuthArgs
Vol string
Path string
Count int
}
// ListDir - list directory handler is rpc wrapper to list dir.
func (receiver *storageRPCReceiver) ListDir(args *ListDirArgs, reply *[]string) (err error) {
*reply, err = receiver.local.ListDir(args.Vol, args.Path, args.Count)
return err
}
// ReadAllArgs represents read all RPC arguments.
type ReadAllArgs struct {
AuthArgs
Vol string
Path string
}
// ReadAll - read all handler is rpc wrapper to read all storage API.
func (receiver *storageRPCReceiver) ReadAll(args *ReadAllArgs, reply *[]byte) (err error) {
*reply, err = receiver.local.ReadAll(args.Vol, args.Path)
return err
}
// ReadFileArgs represents read file RPC arguments.
type ReadFileArgs struct {
AuthArgs
Vol string
Path string
Offset int64
Length int64
Algo BitrotAlgorithm
ExpectedHash []byte
Verified bool
}
// ReadFile - read file handler is rpc wrapper to read file.
func (receiver *storageRPCReceiver) ReadFile(args *ReadFileArgs, reply *[]byte) error {
var verifier *BitrotVerifier
if !args.Verified {
verifier = NewBitrotVerifier(args.Algo, args.ExpectedHash)
}
buf := make([]byte, args.Length)
n, err := receiver.local.ReadFile(args.Vol, args.Path, args.Offset, buf, verifier)
// Ignore io.ErrEnexpectedEOF for short reads i.e. less content available than requested.
if err == io.ErrUnexpectedEOF {
err = nil
}
*reply = buf[0:n]
return err
}
// PrepareFileArgs represents append file RPC arguments.
type PrepareFileArgs struct {
AuthArgs
Vol string
Path string
Size int64
}
// PrepareFile - prepare file handler is rpc wrapper to prepare file.
func (receiver *storageRPCReceiver) PrepareFile(args *PrepareFileArgs, reply *VoidReply) error {
return receiver.local.PrepareFile(args.Vol, args.Path, args.Size)
}
// AppendFileArgs represents append file RPC arguments.
type AppendFileArgs struct {
AuthArgs
Vol string
Path string
Buffer []byte
}
// AppendFile - append file handler is rpc wrapper to append file.
func (receiver *storageRPCReceiver) AppendFile(args *AppendFileArgs, reply *VoidReply) error {
return receiver.local.AppendFile(args.Vol, args.Path, args.Buffer)
}
// DeleteFileArgs represents delete file RPC arguments.
type DeleteFileArgs struct {
AuthArgs
Vol string
Path string
}
// DeleteFile - delete file handler is rpc wrapper to delete file.
func (receiver *storageRPCReceiver) DeleteFile(args *DeleteFileArgs, reply *VoidReply) error {
return receiver.local.DeleteFile(args.Vol, args.Path)
}
// RenameFileArgs represents rename file RPC arguments.
type RenameFileArgs struct {
AuthArgs
SrcVol string
SrcPath string
DstVol string
DstPath string
}
// RenameFile - rename file handler is rpc wrapper to rename file.
func (receiver *storageRPCReceiver) RenameFile(args *RenameFileArgs, reply *VoidReply) error {
return receiver.local.RenameFile(args.SrcVol, args.SrcPath, args.DstVol, args.DstPath)
}
// NewStorageRPCServer - returns new storage RPC server.
func NewStorageRPCServer(endpointPath string) (*xrpc.Server, error) {
storage, err := newPosix(endpointPath)
if err != nil {
return nil, err
}
rpcServer := xrpc.NewServer()
if err = rpcServer.RegisterName(storageServiceName, &storageRPCReceiver{storage}); err != nil {
return nil, err
}
return rpcServer, nil
}
// registerStorageRPCRouter - register storage rpc router.
func registerStorageRPCRouters(router *mux.Router, endpoints EndpointList) {
for _, endpoint := range endpoints {
if endpoint.IsLocal {
rpcServer, err := NewStorageRPCServer(endpoint.Path)
if err != nil {
logger.Fatal(uiErrUnableToWriteInBackend(err), "Unable to configure one of server's RPC services")
}
subrouter := router.PathPrefix(minioReservedBucketPath).Subrouter()
subrouter.Path(path.Join(storageServiceSubPath, endpoint.Path)).HandlerFunc(httpTraceHdrs(rpcServer.ServeHTTP))
}
}
}

@ -458,3 +458,13 @@ func CloseResponse(respBody io.ReadCloser) {
respBody.Close()
}
}
// Used for registering with rest handlers (have a look at registerStorageRESTHandlers for usage example)
// If it is passed ["aaaa", "bbbb"], it returns ["aaaa", "{aaaa:.*}", "bbbb", "{bbbb:.*}"]
func restQueries(keys ...string) []string {
var accumulator []string
for _, key := range keys {
accumulator = append(accumulator, key, "{"+key+":.*}")
}
return accumulator
}

@ -470,3 +470,25 @@ func TestIsErrIgnored(t *testing.T) {
}
}
}
// Test queries()
func TestQueries(t *testing.T) {
var testCases = []struct {
keys []string
keyvalues []string
}{
{
[]string{"aaaa", "bbbb"},
[]string{"aaaa", "{aaaa:.*}", "bbbb", "{bbbb:.*}"},
},
}
for i, test := range testCases {
keyvalues := restQueries(test.keys...)
for j := range keyvalues {
if keyvalues[j] != test.keyvalues[j] {
t.Fatalf("test %d: keyvalues[%d] does not match", i+1, j)
}
}
}
}

@ -194,6 +194,9 @@ func (s *xlSets) connectDisksWithQuorum() {
s.xlDisks[i][j] = disk
onlineDisks++
}
// Sleep for a while - so that we don't go into
// 100% CPU when half the disks are online.
time.Sleep(500 * time.Millisecond)
}
}

Loading…
Cancel
Save