Browse Source

feat: migrate protocol module to NetworkService (Part 6) (#18223)

* fix: start node strem asyncly

* fix: headers value may be a list

* fix: simply destruct on finish/error

* fix: class may destruct immediately after subscribing "data"

* fix: send meaningful error

* fix: must always provide a response body

* fix: handle the case when one write can not write all data

* fix: handle connection error
Cheng Zhao 6 years ago
parent
commit
326215e1f1

+ 25 - 2
atom/browser/net/atom_url_loader_factory.cc

@@ -103,10 +103,21 @@ network::ResourceResponseHead ToResponseHead(const mate::Dictionary& dict) {
   base::DictionaryValue headers;
   if (dict.Get("headers", &headers)) {
     for (const auto& iter : headers.DictItems()) {
-      head.headers->AddHeader(iter.first + ": " + iter.second.GetString());
+      if (iter.second.is_string()) {
+        // key: value
+        head.headers->AddHeader(iter.first + ": " + iter.second.GetString());
+      } else if (iter.second.is_list()) {
+        // key: [values...]
+        for (const auto& item : iter.second.GetList()) {
+          if (item.is_string())
+            head.headers->AddHeader(iter.first + ": " + item.GetString());
+        }
+      } else {
+        continue;
+      }
       // Some apps are passing content-type via headers, which is not accepted
       // in NetworkService.
-      if (iter.first == "content-type")
+      if (iter.first == "content-type" && iter.second.is_string())
         head.mime_type = iter.second.GetString();
     }
   }
@@ -339,7 +350,19 @@ void AtomURLLoaderFactory::StartLoadingStream(
   } else if (stream->IsNullOrUndefined()) {
     // "data" was explicitly passed as null or undefined, assume the user wants
     // to send an empty body.
+    //
+    // Note that We must submit a empty body otherwise NetworkService would
+    // crash.
     client->OnReceiveResponse(head);
+    mojo::ScopedDataPipeProducerHandle producer;
+    mojo::ScopedDataPipeConsumerHandle consumer;
+    if (mojo::CreateDataPipe(nullptr, &producer, &consumer) != MOJO_RESULT_OK) {
+      client->OnComplete(
+          network::URLLoaderCompletionStatus(net::ERR_INSUFFICIENT_RESOURCES));
+      return;
+    }
+    producer.reset();  // The data pipe is empty.
+    client->OnStartLoadingResponseBody(std::move(consumer));
     client->OnComplete(network::URLLoaderCompletionStatus(net::OK));
     return;
   } else if (!stream->IsObject()) {

+ 83 - 60
atom/browser/net/node_stream_loader.cc

@@ -18,32 +18,19 @@ NodeStreamLoader::NodeStreamLoader(network::ResourceResponseHead head,
                                    network::mojom::URLLoaderClientPtr client,
                                    v8::Isolate* isolate,
                                    v8::Local<v8::Object> emitter)
-    : binding_(this),
+    : binding_(this, std::move(loader)),
       client_(std::move(client)),
       isolate_(isolate),
       emitter_(isolate, emitter),
       weak_factory_(this) {
-  auto weak = weak_factory_.GetWeakPtr();
-  binding_.Bind(std::move(loader));
   binding_.set_connection_error_handler(
-      base::BindOnce(&NodeStreamLoader::OnConnectionError, weak));
-
-  mojo::ScopedDataPipeConsumerHandle consumer;
-  MojoResult rv = mojo::CreateDataPipe(nullptr, &producer_, &consumer);
-  if (rv != MOJO_RESULT_OK) {
-    OnError(nullptr);
-    return;
-  }
-
-  client_->OnReceiveResponse(head);
-  client_->OnStartLoadingResponseBody(std::move(consumer));
+      base::BindOnce(&NodeStreamLoader::NotifyComplete,
+                     weak_factory_.GetWeakPtr(), net::ERR_FAILED));
 
-  On("end", base::BindRepeating(&NodeStreamLoader::OnEnd, weak));
-  On("error", base::BindRepeating(&NodeStreamLoader::OnError, weak));
-  // Since every node::MakeCallback call has a micro scope itself, we have to
-  // subscribe |data| at last otherwise |end|'s listener won't be called when
-  // it is emitted in the same tick.
-  On("data", base::BindRepeating(&NodeStreamLoader::OnData, weak));
+  // PostTask since it might destruct.
+  base::SequencedTaskRunnerHandle::Get()->PostTask(
+      FROM_HERE, base::BindOnce(&NodeStreamLoader::Start,
+                                weak_factory_.GetWeakPtr(), std::move(head)));
 }
 
 NodeStreamLoader::~NodeStreamLoader() {
@@ -58,62 +45,98 @@ NodeStreamLoader::~NodeStreamLoader() {
     node::MakeCallback(isolate_, emitter_.Get(isolate_), "removeListener",
                        node::arraysize(args), args, {0, 0});
   }
+
+  // Release references.
+  emitter_.Reset();
+  buffer_.Reset();
 }
 
-void NodeStreamLoader::On(const char* event, EventCallback callback) {
-  v8::Locker locker(isolate_);
-  v8::Isolate::Scope isolate_scope(isolate_);
-  v8::HandleScope handle_scope(isolate_);
+void NodeStreamLoader::Start(network::ResourceResponseHead head) {
+  mojo::ScopedDataPipeProducerHandle producer;
+  mojo::ScopedDataPipeConsumerHandle consumer;
+  MojoResult rv = mojo::CreateDataPipe(nullptr, &producer, &consumer);
+  if (rv != MOJO_RESULT_OK) {
+    NotifyComplete(net::ERR_INSUFFICIENT_RESOURCES);
+    return;
+  }
 
-  // emitter.on(event, callback)
-  v8::Local<v8::Value> args[] = {
-      mate::StringToV8(isolate_, event),
-      mate::CallbackToV8(isolate_, std::move(callback)),
-  };
-  node::MakeCallback(isolate_, emitter_.Get(isolate_), "on",
-                     node::arraysize(args), args, {0, 0});
+  producer_ =
+      std::make_unique<mojo::StringDataPipeProducer>(std::move(producer));
 
-  handlers_[event].Reset(isolate_, args[1]);
+  client_->OnReceiveResponse(head);
+  client_->OnStartLoadingResponseBody(std::move(consumer));
+
+  auto weak = weak_factory_.GetWeakPtr();
+  On("end",
+     base::BindRepeating(&NodeStreamLoader::NotifyComplete, weak, net::OK));
+  On("error", base::BindRepeating(&NodeStreamLoader::NotifyComplete, weak,
+                                  net::ERR_FAILED));
+  On("readable", base::BindRepeating(&NodeStreamLoader::ReadMore, weak));
 }
 
-void NodeStreamLoader::OnData(mate::Arguments* args) {
-  v8::Local<v8::Value> buffer;
-  args->GetNext(&buffer);
-  if (!node::Buffer::HasInstance(buffer)) {
-    args->ThrowError("data must be Buffer");
+void NodeStreamLoader::NotifyComplete(int result) {
+  // Wait until write finishes or fails.
+  if (is_writing_) {
+    ended_ = true;
+    result_ = result;
     return;
   }
 
-  size_t ssize = node::Buffer::Length(buffer);
-  uint32_t size = base::saturated_cast<uint32_t>(ssize);
-  MojoResult result = producer_->WriteData(node::Buffer::Data(buffer), &size,
-                                           MOJO_WRITE_DATA_FLAG_NONE);
-  if (result != MOJO_RESULT_OK || size < ssize) {
-    OnError(nullptr);
-    return;
-  }
+  client_->OnComplete(network::URLLoaderCompletionStatus(result));
+  delete this;
 }
 
-void NodeStreamLoader::OnEnd(mate::Arguments* args) {
-  client_->OnComplete(network::URLLoaderCompletionStatus(net::OK));
-  client_.reset();
-  MaybeDeleteSelf();
-}
+void NodeStreamLoader::ReadMore() {
+  // buffer = emitter.read()
+  v8::MaybeLocal<v8::Value> ret = node::MakeCallback(
+      isolate_, emitter_.Get(isolate_), "read", 0, nullptr, {0, 0});
 
-void NodeStreamLoader::OnError(mate::Arguments* args) {
-  client_->OnComplete(network::URLLoaderCompletionStatus(net::ERR_FAILED));
-  client_.reset();
-  MaybeDeleteSelf();
+  // If there is no buffer read, wait until |readable| is emitted again.
+  v8::Local<v8::Value> buffer;
+  if (!ret.ToLocal(&buffer) || !node::Buffer::HasInstance(buffer))
+    return;
+
+  // Hold the buffer until the write is done.
+  buffer_.Reset(isolate_, buffer);
+
+  // Write buffer to mojo pipe asyncronously.
+  is_writing_ = true;
+  producer_->Write(
+      base::StringPiece(node::Buffer::Data(buffer),
+                        node::Buffer::Length(buffer)),
+      mojo::StringDataPipeProducer::AsyncWritingMode::
+          STRING_STAYS_VALID_UNTIL_COMPLETION,
+      base::BindOnce(&NodeStreamLoader::DidWrite, weak_factory_.GetWeakPtr()));
 }
 
-void NodeStreamLoader::OnConnectionError() {
-  binding_.Close();
-  MaybeDeleteSelf();
+void NodeStreamLoader::DidWrite(MojoResult result) {
+  is_writing_ = false;
+  // We were told to end streaming.
+  if (ended_) {
+    NotifyComplete(result_);
+    return;
+  }
+
+  if (result == MOJO_RESULT_OK)
+    ReadMore();
+  else
+    NotifyComplete(net::ERR_FAILED);
 }
 
-void NodeStreamLoader::MaybeDeleteSelf() {
-  if (!binding_.is_bound() && !client_.is_bound())
-    delete this;
+void NodeStreamLoader::On(const char* event, EventCallback callback) {
+  v8::Locker locker(isolate_);
+  v8::Isolate::Scope isolate_scope(isolate_);
+  v8::HandleScope handle_scope(isolate_);
+
+  // emitter.on(event, callback)
+  v8::Local<v8::Value> args[] = {
+      mate::StringToV8(isolate_, event),
+      mate::CallbackToV8(isolate_, std::move(callback)),
+  };
+  handlers_[event].Reset(isolate_, args[1]);
+  node::MakeCallback(isolate_, emitter_.Get(isolate_), "on",
+                     node::arraysize(args), args, {0, 0});
+  // No more code bellow, as this class may destruct when subscribing.
 }
 
 }  // namespace atom

+ 30 - 20
atom/browser/net/node_stream_loader.h

@@ -6,19 +6,25 @@
 #define ATOM_BROWSER_NET_NODE_STREAM_LOADER_H_
 
 #include <map>
+#include <memory>
 #include <string>
 #include <vector>
 
 #include "mojo/public/cpp/bindings/strong_binding.h"
+#include "mojo/public/cpp/system/string_data_pipe_producer.h"
 #include "services/network/public/mojom/url_loader.mojom.h"
 #include "v8/include/v8.h"
 
-namespace mate {
-class Arguments;
-}
-
 namespace atom {
 
+// Read data from node Stream and feed it to NetworkService.
+//
+// This class manages its own lifetime and should delete itself when the
+// connection is lost or finished.
+//
+// We use |paused mode| to read data from |Readable| stream, so we don't need to
+// copy data from buffer and hold it in memory, and we only need to make sure
+// the passed |Buffer| is alive while writing data to pipe.
 class NodeStreamLoader : public network::mojom::URLLoader {
  public:
   NodeStreamLoader(network::ResourceResponseHead head,
@@ -30,7 +36,15 @@ class NodeStreamLoader : public network::mojom::URLLoader {
  private:
   ~NodeStreamLoader() override;
 
-  using EventCallback = base::RepeatingCallback<void(mate::Arguments* args)>;
+  using EventCallback = base::RepeatingCallback<void()>;
+
+  void Start(network::ResourceResponseHead head);
+  void NotifyComplete(int result);
+  void ReadMore();
+  void DidWrite(MojoResult result);
+
+  // Subscribe to events of |emitter|.
+  void On(const char* event, EventCallback callback);
 
   // URLLoader:
   void FollowRedirect(const std::vector<std::string>& removed_headers,
@@ -42,27 +56,23 @@ class NodeStreamLoader : public network::mojom::URLLoader {
   void PauseReadingBodyFromNet() override {}
   void ResumeReadingBodyFromNet() override {}
 
-  // JS bindings.
-  void On(const char* event, EventCallback callback);
-  void OnData(mate::Arguments* args);
-  void OnEnd(mate::Arguments* args);
-  void OnError(mate::Arguments* args);
-
-  // This class manages its own lifetime and should delete itself when the
-  // connection is lost or finished.
-  //
-  // The code is updated with `content::FileURLLoader`.
-  void OnConnectionError();
-  void MaybeDeleteSelf();
-
   mojo::Binding<network::mojom::URLLoader> binding_;
   network::mojom::URLLoaderClientPtr client_;
 
   v8::Isolate* isolate_;
   v8::Global<v8::Object> emitter_;
+  v8::Global<v8::Value> buffer_;
+
+  // Mojo data pipe where the data that is being read is written to.
+  std::unique_ptr<mojo::StringDataPipeProducer> producer_;
+
+  // Whether we are in the middle of write.
+  bool is_writing_ = false;
 
-  // Pipes for communicating between Node and NetworkService.
-  mojo::ScopedDataPipeProducerHandle producer_;
+  // When NotifyComplete is called while writing, we will save the result and
+  // quit with it after the write is done.
+  bool ended_ = false;
+  int result_ = net::OK;
 
   // Store the V8 callbacks to unsubscribe them later.
   std::map<std::string, v8::Global<v8::Value>> handlers_;