Skip to content

Nexus handler near complete implementation #1708

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
39,869 changes: 12,629 additions & 27,240 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"@temporalio/common": "file:packages/common",
"@temporalio/create": "file:packages/create-project",
"@temporalio/interceptors-opentelemetry": "file:packages/interceptors-opentelemetry",
"@temporalio/nexus": "file:packages/nexus",
"@temporalio/nyc-test-coverage": "file:packages/nyc-test-coverage",
"@temporalio/proto": "file:packages/proto",
"@temporalio/test": "file:packages/test",
Expand Down
28 changes: 28 additions & 0 deletions packages/client/src/internal.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { temporal } from '@temporalio/proto';

// A key used internally to pass "hidden options to the WorkflowClient.start() call.
export const InternalWorkflowStartOptionsKey = Symbol.for('__temporal_client_internal_workflow_start_options');

// Hidden internal workflow start options, used by the temporal nexus helpers.
export interface InternalWorkflowStartOptions {
requestId?: string;
/**
* Callbacks to be called by the server when this workflow reaches a terminal state.
* If the workflow continues-as-new, these callbacks will be carried over to the new execution.
* Callback addresses must be whitelisted in the server's dynamic configuration.
*/
completionCallbacks?: temporal.api.common.v1.ICallback[];
/** Links to be associated with the workflow. */
links?: temporal.api.common.v1.ILink[];
/**
* Backlink copied by the client from the StartWorkflowExecutionResponse. Only populated in servers newer than 1.27.
*/
backLink?: temporal.api.common.v1.ILink;

/**
* Conflict options for when USE_EXISTING is specified.
*
* Used by the nexus WorkflowRunOperations to attach to a callback to a running workflow.
*/
onConflictOptions?: temporal.api.workflow.v1.IOnConflictOptions;
}
14 changes: 12 additions & 2 deletions packages/client/src/workflow-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ import {
} from './base-client';
import { mapAsyncIterable } from './iterators-utils';
import { WorkflowUpdateStage, encodeWorkflowUpdateStage } from './workflow-update-stage';
import { InternalWorkflowStartOptions, InternalWorkflowStartOptionsKey } from './internal';

const UpdateWorkflowExecutionLifecycleStage = temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage;

