Add tests for storage rpc handlers (#3262)

master
Krishnan Parthasarathi 8 years ago committed by Harshavardhana
parent 398421b9f5
commit 7abcededf2
  1. 75
      cmd/storage-rpc-client_test.go
  2. 226
      cmd/storage-rpc-server_test.go

@ -194,6 +194,7 @@ func (s *TestRPCStorageSuite) testRPCStorageClient(t *testing.T) {
s.testRPCStorageDisksInfo(t) s.testRPCStorageDisksInfo(t)
s.testRPCStorageVolOps(t) s.testRPCStorageVolOps(t)
s.testRPCStorageFileOps(t) s.testRPCStorageFileOps(t)
s.testRPCStorageListDir(t)
} }
// Test storage disks info. // Test storage disks info.
@ -207,7 +208,7 @@ func (s *TestRPCStorageSuite) testRPCStorageDisksInfo(t *testing.T) {
t.Error("Invalid diskInfo total") t.Error("Invalid diskInfo total")
} }
if storageDisk.String() == "" { if storageDisk.String() == "" {
t.Error("Stinger storageAPI should be non empty") t.Error("String representation of storageAPI should not be empty")
} }
} }
} }
@ -215,10 +216,12 @@ func (s *TestRPCStorageSuite) testRPCStorageDisksInfo(t *testing.T) {
// Test storage vol operations. // Test storage vol operations.
func (s *TestRPCStorageSuite) testRPCStorageVolOps(t *testing.T) { func (s *TestRPCStorageSuite) testRPCStorageVolOps(t *testing.T) {
for _, storageDisk := range s.remoteDisks { for _, storageDisk := range s.remoteDisks {
numVols := 0
err := storageDisk.MakeVol("myvol") err := storageDisk.MakeVol("myvol")
if err != nil { if err != nil {
t.Error("Unable to initiate MakeVol", err) t.Error("Unable to initiate MakeVol", err)
} }
numVols++
volInfo, err := storageDisk.StatVol("myvol") volInfo, err := storageDisk.StatVol("myvol")
if err != nil { if err != nil {
t.Error("Unable to initiate StatVol", err) t.Error("Unable to initiate StatVol", err)
@ -229,10 +232,40 @@ func (s *TestRPCStorageSuite) testRPCStorageVolOps(t *testing.T) {
if volInfo.Created.IsZero() { if volInfo.Created.IsZero() {
t.Error("Expected created time to be non zero") t.Error("Expected created time to be non zero")
} }
for i := 0; i < 10; i++ {
err = storageDisk.MakeVol(fmt.Sprintf("myvol-%d", i))
if err != nil {
t.Error("Unable to initiate MakeVol", err)
}
numVols++
}
vols, err := storageDisk.ListVols()
if err != nil {
t.Error("Unable to initiate ListVol")
}
if len(vols) != numVols {
t.Errorf("Expected %d volumes but found only %d", numVols, len(vols))
}
for i := 0; i < 10; i++ {
err = storageDisk.DeleteVol(fmt.Sprintf("myvol-%d", i))
if err != nil {
t.Error("Unable to initiate DeleteVol", err)
}
}
err = storageDisk.DeleteVol("myvol") err = storageDisk.DeleteVol("myvol")
if err != nil { if err != nil {
t.Error("Unable to initiate DeleteVol", err) t.Error("Unable to initiate DeleteVol", err)
} }
vols, err = storageDisk.ListVols()
if err != nil {
t.Error("Unable to initiate ListVol")
}
if len(vols) > 0 {
t.Errorf("Expected no volumes but found %d", len(vols))
}
} }
} }
@ -302,3 +335,43 @@ func (s *TestRPCStorageSuite) testRPCStorageFileOps(t *testing.T) {
} }
} }
} }
// Tests for ListDirHandler.
func (s *TestRPCStorageSuite) testRPCStorageListDir(t *testing.T) {
for _, storageDisk := range s.remoteDisks {
err := storageDisk.MakeVol("myvol")
if err != nil {
t.Error("Unable to initiate MakeVol", err)
}
dirCount := 10
for i := 0; i < dirCount; i++ {
err = storageDisk.MakeVol(fmt.Sprintf("myvol/mydir-%d", i))
if err != nil {
t.Error("Unable to initiate MakeVol", err)
}
}
dirs, err := storageDisk.ListDir("myvol", "")
if len(dirs) != dirCount {
t.Errorf("Expected %d directories but found only %d", dirCount, len(dirs))
}
for i := 0; i < dirCount; i++ {
err = storageDisk.DeleteVol(fmt.Sprintf("myvol/mydir-%d", i))
if err != nil {
t.Error("Unable to initiate DeleteVol", err)
}
}
dirs, err = storageDisk.ListDir("myvol", "")
if len(dirs) != 0 {
t.Errorf("Expected no directories but found %d", dirCount)
}
err = storageDisk.DeleteVol("myvol")
if err != nil {
t.Error("Unable to initiate DeleteVol", err)
}
vols, err := storageDisk.ListVols()
if len(vols) != 0 {
t.Errorf("Expected no volumes but found %d", dirCount)
}
}
}

