Hi, I'm seeing lower-than-expected throughput and higher-than-expected latency for the inference stage when using `tbb::flow::graph`, and I would like advice from maintainers/experts.
Background / observed behavior
- I have a video-stream model pipeline where inference time is much larger than preprocessing and postprocessing.
- I run inference with the OpenVINO backend on an Intel CPU — all data, including model instances, live on the CPU.
- I suspect a data/memory-affinity problem: model instances come from a pool, the model-instance index is assigned at preprocess time, and the inference node may then execute on a worker thread different from the one that last touched that model (see the sketch after this list).
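A minimal sketch of how the cross-thread usage could be verified (an illustrative helper, not part of the pipeline below; `on_preprocess`/`on_inference` are hypothetical names that would be called from the corresponding node bodies):

```cpp
// Illustrative helper: checks whether preprocess and inference for the same
// model instance end up on different worker threads.
#include <atomic>
#include <cstddef>
#include <cstdio>
#include <functional>
#include <thread>
#include <vector>

class ThreadAffinityProbe
{
public:
    explicit ThreadAffinityProbe(std::size_t num_models) : _last_thread(num_models) {}

    // Call right after the model index is chosen in the preprocess node.
    void on_preprocess(std::size_t model_idx)
    {
        _last_thread[model_idx].store(current_thread_hash(), std::memory_order_relaxed);
    }

    // Call at the start of the inference node body; reports a thread mismatch.
    void on_inference(std::size_t model_idx)
    {
        if (current_thread_hash() != _last_thread[model_idx].load(std::memory_order_relaxed))
            std::printf("model %zu: inference ran on a different thread than preprocess\n", model_idx);
    }

private:
    static std::size_t current_thread_hash()
    {
        return std::hash<std::thread::id>{}(std::this_thread::get_id());
    }

    std::vector<std::atomic<std::size_t>> _last_thread;
};
```

The idea is only to confirm that the model chosen in preprocess is consumed on another worker thread before digging into cache/NUMA effects.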
Questions
- For a video-stream model inference scenario like mine, is `tbb::flow::graph` an appropriate choice?
- If inference performance is below expectations, what are the likely causes and remedies? Specifically:
- Is this likely a problem in my implementation?
- Is it likely a data-affinity (cache / NUMA / thread-locality) issue due to how model instances are assigned and used across threads?
- Or could there be other causes (TBB scheduling, contention, model internals, memory allocation, etc.)?
Any suggestions on how to diagnose and fix this would be appreciated.
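If per-stage numbers would help, they could be collected with a small timer along these lines (an illustrative sketch; `StageTimer` is not part of the pipeline below):

```cpp
// Illustrative per-stage timer: wrap each node body in measure() and print the
// average wall-clock time per call for preprocess / inference / postprocess.
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdio>

struct StageTimer
{
    std::atomic<std::uint64_t> total_us{0};
    std::atomic<std::uint64_t> calls{0};

    template <typename F>
    void measure(F &&stage_body)
    {
        const auto start = std::chrono::steady_clock::now();
        stage_body();
        const auto us = std::chrono::duration_cast<std::chrono::microseconds>(
                            std::chrono::steady_clock::now() - start).count();
        total_us.fetch_add(static_cast<std::uint64_t>(us), std::memory_order_relaxed);
        calls.fetch_add(1, std::memory_order_relaxed);
    }

    void report(const char *name) const
    {
        const auto n = calls.load();
        std::printf("%s: %llu calls, avg %.1f us\n", name,
                    static_cast<unsigned long long>(n),
                    n ? static_cast<double>(total_us.load()) / static_cast<double>(n) : 0.0);
    }
};
```

For example, the inference call could be wrapped as `timer.measure([&] { item->success = ModelHandlerType::inference(*_models[index], item); });` inside the inference node.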
My implementation

```cpp
template <typename ModelHandlerType>
class PipelineNodeTbb
{
public:
    using ItemType = typename ModelHandlerType::ItemType;
    using ModelType = typename ModelHandlerType::ModelType;
    using InputType = std::optional<typename ModelHandlerType::InputType>;
    using OutputType = typename ModelHandlerType::OutputType;
    using ModelParams = const std::shared_ptr<typename ModelType::Params> &;
    using DataPacket = std::tuple<std::shared_ptr<ItemType>, uint64_t>;
    using InNode = tbb::flow::multifunction_node<InputType, std::tuple<DataPacket>>;
    using ExNode = tbb::flow::function_node<DataPacket, DataPacket>;
    using OuNode = tbb::flow::multifunction_node<DataPacket, std::tuple<OutputType>>;

    tbb::flow::limiter_node<InputType> &get_input() { return *_limiter; } // Expose node for external connection
    auto &get_output() { return tbb::flow::output_port<0>(*_postprocess); } // Expose output port for external connection

    void start() { _stop_requested.store(false, std::memory_order_release); }

    void stop()
    {
        _stop_requested.store(true, std::memory_order_release);
        _preprocess->try_put(std::nullopt); // Flush any partially collected batch
    }
    explicit PipelineNodeTbb(tbb::flow::graph &g, ModelParams model_cfg,
                             std::function<void(std::shared_ptr<ItemType>, typename OuNode::output_ports_type &)> fun,
                             int num_thread = 1, int token_capacity = 64)
        : _batch_size(std::max<int>(1, model_cfg->batch_size_)), _unpack(std::move(fun))
    {
        if (!_unpack)
            throw std::runtime_error("Invalid unpack function ...");
        num_thread = std::min(std::max<uint32_t>(1, num_thread), std::thread::hardware_concurrency()); // NOLINT
        token_capacity = std::max(token_capacity, 1);

        // Initialize model pool
        _models.reserve(num_thread);
        for (int i = 0; i < num_thread; ++i)
        {
            auto model = std::make_unique<ModelType>(model_cfg);
            if (!model || !model->initialize() || !model->is_loaded())
                throw std::runtime_error("Failed to create model instance.");
            _models.emplace_back(std::move(model));
        }
        // preprocess: collect inputs into a batch and assign a model index (serial node)
        _preprocess = std::make_unique<InNode>(g, tbb::flow::serial, [this](auto &&inp_, auto &&outp_) { // NOLINT
            std::shared_ptr<ItemType> item = nullptr;
            if (_stop_requested.load(std::memory_order_acquire))
            {
                if (_batch_collector) // Directly push incomplete batch to avoid dropping frames
                    item = std::move(_batch_collector);
            }
            else
            {
                if (!inp_.has_value())
                    return;
                if (!_batch_collector)
                    _batch_collector = std::make_shared<ItemType>();
                ModelHandlerType::collect(_batch_collector, *inp_);
                if (ModelHandlerType::get_batch_count(_batch_collector) == _batch_size)
                    item = std::move(_batch_collector);
            }
            if (!item)
                return;
            _batch_collector.reset();
            const uint64_t index = _model_idx.fetch_add(1, std::memory_order_relaxed) % _models.size();
            item->success = ModelHandlerType::preprocess(*_models[index], item);
            std::get<0>(outp_).try_put(std::make_tuple(std::move(item), index));
        });
        // inference: runs with num_thread concurrency on the model instance chosen at preprocess time
        _inference = std::make_unique<ExNode>(g, num_thread, [this](auto &&inp_) { // NOLINT
            const auto [item, index] = inp_;
            if (item && item->success)
                item->success = ModelHandlerType::inference(*_models[index], item); // Only the inference step needs to be bound to a model instance
            return inp_;
        });

        // postprocess
        _postprocess = std::make_unique<OuNode>(g, tbb::flow::serial, [this](auto &&inp_, auto &&outp_) { // NOLINT
            const auto [item, index] = inp_;
            if (item && item->success)
                item->success = ModelHandlerType::postprocess(*_models[index], item);
            _unpack(item, outp_);
            _limiter->decrementer().try_put(tbb::flow::continue_msg()); // Notify limiter_node to release one token
        });

        // make_edge: limiter -> preprocess -> inference -> postprocess
        _limiter = std::make_unique<tbb::flow::limiter_node<InputType>>(g, token_capacity);
        tbb::flow::make_edge(*_limiter, *_preprocess);
        tbb::flow::make_edge(tbb::flow::output_port<0>(*_preprocess), *_inference);
        tbb::flow::make_edge(*_inference, *_postprocess);
    }
private:
    std::atomic<bool> _stop_requested = false;
    int _batch_size = 1;
    std::shared_ptr<ItemType> _batch_collector = nullptr;
    std::unique_ptr<tbb::flow::limiter_node<InputType>> _limiter = nullptr;
    std::unique_ptr<InNode> _preprocess = nullptr;
    std::unique_ptr<ExNode> _inference = nullptr;
    std::unique_ptr<OuNode> _postprocess = nullptr;
    std::atomic<uint64_t> _model_idx = 0; // avoid data races when assigning model index
    std::vector<std::unique_ptr<ModelType>> _models; // model pool (length == parallelism)
    std::function<void(std::shared_ptr<ItemType>, typename OuNode::output_ports_type &)> _unpack = nullptr;
};
```

Thanks in advance for any guidance!
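For context, this is roughly how a node like the one above is driven from the outside. The sketch below replaces my real handler/model types with trivial stand-ins and only shows the graph wiring, the token release, and the frame feed loop:

```cpp
// Self-contained sketch of the surrounding graph: frames enter through a
// token-limited front end and the caller waits for the graph to drain.
// The stage bodies are trivial stand-ins for preprocess/inference/postprocess.
#include <tbb/flow_graph.h>
#include <cstdio>

int main()
{
    tbb::flow::graph g;

    tbb::flow::queue_node<int> source(g);                       // buffers decoded frames
    tbb::flow::limiter_node<int> limiter(g, /*threshold=*/64);  // caps frames in flight

    tbb::flow::function_node<int, int> work(g, /*concurrency=*/4, [](int frame) {
        // ... preprocess / inference / postprocess stand-in ...
        return frame;
    });

    tbb::flow::function_node<int, tbb::flow::continue_msg> sink(
        g, tbb::flow::serial, [&limiter](int frame) {
            std::printf("frame %d done\n", frame);
            limiter.decrementer().try_put(tbb::flow::continue_msg()); // release one token, as in the real postprocess
            return tbb::flow::continue_msg();
        });

    tbb::flow::make_edge(source, limiter);
    tbb::flow::make_edge(limiter, work);
    tbb::flow::make_edge(work, sink);

    for (int frame = 0; frame < 256; ++frame)
        source.try_put(frame); // in the real pipeline this is the video-decode loop

    g.wait_for_all();
    return 0;
}
```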