14 changes: 14 additions & 0 deletions rdagent/app/data_science/conf.py
@@ -179,6 +179,20 @@ class DataScienceBasePropSetting(KaggleBasePropSetting):
ensemble_time_upper_bound: bool = False


### Graph related
enable_node_restart: bool = True
"""Enable node restart for failed nodes in the graph"""

enable_node_a_restart: bool = True
"""Enable node A restart for failed nodes in the graph"""

enable_node_b_restart: bool = True
"""Enable node B restart for failed nodes in the graph"""

DS_RD_SETTING = DataScienceBasePropSetting()

# enable_cross_trace_diversity and llm_select_hypothesis should not be true at the same time
224 changes: 222 additions & 2 deletions rdagent/scenarios/data_science/proposal/exp_gen/proposal.py
@@ -39,6 +39,7 @@
from rdagent.utils.agent.tpl import T
from rdagent.utils.repo.diff import generate_diff_from_dict
from rdagent.utils.workflow import wait_retry
from collections import OrderedDict

_COMPONENT_META: Dict[str, Dict[str, Any]] = {
"DataLoadSpec": {
@@ -1271,6 +1272,206 @@ def get_all_hypotheses(self, problem_dict: dict, hypothesis_dict: dict) -> list[
)
)
return result

def _get_exp_index(self, trace: DSTrace) -> int:
leaves: list[int] = trace.get_leaves()
if trace.sota_exp_to_submit is not None:
sota_submit_value = trace.sota_exp_to_submit.result.loc["ensemble"].iloc[0]
trace_scores = []
for leaf in leaves:
if leaf == trace.current_selection[0]:
continue
fb = trace.sota_experiment_fb(selection=(leaf,))
if fb is None:
continue
final_score = fb[0].result.loc["ensemble"].iloc[0]
# Track the leaf itself, not its enumeration index, so the return value matches hist/selection indices.
trace_scores.append((leaf, abs(final_score - sota_submit_value)))
if trace_scores:
return min(trace_scores, key=lambda item: item[1])[0]
# Fall back to the first other leaf; default to 0 to avoid StopIteration when none exists.
return next((leaf for leaf in leaves if leaf != trace.current_selection[0]), 0)
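In short, `_get_exp_index` picks, among the leaves of the other traces, the one whose ensemble score sits closest to the score of the SOTA experiment chosen for submission. A minimal sketch of that selection rule with hypothetical scores (plain Python, no trace objects):

```python
# Hypothetical ensemble scores per candidate leaf, plus the submitted SOTA score.
sota_submit_value = 0.842
leaf_scores = {4: 0.815, 7: 0.840, 9: 0.790}  # leaf index -> ensemble score

# Rank candidates by absolute distance to the submitted SOTA score.
trace_scores = [(leaf, abs(score - sota_submit_value)) for leaf, score in leaf_scores.items()]
best_leaf = min(trace_scores, key=lambda item: item[1])[0]
print(best_leaf)  # -> 7, the leaf whose score is nearest the submitted one
```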

@wait_retry(retry_n=5)
def merge_node_gen(
self,
trace: DSTrace,
component_desc: str,
sota_exp_desc: str,
enable_idea_pool: bool,
pipeline: bool = True,
exp_feedback_list_desc: str = "",
scenario_desc: str = "",
problems: dict | None = None,  # avoid a mutable default argument
) -> Dict:

sota_exp_fb = trace.sota_experiment_fb(selection=trace.current_selection)
if sota_exp_fb:
sota_exp_desc = T("scenarios.data_science.share:describe.exp").r(
exp=sota_exp_fb[0],
heading="Best previous exploration of the scenario",
)
eda_output = sota_exp_fb[0].experiment_workspace.file_dict.get("EDA.md", None)
else:
sota_exp_desc = ""
eda_output = None

trace_fbs: list[tuple[DSExperiment, ExperimentFeedback]] = []
# find the best exp to merge
leaves: list[int] = trace.get_leaves()
max_sota_retrieved_num_per_trace = max(DS_RD_SETTING.max_sota_retrieved_num * 2 // len(leaves), 4)
for leaf in leaves:
if leaf == trace.current_selection[0]:
continue

trace_fbs.extend(
trace.experiment_and_feedback_list_after_init(
return_type="sota",
search_type="ancestors",
selection=(leaf,),
max_retrieve_num=max_sota_retrieved_num_per_trace,
)
)

success_fb_list = list(dict.fromkeys(trace_fbs))  # de-duplicate while preserving order
logger.info(
f"Merge Hypothesis: select {len(success_fb_list)} from {len(trace_fbs)} SOTA experiments found in {len(leaves)} traces"
)

if len(success_fb_list) > 0:
exp_to_merge_fb_desc = T("scenarios.data_science.proposal.exp_gen.merge:trace").r(
exp_and_feedback_list=success_fb_list,
type="success",
heading="Successful iterations:",
success_trial_desc="These trials are the steps or changes that led to the success of the solution to be merged",
pipeline=DS_RD_SETTING.coder_on_whole_pipeline,
)
else:
exp_index = self._get_exp_index(trace)
exp_to_merge_fb = trace.sota_experiment_fb(selection=(exp_index,))
if exp_to_merge_fb is None:
exp_to_merge_fb = trace.hist[exp_index]

exp_to_merge_fb_desc = T("scenarios.data_science.share:describe.feedback").r(
exp_and_feedback=exp_to_merge_fb,
heading="The feedback for the solution to be merged",
)

component_desc = T("scenarios.data_science.share:component_description_in_pipeline").r()

sys_prompt = T(".restart:hypothesis_gen.system").r(
component_desc=component_desc,
hypothesis_output_format=T(".prompts_v2:output_format.hypothesis").r(
pipeline=pipeline, enable_idea_pool=enable_idea_pool
),
pipeline=pipeline,
)
user_prompt = T(".restart:hypothesis_gen.user").r(
exp_and_feedback_list_desc=exp_to_merge_fb_desc,
sota_exp_desc=sota_exp_desc,
)
response = APIBackend().build_messages_and_create_chat_completion(
user_prompt=user_prompt,
system_prompt=sys_prompt,
json_mode=True,
json_target_type=Dict[str, Dict[str, str | Dict[str, str | int]]],
)
resp_dict = json.loads(response)
return resp_dict


def _get_scores(self, trace, loop_id2idx, loop_id_list, root_id) -> list:
id_and_scores = []
for loop_id in loop_id_list:
if trace.hist[loop_id2idx[loop_id]][1].decision:
id_and_scores.append(
(root_id, loop_id, trace.hist[loop_id2idx[loop_id]][0].result.loc["ensemble"].iloc[0].round(3))
)
else:
id_and_scores.append((root_id, loop_id, -1))
return id_and_scores
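`_get_scores` yields one `(root_id, loop_id, score)` triple per loop on the path, with `-1` as a sentinel for rejected loops; a hypothetical output, for illustration only:

```python
# Hypothetical output of _get_scores for root 0 and loops [3, 5, 8].
id_and_scores = [
    (0, 3, 0.812),  # accepted loop with its rounded ensemble score
    (0, 5, -1),     # rejected loop (feedback decision was False)
    (0, 8, 0.827),
]

# Downstream code filters the -1 sentinels before averaging.
valid_scores = [s for _, _, s in id_and_scores if s != -1]
print(sum(valid_scores) / len(valid_scores))  # -> 0.8195
```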


def identify_current_node_type(self, trace: DSTrace) -> str:

competition = trace.scen.competition

root_nodes = {}
parent_nodes = {}
for node in range(len(trace.hist)):
parents = trace.get_parents(node)
root_nodes[node] = parents[0]
parent_nodes[node] = parents[-2] if len(parents) > 1 else None
if hasattr(trace, "idx2loop_id"):
root_nodes = {trace.idx2loop_id[n]: trace.idx2loop_id[r] for n, r in root_nodes.items()}
parent_nodes = {
trace.idx2loop_id[n]: trace.idx2loop_id[r] if r is not None else r
for n, r in parent_nodes.items()
}

current_record_id = trace.current_selection[0]
current_loop_id = trace.idx2loop_id[current_record_id]

loop_id_list = self._get_path(current_loop_id, parent_nodes)

loop_id2idx = {v: k for k, v in trace.idx2loop_id.items()}

unique_roots = list(OrderedDict.fromkeys(root_nodes.values()))

node_list_from_different_root = [
[node for node, r in root_nodes.items() if r == root]
for root in unique_roots
]

score_list = [self._get_scores(trace, loop_id2idx, l, root_id)
for l, root_id in zip(node_list_from_different_root, unique_roots)]

all_nodes = [item for sublist in score_list for item in sublist]

# Resolve the current node's root as a scalar so it can be compared against root ids below.
current_parent_root = next(
(root_id for l in score_list for root_id, loop_id, score in l if loop_id == current_loop_id),
None,
)

mean_scores = []
successful_rates = []
for l in score_list:
valid_scores = [s for _, _, s in l if s != -1]
root = l[0][0]
mean_score = np.mean(valid_scores) if valid_scores else None
successful_rate = 100 * len(valid_scores) / len(l) if l else 0.0
successful_rates.append((root, successful_rate))
mean_scores.append((root, mean_score))

current_success_rate = next((rate for r, rate in successful_rates if r == current_parent_root), 0.0)
if current_success_rate > 50:
min_threshold = 0.7
# Exclude roots with no successful loop (mean_score is None) before taking percentiles.
all_scores = np.array([score for _, score in mean_scores if score is not None])
bigger_is_better = get_metric_direction(competition)

if bigger_is_better:
percentile = 75
dynamic_threshold = max(np.percentile(all_scores, percentile), min_threshold)
root_score = next((score for root, score in mean_scores if root == current_parent_root), None)
if root_score is None or root_score < dynamic_threshold:
return "restart"
else:
return "explore"
else:
percentile = 25
dynamic_threshold = min(np.percentile(all_scores, 100 - percentile), 1 - min_threshold)
root_score = next((score for root, score in mean_scores if root == current_parent_root), None)
if root_score is None or root_score > dynamic_threshold:
return "restart"
else:
return "explore"
else:
return "restart"

def gen(
self,
@@ -1372,6 +1573,20 @@ def gen(
is_new_tree=is_new_tree,
sibling_exp=sibling_exp,
)

node_type = "none"
if DS_RD_SETTING.enable_node_restart:
if len(trace.current_selection) != 0:
node_type = self.identify_current_node_type(trace)

if node_type == "restart":
hypothesis_dict = self.merge_node_gen(trace= trace,component_desc = component_desc,sota_exp_desc = sota_exp_desc,
enable_idea_pool =DS_RD_SETTING.enable_knowledge_base,pipeline =pipeline,exp_feedback_list_desc=exp_feedback_list_desc,
scenario_desc=scenario_desc,problems = all_problems )
else:
hypothesis_dict = hypothesis_dict


if not pipeline:
sota_exp_model_file_count = len(
[
@@ -1388,6 +1603,8 @@
for name in pop_names:
hypothesis_dict.pop(name)

# Step 2.1 & 2.2: Hypothesis Critique and Rewrite Stage (controlled by enable_hypo_critique_rewrite)
if DS_RD_SETTING.enable_hypo_critique_rewrite and len(trace.hist) > 0:
logger.info(f"Hypothesis critique and rewrite enabled - processing {len(hypothesis_dict)} hypotheses")
@@ -1424,7 +1641,8 @@ def gen(
logger.info(f"Hypothesis critique and rewrite disabled - using original {len(hypothesis_dict)} hypotheses")

# Step 3: Select the best hypothesis
if DS_RD_SETTING.llm_select_hypothesis and node_type != "restart":
response_dict = self.hypothesis_select_with_llm(
scenario_desc=scenario_desc,
exp_feedback_list_desc=exp_feedback_list_desc,
@@ -1439,11 +1657,13 @@
)
pickled_problem_name = None
else:
all_problems = {}
pickled_problem_name, new_hypothesis = self.hypothesis_rank(
hypothesis_dict=hypothesis_dict,
problem_dict=all_problems,
selected_idx=0,
)

# Step 3.5: Update knowledge base with the picked problem
if DS_RD_SETTING.enable_knowledge_base:
trace.knowledge_base.update_pickled_problem(all_problems, pickled_problem_name)
47 changes: 47 additions & 0 deletions rdagent/scenarios/data_science/proposal/exp_gen/restart.yaml
@@ -0,0 +1,47 @@
hypothesis_gen:
system: |-
{% include "scenarios.data_science.share:scen.role" %}
The user is iteratively improving a Kaggle competition implementation through traces, where each new experiment is derived from the current SOTA of its trace. If a new experiment surpasses the current SOTA, it becomes the new SOTA; otherwise, it is recorded as a failed experiment.
You will be provided with:
1. A detailed competition scenario description;
2. Previous SOTA experiments and their feedback, indexed from oldest to newest;
3. The current SOTA implementation and its feedback, i.e., the latest SOTA experiment among the previous experiments;
4. Extra implementations from other users' experiments;
Your task is to:
1. **Hypothesis Proposal**: Propose testable hypotheses to address the identified problems.
2. **Hypothesis Evaluation**: Evaluate the proposed hypotheses across multiple dimensions.

# Task 1: Hypothesis Proposal
For each identified problem, propose a hypothesis to improve the current SOTA implementation.

## Hypothesis Guidelines
Here are a few guidelines to help you formulate hypotheses:
1. Previous Experiments Analysis
- For previous SOTA experiments, analyze insights and implicit patterns that can be leveraged to improve the current SOTA implementation.
- For failed experiments, think about the persistent problems they faced. If these experiments consistently failed due to time/memory constraints, prioritize changes that improve efficiency.
2. Note on Time/Memory Constraints
- If prior experiments failed due to time/memory limitations, assume your new hypothesis will face the same constraints. In this case, prioritize efficiency and **ONLY** respond to the problems related to time/memory constraints in your response dictionary.
- Otherwise, do not compromise performance merely for efficiency, since the current SOTA implementation does not hit these constraints. Think about how to balance efficiency and performance so that your new hypothesis can be executed successfully and still achieve satisfactory performance.

# Task 2: Hypothesis Evaluation
## Evaluation Instruction
First, tag the hypothesis with one of the following components. If the hypothesis relates to multiple components, choose the most relevant one.
{{ component_desc }}
After proposing the hypothesis, your second task is to evaluate it across multiple dimensions.

Second, score the proposed hypothesis from 1 to 10 on each of the following dimensions (where 1 is lowest and 10 is highest):
1. Problem-Hypothesis Alignment: How well the hypothesis addresses the identified problem.
2. Expected Impact: The estimated improvement after applying the hypothesis to current SOTA implementation.
3. Novelty: Degree of innovation compared to previous attempts. If the proposed hypothesis is similar to a previous experiment's hypothesis, assign a novelty score of one.
4. Feasibility: The ease of implementing the proposed hypothesis in the current SOTA implementation.
5. Risk-Reward Balance: The exploration-exploitation balance of the proposed hypothesis.

## Final Output Format in JSON Schema:
{{ hypothesis_output_format }}

user: |-
# Extra Experiments and Feedback
{{ exp_and_feedback_list_desc }}

# Current SOTA Implementation
{{ sota_exp_desc }}
@@ -109,6 +109,7 @@ def select(self, trace: DSTrace) -> tuple[int, ...] | None:
return None



# ======================================================================================
# Probabilistic Scheduler and its potential functions
# ======================================================================================