@@ -147,7 +147,71 @@ vector<Selectable *> Orch::getSelectables()
147147 return selectables;
148148}
149149
150- void ConsumerBase::addToSync (const KeyOpFieldsValuesTuple &entry)
150+ void Orch::createRetryCache (const std::string &executorName) {
151+ if (m_retryCaches.find (executorName) == m_retryCaches.end ())
152+ m_retryCaches[executorName] = std::make_shared<RetryCache>(executorName);
153+ }
154+
155+ RetryCache *Orch::getRetryCache (const std::string &executorName)
156+ {
157+ if (m_retryCaches.find (executorName) == m_retryCaches.end ())
158+ return nullptr ;
159+ else
160+ return m_retryCaches[executorName].get ();
161+ }
162+
163+ ConsumerBase* Orch::getConsumerBase (const std::string &executorName)
164+ {
165+ if (m_consumerMap.find (executorName) == m_consumerMap.end ())
166+ return nullptr ;
167+ return dynamic_cast <ConsumerBase*>(m_consumerMap[executorName].get ());
168+ }
169+
170+ void ConsumerBase::addToRetry (const Task &task, const Constraint &cst) {
171+ Recorder::Instance ().retry .record (dumpTuple (task).append (CACHE));
172+ if (getOrch ())
173+ getOrch ()->getRetryCache (getName ())->cache_failed_task (task, cst);
174+ }
175+
176+ void Orch::addToRetry (const std::string &executorName, const Task &task, const Constraint &cst) {
177+ getRetryCache (executorName)->cache_failed_task (task, cst);
178+ }
179+
180+ size_t Orch::retryToSync (const std::string &executorName, size_t threshold)
181+ {
182+ auto retryCache = getRetryCache (executorName);
183+
184+ if (!retryCache || threshold <= 0 )
185+ return 0 ;
186+
187+ std::unordered_set<Constraint>& constraints = retryCache->getResolvedConstraints ();
188+
189+ size_t count = 0 ;
190+
191+ for (auto it = constraints.begin (); it != constraints.end () && count < threshold;)
192+ {
193+ auto cst = *it++;
194+
195+ auto tasks = retryCache->resolve (cst, threshold - count);
196+
197+ count += tasks->size ();
198+
199+ getConsumerBase (executorName)->addToSync (tasks, true );
200+
201+ }
202+ return count;
203+ }
204+
205+ void Orch::notifyRetry (Orch *retryOrch, const std::string &executorName, const Constraint &cst)
206+ {
207+ retryOrch->getRetryCache (executorName)->add_resolution (cst);
208+ }
209+
210+ size_t ConsumerBase::addToSync (std::shared_ptr<std::deque<swss::KeyOpFieldsValuesTuple>> entries, bool onRetry) {
211+ return addToSync (*entries, onRetry);
212+ }
213+
214+ void ConsumerBase::addToSync (const KeyOpFieldsValuesTuple &entry, bool onRetry)
151215{
152216 SWSS_LOG_ENTER ();
153217
@@ -157,6 +221,24 @@ void ConsumerBase::addToSync(const KeyOpFieldsValuesTuple &entry)
157221 /* Record incoming tasks */
158222 Recorder::Instance ().swss .record (dumpTuple (entry));
159223
224+ auto retryCache = getOrch () ? getOrch ()->getRetryCache (getName ()) : nullptr ;
225+
226+ if (retryCache)
227+ {
228+ auto it = retryCache->getRetryMap ().find (key);
229+ if (it != retryCache->getRetryMap ().end ()) // key exists
230+ {
231+ if (it->second .second == entry) // skip duplicate task
232+ return ;
233+
234+ auto cache = retryCache->erase_stale_cache (key);
235+ Recorder::Instance ().retry .record (dumpTuple (*cache).append (DECACHE));
236+
237+ if (op == SET_COMMAND)
238+ m_toSync.emplace (key, std::move (*cache));
239+ }
240+ }
241+
160242 /*
161243 * m_toSync is a multimap which will allow one key with multiple values,
162244 * Also, the order of the key-value pairs whose keys compare equivalent
@@ -230,22 +312,18 @@ void ConsumerBase::addToSync(const KeyOpFieldsValuesTuple &entry)
230312
231313}
232314
233- size_t ConsumerBase::addToSync (const std::deque<KeyOpFieldsValuesTuple> &entries)
315+ size_t ConsumerBase::addToSync (const std::deque<KeyOpFieldsValuesTuple> &entries, bool onRetry )
234316{
235317 SWSS_LOG_ENTER ();
236318
237319 for (auto & entry: entries)
238320 {
239- addToSync (entry);
321+ addToSync (entry, onRetry );
240322 }
241323
242324 return entries.size ();
243325}
244326
245- size_t ConsumerBase::addToSync (std::shared_ptr<std::deque<swss::KeyOpFieldsValuesTuple>> entries) {
246- return addToSync (*entries);
247- }
248-
249327// TODO: Table should be const
250328size_t ConsumerBase::refillToSync (Table* table)
251329{
0 commit comments