Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notification filtering #278

Merged
merged 10 commits into from
May 27, 2019
Merged

Notification filtering #278

merged 10 commits into from
May 27, 2019

Conversation

georgifarashev
Copy link
Contributor

Notification filtering

Motivation

Some notifications might not be necessary for some platforms.
For example if a broker must not be registered in a specific platform then a filter can be introduced on /v1/service_brokers which filters the broker. But the creation of the broker will create a notification for all platforms. A way of filtering this notification must be introduced.

Approach

Introduce notification filters - functions which filter the recipients of a notification.

Pull Request status

  • Implementation
  • Tests

@coveralls
Copy link

coveralls commented May 17, 2019

Coverage Status

Coverage decreased (-0.05%) to 88.735% when pulling 2380091 on notification-filtering into 098d2e6 on master.

Copy link
Member

@dpanayotov dpanayotov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very confusing implementation that doesn't make sense without a lot of additional comments. I'd try a different approach to filtering because this implementation will be unmaintainable in a month. Maybe inverse the logic.

@@ -58,9 +48,18 @@ func (c *Controller) handleWS(req *web.Request) (*web.Response, error) {
return util.NewJSONResponse(http.StatusGone, nil)
}

notificationQueue, lastKnownRevision, notificationsList, err := c.registerConsumer(ctx, revisionKnownToProxy, user)
user, ok := web.UserFromContext(req.Context())
if !ok {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why move the user check down here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not used above. I have moved it to the place where it is needed for a first time.

@@ -100,6 +100,9 @@ var (

// ErrConcurrentResourceModification error returned when concurrent resource updates are happening
ErrConcurrentResourceModification = errors.New("another resource update happened concurrently. Please reattempt the update")

// ErrInvalidNotificationRevision provided notification revision is not valid, must return http status GONE
ErrInvalidNotificationRevision = errors.New("notification revision is not valid")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor - I'd rather remove all these util.Err* because they are not util errors. One option is to move all errors and their handling in an errors package or move every error in its respective package (e.g. storage.ErrConcurrentModification, notifications.ErrInvalidRevision)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have created an issue for that - #281 as it could be done in a different PR.

@@ -30,6 +36,10 @@ import (
type notificationStorage interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that's not part of this PR, but this smells of bad design... a private interface for the Postgres storage only to make the tests work

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. This interface will be removed when the storage supports sorting by a specific property and limit of returned results (in sql - order by and limit).

}

// NotificationFilterFunc filters recipients for a given notifications
type NotificationFilterFunc func(recipients []*types.Platform, notification *types.Notification) (filteredRecipients []*types.Platform)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe ReceiversFilterFunc

}

func (n *Notificator) replaceQueueWithMissingNotificationsQueue(queue storage.NotificationQueue, lastKnownRevision, lastKnownRevisionToSM int64, platform *types.Platform) (storage.NotificationQueue, error) {
if lastKnownRevision > lastKnownRevisionToSM {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this check outside as it's unrelated to this function

@@ -35,6 +35,9 @@ const (

// DELETED represents a notification type for deleting a resource
DELETED OperationType = "DELETED"

// INVALIDREVISION revision with invalid value
INVALIDREVISION int64 = -1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

strange for this to be in the types package. I'd rather have the previous state

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used in the storage/postgres and api/notifications. It will be strange for any of them to import the other. It cannot be in storage as well as postgres must not import storage. This seemed the most appropriate as InvalidRevision is some value of a property of notification similar to the operation types (CREATED, DELETED ...).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can it be InvalidNotificationRevision then, because types.InvalidRevision doesn't mean much

}

if n.queueSize < len(filteredMissedNotification) {
log.C(n.ctx).Debugf("too many missed notifications %d", len(filteredMissedNotification))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why? it should just queue them

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but if it can't queue them (queues have limited size) the proxy will do a full resync.

return nil, err
}
filteredMissedNotification := make([]*types.Notification, 0, len(missedNotifications))
var recipients []*types.Platform
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this row

filteredMissedNotification := make([]*types.Notification, 0, len(missedNotifications))
var recipients []*types.Platform
for _, notification := range missedNotifications {
recipients = n.filterRecipients([]*types.Platform{platform}, notification)
Copy link
Member

@dpanayotov dpanayotov May 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is very weird... it says "filter recepients" that uses "notifications filter" but filters platforms and after that this filtering doesn't use the returned result. The problem is that you're using the same function to do 2 different things.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hopefully with the rename of NotificationFilterFunc to ReceiversFilterFunc it might be more clear. The idea is to filter given recipients for a specific notification by returning the filtered recipients. This way the one who is writing the filter can process the notification once for all recipients. filterRecipients function iterates over all registered filter functions and applies them to the initial recipients and given notification.

func (n *Notificator) filterRecipients(recipients []*types.Platform, notification *types.Notification) []*types.Platform {
for _, filter := range n.notificationFilters {
recipients = filter(recipients, notification)
if len(recipients) == 0 {
Copy link
Member

@dpanayotov dpanayotov May 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't understand why this works this way - filter returns an empty slice if any filter decides that no platform should receive it and the same list if all filters decide that at least one platform should receive it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an optimisation - if a filter in the chain of filters returns 0 recipients then it is not necessary to call the other filters as there are no platforms to filter. See also comment above.

@@ -24,22 +22,14 @@ const (
LastKnownRevisionHeader = "last_notification_revision"
LastKnownRevisionQueryParam = "last_notification_revision"

unknownRevision int64 = -1
noRevision int64 = 0
noRevision int64 = 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look like this constant is not needed.
If the revision is Unknown the flow is the same.

In the producer: https://github.com/Peripli/service-broker-proxy/blob/master/pkg/sbproxy/notifications/producer.go#L43
The value for InvalidRevision is returned when it is unknown.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SM sends 0 as last notification when there are no notifications in the database. This 0 is received by the proxy and if its connection drops it should resend it as last known revision. In this case SM sends 410 (see line 47 and 79). Idea to improve this is not to send the 0 at all (both by the SM and proxy). But this requires changes in the proxy as well. It should be done in a different PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -69,3 +79,38 @@ func (ns *notificationStorageImpl) GetNotification(ctx context.Context, id strin
}
return notificationObj.(*types.Notification), nil
}

func (ns *notificationStorageImpl) ListNotifications(ctx context.Context, platformID string, from, to int64) ([]*types.Notification, error) {
// TODO: replace with less than or equal operator when available
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid TODOs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They were from a previous PR. I will remove them.

return nil, err
}
notificationsList := objectList.(*types.Notifications)
// TODO: Should be done in the database with order by
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same - create a ticket/issues for the desired/missing functionalities and clean the TODOs.

// When consumer wants to stop listening for notifications it must unregister the notification queue.
RegisterConsumer(userContext *web.UserContext) (NotificationQueue, int64, error)
RegisterConsumer(platform *types.Platform, lastKnownRevision int64) (NotificationQueue, int64, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor - platform should be consumer

@dzahariev dzahariev dismissed dpanayotov’s stale review May 27, 2019 08:24

Will be addressed in another PR

@dzahariev dzahariev merged commit b405502 into master May 27, 2019
@dzahariev dzahariev deleted the notification-filtering branch May 27, 2019 08:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants