summaryrefslogtreecommitdiff
path: root/libs/bufferstreams/rust/src/subscribers/shared.rs
blob: 46c58dc04a732fc1e25492b8abeb4b4daade465e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This module provides [BufferSubscriber] implementations and helpers.

use std::sync::{Arc, Mutex};

use crate::*;

/// A [BufferSubscriber] wrapper that provides shared access.
///
/// Normally, [BufferSubscriber]s are fully owned by the publisher that they are attached to. With
/// [SharedSubscriber], a
///
/// # Panics
///
/// [BufferSubscriber::on_subscribe] on a [SharedSubscriber] can only be called once, otherwise it
/// will panic. This is to prevent accidental and unsupported sharing between multiple publishers to
/// reflect the usual behavior where a publisher takes full ownership of a subscriber.
pub struct SharedSubscriber<S: BufferSubscriber>(Arc<Mutex<SharedSubscriberInner<S>>>);

struct SharedSubscriberInner<S: BufferSubscriber> {
    subscriber: S,
    is_subscribed: bool,
}

impl<S: BufferSubscriber> SharedSubscriber<S> {
    /// Create a new wrapper around a [BufferSubscriber].
    pub fn new(subscriber: S) -> Self {
        Self(Arc::new(Mutex::new(SharedSubscriberInner { subscriber, is_subscribed: false })))
    }

    /// Provides access to an immutable reference to the wrapped [BufferSubscriber].
    pub fn map_inner<R, F: FnOnce(&S) -> R>(&self, f: F) -> R {
        let inner = self.0.lock().unwrap();
        f(&inner.subscriber)
    }

    /// Provides access to a mutable reference to the wrapped [BufferSubscriber].
    pub fn map_inner_mut<R, F: FnOnce(&mut S) -> R>(&self, f: F) -> R {
        let mut inner = self.0.lock().unwrap();
        f(&mut inner.subscriber)
    }
}

impl<S: BufferSubscriber> Clone for SharedSubscriber<S> {
    fn clone(&self) -> Self {
        Self(Arc::clone(&self.0))
    }
}

impl<S: BufferSubscriber> BufferSubscriber for SharedSubscriber<S> {
    fn get_subscriber_stream_config(&self) -> StreamConfig {
        let inner = self.0.lock().unwrap();
        inner.subscriber.get_subscriber_stream_config()
    }

    fn on_subscribe(&mut self, subscription: Box<dyn BufferSubscription>) {
        let mut inner = self.0.lock().unwrap();
        assert!(
            !inner.is_subscribed,
            "A SharedSubscriber can not be shared between two BufferPublishers"
        );
        inner.is_subscribed = true;

        inner.subscriber.on_subscribe(subscription);
    }

    fn on_next(&mut self, frame: Frame) {
        let mut inner = self.0.lock().unwrap();
        inner.subscriber.on_next(frame);
    }

    fn on_error(&mut self, error: BufferError) {
        let mut inner = self.0.lock().unwrap();
        inner.subscriber.on_error(error);
    }

    fn on_complete(&mut self) {
        let mut inner = self.0.lock().unwrap();
        inner.subscriber.on_complete();
    }
}