1 change: 1 addition & 0 deletions .gitignore
@@ -10,3 +10,4 @@ bench/shakespeare.txt
 .idea/
 .ipynb_checkpoints/
 coverage.xml
+venv/
40 changes: 33 additions & 7 deletions dask_expr/_collection.py
@@ -3654,13 +3654,39 @@ def query(self, expr, **kwargs):
         return new_collection(Query(self, expr, kwargs))
 
     @derived_from(pd.DataFrame)
-    def mode(self, dropna=True, split_every=False, numeric_only=False):
-        modes = []
-        for _, col in self.items():
-            if numeric_only and not pd.api.types.is_numeric_dtype(col.dtype):
-                continue
-            modes.append(col.mode(dropna=dropna, split_every=split_every))
-        return concat(modes, axis=1)
+    # GH#11389 - Dask issue related to adding row-wise mode functionality
+    # GH#1136 - Dask-Expr specific implementation for row-wise mode functionality
+    # Contributor: @thyripian
+    def mode(self, axis=0, numeric_only=False, dropna=True, split_every=False):
+        if axis == 0:
+            # Existing logic for axis=0 (column-wise mode)
+            modes = []
+            for _, col in self.items():
+                if numeric_only and not pd.api.types.is_numeric_dtype(col.dtype):
+                    continue
+                modes.append(col.mode(dropna=dropna, split_every=split_every))
+            return concat(modes, axis=1)
+        elif axis == 1:
+            # Use self._meta_nonempty to generate meta
+            meta = self._meta_nonempty.mode(
+                axis=1, numeric_only=numeric_only, dropna=dropna
+            )
+
+            # Determine the maximum number of modes any row can have
+            max_modes = len(self.columns)
+
+            # Reindex meta to have the maximum number of columns
+            meta = meta.reindex(columns=range(max_modes))
+
+            # Apply map_partitions using pandas' mode function directly
+            return self.map_partitions(
+                lambda df: df.mode(
+                    axis=1, numeric_only=numeric_only, dropna=dropna
+                ).reindex(columns=range(max_modes)),
+                meta=meta,
+            )
+        else:
+            raise ValueError(f"No axis named {axis} for object type {type(self)}")
 
     @derived_from(pd.DataFrame)
     def add_prefix(self, prefix):
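For context on the reindex(columns=range(max_modes)) padding above: the width of pandas' row-wise mode depends on the data, since a row with k tied values produces k mode columns, and partitions with different tie counts would otherwise disagree with the meta. A minimal sketch of the behavior (the frames are made up, and the dask-expr call assumes this branch is installed):

    import pandas as pd
    import dask_expr as dx

    pdf = pd.DataFrame({"a": [1, 4], "b": [2, 2], "c": [3, 2]})
    # Row 0 has three tied modes (1, 2, 3); row 1 has one mode (2),
    # so pandas pads row 1 with NaN and the result is 3 columns wide.
    print(pdf.mode(axis=1))

    # A partition holding only row 1 would yield a 1-column result;
    # reindexing pads it to len(pdf.columns) so every partition agrees:
    print(pdf.iloc[[1]].mode(axis=1).reindex(columns=range(len(pdf.columns))))

    # With this change, the same padding is applied in every partition:
    ddf = dx.from_pandas(pdf, npartitions=2)
    print(ddf.mode(axis=1).compute())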
32 changes: 20 additions & 12 deletions dask_expr/_expr.py
@@ -1269,9 +1269,11 @@ def _simplify_up(self, parent, dependents):
         columns = _convert_to_list(columns)
         frame_columns = set(self.frame.columns)
         columns = [
-            reverse_mapping[col]
-            if col in reverse_mapping and reverse_mapping[col] in frame_columns
-            else col
+            (
+                reverse_mapping[col]
+                if col in reverse_mapping and reverse_mapping[col] in frame_columns
+                else col
+            )
             for col in columns
         ]
         columns = [col for col in self.frame.columns if col in columns]
@@ -2342,9 +2344,11 @@ class AddPrefix(Elemwise):
     @functools.cached_property
     def unique_partition_mapping_columns_from_shuffle(self):
         return {
-            f"{self.prefix}{c}"
-            if not isinstance(c, tuple)
-            else tuple(self.prefix + t for t in c)
+            (
+                f"{self.prefix}{c}"
+                if not isinstance(c, tuple)
+                else tuple(self.prefix + t for t in c)
+            )
             for c in self.frame.unique_partition_mapping_columns_from_shuffle
         }
 
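The conditional expression the formatter re-wrapped here (behavior unchanged) maps plain column names to their prefixed form and tuple (MultiIndex-style) names element-wise. A quick illustrative snippet, with a made-up input set:

    prefix = "pre_"
    shuffle_cols = {"a", ("b", "c")}
    mapped = {
        (f"{prefix}{c}" if not isinstance(c, tuple) else tuple(prefix + t for t in c))
        for c in shuffle_cols
    }
    print(mapped)  # {'pre_a', ('pre_b', 'pre_c')}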
@@ -2373,9 +2377,11 @@ class AddSuffix(AddPrefix):
     @functools.cached_property
     def unique_partition_mapping_columns_from_shuffle(self):
         return {
-            f"{c}{self.suffix}"
-            if not isinstance(c, tuple)
-            else tuple(t + self.suffix for t in c)
+            (
+                f"{c}{self.suffix}"
+                if not isinstance(c, tuple)
+                else tuple(t + self.suffix for t in c)
+            )
             for c in self.frame.unique_partition_mapping_columns_from_shuffle
         }
 
@@ -2421,9 +2427,11 @@ def _task(self, index: int):
     def _simplify_down(self):
         if isinstance(self.frame, Elemwise):
             operands = [
-                Head(op, self.n, self.operand("npartitions"))
-                if isinstance(op, Expr) and not isinstance(op, _DelayedExpr)
-                else op
+                (
+                    Head(op, self.n, self.operand("npartitions"))
+                    if isinstance(op, Expr) and not isinstance(op, _DelayedExpr)
+                    else op
+                )
                 for op in self.frame.operands
             ]
             return type(self.frame)(*operands)
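The _simplify_down above (only re-wrapped by the formatter) pushes a Head through elemwise operations by wrapping each Expr operand in its own Head, so the elemwise op only ever sees the requested rows. A rough sketch of the effect (the frame is illustrative, assuming dask_expr is installed):

    import pandas as pd
    import dask_expr as dx

    df = dx.from_pandas(pd.DataFrame({"x": range(100)}), npartitions=4)
    # Conceptually, Head(Add(x, 1), 5) is rewritten to Add(Head(x, 5), 1),
    # so the +1 runs on at most 5 rows rather than the full column.
    print((df.x + 1).head(5))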