
Question: low inference performance with tbb::flow::graph — possible data-affinity issue #1911

@xxf1ow

Description

Hi, I'm seeing lower-than-expected throughput (and higher-than-expected latency) in the inference stage when using tbb::flow::graph, and I would like advice from maintainers/experts.

Background / observed behavior

  • I have a video-stream model pipeline in which inference takes much longer than preprocessing and postprocessing.
  • I run inference with the OpenVINO backend on an Intel CPU; all data, including the model instances, lives on the CPU.
  • I suspect the problem is data/memory affinity: model instances from a pool are effectively used across different threads, because the model-instance index is assigned at preprocess time while the inference node may execute on a different worker thread than the one that previously touched that model (see the sketch after this list).
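
To make the affinity concern concrete, this is the kind of alternative I am considering (a minimal sketch, not my current code; Model and run_inference are placeholders): each TBB worker thread lazily creates and reuses its own model instance, so inference always runs on the thread that owns the model.

#include <tbb/enumerable_thread_specific.h>
#include <memory>

// Hypothetical sketch: one model instance per TBB worker thread,
// created lazily the first time that thread executes the inference body.
struct Model // placeholder for the real OpenVINO-backed model type
{
    void infer(/* batch */) {}
};

static tbb::enumerable_thread_specific<std::unique_ptr<Model>> tls_models(
    [] { return std::make_unique<Model>(); });

void run_inference(/* DataPacket &packet */)
{
    auto &model = tls_models.local(); // each worker thread reuses its own instance
    model->infer();                   // model-internal buffers stay local to this thread
}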

Questions

  1. For a video-stream model inference scenario like mine, is tbb::flow::graph an appropriate choice?

  2. If inference performance is below expectations, what are the likely causes and remedies? Specifically:

    • Is this likely a problem in my implementation?
    • Is it likely a data-affinity (cache / NUMA / thread-locality) issue due to how model instances are assigned and used across threads?
    • Or could there be other causes (TBB scheduling, contention, model internals, memory allocation, etc.)?
      Any suggestions on how to diagnose and fix this would be appreciated (my rough timing approach is sketched right after these questions).
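
For reference, my rough way of comparing stage costs is wall-clock timing around the handler calls, along these lines (illustrative only; timed_ms is a helper made up for this sketch):

#include <chrono>
#include <cstdio>

// Illustrative helper: time a single call in milliseconds.
template <typename Fn>
double timed_ms(Fn &&fn)
{
    const auto t0 = std::chrono::steady_clock::now();
    fn();
    const auto t1 = std::chrono::steady_clock::now();
    return std::chrono::duration<double, std::milli>(t1 - t0).count();
}

// Used inside the inference node body, e.g.:
//   const double ms = timed_ms([&] { ModelHandlerType::inference(*_models[index], item); });
//   std::printf("inference took %.2f ms\n", ms);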

My implementation

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <stdexcept>
#include <thread>
#include <tuple>
#include <vector>

#include <tbb/flow_graph.h>

template <typename ModelHandlerType>
class PipelineNodeTbb
{
public:
    using ItemType = typename ModelHandlerType::ItemType;
    using ModelType = typename ModelHandlerType::ModelType;
    using InputType = std::optional<typename ModelHandlerType::InputType>;
    using OutputType = typename ModelHandlerType::OutputType;
    using ModelParams = const std::shared_ptr<typename ModelType::Params> &;
    using DataPacket = std::tuple<std::shared_ptr<ItemType>, uint64_t>;
    using InNode = tbb::flow::multifunction_node<InputType, std::tuple<DataPacket>>;
    using ExNode = tbb::flow::function_node<DataPacket, DataPacket>;
    using OuNode = tbb::flow::multifunction_node<DataPacket, std::tuple<OutputType>>;

    tbb::flow::limiter_node<InputType> &get_input() { return *_limiter; } // Expose node for external connection

    auto &get_output() { return tbb::flow::output_port<0>(*_postprocess); } // Expose the postprocess output port for external connection

    void start() { _stop_requested.store(false, std::memory_order_release); }

    void stop()
    {
        _stop_requested.store(true, std::memory_order_release);
        _preprocess->try_put(std::nullopt);
    }


    explicit PipelineNodeTbb(tbb::flow::graph &g, ModelParams model_cfg,
                             std::function<void(std::shared_ptr<ItemType>, typename OuNode::output_ports_type &)> fun,
                             int num_thread = 1, int token_capacity = 64)
        : _batch_size(std::max<int>(1, model_cfg->batch_size_)), _unpack(std::move(fun))
    {
        if (!_unpack)
            throw std::runtime_error("Invalid unpack function ...");
        // Clamp to [1, hardware_concurrency()]; guard against hardware_concurrency() returning 0
        num_thread = std::clamp(num_thread, 1, std::max(1, static_cast<int>(std::thread::hardware_concurrency())));
        token_capacity = std::max(token_capacity, 1);
        // Initialize model pool
        _models.reserve(num_thread);
        for (int i = 0; i < num_thread; ++i)
        {
            auto model = std::make_unique<ModelType>(model_cfg);
            if (!model || !model->initialize() || !model->is_loaded())
                throw std::runtime_error("Failed to create model instance.");
            _models.emplace_back(std::move(model));
        }
        // preprocess
        _preprocess = std::make_unique<InNode>(g, tbb::flow::serial, [this](auto &&inp_, auto &&outp_) { // NOLINT
            std::shared_ptr<ItemType> item = nullptr;
            if (_stop_requested.load(std::memory_order_acquire))
            {
                if (_batch_collector) // Directly push incomplete batch to avoid dropping frames
                    item = std::move(_batch_collector);
            }
            else
            {
                if (!inp_.has_value())
                    return;
                if (!_batch_collector)
                    _batch_collector = std::make_shared<ItemType>();
                ModelHandlerType::collect(_batch_collector, *inp_);
                if (ModelHandlerType::get_batch_count(_batch_collector) == _batch_size)
                    item = std::move(_batch_collector);
            }
            if (!item)
                return;
            _batch_collector.reset();
            const uint64_t index = _model_idx.fetch_add(1, std::memory_order_relaxed) % _models.size();
            item->success = ModelHandlerType::preprocess(*_models[index], item);
            std::get<0>(outp_).try_put(std::make_tuple(std::move(item), index));
        });
        // inference
        _inference = std::make_unique<ExNode>(g, num_thread, [this](auto &&inp_) { // NOLINT
            const auto [item, index] = inp_;
            if (item && item->success)
                item->success = ModelHandlerType::inference(*_models[index], item); // Only the inference step needs to be bound to a model instance
            return inp_;
        });
        // postprocess
        _postprocess = std::make_unique<OuNode>(g, tbb::flow::serial, [this](auto &&inp_, auto &&outp_) { // NOLINT
            const auto [item, index] = inp_;
            if (item && item->success)
                item->success = ModelHandlerType::postprocess(*_models[index], item);
            _unpack(item, outp_);
            _limiter->decrementer().try_put(tbb::flow::continue_msg()); // Notify limiter_node to release one token
        });
        // make_edge
        _limiter = std::make_unique<tbb::flow::limiter_node<InputType>>(g, token_capacity);
        tbb::flow::make_edge(*_limiter, *_preprocess);
        tbb::flow::make_edge(tbb::flow::output_port<0>(*_preprocess), *_inference);
        tbb::flow::make_edge(*_inference, *_postprocess);
    }

private:
    std::atomic<bool> _stop_requested = false;
    int _batch_size = 1;
    std::shared_ptr<ItemType> _batch_collector = nullptr;
    std::unique_ptr<tbb::flow::limiter_node<InputType>> _limiter = nullptr;
    std::unique_ptr<InNode> _preprocess = nullptr;
    std::unique_ptr<ExNode> _inference = nullptr;
    std::unique_ptr<OuNode> _postprocess = nullptr;
    std::atomic<uint64_t> _model_idx = 0;            // avoid data races when assigning model index
    std::vector<std::unique_ptr<ModelType>> _models; // model pool (length == parallelism)
    std::function<void(std::shared_ptr<ItemType>, typename OuNode::output_ports_type &)> _unpack = nullptr;
};
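
For completeness, this is roughly how I construct and drive the pipeline (simplified; FrameHandler, Frame, and decode_next_frame are stand-ins for my real handler, frame type, and video source):

// Simplified usage sketch; FrameHandler, Frame, and decode_next_frame() are
// placeholders for the real handler, frame type, and video source.
void run_pipeline()
{
    tbb::flow::graph g;
    auto params = std::make_shared<FrameHandler::ModelType::Params>();

    PipelineNodeTbb<FrameHandler> node(
        g, params,
        [](std::shared_ptr<FrameHandler::ItemType> /*item*/, auto &ports) {
            std::get<0>(ports).try_put(FrameHandler::OutputType{}); // forward result downstream
        },
        /*num_thread=*/4, /*token_capacity=*/64);

    node.start();
    while (auto frame = decode_next_frame()) // returns the node's InputType (a std::optional)
    {
        // The limiter_node rejects the message once token_capacity is exhausted,
        // so the return value has to be handled (here: a naive retry).
        while (!node.get_input().try_put(frame))
            std::this_thread::yield();
    }
    node.stop();
    g.wait_for_all();
}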

Thanks in advance for any guidance.
