Browse Source

refactor: Clean up the implementation of the registerStreamProtocol (#11357)

* Use weak pointer to avoid race condition

* Use DeleteSoon to delete pointer across threads

* Simplify EventSubscriber

* No need to manually mange V8 convertions

* Fix cpplint warning

We should update cpplint for this, but let's do it in other PR.

* Move UI thread operations to EventSubscriber

* Less and more assertions

Some methods are now private so no more need to assert threads.

* Fix cpplint warnings

* No longer needs the EventEmitted

* EventSubscriber => StreamSubscriber

* Reduce the copies when passing data

* Fix cpplint warnings
Cheng Zhao 6 years ago
parent
commit
d3ae541397

+ 0 - 124
atom/browser/api/event_subscriber.cc

@@ -1,124 +0,0 @@
-// Copyright (c) 2017 GitHub, Inc.
-// Use of this source code is governed by the MIT license that can be
-// found in the LICENSE file.
-
-#include "atom/browser/api/event_subscriber.h"
-
-#include <string>
-#include <utility>
-
-#include "atom/common/native_mate_converters/callback.h"
-
-namespace {
-
-// A FunctionTemplate lifetime is bound to the v8 context, so it can be safely
-// stored as a global here since there's only one for the main process.
-v8::Global<v8::FunctionTemplate> g_cached_template;
-
-struct JSHandlerData {
-  JSHandlerData(v8::Isolate* isolate,
-                mate::internal::EventSubscriberBase* subscriber)
-      : handle_(isolate, v8::External::New(isolate, this)),
-        subscriber_(subscriber) {
-    handle_.SetWeak(this, GC, v8::WeakCallbackType::kFinalizer);
-  }
-
-  static void GC(const v8::WeakCallbackInfo<JSHandlerData>& data) {
-    delete data.GetParameter();
-  }
-
-  v8::Global<v8::External> handle_;
-  mate::internal::EventSubscriberBase* subscriber_;
-};
-
-void InvokeCallback(const v8::FunctionCallbackInfo<v8::Value>& info) {
-  v8::Locker locker(info.GetIsolate());
-  v8::HandleScope handle_scope(info.GetIsolate());
-  v8::Local<v8::Context> context = info.GetIsolate()->GetCurrentContext();
-  v8::Context::Scope context_scope(context);
-  mate::Arguments args(info);
-  v8::Local<v8::Value> handler, event;
-  args.GetNext(&handler);
-  args.GetNext(&event);
-  DCHECK(handler->IsExternal());
-  DCHECK(event->IsString());
-  JSHandlerData* handler_data = static_cast<JSHandlerData*>(
-      v8::Local<v8::External>::Cast(handler)->Value());
-  handler_data->subscriber_->EventEmitted(mate::V8ToString(event), &args);
-}
-
-}  // namespace
-
-namespace mate {
-
-namespace internal {
-
-EventSubscriberBase::EventSubscriberBase(v8::Isolate* isolate,
-                                         v8::Local<v8::Object> emitter)
-    : isolate_(isolate), emitter_(isolate, emitter) {
-  if (g_cached_template.IsEmpty()) {
-    g_cached_template = v8::Global<v8::FunctionTemplate>(
-        isolate_, v8::FunctionTemplate::New(isolate_, InvokeCallback));
-  }
-}
-
-EventSubscriberBase::~EventSubscriberBase() {
-  if (!isolate_) {
-    return;
-  }
-  RemoveAllListeners();
-  emitter_.Reset();
-  DCHECK_EQ(js_handlers_.size(), 0u);
-}
-
-void EventSubscriberBase::On(const std::string& event_name) {
-  DCHECK(js_handlers_.find(event_name) == js_handlers_.end());
-  v8::Locker locker(isolate_);
-  v8::Isolate::Scope isolate_scope(isolate_);
-  v8::HandleScope handle_scope(isolate_);
-  auto fn_template = g_cached_template.Get(isolate_);
-  auto event = mate::StringToV8(isolate_, event_name);
-  auto* js_handler_data = new JSHandlerData(isolate_, this);
-  v8::Local<v8::Value> fn = internal::BindFunctionWith(
-      isolate_, isolate_->GetCurrentContext(), fn_template->GetFunction(),
-      js_handler_data->handle_.Get(isolate_), event);
-  js_handlers_.insert(
-      std::make_pair(event_name, v8::Global<v8::Value>(isolate_, fn)));
-  internal::ValueVector converted_args = {event, fn};
-  internal::CallMethodWithArgs(isolate_, emitter_.Get(isolate_), "on",
-                               &converted_args);
-}
-
-void EventSubscriberBase::Off(const std::string& event_name) {
-  v8::Locker locker(isolate_);
-  v8::Isolate::Scope isolate_scope(isolate_);
-  v8::HandleScope handle_scope(isolate_);
-  auto js_handler = js_handlers_.find(event_name);
-  DCHECK(js_handler != js_handlers_.end());
-  RemoveListener(js_handler);
-}
-
-void EventSubscriberBase::RemoveAllListeners() {
-  v8::Locker locker(isolate_);
-  v8::Isolate::Scope isolate_scope(isolate_);
-  v8::HandleScope handle_scope(isolate_);
-  while (!js_handlers_.empty()) {
-    RemoveListener(js_handlers_.begin());
-  }
-}
-
-std::map<std::string, v8::Global<v8::Value>>::iterator
-EventSubscriberBase::RemoveListener(
-    std::map<std::string, v8::Global<v8::Value>>::iterator it) {
-  internal::ValueVector args = {StringToV8(isolate_, it->first),
-                                it->second.Get(isolate_)};
-  internal::CallMethodWithArgs(
-      isolate_, v8::Local<v8::Object>::Cast(emitter_.Get(isolate_)),
-      "removeListener", &args);
-  it->second.Reset();
-  return js_handlers_.erase(it);
-}
-
-}  // namespace internal
-
-}  // namespace mate

+ 0 - 141
atom/browser/api/event_subscriber.h

@@ -1,141 +0,0 @@
-// Copyright (c) 2017 GitHub, Inc.
-// Use of this source code is governed by the MIT license that can be
-// found in the LICENSE file.
-
-#ifndef ATOM_BROWSER_API_EVENT_SUBSCRIBER_H_
-#define ATOM_BROWSER_API_EVENT_SUBSCRIBER_H_
-
-#include <map>
-#include <memory>
-#include <string>
-#include <utility>
-
-#include "atom/common/api/event_emitter_caller.h"
-#include "base/synchronization/lock.h"
-#include "content/public/browser/browser_thread.h"
-#include "native_mate/arguments.h"
-
-namespace mate {
-
-namespace internal {
-
-class EventSubscriberBase {
- public:
-  EventSubscriberBase(v8::Isolate* isolate, v8::Local<v8::Object> emitter);
-  virtual ~EventSubscriberBase();
-  virtual void EventEmitted(const std::string& event_name,
-                            mate::Arguments* args) = 0;
-
- protected:
-  void On(const std::string& event_name);
-  void Off(const std::string& event_name);
-  void RemoveAllListeners();
-
- private:
-  std::map<std::string, v8::Global<v8::Value>>::iterator RemoveListener(
-      std::map<std::string, v8::Global<v8::Value>>::iterator it);
-
-  v8::Isolate* isolate_;
-  v8::Global<v8::Object> emitter_;
-  std::map<std::string, v8::Global<v8::Value>> js_handlers_;
-
-  DISALLOW_COPY_AND_ASSIGN(EventSubscriberBase);
-};
-
-}  // namespace internal
-
-template <typename HandlerType>
-class EventSubscriber : internal::EventSubscriberBase {
- public:
-  using EventCallback = void (HandlerType::*)(mate::Arguments* args);
-  // Alias to unique_ptr with deleter.
-  using unique_ptr = std::unique_ptr<EventSubscriber<HandlerType>,
-                                     void (*)(EventSubscriber<HandlerType>*)>;
-  // EventSubscriber should only be created/deleted in the main thread since it
-  // communicates with the V8 engine. This smart pointer makes it simpler to
-  // bind the lifetime of EventSubscriber with a class whose lifetime is managed
-  // by a non-UI thread.
-  class SafePtr : public unique_ptr {
-   public:
-    SafePtr() : SafePtr(nullptr) {}
-    explicit SafePtr(EventSubscriber<HandlerType>* ptr)
-        : unique_ptr(ptr, Deleter) {}
-
-   private:
-    // Custom deleter that schedules destructor invocation to the main thread.
-    static void Deleter(EventSubscriber<HandlerType>* ptr) {
-      DCHECK(
-          !::content::BrowserThread::CurrentlyOn(::content::BrowserThread::UI));
-      DCHECK(ptr);
-      // Acquire handler lock and reset handler_ to ensure that any new events
-      // emitted will be ignored after this function returns
-      base::AutoLock auto_lock(ptr->handler_lock_);
-      ptr->handler_ = nullptr;
-      content::BrowserThread::PostTask(
-          content::BrowserThread::UI, FROM_HERE,
-          base::BindOnce(
-              [](EventSubscriber<HandlerType>* subscriber) {
-                {
-                  // It is possible that this function will execute in the UI
-                  // thread before the outer function has returned and destroyed
-                  // its auto_lock. We need to acquire the lock before deleting
-                  // or risk a crash.
-                  base::AutoLock auto_lock(subscriber->handler_lock_);
-                }
-                delete subscriber;
-              },
-              ptr));
-    }
-  };
-
-  EventSubscriber(HandlerType* handler,
-                  v8::Isolate* isolate,
-                  v8::Local<v8::Object> emitter)
-      : EventSubscriberBase(isolate, emitter), handler_(handler) {
-    DCHECK_CURRENTLY_ON(::content::BrowserThread::UI);
-  }
-
-  void On(const std::string& event, EventCallback callback) {
-    DCHECK_CURRENTLY_ON(::content::BrowserThread::UI);
-    EventSubscriberBase::On(event);
-    callbacks_.insert(std::make_pair(event, callback));
-  }
-
-  void Off(const std::string& event) {
-    DCHECK_CURRENTLY_ON(::content::BrowserThread::UI);
-    EventSubscriberBase::Off(event);
-    DCHECK(callbacks_.find(event) != callbacks_.end());
-    callbacks_.erase(callbacks_.find(event));
-  }
-
-  void RemoveAllListeners() {
-    DCHECK_CURRENTLY_ON(::content::BrowserThread::UI);
-    EventSubscriberBase::RemoveAllListeners();
-    callbacks_.clear();
-  }
-
- private:
-  void EventEmitted(const std::string& event_name,
-                    mate::Arguments* args) override {
-    DCHECK_CURRENTLY_ON(::content::BrowserThread::UI);
-    base::AutoLock auto_lock(handler_lock_);
-    if (!handler_) {
-      // handler_ was probably destroyed by another thread and we should not
-      // access it.
-      return;
-    }
-    auto it = callbacks_.find(event_name);
-    if (it != callbacks_.end()) {
-      auto method = it->second;
-      (handler_->*method)(args);
-    }
-  }
-
-  HandlerType* handler_;
-  base::Lock handler_lock_;
-  std::map<std::string, EventCallback> callbacks_;
-};
-
-}  // namespace mate
-
-#endif  // ATOM_BROWSER_API_EVENT_SUBSCRIBER_H_

+ 113 - 0
atom/browser/api/stream_subscriber.cc

@@ -0,0 +1,113 @@
+// Copyright (c) 2017 GitHub, Inc.
+// Use of this source code is governed by the MIT license that can be
+// found in the LICENSE file.
+
+#include "atom/browser/api/stream_subscriber.h"
+
+#include <string>
+
+#include "atom/browser/net/url_request_stream_job.h"
+#include "atom/common/api/event_emitter_caller.h"
+#include "atom/common/native_mate_converters/callback.h"
+
+#include "atom/common/node_includes.h"
+
+namespace mate {
+
+StreamSubscriber::StreamSubscriber(
+    v8::Isolate* isolate,
+    v8::Local<v8::Object> emitter,
+    base::WeakPtr<atom::URLRequestStreamJob> url_job)
+    : isolate_(isolate),
+      emitter_(isolate, emitter),
+      url_job_(url_job),
+      weak_factory_(this) {
+  DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
+  auto weak_self = weak_factory_.GetWeakPtr();
+  On("data", base::Bind(&StreamSubscriber::OnData, weak_self));
+  On("end", base::Bind(&StreamSubscriber::OnEnd, weak_self));
+  On("error", base::Bind(&StreamSubscriber::OnError, weak_self));
+}
+
+StreamSubscriber::~StreamSubscriber() {
+  DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
+  RemoveAllListeners();
+}
+
+void StreamSubscriber::On(const std::string& event, EventCallback&& callback) {  // NOLINT
+  DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
+  DCHECK(js_handlers_.find(event) == js_handlers_.end());
+
+  v8::Locker locker(isolate_);
+  v8::Isolate::Scope isolate_scope(isolate_);
+  v8::HandleScope handle_scope(isolate_);
+  // emitter.on(event, EventEmitted)
+  auto fn = CallbackToV8(isolate_, callback);
+  js_handlers_[event] = v8::Global<v8::Value>(isolate_, fn);
+  internal::ValueVector args = { StringToV8(isolate_, event), fn };
+  internal::CallMethodWithArgs(isolate_, emitter_.Get(isolate_), "on", &args);
+}
+
+void StreamSubscriber::Off(const std::string& event) {
+  DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
+  DCHECK(js_handlers_.find(event) != js_handlers_.end());
+
+  v8::Locker locker(isolate_);
+  v8::Isolate::Scope isolate_scope(isolate_);
+  v8::HandleScope handle_scope(isolate_);
+  auto js_handler = js_handlers_.find(event);
+  DCHECK(js_handler != js_handlers_.end());
+  RemoveListener(js_handler);
+}
+
+void StreamSubscriber::OnData(mate::Arguments* args) {
+  v8::Local<v8::Value> buf;
+  args->GetNext(&buf);
+  if (!node::Buffer::HasInstance(buf)) {
+    args->ThrowError("data must be Buffer");
+    return;
+  }
+
+  const char* data = node::Buffer::Data(buf);
+  size_t length = node::Buffer::Length(buf);
+  if (length == 0)
+    return;
+
+  // Pass the data to the URLJob in IO thread.
+  std::vector<char> buffer(data, data + length);
+  content::BrowserThread::PostTask(
+      content::BrowserThread::IO, FROM_HERE,
+      base::Bind(&atom::URLRequestStreamJob::OnData,
+                 url_job_, base::Passed(&buffer)));
+}
+
+void StreamSubscriber::OnEnd(mate::Arguments* args) {
+  content::BrowserThread::PostTask(
+      content::BrowserThread::IO, FROM_HERE,
+      base::Bind(&atom::URLRequestStreamJob::OnEnd, url_job_));
+}
+
+void StreamSubscriber::OnError(mate::Arguments* args) {
+  content::BrowserThread::PostTask(
+      content::BrowserThread::IO, FROM_HERE,
+      base::Bind(&atom::URLRequestStreamJob::OnError, url_job_));
+}
+
+void StreamSubscriber::RemoveAllListeners() {
+  v8::Locker locker(isolate_);
+  v8::Isolate::Scope isolate_scope(isolate_);
+  v8::HandleScope handle_scope(isolate_);
+  while (!js_handlers_.empty()) {
+    RemoveListener(js_handlers_.begin());
+  }
+}
+
+void StreamSubscriber::RemoveListener(JSHandlersMap::iterator it) {
+  internal::ValueVector args = { StringToV8(isolate_, it->first),
+                                 it->second.Get(isolate_) };
+  internal::CallMethodWithArgs(isolate_, emitter_.Get(isolate_),
+                               "removeListener", &args);
+  js_handlers_.erase(it);
+}
+
+}  // namespace mate

+ 58 - 0
atom/browser/api/stream_subscriber.h

@@ -0,0 +1,58 @@
+// Copyright (c) 2017 GitHub, Inc.
+// Use of this source code is governed by the MIT license that can be
+// found in the LICENSE file.
+
+#ifndef ATOM_BROWSER_API_STREAM_SUBSCRIBER_H_
+#define ATOM_BROWSER_API_STREAM_SUBSCRIBER_H_
+
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "base/callback.h"
+#include "base/memory/weak_ptr.h"
+#include "content/public/browser/browser_thread.h"
+#include "v8/include/v8.h"
+
+namespace atom {
+class URLRequestStreamJob;
+}
+
+namespace mate {
+
+class Arguments;
+
+class StreamSubscriber {
+ public:
+  StreamSubscriber(v8::Isolate* isolate,
+                  v8::Local<v8::Object> emitter,
+                  base::WeakPtr<atom::URLRequestStreamJob> url_job);
+  ~StreamSubscriber();
+
+ private:
+  using JSHandlersMap = std::map<std::string, v8::Global<v8::Value>>;
+  using EventCallback = base::Callback<void(mate::Arguments* args)>;
+
+  void On(const std::string& event, EventCallback&& callback);  // NOLINT
+  void Off(const std::string& event);
+
+  void OnData(mate::Arguments* args);
+  void OnEnd(mate::Arguments* args);
+  void OnError(mate::Arguments* args);
+
+  void RemoveAllListeners();
+  void RemoveListener(JSHandlersMap::iterator it);
+
+  v8::Isolate* isolate_;
+  v8::Global<v8::Object> emitter_;
+  base::WeakPtr<atom::URLRequestStreamJob> url_job_;
+
+  JSHandlersMap js_handlers_;
+
+  base::WeakPtrFactory<StreamSubscriber> weak_factory_;
+};
+
+}  // namespace mate
+
+#endif  // ATOM_BROWSER_API_STREAM_SUBSCRIBER_H_

+ 64 - 74
atom/browser/net/url_request_stream_job.cc

@@ -17,6 +17,7 @@
 #include "base/strings/string_number_conversions.h"
 #include "base/strings/string_util.h"
 #include "base/time/time.h"
+#include "net/base/net_errors.h"
 #include "net/filter/gzip_source_stream.h"
 
 namespace atom {
@@ -24,16 +25,26 @@ namespace atom {
 URLRequestStreamJob::URLRequestStreamJob(net::URLRequest* request,
                                          net::NetworkDelegate* network_delegate)
     : JsAsker<net::URLRequestJob>(request, network_delegate),
+      pending_buf_(nullptr),
+      pending_buf_size_(0),
+      ended_(false),
+      has_error_(false),
+      response_headers_(nullptr),
       weak_factory_(this) {}
 
-URLRequestStreamJob::~URLRequestStreamJob() = default;
+URLRequestStreamJob::~URLRequestStreamJob() {
+  if (subscriber_) {
+    content::BrowserThread::DeleteSoon(content::BrowserThread::UI, FROM_HERE,
+                                       std::move(subscriber_));
+  }
+}
 
 void URLRequestStreamJob::BeforeStartInUI(v8::Isolate* isolate,
                                           v8::Local<v8::Value> value) {
   if (value->IsNull() || value->IsUndefined() || !value->IsObject()) {
     // Invalid opts.
     ended_ = true;
-    errored_ = true;
+    has_error_ = true;
     return;
   }
 
@@ -71,107 +82,79 @@ void URLRequestStreamJob::BeforeStartInUI(v8::Isolate* isolate,
       !data.Get("removeListener", &value) || !value->IsFunction()) {
     // If data is passed but it is not a stream, signal an error.
     ended_ = true;
-    errored_ = true;
+    has_error_ = true;
     return;
   }
 
-  subscriber_.reset(new mate::EventSubscriber<URLRequestStreamJob>(
-      this, isolate, data.GetHandle()));
-  subscriber_->On("data", &URLRequestStreamJob::OnData);
-  subscriber_->On("end", &URLRequestStreamJob::OnEnd);
-  subscriber_->On("error", &URLRequestStreamJob::OnError);
+  subscriber_.reset(new mate::StreamSubscriber(isolate, data.GetHandle(),
+                                              weak_factory_.GetWeakPtr()));
 }
 
 void URLRequestStreamJob::StartAsync(std::unique_ptr<base::Value> options) {
+  if (has_error_) {
+    OnError();
+    return;
+  }
   NotifyHeadersComplete();
 }
 
-void URLRequestStreamJob::OnData(mate::Arguments* args) {
-  v8::Local<v8::Value> node_data;
-  args->GetNext(&node_data);
-  if (node_data->IsUint8Array()) {
-    const char* data = node::Buffer::Data(node_data);
-    size_t data_size = node::Buffer::Length(node_data);
-    std::copy(data, data + data_size, std::back_inserter(buffer_));
+void URLRequestStreamJob::OnData(std::vector<char>&& buffer) {  // NOLINT
+  if (write_buffer_.empty()) {
+    // Quick branch without copying.
+    write_buffer_ = std::move(buffer);
   } else {
-    NOTREACHED();
+    // write_buffer_ += buffer
+    size_t len = write_buffer_.size();
+    write_buffer_.resize(len + buffer.size());
+    std::copy(buffer.begin(), buffer.end(), write_buffer_.begin() + len);
   }
-  if (pending_io_buf_) {
-    CopyMoreData(pending_io_buf_, pending_io_buf_size_);
+
+  // Copy to output.
+  if (pending_buf_) {
+    int len = BufferCopy(&write_buffer_, pending_buf_.get(), pending_buf_size_);
+    write_buffer_.erase(write_buffer_.begin(), write_buffer_.begin() + len);
+    ReadRawDataComplete(len);
   }
 }
 
-void URLRequestStreamJob::OnEnd(mate::Arguments* args) {
+void URLRequestStreamJob::OnEnd() {
   ended_ = true;
-  if (pending_io_buf_) {
-    CopyMoreData(pending_io_buf_, pending_io_buf_size_);
-  }
+  ReadRawDataComplete(0);
 }
 
-void URLRequestStreamJob::OnError(mate::Arguments* args) {
-  errored_ = true;
-  if (pending_io_buf_) {
-    CopyMoreData(pending_io_buf_, pending_io_buf_size_);
-  }
+void URLRequestStreamJob::OnError() {
+  NotifyStartError(net::URLRequestStatus(net::URLRequestStatus::FAILED,
+                                         net::ERR_FAILED));
 }
 
 int URLRequestStreamJob::ReadRawData(net::IOBuffer* dest, int dest_size) {
-  content::BrowserThread::PostTask(
-      content::BrowserThread::UI, FROM_HERE,
-      base::BindOnce(&URLRequestStreamJob::CopyMoreData,
-                     weak_factory_.GetWeakPtr(), WrapRefCounted(dest),
-                     dest_size));
-  return net::ERR_IO_PENDING;
+  if (ended_)
+    return 0;
+
+  // When write_buffer_ is empty, there is no data valable yet, we have to save
+  // the dest buffer util DataAvailable.
+  if (write_buffer_.empty()) {
+    pending_buf_ = dest;
+    pending_buf_size_ = dest_size;
+    return net::ERR_IO_PENDING;
+  }
+
+  // Read from the write buffer and clear them after reading.
+  int len = BufferCopy(&write_buffer_, dest, dest_size);
+  write_buffer_.erase(write_buffer_.begin(), write_buffer_.begin() + len);
+  return len;
 }
 
 void URLRequestStreamJob::DoneReading() {
-  subscriber_.reset();
-  buffer_.clear();
-  ended_ = true;
+  content::BrowserThread::DeleteSoon(content::BrowserThread::UI, FROM_HERE,
+                                     std::move(subscriber_));
+  write_buffer_.clear();
 }
 
 void URLRequestStreamJob::DoneReadingRedirectResponse() {
   DoneReading();
 }
 
-void URLRequestStreamJob::CopyMoreDataDone(scoped_refptr<net::IOBuffer> io_buf,
-                                           int status) {
-  if (status <= 0) {
-    subscriber_.reset();
-  }
-  ReadRawDataComplete(status);
-  io_buf = nullptr;
-}
-
-void URLRequestStreamJob::CopyMoreData(scoped_refptr<net::IOBuffer> io_buf,
-                                       int io_buf_size) {
-  // reset any instance references to io_buf
-  pending_io_buf_ = nullptr;
-  pending_io_buf_size_ = 0;
-
-  int read_count = 0;
-  if (buffer_.size()) {
-    size_t count = std::min((size_t)io_buf_size, buffer_.size());
-    std::copy(buffer_.begin(), buffer_.begin() + count, io_buf->data());
-    buffer_.erase(buffer_.begin(), buffer_.begin() + count);
-    read_count = count;
-  } else if (!ended_ && !errored_) {
-    // No data available yet, save references to the IOBuffer, which will be
-    // passed back to this function when OnData/OnEnd/OnError are called
-    pending_io_buf_ = io_buf;
-    pending_io_buf_size_ = io_buf_size;
-  }
-
-  if (!pending_io_buf_) {
-    // Only call CopyMoreDataDone if we have read something.
-    int status = (errored_ && !read_count) ? net::ERR_FAILED : read_count;
-    content::BrowserThread::PostTask(
-        content::BrowserThread::IO, FROM_HERE,
-        base::BindOnce(&URLRequestStreamJob::CopyMoreDataDone,
-                       weak_factory_.GetWeakPtr(), io_buf, status));
-  }
-}
-
 std::unique_ptr<net::SourceStream> URLRequestStreamJob::SetUpSourceStream() {
   std::unique_ptr<net::SourceStream> source =
       net::URLRequestJob::SetUpSourceStream();
@@ -202,4 +185,11 @@ void URLRequestStreamJob::GetResponseInfo(net::HttpResponseInfo* info) {
   info->headers = response_headers_;
 }
 
+int URLRequestStreamJob::BufferCopy(std::vector<char>* source,
+                                    net::IOBuffer* target, int target_size) {
+  int bytes_written = std::min(static_cast<int>(source->size()), target_size);
+  memcpy(target->data(), source->data(), bytes_written);
+  return bytes_written;
+}
+
 }  // namespace atom

+ 18 - 14
atom/browser/net/url_request_stream_job.h

@@ -5,11 +5,11 @@
 #ifndef ATOM_BROWSER_NET_URL_REQUEST_STREAM_JOB_H_
 #define ATOM_BROWSER_NET_URL_REQUEST_STREAM_JOB_H_
 
-#include <deque>
 #include <memory>
 #include <string>
+#include <vector>
 
-#include "atom/browser/api/event_subscriber.h"
+#include "atom/browser/api/stream_subscriber.h"
 #include "atom/browser/net/js_asker.h"
 #include "base/memory/ref_counted_memory.h"
 #include "native_mate/persistent_dictionary.h"
@@ -26,9 +26,9 @@ class URLRequestStreamJob : public JsAsker<net::URLRequestJob> {
                       net::NetworkDelegate* network_delegate);
   ~URLRequestStreamJob() override;
 
-  void OnData(mate::Arguments* args);
-  void OnEnd(mate::Arguments* args);
-  void OnError(mate::Arguments* args);
+  void OnData(std::vector<char>&& buffer);  // NOLINT
+  void OnEnd();
+  void OnError();
 
   // URLRequestJob
   void GetResponseInfo(net::HttpResponseInfo* info) override;
@@ -48,17 +48,21 @@ class URLRequestStreamJob : public JsAsker<net::URLRequestJob> {
   void StartAsync(std::unique_ptr<base::Value> options) override;
   void OnResponse(bool success, std::unique_ptr<base::Value> value);
 
-  // Callback after data is asynchronously read from the file into |buf|.
-  void CopyMoreData(scoped_refptr<net::IOBuffer> io_buf, int io_buf_size);
-  void CopyMoreDataDone(scoped_refptr<net::IOBuffer> io_buf, int read_count);
+  int BufferCopy(std::vector<char>* source,
+                 net::IOBuffer* target, int target_size);
 
-  std::deque<char> buffer_;
-  bool ended_ = false;
-  bool errored_ = false;
-  scoped_refptr<net::IOBuffer> pending_io_buf_;
-  int pending_io_buf_size_ = 0;
+  // Saved arguments passed to ReadRawData.
+  scoped_refptr<net::IOBuffer> pending_buf_;
+  int pending_buf_size_;
+
+  // Saved arguments passed to OnData.
+  std::vector<char> write_buffer_;
+
+  bool ended_;
+  bool has_error_;
   scoped_refptr<net::HttpResponseHeaders> response_headers_;
-  mate::EventSubscriber<URLRequestStreamJob>::SafePtr subscriber_;
+  std::unique_ptr<mate::StreamSubscriber> subscriber_;
+
   base::WeakPtrFactory<URLRequestStreamJob> weak_factory_;
 
   DISALLOW_COPY_AND_ASSIGN(URLRequestStreamJob);

+ 42 - 13
atom/common/native_mate_converters/callback.cc

@@ -4,6 +4,8 @@
 
 #include "atom/common/native_mate_converters/callback.h"
 
+#include "native_mate/dictionary.h"
+
 using content::BrowserThread;
 
 namespace mate {
@@ -13,6 +15,22 @@ namespace internal {
 namespace {
 
 struct TranslaterHolder {
+  explicit TranslaterHolder(v8::Isolate* isolate)
+      : handle(isolate, v8::External::New(isolate, this)) {
+    handle.SetWeak(this, &GC, v8::WeakCallbackType::kFinalizer);
+  }
+  ~TranslaterHolder() {
+    if (!handle.IsEmpty()) {
+      handle.ClearWeak();
+      handle.Reset();
+    }
+  }
+
+  static void GC(const v8::WeakCallbackInfo<TranslaterHolder>& data) {
+    delete data.GetParameter();
+  }
+
+  v8::Global<v8::External> handle;
   Translater translater;
 };
 
@@ -22,20 +40,27 @@ v8::Persistent<v8::FunctionTemplate> g_call_translater;
 void CallTranslater(v8::Local<v8::External> external,
                     v8::Local<v8::Object> state,
                     mate::Arguments* args) {
+  // Whether the callback should only be called for once.
   v8::Isolate* isolate = args->isolate();
+  bool one_time = state->Has(mate::StringToSymbol(isolate, "oneTime"));
 
   // Check if the callback has already been called.
-  v8::Local<v8::String> called_symbol = mate::StringToSymbol(isolate, "called");
-  if (state->Has(called_symbol)) {
-    args->ThrowError("callback can only be called for once");
-    return;
-  } else {
-    state->Set(called_symbol, v8::Boolean::New(isolate, true));
+  if (one_time) {
+    auto called_symbol = mate::StringToSymbol(isolate, "called");
+    if (state->Has(called_symbol)) {
+      args->ThrowError("callback can only be called for once");
+      return;
+    } else {
+      state->Set(called_symbol, v8::Boolean::New(isolate, true));
+    }
   }
 
   TranslaterHolder* holder = static_cast<TranslaterHolder*>(external->Value());
   holder->translater.Run(args);
-  delete holder;
+
+  // Free immediately for one-time callback.
+  if (one_time)
+    delete holder;
 }
 
 }  // namespace
@@ -90,8 +115,7 @@ v8::Local<v8::Function> SafeV8Function::NewHandle(v8::Isolate* isolate) const {
 }
 
 v8::Local<v8::Value> CreateFunctionFromTranslater(
-    v8::Isolate* isolate,
-    const Translater& translater) {
+    v8::Isolate* isolate, const Translater& translater, bool one_time) {
   // The FunctionTemplate is cached.
   if (g_call_translater.IsEmpty())
     g_call_translater.Reset(isolate, mate::CreateFunctionTemplate(
@@ -99,11 +123,16 @@ v8::Local<v8::Value> CreateFunctionFromTranslater(
 
   v8::Local<v8::FunctionTemplate> call_translater =
       v8::Local<v8::FunctionTemplate>::New(isolate, g_call_translater);
-  auto* holder = new TranslaterHolder;
+  auto* holder = new TranslaterHolder(isolate);
   holder->translater = translater;
-  return BindFunctionWith(
-      isolate, isolate->GetCurrentContext(), call_translater->GetFunction(),
-      v8::External::New(isolate, holder), v8::Object::New(isolate));
+  Dictionary state = mate::Dictionary::CreateEmpty(isolate);
+  if (one_time)
+    state.Set("oneTime", true);
+  return BindFunctionWith(isolate,
+                          isolate->GetCurrentContext(),
+                          call_translater->GetFunction(),
+                          holder->handle.Get(isolate),
+                          state.GetHandle());
 }
 
 // func.bind(func, arg1).

+ 17 - 5
atom/common/native_mate_converters/callback.h

@@ -108,8 +108,8 @@ struct V8FunctionInvoker<ReturnType(ArgTypes...)> {
 
 // Helper to pass a C++ funtion to JavaScript.
 using Translater = base::Callback<void(Arguments* args)>;
-v8::Local<v8::Value> CreateFunctionFromTranslater(v8::Isolate* isolate,
-                                                  const Translater& translater);
+v8::Local<v8::Value> CreateFunctionFromTranslater(
+    v8::Isolate* isolate, const Translater& translater, bool one_time);
 v8::Local<v8::Value> BindFunctionWith(v8::Isolate* isolate,
                                       v8::Local<v8::Context> context,
                                       v8::Local<v8::Function> func,
@@ -152,9 +152,11 @@ struct Converter<base::RepeatingCallback<Sig>> {
                                    const base::RepeatingCallback<Sig>& val) {
     // We don't use CreateFunctionTemplate here because it creates a new
     // FunctionTemplate everytime, which is cached by V8 and causes leaks.
-    internal::Translater translater =
-        base::BindRepeating(&internal::NativeFunctionInvoker<Sig>::Go, val);
-    return internal::CreateFunctionFromTranslater(isolate, translater);
+    internal::Translater translater = base::Bind(
+        &internal::NativeFunctionInvoker<Sig>::Go, val);
+    // To avoid memory leak, we ensure that the callback can only be called
+    // for once.
+    return internal::CreateFunctionFromTranslater(isolate, translater, true);
   }
   static bool FromV8(v8::Isolate* isolate,
                      v8::Local<v8::Value> val,
@@ -168,6 +170,16 @@ struct Converter<base::RepeatingCallback<Sig>> {
   }
 };
 
+// Convert a callback to V8 without the call number limitation, this can easily
+// cause memory leaks so use it with caution.
+template<typename Sig>
+v8::Local<v8::Value> CallbackToV8(v8::Isolate* isolate,
+                                  const base::Callback<Sig>& val) {
+  internal::Translater translater = base::Bind(
+      &internal::NativeFunctionInvoker<Sig>::Go, val);
+  return internal::CreateFunctionFromTranslater(isolate, translater, false);
+}
+
 }  // namespace mate
 
 #endif  // ATOM_COMMON_NATIVE_MATE_CONVERTERS_CALLBACK_H_

+ 2 - 2
filenames.gni

@@ -185,8 +185,8 @@ filenames = {
     "atom/browser/api/event.h",
     "atom/browser/api/event_emitter.cc",
     "atom/browser/api/event_emitter.h",
-    "atom/browser/api/event_subscriber.cc",
-    "atom/browser/api/event_subscriber.h",
+    "atom/browser/api/stream_subscriber.cc",
+    "atom/browser/api/stream_subscriber.h",
     "atom/browser/api/trackable_object.cc",
     "atom/browser/api/trackable_object.h",
     "atom/browser/api/frame_subscriber.cc",