Browse Source

Merge pull request #11008 from electron/implement-stream-protocol

Implement {register,intercept}StreamProtocol
Charles Kerr 7 years ago
parent
commit
31172ecaa0

+ 5 - 0
atom/browser/api/atom_api_protocol.cc

@@ -10,6 +10,7 @@
 #include "atom/browser/net/url_request_async_asar_job.h"
 #include "atom/browser/net/url_request_buffer_job.h"
 #include "atom/browser/net/url_request_fetch_job.h"
+#include "atom/browser/net/url_request_stream_job.h"
 #include "atom/browser/net/url_request_string_job.h"
 #include "atom/common/native_mate_converters/callback.h"
 #include "atom/common/native_mate_converters/value_converter.h"
@@ -208,6 +209,8 @@ void Protocol::BuildPrototype(
                  &Protocol::RegisterProtocol<URLRequestAsyncAsarJob>)
       .SetMethod("registerHttpProtocol",
                  &Protocol::RegisterProtocol<URLRequestFetchJob>)
+      .SetMethod("registerStreamProtocol",
+                 &Protocol::RegisterProtocol<URLRequestStreamJob>)
       .SetMethod("unregisterProtocol", &Protocol::UnregisterProtocol)
       .SetMethod("isProtocolHandled", &Protocol::IsProtocolHandled)
       .SetMethod("interceptStringProtocol",
@@ -218,6 +221,8 @@ void Protocol::BuildPrototype(
                  &Protocol::InterceptProtocol<URLRequestAsyncAsarJob>)
       .SetMethod("interceptHttpProtocol",
                  &Protocol::InterceptProtocol<URLRequestFetchJob>)
+      .SetMethod("interceptStreamProtocol",
+                 &Protocol::InterceptProtocol<URLRequestStreamJob>)
       .SetMethod("uninterceptProtocol", &Protocol::UninterceptProtocol);
 }
 

+ 4 - 0
atom/browser/api/atom_api_protocol.h

