From 628ef081d18ab7276ac5626d88419086822e141c Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 13 Jan 2021 09:58:08 -0800 Subject: [PATCH] fix: preserve cache calculated previously while moving from v2 to v3 (#11269) This ensures that all the prometheus monitoring and usage trackers to avoid alerts configured, although we cannot support v1 to v2 here - we can v2 to v3. --- cmd/data-usage-cache.go | 72 ++++-- cmd/data-usage-cache_gen.go | 428 +++++++++++++++++++++++++++++++ cmd/data-usage-cache_gen_test.go | 226 ++++++++++++++++ 3 files changed, 711 insertions(+), 15 deletions(-) diff --git a/cmd/data-usage-cache.go b/cmd/data-usage-cache.go index d7483ceb2..b3760d6bc 100644 --- a/cmd/data-usage-cache.go +++ b/cmd/data-usage-cache.go @@ -56,13 +56,29 @@ type dataUsageEntry struct { Children dataUsageHashMap } -// dataUsageCache contains a cache of data usage entries. +//msgp:tuple dataUsageEntryV2 +type dataUsageEntryV2 struct { + // These fields do no include any children. + Size int64 + Objects uint64 + ObjSizes sizeHistogram + Children dataUsageHashMap +} + +// dataUsageCache contains a cache of data usage entries latest version 3. type dataUsageCache struct { Info dataUsageCacheInfo Disks []string Cache map[string]dataUsageEntry } +// dataUsageCache contains a cache of data usage entries version 2. +type dataUsageCacheV2 struct { + Info dataUsageCacheInfo + Disks []string + Cache map[string]dataUsageEntryV2 +} + //msgp:ignore dataUsageEntryInfo type dataUsageEntryInfo struct { Name string @@ -513,12 +529,16 @@ func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string) // dataUsageCacheVer indicates the cache version. // Bumping the cache version will drop data from previous versions // and write new data with the new version. -const dataUsageCacheVer = 3 +const ( + dataUsageCacheVerV3 = 3 + dataUsageCacheVerV2 = 2 + dataUsageCacheVerV1 = 1 +) // serialize the contents of the cache. func (d *dataUsageCache) serializeTo(dst io.Writer) error { // Add version and compress. - _, err := dst.Write([]byte{dataUsageCacheVer}) + _, err := dst.Write([]byte{dataUsageCacheVerV3}) if err != nil { return err } @@ -553,21 +573,43 @@ func (d *dataUsageCache) deserialize(r io.Reader) error { return io.ErrUnexpectedEOF } switch b[0] { - case 1, 2: + case dataUsageCacheVerV1: return errors.New("cache version deprecated (will autoupdate)") - case dataUsageCacheVer: - default: - return fmt.Errorf("dataUsageCache: unknown version: %d", int(b[0])) - } + case dataUsageCacheVerV2: + // Zstd compressed. + dec, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(2)) + if err != nil { + return err + } + defer dec.Close() - // Zstd compressed. - dec, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(2)) - if err != nil { - return err - } - defer dec.Close() + dold := &dataUsageCacheV2{} + if err = dold.DecodeMsg(msgp.NewReader(dec)); err != nil { + return err + } + d.Info = dold.Info + d.Disks = dold.Disks + d.Cache = make(map[string]dataUsageEntry, len(dold.Cache)) + for k, v := range dold.Cache { + d.Cache[k] = dataUsageEntry{ + Size: v.Size, + Objects: v.Objects, + ObjSizes: v.ObjSizes, + Children: v.Children, + } + } + return nil + case dataUsageCacheVerV3: + // Zstd compressed. + dec, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(2)) + if err != nil { + return err + } + defer dec.Close() - return d.DecodeMsg(msgp.NewReader(dec)) + return d.DecodeMsg(msgp.NewReader(dec)) + } + return fmt.Errorf("dataUsageCache: unknown version: %d", int(b[0])) } // Trim this from start+end of hashes. diff --git a/cmd/data-usage-cache_gen.go b/cmd/data-usage-cache_gen.go index 8b47da075..2366b1aac 100644 --- a/cmd/data-usage-cache_gen.go +++ b/cmd/data-usage-cache_gen.go @@ -484,6 +484,277 @@ func (z *dataUsageCacheInfo) Msgsize() (s int) { return } +// DecodeMsg implements msgp.Decodable +func (z *dataUsageCacheV2) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Info": + err = z.Info.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Info") + return + } + case "Disks": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Disks") + return + } + if cap(z.Disks) >= int(zb0002) { + z.Disks = (z.Disks)[:zb0002] + } else { + z.Disks = make([]string, zb0002) + } + for za0001 := range z.Disks { + z.Disks[za0001], err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Disks", za0001) + return + } + } + case "Cache": + var zb0003 uint32 + zb0003, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "Cache") + return + } + if z.Cache == nil { + z.Cache = make(map[string]dataUsageEntryV2, zb0003) + } else if len(z.Cache) > 0 { + for key := range z.Cache { + delete(z.Cache, key) + } + } + for zb0003 > 0 { + zb0003-- + var za0002 string + var za0003 dataUsageEntryV2 + za0002, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Cache") + return + } + err = za0003.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Cache", za0002) + return + } + z.Cache[za0002] = za0003 + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *dataUsageCacheV2) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 3 + // write "Info" + err = en.Append(0x83, 0xa4, 0x49, 0x6e, 0x66, 0x6f) + if err != nil { + return + } + err = z.Info.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Info") + return + } + // write "Disks" + err = en.Append(0xa5, 0x44, 0x69, 0x73, 0x6b, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Disks))) + if err != nil { + err = msgp.WrapError(err, "Disks") + return + } + for za0001 := range z.Disks { + err = en.WriteString(z.Disks[za0001]) + if err != nil { + err = msgp.WrapError(err, "Disks", za0001) + return + } + } + // write "Cache" + err = en.Append(0xa5, 0x43, 0x61, 0x63, 0x68, 0x65) + if err != nil { + return + } + err = en.WriteMapHeader(uint32(len(z.Cache))) + if err != nil { + err = msgp.WrapError(err, "Cache") + return + } + for za0002, za0003 := range z.Cache { + err = en.WriteString(za0002) + if err != nil { + err = msgp.WrapError(err, "Cache") + return + } + err = za0003.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Cache", za0002) + return + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *dataUsageCacheV2) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 3 + // string "Info" + o = append(o, 0x83, 0xa4, 0x49, 0x6e, 0x66, 0x6f) + o, err = z.Info.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Info") + return + } + // string "Disks" + o = append(o, 0xa5, 0x44, 0x69, 0x73, 0x6b, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.Disks))) + for za0001 := range z.Disks { + o = msgp.AppendString(o, z.Disks[za0001]) + } + // string "Cache" + o = append(o, 0xa5, 0x43, 0x61, 0x63, 0x68, 0x65) + o = msgp.AppendMapHeader(o, uint32(len(z.Cache))) + for za0002, za0003 := range z.Cache { + o = msgp.AppendString(o, za0002) + o, err = za0003.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Cache", za0002) + return + } + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *dataUsageCacheV2) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Info": + bts, err = z.Info.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Info") + return + } + case "Disks": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Disks") + return + } + if cap(z.Disks) >= int(zb0002) { + z.Disks = (z.Disks)[:zb0002] + } else { + z.Disks = make([]string, zb0002) + } + for za0001 := range z.Disks { + z.Disks[za0001], bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Disks", za0001) + return + } + } + case "Cache": + var zb0003 uint32 + zb0003, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Cache") + return + } + if z.Cache == nil { + z.Cache = make(map[string]dataUsageEntryV2, zb0003) + } else if len(z.Cache) > 0 { + for key := range z.Cache { + delete(z.Cache, key) + } + } + for zb0003 > 0 { + var za0002 string + var za0003 dataUsageEntryV2 + zb0003-- + za0002, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Cache") + return + } + bts, err = za0003.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Cache", za0002) + return + } + z.Cache[za0002] = za0003 + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *dataUsageCacheV2) Msgsize() (s int) { + s = 1 + 5 + z.Info.Msgsize() + 6 + msgp.ArrayHeaderSize + for za0001 := range z.Disks { + s += msgp.StringPrefixSize + len(z.Disks[za0001]) + } + s += 6 + msgp.MapHeaderSize + if z.Cache != nil { + for za0002, za0003 := range z.Cache { + _ = za0003 + s += msgp.StringPrefixSize + len(za0002) + za0003.Msgsize() + } + } + return +} + // DecodeMsg implements msgp.Decodable func (z *dataUsageEntry) DecodeMsg(dc *msgp.Reader) (err error) { var zb0001 uint32 @@ -705,6 +976,163 @@ func (z *dataUsageEntry) Msgsize() (s int) { return } +// DecodeMsg implements msgp.Decodable +func (z *dataUsageEntryV2) DecodeMsg(dc *msgp.Reader) (err error) { + var zb0001 uint32 + zb0001, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + if zb0001 != 4 { + err = msgp.ArrayError{Wanted: 4, Got: zb0001} + return + } + z.Size, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "Size") + return + } + z.Objects, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "Objects") + return + } + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "ObjSizes") + return + } + if zb0002 != uint32(dataUsageBucketLen) { + err = msgp.ArrayError{Wanted: uint32(dataUsageBucketLen), Got: zb0002} + return + } + for za0001 := range z.ObjSizes { + z.ObjSizes[za0001], err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ObjSizes", za0001) + return + } + } + err = z.Children.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Children") + return + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *dataUsageEntryV2) EncodeMsg(en *msgp.Writer) (err error) { + // array header, size 4 + err = en.Append(0x94) + if err != nil { + return + } + err = en.WriteInt64(z.Size) + if err != nil { + err = msgp.WrapError(err, "Size") + return + } + err = en.WriteUint64(z.Objects) + if err != nil { + err = msgp.WrapError(err, "Objects") + return + } + err = en.WriteArrayHeader(uint32(dataUsageBucketLen)) + if err != nil { + err = msgp.WrapError(err, "ObjSizes") + return + } + for za0001 := range z.ObjSizes { + err = en.WriteUint64(z.ObjSizes[za0001]) + if err != nil { + err = msgp.WrapError(err, "ObjSizes", za0001) + return + } + } + err = z.Children.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Children") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *dataUsageEntryV2) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // array header, size 4 + o = append(o, 0x94) + o = msgp.AppendInt64(o, z.Size) + o = msgp.AppendUint64(o, z.Objects) + o = msgp.AppendArrayHeader(o, uint32(dataUsageBucketLen)) + for za0001 := range z.ObjSizes { + o = msgp.AppendUint64(o, z.ObjSizes[za0001]) + } + o, err = z.Children.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Children") + return + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *dataUsageEntryV2) UnmarshalMsg(bts []byte) (o []byte, err error) { + var zb0001 uint32 + zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + if zb0001 != 4 { + err = msgp.ArrayError{Wanted: 4, Got: zb0001} + return + } + z.Size, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "Size") + return + } + z.Objects, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "Objects") + return + } + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "ObjSizes") + return + } + if zb0002 != uint32(dataUsageBucketLen) { + err = msgp.ArrayError{Wanted: uint32(dataUsageBucketLen), Got: zb0002} + return + } + for za0001 := range z.ObjSizes { + z.ObjSizes[za0001], bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ObjSizes", za0001) + return + } + } + bts, err = z.Children.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Children") + return + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *dataUsageEntryV2) Msgsize() (s int) { + s = 1 + msgp.Int64Size + msgp.Uint64Size + msgp.ArrayHeaderSize + (dataUsageBucketLen * (msgp.Uint64Size)) + z.Children.Msgsize() + return +} + // DecodeMsg implements msgp.Decodable func (z *dataUsageHash) DecodeMsg(dc *msgp.Reader) (err error) { { diff --git a/cmd/data-usage-cache_gen_test.go b/cmd/data-usage-cache_gen_test.go index 8922e16fa..51d2173b0 100644 --- a/cmd/data-usage-cache_gen_test.go +++ b/cmd/data-usage-cache_gen_test.go @@ -235,6 +235,119 @@ func BenchmarkDecodedataUsageCacheInfo(b *testing.B) { } } +func TestMarshalUnmarshaldataUsageCacheV2(t *testing.T) { + v := dataUsageCacheV2{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgdataUsageCacheV2(b *testing.B) { + v := dataUsageCacheV2{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgdataUsageCacheV2(b *testing.B) { + v := dataUsageCacheV2{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshaldataUsageCacheV2(b *testing.B) { + v := dataUsageCacheV2{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodedataUsageCacheV2(t *testing.T) { + v := dataUsageCacheV2{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodedataUsageCacheV2 Msgsize() is inaccurate") + } + + vn := dataUsageCacheV2{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodedataUsageCacheV2(b *testing.B) { + v := dataUsageCacheV2{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodedataUsageCacheV2(b *testing.B) { + v := dataUsageCacheV2{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func TestMarshalUnmarshaldataUsageEntry(t *testing.T) { v := dataUsageEntry{} bts, err := v.MarshalMsg(nil) @@ -348,6 +461,119 @@ func BenchmarkDecodedataUsageEntry(b *testing.B) { } } +func TestMarshalUnmarshaldataUsageEntryV2(t *testing.T) { + v := dataUsageEntryV2{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgdataUsageEntryV2(b *testing.B) { + v := dataUsageEntryV2{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgdataUsageEntryV2(b *testing.B) { + v := dataUsageEntryV2{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshaldataUsageEntryV2(b *testing.B) { + v := dataUsageEntryV2{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodedataUsageEntryV2(t *testing.T) { + v := dataUsageEntryV2{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodedataUsageEntryV2 Msgsize() is inaccurate") + } + + vn := dataUsageEntryV2{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodedataUsageEntryV2(b *testing.B) { + v := dataUsageEntryV2{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodedataUsageEntryV2(b *testing.B) { + v := dataUsageEntryV2{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func TestMarshalUnmarshalsizeHistogram(t *testing.T) { v := sizeHistogram{} bts, err := v.MarshalMsg(nil)