Expand Down Expand Up @@ -1252,7 +1253,14 @@ export class WorkflowClient extends BaseClient {
const req = await this.createStartWorkflowRequest(input);
const { options: opts, workflowType } = input;
try {
return (await this.workflowService.startWorkflowExecution(req)).runId;
const response = await this.workflowService.startWorkflowExecution(req);
const internalOptions = (opts as any)[InternalWorkflowStartOptionsKey] as
| InternalWorkflowStartOptions
| undefined;
if (internalOptions != null) {
internalOptions.backLink = response.link ?? undefined;
}
return response.runId;
} catch (err: any) {
if (err.code === grpcStatus.ALREADY_EXISTS) {
throw new WorkflowExecutionAlreadyStartedError(
Expand All @@ -1268,11 +1276,12 @@ export class WorkflowClient extends BaseClient {
protected async createStartWorkflowRequest(input: WorkflowStartInput): Promise<StartWorkflowExecutionRequest> {
const { options: opts, workflowType, headers } = input;
const { identity, namespace } = this.options;
const internalOptions = (opts as any)[InternalWorkflowStartOptionsKey] as InternalWorkflowStartOptions | undefined;

return {
namespace,
identity,
requestId: uuid4(),
requestId: internalOptions?.requestId ?? uuid4(),
workflowId: opts.workflowId,
workflowIdReusePolicy: encodeWorkflowIdReusePolicy(opts.workflowIdReusePolicy),
workflowIdConflictPolicy: encodeWorkflowIdConflictPolicy(opts.workflowIdConflictPolicy),
Expand All @@ -1298,6 +1307,7 @@ export class WorkflowClient extends BaseClient {
header: { fields: headers },
priority: opts.priority ? compilePriority(opts.priority) : undefined,
versioningOverride: opts.versioningOverride ?? undefined,
...internalOptions,
};
}

Expand Down
1 change: 1 addition & 0 deletions packages/common/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"@temporalio/proto": "file:../proto",
"long": "^5.2.3",
"ms": "^3.0.0-canary.1",
"nexus-rpc": "file:../../../nexus-sdk-typescript",
"proto3-json-serializer": "^2.0.0"
},
"devDependencies": {
Expand Down
101 changes: 97 additions & 4 deletions packages/common/src/converter/failure-converter.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import * as nexus from 'nexus-rpc';
import Long from 'long';
import type { temporal } from '@temporalio/proto';
import {
ActivityFailure,
ApplicationFailure,
Expand All @@ -8,16 +11,42 @@ import {
encodeRetryState,
encodeTimeoutType,
FAILURE_SOURCE,
NexusOperationFailure,
ProtoFailure,
ServerFailure,
TemporalFailure,
TerminatedFailure,
TimeoutFailure,
} from '../failure';
import { makeProtoEnumConverters } from '../internal-workflow';
import { isError } from '../type-helpers';
import { msOptionalToTs } from '../time';
import { arrayFromPayloads, fromPayloadsAtIndex, PayloadConverter, toPayloads } from './payload-converter';

// Can't import enums into the workflow sandbox, use this helper type and enum converter instead.
const NexusHandlerErrorRetryBehavior = {
RETRYABLE: 'RETRYABLE',
NON_RETRYABLE: 'NON_RETRYABLE',
} as const;

type NexusHandlerErrorRetryBehavior =
(typeof NexusHandlerErrorRetryBehavior)[keyof typeof NexusHandlerErrorRetryBehavior];

const [encodeNexusHandlerErrorRetryBehavior, decodeNexusHandlerErrorRetryBehavior] = makeProtoEnumConverters<
temporal.api.enums.v1.NexusHandlerErrorRetryBehavior,
typeof temporal.api.enums.v1.NexusHandlerErrorRetryBehavior,
keyof typeof temporal.api.enums.v1.NexusHandlerErrorRetryBehavior,
typeof NexusHandlerErrorRetryBehavior,
'NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_'
>(
{
UNSPECIFIED: 0,
[NexusHandlerErrorRetryBehavior.RETRYABLE]: 1,
[NexusHandlerErrorRetryBehavior.NON_RETRYABLE]: 2,
} as const,
'NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_'
);

function combineRegExp(...regexps: RegExp[]): RegExp {
return new RegExp(regexps.map((x) => `(?:${x.source})`).join('|'));
}
Expand All @@ -28,6 +57,8 @@ function combineRegExp(...regexps: RegExp[]): RegExp {
const CUTOFF_STACK_PATTERNS = combineRegExp(
/** Activity execution */
/\s+at Activity\.execute \(.*[\\/]worker[\\/](?:src|lib)[\\/]activity\.[jt]s:\d+:\d+\)/,
/** Nexus execution */
/\s+at( async)? NexusHandler\.invokeUserCode \(.*[\\/]worker[\\/](?:src|lib)[\\/]nexus\/index\.[jt]s:\d+:\d+\)/,
/** Workflow activation */
/\s+at Activator\.\S+NextHandler \(.*[\\/]workflow[\\/](?:src|lib)[\\/]internals\.[jt]s:\d+:\d+\)/,
/** Workflow run anything in context */
Expand Down Expand Up @@ -120,7 +151,7 @@ export class DefaultFailureConverter implements FailureConverter {
*
* Does not set common properties, that is done in {@link failureToError}.
*/
failureToErrorInner(failure: ProtoFailure, payloadConverter: PayloadConverter): TemporalFailure {
failureToErrorInner(failure: ProtoFailure, payloadConverter: PayloadConverter): Error {
if (failure.applicationFailureInfo) {
return new ApplicationFailure(
failure.message ?? undefined,
Expand Down Expand Up @@ -192,6 +223,38 @@ export class DefaultFailureConverter implements FailureConverter {
this.optionalFailureToOptionalError(failure.cause, payloadConverter)
);
}
if (failure.nexusHandlerFailureInfo) {
if (failure.cause == null) {
throw new TypeError('Missing failure cause on nexusHandlerFailureInfo');
}
let retryable: boolean | undefined = undefined;
const retryBehavior = decodeNexusHandlerErrorRetryBehavior(failure.nexusHandlerFailureInfo.retryBehavior);
switch (retryBehavior) {
case 'RETRYABLE':
retryable = true;
break;
case 'NON_RETRYABLE':
retryable = false;
break;
}

return new nexus.HandlerError({
type: (failure.nexusHandlerFailureInfo.type as nexus.HandlerErrorType) ?? 'INTERNAL',
cause: this.failureToError(failure.cause, payloadConverter),
retryable,
});
}
if (failure.nexusOperationExecutionFailureInfo) {
return new NexusOperationFailure(
failure.nexusOperationExecutionFailureInfo.scheduledEventId?.toNumber(),
// We assume these will always be set or gracefully set to empty strings.
failure.nexusOperationExecutionFailureInfo.endpoint ?? '',
failure.nexusOperationExecutionFailureInfo.service ?? '',
failure.nexusOperationExecutionFailureInfo.operation ?? '',
failure.nexusOperationExecutionFailureInfo.operationToken ?? undefined,
this.optionalFailureToOptionalError(failure.cause, payloadConverter)
);
}
return new TemporalFailure(
failure.message ?? undefined,
this.optionalFailureToOptionalError(failure.cause, payloadConverter)
Expand All @@ -216,7 +279,9 @@ export class DefaultFailureConverter implements FailureConverter {
}
const err = this.failureToErrorInner(failure, payloadConverter);
err.stack = failure.stackTrace ?? '';
err.failure = failure;
if (err instanceof TemporalFailure) {
err.failure = failure;
}
return err;
}

Expand All @@ -232,8 +297,8 @@ export class DefaultFailureConverter implements FailureConverter {
}

errorToFailureInner(err: unknown, payloadConverter: PayloadConverter): ProtoFailure {
if (err instanceof TemporalFailure) {
if (err.failure) return err.failure;
if (err instanceof TemporalFailure || err instanceof nexus.HandlerError) {
if (err instanceof TemporalFailure && err.failure) return err.failure;
const base = {
message: err.message,
stackTrace: cutoffStackTrace(err.stack),
Expand Down Expand Up @@ -310,6 +375,34 @@ export class DefaultFailureConverter implements FailureConverter {
terminatedFailureInfo: {},
};
}
if (err instanceof nexus.HandlerError) {
let retryBehavior: temporal.api.enums.v1.NexusHandlerErrorRetryBehavior | undefined = undefined;
if (err.retryable === true) {
retryBehavior = encodeNexusHandlerErrorRetryBehavior('RETRYABLE');
} else if (err.retryable === false) {
retryBehavior = encodeNexusHandlerErrorRetryBehavior('NON_RETRYABLE');
}

return {
...base,
nexusHandlerFailureInfo: {
type: err.type,
retryBehavior,
},
};
}
if (err instanceof NexusOperationFailure) {
return {
...base,
nexusOperationExecutionFailureInfo: {
scheduledEventId: err.scheduledEventId ? Long.fromNumber(err.scheduledEventId) : undefined,
endpoint: err.endpoint,
service: err.service,
operation: err.operation,
operationToken: err.operationToken,
},
};
}
// Just a TemporalFailure
return base;
}
Expand Down
14 changes: 14 additions & 0 deletions packages/common/src/failure.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,20 @@ export class ChildWorkflowFailure extends TemporalFailure {
}
}

@SymbolBasedInstanceOfError('NexusOperationFailure')
export class NexusOperationFailure extends TemporalFailure {
public constructor(
public readonly scheduledEventId: number | undefined,
public readonly endpoint: string,
public readonly service: string,
public readonly operation: string,
public readonly operationToken: string | undefined,
cause?: Error
) {
super('Nexus Operation completed unsuccessfully', cause);
}
}

/**
* This exception is thrown in the following cases:
* - Workflow with the same Workflow ID is currently running and the {@link WorkflowOptions.workflowIdConflictPolicy} is `WORKFLOW_ID_CONFLICT_POLICY_FAIL`
Expand Down
11 changes: 11 additions & 0 deletions packages/common/src/internal-non-workflow/codec-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ export async function decodeArrayFromPayloads(
return arrayFromPayloads(payloadConverter, await decodeOptional(payloadCodecs, payloads));
}

/**
* Decode `payloads` and then return {@link arrayFromPayloads}`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstring is wrong

*/
export async function decodeFromPayload(converter: LoadedDataConverter, payloads?: Payload | null): Promise<unknown> {
if (payloads == null) {
return undefined;
}
const { payloadConverter, payloadCodecs } = converter;
return payloadConverter.fromPayload(await decodeSingle(payloadCodecs, payloads));
}

/**
* Decode `payloads` and then return {@link fromPayloadsAtIndex}.
*/
Expand Down
6 changes: 6 additions & 0 deletions packages/common/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ export enum SdkComponent {
*/
activity = 'activity',

/**
* Component name for messages emited from a nexus operation handler, using the nexus context logger.
* The SDK itself never publishes messages with this component name.
*/
nexus = 'nexus',

/**
* Component name for messages emited from a Temporal Worker instance.
*
Expand Down
33 changes: 18 additions & 15 deletions packages/common/src/proto-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,29 +113,32 @@ export function historyFromJSON(history: unknown): History {
* string that adheres to the same norm as JSON history files produced by other Temporal tools.
*/
export function historyToJSON(history: History): string {
// toProto3JSON doesn't correctly handle some of our "bytes" fields, passing them untouched to the
// output, after which JSON.stringify() would convert them to an array of numbers. As a workaround,
// recursively walk the object and convert all Buffer instances to base64 strings. Note this only
// works on proto3-json-serializer v2.0.0. v2.0.2 throws an error before we even get the chance
// to fix the buffers. See https://github.com/googleapis/proto3-json-serializer-nodejs/issues/103.
function fixBuffers<T>(e: T): T {
if (e && typeof e === 'object') {
if (e instanceof Buffer) return e.toString('base64') as any;
if (Array.isArray(e)) return e.map(fixBuffers) as T;
return Object.fromEntries(Object.entries(e as object).map(([k, v]) => [k, fixBuffers(v)])) as T;
}
return e;
}

const protoJson = toProto3JSON(proto.temporal.api.history.v1.History.fromObject(history) as any);
return JSON.stringify(fixBuffers(protoJson), null, 2);
}

/**
* toProto3JSON doesn't correctly handle some of our "bytes" fields, passing them untouched to the
* output, after which JSON.stringify() would convert them to an array of numbers. As a workaround,
* recursively walk the object and convert all Buffer instances to base64 strings. Note this only
* works on proto3-json-serializer v2.0.0. v2.0.2 throws an error before we even get the chance
* to fix the buffers. See https://github.com/googleapis/proto3-json-serializer-nodejs/issues/103.
*/
export function fixBuffers<T>(e: T): T {
if (e && typeof e === 'object') {
if (e instanceof Buffer) return e.toString('base64') as any;
if (e instanceof Uint8Array) return Buffer.from(e).toString('base64') as any;
if (Array.isArray(e)) return e.map(fixBuffers) as T;
return Object.fromEntries(Object.entries(e as object).map(([k, v]) => [k, fixBuffers(v)])) as T;
}
return e;
}

/**
* Convert from protobuf payload to JSON
*/
export function payloadToJSON(payload: Payload): JSONPayload {
return toProto3JSON(patched.temporal.api.common.v1.Payload.create(payload)) as any;
return fixBuffers(toProto3JSON(patched.temporal.api.common.v1.Payload.create(payload))) as any;
}

/**
Expand Down
Loading
Loading