| /* |
| Copyright The containerd Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package eventq |
| |
| import ( |
| "io" |
| "time" |
| ) |
| |
| type EventQueue[T any] struct { |
| events chan<- T |
| subscriberC chan<- eventSubscription[T] |
| shutdownC chan struct{} |
| } |
| |
| type eventSubscription[T any] struct { |
| c chan<- T |
| closeC chan struct{} |
| } |
| |
| func (sub eventSubscription[T]) publish(event T) bool { |
| select { |
| case <-sub.closeC: |
| return false |
| case sub.c <- event: |
| return true |
| } |
| } |
| |
| func (sub eventSubscription[T]) Close() error { |
| select { |
| case <-sub.closeC: |
| default: |
| close(sub.closeC) |
| } |
| return nil |
| } |
| |
| // New provides a queue for sending messages to one or more |
| // subscribers. Messages are held for the given discardAfter duration |
| // if there are no subscribers. |
| func New[T any](discardAfter time.Duration, discardFn func(T)) EventQueue[T] { |
| events := make(chan T) |
| subscriberC := make(chan eventSubscription[T]) |
| shutdownC := make(chan struct{}) |
| |
| go func() { |
| type queuedEvent struct { |
| event T |
| discardAt time.Time |
| } |
| |
| var discardQueue []queuedEvent |
| var discardTime <-chan time.Time |
| var subscribers []eventSubscription[T] |
| for { |
| select { |
| case <-shutdownC: |
| for _, event := range discardQueue { |
| discardFn(event.event) |
| } |
| for _, sub := range subscribers { |
| close(sub.c) |
| } |
| return |
| case event := <-events: |
| if len(subscribers) > 0 { |
| active := subscribers[:0] |
| for _, sub := range subscribers { |
| if sub.publish(event) { |
| active = append(active, sub) |
| } |
| } |
| subscribers = active |
| } |
| if len(subscribers) == 0 { |
| discardQueue = append(discardQueue, queuedEvent{ |
| event: event, |
| discardAt: time.Now().Add(discardAfter), |
| }) |
| if discardTime == nil { |
| discardTime = time.After(time.Until(discardQueue[0].discardAt).Abs()) |
| } |
| } |
| case s := <-subscriberC: |
| var closed bool |
| for i, event := range discardQueue { |
| if !s.publish(event.event) { |
| discardQueue = discardQueue[i:] |
| closed = true |
| break |
| } |
| } |
| if !closed { |
| discardQueue = nil |
| discardTime = nil |
| subscribers = append(subscribers, s) |
| } |
| case t := <-discardTime: |
| toDiscard := discardQueue |
| discardQueue = nil |
| for i, event := range toDiscard { |
| if t.After(event.discardAt) { |
| discardFn(event.event) |
| } else { |
| discardQueue = toDiscard[i:] |
| break |
| } |
| } |
| if len(discardQueue) == 0 { |
| discardTime = nil |
| } else { |
| // Wait until next item in discard queue plus a small buffer to collect a burst of events |
| discardTime = time.After(time.Until(discardQueue[0].discardAt).Abs() + 10*time.Millisecond) |
| } |
| } |
| |
| } |
| }() |
| |
| return EventQueue[T]{ |
| events: events, |
| subscriberC: subscriberC, |
| shutdownC: shutdownC, |
| } |
| } |
| |
| func (eq *EventQueue[T]) Shutdown() { |
| defer close(eq.shutdownC) |
| eq.shutdownC <- struct{}{} |
| } |
| |
| func (eq *EventQueue[T]) Send(event T) { |
| select { |
| case <-eq.shutdownC: |
| case eq.events <- event: |
| } |
| } |
| |
| func (eq *EventQueue[T]) Subscribe() (<-chan T, io.Closer) { |
| c := make(chan T, 100) |
| subscription := eventSubscription[T]{ |
| c: c, |
| closeC: make(chan struct{}), |
| } |
| eq.subscriberC <- subscription |
| |
| return c, subscription |
| } |