StreamSplitter 是单生产者多消费者的一种实现。它通过在多个 BufferQueue 之间传递同一个 GraphicBuffer,实现了无拷贝的多消费者。
// StreamSplitter is an autonomous class that manages one input BufferQueue
// and multiple output BufferQueues. By using the buffer attach and detach logic
// in BufferQueue, it is able to present the illusion of a single split
// BufferQueue, where each buffer queued to the input is available to be
// acquired by each of the outputs, and is able to be dequeued by the input
// again only once all of the outputs have released it.
//
// NOTE: this declaration is abridged. The implementation also uses members
// that are not listed here (a constructor taking the input queue, mMutex,
// mReleaseCondition, mOutstandingBuffers, a BufferTracker constructor).
class StreamSplitter : public BnConsumerListener {
public:
    // Creates a splitter that consumes from inputQueue. On success the new
    // instance is stored in *outSplitter; the returned status reports whether
    // connecting to the input queue succeeded.
    static status_t createSplitter(const sp<IGraphicBufferConsumer>& inputQueue,
            sp<StreamSplitter>* outSplitter);

    // Registers outputQueue as one of the outputs that receives every buffer
    // queued to the input (implementation not shown in this excerpt).
    status_t addOutput(const sp<IGraphicBufferProducer>& outputQueue);

    // Consumer-listener callback: a new frame is available on the input queue.
    // The implementation acquires it and hands it to every output.
    virtual void onFrameAvailable(const BufferItem& item);

    // Called when the output `from` has released a buffer.
    // NOTE(review): presumably drops the buffer from mBuffers once every
    // output has released it -- confirm against the implementation.
    void onBufferReleasedByOutput(const sp<IGraphicBufferProducer>& from);

private:
    // Ref-counted wrapper around one GraphicBuffer that is currently shared
    // between the outputs.
    class BufferTracker : public LightRefBase<BufferTracker> {
    public:
        const sp<GraphicBuffer>& getBuffer() const { return mBuffer; }

    private:
        sp<GraphicBuffer> mBuffer; // One instance that holds this native handle
    };

    // The single input queue this splitter consumes from.
    sp<IGraphicBufferConsumer> mInput;

    // Every output queue that each input buffer is attached and queued to.
    Vector<sp<IGraphicBufferProducer> > mOutputs;

    // Buffers currently handed out to the outputs, keyed by GraphicBuffer id.
    KeyedVector<uint64_t, sp<BufferTracker> > mBuffers;
};
StreamSplitter 管理一个输入 BufferQueue 和多个输出 BufferQueue,即单生产者多消费者。
createSplitter() 接收一个 IGraphicBufferConsumer 作为 buffer 输入,然后创建一个 StreamSplitter 对象。
addOutput() 函数接受一个 IGraphicBufferProducer 作为 buffer 输出。
onFrameAvailable() 回调时分发 buffer。
buffer 在 onFrameAvailable() 中加入到 mBuffers,在 onBufferReleasedByOutput() 中
移除。status_t StreamSplitter::createSplitter(
const sp<IGraphicBufferConsumer>& inputQueue,
sp<StreamSplitter>* outSplitter) {
sp<StreamSplitter> splitter(new StreamSplitter(inputQueue));
status_t status = splitter->mInput->consumerConnect(splitter, false);
if (status == NO_ERROR) {
splitter->mInput->setConsumerName(String8("StreamSplitter"));
*outSplitter = splitter;
}
return status;
}
创建一个 StreamSplitter 对象,然后调用 BufferQueueConsumer::connect() 连接 BufferQueue 并注册监听器。在 BufferQueueProducer::queueBuffer() 中,把 BufferItem 插入到 mQueue 之后,会回调监听器的 onFrameAvailable()。
下图是 BufferState 的状态转换图,StreamSplitter 中使用了其中的 attach 和 detach 接口。
// Listener callback for a new frame on the input queue. Acquires the frame
// from mInput, detaches it from the input, records it in mBuffers, and then
// attaches and queues the same GraphicBuffer to every queue in mOutputs.
//
// The BufferItem argument is ignored (its name is commented out); the frame is
// fetched again through acquireBuffer().
//
// NOTE(review): every call below stores its result in `status`, but `status`
// is never read in this excerpt. If acquireBuffer() fails,
// bufferItem.mGraphicBuffer is presumably null and the getId() call would
// dereference it -- confirm how failures are meant to be handled.
void StreamSplitter::onFrameAvailable(const BufferItem& /* item */) {
// Holds mMutex for the rest of the function.
Mutex::Autolock lock(mMutex);
// If there are too many outstanding buffers, we block until a buffer is
// released back to the input in onBufferReleased
while (mOutstandingBuffers >= MAX_OUTSTANDING_BUFFERS) {
mReleaseCondition.wait(mMutex);
}
// Count the buffer about to be acquired as outstanding.
++mOutstandingBuffers;
BufferItem bufferItem;
status_t status = mInput->acquireBuffer(&bufferItem, /* presentWhen */ 0);
// Detach the acquired buffer from its input slot so that it can be attached
// to the outputs.
status = mInput->detachBuffer(bufferItem.mSlot);
// Track the buffer under its GraphicBuffer id.
mBuffers.add(bufferItem.mGraphicBuffer->getId(),
new BufferTracker(bufferItem.mGraphicBuffer));
// Queue parameters are copied from the acquired item, so each output is
// queued with the same timestamp, data space, crop, scaling mode, transform
// and fence.
IGraphicBufferProducer::QueueBufferInput queueInput(
bufferItem.mTimestamp, bufferItem.mIsAutoTimestamp,
bufferItem.mDataSpace, bufferItem.mCrop,
static_cast<int32_t>(bufferItem.mScalingMode),
bufferItem.mTransform, bufferItem.mFence);
// Attach and queue the buffer to each of the outputs
Vector<sp<IGraphicBufferProducer> >::iterator output = mOutputs.begin();
for (; output != mOutputs.end(); ++output) {
// Slot on this output; passed to attachBuffer() as an out-parameter and
// then used for queueBuffer().
int slot;
status = (*output)->attachBuffer(&slot, bufferItem.mGraphicBuffer);
// Filled in by queueBuffer(); not used afterwards.
IGraphicBufferProducer::QueueBufferOutput queueOutput;
status = (*output)->queueBuffer(slot, queueInput, &queueOutput);
}
}
onFrameAvailable()
。