import type {
Request,
PostponedState,
ErrorInfo,
PostponeInfo,
} from 'react-server/src/ReactFizzServer';
import type {ReactNodeList, ReactFormState} from 'shared/ReactTypes';
import type {Writable} from 'stream';
import type {
BootstrapScriptDescriptor,
HeadersDescriptor,
} from 'react-dom-bindings/src/server/ReactFizzConfigDOM';
import type {Destination} from 'react-server/src/ReactServerStreamConfigNode';
import type {ImportMap} from '../shared/ReactDOMTypes';
import ReactVersion from 'shared/ReactVersion';
import {
createRequest,
resumeRequest,
startWork,
startFlowing,
stopFlowing,
abort,
prepareForStartFlowingIfBeforeAllReady,
} from 'react-server/src/ReactFizzServer';
import {
createResumableState,
createRenderState,
resumeRenderState,
createRootFormatContext,
} from 'react-dom-bindings/src/server/ReactFizzConfigDOM';
import {textEncoder} from 'react-server/src/ReactServerStreamConfigNode';
import {ensureCorrectIsomorphicReactVersion} from '../shared/ensureCorrectIsomorphicReactVersion';
ensureCorrectIsomorphicReactVersion();
function createDrainHandler(destination: Destination, request: Request) {
return () => startFlowing(request, destination);
}
function createCancelHandler(request: Request, reason: string) {
return () => {
stopFlowing(request);
abort(request, new Error(reason));
};
}
type NonceOption =
| string
| {
script?: string,
style?: string,
};
type Options = {
identifierPrefix?: string,
namespaceURI?: string,
nonce?: NonceOption,
bootstrapScriptContent?: string,
bootstrapScripts?: Array<string | BootstrapScriptDescriptor>,
bootstrapModules?: Array<string | BootstrapScriptDescriptor>,
progressiveChunkSize?: number,
onShellReady?: () => void,
onShellError?: (error: mixed) => void,
onAllReady?: () => void,
onError?: (error: mixed, errorInfo: ErrorInfo) => ?string,
onPostpone?: (reason: string, postponeInfo: PostponeInfo) => void,
unstable_externalRuntimeSrc?: string | BootstrapScriptDescriptor,
importMap?: ImportMap,
formState?: ReactFormState<any, any> | null,
onHeaders?: (headers: HeadersDescriptor) => void,
maxHeadersLength?: number,
};
type ResumeOptions = {
nonce?: NonceOption,
onShellReady?: () => void,
onShellError?: (error: mixed) => void,
onAllReady?: () => void,
onError?: (error: mixed, errorInfo: ErrorInfo) => ?string,
onPostpone?: (reason: string, postponeInfo: PostponeInfo) => void,
};
type PipeableStream = {
abort(reason: mixed): void,
pipe<T: Writable>(destination: T): T,
};
function createRequestImpl(children: ReactNodeList, options: void | Options) {
const resumableState = createResumableState(
options ? options.identifierPrefix : undefined,
options ? options.unstable_externalRuntimeSrc : undefined,
options ? options.bootstrapScriptContent : undefined,
options ? options.bootstrapScripts : undefined,
options ? options.bootstrapModules : undefined,
);
return createRequest(
children,
resumableState,
createRenderState(
resumableState,
options ? options.nonce : undefined,
options ? options.unstable_externalRuntimeSrc : undefined,
options ? options.importMap : undefined,
options ? options.onHeaders : undefined,
options ? options.maxHeadersLength : undefined,
),
createRootFormatContext(options ? options.namespaceURI : undefined),
options ? options.progressiveChunkSize : undefined,
options ? options.onError : undefined,
options ? options.onAllReady : undefined,
options ? options.onShellReady : undefined,
options ? options.onShellError : undefined,
undefined,
options ? options.onPostpone : undefined,
options ? options.formState : undefined,
);
}
function renderToPipeableStream(
children: ReactNodeList,
options?: Options,
): PipeableStream {
const request = createRequestImpl(children, options);
let hasStartedFlowing = false;
startWork(request);
return {
pipe<T: Writable>(destination: T): T {
if (hasStartedFlowing) {
throw new Error(
'React currently only supports piping to one writable stream.',
);
}
hasStartedFlowing = true;
prepareForStartFlowingIfBeforeAllReady(request);
startFlowing(request, destination);
destination.on('drain', createDrainHandler(destination, request));
destination.on(
'error',
createCancelHandler(
request,
'The destination stream errored while writing data.',
),
);
destination.on(
'close',
createCancelHandler(request, 'The destination stream closed early.'),
);
return destination;
},
abort(reason: mixed) {
abort(request, reason);
},
};
}
function createFakeWritableFromReadableStreamController(
controller: ReadableStreamController,
): Writable {
return ({
write(chunk: string | Uint8Array) {
if (typeof chunk === 'string') {
chunk = textEncoder.encode(chunk);
}
controller.enqueue(chunk);
return true;
},
end() {
controller.close();
},
destroy(error) {
if (typeof controller.error === 'function') {
controller.error(error);
} else {
controller.close();
}
},
}: any);
}
type ReactDOMServerReadableStream = ReadableStream & {
allReady: Promise<void>,
};
type WebStreamsOptions = Omit<
Options,
'onShellReady' | 'onShellError' | 'onAllReady' | 'onHeaders',
> & {signal: AbortSignal, onHeaders?: (headers: Headers) => void};
function renderToReadableStream(
children: ReactNodeList,
options?: WebStreamsOptions,
): Promise<ReactDOMServerReadableStream> {
return new Promise((resolve, reject) => {
let onFatalError;
let onAllReady;
const allReady = new Promise<void>((res, rej) => {
onAllReady = res;
onFatalError = rej;
});
function onShellReady() {
let writable: Writable;
const stream: ReactDOMServerReadableStream = (new ReadableStream(
{
type: 'bytes',
start: (controller): ?Promise<void> => {
writable =
createFakeWritableFromReadableStreamController(controller);
},
pull: (controller): ?Promise<void> => {
startFlowing(request, writable);
},
cancel: (reason): ?Promise<void> => {
stopFlowing(request);
abort(request, reason);
},
},
{highWaterMark: 0},
): any);
stream.allReady = allReady;
resolve(stream);
}
function onShellError(error: mixed) {
allReady.catch(() => {});
reject(error);
}
const onHeaders = options ? options.onHeaders : undefined;
let onHeadersImpl;
if (onHeaders) {
onHeadersImpl = (headersDescriptor: HeadersDescriptor) => {
onHeaders(new Headers(headersDescriptor));
};
}
const resumableState = createResumableState(
options ? options.identifierPrefix : undefined,
options ? options.unstable_externalRuntimeSrc : undefined,
options ? options.bootstrapScriptContent : undefined,
options ? options.bootstrapScripts : undefined,
options ? options.bootstrapModules : undefined,
);
const request = createRequest(
children,
resumableState,
createRenderState(
resumableState,
options ? options.nonce : undefined,
options ? options.unstable_externalRuntimeSrc : undefined,
options ? options.importMap : undefined,
onHeadersImpl,
options ? options.maxHeadersLength : undefined,
),
createRootFormatContext(options ? options.namespaceURI : undefined),
options ? options.progressiveChunkSize : undefined,
options ? options.onError : undefined,
onAllReady,
onShellReady,
onShellError,
onFatalError,
options ? options.onPostpone : undefined,
options ? options.formState : undefined,
);
if (options && options.signal) {
const signal = options.signal;
if (signal.aborted) {
abort(request, (signal: any).reason);
} else {
const listener = () => {
abort(request, (signal: any).reason);
signal.removeEventListener('abort', listener);
};
signal.addEventListener('abort', listener);
}
}
startWork(request);
});
}
function resumeRequestImpl(
children: ReactNodeList,
postponedState: PostponedState,
options: void | ResumeOptions,
) {
return resumeRequest(
children,
postponedState,
resumeRenderState(
postponedState.resumableState,
options ? options.nonce : undefined,
),
options ? options.onError : undefined,
options ? options.onAllReady : undefined,
options ? options.onShellReady : undefined,
options ? options.onShellError : undefined,
undefined,
options ? options.onPostpone : undefined,
);
}
function resumeToPipeableStream(
children: ReactNodeList,
postponedState: PostponedState,
options?: ResumeOptions,
): PipeableStream {
const request = resumeRequestImpl(children, postponedState, options);
let hasStartedFlowing = false;
startWork(request);
return {
pipe<T: Writable>(destination: T): T {
if (hasStartedFlowing) {
throw new Error(
'React currently only supports piping to one writable stream.',
);
}
hasStartedFlowing = true;
startFlowing(request, destination);
destination.on('drain', createDrainHandler(destination, request));
destination.on(
'error',
createCancelHandler(
request,
'The destination stream errored while writing data.',
),
);
destination.on(
'close',
createCancelHandler(request, 'The destination stream closed early.'),
);
return destination;
},
abort(reason: mixed) {
abort(request, reason);
},
};
}
type WebStreamsResumeOptions = Omit<
Options,
'onShellReady' | 'onShellError' | 'onAllReady',
> & {signal: AbortSignal};
function resume(
children: ReactNodeList,
postponedState: PostponedState,
options?: WebStreamsResumeOptions,
): Promise<ReactDOMServerReadableStream> {
return new Promise((resolve, reject) => {
let onFatalError;
let onAllReady;
const allReady = new Promise<void>((res, rej) => {
onAllReady = res;
onFatalError = rej;
});
function onShellReady() {
let writable: Writable;
const stream: ReactDOMServerReadableStream = (new ReadableStream(
{
type: 'bytes',
start: (controller): ?Promise<void> => {
writable =
createFakeWritableFromReadableStreamController(controller);
},
pull: (controller): ?Promise<void> => {
startFlowing(request, writable);
},
cancel: (reason): ?Promise<void> => {
stopFlowing(request);
abort(request, reason);
},
},
{highWaterMark: 0},
): any);
stream.allReady = allReady;
resolve(stream);
}
function onShellError(error: mixed) {
allReady.catch(() => {});
reject(error);
}
const request = resumeRequest(
children,
postponedState,
resumeRenderState(
postponedState.resumableState,
options ? options.nonce : undefined,
),
options ? options.onError : undefined,
onAllReady,
onShellReady,
onShellError,
onFatalError,
options ? options.onPostpone : undefined,
);
if (options && options.signal) {
const signal = options.signal;
if (signal.aborted) {
abort(request, (signal: any).reason);
} else {
const listener = () => {
abort(request, (signal: any).reason);
signal.removeEventListener('abort', listener);
};
signal.addEventListener('abort', listener);
}
}
startWork(request);
});
}
export {
renderToPipeableStream,
renderToReadableStream,
resumeToPipeableStream,
resume,
ReactVersion as version,
};