@@ -147,7 +147,69 @@ vector<Selectable *> Orch::getSelectables()
147147 return selectables;
148148}
149149
150- void ConsumerBase::addToSync (const KeyOpFieldsValuesTuple &entry)
150+ void Orch::createRetryCache (const std::string &executorName) {
151+ if (m_retryCaches.find (executorName) == m_retryCaches.end ())
152+ m_retryCaches[executorName] = std::make_shared<RetryCache>(executorName);
153+ }
154+
155+ RetryCache *Orch::getRetryCache (const std::string &executorName)
156+ {
157+ if (m_retryCaches.find (executorName) == m_retryCaches.end ())
158+ return nullptr ;
159+ else
160+ return m_retryCaches[executorName].get ();
161+ }
162+
163+ ConsumerBase* Orch::getConsumerBase (const std::string &executorName)
164+ {
165+ if (m_consumerMap.find (executorName) == m_consumerMap.end ())
166+ return nullptr ;
167+ return dynamic_cast <ConsumerBase*>(m_consumerMap[executorName].get ());
168+ }
169+
170+ void ConsumerBase::addToRetry (const Task &task, const Constraint &cst) {
171+ getOrch ()->getRetryCache (getName ())->cache_failed_task (task, cst);
172+ }
173+
174+ void Orch::addToRetry (const std::string &executorName, const Task &task, const Constraint &cst) {
175+ getRetryCache (executorName)->cache_failed_task (task, cst);
176+ }
177+
178+ size_t Orch::retryToSync (const std::string &executorName, size_t threshold)
179+ {
180+ auto retryCache = getRetryCache (executorName);
181+
182+ if (!retryCache || threshold <= 0 )
183+ return 0 ;
184+
185+ std::unordered_set<Constraint>& constraints = retryCache->getResolvedConstraints ();
186+
187+ size_t count = 0 ;
188+
189+ for (auto it = constraints.begin (); it != constraints.end () && count < threshold;)
190+ {
191+ auto cst = *it++;
192+
193+ auto tasks = retryCache->resolve (cst, threshold - count);
194+
195+ count += tasks->size ();
196+
197+ getConsumerBase (executorName)->addToSync (tasks, true );
198+
199+ }
200+ return count;
201+ }
202+
203+ void Orch::notifyRetry (Orch *retryOrch, const std::string &executorName, const Constraint &cst)
204+ {
205+ retryOrch->getRetryCache (executorName)->add_resolution (cst);
206+ }
207+
208+ size_t ConsumerBase::addToSync (std::shared_ptr<std::deque<swss::KeyOpFieldsValuesTuple>> entries, bool onRetry) {
209+ return addToSync (*entries, onRetry);
210+ }
211+
212+ void ConsumerBase::addToSync (const KeyOpFieldsValuesTuple &entry, bool onRetry)
151213{
152214 SWSS_LOG_ENTER ();
153215
@@ -157,6 +219,24 @@ void ConsumerBase::addToSync(const KeyOpFieldsValuesTuple &entry)
157219 /* Record incoming tasks */
158220 Recorder::Instance ().swss .record (dumpTuple (entry));
159221
222+ auto retryCache = getOrch ()->getRetryCache (getName ());
223+
224+ if (retryCache)
225+ {
226+ auto it = retryCache->getRetryMap ().find (key);
227+ if (it != retryCache->getRetryMap ().end ()) // key exists
228+ {
229+ if (it->second .second == entry) // skip duplicate task
230+ return ;
231+
232+ auto cache = retryCache->erase_stale_cache (key);
233+ Recorder::Instance ().retry .record (dumpTuple (*cache).append (DECACHE));
234+
235+ if (op == SET_COMMAND)
236+ m_toSync.emplace (key, std::move (*cache));
237+ }
238+ }
239+
160240 /*
161241 * m_toSync is a multimap which will allow one key with multiple values,
162242 * Also, the order of the key-value pairs whose keys compare equivalent
@@ -230,22 +310,18 @@ void ConsumerBase::addToSync(const KeyOpFieldsValuesTuple &entry)
230310
231311}
232312
233- size_t ConsumerBase::addToSync (const std::deque<KeyOpFieldsValuesTuple> &entries)
313+ size_t ConsumerBase::addToSync (const std::deque<KeyOpFieldsValuesTuple> &entries, bool onRetry )
234314{
235315 SWSS_LOG_ENTER ();
236316
237317 for (auto & entry: entries)
238318 {
239- addToSync (entry);
319+ addToSync (entry, onRetry );
240320 }
241321
242322 return entries.size ();
243323}
244324
245- size_t ConsumerBase::addToSync (std::shared_ptr<std::deque<swss::KeyOpFieldsValuesTuple>> entries) {
246- return addToSync (*entries);
247- }
248-
249325// TODO: Table should be const
250326size_t ConsumerBase::refillToSync (Table* table)
251327{
0 commit comments