概览

StreamSplitter 是单生产者多消费者的一种实现。它通过在 BufferQueue 之间传递同一个 GraphicBuffer 来实现了无拷贝多消费者。

whiteboard_exported_image-10.png

// StreamSplitter is an autonomous class that managesone input BufferQueue
// andmultiple output BufferQueues. By using the bufferattach and detach logic
// in BufferQueue, it is able to present the illusion of a single split
// BufferQueue, where each buffer queued to the input is available to be
// acquired by each of the outputs, and is able to be dequeued by the input
// again only once all of the outputs have released it.
class StreamSplitter : public BnConsumerListener {
    static status_t createSplitter(const sp<IGraphicBufferConsumer>& inputQueue,
            sp<StreamSplitter>* outSplitter);
    status_t addOutput(const sp<IGraphicBufferProducer>& outputQueue);

    virtual void onFrameAvailable(const BufferItem& item);
    void onBufferReleasedByOutput(const sp<IGraphicBufferProducer>& from);

    class BufferTracker : public LightRefBase<BufferTracker> {
        const sp<GraphicBuffer>& getBuffer() const { return mBuffer; }
        sp<GraphicBuffer> mBuffer; // One instance that holds this native handle
    };

    sp<IGraphicBufferConsumer> mInput;
    Vector<sp<IGraphicBufferProducer> > mOutputs;

    KeyedVector<uint64_t, sp<BufferTracker> > mBuffers;
}

StreamSplitter 管理一个输入 BufferQueue 对应多个输出 BufferQueue,即单生产者多消费者。

createSplitter()

status_t StreamSplitter::createSplitter(
        const sp<IGraphicBufferConsumer>& inputQueue,
        sp<StreamSplitter>* outSplitter) {
    sp<StreamSplitter> splitter(new StreamSplitter(inputQueue));
    status_t status = splitter->mInput->consumerConnect(splitter, false);
    if (status == NO_ERROR) {
        splitter->mInput->setConsumerName(String8("StreamSplitter"));
        *outSplitter = splitter;
    }
    return status;
}

创建一个 StreamSplitter 对象,然后调用 BufferQueueConsumer::connect() 连接 BufferQueue 并注册了监听器。在 BufferQueueProducer::queueBuffer() 中,把 BufferItem 插入到 mQueue 之后,回调监听器的 onFrameAvailable()

BufferState

下图是 BufferState 的状态转换图,StreamSplitter 中使用了其中的 attach 和 detach 接口。

Untitled

onFrameAvailable()

void StreamSplitter::onFrameAvailable(const BufferItem& /* item */) {
    Mutex::Autolock lock(mMutex);
    // If there are too many outstanding buffers, we block until a buffer is
    // released back to the input in onBufferReleased
    while (mOutstandingBuffers >= MAX_OUTSTANDING_BUFFERS) {
        mReleaseCondition.wait(mMutex);
    }
    ++mOutstandingBuffers;

    BufferItem bufferItem;
    status_t status = mInput->acquireBuffer(&bufferItem, /* presentWhen */ 0);
    status = mInput->detachBuffer(bufferItem.mSlot);

    mBuffers.add(bufferItem.mGraphicBuffer->getId(),
            new BufferTracker(bufferItem.mGraphicBuffer));

    IGraphicBufferProducer::QueueBufferInput queueInput(
            bufferItem.mTimestamp, bufferItem.mIsAutoTimestamp,
            bufferItem.mDataSpace, bufferItem.mCrop,
            static_cast<int32_t>(bufferItem.mScalingMode),
            bufferItem.mTransform, bufferItem.mFence);

    // Attach and queue the buffer to each of the outputs
    Vector<sp<IGraphicBufferProducer> >::iterator output = mOutputs.begin();
    for (; output != mOutputs.end(); ++output) {
        int slot;
        status = (*output)->attachBuffer(&slot, bufferItem.mGraphicBuffer);

        IGraphicBufferProducer::QueueBufferOutput queueOutput;
        status = (*output)->queueBuffer(slot, queueInput, &queueOutput);
    }
}
  1. 当 input 生产者调用 BufferQueueProducer::queueBuffer() 时回调 onFrameAvailable()