Press n or j to go to the next uncovered block, b, p or k for the previous block.
/**
 * Phase 7 — Integration Bus (Event Pub/Sub)
 * -----------------------------------------
 * A simple in-process event bus between modules/nodes:
 *  - on(topic, handler)
 *  - once(topic, handler)
 *  - off(topic, handler)
 *  - emit(topic, data)
 *  - request(topic, req) <-> reply(topic: `${topic}:reply`)
 *
 * Supports simple wildcards: "cfp:*" matches "cfp:...".
 * Supports middleware (run before delivery) and optional JSONL logging.
 */

import fs from 'node:fs';
import path from 'node:path';

/** Callback invoked for each delivered event; may be synchronous or async. */
type Handler<T = unknown> = (
  data: T,
  meta: { topic: string; ts: string }
) => void | Promise<void>;

/** Internal record of a registered handler. */
interface Subscription {
  /** Exact topic, or a prefix pattern ending in "*". */
  topic: string;
  handler: Handler<any>;
  /** When true the subscription is dropped the first time it matches. */
  once?: boolean;
}

/**
 * Hook run on every event before logging and delivery.
 * Returning an event object replaces the current one; returning nothing
 * leaves it unchanged.
 */
type Middleware = (evt: {
  topic: string;
  data: any;
  ts: string;
}) => { topic: string; data: any; ts: string } | void;

/**
 * Minimal publish/subscribe bus with trailing-"*" wildcard subscriptions,
 * synchronous middleware, optional JSONL logging and a request/reply helper.
 */
export class IntegrationBus {
  private subs: Subscription[] = [];
  private mws: Middleware[] = [];
  private logging = false;
  private logPath = './storage/integration_bus.log.jsonl';

  /**
   * Turns on JSONL logging of every emitted event.
   *
   * @param pathOverride Optional log file path replacing the current one.
   *   The parent directory is created (recursively) if it does not exist.
   */
  enableLogging(pathOverride?: string) {
    this.logging = true;
    if (pathOverride) this.logPath = pathOverride;
    fs.mkdirSync(path.dirname(this.logPath), { recursive: true });
  }

  /** Turns off event logging; the configured log path is kept. */
  disableLogging() {
    this.logging = false;
  }

  /** Registers a middleware; middlewares run in registration order. */
  use(mw: Middleware) {
    this.mws.push(mw);
  }

  /**
   * Subscribes `handler` to `topic` (exact name or "prefix*" pattern).
   *
   * @returns A function that removes this subscription.
   */
  on<T = unknown>(topic: string, handler: Handler<T>) {
    this.subs.push({ topic, handler });
    return () => this.off(topic, handler);
  }

  /**
   * Like {@link IntegrationBus.on}, but the subscription is removed the first
   * time an emitted topic matches it.
   *
   * @returns A function that removes this subscription.
   */
  once<T = unknown>(topic: string, handler: Handler<T>) {
    this.subs.push({ topic, handler, once: true });
    return () => this.off(topic, handler);
  }

  /**
   * Removes every subscription registered with exactly this `topic` string
   * and this `handler` reference. The topic is compared literally; wildcard
   * matching is not applied here.
   */
  off<T = unknown>(topic: string, handler: Handler<T>) {
    this.subs = this.subs.filter(
      (s) => !(s.topic === topic && s.handler === handler)
    );
  }

  /**
   * Sends an event to every subscriber whose topic matches (wildcards
   * supported). Handlers are awaited one after another in subscription order;
   * an error thrown by a handler propagates to the caller and stops delivery
   * to the remaining handlers.
   *
   * @param topic Topic to publish on (a middleware may rewrite it).
   * @param data  Payload passed to the handlers.
   */
  async emit<T = unknown>(topic: string, data: T) {
    const ts = new Date().toISOString();
    let evt = { topic, data, ts };

    // Middlewares may replace the event (topic, payload or timestamp).
    for (const mw of this.mws) {
      const out = mw(evt);
      if (out) evt = out;
    }

    if (this.logging) {
      fs.appendFileSync(this.logPath, JSON.stringify(evt) + '\n');
    }

    const matches = this.subs.filter((s) =>
      this.topicMatch(s.topic, evt.topic)
    );

    // Drop one-shot subscriptions BEFORE invoking any handler, so they fire
    // exactly once even if a handler throws or re-emits the same topic.
    if (matches.some((s) => s.once)) {
      this.subs = this.subs.filter((s) => !(s.once && matches.includes(s)));
    }

    for (const s of matches) {
      await s.handler(evt.data, { topic: evt.topic, ts: evt.ts });
    }
  }

  /**
   * Simple request/response: emits `req` on `topic` and resolves with the
   * first payload published on `${topic}:reply`.
   *
   * The timeout covers the whole round trip (it starts before the request is
   * emitted) and is always cleared once the request settles.
   *
   * @param topic     Request topic; the reply is awaited on `${topic}:reply`.
   * @param req       Request payload.
   * @param timeoutMs Maximum time to wait for a reply, in milliseconds.
   * @returns The first reply payload.
   * @throws Error `IntegrationBus.request timeout on <topic>` when no reply
   *   arrives in time; or whatever error a handler threw while the request
   *   was being emitted.
   */
  request<TReq = unknown, TRes = unknown>(
    topic: string,
    req: TReq,
    timeoutMs = 3000
  ): Promise<TRes> {
    return new Promise<TRes>((resolve, reject) => {
      const replyTopic = `${topic}:reply`;
      let timer: ReturnType<typeof setTimeout> | undefined;
      let settled = false;

      // Single exit path: guarantees the timer is cleared and the reply
      // subscription removed exactly once, whichever outcome happens first.
      const settle = (complete: () => void) => {
        if (settled) return;
        settled = true;
        if (timer !== undefined) clearTimeout(timer);
        off();
        complete();
      };

      const off = this.once<TRes>(replyTopic, (res) => {
        settle(() => resolve(res));
      });

      timer = setTimeout(() => {
        settle(() =>
          reject(new Error(`IntegrationBus.request timeout on ${topic}`))
        );
      }, timeoutMs);

      // Send the request; a failing handler rejects the request instead of
      // leaving the promise pending with an unhandled rejection.
      this.emit(topic, req).catch((err: unknown) => {
        settle(() => reject(err));
      });
    });
  }

  /**
   * Helper: simple wildcard match. A subscription topic ending in "*"
   * matches any emitted topic starting with the text before the "*";
   * otherwise the topics must be identical.
   */
  private topicMatch(subTopic: string, emittedTopic: string) {
    if (subTopic === emittedTopic) return true;
    if (subTopic.endsWith('*')) {
      const prefix = subTopic.slice(0, -1);
      return emittedTopic.startsWith(prefix);
    }
    return false;
  }
}

// Default singleton for quick use.
export const Bus = new IntegrationBus();