node_stream_loader.cc 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. // Copyright (c) 2019 GitHub, Inc.
  2. // Use of this source code is governed by the MIT license that can be
  3. // found in the LICENSE file.
  4. #include "shell/browser/net/node_stream_loader.h"
  5. #include <string_view>
  6. #include <utility>
  7. #include "mojo/public/cpp/system/string_data_source.h"
  8. #include "shell/common/gin_converters/callback_converter.h"
  9. #include "shell/common/node_includes.h"
  10. namespace electron {
  11. NodeStreamLoader::NodeStreamLoader(
  12. network::mojom::URLResponseHeadPtr head,
  13. mojo::PendingReceiver<network::mojom::URLLoader> loader,
  14. mojo::PendingRemote<network::mojom::URLLoaderClient> client,
  15. v8::Isolate* isolate,
  16. v8::Local<v8::Object> emitter)
  17. : url_loader_(this, std::move(loader)),
  18. client_(std::move(client)),
  19. isolate_(isolate),
  20. emitter_(isolate, emitter) {
  21. url_loader_.set_disconnect_handler(
  22. base::BindOnce(&NodeStreamLoader::NotifyComplete,
  23. weak_factory_.GetWeakPtr(), net::ERR_FAILED));
  24. Start(std::move(head));
  25. }
  26. NodeStreamLoader::~NodeStreamLoader() {
  27. v8::Isolate::Scope isolate_scope(isolate_);
  28. v8::HandleScope handle_scope(isolate_);
  29. // Unsubscribe all handlers.
  30. for (const auto& it : handlers_) {
  31. v8::Local<v8::Value> args[] = {gin::StringToV8(isolate_, it.first),
  32. it.second.Get(isolate_)};
  33. node::MakeCallback(isolate_, emitter_.Get(isolate_), "removeListener",
  34. node::arraysize(args), args, {0, 0});
  35. }
  36. // Destroy the stream if not already ended
  37. if (!ended_) {
  38. node::MakeCallback(isolate_, emitter_.Get(isolate_), "destroy", 0, nullptr,
  39. {0, 0});
  40. }
  41. }
  42. void NodeStreamLoader::Start(network::mojom::URLResponseHeadPtr head) {
  43. mojo::ScopedDataPipeProducerHandle producer;
  44. mojo::ScopedDataPipeConsumerHandle consumer;
  45. MojoResult rv = mojo::CreateDataPipe(nullptr, producer, consumer);
  46. if (rv != MOJO_RESULT_OK) {
  47. NotifyComplete(net::ERR_INSUFFICIENT_RESOURCES);
  48. return;
  49. }
  50. producer_ = std::make_unique<mojo::DataPipeProducer>(std::move(producer));
  51. client_->OnReceiveResponse(std::move(head), std::move(consumer),
  52. std::nullopt);
  53. auto weak = weak_factory_.GetWeakPtr();
  54. On("end",
  55. base::BindRepeating(&NodeStreamLoader::NotifyComplete, weak, net::OK));
  56. On("error", base::BindRepeating(&NodeStreamLoader::NotifyComplete, weak,
  57. net::ERR_FAILED));
  58. On("readable", base::BindRepeating(&NodeStreamLoader::NotifyReadable, weak));
  59. }
  60. void NodeStreamLoader::NotifyReadable() {
  61. if (!readable_)
  62. ReadMore();
  63. else if (is_reading_)
  64. has_read_waiting_ = true;
  65. readable_ = true;
  66. }
  67. void NodeStreamLoader::NotifyComplete(int result) {
  68. // Wait until write finishes or fails.
  69. if (is_reading_ || is_writing_) {
  70. ended_ = true;
  71. result_ = result;
  72. return;
  73. }
  74. network::URLLoaderCompletionStatus status(result);
  75. status.completion_time = base::TimeTicks::Now();
  76. status.decoded_body_length = bytes_written_;
  77. client_->OnComplete(status);
  78. delete this;
  79. }
  80. void NodeStreamLoader::ReadMore() {
  81. if (is_reading_) {
  82. // Calling read() can trigger the "readable" event again, making this
  83. // function re-entrant. If we're already reading, we don't want to start
  84. // a nested read, so short-circuit.
  85. return;
  86. }
  87. is_reading_ = true;
  88. auto weak = weak_factory_.GetWeakPtr();
  89. v8::HandleScope scope(isolate_);
  90. // buffer = emitter.read()
  91. v8::MaybeLocal<v8::Value> ret = node::MakeCallback(
  92. isolate_, emitter_.Get(isolate_), "read", 0, nullptr, {0, 0});
  93. DCHECK(weak) << "We shouldn't have been destroyed when calling read()";
  94. // If there is no buffer read, wait until |readable| is emitted again.
  95. v8::Local<v8::Value> buffer;
  96. if (!ret.ToLocal(&buffer) || !node::Buffer::HasInstance(buffer)) {
  97. is_reading_ = false;
  98. // If 'readable' was called after 'read()', try again
  99. if (has_read_waiting_) {
  100. has_read_waiting_ = false;
  101. ReadMore();
  102. return;
  103. }
  104. readable_ = false;
  105. if (ended_) {
  106. NotifyComplete(result_);
  107. }
  108. return;
  109. }
  110. // Hold the buffer until the write is done.
  111. buffer_.Reset(isolate_, buffer);
  112. bytes_written_ += node::Buffer::Length(buffer);
  113. // Write buffer to mojo pipe asynchronously.
  114. is_reading_ = false;
  115. is_writing_ = true;
  116. producer_->Write(std::make_unique<mojo::StringDataSource>(
  117. std::string_view{node::Buffer::Data(buffer),
  118. node::Buffer::Length(buffer)},
  119. mojo::StringDataSource::AsyncWritingMode::
  120. STRING_STAYS_VALID_UNTIL_COMPLETION),
  121. base::BindOnce(&NodeStreamLoader::DidWrite, weak));
  122. }
  123. void NodeStreamLoader::DidWrite(MojoResult result) {
  124. is_writing_ = false;
  125. // We were told to end streaming.
  126. if (ended_) {
  127. NotifyComplete(result_);
  128. return;
  129. }
  130. if (result == MOJO_RESULT_OK && readable_)
  131. ReadMore();
  132. else
  133. NotifyComplete(net::ERR_FAILED);
  134. }
  135. void NodeStreamLoader::On(const char* event, EventCallback callback) {
  136. v8::Isolate::Scope isolate_scope(isolate_);
  137. v8::HandleScope handle_scope(isolate_);
  138. // emitter.on(event, callback)
  139. v8::Local<v8::Value> args[] = {
  140. gin::StringToV8(isolate_, event),
  141. gin_helper::CallbackToV8Leaked(isolate_, std::move(callback)),
  142. };
  143. handlers_[event].Reset(isolate_, args[1]);
  144. node::MakeCallback(isolate_, emitter_.Get(isolate_), "on",
  145. node::arraysize(args), args, {0, 0});
  146. // No more code below, as this class may destruct when subscribing.
  147. }
  148. } // namespace electron