|
|
@ -41,13 +41,11 @@ func (b *streamingBitrotWriter) Write(p []byte) (int, error) { |
|
|
|
b.h.Reset() |
|
|
|
b.h.Reset() |
|
|
|
b.h.Write(p) |
|
|
|
b.h.Write(p) |
|
|
|
hashBytes := b.h.Sum(nil) |
|
|
|
hashBytes := b.h.Sum(nil) |
|
|
|
n, err := b.iow.Write(hashBytes) |
|
|
|
_, err := b.iow.Write(hashBytes) |
|
|
|
if n != len(hashBytes) { |
|
|
|
if err != nil { |
|
|
|
logger.LogIf(context.Background(), err) |
|
|
|
|
|
|
|
return 0, err |
|
|
|
return 0, err |
|
|
|
} |
|
|
|
} |
|
|
|
n, err = b.iow.Write(p) |
|
|
|
return b.iow.Write(p) |
|
|
|
return n, err |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (b *streamingBitrotWriter) Close() error { |
|
|
|
func (b *streamingBitrotWriter) Close() error { |
|
|
@ -75,11 +73,6 @@ func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length i |
|
|
|
totalFileSize = bitrotSumsTotalSize + length |
|
|
|
totalFileSize = bitrotSumsTotalSize + length |
|
|
|
} |
|
|
|
} |
|
|
|
err := disk.CreateFile(volume, filePath, totalFileSize, r) |
|
|
|
err := disk.CreateFile(volume, filePath, totalFileSize, r) |
|
|
|
if err != nil { |
|
|
|
|
|
|
|
reqInfo := (&logger.ReqInfo{}).AppendTags("storageDisk", disk.String()) |
|
|
|
|
|
|
|
ctx := logger.SetReqInfo(context.Background(), reqInfo) |
|
|
|
|
|
|
|
logger.LogIf(ctx, err) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
r.CloseWithError(err) |
|
|
|
r.CloseWithError(err) |
|
|
|
close(bw.canClose) |
|
|
|
close(bw.canClose) |
|
|
|
}() |
|
|
|
}() |
|
|
@ -110,7 +103,7 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) { |
|
|
|
var err error |
|
|
|
var err error |
|
|
|
if offset%b.shardSize != 0 { |
|
|
|
if offset%b.shardSize != 0 { |
|
|
|
// Offset should always be aligned to b.shardSize
|
|
|
|
// Offset should always be aligned to b.shardSize
|
|
|
|
logger.LogIf(context.Background(), errUnexpected) |
|
|
|
// Can never happen unless there are programmer bugs
|
|
|
|
return 0, errUnexpected |
|
|
|
return 0, errUnexpected |
|
|
|
} |
|
|
|
} |
|
|
|
if b.rc == nil { |
|
|
|
if b.rc == nil { |
|
|
@ -119,25 +112,20 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) { |
|
|
|
streamOffset := (offset/b.shardSize)*int64(b.h.Size()) + offset |
|
|
|
streamOffset := (offset/b.shardSize)*int64(b.h.Size()) + offset |
|
|
|
b.rc, err = b.disk.ReadFileStream(b.volume, b.filePath, streamOffset, b.tillOffset-streamOffset) |
|
|
|
b.rc, err = b.disk.ReadFileStream(b.volume, b.filePath, streamOffset, b.tillOffset-streamOffset) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
reqInfo := (&logger.ReqInfo{}).AppendTags("storageDisk", b.disk.String()) |
|
|
|
|
|
|
|
ctx := logger.SetReqInfo(context.Background(), reqInfo) |
|
|
|
|
|
|
|
logger.LogIf(ctx, err) |
|
|
|
|
|
|
|
return 0, err |
|
|
|
return 0, err |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
if offset != b.currOffset { |
|
|
|
if offset != b.currOffset { |
|
|
|
logger.LogIf(context.Background(), errUnexpected) |
|
|
|
// Can never happen unless there are programmer bugs
|
|
|
|
return 0, errUnexpected |
|
|
|
return 0, errUnexpected |
|
|
|
} |
|
|
|
} |
|
|
|
b.h.Reset() |
|
|
|
b.h.Reset() |
|
|
|
_, err = io.ReadFull(b.rc, b.hashBytes) |
|
|
|
_, err = io.ReadFull(b.rc, b.hashBytes) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
logger.LogIf(context.Background(), err) |
|
|
|
|
|
|
|
return 0, err |
|
|
|
return 0, err |
|
|
|
} |
|
|
|
} |
|
|
|
_, err = io.ReadFull(b.rc, buf) |
|
|
|
_, err = io.ReadFull(b.rc, buf) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
logger.LogIf(context.Background(), err) |
|
|
|
|
|
|
|
return 0, err |
|
|
|
return 0, err |
|
|
|
} |
|
|
|
} |
|
|
|
b.h.Write(buf) |
|
|
|
b.h.Write(buf) |
|
|
|