Merge pull request #107 from harshavardhana/pr_out_storage_erasure_encoding_info_at_storageentry_and_also_store_crc_at_blockheader

master
Harshavardhana 10 years ago
commit e107c3204d
  1. 34
      pkgs/erasure/encode.go
  2. 5
      pkgs/storage/appendstorage/append_storage.go
  3. 17
      pkgs/storage/encodedstorage/encoded_storage.go

@ -43,9 +43,9 @@ const (
// EncoderParams is a configuration set for building an encoder. It is created using ValidateParams. // EncoderParams is a configuration set for building an encoder. It is created using ValidateParams.
type EncoderParams struct { type EncoderParams struct {
k, K,
m, M,
technique int // cauchy or vandermonde matrix (RS) Technique int // cauchy or vandermonde matrix (RS)
} }
// Encoder is an object used to encode and decode data. // Encoder is an object used to encode and decode data.
@ -87,21 +87,21 @@ func ParseEncoderParams(k, m, technique int) (*EncoderParams, error) {
} }
return &EncoderParams{ return &EncoderParams{
k: k, K: k,
m: m, M: m,
technique: technique, Technique: technique,
}, nil }, nil
} }
// NewEncoder creates an encoder with a given set of parameters. // NewEncoder creates an encoder with a given set of parameters.
func NewEncoder(ep *EncoderParams) *Encoder { func NewEncoder(ep *EncoderParams) *Encoder {
var k = C.int(ep.k) var k = C.int(ep.K)
var m = C.int(ep.m) var m = C.int(ep.M)
var encode_matrix *C.uint8_t var encode_matrix *C.uint8_t
var encode_tbls *C.uint8_t var encode_tbls *C.uint8_t
C.minio_init_encoder(C.int(ep.technique), k, m, &encode_matrix, C.minio_init_encoder(C.int(ep.Technique), k, m, &encode_matrix,
&encode_tbls) &encode_tbls)
return &Encoder{ return &Encoder{
@ -122,7 +122,7 @@ func (e *Encoder) Encode(block []byte) ([][]byte, int) {
var block_len = len(block) var block_len = len(block)
chunk_size := int(C.minio_calc_chunk_size(e.k, C.uint32_t(block_len))) chunk_size := int(C.minio_calc_chunk_size(e.k, C.uint32_t(block_len)))
chunk_len := chunk_size * e.p.k chunk_len := chunk_size * e.p.K
pad_len := chunk_len - block_len pad_len := chunk_len - block_len
if pad_len > 0 { if pad_len > 0 {
@ -131,28 +131,28 @@ func (e *Encoder) Encode(block []byte) ([][]byte, int) {
block = append(block, s...) block = append(block, s...)
} }
coded_len := chunk_size * e.p.m coded_len := chunk_size * e.p.M
c := make([]byte, coded_len) c := make([]byte, coded_len)
block = append(block, c...) block = append(block, c...)
// Allocate chunks // Allocate chunks
chunks := make([][]byte, e.p.k+e.p.m) chunks := make([][]byte, e.p.K+e.p.M)
pointers := make([]*byte, e.p.k+e.p.m) pointers := make([]*byte, e.p.K+e.p.M)
var i int var i int
// Add data blocks to chunks // Add data blocks to chunks
for i = 0; i < e.p.k; i++ { for i = 0; i < e.p.K; i++ {
chunks[i] = block[i*chunk_size : (i+1)*chunk_size] chunks[i] = block[i*chunk_size : (i+1)*chunk_size]
pointers[i] = &chunks[i][0] pointers[i] = &chunks[i][0]
} }
for i = e.p.k; i < (e.p.k + e.p.m); i++ { for i = e.p.K; i < (e.p.K + e.p.M); i++ {
chunks[i] = make([]byte, chunk_size) chunks[i] = make([]byte, chunk_size)
pointers[i] = &chunks[i][0] pointers[i] = &chunks[i][0]
} }
data := (**C.uint8_t)(unsafe.Pointer(&pointers[:e.p.k][0])) data := (**C.uint8_t)(unsafe.Pointer(&pointers[:e.p.K][0]))
coding := (**C.uint8_t)(unsafe.Pointer(&pointers[e.p.k:][0])) coding := (**C.uint8_t)(unsafe.Pointer(&pointers[e.p.K:][0]))
C.ec_encode_data(C.int(chunk_size), e.k, e.m, e.encode_tbls, data, C.ec_encode_data(C.int(chunk_size), e.k, e.m, e.encode_tbls, data,
coding) coding)

@ -10,6 +10,7 @@ import (
"path" "path"
"strconv" "strconv"
"github.com/minio-io/minio/pkgs/checksum/crc32c"
"github.com/minio-io/minio/pkgs/storage" "github.com/minio-io/minio/pkgs/storage"
) )
@ -96,6 +97,10 @@ func (aStorage *appendStorage) Put(objectPath string, object io.Reader) error {
} }
header.Offset = offset header.Offset = offset
header.Length = len(objectBytes) header.Length = len(objectBytes)
header.Crc, err = crc32c.Crc32c(objectBytes)
if err != nil {
return err
}
aStorage.objects[objectPath] = header aStorage.objects[objectPath] = header
var mapBuffer bytes.Buffer var mapBuffer bytes.Buffer
encoder := gob.NewEncoder(&mapBuffer) encoder := gob.NewEncoder(&mapBuffer)

@ -28,9 +28,8 @@ type encodedStorage struct {
type StorageEntry struct { type StorageEntry struct {
Path string Path string
Md5sum string Md5sum string
Crc uint32
Blocks []StorageBlockEntry Blocks []StorageBlockEntry
Encoderparams *erasure.EncoderParams Encoderparams erasure.EncoderParams
} }
type StorageBlockEntry struct { type StorageBlockEntry struct {
@ -111,9 +110,12 @@ func (eStorage *encodedStorage) Put(objectPath string, object io.Reader) error {
entry := StorageEntry{ entry := StorageEntry{
Path: objectPath, Path: objectPath,
Md5sum: "md5sum", Md5sum: "md5sum",
Crc: 0,
Blocks: make([]StorageBlockEntry, 0), Blocks: make([]StorageBlockEntry, 0),
Encoderparams: encoderParameters, Encoderparams: erasure.EncoderParams{
K: eStorage.K,
M: eStorage.M,
Technique: erasure.CAUCHY,
},
} }
i := 0 i := 0
// encode // encode
@ -163,7 +165,12 @@ func (eStorage *encodedStorage) storeBlocks(path string, blocks [][]byte) []erro
} }
func (eStorage *encodedStorage) readObject(objectPath string, entry StorageEntry, writer *io.PipeWriter) { func (eStorage *encodedStorage) readObject(objectPath string, entry StorageEntry, writer *io.PipeWriter) {
encoder := erasure.NewEncoder(entry.Encoderparams) ep, err := erasure.ParseEncoderParams(entry.Encoderparams.K, entry.Encoderparams.M, entry.Encoderparams.Technique)
if err != nil {
writer.CloseWithError(err)
return
}
encoder := erasure.NewEncoder(ep)
for i, chunk := range entry.Blocks { for i, chunk := range entry.Blocks {
blockSlices := eStorage.getBlockSlices(objectPath + "$" + strconv.Itoa(i)) blockSlices := eStorage.getBlockSlices(objectPath + "$" + strconv.Itoa(i))
var blocks [][]byte var blocks [][]byte

Loading…
Cancel
Save