|
|
|
@ -18,11 +18,14 @@ package cmd |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"bytes" |
|
|
|
|
"encoding/json" |
|
|
|
|
"encoding/xml" |
|
|
|
|
"errors" |
|
|
|
|
"io" |
|
|
|
|
"net/http" |
|
|
|
|
"net/url" |
|
|
|
|
"path" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
"github.com/gorilla/mux" |
|
|
|
|
xhttp "github.com/minio/minio/cmd/http" |
|
|
|
@ -171,6 +174,144 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter, |
|
|
|
|
writeSuccessResponseHeadersOnly(w) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (api objectAPIHandlers) ListenBucketNotificationHandlerV2(w http.ResponseWriter, r *http.Request) { |
|
|
|
|
ctx := newContext(r, w, "ListenBucketNotificationV2") |
|
|
|
|
|
|
|
|
|
defer logger.AuditLog(w, r, "ListenBucketNotificationV2", mustGetClaimsFromToken(r)) |
|
|
|
|
|
|
|
|
|
// Validate if bucket exists.
|
|
|
|
|
objAPI := api.ObjectAPI() |
|
|
|
|
if objAPI == nil { |
|
|
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if !objAPI.IsNotificationSupported() { |
|
|
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if !objAPI.IsListenBucketSupported() { |
|
|
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
vars := mux.Vars(r) |
|
|
|
|
bucketName := vars["bucket"] |
|
|
|
|
|
|
|
|
|
values := r.URL.Query() |
|
|
|
|
|
|
|
|
|
var prefix string |
|
|
|
|
if len(values["prefix"]) > 1 { |
|
|
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrFilterNamePrefix), r.URL, guessIsBrowserReq(r)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if len(values["prefix"]) == 1 { |
|
|
|
|
if err := event.ValidateFilterRuleValue(values["prefix"][0]); err != nil { |
|
|
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
prefix = values["prefix"][0] |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var suffix string |
|
|
|
|
if len(values["suffix"]) > 1 { |
|
|
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrFilterNameSuffix), r.URL, guessIsBrowserReq(r)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if len(values["suffix"]) == 1 { |
|
|
|
|
if err := event.ValidateFilterRuleValue(values["suffix"][0]); err != nil { |
|
|
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
suffix = values["suffix"][0] |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pattern := event.NewPattern(prefix, suffix) |
|
|
|
|
|
|
|
|
|
eventNames := []event.Name{} |
|
|
|
|
for _, s := range values["events"] { |
|
|
|
|
eventName, err := event.ParseName(s) |
|
|
|
|
if err != nil { |
|
|
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
eventNames = append(eventNames, eventName) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if _, err := objAPI.GetBucketInfo(ctx, bucketName); err != nil { |
|
|
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
host, err := xnet.ParseHost(r.RemoteAddr) |
|
|
|
|
if err != nil { |
|
|
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
rulesMap := event.NewRulesMap(eventNames, pattern, |
|
|
|
|
event.TargetID{ID: "listen" + "+" + mustGetUUID() + "+" + host.Name, Name: host.Port.String()}) |
|
|
|
|
|
|
|
|
|
w.Header().Set(xhttp.ContentType, "text/event-stream") |
|
|
|
|
|
|
|
|
|
doneCh := make(chan struct{}) |
|
|
|
|
defer close(doneCh) |
|
|
|
|
|
|
|
|
|
// Listen Publisher and peer-listen-client uses nonblocking send and hence does not wait for slow receivers.
|
|
|
|
|
// Use buffered channel to take care of burst sends or slow w.Write()
|
|
|
|
|
listenCh := make(chan interface{}, 4000) |
|
|
|
|
|
|
|
|
|
peers := getRestClients(globalEndpoints) |
|
|
|
|
|
|
|
|
|
globalHTTPListen.Subscribe(listenCh, doneCh, func(evI interface{}) bool { |
|
|
|
|
ev, ok := evI.(event.Event) |
|
|
|
|
if !ok { |
|
|
|
|
return false |
|
|
|
|
} |
|
|
|
|
objectName, uerr := url.QueryUnescape(ev.S3.Object.Key) |
|
|
|
|
if uerr != nil { |
|
|
|
|
return false |
|
|
|
|
} |
|
|
|
|
return len(rulesMap.Match(ev.EventName, objectName).ToSlice()) != 0 |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
for _, peer := range peers { |
|
|
|
|
if peer == nil { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
peer.Listen(listenCh, doneCh) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
keepAliveTicker := time.NewTicker(500 * time.Millisecond) |
|
|
|
|
defer keepAliveTicker.Stop() |
|
|
|
|
|
|
|
|
|
enc := json.NewEncoder(w) |
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case evI := <-listenCh: |
|
|
|
|
ev := evI.(event.Event) |
|
|
|
|
if err := enc.Encode(struct{ Records []event.Event }{[]event.Event{ev}}); err != nil { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
w.(http.Flusher).Flush() |
|
|
|
|
case <-keepAliveTicker.C: |
|
|
|
|
if _, err := w.Write([]byte(" ")); err != nil { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
w.(http.Flusher).Flush() |
|
|
|
|
case <-GlobalServiceDoneCh: |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// ListenBucketNotificationHandler - This HTTP handler sends events to the connected HTTP client.
|
|
|
|
|
// Client should send prefix/suffix object name to match and events to watch as query parameters.
|
|
|
|
|
func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { |
|
|
|
|