stream_subscriber.cc 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. // Copyright (c) 2017 GitHub, Inc.
  2. // Use of this source code is governed by the MIT license that can be
  3. // found in the LICENSE file.
  4. #include "atom/browser/api/stream_subscriber.h"
  5. #include <string>
  6. #include "atom/browser/net/url_request_stream_job.h"
  7. #include "atom/common/api/event_emitter_caller.h"
  8. #include "atom/common/native_mate_converters/callback.h"
  9. #include "atom/common/node_includes.h"
  10. #include "base/task/post_task.h"
  11. #include "content/public/browser/browser_task_traits.h"
  12. namespace mate {
  13. StreamSubscriber::StreamSubscriber(
  14. v8::Isolate* isolate,
  15. v8::Local<v8::Object> emitter,
  16. base::WeakPtr<atom::URLRequestStreamJob> url_job,
  17. scoped_refptr<base::SequencedTaskRunner> ui_task_runner)
  18. : base::RefCountedDeleteOnSequence<StreamSubscriber>(ui_task_runner),
  19. isolate_(isolate),
  20. emitter_(isolate, emitter),
  21. url_job_(url_job),
  22. weak_factory_(this) {
  23. DCHECK(ui_task_runner->RunsTasksInCurrentSequence());
  24. auto weak_self = weak_factory_.GetWeakPtr();
  25. On("data", base::Bind(&StreamSubscriber::OnData, weak_self));
  26. On("end", base::Bind(&StreamSubscriber::OnEnd, weak_self));
  27. On("error", base::Bind(&StreamSubscriber::OnError, weak_self));
  28. }
  29. StreamSubscriber::~StreamSubscriber() {
  30. RemoveAllListeners();
  31. }
  32. void StreamSubscriber::On(const std::string& event,
  33. EventCallback&& callback) { // NOLINT
  34. DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
  35. DCHECK(js_handlers_.find(event) == js_handlers_.end());
  36. v8::Locker locker(isolate_);
  37. v8::Isolate::Scope isolate_scope(isolate_);
  38. v8::HandleScope handle_scope(isolate_);
  39. // emitter.on(event, EventEmitted)
  40. auto fn = CallbackToV8(isolate_, callback);
  41. js_handlers_[event] = v8::Global<v8::Value>(isolate_, fn);
  42. internal::ValueVector args = {StringToV8(isolate_, event), fn};
  43. internal::CallMethodWithArgs(isolate_, emitter_.Get(isolate_), "on", &args);
  44. }
  45. void StreamSubscriber::Off(const std::string& event) {
  46. DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
  47. DCHECK(js_handlers_.find(event) != js_handlers_.end());
  48. v8::Locker locker(isolate_);
  49. v8::Isolate::Scope isolate_scope(isolate_);
  50. v8::HandleScope handle_scope(isolate_);
  51. auto js_handler = js_handlers_.find(event);
  52. DCHECK(js_handler != js_handlers_.end());
  53. RemoveListener(js_handler);
  54. }
  55. void StreamSubscriber::OnData(mate::Arguments* args) {
  56. v8::Local<v8::Value> buf;
  57. args->GetNext(&buf);
  58. if (!node::Buffer::HasInstance(buf)) {
  59. args->ThrowError("data must be Buffer");
  60. return;
  61. }
  62. const char* data = node::Buffer::Data(buf);
  63. size_t length = node::Buffer::Length(buf);
  64. if (length == 0)
  65. return;
  66. // Pass the data to the URLJob in IO thread.
  67. std::vector<char> buffer(data, data + length);
  68. base::PostTaskWithTraits(FROM_HERE, {content::BrowserThread::IO},
  69. base::Bind(&atom::URLRequestStreamJob::OnData,
  70. url_job_, base::Passed(&buffer)));
  71. }
  72. void StreamSubscriber::OnEnd(mate::Arguments* args) {
  73. base::PostTaskWithTraits(
  74. FROM_HERE, {content::BrowserThread::IO},
  75. base::Bind(&atom::URLRequestStreamJob::OnEnd, url_job_));
  76. }
  77. void StreamSubscriber::OnError(mate::Arguments* args) {
  78. base::PostTaskWithTraits(FROM_HERE, {content::BrowserThread::IO},
  79. base::Bind(&atom::URLRequestStreamJob::OnError,
  80. url_job_, net::ERR_FAILED));
  81. }
  82. void StreamSubscriber::RemoveAllListeners() {
  83. DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
  84. v8::Locker locker(isolate_);
  85. v8::Isolate::Scope isolate_scope(isolate_);
  86. v8::HandleScope handle_scope(isolate_);
  87. while (!js_handlers_.empty()) {
  88. RemoveListener(js_handlers_.begin());
  89. }
  90. }
  91. void StreamSubscriber::RemoveListener(JSHandlersMap::iterator it) {
  92. internal::ValueVector args = {StringToV8(isolate_, it->first),
  93. it->second.Get(isolate_)};
  94. internal::CallMethodWithArgs(isolate_, emitter_.Get(isolate_),
  95. "removeListener", &args);
  96. js_handlers_.erase(it);
  97. }
  98. } // namespace mate