
Add async_local backend and allow using an existing dview for local and async_local backends #311

Open · ethanbb wants to merge 3 commits into master
Conversation

@ethanbb (Collaborator) commented Aug 12, 2024

This is a feature I added for myself, and I think it may be useful for others, so I'm offering it here.

Problem: mesmerize is currently inflexible with regard to how parallel processing is set up. Running an item always opens a new multiprocessing pool; you can control the number of processes through the MESMERIZE_N_PROCESSES environment variable, but that's it. I wanted the ability to a) pass an existing cluster into multiple runs, to save overhead, and/or b) use a different type of cluster (e.g. an ipyparallel cluster spanning multiple nodes, or even a load-balanced view).
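
For concreteness, here's a rough sketch of the kind of workflow this would enable. The dview keyword on run() and the exact call shape are my illustration of the idea, not verbatim from this PR:

# Sketch: set up one caiman cluster up front (ipyparallel here, but a plain
# multiprocessing pool also works) and reuse it across several batch items,
# instead of paying pool-startup overhead on every run. Passing dview into
# run() is the capability this PR adds; the kwarg name here is assumed.
from caiman.cluster import setup_cluster, stop_server

c, dview, n_processes = setup_cluster(backend="ipyparallel", n_processes=8)
try:
    for i in range(len(df)):  # df: a mesmerize-core batch DataFrame
        df.iloc[i].caiman.run(backend="local_async", dview=dview)
finally:
    stop_server(dview=dview)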

Solution: Passing a pool or dview into the run function clearly won't work with a subprocess, so the subprocess and slurm backends are out. The local backend calls the function directly, but it has the disadvantage that it blocks, so only one gridsearch run can be done at a time. However, we can get around that by spawning a thread (again, not a subprocess; the cluster objects can't be pickled).

I added a new backend called "local_async," which just launches a local run in a thread using the concurrent.futures module from the standard library. I also made it possible to pass a dview into both the local and local_async backends. I factored out some boilerplate from the 3 algorithm files into a _utils.py file that launches a new multiprocessing pool if no dview was passed in (and closes it when finished), and otherwise just forwards what was passed in.
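
The factored-out helper might look roughly like this (a minimal sketch; ensure_server is a hypothetical name, and the real _utils.py code may differ in its details):

from contextlib import contextmanager
import caiman as cm

@contextmanager
def ensure_server(dview=None):
    """Yield (dview, n_processes), creating a pool only if none was given."""
    if dview is not None:
        # caller-owned cluster: forward it and leave its lifetime alone
        # (len() works for ipyparallel views; _processes for multiprocessing pools)
        n_processes = len(dview) if hasattr(dview, "__len__") else dview._processes
        yield dview, n_processes
    else:
        _, own_dview, n_processes = cm.cluster.setup_cluster(
            backend="multiprocessing", n_processes=None, single_thread=False
        )
        try:
            yield own_dview, n_processes
        finally:
            # only tear down the pool we created ourselves
            cm.stop_server(dview=own_dview)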

Finally, I added a test that compares results from the 3 non-SLURM backends.
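
The test presumably does something along these lines (an illustrative sketch only; run_item and the tolerance are hypothetical, not the PR's actual test code):

import numpy as np

def test_backends_agree(run_item):
    # run_item: hypothetical helper that runs one batch item with the given
    # backend and returns its numerical output
    results = {b: run_item(backend=b) for b in ("subprocess", "local", "local_async")}
    ref = results["subprocess"]
    for backend in ("local", "local_async"):
        np.testing.assert_allclose(results[backend], ref, rtol=1e-6)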

The diff is not as big as GitHub makes it look; there are a lot of whitespace changes, since I added a context manager in the 3 algorithm files, but checking the box to hide whitespace changes shows something more reasonable.

Comment on lines -78 to -82
# in fname new load in memmap order C
cm.stop_server(dview=dview)
c, dview, n_processes = cm.cluster.setup_cluster(
    backend="local", n_processes=None, single_thread=False
)
@ethanbb (Collaborator, Author):

I'm not 100% sure what this part is for; it doesn't really work with the changes here, so I just deleted it, but we can try to do something else if it's important.

@kushalkolar (Collaborator):

Thanks! I haven't used the concurrent module in any of my own code, so this will take me a while to review.

@ethanbb (Collaborator, Author) commented Aug 12, 2024

Sure, no worries! I also hadn't used that module before today, so I'm not 100% sure there won't be issues with it, but so far it seems pretty straightforward.

If it helps, I used this toy script to convince myself that it's doing the right thing:

from concurrent.futures import ThreadPoolExecutor
from typing import Iterable
from multiprocessing.pool import Pool
import time
from caiman.cluster import setup_cluster, stop_server

def calc_square(x: int) -> int:
    return x ** 2


def calc_squares(xs: Iterable[int], pool: Pool):
    time.sleep(10)  # simulate a long-running step before using the shared pool
    return pool.map_async(calc_square, xs).get()

if __name__ == '__main__':
    # one shared caiman cluster, reused by both submitted tasks
    _, pool, n_procs = setup_cluster(backend="multiprocessing", n_processes=4)

    t0 = time.time()
    with ThreadPoolExecutor(max_workers=2) as executor:
        future0 = executor.submit(calc_squares, range(2), pool)
        future1 = executor.submit(calc_squares, range(2, 4), pool)
    # leaving the `with` block waits for both futures to complete

    print(future0.result())
    print(future1.result())

    t1 = time.time()
    # ~10 s rather than ~20 s: the two sleeps overlap in separate threads
    print(f'Executed in {t1-t0:.2f} seconds')

    stop_server(dview=pool)

@kushalkolar (Collaborator):

This makes me wonder whether we could use it to easily run blocking calculations in the background and visualize the results in real time as they come in with fastplotlib 🤔
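
Something like the following pattern could get there (a sketch only; update_plot stands in for a real fastplotlib graphic update, and the queue plumbing is my own illustration, not anything in this PR):

from concurrent.futures import ThreadPoolExecutor
import queue
import time

results = queue.Queue()  # worker -> main-thread channel for partial results

def slow_computation():
    for i in range(5):
        time.sleep(1)       # stand-in for a blocking chunk of work
        results.put(i * i)  # publish each partial result as soon as it's ready

def update_plot(value):
    # placeholder for e.g. updating a fastplotlib graphic in place
    print("new result:", value)

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_computation)
    # main thread stays free: consume partial results as they arrive
    while not future.done() or not results.empty():
        try:
            update_plot(results.get(timeout=0.1))
        except queue.Empty:
            pass
    future.result()  # re-raise any exception from the worker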
