Browse Source

feat: migrate protocol module to NetworkService (Part 4) (#18084)

* Parse stream protocol handler

* Pipe node stream to mojo

* Merge the parser for headers

* Add ToDict helper to simplify code

* Simplify dispatching logic

* Add an experimental API for returning any type of response

* Fix subscribing event

* URL loaders' lifetime is independent of the factory

* HandleError helper is no longer needed

* Rename "SendResponse" => "StartLoading" to follow naming conventions

* Delete when connection error happens

* Fix cpplint warning
Cheng Zhao 6 years ago
parent
commit
0a6eb8afca

+ 2 - 0
atom/browser/api/atom_api_protocol_ns.cc

@@ -119,6 +119,8 @@ void ProtocolNS::BuildPrototype(v8::Isolate* isolate,
                  &ProtocolNS::RegisterProtocolFor<ProtocolType::kHttp>)
       .SetMethod("registerStreamProtocol",
                  &ProtocolNS::RegisterProtocolFor<ProtocolType::kStream>)
+      .SetMethod("registerProtocol",
+                 &ProtocolNS::RegisterProtocolFor<ProtocolType::kFree>)
       .SetMethod("unregisterProtocol", &ProtocolNS::UnregisterProtocol)
       .SetMethod("isProtocolRegistered", &ProtocolNS::IsProtocolRegistered)
       .SetMethod("isProtocolHandled", &ProtocolNS::IsProtocolHandled)

+ 212 - 137
atom/browser/net/atom_url_loader_factory.cc

@@ -9,6 +9,7 @@
 
 #include "atom/browser/api/atom_api_session.h"
 #include "atom/browser/atom_browser_context.h"
+#include "atom/browser/net/node_stream_loader.h"
 #include "atom/common/atom_constants.h"
 #include "atom/common/native_mate_converters/file_path_converter.h"
 #include "atom/common/native_mate_converters/gurl_converter.h"
@@ -18,20 +19,105 @@
 #include "content/public/browser/browser_thread.h"
 #include "content/public/browser/file_url_loader.h"
 #include "content/public/browser/storage_partition.h"
-#include "native_mate/dictionary.h"
 #include "net/base/filename_util.h"
+#include "net/http/http_status_code.h"
 #include "services/network/public/cpp/url_loader_completion_status.h"
-#include "services/network/public/mojom/url_loader.mojom.h"
+#include "services/network/public/mojom/url_loader_factory.mojom.h"
 
 #include "atom/common/node_includes.h"
 
 using content::BrowserThread;
 
+namespace mate {
+
+template <>
+struct Converter<atom::ProtocolType> {
+  static bool FromV8(v8::Isolate* isolate,
+                     v8::Local<v8::Value> val,
+                     atom::ProtocolType* out) {
+    std::string type;
+    if (!ConvertFromV8(isolate, val, &type))
+      return false;
+    if (type == "buffer")
+      *out = atom::ProtocolType::kBuffer;
+    else if (type == "string")
+      *out = atom::ProtocolType::kString;
+    else if (type == "file")
+      *out = atom::ProtocolType::kFile;
+    else if (type == "http")
+      *out = atom::ProtocolType::kHttp;
+    else if (type == "stream")
+      *out = atom::ProtocolType::kStream;
+    else  // note "free" is internal type, not allowed to be passed from user
+      return false;
+    return true;
+  }
+};
+
+}  // namespace mate
+
 namespace atom {
 
+namespace {
+
+// Determine whether a protocol type can accept non-object response.
+bool ResponseMustBeObject(ProtocolType type) {
+  switch (type) {
+    case ProtocolType::kString:
+    case ProtocolType::kFile:
+    case ProtocolType::kFree:
+      return false;
+    default:
+      return true;
+  }
+}
+
+// Helper to convert value to Dictionary.
+mate::Dictionary ToDict(v8::Isolate* isolate, v8::Local<v8::Value> value) {
+  if (value->IsObject())
+    return mate::Dictionary(
+        isolate,
+        value->ToObject(isolate->GetCurrentContext()).ToLocalChecked());
+  else
+    return mate::Dictionary();
+}
+
+// Parse headers from response object.
+network::ResourceResponseHead ToResponseHead(const mate::Dictionary& dict) {
+  network::ResourceResponseHead head;
+  head.mime_type = "text/html";
+  head.charset = "utf-8";
+  if (dict.IsEmpty())
+    return head;
+
+  int status_code = 200;
+  dict.Get("statusCode", &status_code);
+  head.headers = new net::HttpResponseHeaders(base::StringPrintf(
+      "HTTP/1.1 %d %s", status_code,
+      net::GetHttpReasonPhrase(static_cast<net::HttpStatusCode>(status_code))));
+
+  base::DictionaryValue headers;
+  if (dict.Get("headers", &headers)) {
+    if (!head.headers)
+      head.headers = new net::HttpResponseHeaders("HTTP/1.1 200 OK");
+    for (const auto& iter : headers.DictItems()) {
+      head.headers->AddHeader(iter.first + ": " + iter.second.GetString());
+      // Some apps are passing content-type via headers, which is not accepted
+      // in NetworkService.
+      if (iter.first == "content-type")
+        head.mime_type = iter.second.GetString();
+    }
+  }
+  dict.Get("mimeType", &head.mime_type);
+  dict.Get("charset", &head.charset);
+  return head;
+}
+
+}  // namespace
+
 AtomURLLoaderFactory::AtomURLLoaderFactory(ProtocolType type,
                                            const ProtocolHandler& handler)
-    : type_(type), handler_(handler), weak_factory_(this) {}
+    : type_(type), handler_(handler) {}
 
 AtomURLLoaderFactory::~AtomURLLoaderFactory() = default;
 
@@ -44,152 +130,140 @@ void AtomURLLoaderFactory::CreateLoaderAndStart(
     network::mojom::URLLoaderClientPtr client,
     const net::MutableNetworkTrafficAnnotationTag& traffic_annotation) {
   DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
+  handler_.Run(
+      request,
+      base::BindOnce(&AtomURLLoaderFactory::StartLoading, std::move(loader),
+                     routing_id, request_id, options, request,
+                     std::move(client), traffic_annotation, type_));
+}
 
-  v8::Isolate* isolate = v8::Isolate::GetCurrent();
-  v8::Locker locker(isolate);
-  v8::HandleScope handle_scope(isolate);
-  v8::Local<v8::Context> context = isolate->GetCurrentContext();
-  v8::Context::Scope context_scope(context);
+void AtomURLLoaderFactory::Clone(
+    network::mojom::URLLoaderFactoryRequest request) {
+  bindings_.AddBinding(this, std::move(request));
+}
+
+// static
+void AtomURLLoaderFactory::StartLoading(
+    network::mojom::URLLoaderRequest loader,
+    int32_t routing_id,
+    int32_t request_id,
+    uint32_t options,
+    const network::ResourceRequest& request,
+    network::mojom::URLLoaderClientPtr client,
+    const net::MutableNetworkTrafficAnnotationTag& traffic_annotation,
+    ProtocolType type,
+    v8::Local<v8::Value> response,
+    mate::Arguments* args) {
+  // Parse {error} object.
+  mate::Dictionary dict = ToDict(args->isolate(), response);
+  if (!dict.IsEmpty()) {
+    int error_code;
+    if (dict.Get("error", &error_code)) {
+      client->OnComplete(network::URLLoaderCompletionStatus(error_code));
+      return;
+    }
+  }
+
+  // Some protocol accepts non-object responses.
+  if (dict.IsEmpty() && ResponseMustBeObject(type)) {
+    client->OnComplete(network::URLLoaderCompletionStatus(net::ERR_FAILED));
+    return;
+  }
 
-  switch (type_) {
+  switch (type) {
     case ProtocolType::kBuffer:
-      handler_.Run(request,
-                   base::BindOnce(&AtomURLLoaderFactory::SendResponseBuffer,
-                                  weak_factory_.GetWeakPtr(), std::move(client),
-                                  isolate));
+      StartLoadingBuffer(std::move(client), dict);
       break;
     case ProtocolType::kString:
-      handler_.Run(request,
-                   base::BindOnce(&AtomURLLoaderFactory::SendResponseString,
-                                  weak_factory_.GetWeakPtr(), std::move(client),
-                                  isolate));
+      StartLoadingString(std::move(client), dict, args->isolate(), response);
       break;
     case ProtocolType::kFile:
-      handler_.Run(request,
-                   base::BindOnce(&AtomURLLoaderFactory::SendResponseFile,
-                                  weak_factory_.GetWeakPtr(), std::move(loader),
-                                  request, std::move(client), isolate));
+      StartLoadingFile(std::move(loader), request, std::move(client), dict,
+                       args->isolate(), response);
       break;
     case ProtocolType::kHttp:
-      handler_.Run(
-          request,
-          base::BindOnce(&AtomURLLoaderFactory::SendResponseHttp,
-                         weak_factory_.GetWeakPtr(), std::move(loader),
-                         routing_id, request_id, options, request,
-                         std::move(client), traffic_annotation, isolate));
+      StartLoadingHttp(std::move(loader), routing_id, request_id, options,
+                       request, std::move(client), traffic_annotation, dict);
+      break;
+    case ProtocolType::kStream:
+      StartLoadingStream(std::move(loader), std::move(client), dict);
+      break;
+    case ProtocolType::kFree:
+      ProtocolType type;
+      v8::Local<v8::Value> extra_arg;
+      if (!mate::ConvertFromV8(args->isolate(), response, &type) ||
+          !args->GetNext(&extra_arg)) {
+        client->OnComplete(network::URLLoaderCompletionStatus(net::ERR_FAILED));
+        args->ThrowError("Invalid args, must pass (type, options)");
+        return;
+      }
+      StartLoading(std::move(loader), routing_id, request_id, options, request,
+                   std::move(client), traffic_annotation, type, extra_arg,
+                   args);
       break;
-    default: {
-      std::string contents = "Not Implemented";
-      SendContents(std::move(client), "text/html", "utf-8", contents.data(),
-                   contents.size());
-    }
   }
 }
 
-void AtomURLLoaderFactory::Clone(
-    network::mojom::URLLoaderFactoryRequest request) {
-  bindings_.AddBinding(this, std::move(request));
-}
-
-void AtomURLLoaderFactory::SendResponseBuffer(
+// static
+void AtomURLLoaderFactory::StartLoadingBuffer(
     network::mojom::URLLoaderClientPtr client,
-    v8::Isolate* isolate,
-    v8::Local<v8::Value> response) {
-  if (HandleError(&client, isolate, response))
-    return;
-
-  std::string mime_type = "text/html";
-  std::string charset = "utf-8";
-  v8::Local<v8::Value> buffer;
-  if (node::Buffer::HasInstance(response)) {
-    buffer = response;
-  } else if (response->IsObject()) {
-    mate::Dictionary dict(
-        isolate,
-        response->ToObject(isolate->GetCurrentContext()).ToLocalChecked());
-    dict.Get("mimeType", &mime_type);
-    dict.Get("charset", &charset);
-    dict.Get("data", &buffer);
-    if (!node::Buffer::HasInstance(response))
-      buffer = v8::Local<v8::Value>();
-  }
-
-  if (buffer.IsEmpty()) {
-    network::URLLoaderCompletionStatus status;
-    status.error_code = net::ERR_NOT_IMPLEMENTED;
-    client->OnComplete(status);
+    const mate::Dictionary& dict) {
+  v8::Local<v8::Value> buffer = dict.GetHandle();
+  dict.Get("data", &buffer);
+  if (!node::Buffer::HasInstance(buffer)) {
+    client->OnComplete(network::URLLoaderCompletionStatus(net::ERR_FAILED));
     return;
   }
 
-  SendContents(std::move(client), std::move(mime_type), std::move(charset),
+  SendContents(std::move(client), ToResponseHead(dict),
                node::Buffer::Data(buffer), node::Buffer::Length(buffer));
 }
 
-void AtomURLLoaderFactory::SendResponseString(
+// static
+void AtomURLLoaderFactory::StartLoadingString(
     network::mojom::URLLoaderClientPtr client,
+    const mate::Dictionary& dict,
     v8::Isolate* isolate,
     v8::Local<v8::Value> response) {
-  if (HandleError(&client, isolate, response))
-    return;
-
-  std::string mime_type = "text/html";
-  std::string charset = "utf-8";
   std::string contents;
-  if (response->IsString()) {
+  if (response->IsString())
     contents = gin::V8ToString(isolate, response);
-  } else if (response->IsObject()) {
-    mate::Dictionary dict(
-        isolate,
-        response->ToObject(isolate->GetCurrentContext()).ToLocalChecked());
-    dict.Get("mimeType", &mime_type);
-    dict.Get("charset", &charset);
+  else if (!dict.IsEmpty())
     dict.Get("data", &contents);
-  }
-  SendContents(std::move(client), std::move(mime_type), std::move(charset),
-               contents.data(), contents.size());
+
+  SendContents(std::move(client), ToResponseHead(dict), contents.data(),
+               contents.size());
 }
 
-void AtomURLLoaderFactory::SendResponseFile(
+// static
+void AtomURLLoaderFactory::StartLoadingFile(
     network::mojom::URLLoaderRequest loader,
     network::ResourceRequest request,
     network::mojom::URLLoaderClientPtr client,
+    const mate::Dictionary& dict,
     v8::Isolate* isolate,
     v8::Local<v8::Value> response) {
-  if (HandleError(&client, isolate, response))
-    return;
-
   base::FilePath path;
-  scoped_refptr<net::HttpResponseHeaders> response_headers;
   if (mate::ConvertFromV8(isolate, response, &path)) {
     request.url = net::FilePathToFileURL(path);
-  } else if (response->IsObject()) {
-    mate::Dictionary dict(
-        isolate,
-        response->ToObject(isolate->GetCurrentContext()).ToLocalChecked());
+  } else if (!dict.IsEmpty()) {
     dict.Get("referrer", &request.referrer);
     dict.Get("method", &request.method);
     if (dict.Get("path", &path))
       request.url = net::FilePathToFileURL(path);
-    base::DictionaryValue headers;
-    if (dict.Get("headers", &headers)) {
-      response_headers = new net::HttpResponseHeaders("HTTP/1.1 200 OK");
-      response_headers->AddHeader(kCORSHeader);
-      for (const auto& iter : headers.DictItems())
-        response_headers->AddHeader(iter.first + ": " +
-                                    iter.second.GetString());
-    }
   } else {
-    network::URLLoaderCompletionStatus status;
-    status.error_code = net::ERR_NOT_IMPLEMENTED;
-    client->OnComplete(status);
+    client->OnComplete(network::URLLoaderCompletionStatus(net::ERR_FAILED));
     return;
   }
 
+  network::ResourceResponseHead head = ToResponseHead(dict);
+  head.headers->AddHeader(kCORSHeader);
   content::CreateFileURLLoader(request, std::move(loader), std::move(client),
-                               nullptr, false, response_headers);
+                               nullptr, false, head.headers);
 }
 
-void AtomURLLoaderFactory::SendResponseHttp(
+// static
+void AtomURLLoaderFactory::StartLoadingHttp(
     network::mojom::URLLoaderRequest loader,
     int32_t routing_id,
     int32_t request_id,
@@ -197,25 +271,11 @@ void AtomURLLoaderFactory::SendResponseHttp(
     const network::ResourceRequest& original_request,
     network::mojom::URLLoaderClientPtr client,
     const net::MutableNetworkTrafficAnnotationTag& traffic_annotation,
-    v8::Isolate* isolate,
-    v8::Local<v8::Value> response) {
-  if (HandleError(&client, isolate, response))
-    return;
-
-  if (!response->IsObject()) {
-    network::URLLoaderCompletionStatus status;
-    status.error_code = net::ERR_NOT_IMPLEMENTED;
-    client->OnComplete(status);
-    return;
-  }
-
+    const mate::Dictionary& dict) {
   network::ResourceRequest request;
   request.headers = original_request.headers;
   request.cors_exempt_headers = original_request.cors_exempt_headers;
 
-  mate::Dictionary dict(
-      isolate,
-      response->ToObject(isolate->GetCurrentContext()).ToLocalChecked());
   dict.Get("url", &request.url);
   dict.Get("referrer", &request.referrer);
   if (!dict.Get("method", &request.method))
@@ -229,7 +289,8 @@ void AtomURLLoaderFactory::SendResponseHttp(
       browser_context = AtomBrowserContext::From(base::GenerateGUID(), true);
     } else {
       mate::Handle<api::Session> session;
-      if (mate::ConvertFromV8(isolate, value, &session) && !session.IsEmpty()) {
+      if (mate::ConvertFromV8(dict.isolate(), value, &session) &&
+          !session.IsEmpty()) {
         browser_context = session->browser_context();
       }
     }
@@ -243,25 +304,43 @@ void AtomURLLoaderFactory::SendResponseHttp(
       std::move(client), traffic_annotation);
 }
 
-bool AtomURLLoaderFactory::HandleError(
-    network::mojom::URLLoaderClientPtr* client,
-    v8::Isolate* isolate,
-    v8::Local<v8::Value> response) {
-  if (!response->IsObject())
-    return false;
-  v8::Local<v8::Object> obj =
-      response->ToObject(isolate->GetCurrentContext()).ToLocalChecked();
-  network::URLLoaderCompletionStatus status;
-  if (!mate::Dictionary(isolate, obj).Get("error", &status.error_code))
-    return false;
-  std::move(*client)->OnComplete(status);
-  return true;
+// static
+void AtomURLLoaderFactory::StartLoadingStream(
+    network::mojom::URLLoaderRequest loader,
+    network::mojom::URLLoaderClientPtr client,
+    const mate::Dictionary& dict) {
+  network::ResourceResponseHead head = ToResponseHead(dict);
+  v8::Local<v8::Value> stream;
+  if (!dict.Get("data", &stream)) {
+    // Assume the opts is already a stream.
+    stream = dict.GetHandle();
+  } else if (stream->IsNullOrUndefined()) {
+    // "data" was explicitly passed as null or undefined, assume the user wants
+    // to send an empty body.
+    client->OnReceiveResponse(head);
+    client->OnComplete(network::URLLoaderCompletionStatus(net::OK));
+    return;
+  } else if (!stream->IsObject()) {
+    client->OnComplete(network::URLLoaderCompletionStatus(net::ERR_FAILED));
+    return;
+  }
+
+  mate::Dictionary data = ToDict(dict.isolate(), stream);
+  v8::Local<v8::Value> method;
+  if (!data.Get("on", &method) || !method->IsFunction() ||
+      !data.Get("removeListener", &method) || !method->IsFunction()) {
+    client->OnComplete(network::URLLoaderCompletionStatus(net::ERR_FAILED));
+    return;
+  }
+
+  new NodeStreamLoader(std::move(head), std::move(loader), std::move(client),
+                       data.isolate(), data.GetHandle());
 }
 
+// static
 void AtomURLLoaderFactory::SendContents(
     network::mojom::URLLoaderClientPtr client,
-    std::string mime_type,
-    std::string charset,
+    network::ResourceResponseHead head,
     const char* data,
     size_t ssize) {
   uint32_t size = base::saturated_cast<uint32_t>(ssize);
@@ -273,10 +352,6 @@ void AtomURLLoaderFactory::SendContents(
     return;
   }
 
-  network::ResourceResponseHead head;
-  head.mime_type = std::move(mime_type);
-  head.charset = std::move(charset);
-  head.headers = new net::HttpResponseHeaders("HTTP/1.1 200 OK");
   head.headers->AddHeader(kCORSHeader);
   client->OnReceiveResponse(head);
   client->OnStartLoadingResponseBody(std::move(pipe.consumer_handle));

+ 37 - 28
atom/browser/net/atom_url_loader_factory.h

@@ -7,11 +7,10 @@
 
 #include <string>
 
-#include "base/memory/weak_ptr.h"
 #include "mojo/public/cpp/bindings/binding_set.h"
+#include "native_mate/dictionary.h"
 #include "net/url_request/url_request_job_factory.h"
 #include "services/network/public/mojom/url_loader_factory.mojom.h"
-#include "v8/include/v8.h"
 
 namespace atom {
 
@@ -25,9 +24,10 @@ enum class ProtocolType {
   kFree,  // special type for returning arbitrary type of response.
 };
 
-using SendResponseCallback = base::OnceCallback<void(v8::Local<v8::Value>)>;
+using StartLoadingCallback =
+    base::OnceCallback<void(v8::Local<v8::Value>, mate::Arguments*)>;
 using ProtocolHandler =
-    base::Callback<void(const network::ResourceRequest&, SendResponseCallback)>;
+    base::Callback<void(const network::ResourceRequest&, StartLoadingCallback)>;
 
 // Implementation of URLLoaderFactory.
 class AtomURLLoaderFactory : public network::mojom::URLLoaderFactory {
@@ -47,18 +47,30 @@ class AtomURLLoaderFactory : public network::mojom::URLLoaderFactory {
   void Clone(network::mojom::URLLoaderFactoryRequest request) override;
 
  private:
-  void SendResponseBuffer(network::mojom::URLLoaderClientPtr client,
-                          v8::Isolate* isolate,
-                          v8::Local<v8::Value> response);
-  void SendResponseString(network::mojom::URLLoaderClientPtr client,
-                          v8::Isolate* isolate,
-                          v8::Local<v8::Value> response);
-  void SendResponseFile(network::mojom::URLLoaderRequest loader,
-                        network::ResourceRequest request,
-                        network::mojom::URLLoaderClientPtr client,
-                        v8::Isolate* isolate,
-                        v8::Local<v8::Value> response);
-  void SendResponseHttp(
+  static void StartLoading(
+      network::mojom::URLLoaderRequest loader,
+      int32_t routing_id,
+      int32_t request_id,
+      uint32_t options,
+      const network::ResourceRequest& request,
+      network::mojom::URLLoaderClientPtr client,
+      const net::MutableNetworkTrafficAnnotationTag& traffic_annotation,
+      ProtocolType type,
+      v8::Local<v8::Value> response,
+      mate::Arguments* args);
+  static void StartLoadingBuffer(network::mojom::URLLoaderClientPtr client,
+                                 const mate::Dictionary& dict);
+  static void StartLoadingString(network::mojom::URLLoaderClientPtr client,
+                                 const mate::Dictionary& dict,
+                                 v8::Isolate* isolate,
+                                 v8::Local<v8::Value> response);
+  static void StartLoadingFile(network::mojom::URLLoaderRequest loader,
+                               network::ResourceRequest request,
+                               network::mojom::URLLoaderClientPtr client,
+                               const mate::Dictionary& dict,
+                               v8::Isolate* isolate,
+                               v8::Local<v8::Value> response);
+  static void StartLoadingHttp(
       network::mojom::URLLoaderRequest loader,
       int32_t routing_id,
       int32_t request_id,
@@ -66,17 +78,16 @@ class AtomURLLoaderFactory : public network::mojom::URLLoaderFactory {
       const network::ResourceRequest& original_request,
       network::mojom::URLLoaderClientPtr client,
       const net::MutableNetworkTrafficAnnotationTag& traffic_annotation,
-      v8::Isolate* isolate,
-      v8::Local<v8::Value> response);
+      const mate::Dictionary& dict);
+  static void StartLoadingStream(network::mojom::URLLoaderRequest loader,
+                                 network::mojom::URLLoaderClientPtr client,
+                                 const mate::Dictionary& dict);
 
-  bool HandleError(network::mojom::URLLoaderClientPtr* client,
-                   v8::Isolate* isolate,
-                   v8::Local<v8::Value> response);
-  void SendContents(network::mojom::URLLoaderClientPtr client,
-                    std::string mime_type,
-                    std::string charset,
-                    const char* data,
-                    size_t size);
+  // Helper to send string as response.
+  static void SendContents(network::mojom::URLLoaderClientPtr client,
+                           network::ResourceResponseHead head,
+                           const char* data,
+                           size_t size);
 
   // TODO(zcbenz): This comes from extensions/browser/extension_protocols.cc
   // but I don't know what it actually does, find out the meanings of |Clone|
@@ -86,8 +97,6 @@ class AtomURLLoaderFactory : public network::mojom::URLLoaderFactory {
   ProtocolType type_;
   ProtocolHandler handler_;
 
-  base::WeakPtrFactory<AtomURLLoaderFactory> weak_factory_;
-
   DISALLOW_COPY_AND_ASSIGN(AtomURLLoaderFactory);
 };
 

+ 119 - 0
atom/browser/net/node_stream_loader.cc

@@ -0,0 +1,119 @@
+// Copyright (c) 2019 GitHub, Inc.
+// Use of this source code is governed by the MIT license that can be
+// found in the LICENSE file.
+
+#include "atom/browser/net/node_stream_loader.h"
+
+#include <utility>
+
+#include "atom/common/api/event_emitter_caller.h"
+#include "atom/common/native_mate_converters/callback.h"
+
+#include "atom/common/node_includes.h"
+
+namespace atom {
+
+NodeStreamLoader::NodeStreamLoader(network::ResourceResponseHead head,
+                                   network::mojom::URLLoaderRequest loader,
+                                   network::mojom::URLLoaderClientPtr client,
+                                   v8::Isolate* isolate,
+                                   v8::Local<v8::Object> emitter)
+    : binding_(this),
+      client_(std::move(client)),
+      isolate_(isolate),
+      emitter_(isolate, emitter),
+      weak_factory_(this) {
+  auto weak = weak_factory_.GetWeakPtr();
+  binding_.Bind(std::move(loader));
+  binding_.set_connection_error_handler(
+      base::BindOnce(&NodeStreamLoader::OnConnectionError, weak));
+
+  mojo::ScopedDataPipeConsumerHandle consumer;
+  MojoResult rv = mojo::CreateDataPipe(nullptr, &producer_, &consumer);
+  if (rv != MOJO_RESULT_OK) {
+    OnError(nullptr);
+    return;
+  }
+
+  client_->OnReceiveResponse(head);
+  client_->OnStartLoadingResponseBody(std::move(consumer));
+
+  On("end", base::BindRepeating(&NodeStreamLoader::OnEnd, weak));
+  On("error", base::BindRepeating(&NodeStreamLoader::OnError, weak));
+  // Since every node::MakeCallback call has a micro scope itself, we have to
+  // subscribe |data| at last otherwise |end|'s listener won't be called when
+  // it is emitted in the same tick.
+  On("data", base::BindRepeating(&NodeStreamLoader::OnData, weak));
+}
+
+NodeStreamLoader::~NodeStreamLoader() {
+  v8::Locker locker(isolate_);
+  v8::Isolate::Scope isolate_scope(isolate_);
+  v8::HandleScope handle_scope(isolate_);
+
+  // Unsubscribe all handlers.
+  for (const auto& it : handlers_) {
+    v8::Local<v8::Value> args[] = {mate::StringToV8(isolate_, it.first),
+                                   it.second.Get(isolate_)};
+    node::MakeCallback(isolate_, emitter_.Get(isolate_), "removeListener",
+                       node::arraysize(args), args, {0, 0});
+  }
+}
+
+void NodeStreamLoader::On(const char* event, EventCallback callback) {
+  v8::Locker locker(isolate_);
+  v8::Isolate::Scope isolate_scope(isolate_);
+  v8::HandleScope handle_scope(isolate_);
+
+  // emitter.on(event, callback)
+  v8::Local<v8::Value> args[] = {
+      mate::StringToV8(isolate_, event),
+      mate::CallbackToV8(isolate_, std::move(callback)),
+  };
+  node::MakeCallback(isolate_, emitter_.Get(isolate_), "on",
+                     node::arraysize(args), args, {0, 0});
+
+  handlers_[event].Reset(isolate_, args[1]);
+}
+
+void NodeStreamLoader::OnData(mate::Arguments* args) {
+  v8::Local<v8::Value> buffer;
+  args->GetNext(&buffer);
+  if (!node::Buffer::HasInstance(buffer)) {
+    args->ThrowError("data must be Buffer");
+    return;
+  }
+
+  size_t ssize = node::Buffer::Length(buffer);
+  uint32_t size = base::saturated_cast<uint32_t>(ssize);
+  MojoResult result = producer_->WriteData(node::Buffer::Data(buffer), &size,
+                                           MOJO_WRITE_DATA_FLAG_NONE);
+  if (result != MOJO_RESULT_OK || size < ssize) {
+    OnError(nullptr);
+    return;
+  }
+}
+
+void NodeStreamLoader::OnEnd(mate::Arguments* args) {
+  client_->OnComplete(network::URLLoaderCompletionStatus(net::OK));
+  client_.reset();
+  MaybeDeleteSelf();
+}
+
+void NodeStreamLoader::OnError(mate::Arguments* args) {
+  client_->OnComplete(network::URLLoaderCompletionStatus(net::ERR_FAILED));
+  client_.reset();
+  MaybeDeleteSelf();
+}
+
+void NodeStreamLoader::OnConnectionError() {
+  binding_.Close();
+  MaybeDeleteSelf();
+}
+
+void NodeStreamLoader::MaybeDeleteSelf() {
+  if (!binding_.is_bound() && !client_.is_bound())
+    delete this;
+}
+
+}  // namespace atom

+ 77 - 0
atom/browser/net/node_stream_loader.h

@@ -0,0 +1,77 @@
+// Copyright (c) 2019 GitHub, Inc.
+// Use of this source code is governed by the MIT license that can be
+// found in the LICENSE file.
+
+#ifndef ATOM_BROWSER_NET_NODE_STREAM_LOADER_H_
+#define ATOM_BROWSER_NET_NODE_STREAM_LOADER_H_
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include "mojo/public/cpp/bindings/strong_binding.h"
+#include "services/network/public/mojom/url_loader.mojom.h"
+#include "v8/include/v8.h"
+
+namespace mate {
+class Arguments;
+}
+
+namespace atom {
+
+class NodeStreamLoader : public network::mojom::URLLoader {
+ public:
+  NodeStreamLoader(network::ResourceResponseHead head,
+                   network::mojom::URLLoaderRequest loader,
+                   network::mojom::URLLoaderClientPtr client,
+                   v8::Isolate* isolate,
+                   v8::Local<v8::Object> emitter);
+
+ private:
+  ~NodeStreamLoader() override;
+
+  using EventCallback = base::RepeatingCallback<void(mate::Arguments* args)>;
+
+  // URLLoader:
+  void FollowRedirect(const std::vector<std::string>& removed_headers,
+                      const net::HttpRequestHeaders& modified_headers,
+                      const base::Optional<GURL>& new_url) override {}
+  void ProceedWithResponse() override {}
+  void SetPriority(net::RequestPriority priority,
+                   int32_t intra_priority_value) override {}
+  void PauseReadingBodyFromNet() override {}
+  void ResumeReadingBodyFromNet() override {}
+
+  // JS bindings.
+  void On(const char* event, EventCallback callback);
+  void OnData(mate::Arguments* args);
+  void OnEnd(mate::Arguments* args);
+  void OnError(mate::Arguments* args);
+
+  // This class manages its own lifetime and should delete itself when the
+  // connection is lost or finished.
+  //
+  // The code is updated with `content::FileURLLoader`.
+  void OnConnectionError();
+  void MaybeDeleteSelf();
+
+  mojo::Binding<network::mojom::URLLoader> binding_;
+  network::mojom::URLLoaderClientPtr client_;
+
+  v8::Isolate* isolate_;
+  v8::Global<v8::Object> emitter_;
+
+  // Pipes for communicating between Node and NetworkService.
+  mojo::ScopedDataPipeProducerHandle producer_;
+
+  // Store the V8 callbacks to unsubscribe them later.
+  std::map<std::string, v8::Global<v8::Value>> handlers_;
+
+  base::WeakPtrFactory<NodeStreamLoader> weak_factory_;
+
+  DISALLOW_COPY_AND_ASSIGN(NodeStreamLoader);
+};
+
+}  // namespace atom
+
+#endif  // ATOM_BROWSER_NET_NODE_STREAM_LOADER_H_

+ 2 - 0
filenames.gni

@@ -340,6 +340,8 @@ filenames = {
     "atom/browser/net/network_context_service_factory.h",
     "atom/browser/net/network_context_service.cc",
     "atom/browser/net/network_context_service.h",
+    "atom/browser/net/node_stream_loader.cc",
+    "atom/browser/net/node_stream_loader.h",
     "atom/browser/net/require_ct_delegate.cc",
     "atom/browser/net/require_ct_delegate.h",
     "atom/browser/net/resolve_proxy_helper.cc",