message_handler.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516
  1. /**
  2. * @licstart The following is the entire license notice for the
  3. * Javascript code in this page
  4. *
  5. * Copyright 2021 Mozilla Foundation
  6. *
  7. * Licensed under the Apache License, Version 2.0 (the "License");
  8. * you may not use this file except in compliance with the License.
  9. * You may obtain a copy of the License at
  10. *
  11. * http://www.apache.org/licenses/LICENSE-2.0
  12. *
  13. * Unless required by applicable law or agreed to in writing, software
  14. * distributed under the License is distributed on an "AS IS" BASIS,
  15. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  16. * See the License for the specific language governing permissions and
  17. * limitations under the License.
  18. *
  19. * @licend The above is the entire license notice for the
  20. * Javascript code in this page
  21. */
  22. "use strict";
  23. Object.defineProperty(exports, "__esModule", {
  24. value: true
  25. });
  26. exports.MessageHandler = void 0;
  27. var _util = require("./util.js");
  28. const CallbackKind = {
  29. UNKNOWN: 0,
  30. DATA: 1,
  31. ERROR: 2
  32. };
  33. const StreamKind = {
  34. UNKNOWN: 0,
  35. CANCEL: 1,
  36. CANCEL_COMPLETE: 2,
  37. CLOSE: 3,
  38. ENQUEUE: 4,
  39. ERROR: 5,
  40. PULL: 6,
  41. PULL_COMPLETE: 7,
  42. START_COMPLETE: 8
  43. };
  44. function wrapReason(reason) {
  45. if (typeof reason !== "object" || reason === null) {
  46. return reason;
  47. }
  48. switch (reason.name) {
  49. case "AbortException":
  50. return new _util.AbortException(reason.message);
  51. case "MissingPDFException":
  52. return new _util.MissingPDFException(reason.message);
  53. case "UnexpectedResponseException":
  54. return new _util.UnexpectedResponseException(reason.message, reason.status);
  55. case "UnknownErrorException":
  56. return new _util.UnknownErrorException(reason.message, reason.details);
  57. default:
  58. return new _util.UnknownErrorException(reason.message, reason.toString());
  59. }
  60. }
  61. class MessageHandler {
  62. constructor(sourceName, targetName, comObj) {
  63. this.sourceName = sourceName;
  64. this.targetName = targetName;
  65. this.comObj = comObj;
  66. this.callbackId = 1;
  67. this.streamId = 1;
  68. this.postMessageTransfers = true;
  69. this.streamSinks = Object.create(null);
  70. this.streamControllers = Object.create(null);
  71. this.callbackCapabilities = Object.create(null);
  72. this.actionHandler = Object.create(null);
  73. this._onComObjOnMessage = event => {
  74. const data = event.data;
  75. if (data.targetName !== this.sourceName) {
  76. return;
  77. }
  78. if (data.stream) {
  79. this._processStreamMessage(data);
  80. return;
  81. }
  82. if (data.callback) {
  83. const callbackId = data.callbackId;
  84. const capability = this.callbackCapabilities[callbackId];
  85. if (!capability) {
  86. throw new Error(`Cannot resolve callback ${callbackId}`);
  87. }
  88. delete this.callbackCapabilities[callbackId];
  89. if (data.callback === CallbackKind.DATA) {
  90. capability.resolve(data.data);
  91. } else if (data.callback === CallbackKind.ERROR) {
  92. capability.reject(wrapReason(data.reason));
  93. } else {
  94. throw new Error("Unexpected callback case");
  95. }
  96. return;
  97. }
  98. const action = this.actionHandler[data.action];
  99. if (!action) {
  100. throw new Error(`Unknown action from worker: ${data.action}`);
  101. }
  102. if (data.callbackId) {
  103. const cbSourceName = this.sourceName;
  104. const cbTargetName = data.sourceName;
  105. new Promise(function (resolve) {
  106. resolve(action(data.data));
  107. }).then(function (result) {
  108. comObj.postMessage({
  109. sourceName: cbSourceName,
  110. targetName: cbTargetName,
  111. callback: CallbackKind.DATA,
  112. callbackId: data.callbackId,
  113. data: result
  114. });
  115. }, function (reason) {
  116. comObj.postMessage({
  117. sourceName: cbSourceName,
  118. targetName: cbTargetName,
  119. callback: CallbackKind.ERROR,
  120. callbackId: data.callbackId,
  121. reason: wrapReason(reason)
  122. });
  123. });
  124. return;
  125. }
  126. if (data.streamId) {
  127. this._createStreamSink(data);
  128. return;
  129. }
  130. action(data.data);
  131. };
  132. comObj.addEventListener("message", this._onComObjOnMessage);
  133. }
  134. on(actionName, handler) {
  135. const ah = this.actionHandler;
  136. if (ah[actionName]) {
  137. throw new Error(`There is already an actionName called "${actionName}"`);
  138. }
  139. ah[actionName] = handler;
  140. }
  141. send(actionName, data, transfers) {
  142. this._postMessage({
  143. sourceName: this.sourceName,
  144. targetName: this.targetName,
  145. action: actionName,
  146. data
  147. }, transfers);
  148. }
  149. sendWithPromise(actionName, data, transfers) {
  150. const callbackId = this.callbackId++;
  151. const capability = (0, _util.createPromiseCapability)();
  152. this.callbackCapabilities[callbackId] = capability;
  153. try {
  154. this._postMessage({
  155. sourceName: this.sourceName,
  156. targetName: this.targetName,
  157. action: actionName,
  158. callbackId,
  159. data
  160. }, transfers);
  161. } catch (ex) {
  162. capability.reject(ex);
  163. }
  164. return capability.promise;
  165. }
  166. sendWithStream(actionName, data, queueingStrategy, transfers) {
  167. const streamId = this.streamId++;
  168. const sourceName = this.sourceName;
  169. const targetName = this.targetName;
  170. const comObj = this.comObj;
  171. return new ReadableStream({
  172. start: controller => {
  173. const startCapability = (0, _util.createPromiseCapability)();
  174. this.streamControllers[streamId] = {
  175. controller,
  176. startCall: startCapability,
  177. pullCall: null,
  178. cancelCall: null,
  179. isClosed: false
  180. };
  181. this._postMessage({
  182. sourceName,
  183. targetName,
  184. action: actionName,
  185. streamId,
  186. data,
  187. desiredSize: controller.desiredSize
  188. }, transfers);
  189. return startCapability.promise;
  190. },
  191. pull: controller => {
  192. const pullCapability = (0, _util.createPromiseCapability)();
  193. this.streamControllers[streamId].pullCall = pullCapability;
  194. comObj.postMessage({
  195. sourceName,
  196. targetName,
  197. stream: StreamKind.PULL,
  198. streamId,
  199. desiredSize: controller.desiredSize
  200. });
  201. return pullCapability.promise;
  202. },
  203. cancel: reason => {
  204. (0, _util.assert)(reason instanceof Error, "cancel must have a valid reason");
  205. const cancelCapability = (0, _util.createPromiseCapability)();
  206. this.streamControllers[streamId].cancelCall = cancelCapability;
  207. this.streamControllers[streamId].isClosed = true;
  208. comObj.postMessage({
  209. sourceName,
  210. targetName,
  211. stream: StreamKind.CANCEL,
  212. streamId,
  213. reason: wrapReason(reason)
  214. });
  215. return cancelCapability.promise;
  216. }
  217. }, queueingStrategy);
  218. }
  219. _createStreamSink(data) {
  220. const self = this;
  221. const action = this.actionHandler[data.action];
  222. const streamId = data.streamId;
  223. const sourceName = this.sourceName;
  224. const targetName = data.sourceName;
  225. const comObj = this.comObj;
  226. const streamSink = {
  227. enqueue(chunk, size = 1, transfers) {
  228. if (this.isCancelled) {
  229. return;
  230. }
  231. const lastDesiredSize = this.desiredSize;
  232. this.desiredSize -= size;
  233. if (lastDesiredSize > 0 && this.desiredSize <= 0) {
  234. this.sinkCapability = (0, _util.createPromiseCapability)();
  235. this.ready = this.sinkCapability.promise;
  236. }
  237. self._postMessage({
  238. sourceName,
  239. targetName,
  240. stream: StreamKind.ENQUEUE,
  241. streamId,
  242. chunk
  243. }, transfers);
  244. },
  245. close() {
  246. if (this.isCancelled) {
  247. return;
  248. }
  249. this.isCancelled = true;
  250. comObj.postMessage({
  251. sourceName,
  252. targetName,
  253. stream: StreamKind.CLOSE,
  254. streamId
  255. });
  256. delete self.streamSinks[streamId];
  257. },
  258. error(reason) {
  259. (0, _util.assert)(reason instanceof Error, "error must have a valid reason");
  260. if (this.isCancelled) {
  261. return;
  262. }
  263. this.isCancelled = true;
  264. comObj.postMessage({
  265. sourceName,
  266. targetName,
  267. stream: StreamKind.ERROR,
  268. streamId,
  269. reason: wrapReason(reason)
  270. });
  271. },
  272. sinkCapability: (0, _util.createPromiseCapability)(),
  273. onPull: null,
  274. onCancel: null,
  275. isCancelled: false,
  276. desiredSize: data.desiredSize,
  277. ready: null
  278. };
  279. streamSink.sinkCapability.resolve();
  280. streamSink.ready = streamSink.sinkCapability.promise;
  281. this.streamSinks[streamId] = streamSink;
  282. new Promise(function (resolve) {
  283. resolve(action(data.data, streamSink));
  284. }).then(function () {
  285. comObj.postMessage({
  286. sourceName,
  287. targetName,
  288. stream: StreamKind.START_COMPLETE,
  289. streamId,
  290. success: true
  291. });
  292. }, function (reason) {
  293. comObj.postMessage({
  294. sourceName,
  295. targetName,
  296. stream: StreamKind.START_COMPLETE,
  297. streamId,
  298. reason: wrapReason(reason)
  299. });
  300. });
  301. }
  302. _processStreamMessage(data) {
  303. const streamId = data.streamId;
  304. const sourceName = this.sourceName;
  305. const targetName = data.sourceName;
  306. const comObj = this.comObj;
  307. switch (data.stream) {
  308. case StreamKind.START_COMPLETE:
  309. if (data.success) {
  310. this.streamControllers[streamId].startCall.resolve();
  311. } else {
  312. this.streamControllers[streamId].startCall.reject(wrapReason(data.reason));
  313. }
  314. break;
  315. case StreamKind.PULL_COMPLETE:
  316. if (data.success) {
  317. this.streamControllers[streamId].pullCall.resolve();
  318. } else {
  319. this.streamControllers[streamId].pullCall.reject(wrapReason(data.reason));
  320. }
  321. break;
  322. case StreamKind.PULL:
  323. if (!this.streamSinks[streamId]) {
  324. comObj.postMessage({
  325. sourceName,
  326. targetName,
  327. stream: StreamKind.PULL_COMPLETE,
  328. streamId,
  329. success: true
  330. });
  331. break;
  332. }
  333. if (this.streamSinks[streamId].desiredSize <= 0 && data.desiredSize > 0) {
  334. this.streamSinks[streamId].sinkCapability.resolve();
  335. }
  336. this.streamSinks[streamId].desiredSize = data.desiredSize;
  337. const {
  338. onPull
  339. } = this.streamSinks[data.streamId];
  340. new Promise(function (resolve) {
  341. resolve(onPull && onPull());
  342. }).then(function () {
  343. comObj.postMessage({
  344. sourceName,
  345. targetName,
  346. stream: StreamKind.PULL_COMPLETE,
  347. streamId,
  348. success: true
  349. });
  350. }, function (reason) {
  351. comObj.postMessage({
  352. sourceName,
  353. targetName,
  354. stream: StreamKind.PULL_COMPLETE,
  355. streamId,
  356. reason: wrapReason(reason)
  357. });
  358. });
  359. break;
  360. case StreamKind.ENQUEUE:
  361. (0, _util.assert)(this.streamControllers[streamId], "enqueue should have stream controller");
  362. if (this.streamControllers[streamId].isClosed) {
  363. break;
  364. }
  365. this.streamControllers[streamId].controller.enqueue(data.chunk);
  366. break;
  367. case StreamKind.CLOSE:
  368. (0, _util.assert)(this.streamControllers[streamId], "close should have stream controller");
  369. if (this.streamControllers[streamId].isClosed) {
  370. break;
  371. }
  372. this.streamControllers[streamId].isClosed = true;
  373. this.streamControllers[streamId].controller.close();
  374. this._deleteStreamController(streamId);
  375. break;
  376. case StreamKind.ERROR:
  377. (0, _util.assert)(this.streamControllers[streamId], "error should have stream controller");
  378. this.streamControllers[streamId].controller.error(wrapReason(data.reason));
  379. this._deleteStreamController(streamId);
  380. break;
  381. case StreamKind.CANCEL_COMPLETE:
  382. if (data.success) {
  383. this.streamControllers[streamId].cancelCall.resolve();
  384. } else {
  385. this.streamControllers[streamId].cancelCall.reject(wrapReason(data.reason));
  386. }
  387. this._deleteStreamController(streamId);
  388. break;
  389. case StreamKind.CANCEL:
  390. if (!this.streamSinks[streamId]) {
  391. break;
  392. }
  393. const {
  394. onCancel
  395. } = this.streamSinks[data.streamId];
  396. new Promise(function (resolve) {
  397. resolve(onCancel && onCancel(wrapReason(data.reason)));
  398. }).then(function () {
  399. comObj.postMessage({
  400. sourceName,
  401. targetName,
  402. stream: StreamKind.CANCEL_COMPLETE,
  403. streamId,
  404. success: true
  405. });
  406. }, function (reason) {
  407. comObj.postMessage({
  408. sourceName,
  409. targetName,
  410. stream: StreamKind.CANCEL_COMPLETE,
  411. streamId,
  412. reason: wrapReason(reason)
  413. });
  414. });
  415. this.streamSinks[streamId].sinkCapability.reject(wrapReason(data.reason));
  416. this.streamSinks[streamId].isCancelled = true;
  417. delete this.streamSinks[streamId];
  418. break;
  419. default:
  420. throw new Error("Unexpected stream case");
  421. }
  422. }
  423. async _deleteStreamController(streamId) {
  424. await Promise.allSettled([this.streamControllers[streamId].startCall, this.streamControllers[streamId].pullCall, this.streamControllers[streamId].cancelCall].map(function (capability) {
  425. return capability && capability.promise;
  426. }));
  427. delete this.streamControllers[streamId];
  428. }
  429. _postMessage(message, transfers) {
  430. if (transfers && this.postMessageTransfers) {
  431. this.comObj.postMessage(message, transfers);
  432. } else {
  433. this.comObj.postMessage(message);
  434. }
  435. }
  436. destroy() {
  437. this.comObj.removeEventListener("message", this._onComObjOnMessage);
  438. }
  439. }
  440. exports.MessageHandler = MessageHandler;