MNT: Simplify parallel calls #632
Conversation
Signed-off-by: DerWeh <[email protected]>
Hi @DerWeh -- With regard to the shared memory, we use it as a signaling method between the child processes for the new callback parameter. If any of the child processes indicates fitting should stop, all child processes observe the same boolean in the shared memory block and exit when they see it. At one point I tried moving the bigger buffers into shared memory as well because that would have saved some memory, but it turns out Docker severely limits the amount of shared memory allowed on a system. The single boolean that we use for signaling isn't an issue though.
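A minimal sketch of that signaling pattern, using Python's `multiprocessing.shared_memory` (names and structure here are illustrative assumptions, not the code in interpret itself):

```python
from multiprocessing import shared_memory

# Parent process: allocate a one-byte block; 0 = keep fitting, 1 = stop requested.
shm = shared_memory.SharedMemory(create=True, size=1)
shm.buf[0] = 0

# A child process would attach by name and poll the byte between boosting steps.
view = shared_memory.SharedMemory(name=shm.name)
stop_requested = bool(view.buf[0])

# A child whose callback asks to stop flips the byte; every process sees it.
view.buf[0] = 1

view.close()
shm.close()
shm.unlink()  # the parent releases the block once fitting is done
```

Because the only legal transition is 0 → 1, plain unlocked reads are sufficient here.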
Codecov Report: All modified and coverable lines are covered by tests.

```
@@            Coverage Diff             @@
##             main     #632      +/-   ##
==========================================
- Coverage   73.17%   73.14%   -0.03%
==========================================
  Files          74       74
  Lines       10552    10531      -21
==========================================
- Hits         7721     7703      -18
+ Misses       2831     2828       -3
```
Thanks for the hint @paulbkoch.
Yes, I already saw this. I think using a Manager to create a boolean Value would probably be the idiomatic way to do this. Eventually, I would like to get rid of this entirely, but I don't have any suggestions yet. That's probably two more steps ahead...
So far, the results in #630 seem to indicate that for setups with large memory usage (large datasets), the majority of memory is required by […]. To my knowledge, on Linux (which is also used in Docker), multiprocessing employs copy-on-write, so there are probably no memory savings on Linux from using shared memory. By default, joblib automatically uses shared memory for multiprocessing (loky) via memory maps, see https://joblib.readthedocs.io/en/latest/parallel.html#working-with-numerical-data-in-shared-memory-memmapping. It even has a fallback: […] So it might be usable in Docker containers. Anyway, you can always use the […].
This summarizes (or at least outlines) why I want to get rid of manually managing shared memory: we can profit from […]. Furthermore, the code should become less complex. And honestly, EBMs are already complex enough.
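For reference, a minimal sketch of the Manager-plus-Value route being suggested here (hypothetical worker names, not code from this PR):

```python
import multiprocessing as mp

def fit_one_bag(bag_index, stop):
    # hypothetical per-bag worker: every read of stop.value is a round-trip
    # to the manager process
    if stop.value:
        return None  # another bag already requested termination
    # ... boosting work; a callback could set stop.value = True here ...
    return bag_index

if __name__ == "__main__":
    with mp.Manager() as manager:
        stop = manager.Value("b", False)  # proxy to a value held by the manager
        with mp.Pool(processes=2) as pool:
            results = pool.starmap(fit_one_bag, [(i, stop) for i in range(2)])
```

Every access to the proxy goes through the manager process, which is consistent with the slowdown reported in the next comment.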
@DerWeh -- I already tried the Manager and boolean Value route (see 5f200c8). It was A LOT slower. I think the Value uses an OS synchronization primitive, which requires locking any time someone wants to check the value. With shared memory there isn't any locking required, since all threads can just read the boolean value, and the only legal transition for the bool is to signal termination. It would also be possible to pass the boolean into C++ if that became useful, although we don't currently use that. I did think a bit about the distributed scenario, and I decided the simple option would be to not support the feature that callbacks can terminate all processing nodes. Given that we only parallelize across outer_bags currently, it seems like this wouldn't be a huge limitation in a well-resourced distributed system, because I'd assume processing would start on all outer bags simultaneously. I believe the current code won't work on dask/ray, but I haven't checked. We could make it work in the simple case by creating a numpy array instead of using shared memory if the shutdown signal won't be shared. I'm open to other ideas on this. In an ideal world we'd be able to pass a message on termination, but I'm not sure that's supported in most frameworks, since MPI is hard to do properly.
On the large dataset thing, yes, I agree it's a minor benefit, so I wasn't too unhappy to back out the shared memory change and just use joblib for that. I estimated the benefit would be about 5% on most workloads (approx 1/outer_bags). joblib does use shared memory, but it accepts non-aligned numpy memory blocks, which requires making a copy into an OS-aligned memory block. In theory we could have skipped that extra copy and put the allocation into an aligned block from the start.
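For readers unfamiliar with the joblib behavior referenced above, memmapping is controlled on the Parallel constructor; a small, self-contained illustration (the threshold and workload here are arbitrary choices, not what interpret uses):

```python
import numpy as np
from joblib import Parallel, delayed

X = np.random.rand(20000, 100)  # large array shared read-only across workers

# Array arguments larger than max_nbytes are dumped to a temporary memmap and
# handed to the workers as read-only memory maps instead of full copies.
results = Parallel(n_jobs=4, max_nbytes="1M", mmap_mode="r")(
    delayed(np.sum)(X[i::4]) for i in range(4)
)
```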
Thanks for the early feedback. As far as I can see, you used a locking Value; have you also tried whether lock=False makes a difference? What have you used to check the performance? The tests […]
I agree. Please correct me if I am wrong, but probably the reason for this "synchronized" callback is to create the timeouts when there are fewer jobs than bags (probably hinted at in 575). The hard part is probably finding out whether we are in a distributed scenario, as the form of parallelism is abstracted away by joblib.
I remember using […]. Thanks for all the useful discussions. I'll include the ideas in the follow-up PR after this one is merged.
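For concreteness, the lock-free variant asked about above is `multiprocessing.Value(..., lock=False)`, which returns a raw ctypes object backed by shared memory (a Manager's Value proxy has no such option). A sketch under those assumptions, not code from the PR:

```python
import multiprocessing as mp
import time

def worker(stop_flag):
    # poll the unsynchronized shared byte between (hypothetical) boosting steps
    while not stop_flag.value:
        time.sleep(0.01)

if __name__ == "__main__":
    # lock=False gives a raw shared ctypes byte: reads and writes skip the lock
    stop_flag = mp.Value("b", 0, lock=False)
    p = mp.Process(target=worker, args=(stop_flag,))
    p.start()
    time.sleep(0.1)
    stop_flag.value = 1  # the only legal transition: request termination
    p.join()
```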
I didn't try a manager with lock=False. The docs don't say how it's implemented. If it's using shared memory under the hood, it could be fast. The docs do say that the manager approach should work across machines, which would be nice. I think in that specific case there would need to be a cross-machine call though, which I'd expect to be slow, but perhaps the slowdown is fine in a larger distributed network. We'd have to see. I'd say, if it's fast locally across processes using lock=False, then there's no reason not to use that instead of shared memory. Users on distributed networks can use the callback or not, depending on how much of a slowdown they can tolerate. It would be good to have a better solution over the network, but I'm not sure there's a generally great solution for many of these distributed frameworks that lack message passing. I noticed the slowdown in the synthetic dataset notebook (https://github.com/interpretml/interpret/blob/main/docs/interpret/python/examples/interpretable-regression-synthetic.ipynb). Observing the slowdown requires setting a callback. I tried it with a ridiculously long timeout, since otherwise it'll just end at the timeout.
Looks great. Thank you @DerWeh!
@paulbkoch I tried using the callback. The only example I found was in the following test case:

```python
def callback_generator(seconds):
    class Callback:
        def __init__(self, seconds):
            self._seconds = seconds

        def __call__(self, bag_index, step_index, progress, metric):
            import time

            if not hasattr(self, "_end_time"):
                self._end_time = time.monotonic() + self._seconds
                return False
            else:
                return time.monotonic() > self._end_time

    return Callback(seconds)


X, y, names, types = make_synthetic(seed=42, output_type="float", n_samples=10000)

# run for half a second
ebm = ExplainableBoostingClassifier(names, types, callback=callback_generator(0.5))
ebm.fit(X, y)
```

This seems like a global timeout, where all bags abort after the bag that started first runs into a timeout. Instead, you would use:

```python
time_out = 0.5

def callback(bag_index, step_index, progress, metric, start_time):
    import time
    return time.monotonic() > start_time + time_out
```

The only issue I see is that in a distributed setup this could lead to strange behavior if the clocks of the machines are out of sync. Another option is starting the clock right away (not only after the first boosting iteration) and moving the callback from a class to a function:

```python
def timeout_factory(time_out):
    import time

    end_time = time.monotonic() + time_out

    def callback(bag_index, step_index, progress, metric, start_time):
        return time.monotonic() > end_time

    return callback


ebm = ExplainableBoostingClassifier(names, types)
ebm.fit(X, y, callback=timeout_factory(time_out=0.5))
```

In this case, the clock would start ticking just before we start the fitting routine. Again, there might be strange behavior if the clocks of the machines are out of sync.
@DerWeh, I have to admit, I wasn't fond of the current callback mechanism. I implemented it mainly to get EBMs into AutoGluon, where timeouts are either required or highly desirable (https://github.com/autogluon/autogluon/blob/4f492749be1b289333da5d29100985fd5b083f35/tabular/src/autogluon/tabular/models/ebm/ebm_model.py#L19-L30). Is using dask/ray the only reason to avoid using shared memory? I've spent quite a bit of time thinking through the parallelism model, and I can't think of a good way to significantly improve parallelization using these frameworks. There is a way to significantly speed things up using message passing interfaces. XGBoost and LightGBM seem to have the same problems. I think the EBM solution will look remarkably similar to how those packages implemented distributed parallelism.
Currently, it's just an itch I'm trying to scratch. The solution doesn't strike me as elegant, so I'm trying to come up with something less complex. But finding a practical solution for your problem is quite hard... In the future, I'll probably have to consider distributed computation. So far, distributing each bag to a separate machine is the only idea I have for fitting large datasets...
The best method I've thought of to massively parallelize the work is to split the dataset into a number of chunks, assign each chunk to a processing node, and compute the histogram statistics on each processing node, followed by a reduce operation that adds the histograms from the separate nodes together. The reduce operation takes log N time, so in theory it should scale to practically any size dataset. But this will require a fast interconnect between the nodes, since we have so many serial boosting steps. It is essentially the same method employed by XGBoost/LightGBM/CatBoost. I've done some of the work to enable this internally, but there's still quite a bit to do.
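A toy sketch of that per-node histogram plus pairwise reduce idea (pure NumPy; shapes, names, and the binning are made up for illustration and are not interpret's internal code):

```python
import numpy as np

def local_histograms(X_chunk, gradients, n_bins):
    # each node sums gradients per feature bin for its own chunk of the data
    n_features = X_chunk.shape[1]
    hist = np.zeros((n_features, n_bins))
    for j in range(n_features):
        np.add.at(hist[j], X_chunk[:, j], gradients)
    return hist

def tree_reduce(hists):
    # pairwise addition: log2(n_nodes) rounds instead of a serial accumulation
    hists = list(hists)
    while len(hists) > 1:
        paired = [a + b for a, b in zip(hists[0::2], hists[1::2])]
        if len(hists) % 2:
            paired.append(hists[-1])
        hists = paired
    return hists[0]

# 4 "nodes", each with 1000 samples and 3 features already binned into 8 bins
rng = np.random.default_rng(0)
chunks = [rng.integers(0, 8, size=(1000, 3)) for _ in range(4)]
grads = [rng.normal(size=1000) for _ in range(4)]
total = tree_reduce(local_histograms(c, g, 8) for c, g in zip(chunks, grads))
```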
Instead of generating lists of arguments, dispatching the parallel functions, and then deleting the argument lists, the functions are now called directly, passing arguments on demand. This should be clearer.
Furthermore, we create a partial to avoid repeating common function arguments. This reduces code duplication and makes the differences between the boost calls clearer.
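The pattern described above looks roughly like this; the function and argument names are made up for illustration and are not the actual diff:

```python
from functools import partial
from joblib import Parallel, delayed

def boost_one_bag(bag_index, rng_seed, dataset, learning_rate, max_leaves):
    # hypothetical boosting routine for a single outer bag
    return (bag_index, rng_seed, len(dataset), learning_rate, max_leaves)

dataset = list(range(1000))   # stand-in for the shared training data
n_outer_bags = 8

# Bind the arguments that are identical for every boosting call once...
boost = partial(boost_one_bag, dataset=dataset, learning_rate=0.01, max_leaves=3)

# ...then dispatch directly, passing only what differs per bag, instead of
# building (and later deleting) explicit lists of argument tuples.
results = Parallel(n_jobs=2)(
    delayed(boost)(bag_index=i, rng_seed=i) for i in range(n_outer_bags)
)
```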
This is a minor refactor to prepare a PR that hopefully eliminates manually managing shared memory. I previously failed with big PRs (#578 isn't abandoned yet, I just haven't managed to work my way through _harmonize_tensor yet), so I'm trying a more iterative approach now.