Press n or j to go to the next uncovered block, b, p or k for the previous block.
import { Effect, Stream, Schema } from 'effect';
import { createOrpcClient } from '@/libs/orpcClient';
// Services
import { auth } from '@/services/firebase';
// Types
import { EventType, SubscribeEvent, WSReactionData } from '@/types/meal';
// Constants
import { ERROR_MESSAGES } from '@/constants/messages';
export const SubscribePayload = Schema.Struct({
id: Schema.String,
likes: Schema.Number,
dislikes: Schema.Number,
});
type SubscribePayload = typeof SubscribePayload.Type;
export const subscribeService = (origin?: string) =>
Effect.scoped(
Effect.gen(function* (_) {
const user = auth.currentUser;
const token = user
? yield* _(Effect.promise(() => user.getIdToken(true)))
: undefined;
const client = createOrpcClient(token, origin);
// Wrap AsyncGenerator in Promise.resolve so Effect.promise accepts it
const iterator: AsyncGenerator<SubscribeEvent> = yield* _(
Effect.promise(() =>
Promise.resolve(client.socket.subscribeLunchMenu(undefined, {})),
),
);
// Convert AsyncGenerator → Stream and map events
return Stream.fromAsyncIterable<SubscribeEvent, Error>(iterator, (e) =>
e instanceof Error ? e : new Error(ERROR_MESSAGES.DEFAULT),
).pipe(
Stream.flatMap((event) =>
event.type === EventType.LUNCH_MENU
? Effect.flatMap(
// Validate and decode payload using Schema
Schema.decodeUnknown(SubscribePayload)(event.data),
(parsed) =>
Effect.succeed<WSReactionData>({
[parsed.id]: {
likes: parsed.likes,
dislikes: parsed.dislikes,
},
}),
)
: Effect.succeed<WSReactionData>({}),
),
);
}),
);
|