@@ -78,6 +78,10 @@ class Protocol : public mate::TrackableObject<Protocol> {
     net::URLRequestJob* MaybeCreateJob(
         net::URLRequest* request,
         net::NetworkDelegate* network_delegate) const override {
+      if (!request->initiator().has_value()) {
+        // Don't intercept this request as it was created by `net.request`.
+        return nullptr;
+      }
       RequestJob* request_job = new RequestJob(request, network_delegate);
       request_job->SetHandlerInfo(isolate_, request_context_.get(), handler_);
       return request_job;

+ 121 - 0
atom/browser/api/event_subscriber.cc

@@ -0,0 +1,121 @@
+// Copyright (c) 2017 GitHub, Inc.
+// Use of this source code is governed by the MIT license that can be
+// found in the LICENSE file.
+#include <string>
+
+#include "atom/browser/api/event_subscriber.h"
+#include "atom/common/native_mate_converters/callback.h"
+
+namespace {
+
+// A FunctionTemplate lifetime is bound to the v8 context, so it can be safely
+// stored as a global here since there's only one for the main process.
+v8::Global<v8::FunctionTemplate> g_cached_template;
+
+struct JSHandlerData {
+  JSHandlerData(v8::Isolate* isolate,
+                mate::internal::EventSubscriberBase* subscriber)
+      : handle_(isolate, v8::External::New(isolate, this)),
+        subscriber_(subscriber) {
+    handle_.SetWeak(this, GC, v8::WeakCallbackType::kFinalizer);
+  }
+
+  static void GC(const v8::WeakCallbackInfo<JSHandlerData>& data) {
+    delete data.GetParameter();
+  }
+
+  v8::Global<v8::External> handle_;
+  mate::internal::EventSubscriberBase* subscriber_;
+};
+
+void InvokeCallback(const v8::FunctionCallbackInfo<v8::Value>& info) {
+  v8::Locker locker(info.GetIsolate());
+  v8::HandleScope handle_scope(info.GetIsolate());
+  v8::Local<v8::Context> context = info.GetIsolate()->GetCurrentContext();
+  v8::Context::Scope context_scope(context);
+  mate::Arguments args(info);
+  v8::Local<v8::Value> handler, event;
+  args.GetNext(&handler);
+  args.GetNext(&event);
+  DCHECK(handler->IsExternal());
+  DCHECK(event->IsString());
+  JSHandlerData* handler_data = static_cast<JSHandlerData*>(
+      v8::Local<v8::External>::Cast(handler)->Value());
+  handler_data->subscriber_->EventEmitted(mate::V8ToString(event), &args);
+}
+
+}  // namespace
+
+namespace mate {
+
+namespace internal {
+
+EventSubscriberBase::EventSubscriberBase(v8::Isolate* isolate,
+                                         v8::Local<v8::Object> emitter)
+    : isolate_(isolate), emitter_(isolate, emitter) {
+  if (g_cached_template.IsEmpty()) {
+    g_cached_template = v8::Global<v8::FunctionTemplate>(
+        isolate_, v8::FunctionTemplate::New(isolate_, InvokeCallback));
+  }
+}
+
+EventSubscriberBase::~EventSubscriberBase() {
+  if (!isolate_) {
+    return;
+  }
+  RemoveAllListeners();
+  emitter_.Reset();
+  DCHECK_EQ(js_handlers_.size(), 0);
+}
+
+void EventSubscriberBase::On(const std::string& event_name) {
+  DCHECK(js_handlers_.find(event_name) == js_handlers_.end());
+  v8::Locker locker(isolate_);
+  v8::Isolate::Scope isolate_scope(isolate_);
+  v8::HandleScope handle_scope(isolate_);
+  auto fn_template = g_cached_template.Get(isolate_);
+  auto event = mate::StringToV8(isolate_, event_name);
+  auto js_handler_data = new JSHandlerData(isolate_, this);
+  v8::Local<v8::Value> fn = internal::BindFunctionWith(
+      isolate_, isolate_->GetCurrentContext(), fn_template->GetFunction(),
+      js_handler_data->handle_.Get(isolate_), event);
+  js_handlers_.insert(
+      std::make_pair(event_name, v8::Global<v8::Value>(isolate_, fn)));
+  internal::ValueVector converted_args = {event, fn};
+  internal::CallMethodWithArgs(isolate_, emitter_.Get(isolate_), "on",
+                               &converted_args);
+}
+
+void EventSubscriberBase::Off(const std::string& event_name) {
+  v8::Locker locker(isolate_);
+  v8::Isolate::Scope isolate_scope(isolate_);
+  v8::HandleScope handle_scope(isolate_);
+  auto js_handler = js_handlers_.find(event_name);
+  DCHECK(js_handler != js_handlers_.end());
+  RemoveListener(js_handler);
+}
+
+void EventSubscriberBase::RemoveAllListeners() {
+  v8::Locker locker(isolate_);
+  v8::Isolate::Scope isolate_scope(isolate_);
+  v8::HandleScope handle_scope(isolate_);
+  while (!js_handlers_.empty()) {
+    RemoveListener(js_handlers_.begin());
+  }
+}
+
+std::map<std::string, v8::Global<v8::Value>>::iterator
+EventSubscriberBase::RemoveListener(
+    std::map<std::string, v8::Global<v8::Value>>::iterator it) {
+  internal::ValueVector args = {StringToV8(isolate_, it->first),
+                                it->second.Get(isolate_)};
+  internal::CallMethodWithArgs(
+      isolate_, v8::Local<v8::Object>::Cast(emitter_.Get(isolate_)),
+      "removeListener", &args);
+  it->second.Reset();
+  return js_handlers_.erase(it);
+}
+
+}  // namespace internal
+
+}  // namespace mate

+ 132 - 0
atom/browser/api/event_subscriber.h

@@ -0,0 +1,132 @@
+// Copyright (c) 2017 GitHub, Inc.
+// Use of this source code is governed by the MIT license that can be
+// found in the LICENSE file.
+
+#ifndef ATOM_BROWSER_API_EVENT_SUBSCRIBER_H_
+#define ATOM_BROWSER_API_EVENT_SUBSCRIBER_H_
+
+#include <map>
+#include <string>
+
+#include "atom/common/api/event_emitter_caller.h"
+#include "base/synchronization/lock.h"
+#include "content/public/browser/browser_thread.h"
+#include "native_mate/native_mate/arguments.h"
+
+namespace mate {
+
+namespace internal {
+
+class EventSubscriberBase {
+ public:
+  EventSubscriberBase(v8::Isolate* isolate, v8::Local<v8::Object> emitter);
+  virtual ~EventSubscriberBase();
+  virtual void EventEmitted(const std::string& event_name,
+                            mate::Arguments* args) = 0;
+
+ protected:
+  void On(const std::string& event_name);
+  void Off(const std::string& event_name);
+  void RemoveAllListeners();
+
+ private:
+  std::map<std::string, v8::Global<v8::Value>>::iterator RemoveListener(
+      std::map<std::string, v8::Global<v8::Value>>::iterator it);
+
+  v8::Isolate* isolate_;
+  v8::Global<v8::Object> emitter_;
+  std::map<std::string, v8::Global<v8::Value>> js_handlers_;
+
+  DISALLOW_COPY_AND_ASSIGN(EventSubscriberBase);
+};
+
+}  // namespace internal
+
+template <typename HandlerType>
+class EventSubscriber : internal::EventSubscriberBase {
+ public:
+  using EventCallback = void (HandlerType::*)(mate::Arguments* args);
+  // Alias to unique_ptr with deleter.
+  using unique_ptr = std::unique_ptr<EventSubscriber<HandlerType>,
+                                     void (*)(EventSubscriber<HandlerType>*)>;
+  // EventSubscriber should only be created/deleted in the main thread since it
+  // communicates with the V8 engine. This smart pointer makes it simpler to
+  // bind the lifetime of EventSubscriber with a class whose lifetime is managed
+  // by a non-UI thread.
+  class SafePtr : public unique_ptr {
+   public:
+    SafePtr() : SafePtr(nullptr) {}
+    explicit SafePtr(EventSubscriber<HandlerType>* ptr)
+        : unique_ptr(ptr, Deleter) {}
+
+   private:
+    // Custom deleter that schedules destructor invocation to the main thread.
+    static void Deleter(EventSubscriber<HandlerType>* ptr) {
+      DCHECK(
+          !::content::BrowserThread::CurrentlyOn(::content::BrowserThread::UI));
+      DCHECK(ptr);
+      // Acquire handler lock and reset handler_ to ensure that any new events
+      // emitted will be ignored after this function returns
+      base::AutoLock auto_lock(ptr->handler_lock_);
+      ptr->handler_ = nullptr;
+      content::BrowserThread::PostTask(
+          content::BrowserThread::UI, FROM_HERE,
+          base::Bind(
+              [](EventSubscriber<HandlerType>* subscriber) {
+                delete subscriber;
+              },
+              ptr));
+    }
+  };
+
+  EventSubscriber(HandlerType* handler,
+                  v8::Isolate* isolate,
+                  v8::Local<v8::Object> emitter)
+      : EventSubscriberBase(isolate, emitter), handler_(handler) {
+    DCHECK_CURRENTLY_ON(::content::BrowserThread::UI);
+  }
+
+  void On(const std::string& event, EventCallback callback) {
+    DCHECK_CURRENTLY_ON(::content::BrowserThread::UI);
+    EventSubscriberBase::On(event);
+    callbacks_.insert(std::make_pair(event, callback));
+  }
+
+  void Off(const std::string& event) {
+    DCHECK_CURRENTLY_ON(::content::BrowserThread::UI);
+    EventSubscriberBase::Off(event);
+    DCHECK(callbacks_.find(event) != callbacks_.end());
+    callbacks_.erase(callbacks_.find(event));
+  }
+
+  void RemoveAllListeners() {
+    DCHECK_CURRENTLY_ON(::content::BrowserThread::UI);
+    EventSubscriberBase::RemoveAllListeners();
+    callbacks_.clear();
+  }
+
+ private:
+  void EventEmitted(const std::string& event_name,
+                    mate::Arguments* args) override {
+    DCHECK_CURRENTLY_ON(::content::BrowserThread::UI);
+    base::AutoLock auto_lock(handler_lock_);
+    if (!handler_) {
+      // handler_ was probably destroyed by another thread and we should not
+      // access it.
+      return;
+    }
+    auto it = callbacks_.find(event_name);
+    if (it != callbacks_.end()) {
+      auto method = it->second;
+      (handler_->*method)(args);
+    }
+  }
+
+  HandlerType* handler_;
+  base::Lock handler_lock_;
+  std::map<std::string, EventCallback> callbacks_;
+};
+
+}  // namespace mate
+
+#endif  // ATOM_BROWSER_API_EVENT_SUBSCRIBER_H_

+ 13 - 0
atom/browser/net/js_asker.h

@@ -71,6 +71,7 @@ class JsAsker : public RequestJob {
   void Start() override {
     std::unique_ptr<base::DictionaryValue> request_details(
         new base::DictionaryValue);
+    request_start_time_ = base::TimeTicks::Now();
     FillRequestDetails(request_details.get(), RequestJob::request());
     content::BrowserThread::PostTask(
         content::BrowserThread::UI, FROM_HERE,
@@ -86,6 +87,15 @@ class JsAsker : public RequestJob {
 
   int GetResponseCode() const override { return net::HTTP_OK; }
 
+  // NOTE: We have to implement this method or risk a crash in blink for
+  // redirects!
+  void GetLoadTimingInfo(net::LoadTimingInfo* load_timing_info) const override {
+    load_timing_info->send_start = request_start_time_;
+    load_timing_info->send_end = request_start_time_;
+    load_timing_info->request_start = request_start_time_;
+    load_timing_info->receive_headers_end = response_start_time_;
+  }
+
   void GetResponseInfo(net::HttpResponseInfo* info) override {
     info->headers = new net::HttpResponseHeaders("");
   }
@@ -93,6 +103,7 @@ class JsAsker : public RequestJob {
   // Called when the JS handler has sent the response, we need to decide whether
   // to start, or fail the job.
   void OnResponse(bool success, std::unique_ptr<base::Value> value) {
+    response_start_time_ = base::TimeTicks::Now();
     int error = net::ERR_NOT_IMPLEMENTED;
     if (success && value && !internal::IsErrorOptions(value.get(), &error)) {
       StartAsync(std::move(value));
@@ -105,6 +116,8 @@ class JsAsker : public RequestJob {
   v8::Isolate* isolate_;
   net::URLRequestContextGetter* request_context_getter_;
   JavaScriptHandler handler_;
+  base::TimeTicks request_start_time_;
+  base::TimeTicks response_start_time_;
 
   base::WeakPtrFactory<JsAsker> weak_factory_;
 

+ 204 - 0
atom/browser/net/url_request_stream_job.cc

@@ -0,0 +1,204 @@
+// Copyright (c) 2017 GitHub, Inc.
+// Use of this source code is governed by the MIT license that can be
+// found in the LICENSE file.
+
+#include <algorithm>
+#include <ostream>
+#include <string>
+
+#include "atom/browser/net/url_request_stream_job.h"
+#include "atom/common/api/event_emitter_caller.h"
+#include "atom/common/atom_constants.h"
+#include "atom/common/native_mate_converters/net_converter.h"
+#include "atom/common/node_includes.h"
+#include "base/strings/string_number_conversions.h"
+#include "base/strings/string_util.h"
+#include "base/time/time.h"
+#include "net/filter/gzip_source_stream.h"
+
+namespace atom {
+
+URLRequestStreamJob::URLRequestStreamJob(net::URLRequest* request,
+                                         net::NetworkDelegate* network_delegate)
+    : JsAsker<net::URLRequestJob>(request, network_delegate),
+      ended_(false),
+      errored_(false),
+      pending_io_buf_(nullptr),
+      pending_io_buf_size_(0),
+      response_headers_(nullptr),
+      weak_factory_(this) {}
+
+void URLRequestStreamJob::BeforeStartInUI(v8::Isolate* isolate,
+                                          v8::Local<v8::Value> value) {
+  if (value->IsNull() || value->IsUndefined() || !value->IsObject()) {
+    // Invalid opts.
+    ended_ = true;
+    errored_ = true;
+    return;
+  }
+
+  mate::Dictionary opts(isolate, v8::Local<v8::Object>::Cast(value));
+  int status_code;
+  if (!opts.Get("statusCode", &status_code)) {
+    // assume HTTP OK if statusCode is not passed.
+    status_code = 200;
+  }
+  std::string status("HTTP/1.1 ");
+  status.append(base::IntToString(status_code));
+  status.append(" ");
+  status.append(
+      net::GetHttpReasonPhrase(static_cast<net::HttpStatusCode>(status_code)));
+  status.append("\0\0", 2);
+  response_headers_ = new net::HttpResponseHeaders(status);
+
+  if (opts.Get("headers", &value)) {
+    mate::Converter<net::HttpResponseHeaders*>::FromV8(isolate, value,
+                                                       response_headers_.get());
+  }
+
+  if (!opts.Get("data", &value)) {
+    // Assume the opts is already a stream
+    value = opts.GetHandle();
+  } else if (value->IsNullOrUndefined()) {
+    // "data" was explicitly passed as null or undefined, assume the user wants
+    // to send an empty body.
+    ended_ = true;
+    return;
+  }
+
+  mate::Dictionary data(isolate, v8::Local<v8::Object>::Cast(value));
+  if (!data.Get("on", &value) || !value->IsFunction() ||
+      !data.Get("removeListener", &value) || !value->IsFunction()) {
+    // If data is passed but it is not a stream, signal an error.
+    ended_ = true;
+    errored_ = true;
+    return;
+  }
+
+  subscriber_.reset(new mate::EventSubscriber<URLRequestStreamJob>(
+      this, isolate, data.GetHandle()));
+  subscriber_->On("data", &URLRequestStreamJob::OnData);
+  subscriber_->On("end", &URLRequestStreamJob::OnEnd);
+  subscriber_->On("error", &URLRequestStreamJob::OnError);
+}
+
+void URLRequestStreamJob::StartAsync(std::unique_ptr<base::Value> options) {
+  NotifyHeadersComplete();
+}
+
+void URLRequestStreamJob::OnData(mate::Arguments* args) {
+  v8::Local<v8::Value> node_data;
+  args->GetNext(&node_data);
+  if (node_data->IsUint8Array()) {
+    const char* data = node::Buffer::Data(node_data);
+    size_t data_size = node::Buffer::Length(node_data);
+    std::copy(data, data + data_size, std::back_inserter(buffer_));
+  } else {
+    NOTREACHED();
+  }
+  if (pending_io_buf_) {
+    CopyMoreData(pending_io_buf_, pending_io_buf_size_);
+  }
+}
+
+void URLRequestStreamJob::OnEnd(mate::Arguments* args) {
+  ended_ = true;
+  if (pending_io_buf_) {
+    CopyMoreData(pending_io_buf_, pending_io_buf_size_);
+  }
+}
+
+void URLRequestStreamJob::OnError(mate::Arguments* args) {
+  errored_ = true;
+  if (pending_io_buf_) {
+    CopyMoreData(pending_io_buf_, pending_io_buf_size_);
+  }
+}
+
+int URLRequestStreamJob::ReadRawData(net::IOBuffer* dest, int dest_size) {
+  content::BrowserThread::PostTask(
+      content::BrowserThread::UI, FROM_HERE,
+      base::Bind(&URLRequestStreamJob::CopyMoreData, weak_factory_.GetWeakPtr(),
+                 make_scoped_refptr(dest), dest_size));
+  return net::ERR_IO_PENDING;
+}
+
+void URLRequestStreamJob::DoneReading() {
+  subscriber_.reset();
+  buffer_.clear();
+  ended_ = true;
+}
+
+void URLRequestStreamJob::DoneReadingRedirectResponse() {
+  DoneReading();
+}
+
+void URLRequestStreamJob::CopyMoreDataDone(scoped_refptr<net::IOBuffer> io_buf,
+                                           int status) {
+  if (status <= 0) {
+    subscriber_.reset();
+  }
+  ReadRawDataComplete(status);
+  io_buf = nullptr;
+}
+
+void URLRequestStreamJob::CopyMoreData(scoped_refptr<net::IOBuffer> io_buf,
+                                       int io_buf_size) {
+  // reset any instance references to io_buf
+  pending_io_buf_ = nullptr;
+  pending_io_buf_size_ = 0;
+
+  int read_count = 0;
+  if (buffer_.size()) {
+    size_t count = std::min((size_t)io_buf_size, buffer_.size());
+    std::copy(buffer_.begin(), buffer_.begin() + count, io_buf->data());
+    buffer_.erase(buffer_.begin(), buffer_.begin() + count);
+    read_count = count;
+  } else if (!ended_ && !errored_) {
+    // No data available yet, save references to the IOBuffer, which will be
+    // passed back to this function when OnData/OnEnd/OnError are called
+    pending_io_buf_ = io_buf;
+    pending_io_buf_size_ = io_buf_size;
+  }
+
+  if (!pending_io_buf_) {
+    // Only call CopyMoreDataDone if we have read something.
+    int status = (errored_ && !read_count) ? net::ERR_FAILED : read_count;
+    content::BrowserThread::PostTask(
+        content::BrowserThread::IO, FROM_HERE,
+        base::Bind(&URLRequestStreamJob::CopyMoreDataDone,
+                   weak_factory_.GetWeakPtr(), io_buf, status));
+  }
+}
+
+std::unique_ptr<net::SourceStream> URLRequestStreamJob::SetUpSourceStream() {
+  std::unique_ptr<net::SourceStream> source =
+      net::URLRequestJob::SetUpSourceStream();
+  size_t i = 0;
+  std::string type;
+  while (response_headers_->EnumerateHeader(&i, "Content-Encoding", &type)) {
+    if (base::LowerCaseEqualsASCII(type, "gzip") ||
+        base::LowerCaseEqualsASCII(type, "x-gzip")) {
+      return net::GzipSourceStream::Create(std::move(source),
+                                           net::SourceStream::TYPE_GZIP);
+    } else if (base::LowerCaseEqualsASCII(type, "deflate")) {
+      return net::GzipSourceStream::Create(std::move(source),
+                                           net::SourceStream::TYPE_DEFLATE);
+    }
+  }
+  return source;
+}
+
+bool URLRequestStreamJob::GetMimeType(std::string* mime_type) const {
+  return response_headers_->GetMimeType(mime_type);
+}
+
+int URLRequestStreamJob::GetResponseCode() const {
+  return response_headers_->response_code();
+}
+
+void URLRequestStreamJob::GetResponseInfo(net::HttpResponseInfo* info) {
+  info->headers = response_headers_;
+}
+
+}  // namespace atom

+ 66 - 0
atom/browser/net/url_request_stream_job.h

@@ -0,0 +1,66 @@
+// Copyright (c) 2017 GitHub, Inc.
+// Use of this source code is governed by the MIT license that can be
+// found in the LICENSE file.
+
+#ifndef ATOM_BROWSER_NET_URL_REQUEST_STREAM_JOB_H_
+#define ATOM_BROWSER_NET_URL_REQUEST_STREAM_JOB_H_
+
+#include <deque>
+#include <string>
+
+#include "atom/browser/api/event_subscriber.h"
+#include "atom/browser/net/js_asker.h"
+#include "base/memory/ref_counted_memory.h"
+#include "native_mate/persistent_dictionary.h"
+#include "net/base/io_buffer.h"
+#include "net/http/http_status_code.h"
+#include "net/url_request/url_request_context_getter.h"
+#include "v8/include/v8.h"
+
+namespace atom {
+
+class URLRequestStreamJob : public JsAsker<net::URLRequestJob> {
+ public:
+  URLRequestStreamJob(net::URLRequest* request,
+                      net::NetworkDelegate* network_delegate);
+
+  void OnData(mate::Arguments* args);
+  void OnEnd(mate::Arguments* args);
+  void OnError(mate::Arguments* args);
+
+  // URLRequestJob
+  void GetResponseInfo(net::HttpResponseInfo* info) override;
+
+ protected:
+  // URLRequestJob
+  int ReadRawData(net::IOBuffer* buf, int buf_size) override;
+  void DoneReading() override;
+  void DoneReadingRedirectResponse() override;
+  std::unique_ptr<net::SourceStream> SetUpSourceStream() override;
+  bool GetMimeType(std::string* mime_type) const override;
+  int GetResponseCode() const override;
+
+ private:
+  // JSAsker
+  void BeforeStartInUI(v8::Isolate*, v8::Local<v8::Value>) override;
+  void StartAsync(std::unique_ptr<base::Value> options) override;
+  void OnResponse(bool success, std::unique_ptr<base::Value> value);
+
+  // Callback after data is asynchronously read from the file into |buf|.
+  void CopyMoreData(scoped_refptr<net::IOBuffer> io_buf, int io_buf_size);
+  void CopyMoreDataDone(scoped_refptr<net::IOBuffer> io_buf, int read_count);
+
+  std::deque<char> buffer_;
+  bool ended_;
+  bool errored_;
+  scoped_refptr<net::IOBuffer> pending_io_buf_;
+  int pending_io_buf_size_;
+  scoped_refptr<net::HttpResponseHeaders> response_headers_;
+  mate::EventSubscriber<URLRequestStreamJob>::SafePtr subscriber_;
+  base::WeakPtrFactory<URLRequestStreamJob> weak_factory_;
+
+  DISALLOW_COPY_AND_ASSIGN(URLRequestStreamJob);
+};
+}  // namespace atom
+
+#endif  // ATOM_BROWSER_NET_URL_REQUEST_STREAM_JOB_H_

+ 16 - 16
atom/common/native_mate_converters/callback.cc

@@ -38,22 +38,6 @@ void CallTranslater(v8::Local<v8::External> external,
   delete holder;
 }
 
-// func.bind(func, arg1).
-// NB(zcbenz): Using C++11 version crashes VS.
-v8::Local<v8::Value> BindFunctionWith(v8::Isolate* isolate,
-                                      v8::Local<v8::Context> context,
-                                      v8::Local<v8::Function> func,
-                                      v8::Local<v8::Value> arg1,
-                                      v8::Local<v8::Value> arg2) {
-  v8::MaybeLocal<v8::Value> bind = func->Get(mate::StringToV8(isolate, "bind"));
-  CHECK(!bind.IsEmpty());
-  v8::Local<v8::Function> bind_func =
-      v8::Local<v8::Function>::Cast(bind.ToLocalChecked());
-  v8::Local<v8::Value> converted[] = { func, arg1, arg2 };
-  return bind_func->Call(
-      context, func, arraysize(converted), converted).ToLocalChecked();
-}
-
 }  // namespace
 
 // Destroy the class on UI thread when possible.
@@ -130,6 +114,22 @@ v8::Local<v8::Value> CreateFunctionFromTranslater(
                           v8::Object::New(isolate));
 }
 
+// func.bind(func, arg1).
+// NB(zcbenz): Using C++11 version crashes VS.
+v8::Local<v8::Value> BindFunctionWith(v8::Isolate* isolate,
+                                      v8::Local<v8::Context> context,
+                                      v8::Local<v8::Function> func,
+                                      v8::Local<v8::Value> arg1,
+                                      v8::Local<v8::Value> arg2) {
+  v8::MaybeLocal<v8::Value> bind = func->Get(mate::StringToV8(isolate, "bind"));
+  CHECK(!bind.IsEmpty());
+  v8::Local<v8::Function> bind_func =
+      v8::Local<v8::Function>::Cast(bind.ToLocalChecked());
+  v8::Local<v8::Value> converted[] = {func, arg1, arg2};
+  return bind_func->Call(context, func, arraysize(converted), converted)
+      .ToLocalChecked();
+}
+
 }  // namespace internal
 
 }  // namespace mate

+ 5 - 0
atom/common/native_mate_converters/callback.h

@@ -111,6 +111,11 @@ struct V8FunctionInvoker<ReturnType(ArgTypes...)> {
 using Translater = base::Callback<void(Arguments* args)>;
 v8::Local<v8::Value> CreateFunctionFromTranslater(
     v8::Isolate* isolate, const Translater& translater);
+v8::Local<v8::Value> BindFunctionWith(v8::Isolate* isolate,
+                                      v8::Local<v8::Context> context,
+                                      v8::Local<v8::Function> func,
+                                      v8::Local<v8::Value> arg1,
+                                      v8::Local<v8::Value> arg2);
 
 // Calls callback with Arguments.
 template <typename Sig>

+ 36 - 0
atom/common/native_mate_converters/net_converter.cc

@@ -165,6 +165,35 @@ v8::Local<v8::Value> Converter<net::HttpResponseHeaders*>::ToV8(
   return ConvertToV8(isolate, response_headers);
 }
 
+bool Converter<net::HttpResponseHeaders*>::FromV8(
+    v8::Isolate* isolate,
+    v8::Local<v8::Value> val,
+    net::HttpResponseHeaders* out) {
+  if (!val->IsObject()) {
+    return false;
+  }
+  auto context = isolate->GetCurrentContext();
+  auto headers = v8::Local<v8::Object>::Cast(val);
+  auto keys = headers->GetOwnPropertyNames();
+  for (uint32_t i = 0; i < keys->Length(); i++) {
+    v8::Local<v8::String> key, value;
+    if (!keys->Get(i)->ToString(context).ToLocal(&key)) {
+      return false;
+    }
+    if (!headers->Get(key)->ToString(context).ToLocal(&value)) {
+      return false;
+    }
+    v8::String::Utf8Value key_utf8(key);
+    v8::String::Utf8Value value_utf8(value);
+    std::string k(*key_utf8, key_utf8.length());
+    std::string v(*value_utf8, value_utf8.length());
+    std::ostringstream tmp;
+    tmp << k << ": " << v;
+    out->AddHeader(tmp.str());
+  }
+  return true;
+}
+
 }  // namespace mate
 
 namespace atom {
@@ -180,6 +209,13 @@ void FillRequestDetails(base::DictionaryValue* details,
   GetUploadData(list.get(), request);
   if (!list->empty())
     details->Set("uploadData", std::move(list));
+  std::unique_ptr<base::DictionaryValue> headers_value(
+      new base::DictionaryValue);
+  for (net::HttpRequestHeaders::Iterator it(request->extra_request_headers());
+       it.GetNext();) {
+    headers_value->SetString(it.name(), it.value());
+  }
+  details->Set("headers", std::move(headers_value));
 }
 
 void GetUploadData(base::ListValue* upload_data_list,

+ 3 - 0
atom/common/native_mate_converters/net_converter.h

@@ -49,6 +49,9 @@ template <>
 struct Converter<net::HttpResponseHeaders*> {
   static v8::Local<v8::Value> ToV8(v8::Isolate* isolate,
                                    net::HttpResponseHeaders* headers);
+  static bool FromV8(v8::Isolate* isolate,
+                     v8::Local<v8::Value> val,
+                     net::HttpResponseHeaders* out);
 };
 
 }  // namespace mate

+ 79 - 0
docs/api/protocol.md

@@ -194,6 +194,67 @@ request to have a different session you should set `session` to `null`.
 
 For POST requests the `uploadData` object must be provided.
 
+### `protocol.registerStreamProtocol(scheme, handler[, completion])`
+
+* `scheme` String
+* `handler` Function
+  * `request` Object
+    * `url` String
+    * `headers` Object
+    * `referrer` String
+    * `method` String
+    * `uploadData` [UploadData[]](structures/upload-data.md)
+  * `callback` Function
+    * `stream` (ReadableStream | [StreamProtocolResponse](structures/stream-protocol-response.md)) (optional)
+* `completion` Function (optional)
+  * `error` Error
+
+Registers a protocol of `scheme` that will send a `Readable` as a response.
+
+The usage is similar to the other `register{Any}Protocol`, except that the
+`callback` should be called with either a `Readable` object or an object that
+has the `data`, `statusCode`, and `headers` properties.
+
+Example:
+
+```javascript
+const {protocol} = require('electron')
+const {PassThrough} = require('stream')
+
+function createStream (text) {
+  const rv = new PassThrough()  // PassThrough is also a Readable stream
+  rv.push(text)
+  rv.push(null)
+  return rv
+}
+
+protocol.registerStreamProtocol('atom', (request, callback) => {
+  callback({
+    statusCode: 200,
+    headers: {
+      'content-type': 'text/html'
+    },
+    data: createStream('<h5>Response</h5>')
+  })
+}, (error) => {
+  if (error) console.error('Failed to register protocol')
+})
+```
+
+It is possible to pass any object that implements the readable stream API (emits
+`data`/`end`/`error` events). For example, here's how a file could be returned:
+
+```javascript
+const {protocol} = require('electron')
+const fs = require('fs')
+
+protocol.registerStreamProtocol('atom', (request, callback) => {
+  callback(fs.createReadStream('index.html'))
+}, (error) => {
+  if (error) console.error('Failed to register protocol')
+})
+```
+
 ### `protocol.unregisterProtocol(scheme[, completion])`
 
 * `scheme` String
@@ -285,6 +346,24 @@ which sends a `Buffer` as a response.
 Intercepts `scheme` protocol and uses `handler` as the protocol's new handler
 which sends a new HTTP request as a response.
 
+### `protocol.interceptStreamProtocol(scheme, handler[, completion])`
+
+* `scheme` String
+* `handler` Function
+  * `request` Object
+    * `url` String
+    * `headers` Object
+    * `referrer` String
+    * `method` String
+    * `uploadData` [UploadData[]](structures/upload-data.md)
+  * `callback` Function
+    * `stream` (ReadableStream | [StreamProtocolResponse](structures/stream-protocol-response.md)) (optional)
+* `completion` Function (optional)
+  * `error` Error
+
+Same as `protocol.registerStreamProtocol`, except that it replaces an existing
+protocol handler.
+
 ### `protocol.uninterceptProtocol(scheme[, completion])`
 
 * `scheme` String

+ 5 - 0
docs/api/structures/stream-protocol-response.md

@@ -0,0 +1,5 @@
+# StreamProtocolResponse Object
+
+* `statusCode` Number - The HTTP response code
+* `headers` Object - An object containing the response headers
+* `data` ReadableStream - A Node.js readable stream representing the response body

+ 4 - 0
filenames.gypi

@@ -163,6 +163,8 @@
       'atom/browser/api/event.h',
       'atom/browser/api/event_emitter.cc',
       'atom/browser/api/event_emitter.h',
+      'atom/browser/api/event_subscriber.cc',
+      'atom/browser/api/event_subscriber.h',
       'atom/browser/api/trackable_object.cc',
       'atom/browser/api/trackable_object.h',
       'atom/browser/api/frame_subscriber.cc',
@@ -272,6 +274,8 @@
       'atom/browser/net/url_request_buffer_job.h',
       'atom/browser/net/url_request_fetch_job.cc',
       'atom/browser/net/url_request_fetch_job.h',
+      'atom/browser/net/url_request_stream_job.cc',
+      'atom/browser/net/url_request_stream_job.h',
       'atom/browser/node_debugger.cc',
       'atom/browser/node_debugger.h',
       'atom/browser/relauncher_linux.cc',

+ 220 - 0
spec/api-protocol-spec.js

@@ -5,6 +5,10 @@ const qs = require('querystring')
 const {closeWindow} = require('./window-helpers')
 const {remote} = require('electron')
 const {BrowserWindow, ipcMain, protocol, session, webContents} = remote
+// The RPC API doesn't seem to support calling methods on remote objects very
+// well. In order to test stream protocol, we must work around this limitation
+// and use Stream instances created in the browser process.
+const stream = remote.require('stream')
 
 describe('protocol module', () => {
   const protocolName = 'sp'
@@ -14,6 +18,33 @@ describe('protocol module', () => {
     type: 'string'
   }
 
+  function delay (ms) {
+    return new Promise((resolve) => {
+      setTimeout(resolve, ms)
+    })
+  }
+
+  function getStream (chunkSize = text.length, data = text) {
+    const body = stream.PassThrough()
+
+    async function sendChunks () {
+      let buf = new Buffer(data)
+      for (;;) {
+        body.push(buf.slice(0, chunkSize))
+        buf = buf.slice(chunkSize)
+        if (!buf.length) {
+          break
+        }
+        // emulate network delay
+        await delay(50)
+      }
+      body.push(null)
+    }
+
+    sendChunks()
+    return body
+  }
+
   afterEach((done) => {
     protocol.unregisterProtocol(protocolName, () => {
       protocol.uninterceptProtocol('http', () => done())
@@ -443,6 +474,120 @@ describe('protocol module', () => {
     })
   })
 
+  describe('protocol.registerStreamProtocol', () => {
+    it('sends Stream as response', (done) => {
+      const handler = (request, callback) => callback(getStream())
+      protocol.registerStreamProtocol(protocolName, handler, (error) => {
+        if (error) return done(error)
+        $.ajax({
+          url: protocolName + '://fake-host',
+          cache: false,
+          success: (data) => {
+            assert.equal(data, text)
+            done()
+          },
+          error: (xhr, errorType, error) => {
+            done(error || new Error(`Request failed: ${xhr.status}`))
+          }
+        })
+      })
+    })
+
+    it('sends object as response', (done) => {
+      const handler = (request, callback) => callback({data: getStream()})
+      protocol.registerStreamProtocol(protocolName, handler, (error) => {
+        if (error) return done(error)
+        $.ajax({
+          url: protocolName + '://fake-host',
+          cache: false,
+          success: (data, _, request) => {
+            assert.equal(request.status, 200)
+            assert.equal(data, text)
+            done()
+          },
+          error: (xhr, errorType, error) => {
+            done(error || new Error(`Request failed: ${xhr.status}`))
+          }
+        })
+      })
+    })
+
+    it('sends custom response headers', (done) => {
+      const handler = (request, callback) => callback({
+        data: getStream(3),
+        headers: {
+          'x-electron': ['a', 'b']
+        }
+      })
+      protocol.registerStreamProtocol(protocolName, handler, (error) => {
+        if (error) return done(error)
+        $.ajax({
+          url: protocolName + '://fake-host',
+          cache: false,
+          success: (data, _, request) => {
+            assert.equal(request.status, 200)
+            assert.equal(request.getResponseHeader('x-electron'), 'a,b')
+            assert.equal(data, text)
+            done()
+          },
+          error: (xhr, errorType, error) => {
+            done(error || new Error(`Request failed: ${xhr.status}`))
+          }
+        })
+      })
+    })
+
+    it('sends custom status code', (done) => {
+      const handler = (request, callback) => callback({
+        statusCode: 204,
+        data: null
+      })
+      protocol.registerStreamProtocol(protocolName, handler, (error) => {
+        if (error) return done(error)
+        $.ajax({
+          url: protocolName + '://fake-host',
+          cache: false,
+          success: (data, _, request) => {
+            assert.equal(request.status, 204)
+            assert.equal(data, undefined)
+            done()
+          },
+          error: (xhr, errorType, error) => {
+            done(error || new Error(`Request failed: ${xhr.status}`))
+          }
+        })
+      })
+    })
+
+    it('receives request headers', (done) => {
+      const handler = (request, callback) => {
+        callback({
+          headers: {
+            'content-type': 'application/json'
+          },
+          data: getStream(5, JSON.stringify(Object.assign({}, request.headers)))
+        })
+      }
+      protocol.registerStreamProtocol(protocolName, handler, (error) => {
+        if (error) return done(error)
+        $.ajax({
+          url: protocolName + '://fake-host',
+          headers: {
+            'x-return-headers': 'yes'
+          },
+          cache: false,
+          success: (data) => {
+            assert.equal(data['x-return-headers'], 'yes')
+            done()
+          },
+          error: (xhr, errorType, error) => {
+            done(error || new Error(`Request failed: ${xhr.status}`))
+          }
+        })
+      })
+    })
+  })
+
   describe('protocol.isProtocolHandled', () => {
     it('returns true for about:', (done) => {
       protocol.isProtocolHandled('about', (result) => {
@@ -722,6 +867,81 @@ describe('protocol module', () => {
     })
   })
 
+  describe('protocol.interceptStreamProtocol', () => {
+    it('can intercept http protocol', (done) => {
+      const handler = (request, callback) => callback(getStream())
+      protocol.interceptStreamProtocol('http', handler, (error) => {
+        if (error) return done(error)
+        $.ajax({
+          url: 'http://fake-host',
+          cache: false,
+          success: (data) => {
+            assert.equal(data, text)
+            done()
+          },
+          error: (xhr, errorType, error) => {
+            done(error || new Error(`Request failed: ${xhr.status}`))
+          }
+        })
+      })
+    })
+
+    it('can receive post data', (done) => {
+      const handler = (request, callback) => {
+        callback(getStream(3, request.uploadData[0].bytes.toString()))
+      }
+      protocol.interceptStreamProtocol('http', handler, (error) => {
+        if (error) return done(error)
+        $.ajax({
+          url: 'http://fake-host',
+          cache: false,
+          type: 'POST',
+          data: postData,
+          success: (data) => {
+            assert.deepEqual(qs.parse(data), postData)
+            done()
+          },
+          error: (xhr, errorType, error) => {
+            done(error || new Error(`Request failed: ${xhr.status}`))
+          }
+        })
+      })
+    })
+
+    it('can execute redirects', (done) => {
+      const handler = (request, callback) => {
+        if (request.url.indexOf('http://fake-host') === 0) {
+          setTimeout(() => {
+            callback({
+              data: null,
+              statusCode: 302,
+              headers: {
+                Location: 'http://fake-redirect'
+              }
+            })
+          }, 300)
+        } else {
+          assert.equal(request.url.indexOf('http://fake-redirect'), 0)
+          callback(getStream(1, 'redirect'))
+        }
+      }
+      protocol.interceptStreamProtocol('http', handler, (error) => {
+        if (error) return done(error)
+        $.ajax({
+          url: 'http://fake-host',
+          cache: false,
+          success: (data) => {
+            assert.equal(data, 'redirect')
+            done()
+          },
+          error: (xhr, errorType, error) => {
+            done(error || new Error(`Request failed: ${xhr.status}`))
+          }
+        })
+      })
+    })
+  })
+
   describe('protocol.uninterceptProtocol', () => {
     it('returns error when scheme does not exist', (done) => {
       protocol.uninterceptProtocol('not-exist', (error) => {