Browse Source

fix: make StreamSubscriber ref counted (#17267)

It is owned by URLRequestStreamJob on the IO thread once request starts,
but if the ownership was abondoned while transfering it to IO thread
which is possible when a request is aborted, then we need to make sure
its destroyed on the right thread to avoid lock in v8.
Robo 6 years ago
parent
commit
8fd91cc35b

+ 9 - 6
atom/browser/api/stream_subscriber.cc

@@ -17,12 +17,15 @@ namespace mate {
 StreamSubscriber::StreamSubscriber(
     v8::Isolate* isolate,
     v8::Local<v8::Object> emitter,
-    base::WeakPtr<atom::URLRequestStreamJob> url_job)
-    : isolate_(isolate),
+    base::WeakPtr<atom::URLRequestStreamJob> url_job,
+    scoped_refptr<base::SequencedTaskRunner> ui_task_runner)
+    : base::RefCountedDeleteOnSequence<StreamSubscriber>(ui_task_runner),
+      isolate_(isolate),
       emitter_(isolate, emitter),
       url_job_(url_job),
       weak_factory_(this) {
-  DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
+  DCHECK(ui_task_runner->RunsTasksInCurrentSequence());
+
   auto weak_self = weak_factory_.GetWeakPtr();
   On("data", base::Bind(&StreamSubscriber::OnData, weak_self));
   On("end", base::Bind(&StreamSubscriber::OnEnd, weak_self));
@@ -30,13 +33,12 @@ StreamSubscriber::StreamSubscriber(
 }
 
 StreamSubscriber::~StreamSubscriber() {
-  DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
   RemoveAllListeners();
 }
 
 void StreamSubscriber::On(const std::string& event,
                           EventCallback&& callback) {  // NOLINT
-  DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
+  DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
   DCHECK(js_handlers_.find(event) == js_handlers_.end());
 
   v8::Locker locker(isolate_);
@@ -50,7 +52,7 @@ void StreamSubscriber::On(const std::string& event,
 }
 
 void StreamSubscriber::Off(const std::string& event) {
-  DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
+  DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
   DCHECK(js_handlers_.find(event) != js_handlers_.end());
 
   v8::Locker locker(isolate_);
@@ -96,6 +98,7 @@ void StreamSubscriber::OnError(mate::Arguments* args) {
 }
 
 void StreamSubscriber::RemoveAllListeners() {
+  DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
   v8::Locker locker(isolate_);
   v8::Isolate::Scope isolate_scope(isolate_);
   v8::HandleScope handle_scope(isolate_);

+ 13 - 3
atom/browser/api/stream_subscriber.h

@@ -11,6 +11,8 @@
 #include <vector>
 
 #include "base/callback.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/ref_counted_delete_on_sequence.h"
 #include "base/memory/weak_ptr.h"
 #include "content/public/browser/browser_thread.h"
 #include "v8/include/v8.h"
@@ -23,17 +25,25 @@ namespace mate {
 
 class Arguments;
 
-class StreamSubscriber {
+class StreamSubscriber
+    : public base::RefCountedDeleteOnSequence<StreamSubscriber> {
  public:
+  REQUIRE_ADOPTION_FOR_REFCOUNTED_TYPE();
+
   StreamSubscriber(v8::Isolate* isolate,
                    v8::Local<v8::Object> emitter,
-                   base::WeakPtr<atom::URLRequestStreamJob> url_job);
-  ~StreamSubscriber();
+                   base::WeakPtr<atom::URLRequestStreamJob> url_job,
+                   scoped_refptr<base::SequencedTaskRunner> ui_task_runner);
 
  private:
+  friend class base::DeleteHelper<StreamSubscriber>;
+  friend class base::RefCountedDeleteOnSequence<StreamSubscriber>;
+
   using JSHandlersMap = std::map<std::string, v8::Global<v8::Value>>;
   using EventCallback = base::Callback<void(mate::Arguments* args)>;
 
+  ~StreamSubscriber();
+
   void On(const std::string& event, EventCallback&& callback);  // NOLINT
   void Off(const std::string& event);
 

+ 14 - 14
atom/browser/net/url_request_stream_job.cc

@@ -13,14 +13,16 @@
 #include "atom/common/api/event_emitter_caller.h"
 #include "atom/common/atom_constants.h"
 #include "atom/common/native_mate_converters/net_converter.h"
-#include "atom/common/node_includes.h"
 #include "base/strings/string_number_conversions.h"
 #include "base/strings/string_util.h"
+#include "base/threading/thread_task_runner_handle.h"
 #include "base/time/time.h"
 #include "native_mate/dictionary.h"
 #include "net/base/net_errors.h"
 #include "net/filter/gzip_source_stream.h"
 
+#include "atom/common/node_includes.h"
+
 namespace atom {
 
 namespace {
@@ -82,14 +84,14 @@ void BeforeStartInUI(base::WeakPtr<URLRequestStreamJob> job,
     return;
   }
 
-  auto subscriber = std::make_unique<mate::StreamSubscriber>(
-      args->isolate(), data.GetHandle(), job);
+  auto subscriber = base::MakeRefCounted<mate::StreamSubscriber>(
+      args->isolate(), data.GetHandle(), job,
+      base::ThreadTaskRunnerHandle::Get());
 
   content::BrowserThread::PostTask(
       content::BrowserThread::IO, FROM_HERE,
-      base::BindOnce(&URLRequestStreamJob::StartAsync, job,
-                     std::move(subscriber), base::RetainedRef(response_headers),
-                     ended, error));
+      base::BindOnce(&URLRequestStreamJob::StartAsync, job, subscriber,
+                     base::RetainedRef(response_headers), ended, error));
 }
 
 }  // namespace
@@ -104,10 +106,7 @@ URLRequestStreamJob::URLRequestStreamJob(net::URLRequest* request,
       weak_factory_(this) {}
 
 URLRequestStreamJob::~URLRequestStreamJob() {
-  if (subscriber_) {
-    content::BrowserThread::DeleteSoon(content::BrowserThread::UI, FROM_HERE,
-                                       std::move(subscriber_));
-  }
+  DCHECK(!subscriber_ || subscriber_->HasOneRef());
 }
 
 void URLRequestStreamJob::Start() {
@@ -121,7 +120,7 @@ void URLRequestStreamJob::Start() {
 }
 
 void URLRequestStreamJob::StartAsync(
-    std::unique_ptr<mate::StreamSubscriber> subscriber,
+    scoped_refptr<mate::StreamSubscriber> subscriber,
     scoped_refptr<net::HttpResponseHeaders> response_headers,
     bool ended,
     int error) {
@@ -133,7 +132,7 @@ void URLRequestStreamJob::StartAsync(
 
   ended_ = ended;
   response_headers_ = response_headers;
-  subscriber_ = std::move(subscriber);
+  subscriber_ = subscriber;
   request_start_time_ = base::TimeTicks::Now();
   NotifyHeadersComplete();
 }
@@ -192,12 +191,13 @@ int URLRequestStreamJob::ReadRawData(net::IOBuffer* dest, int dest_size) {
 }
 
 void URLRequestStreamJob::DoneReading() {
-  content::BrowserThread::DeleteSoon(content::BrowserThread::UI, FROM_HERE,
-                                     std::move(subscriber_));
   write_buffer_.clear();
 }
 
 void URLRequestStreamJob::DoneReadingRedirectResponse() {
+  if (subscriber_) {
+    subscriber_ = nullptr;
+  }
   DoneReading();
 }
 

+ 2 - 2
atom/browser/net/url_request_stream_job.h

@@ -24,7 +24,7 @@ class URLRequestStreamJob : public JsAsker, public net::URLRequestJob {
                       net::NetworkDelegate* network_delegate);
   ~URLRequestStreamJob() override;
 
-  void StartAsync(std::unique_ptr<mate::StreamSubscriber> subscriber,
+  void StartAsync(scoped_refptr<mate::StreamSubscriber> subscriber,
                   scoped_refptr<net::HttpResponseHeaders> response_headers,
                   bool ended,
                   int error);
@@ -62,7 +62,7 @@ class URLRequestStreamJob : public JsAsker, public net::URLRequestJob {
   base::TimeTicks request_start_time_;
   base::TimeTicks response_start_time_;
   scoped_refptr<net::HttpResponseHeaders> response_headers_;
-  std::unique_ptr<mate::StreamSubscriber> subscriber_;
+  scoped_refptr<mate::StreamSubscriber> subscriber_;
 
   base::WeakPtrFactory<URLRequestStreamJob> weak_factory_;