Conversation

@DerWeh (Collaborator) commented Aug 23, 2025

Instead of generating lists of arguments, dispatching the parallel functions, and then deleting the argument lists, the functions are now called directly, with arguments passed on demand. This should be clearer.
Furthermore, we create a partial to avoid repeating common function arguments. This reduces code duplication and makes the differences between the boosting calls clearer.
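The pattern, roughly (illustrative sketch; the names below are placeholders, not the actual identifiers in the code base):

from functools import partial

from joblib import Parallel, delayed


def boost_bag(bag_index, *, learning_rate, max_rounds):
    # stand-in for the real per-bag boosting routine
    return bag_index, learning_rate, max_rounds


# bind the arguments shared by every call once ...
boost = partial(boost_bag, learning_rate=0.01, max_rounds=100)

# ... so the parallel dispatch only spells out what differs per call
results = Parallel(n_jobs=2)(delayed(boost)(i) for i in range(4))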


This is a minor refactor to prepare a PR that hopefully eliminates manually managing shared memory. I previously failed with big PRs (#578 isn't abandoned yet, I just haven't managed to work my way through _harmonize_tensor), so I'm trying a more iterative approach now.

@paulbkoch (Collaborator) commented Aug 23, 2025

Hi @DerWeh -- With regards to the shared memory, we use it as a signaling method between the child processes for the new callback parameter. If any of the child processes indicate fitting should stop, all child processes observe the same boolean in the shared memory block, and exit when they observe this. At one point I tried moving the bigger buffers into shared memory as well because that would have saved some memory, but it turns out docker severely limits the amount of shared memory allowed on a system. The single boolean that we use for signaling isn't an issue though.

@codecov (bot) commented Aug 23, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.14%. Comparing base (97f29e4) to head (056bb36).
⚠️ Report is 1 commit behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #632      +/-   ##
==========================================
- Coverage   73.17%   73.14%   -0.03%     
==========================================
  Files          74       74              
  Lines       10552    10531      -21     
==========================================
- Hits         7721     7703      -18     
+ Misses       2831     2828       -3     
| Flag | Coverage Δ |
|---|---|
| bdist_linux_310_python | 72.76% <100.00%> (-0.10%) ⬇️ |
| bdist_linux_311_python | 72.74% <100.00%> (-0.04%) ⬇️ |
| bdist_linux_312_python | 62.72% <92.85%> (-0.09%) ⬇️ |
| bdist_linux_313_python | 72.78% <100.00%> (+10.59%) ⬆️ |
| bdist_mac_310_python | 73.00% <100.00%> (-0.07%) ⬇️ |
| bdist_mac_311_python | 72.91% <100.00%> (-0.13%) ⬇️ |
| bdist_mac_312_python | 72.91% <100.00%> (-0.17%) ⬇️ |
| bdist_mac_313_python | 72.93% <100.00%> (-0.12%) ⬇️ |
| bdist_win_310_python | 73.00% <100.00%> (+10.79%) ⬆️ |
| bdist_win_311_python | 72.95% <100.00%> (+10.73%) ⬆️ |
| bdist_win_312_python | 62.74% <92.85%> (-0.07%) ⬇️ |
| bdist_win_313_python | 59.13% <100.00%> (-0.10%) ⬇️ |
| sdist_linux_310_python | 72.75% <100.00%> (+<0.01%) ⬆️ |
| sdist_linux_311_python | 72.68% <100.00%> (-0.14%) ⬇️ |
| sdist_linux_312_python | 72.75% <100.00%> (+10.01%) ⬆️ |
| sdist_linux_313_python | 72.77% <100.00%> (+10.62%) ⬆️ |
| sdist_mac_310_python | 72.90% <100.00%> (-0.05%) ⬇️ |
| sdist_mac_311_python | 72.91% <100.00%> (-0.04%) ⬇️ |
| sdist_mac_312_python | 72.91% <100.00%> (+0.40%) ⬆️ |
| sdist_mac_313_python | 72.88% <100.00%> (-0.09%) ⬇️ |
| sdist_win_310_python | 62.15% <92.85%> (-0.05%) ⬇️ |
| sdist_win_311_python | 62.15% <92.85%> (-0.05%) ⬇️ |
| sdist_win_312_python | 62.74% <92.85%> (-7.43%) ⬇️ |
| sdist_win_313_python | 59.19% <100.00%> (-0.08%) ⬇️ |

Flags with carried forward coverage won't be shown.

☔ View full report in Codecov by Sentry.

@DerWeh (Collaborator, Author) commented Aug 23, 2025

Thanks for the hint @paulbkoch

With regards to the shared memory, we use it as a signaling method between the child processes for the new callback parameter. If any of the child processes indicate fitting should stop, all child processes observe the same boolean in the shared memory block, and exit when they observe this.

Yes, I already saw this. I think using a Manager that creates a boolean Value would probably be the idiomatic way to do this. Eventually, I would like to get rid of this entirely, but I don't have any suggestions yet. That's probably two more steps ahead...
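Roughly what I have in mind (untested sketch; the names are placeholders, not real interpret code):

import time
from multiprocessing import Manager

from joblib import Parallel, delayed


def boost_one_bag(bag_index, stop_flag):
    # stand-in for the per-bag boosting loop
    for step in range(1000):
        if stop_flag.value:           # every worker sees the same managed flag
            return bag_index, step
        time.sleep(0.001)
        if bag_index == 0 and step == 10:
            stop_flag.value = True    # one worker asks the others to stop
    return bag_index, 1000


if __name__ == "__main__":
    with Manager() as manager:
        stop_flag = manager.Value("b", False)
        results = Parallel(n_jobs=2)(
            delayed(boost_one_bag)(i, stop_flag) for i in range(4)
        )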

joblib already supports dask and ray as distributed backends, see https://joblib.readthedocs.io/en/latest/generated/joblib.parallel_config.html#joblib.parallel_config
(And https://joblib.readthedocs.io/en/latest/auto_examples/parallel/distributed_backend_simple.html)
This would allow distributing the fitting of EBMs over multiple machines. That could be desirable for large datasets, where the fitting might be memory bound: each bag can be fitted on a different machine, reducing the memory requirements per machine. This is already possible out of the box, except for the manually managed shared memory (currently only the boolean flag).
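Once the shared-memory flag is gone, switching backends should look roughly like this (sketch; I use a LocalCluster as a stand-in since I have no real cluster, and the data here is random):

import numpy as np
from dask.distributed import Client, LocalCluster
from joblib import parallel_config

from interpret.glassbox import ExplainableBoostingClassifier

X = np.random.rand(1000, 4)
y = np.random.randint(0, 2, size=1000)

cluster = LocalCluster(n_workers=2)    # stand-in for a multi-machine cluster
client = Client(cluster)

with parallel_config(backend="dask"):
    ebm = ExplainableBoostingClassifier()
    ebm.fit(X, y)                      # outer bags are dispatched to dask workers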

At one point I tried moving the bigger buffers into shared memory as well because that would have saved some memory, but it turns out docker severely limits the amount of shared memory allowed on a system.

So far, the results in #630 seem to indicate that for setups with large memory usage (large datasets), the majority of memory is required by CreateBooster, so this doesn't really seem important.

To my knowledge, on Linux (which is also what Docker uses), multiprocessing employs copy-on-write, so there are probably no memory savings to be had on Linux from using shared memory.

By default, joblib automatically uses shared memory for multiprocessing (loky) by memory-mapping large arrays for you, see https://joblib.readthedocs.io/en/latest/parallel.html#working-with-numerical-data-in-shared-memory-memmapping.
Objects larger than 1 MB are shared by default, so even on Windows there should be no (relevant) memory overhead.
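A minimal illustration of that behavior (nothing EBM-specific):

import numpy as np
from joblib import Parallel, delayed

X = np.random.rand(5000, 100)              # ~4 MB, above the 1 MB threshold


def column_mean(data, j):
    # inside a loky worker, `data` arrives as a read-only numpy memmap
    return data[:, j].mean()


means = Parallel(n_jobs=2, max_nbytes="1M")(   # "1M" is already the default
    delayed(column_mean)(X, j) for j in range(X.shape[1])
)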

It even has a fallback:

By default the data is dumped to the /dev/shm shared-memory partition if it exists and is writable (typically the case under Linux). Otherwise the operating system’s temporary folder is used.

So it might be usable in Docker containers. In any case, you can always use Docker's --shm-size flag to increase the available shared memory.


This summarizes (or at least outlines) why I want to get rid of manually managing shared memory: we can benefit from joblib's flexibility. With the threading backend, there is no need for shared memory at all. With loky for multiprocessing, joblib might already do a better job than we do. And if we manage nothing manually, we can probably deploy to distributed systems using dask or ray out of the box. (Though I probably can't test this, as I currently don't have any cluster access.)

Furthermore, the code should become less complex. And honestly, EBMs are already complex enough.

@paulbkoch (Collaborator) commented

@DerWeh -- I already tried the Manager and boolean Value route (see 5f200c8). It was A LOT slower. I think the Value uses an OS synchronization primitive, which required locking anytime someone wanted to check the value. With shared memory there isn't any locking required since all threads can just read the boolean value, and the only legal transition for the bool is to signal termination. It would also be possible to pass the boolean into C++ if that became useful, although we don't currently use that.
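In rough terms, the pattern is a single byte in a shared memory block that every worker can read without any synchronization (sketch, not the exact interpret code):

from multiprocessing import shared_memory

# one byte as a lock-free stop flag; the only legal transition is 0 -> 1
shm = shared_memory.SharedMemory(create=True, size=1)
shm.buf[0] = 0


def should_stop(shm_name):
    view = shared_memory.SharedMemory(name=shm_name)
    try:
        return view.buf[0] != 0    # plain read, no locking needed
    finally:
        view.close()


def request_stop(shm_name):
    view = shared_memory.SharedMemory(name=shm_name)
    try:
        view.buf[0] = 1            # any worker may set it, exactly once
    finally:
        view.close()


print(should_stop(shm.name))       # False
request_stop(shm.name)
print(should_stop(shm.name))       # True
shm.close()
shm.unlink()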

I did think a bit about the distributed scenario, and I decided the simple option would be to not support the feature that callbacks can terminate all processing nodes. Given that we only parallelize across outer_bags currently, it seems like this wouldn't be a huge limitation in a well-resourced distributed system, because I'd assume processing would start on all outer bags simultaneously.

I believe the current code won't work on dask/ray, but I haven't checked. We could make it work in the simple case by simply creating a numpy array instead of using shared memory if the shutdown signal won't be shared. I'm open to other ideas on this. In the ideal world we'd be able to pass a message on termination, but I'm not sure if that's supported in most frameworks since MPI is hard to do properly.

@paulbkoch (Collaborator) commented

On the large dataset thing, yes, I agree it's a minor benefit, so I wasn't too unhappy to back out the shared memory change and just use joblib for that. I estimated the benefit would be about 5% on most workloads (approx. 1/outer_bags). joblib does use shared memory, but it accepts non-aligned numpy memory blocks, which requires making a copy into an OS-aligned memory block. In theory we could have skipped that extra copy and put the allocation into an aligned block from the start.

@DerWeh (Collaborator, Author) commented Aug 24, 2025

I already tried the Manager and boolean Value route (see 5f200c8). It was A LOT slower. I think the Value uses an OS synchronization primitive, which required locking anytime someone wanted to check the value. With shared memory there isn't any locking required since all threads can just read the boolean value, and the only legal transition for the bool is to signal termination. It would also be possible to pass the boolean into C++ if that became useful, although we don't currently use that.

Thanks for the early feedback. As far as I can see, you used a locking Value; have you also tried whether lock=False fixes the performance issues? See multiprocessing.Value.
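What I have in mind (untested sketch; whether the raw value can be handed to loky workers cleanly is exactly what would need testing):

from multiprocessing import Value

stop = Value("b", 0, lock=False)    # raw shared ctypes byte, no lock wrapper


def stop_requested():
    return bool(stop.value)         # plain read, no locking involved


def request_stop():
    stop.value = 1                  # only legal transition is 0 -> 1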

What did you use to check the performance? The test_callbacks_XXX() tests? It would be interesting for me to verify that my (possible) future simplifications don't hurt performance.

I did think a bit about the distributed scenario, and I decided the simple option would be to not support the feature that callbacks can terminate all processing nodes. Given that we only parallelize across outer_bags currently, it seems like this wouldn't be a huge limitation in a well-resourced distributed system, because I'd assume processing would start on all outer bags simultaneously.

I agree. Please correct me if I am wrong, but the reason for this "synchronized" callback is probably to enforce the timeouts when there are fewer jobs than bags (probably hinted at in #575).

The hard part is probably finding out whether we are in a distributed scenario, as the form of parallelism is abstracted away by joblib. But I'll look into it once I get there.

I believe the current code won't work on dask/ray, but I haven't checked. We could make it work in the simple case by simply creating a numpy array instead of using shared memory if the shutdown signal won't be shared. I'm open to other ideas on this. In the ideal world we'd be able to pass a message on termination, but I'm not sure if that's supported in most frameworks since MPI is hard to do properly.

I remember using dask as a backend in some previous release, so at one point it worked in principle. (As I have no cluster, I ran it locally, just using dask's dashboard to inspect the distribution of jobs. This might also be interesting for checking the memory usage of EBMs.)


Thanks for all the useful discussions. I'll include the ideas in the follow-up PR after this one is merged.

@paulbkoch (Collaborator) commented

I didn't try a manager with lock=False. The docs don't say how it's implemented. If it's using shared memory under the hood, it could be fast.

The docs do say that the manager approach should work across machines, which would be nice. I think in that specific case there would need to be a cross-machine call though, which I'd expect to be slow, but perhaps the slowdown is fine in a larger distributed network. We'd have to see.

I'd say, if it's fast locally across processes using lock=False, then there's no reason not to use that instead of shared memory. Users on distributed networks can use the callback or not depending on how much of a slowdown they can tolerate. Would be good to have a better solution over the network, but I'm not sure there's a generally great solution for many of these distributed frameworks that lack message passing.

I noticed the slowdown in the synthetic dataset notebook (https://github.com/interpretml/interpret/blob/main/docs/interpret/python/examples/interpretable-regression-synthetic.ipynb). Observing the slowdown requires setting a callback. I tried it with a ridiculously long timeout since otherwise it'll just end at the timeout.

paulbkoch merged commit 0917b29 into interpretml:main on Aug 25, 2025
61 of 68 checks passed
@paulbkoch (Collaborator) commented

Looks great. Thank you @DerWeh!

DerWeh deleted the simplify-parallel-calls branch on Aug 25, 2025, 07:10
@DerWeh (Collaborator, Author) commented Aug 30, 2025

@paulbkoch I tried using lock=False, but multiprocessing.Value is indeed very slow.
So the big question is: what do we need the synchronization for, anyway?

The only example I found was in the following test case:

    def callback_generator(seconds):
        class Callback:
            def __init__(self, seconds):
                self._seconds = seconds

            def __call__(self, bag_index, step_index, progress, metric):
                import time

                if not hasattr(self, "_end_time"):
                    self._end_time = time.monotonic() + self._seconds
                    return False
                else:
                    return time.monotonic() > self._end_time

        return Callback(seconds)

    X, y, names, types = make_synthetic(seed=42, output_type="float", n_samples=10000)

    # run for half a second
    ebm = ExplainableBoostingClassifier(names, types, callback=callback_generator(0.5))
    ebm.fit(X, y)

This seems to be a global timeout, where all bags abort after the bag that started first runs into its timeout.
Another option to realize this would be to simply extend the callback by a start-time parameter. Before starting the parallel boosting, we take the current time and pass it to every boosting job. This would be an immutable parameter, avoiding the need for synchronization.

Instead, you would use

time_out = 0.5


def callback(bag_index, step_index, progress, metric, start_time):
    import time
    
    return time.monotonic() > start_time + time_out

The only issue I see is that in a distributed setup this could lead to strange behavior if the clocks of the machines are out of sync.

Another option is starting the clock right away (not only after the first boosting iteration) and moving from the callback class to a closure:

def timeout_factory(time_out):
    import time

    end_time = time.monotonic() + time_out

    def callback(bag_index, step_index, progress, metric):
        import time

        return time.monotonic() > end_time

    return callback


ebm = ExplainableBoostingClassifier(names, types)
ebm.fit(X, y, callback=timeout_factory(time_out=0.5))

In this case, the clock would start ticking just before we start the fitting routine. Again, there might be strange behavior if the clocks of the machines are out of sync.

@paulbkoch (Collaborator) commented

@DerWeh, I have to admit, I wasn't fond of the current callback mechanism. I implemented it mainly to get EBMs into AutoGluon where timeouts are either required or highly desirable (https://github.com/autogluon/autogluon/blob/4f492749be1b289333da5d29100985fd5b083f35/tabular/src/autogluon/tabular/models/ebm/ebm_model.py#L19-L30).

Is using dask/ray the only reason to avoid using shared memory? I've spent quite a bit of time thinking through the parallelism model, and I can't think of a good way to significantly improve parallelization using these frameworks. There is a way to significantly speed things up using message passing interfaces. XGBoost and LightGBM seem to have the same problems. I think the EBM solution will look remarkably similar to how those packages implemented distributed parallelism.

@DerWeh (Collaborator, Author) commented Sep 5, 2025

Is using dask/ray the only reason to avoid using shared memory?

Currently, it's just an itch I'm trying to scratch. The solution doesn't strike me as elegant, so I'm trying to come up with something less complex. But finding a practical solution for your problem is quite hard...

In the future, I'll probably have to consider distributed computation. So far, distributing each bag to a separate machine is the only idea I have to fit large datasets...

@paulbkoch (Collaborator) commented

The best method I've thought of to massively parallelize the work is to split up the dataset into a number of chunks, then assign each chunk to a processing node, and then compute the histogram statistics on each processing node, followed by a reduce operation to add the histograms from the separate nodes together. The reduce operation takes log N time, so in theory it should scale to practically any size dataset. But this will require a fast interconnect between the nodes since we have so many serial boosting steps. It is essentially the same method employed by XGBoost/LightGBM/catboost. I've done some of the work to enable this internally, but there's still quite a bit to do.
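In numpy terms, the shape of that computation is roughly the following (illustrative sketch, not the internal C++ implementation):

import numpy as np


def node_histogram(grad_chunk, bin_idx_chunk, n_bins):
    # each processing node sums gradients into per-bin buckets for its chunk
    return np.bincount(bin_idx_chunk, weights=grad_chunk, minlength=n_bins)


def reduce_histograms(partials):
    # on a real cluster this would be a tree/all-reduce, giving the log N depth
    return np.sum(partials, axis=0)


rng = np.random.default_rng(0)
grads = rng.normal(size=10_000)
bins = rng.integers(0, 32, size=10_000)

chunks = np.array_split(np.arange(grads.size), 4)     # 4 "processing nodes"
partials = [node_histogram(grads[c], bins[c], 32) for c in chunks]
total = reduce_histograms(partials)

# the reduced histogram matches what a single node would compute on all data
assert np.allclose(total, np.bincount(bins, weights=grads, minlength=32))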