@ -0,0 +1,226 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"net/url"
"testing"
"time"
"github.com/minio/minio/pkg/disk"
)
const invalidToken = "invalidToken"
type testStorageRPCServer struct {
configDir string
token string
diskDirs []string
stServer *storageServer
endpoints []*url.URL
}
func createTestStorageServer(t *testing.T) *testStorageRPCServer {
testPath, err := newTestConfig("us-east-1")
if err != nil {
t.Fatalf("unable initialize config file, %s", err)
}
jwt, err := newJWT(defaultInterNodeJWTExpiry)
if err != nil {
t.Fatalf("unable to get new JWT, %s", err)
}
err = jwt.Authenticate(serverConfig.GetCredential().AccessKeyID, serverConfig.GetCredential().SecretAccessKey)
if err != nil {
t.Fatalf("unable for JWT to authenticate, %s", err)
}
token, err := jwt.GenerateToken(serverConfig.GetCredential().AccessKeyID)
if err != nil {
t.Fatalf("unable for JWT to generate token, %s", err)
}
fsDirs, err := getRandomDisks(1)
if err != nil {
t.Fatalf("unable to create FS backend, %s", err)
}
endpoints, err := parseStorageEndpoints(fsDirs)
if err != nil {
t.Fatalf("unable to parse storage endpoints, %s", err)
}
storageDisks, err := initStorageDisks(endpoints)
if err != nil {
t.Fatalf("unable to initialize storage disks, %s", err)
}
stServer := &storageServer{
storage: storageDisks[0],
path: "/disk1",
timestamp: time.Now().UTC(),
}
return &testStorageRPCServer{
token: token,
configDir: testPath,
diskDirs: fsDirs,
endpoints: endpoints,
stServer: stServer,
}
}
func errorIfInvalidToken(t *testing.T, err error) {
realErr := errorCause(err)
if realErr != errInvalidToken {
t.Errorf("Expected to fail with %s but failed with %s", errInvalidToken, realErr)
}
}
func TestStorageRPCTryInitHandler(t *testing.T) {
st := createTestStorageServer(t)
defer removeRoots(st.diskDirs)
defer removeAll(st.configDir)
storageRPC := st.stServer
timestamp := time.Now().UTC()
tryArgs := &GenericArgs{
Token: st.token,
Timestamp: timestamp,
}
tryReply := &GenericReply{}
err := storageRPC.TryInitHandler(tryArgs, tryReply)
if err != nil {
t.Errorf("TryInitHandler failed with %s", err)
}
}
func TestStorageRPCInvalidToken(t *testing.T) {
st := createTestStorageServer(t)
defer removeRoots(st.diskDirs)
defer removeAll(st.configDir)
storageRPC := st.stServer
timestamp := time.Now().UTC()
ga := GenericArgs{
Token: st.token,
Timestamp: timestamp,
}
// Construct an invalid token.
badga := ga
badga.Token = "invalidToken"
// Following test cases are meant to exercise the invalid
// token code path of the storage RPC methods.
var err error
gva := GenericVolArgs{
GenericArgs: badga,
Vol: "myvol",
}
// 1. DiskInfoHandler
diskInfoReply := &disk.Info{}
err = storageRPC.DiskInfoHandler(&badga, diskInfoReply)
errorIfInvalidToken(t, err)
// 2. MakeVolHandler
makeVolArgs := &gva
makeVolReply := &GenericReply{}
err = storageRPC.MakeVolHandler(makeVolArgs, makeVolReply)
errorIfInvalidToken(t, err)
// 3. ListVolsHandler
listVolReply := &ListVolsReply{}
err = storageRPC.ListVolsHandler(&badga, listVolReply)
errorIfInvalidToken(t, err)
// 4. StatVolHandler
statVolReply := &VolInfo{}
statVolArgs := &gva
err = storageRPC.StatVolHandler(statVolArgs, statVolReply)
errorIfInvalidToken(t, err)
// 5. DeleteVolHandler
deleteVolArgs := &gva
deleteVolReply := &GenericReply{}
err = storageRPC.DeleteVolHandler(deleteVolArgs, deleteVolReply)
errorIfInvalidToken(t, err)
// 6. StatFileHandler
statFileArgs := &StatFileArgs{
GenericArgs: badga,
}
statReply := &FileInfo{}
err = storageRPC.StatFileHandler(statFileArgs, statReply)
errorIfInvalidToken(t, err)
// 7. ListDirHandler
listDirArgs := &ListDirArgs{
GenericArgs: badga,
}
listDirReply := &[]string{}
err = storageRPC.ListDirHandler(listDirArgs, listDirReply)
errorIfInvalidToken(t, err)
// 8. ReadAllHandler
readFileArgs := &ReadFileArgs{
GenericArgs: badga,
}
readFileReply := &[]byte{}
err = storageRPC.ReadAllHandler(readFileArgs, readFileReply)
errorIfInvalidToken(t, err)
// 9. ReadFileHandler
err = storageRPC.ReadFileHandler(readFileArgs, readFileReply)
errorIfInvalidToken(t, err)
// 10. PrepareFileHandler
prepFileArgs := &PrepareFileArgs{
GenericArgs: badga,
}
prepFileReply := &GenericReply{}
err = storageRPC.PrepareFileHandler(prepFileArgs, prepFileReply)
errorIfInvalidToken(t, err)
// 11. AppendFileHandler
appendArgs := &AppendFileArgs{
GenericArgs: badga,
}
appendReply := &GenericReply{}
err = storageRPC.AppendFileHandler(appendArgs, appendReply)
errorIfInvalidToken(t, err)
// 12. DeleteFileHandler
delFileArgs := &DeleteFileArgs{
GenericArgs: badga,
}
delFileRely := &GenericReply{}
err = storageRPC.DeleteFileHandler(delFileArgs, delFileRely)
errorIfInvalidToken(t, err)
// 13. RenameFileHandler
renameArgs := &RenameFileArgs{
GenericArgs: badga,
}
renameReply := &GenericReply{}
err = storageRPC.RenameFileHandler(renameArgs, renameReply)
errorIfInvalidToken(t, err)
// 14. TryInitHandler
tryArgs := &badga
tryReply := &GenericReply{}
err = storageRPC.TryInitHandler(tryArgs, tryReply)
errorIfInvalidToken(t, err)
}
Loading…
Cancel
Save