8 changes: 5 additions & 3 deletions fastparquet/api.py
@@ -196,6 +196,7 @@ def __init__(self, fn, verify=False, open_with=default_open, root=False,
"a filesystem compatible with fsspec") from e
self.open = open_with
self._statistics = None
self.global_cats = {}

def _parse_header(self, f, verify=True):
if self.fn and self.fn.endswith("_metadata"):
@@ -318,7 +319,8 @@ def __getitem__(self, item):
new_pf.__setstate__(
{"fn": self.fn, "open": self.open, "fmd": fmd,
"pandas_nulls": self.pandas_nulls, "_base_dtype": self._base_dtype,
"tz": self.tz, "_columns_dtype": self._columns_dtype}
"tz": self.tz, "_columns_dtype": self._columns_dtype,
"global_cats": {}} # fresh empty dict for the slice
)
new_pf._set_attrs()
return new_pf
@@ -389,7 +391,7 @@ def read_row_group_file(self, rg, columns, categories, index=None,
f, rg, columns, categories, self.schema, self.cats,
selfmade=self.selfmade, index=index,
assign=assign, scheme=self.file_scheme, partition_meta=partition_meta,
row_filter=row_filter
row_filter=row_filter, global_cats=self.global_cats
)
if ret:
return df
@@ -1011,7 +1013,7 @@ def __getstate__(self):
self.fmd.row_groups = []
return {"fn": self.fn, "open": self.open, "fmd": self.fmd,
"pandas_nulls": self.pandas_nulls, "_base_dtype": self._base_dtype,
"tz": self.tz}
"tz": self.tz, "global_cats": self.global_cats}

def __setstate__(self, state):
self.__dict__.update(state)
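
For orientation, here is a minimal sketch of what the new global_cats attribute is intended to hold after reading a multi-row-group categorical dataset. The dataset path, column name and printed contents are illustrative assumptions, not part of this change; the mapping format follows the {col_path: array} convention documented in read_col below.

from fastparquet import ParquetFile

# Hypothetical dataset written in two appends, with "col2" stored as a categorical.
pf = ParquetFile("appended_dataset")   # illustrative path
df = pf.to_pandas()

# After decoding, global_cats maps each categorical column's schema path to the
# array of category values accumulated across all row groups, roughly:
#   {"col2": array([2, 5, 8, 11])}
print(pf.global_cats)
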
90 changes: 82 additions & 8 deletions fastparquet/core.py
@@ -200,7 +200,7 @@ def read_dictionary_page(file_obj, schema_helper, page_header, column_metadata,

def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,
dic, assign, num, use_cat, file_offset, ph, idx=None,
selfmade=False, row_filter=None):
selfmade=False, row_filter=None, remap_array=None):
"""
:param infile: open file
:param schema_helper:
@@ -211,6 +211,7 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,
:param assign: output array (all of it)
:param num: offset, rows so far
:param use_cat: output is categorical?
:param remap_array: optional array mapping this page's dictionary indices to global categorical codes
:return: None

test data "/Users/mdurant/Downloads/datapage_v2.snappy.parquet"
@@ -338,6 +339,9 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,
if bit_width in [8, 16, 32] and selfmade:
# special fastpath for cats
outbytes = raw_bytes[pagefile.tell():]
if remap_array is not None:
# Apply remapping to outbytes.
outbytes = remap_array[outbytes]
if len(outbytes) == assign[num:num+data_header2.num_values].nbytes:
assign[num:num+data_header2.num_values].view('uint8')[row_filter] = outbytes[row_filter]
else:
@@ -358,6 +362,9 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,
encoding.NumpyIO(assign[num:num+data_header2.num_values].view('uint8')),
itemsize=bit_width
)
if remap_array is not None:
# Apply remapping after reading
assign[num:num+data_header2.num_values] = remap_array[assign[num:num+data_header2.num_values]]
else:
temp = np.empty(data_header2.num_values, assign.dtype)
encoding.read_rle_bit_packed_hybrid(
@@ -367,6 +374,8 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,
encoding.NumpyIO(temp.view('uint8')),
itemsize=bit_width
)
if remap_array is not None:
temp = remap_array[temp]
if not nullable:
assign[num:num+data_header2.num_values][nulls[row_filter]] = None
assign[num:num+data_header2.num_values][~nulls[row_filter]] = temp[row_filter]
@@ -429,7 +438,7 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,

def read_col(column, schema_helper, infile, use_cat=False,
selfmade=False, assign=None, catdef=None,
row_filter=None):
row_filter=None, global_cats=None):
"""Using the given metadata, read one column in one row-group.

Parameters
@@ -443,10 +452,20 @@ def read_col(column, schema_helper, infile, use_cat=False,
use_cat: bool (False)
If this column is encoded throughout with dict encoding, give back
a pandas categorical column; otherwise, decode to values
selfmade: bool (False)
If the data was written by fastparquet
assign: numpy array
Where to store the result
catdef: pandas.Categorical or CategoricalDtype
If reading a categorical column, the categorical definition (categories and
ordering).
row_filter: bool array or None
If given, selects which of the values read are to be written
into the output. Effectively implies NULLs, even for a required
column.
global_cats: dict or None
Optional mapping used to accumulate the global categorical values seen
across row groups, keyed by column path: {col_path: array}
"""
cmd = column.meta_data
try:
@@ -480,6 +499,15 @@ def read_col(column, schema_helper, infile, use_cat=False,
row_idx = [0] # map/list objects
dic = None
index_off = 0 # how far through row_filter we are

# Track remappings of this row group's dictionary codes onto the global categories.
# Global dictionary tracking only applies when reading a categorical column and
# global_cats has been provided.
remap_dict = {} # local index -> global index, filled per dictionary page
if use_cat and global_cats is not None:
path_str = ".".join(cmd.path_in_schema)
# Register this column in global_cats if not already present
if path_str not in global_cats:
global_cats[path_str] = None

while num < rows:
off = infile.tell()
@@ -497,7 +525,44 @@ def read_col(column, schema_helper, infile, use_cat=False,
ddt = [kv.value.decode() for kv in (cmd.key_value_metadata or [])
if kv.key == b"label_dtype"]
ddt = ddt[0] if ddt else None
catdef._set_categories(pd.Index(dic, dtype=ddt), fastpath=True)

if global_cats is not None:
# Check if categorical values are consistent with global dictionary.
if global_cats[path_str] is None:
# This is the first dictionary for this column, save it as global
global_cats[path_str] = dic
else:
# Dictionary already defined for this column, check for inconsistency.
global_dict = global_cats[path_str]
new_values = []
# Build remap_dict in a single comprehension, collecting values missing from
# the global dictionary into new_values as a side effect:
# - the walrus operator (:=) stores found_idx, val's position in global_dict (-1 if absent);
# - when found_idx is -1, val is appended to new_values and mapped to its new global position;
# - only local indices whose global position differs (found_idx != i) are kept.
# (An expanded equivalent of this expression is sketched after this file's diff.)
remap_dict = {i: (len(global_dict) + len(new_values) - 1)
if found_idx == -1 and not new_values.append(val) else found_idx
for (i,), val in np.ndenumerate(dic)
if (found_idx := next((j
for (j,), gval in np.ndenumerate(global_dict)
if val == gval), -1)) != i
}
if remap_dict:
# If any remapping is needed, create a complete remap array.
# Initialize with identity mapping (no change)
remap_array = np.arange(len(dic), dtype=np.int32)
# Update indices that need remapping
remap_array[list(remap_dict)] = list(remap_dict.values())
if new_values:
# Add new values to global dictionary
global_cats[path_str] = np.append(global_dict, new_values)
# Update categories
catdef._set_categories(pd.Index(global_cats[path_str], dtype=ddt), fastpath=True)

# No global tracking, or no remapping needed: use this page's dictionary directly
if global_cats is None or not remap_dict:
catdef._set_categories(pd.Index(dic, dtype=ddt), fastpath=True)

if np.iinfo(assign.dtype).max < len(dic):
raise RuntimeError('Assigned array dtype (%s) cannot accommodate '
'number of category labels (%i)' %
@@ -509,7 +574,7 @@ def read_col(column, schema_helper, infile, use_cat=False,
if ph.type == parquet_thrift.PageType.DATA_PAGE_V2:
num += read_data_page_v2(infile, schema_helper, se, ph.data_page_header_v2, cmd,
dic, assign, num, use_cat, off, ph, row_idx, selfmade=selfmade,
row_filter=row_filter)
row_filter=row_filter, remap_array=remap_array if remap_dict else None)
continue
if (selfmade and hasattr(cmd, 'statistics') and
getattr(cmd.statistics, 'null_count', 1) == 0):
@@ -563,6 +628,9 @@ def read_col(column, schema_helper, infile, use_cat=False,
part[defi == max_defi] = dic[val]
elif not use_cat:
part[defi == max_defi] = convert(val, se, dtype=assign.dtype)
elif remap_dict:
# Apply remapping of categorical codes
part[defi == max_defi] = remap_array[val]
else:
part[defi == max_defi] = val
else:
@@ -582,14 +650,18 @@ def read_col(column, schema_helper, infile, use_cat=False,
piece[:] = dic[val]
elif not use_cat:
piece[:] = convert(val, se, dtype=assign.dtype)
elif remap_dict:
# Apply remapping of categorical codes
piece[:] = remap_array[val]
else:
piece[:] = val

num += len(defi) if defi is not None else len(val)


def read_row_group_arrays(file, rg, columns, categories, schema_helper, cats,
selfmade=False, assign=None, row_filter=False):
selfmade=False, assign=None, row_filter=False,
global_cats=None):
"""
Read a row group and return as a dict of arrays

@@ -615,7 +687,7 @@ def read_row_group_arrays(file, rg, columns, categories, schema_helper, cats,
read_col(column, schema_helper, file, use_cat=name+'-catdef' in out,
selfmade=selfmade, assign=out[name],
catdef=out.get(name+'-catdef', None),
row_filter=row_filter)
row_filter=row_filter, global_cats=global_cats)

if _is_map_like(schema_helper, column):
# TODO: could be done in fast loop in _assemble_objects?
@@ -634,15 +706,17 @@ def read_row_group_arrays(file, rg, columns, categories, schema_helper, cats,

def read_row_group(file, rg, columns, categories, schema_helper, cats,
selfmade=False, index=None, assign=None,
scheme='hive', partition_meta=None, row_filter=False):
scheme='hive', partition_meta=None, row_filter=False,
global_cats=None):
"""
Access row-group in a file and read some columns into a data-frame.
"""
partition_meta = partition_meta or {}
if assign is None:
raise RuntimeError('Going with pre-allocation!')
read_row_group_arrays(file, rg, columns, categories, schema_helper,
cats, selfmade, assign=assign, row_filter=row_filter)
cats, selfmade, assign=assign, row_filter=row_filter,
global_cats=global_cats)

for cat in cats:
if cat not in assign:
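
The single-expression comprehension in read_col above is dense, so here is a standalone sketch of the same remapping idea in expanded form. The data values and the names global_dict, dic and codes are illustrative stand-ins for the global dictionary, the current row group's dictionary page and its dictionary-encoded values; this is not the code added by the patch.

import numpy as np

global_dict = np.array(["A", "B", "C"])          # categories seen in earlier row groups
dic = np.array(["B", "C", "D"])                  # dictionary page of the current row group
codes = np.array([0, 2, 1, 0], dtype=np.int32)   # local codes indexing into dic

# Build the local-index -> global-index mapping, collecting unseen values.
remap_dict = {}
new_values = []
for i, val in enumerate(dic):
    matches = np.nonzero(global_dict == val)[0]
    if matches.size:
        found_idx = int(matches[0])
    else:
        new_values.append(val)
        found_idx = len(global_dict) + len(new_values) - 1
    if found_idx != i:          # only indices that actually move need remapping
        remap_dict[i] = found_idx

if remap_dict:
    # Identity mapping, then overwrite the entries that move.
    remap_array = np.arange(len(dic), dtype=np.int32)
    remap_array[list(remap_dict)] = list(remap_dict.values())
    if new_values:
        global_dict = np.append(global_dict, new_values)
    codes = remap_array[codes]  # rewrite local codes as global codes

print(global_dict)  # ['A' 'B' 'C' 'D']
print(codes)        # [1 3 2 1] -> 'B', 'D', 'C', 'B' under the global dictionary
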
88 changes: 88 additions & 0 deletions fastparquet/test/test_output.py
@@ -1214,3 +1214,91 @@ def test_attrs_roundtrip(tempdir):
df.to_parquet(path=fn, engine="fastparquet")
df2 = pd.read_parquet(fn, engine="fastparquet")
assert df2.attrs == attrs


def test_append_different_categorical_simple(tempdir):
"""Test for issue #949: wrong categories data when appending with categorical columns"""
fn = os.path.join(str(tempdir), 'test.parquet')
# First DataFrame with a categorical column
df1 = pd.DataFrame({
"col1": [1, 4, 7],
"col2": [2, 5, 8]
})
df1["col2"] = df1["col2"].astype("category")
write(fn, df1, write_index=False, file_scheme='simple')
# Second DataFrame to append
df2 = pd.DataFrame({
"col1": [4, 7, 10],
"col2": [5, 8, 11]
})
df2["col2"] = df2["col2"].astype("category")
write(fn, df2, append=True, write_index=False, file_scheme='simple')
# Read back again - this should maintain correct categorical values
df_combined = pd.read_parquet(fn, engine="fastparquet")
# Expected result when concatenating the two dataframes
expected = pd.concat([df1, df2], ignore_index=True)
expected["col2"] = expected["col2"].astype("category")
assert_frame_equal(df_combined, expected)


def test_append_different_categorical_multi(tempdir):
"""Test for issue #949: wrong categories data when appending with categorical columns"""
# Testing ParquetFile slicing as well.
# Set random seed for reproducibility
np.random.seed(42)
# Create initial DataFrame with categorical columns
def create_test_df(start_idx, rows, cats1, cats2):
cat1 = [cats1[i % len(cats1)] for i in range(rows)]
cat2 = [cats2[i % len(cats2)] for i in range(rows)]
df = pd.DataFrame({
'cat_col1': cat1,
'cat_col2': cat2,
'value': np.random.rand(rows)
})
df['cat_col1'] = df['cat_col1'].astype('category')
df['cat_col2'] = df['cat_col2'].astype('category')
return df
# Initial categories
cats1 = ['A', 'B', 'C']
cats2 = [10, 20, 30]
# First dataframe
fn = os.path.join(str(tempdir), 'test_parquet')
df1 = create_test_df(0, 5, cats1, cats2)
write(fn, df1, file_scheme='hive', write_index=False)
# New categories for second dataframe (overlapping + new values)
cats1_2 = ['B', 'C', 'D'] # B,C overlap with first df, D is new
cats2_2 = [30, 40, 50]
# Create second dataframe
df2 = create_test_df(len(df1), 6, cats1_2, cats2_2)
# Append second dataframe
write(fn, df2, file_scheme='hive', append=True, write_index=False)
# New categories for third dataframe (different ordering + new values)
cats1_3 = ['E', 'C', 'A'] # A,C from first, E is new
cats2_3 = [60, 70, 50] # Mixed order
# Create third dataframe
df3 = create_test_df(len(df1)+len(df2), 7, cats1_3, cats2_3)
# Append third dataframe
write(fn, df3, file_scheme='hive', append=True, write_index=False)
# Combine all original dataframes for comparison
expected_df = pd.concat([df1, df2, df3], axis=0, ignore_index=True)
expected_df['cat_col1'] = expected_df['cat_col1'].astype('category')
expected_df['cat_col2'] = expected_df['cat_col2'].astype('category')
pf = ParquetFile(fn)
actual_df = pf.to_pandas()
# Assert that the dataframes are equal
assert_frame_equal(expected_df, actual_df)
# Test slicing.
actual_df_subset = pf[1:].to_pandas()
expected_df_subset = pd.concat([df2, df3], axis=0, ignore_index=True)
expected_df_subset['cat_col1'] = expected_df_subset['cat_col1'].astype('category')
expected_df_subset['cat_col2'] = expected_df_subset['cat_col2'].astype('category')
try:
# fastparquet's handling of new categorical values does not reorder them,
# while pandas' concat appears to; align the ordering before comparing.
actual_df_subset['cat_col1'] = actual_df_subset['cat_col1'].cat.reorder_categories(
expected_df_subset['cat_col1'].cat.categories
)
except ValueError:
raise AssertionError("failed to reorder categories")
assert_frame_equal(expected_df_subset, actual_df_subset)

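As a quick illustration of what the simple test above encodes, the appended file should read back with the union of both category sets. A sketch, assuming the behaviour asserted by test_append_different_categorical_simple:

# Continuing from test_append_different_categorical_simple, where col2 was
# written as [2, 5, 8] and then appended with [5, 8, 11]:
assert list(df_combined["col2"].cat.categories) == [2, 5, 8, 11]
assert df_combined["col2"].tolist() == [2, 5, 8, 5, 8, 11]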