Skip to content

Commit b989224

Browse files
CuriousGeorgiyalyapunov
authored andcommitted
box: sending arbitrary IPROTO packets
Add translation table for `box.iproto.key` constants encoding to simplify packet assembly. Add new `box.iproto.send` method to Lua and `box_iproto_send` function to C API, which allow sending arbitrary IPROTO packets, using active IPROTO sessions. Packets are sent asynchronously using Kharon. Add `xregion_join` to the `xalloc` API. Change gh-7894 test: instead of simply comparing `box.iproto` table to the reference table, iterate over `box.iproto` and check that corresponding non-{function, thread, userdata} type values exist in the reference table. Closes #7897 @TarantoolBot document Title: Document sending arbitrary IPROTO packets feature For the API description and usage examples, see: * [design document](https://www.notion.so/tarantool/box-iproto-override-44935a6ac7e04fb5a2c81ca713ed1bce#a2cc04da89d34fad8f8564c150cd9977); * #7897.
1 parent 92fb07b commit b989224

18 files changed

+585
-4
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
## feature/box
2+
3+
* Added API for sending arbitrary IPROTO packets from server to client:
4+
`box.iproto.send` in Lua and `box_iproto_send` in C (gh-7897).

extra/exports

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ box_index_min
7070
box_index_random
7171
box_index_tuple_position
7272
box_insert
73+
box_iproto_send
7374
box_iterator_free
7475
box_iterator_next
7576
box_key_def_delete

src/box/box.cc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3250,6 +3250,24 @@ box_session_id(void)
32503250
return current_session()->id;
32513251
}
32523252

3253+
API_EXPORT int
3254+
box_iproto_send(uint64_t sid,
3255+
const char *header, const char *header_end,
3256+
const char *body, const char *body_end)
3257+
{
3258+
struct session *session = session_find(sid);
3259+
if (session == NULL) {
3260+
diag_set(ClientError, ER_NO_SUCH_SESSION, sid);
3261+
return -1;
3262+
}
3263+
if (session->type != SESSION_TYPE_BINARY) {
3264+
diag_set(ClientError, ER_WRONG_SESSION_TYPE,
3265+
session_type_strs[session->type]);
3266+
return -1;
3267+
}
3268+
return iproto_session_send(session, header, header_end, body, body_end);
3269+
}
3270+
32533271
static inline void
32543272
box_register_replica(uint32_t id, const struct tt_uuid *uuid)
32553273
{

src/box/box.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,25 @@ box_session_push(const char *data, const char *data_end);
601601
API_EXPORT uint64_t
602602
box_session_id(void);
603603

604+
/**
605+
* Sends a packet with the given header and body over the IPROTO session's
606+
* socket.
607+
*
608+
* NB: yields.
609+
*
610+
* \param sid IPROTO session identifier
611+
* \param header MsgPack encoded header
612+
* \param header_end MsgPack encoded header end
613+
* \param body MsgPack encoded body
614+
* \param body_end MsgPack encoded body end
615+
* \retval -1 on error (check box_error_last())
616+
* \retval 0 on success
617+
*/
618+
API_EXPORT int
619+
box_iproto_send(uint64_t sid,
620+
const char *header, const char *header_end,
621+
const char *body, const char *body_end);
622+
604623
/** \endcond public */
605624

606625
/**

src/box/errcode.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,8 @@ struct errcode_record {
306306
/*251 */_(ER_INVALID_AUTH_REQUEST, "Invalid '%s' request: %s") \
307307
/*252 */_(ER_WEAK_PASSWORD, "Password doesn't meet security requirements: %s") \
308308
/*253 */_(ER_OLD_PASSWORD, "Password must differ from last %d passwords") \
309+
/*254 */_(ER_NO_SUCH_SESSION, "Session %llu does not exist") \
310+
/*255 */_(ER_WRONG_SESSION_TYPE, "Session '%s' is not supported") \
309311

310312
/*
311313
* !IMPORTANT! Please follow instructions at start of the file

src/box/iproto.cc

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3299,6 +3299,42 @@ iproto_set_msg_max(int new_iproto_msg_max)
32993299
}
33003300
}
33013301

3302+
int
3303+
iproto_session_send(struct session *session,
3304+
const char *header, const char *header_end,
3305+
const char *body, const char *body_end)
3306+
{
3307+
assert(session->type == SESSION_TYPE_BINARY);
3308+
struct iproto_connection *con =
3309+
(struct iproto_connection *)session->meta.connection;
3310+
if (con->state != IPROTO_CONNECTION_ALIVE) {
3311+
diag_set(ClientError, ER_SESSION_CLOSED);
3312+
return -1;
3313+
}
3314+
3315+
ptrdiff_t header_size = header_end - header;
3316+
ptrdiff_t body_size = body_end - body;
3317+
ptrdiff_t packet_size = 5 + header_size + body_size;
3318+
char *buf = (char *)obuf_alloc(con->tx.p_obuf, packet_size);
3319+
if (buf == NULL) {
3320+
diag_set(OutOfMemory, packet_size, "obuf_alloc", "buf");
3321+
return -1;
3322+
}
3323+
char *p = buf;
3324+
*(p++) = INT8_C(0xce);
3325+
p = mp_store_u32(p, packet_size - 5);
3326+
memcpy(p, header, header_size);
3327+
p += header_size;
3328+
memcpy(p, body, body_size);
3329+
tx_push(con);
3330+
/*
3331+
* The control yield is solely for enforcing the fact this function
3332+
* yields — in the future we may implement back pressure based on this.
3333+
*/
3334+
fiber_sleep(0);
3335+
return 0;
3336+
}
3337+
33023338
void
33033339
iproto_free(void)
33043340
{

src/box/iproto.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include <stddef.h>
3535

3636
struct uri_set;
37+
struct session;
3738

3839
#if defined(__cplusplus)
3940
extern "C" {
@@ -131,6 +132,18 @@ iproto_listen(const struct uri_set *uri_set);
131132
void
132133
iproto_set_msg_max(int iproto_msg_max);
133134

135+
/**
136+
* Sends a packet with the given header and body over the IPROTO session's
137+
* socket.
138+
* On success, a packet is written to the session's output buffer, which is
139+
* flushed asynchronously using Kharon.
140+
* Returns 0 on success, a non-zero value otherwise (diagnostic is set).
141+
*/
142+
int
143+
iproto_session_send(struct session *session,
144+
const char *header, const char *header_end,
145+
const char *body, const char *body_end);
146+
134147
void
135148
iproto_free(void);
136149

src/box/lua/init.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,3 +585,9 @@ box_lua_init(struct lua_State *L)
585585

586586
assert(lua_gettop(L) == 0);
587587
}
588+
589+
void
590+
box_lua_free(void)
591+
{
592+
box_lua_iproto_free();
593+
}

src/box/lua/init.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@ struct lua_State;
3939
void
4040
box_lua_init(struct lua_State *L);
4141

42+
/**
43+
* Frees `box` module.
44+
*/
45+
void
46+
box_lua_free(void);
47+
4248
#if defined(__cplusplus)
4349
} /* extern "C" */
4450
#endif /* defined(__cplusplus) */

src/box/lua/iproto.c

Lines changed: 132 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,29 @@
66

77
#include "box/lua/iproto.h"
88

9+
#include "box/box.h"
910
#include "box/iproto_constants.h"
1011
#include "box/iproto_features.h"
1112

12-
#include <lua.h>
13+
#include "core/assoc.h"
14+
#include "core/fiber.h"
15+
#include "core/tt_static.h"
16+
17+
#include "lua/msgpack.h"
18+
#include "lua/utils.h"
19+
20+
#include "mpstream/mpstream.h"
21+
22+
#include "small/region.h"
23+
24+
#include <ctype.h>
25+
#include <lauxlib.h>
26+
27+
/**
28+
* Translation table for `box.iproto.key` constants encoding: used in
29+
* `luamp_encode_with_translation`.
30+
*/
31+
static struct mh_strnu32_t *iproto_key_translation;
1332

1433
/**
1534
* IPROTO constant from `src/box/iproto_{constants, features}.h`.
@@ -119,6 +138,22 @@ push_iproto_key_enum(struct lua_State *L)
119138
{"VCLOCK_SYNC", IPROTO_VCLOCK_SYNC},
120139
};
121140
push_iproto_constant_subnamespace(L, "key", keys, lengthof(keys));
141+
for (size_t i = 0; i < lengthof(keys); ++i) {
142+
size_t len = strlen(keys[i].name);
143+
char *lowercase = strtolowerdup(keys[i].name);
144+
struct mh_strnu32_node_t translation = {
145+
.str = lowercase,
146+
.len = len,
147+
.hash = lua_hash(lowercase, len),
148+
.val = keys[i].val,
149+
};
150+
mh_strnu32_put(iproto_key_translation, &translation,
151+
NULL, NULL);
152+
translation.str = xstrdup(keys[i].name);
153+
translation.hash = lua_hash(translation.str, len);
154+
mh_strnu32_put(iproto_key_translation, &translation,
155+
NULL, NULL);
156+
}
122157
}
123158

124159
/**
@@ -268,16 +303,112 @@ push_iproto_protocol_features(struct lua_State *L)
268303
lengthof(features));
269304
}
270305

306+
/**
307+
* Encodes a packet header/body argument to MsgPack: if the argument is a
308+
* string, then no encoding is needed — otherwise the argument must be a Lua
309+
* table. The Lua table is encoded to MsgPack using IPROTO key translation
310+
* table.
311+
* In both cases, the result is stored on the fiber region.
312+
*/
313+
static const char *
314+
encode_packet(struct lua_State *L, int idx, size_t *mp_len)
315+
{
316+
int packet_part_type = lua_type(L, idx);
317+
struct region *gc = &fiber()->gc;
318+
if (packet_part_type == LUA_TSTRING) {
319+
const char *arg = lua_tolstring(L, idx, mp_len);
320+
char *mp = xregion_alloc(gc, *mp_len);
321+
return memcpy(mp, arg, *mp_len);
322+
}
323+
assert(packet_part_type == LUA_TTABLE);
324+
/*
325+
* FIXME(gh-7939): `luamp_error` and `luamp_encode_with_translation` can
326+
* throw a Lua exception, which we cannot catch, and cause the fiber
327+
* region to leak.
328+
*/
329+
struct mpstream stream;
330+
mpstream_init(&stream, gc, region_reserve_cb, region_alloc_cb,
331+
luamp_error, L);
332+
size_t used = region_used(gc);
333+
luamp_encode_with_translation(L, luaL_msgpack_default, &stream, idx,
334+
iproto_key_translation);
335+
mpstream_flush(&stream);
336+
*mp_len = region_used(gc) - used;
337+
return xregion_join(gc, *mp_len);
338+
}
339+
340+
/**
341+
* Sends an IPROTO packet consisting of a header (second argument) and an
342+
* optional body (third argument) over the IPROTO session identified by first
343+
* argument.
344+
*/
345+
static int
346+
lbox_iproto_send(struct lua_State *L)
347+
{
348+
int n_args = lua_gettop(L);
349+
if (n_args < 2 || n_args > 3)
350+
return luaL_error(L, "Usage: "
351+
"box.iproto.send(sid, header[, body])");
352+
uint64_t sid = luaL_checkuint64(L, 1);
353+
int header_type = lua_type(L, 2);
354+
if (header_type != LUA_TSTRING && header_type != LUA_TTABLE)
355+
return luaL_error(L, "expected table or string as 2 argument");
356+
if (n_args == 3) {
357+
int body_type = lua_type(L, 3);
358+
if (body_type != LUA_TSTRING && body_type != LUA_TTABLE)
359+
return luaL_error(
360+
L, "expected table or string as 3 argument");
361+
}
362+
363+
struct region *gc = &fiber()->gc;
364+
size_t used = region_used(gc);
365+
size_t header_len;
366+
const char *header = encode_packet(L, 2, &header_len);
367+
size_t body_len = 0;
368+
const char *body = NULL;
369+
if (n_args == 3)
370+
body = encode_packet(L, 3, &body_len);
371+
int rc = box_iproto_send(sid,
372+
header, header + header_len,
373+
body, body + body_len);
374+
region_truncate(gc, used);
375+
return (rc == 0) ? 0 : luaT_error(L);
376+
}
377+
271378
/**
272379
* Initializes module for working with Tarantool's network subsystem.
273380
*/
274381
void
275382
box_lua_iproto_init(struct lua_State *L)
276383
{
384+
iproto_key_translation = mh_strnu32_new();
385+
277386
lua_getfield(L, LUA_GLOBALSINDEX, "box");
278387
lua_newtable(L);
388+
279389
push_iproto_constants(L);
280390
push_iproto_protocol_features(L);
391+
392+
const struct luaL_Reg iproto_methods[] = {
393+
{"send", lbox_iproto_send},
394+
{NULL, NULL}
395+
};
396+
luaL_register(L, NULL, iproto_methods);
397+
281398
lua_setfield(L, -2, "iproto");
282399
lua_pop(L, 1);
283400
}
401+
402+
/**
403+
* Deletes the IPROTO key translation and all its dynamically allocated key
404+
* strings.
405+
*/
406+
void
407+
box_lua_iproto_free(void)
408+
{
409+
struct mh_strnu32_t *h = iproto_key_translation;
410+
mh_int_t k;
411+
mh_foreach(h, k)
412+
free((void *)mh_strnu32_node(h, k)->str);
413+
mh_strnu32_delete(iproto_key_translation);
414+
}

src/box/lua/iproto.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,19 @@ extern "C" {
1212

1313
struct lua_State;
1414

15+
/**
16+
* Initializes `box.iproto` submodule for working with Tarantool network
17+
* subsystem.
18+
*/
1519
void
1620
box_lua_iproto_init(struct lua_State *L);
1721

22+
/**
23+
* Frees `box.iproto` submodule.
24+
*/
25+
void
26+
box_lua_iproto_free(void);
27+
1828
#if defined(__cplusplus)
1929
} /* extern "C" */
2030
#endif /* defined(__cplusplus) */

src/main.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,7 @@ tarantool_free(void)
557557
/* Shutdown worker pool. Waits until threads terminate. */
558558
coio_shutdown();
559559

560+
box_lua_free();
560561
box_free();
561562

562563
title_free(main_argc, main_argv);

src/trivia/util.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ strnindex(const char *const *haystack, const char *needle, uint32_t len,
133133
#define xregion_alloc_array(p, T, count, size) \
134134
xalloc_impl(sizeof(T) * (count), region_alloc_array, (p), T, \
135135
(count), (size))
136+
#define xregion_join(p, size) xalloc_impl((size), region_join, (p), (size))
136137
#define xibuf_alloc(p, size) xalloc_impl((size), ibuf_alloc, (p), (size))
137138
#define xibuf_reserve(p, size) xalloc_impl((size), ibuf_reserve, (p), (size))
138139

0 commit comments

Comments
 (0)