use std::{io, thread};

use futures_lite::future;
use lambda_runtime::{service_fn, Error, LambdaEvent};
use serde::{Deserialize, Serialize};
use tokio::runtime::Builder;

/// This is also a made-up example. Requests come into the runtime as unicode
/// strings in JSON format, which can map to any structure that implements `serde::Deserialize`.
/// The runtime pays no attention to the contents of the request payload.
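/// For example, a payload like `{"command": "do-something"}` (an illustrative value,
/// not anything the runtime requires) maps onto this `Request` struct.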
#[derive(Deserialize)]
struct Request {
    command: String,
}

/// This is a made-up example of what a response structure may look like.
/// There is no restriction on what it can be. The runtime requires responses
/// to be serialized into JSON. The runtime pays no attention
/// to the contents of the response payload.
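/// For example, a `Response` serializes to something like
/// `{"req_id":"<request id>","msg":"Command <command> executed."}` (the values shown
/// here are placeholders).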
#[derive(Serialize)]
struct Response {
    req_id: String,
    msg: String,
}

fn main() -> Result<(), io::Error> {
    // required to enable CloudWatch error logging by the runtime
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // this needs to be set to false, otherwise ANSI color codes will
        // show up in a confusing manner in CloudWatch logs.
        .with_ansi(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    // Create a channel used to pass the lambda runtime's result over to our application runtime.
    // Realistically, this would be either an unbounded channel or a bounded channel with a
    // higher capacity, as needed.
    let (lambda_tx, lambda_rx) = async_channel::bounded(1);
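    // For example, an unbounded variant (a sketch only; not used below) would be:
    // let (lambda_tx, lambda_rx) = async_channel::unbounded::<()>();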

    // Create a bounded channel used to communicate our shutdown signal across threads.
    let (shutdown_tx, shutdown_rx) = async_channel::bounded(1);

    // Build a single-threaded (or multi-threaded using Builder::new_multi_thread) runtime to spawn our lambda work onto.
    let tokio_runtime = Builder::new_current_thread()
        .thread_name("lambda-runtime")
        .enable_all()
        .build()
        .expect("build lambda runtime");

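    // A multi-threaded alternative is sketched below; the worker count is illustrative only,
    // not a recommendation from this example.
    //
    // let tokio_runtime = Builder::new_multi_thread()
    //     .worker_threads(2)
    //     .thread_name("lambda-runtime")
    //     .enable_all()
    //     .build()
    //     .expect("build lambda runtime");
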
    // Run the lambda runtime worker thread to completion. The response is sent to the other "runtime" to be processed as needed.
    thread::spawn(move || {
        let func = service_fn(my_handler);
        if let Ok(response) = tokio_runtime.block_on(lambda_runtime::run(func)) {
            lambda_tx.send_blocking(response).expect("send lambda result");
        }
    });

    // Run the mock runtime to completion.
    my_runtime(move || future::block_on(app_runtime_task(lambda_rx.clone(), shutdown_tx.clone())));

    // Block the main thread until a shutdown signal is received.
    future::block_on(shutdown_rx.recv()).map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))
}

pub(crate) async fn my_handler(event: LambdaEvent<Request>) -> Result<Response, Error> {
    // extract some useful info from the request
    let command = event.payload.command;

    // prepare the response
    let resp = Response {
        req_id: event.context.request_id,
        msg: format!("Command {} executed.", command),
    };

    // return `Response` (it will be serialized to JSON automatically by the runtime)
    Ok(resp)
}

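// A rough sketch of exercising the handler directly, kept as a comment here; it assumes
// `LambdaEvent::new` and a default `lambda_runtime::Context`, and would normally live in
// a #[cfg(test)] module:
//
// let event = LambdaEvent::new(Request { command: "do-something".into() }, Context::default());
// let resp = future::block_on(my_handler(event)).unwrap();
// assert_eq!(resp.msg, "Command do-something executed.");
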
/// A task to be run on the custom runtime. Once a response from the lambda runtime is received,
/// a shutdown signal is sent to the main thread notifying the process to exit.
pub(crate) async fn app_runtime_task(lambda_rx: async_channel::Receiver<()>, shutdown_tx: async_channel::Sender<()>) {
    loop {
        // Receive the result sent by the lambda runtime thread and process it as needed.
        if let Ok(result) = lambda_rx.recv().await {
            tracing::debug!(?result);
            // We're ready to shut down our app. Send the shutdown signal notifying the main thread to exit the process.
            shutdown_tx.send(()).await.expect("send shutdown signal");
            break;
        }

        // more app logic would be here...
    }
}

/// Construct the mock runtime worker thread(s) to spawn some work onto.
fn my_runtime(func: impl Fn() + Send + 'static) {
    thread::Builder::new()
        .name("my-runtime".into())
        .spawn(func)
        .expect("spawn my_runtime worker");
}