message_handler_spec.js 10 KB


  1. /**
  2. * @licstart The following is the entire license notice for the
  3. * Javascript code in this page
  4. *
  5. * Copyright 2021 Mozilla Foundation
  6. *
  7. * Licensed under the Apache License, Version 2.0 (the "License");
  8. * you may not use this file except in compliance with the License.
  9. * You may obtain a copy of the License at
  10. *
  11. * http://www.apache.org/licenses/LICENSE-2.0
  12. *
  13. * Unless required by applicable law or agreed to in writing, software
  14. * distributed under the License is distributed on an "AS IS" BASIS,
  15. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  16. * See the License for the specific language governing permissions and
  17. * limitations under the License.
  18. *
  19. * @licend The above is the entire license notice for the
  20. * Javascript code in this page
  21. */
  22. "use strict";
  23. var _util = require("../../shared/util.js");
  24. var _api = require("../../display/api.js");
  25. var _message_handler = require("../../shared/message_handler.js");
  26. describe("message_handler", function () {
  /**
   * Yields to the microtask queue a number of times before settling.
   *
   * Each level of recursion chains one more `Promise.resolve().then(...)`,
   * so a larger `ticks` lets more already-queued promise callbacks run before
   * the returned promise settles.
   *
   * @param {number} ticks - Number of additional rounds to wait. Values that
   *   are not greater than zero (0, negative, NaN, undefined) settle after a
   *   single round instead of recursing.
   * @returns {Promise} Settles with the last non-positive `ticks` value
   *   reached (0 for non-negative integer input); callers in this file ignore
   *   the value.
   */
  function sleep(ticks) {
    return Promise.resolve().then(() => {
      // `ticks > 0` rather than plain truthiness: a negative or fractional
      // count would otherwise never reach 0 and spin the microtask queue
      // forever.
      return ticks > 0 ? sleep(ticks - 1) : ticks;
    });
  }
  32. describe("sendWithStream", function () {
  it("should return a ReadableStream", function () {
    const loopback = new _api.LoopbackPort();
    const mainSide = new _message_handler.MessageHandler("main", "worker", loopback);

    // No handler for "fakeHandler" is registered in this test; only the shape
    // of the object returned by `sendWithStream` is inspected.
    const stream = mainSide.sendWithStream("fakeHandler");

    expect(typeof stream).toEqual("object");
    expect(typeof stream.getReader).toEqual("function");
  });
  it("should read using a reader", async function () {
    // Records sink callbacks: "p" for every pull, "c" for every cancel.
    let trace = "";
    const loopback = new _api.LoopbackPort();
    const mainSide = new _message_handler.MessageHandler("main", "worker", loopback);
    const workerSide = new _message_handler.MessageHandler("worker", "main", loopback);

    workerSide.on("fakeHandler", (data, sink) => {
      sink.onPull = () => {
        trace += "p";
      };
      sink.onCancel = reason => {
        trace += "c";
      };
      // Enqueue a single chunk, then close once the sink is ready again.
      sink.ready
        .then(() => {
          sink.enqueue("hi");
          return sink.ready;
        })
        .then(() => {
          sink.close();
        });
      return sleep(5);
    });

    // Every chunk counts as size 1 against a high-water mark of 1.
    const strategy = {
      highWaterMark: 1,
      size: () => 1,
    };
    const stream = mainSide.sendWithStream("fakeHandler", {}, strategy);
    const reader = stream.getReader();

    await sleep(10);
    expect(trace).toEqual("");

    const first = await reader.read();
    expect(trace).toEqual("p");
    expect(first.value).toEqual("hi");
    expect(first.done).toEqual(false);

    await sleep(10);
    const last = await reader.read();
    expect(last.value).toEqual(undefined);
    expect(last.done).toEqual(true);
  });
  it("should not read any data when cancelled", async function () {
    // Records progress: digits mark the producer steps, "p" a pull and "c" a
    // cancel notification on the sink.
    let trace = "";
    const loopback = new _api.LoopbackPort();
    const workerSide = new _message_handler.MessageHandler("worker", "main", loopback);

    workerSide.on("fakeHandler", (data, sink) => {
      sink.onPull = () => {
        trace += "p";
      };
      sink.onCancel = reason => {
        trace += "c";
      };
      trace += "0";

      const afterFirstChunk = sink.ready.then(() => {
        trace += "1";
        sink.enqueue([1, 2, 3, 4], 4);
        return sink.ready;
      });
      const afterSecondChunk = afterFirstChunk.then(() => {
        trace += "2";
        sink.enqueue([5, 6, 7, 8], 4);
        return sink.ready;
      });
      // "3" marks a normal close; "4" marks a rejection anywhere in the chain
      // above. The test expects "4" after the reader cancels.
      afterSecondChunk.then(
        () => {
          trace += "3";
          sink.close();
        },
        () => {
          trace += "4";
        }
      );
    });

    const mainSide = new _message_handler.MessageHandler("main", "worker", loopback);
    // Chunk size is the array length; the high-water mark equals one chunk.
    const strategy = {
      highWaterMark: 4,
      size: arr => arr.length,
    };
    const stream = mainSide.sendWithStream("fakeHandler", {}, strategy);
    const reader = stream.getReader();

    await sleep(10);
    expect(trace).toEqual("01");

    const chunk = await reader.read();
    expect(chunk.value).toEqual([1, 2, 3, 4]);
    expect(chunk.done).toEqual(false);

    await sleep(10);
    expect(trace).toEqual("01p2");

    const cancelReason = new _util.AbortException("reader cancelled.");
    await reader.cancel(cancelReason);
    expect(trace).toEqual("01p2c4");
  });
  it("should not read when errored", async function () {
    // Records progress: "0"/"1" producer steps, "p" pull, "c" cancel and "e"
    // right before the sink is errored.
    let trace = "";
    const loopback = new _api.LoopbackPort();
    const workerSide = new _message_handler.MessageHandler("worker", "main", loopback);

    workerSide.on("fakeHandler", (data, sink) => {
      sink.onPull = () => {
        trace += "p";
      };
      sink.onCancel = reason => {
        trace += "c";
      };
      trace += "0";

      const afterFirstChunk = sink.ready.then(() => {
        trace += "1";
        sink.enqueue([1, 2, 3, 4], 4);
        return sink.ready;
      });
      afterFirstChunk.then(() => {
        trace += "e";
        sink.error(new Error("should not read when errored"));
      });
    });

    const mainSide = new _message_handler.MessageHandler("main", "worker", loopback);
    // Chunk size is the array length; the high-water mark equals one chunk.
    const strategy = {
      highWaterMark: 4,
      size: arr => arr.length,
    };
    const stream = mainSide.sendWithStream("fakeHandler", {}, strategy);
    const reader = stream.getReader();

    await sleep(10);
    expect(trace).toEqual("01");

    const chunk = await reader.read();
    expect(chunk.value).toEqual([1, 2, 3, 4]);
    expect(chunk.done).toEqual(false);

    try {
      await reader.read();
      // Reaching this line means the read did not reject: record a failure.
      expect(false).toEqual(true);
    } catch (reason) {
      const isUnknownError = reason instanceof _util.UnknownErrorException;
      expect(trace).toEqual("01pe");
      expect(isUnknownError).toEqual(true);
      expect(reason.message).toEqual("should not read when errored");
    }
  });
  it("should read data with blocking promise", async function () {
    // Records progress: digits mark the producer steps, "p" a pull and "c" a
    // cancel notification on the sink.
    let trace = "";
    const loopback = new _api.LoopbackPort();
    const workerSide = new _message_handler.MessageHandler("worker", "main", loopback);

    workerSide.on("fakeHandler", (data, sink) => {
      sink.onPull = () => {
        trace += "p";
      };
      sink.onCancel = reason => {
        trace += "c";
      };
      trace += "0";

      const afterFirstChunk = sink.ready.then(() => {
        trace += "1";
        sink.enqueue([1, 2, 3, 4], 4);
        return sink.ready;
      });
      const afterSecondChunk = afterFirstChunk.then(() => {
        trace += "2";
        sink.enqueue([5, 6, 7, 8], 4);
        return sink.ready;
      });
      afterSecondChunk.then(() => {
        sink.close();
      });
    });

    const mainSide = new _message_handler.MessageHandler("main", "worker", loopback);
    // Chunk size is the array length; the high-water mark equals one chunk,
    // and the test expects the second enqueue ("2") only after a read.
    const strategy = {
      highWaterMark: 4,
      size: arr => arr.length,
    };
    const stream = mainSide.sendWithStream("fakeHandler", {}, strategy);
    const reader = stream.getReader();

    await sleep(10);
    expect(trace).toEqual("01");

    const firstChunk = await reader.read();
    expect(firstChunk.value).toEqual([1, 2, 3, 4]);
    expect(firstChunk.done).toEqual(false);

    await sleep(10);
    expect(trace).toEqual("01p2");

    const secondChunk = await reader.read();
    expect(secondChunk.value).toEqual([5, 6, 7, 8]);
    expect(secondChunk.done).toEqual(false);

    await sleep(10);
    expect(trace).toEqual("01p2p");

    const end = await reader.read();
    expect(end.value).toEqual(undefined);
    expect(end.done).toEqual(true);
  });
  it("should read data with blocking promise and buffer whole data into stream", async function () {
    // Records progress: digits mark the producer steps, "p" a pull and "c" a
    // cancel notification on the sink.
    let trace = "";
    const loopback = new _api.LoopbackPort();
    const workerSide = new _message_handler.MessageHandler("worker", "main", loopback);

    workerSide.on("fakeHandler", (data, sink) => {
      sink.onPull = () => {
        trace += "p";
      };
      sink.onCancel = reason => {
        trace += "c";
      };
      trace += "0";

      const afterFirstChunk = sink.ready.then(() => {
        trace += "1";
        sink.enqueue([1, 2, 3, 4], 4);
        return sink.ready;
      });
      const afterSecondChunk = afterFirstChunk.then(() => {
        trace += "2";
        sink.enqueue([5, 6, 7, 8], 4);
        return sink.ready;
      });
      afterSecondChunk.then(() => {
        sink.close();
      });
      return sleep(10);
    });

    const mainSide = new _message_handler.MessageHandler("main", "worker", loopback);
    // Chunk size is the array length; the high-water mark (8) covers both
    // chunks, and the test expects both enqueues before the first read.
    const strategy = {
      highWaterMark: 8,
      size: arr => arr.length,
    };
    const stream = mainSide.sendWithStream("fakeHandler", {}, strategy);
    const reader = stream.getReader();

    await sleep(10);
    expect(trace).toEqual("012");

    const firstChunk = await reader.read();
    expect(firstChunk.value).toEqual([1, 2, 3, 4]);
    expect(firstChunk.done).toEqual(false);

    await sleep(10);
    expect(trace).toEqual("012p");

    const secondChunk = await reader.read();
    expect(secondChunk.value).toEqual([5, 6, 7, 8]);
    expect(secondChunk.done).toEqual(false);

    await sleep(10);
    expect(trace).toEqual("012p");

    const end = await reader.read();
    expect(end.value).toEqual(undefined);
    expect(end.done).toEqual(true);
  });
  it("should ignore any pull after close is called", async function () {
    // Records progress: "0"/"1" producer steps, "p" pull and "c" cancel.
    let trace = "";
    const loopback = new _api.LoopbackPort();
    // Called unbound, matching the original `(0, fn)()` invocation.
    const { createPromiseCapability } = _util;
    // The sink is closed only once the test resolves this capability.
    const closeGate = createPromiseCapability();
    const workerSide = new _message_handler.MessageHandler("worker", "main", loopback);

    workerSide.on("fakeHandler", (data, sink) => {
      sink.onPull = () => {
        trace += "p";
      };
      sink.onCancel = reason => {
        trace += "c";
      };
      trace += "0";
      sink.ready.then(() => {
        trace += "1";
        sink.enqueue([1, 2, 3, 4], 4);
      });
      return closeGate.promise.then(() => {
        sink.close();
      });
    });

    const mainSide = new _message_handler.MessageHandler("main", "worker", loopback);
    // Chunk size is the array length; the high-water mark (10) exceeds the
    // single 4-element chunk.
    const strategy = {
      highWaterMark: 10,
      size: arr => arr.length,
    };
    const stream = mainSide.sendWithStream("fakeHandler", {}, strategy);
    const reader = stream.getReader();

    await sleep(10);
    expect(trace).toEqual("01");

    closeGate.resolve();
    await closeGate.promise;

    const chunk = await reader.read();
    expect(chunk.value).toEqual([1, 2, 3, 4]);
    expect(chunk.done).toEqual(false);

    await sleep(10);
    // No "p" is expected here: the trace must be unchanged after the close.
    expect(trace).toEqual("01");

    const end = await reader.read();
    expect(end.value).toEqual(undefined);
    expect(end.done).toEqual(true);
  });
  305. });
  306. });