Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accelerated DAG: Support dynamic resizing of shared memory channels #45323

Open
wants to merge 22 commits into
base: master
Choose a base branch
from

Conversation

jackhumphries
Copy link
Contributor

@jackhumphries jackhumphries commented May 14, 2024

Why are these changes needed?

In an accelerated DAG, all channels have their backing store specific upfront. However, this size may not be large enough for all objects that are communicated via channels, particularly since channels are reused across multiple DAG invocations.

This PR adds support for increasing the size of a shared memory channel backing store. Once the backing store size is increased, the size is maintained (i.e., it is not decreased) until it needs to be increased again.

Related issue number

Addresses one issue noted in #45597

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@jackhumphries jackhumphries force-pushed the dynamic-size branch 2 times, most recently from c4702ce to ced81ea Compare May 14, 2024 06:29
@jackhumphries jackhumphries changed the title Dynamic size WIP: Dynamic size May 14, 2024
@jackhumphries jackhumphries changed the title WIP: Dynamic size Accelerated DAG: Support dynamic resizing of shared memory channels May 28, 2024
@jackhumphries jackhumphries self-assigned this May 28, 2024
@jackhumphries
Copy link
Contributor Author

This is ready for another pass.

@stephanie-wang stephanie-wang added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label May 29, 2024
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Copy link
Member

@kevin85421 kevin85421 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Python changes overall look good to me. I am still trying to understand the C++ changes.

python/ray/experimental/channel/shared_memory_channel.py Outdated Show resolved Hide resolved
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
@jackhumphries jackhumphries removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label May 29, 2024
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Copy link
Contributor

@stephanie-wang stephanie-wang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, very clean :)

python/ray/experimental/channel/shared_memory_channel.py Outdated Show resolved Hide resolved
python/ray/tests/test_channel.py Show resolved Hide resolved
Copy link
Member

@kevin85421 kevin85421 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am still trying to understand the C++ parts.


io_services_.push_back(std::make_unique<instrumented_io_context>());
instrumented_io_context &io_service = *io_services_.back();
io_works_.push_back(std::make_unique<boost::asio::io_service::work>(io_service));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use io_context instead? io_service seems to be deprecated boostorg/asio#110.

std::vector<std::unique_ptr<boost::asio::io_service::work>> io_works_;
// Contexts in which the application looks for local changes to mutable objects and
// sends the changes to remote nodes via the network.
std::vector<std::unique_ptr<instrumented_io_context>> io_services_;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to io_contexts_.

});
}

void MutableObjectProvider::RunIOService() {
void MutableObjectProvider::RunIOService(instrumented_io_context &io_service) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
void MutableObjectProvider::RunIOService(instrumented_io_context &io_service) {
void MutableObjectProvider::RunIOContext(instrumented_io_context &io_context) {


io_services_.push_back(std::make_unique<instrumented_io_context>());
instrumented_io_context &io_service = *io_services_.back();
io_works_.push_back(std::make_unique<boost::asio::io_service::work>(io_service));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need boost::asio::io_service::work here? Where do we call work.reset()?

@@ -177,14 +183,19 @@ void MutableObjectProvider::PollWriterClosure(
object->GetData()->Size(),
object->GetMetadata()->Size(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

raylet_client_factory_: Creates a function for each object. This function waits for changes on the object and then sends those changes to a remote node via RPC.

The above comment is from experimental_mutable_object_provider.h. Can you explain how does reader->PushMutableObject "sends those changes to a remote node via RPC"? Thanks!

Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants