Transfer to notebook #448

Open
wants to merge 2 commits into base: master
105 changes: 76 additions & 29 deletions examples/river_kmeans.ipynb
@@ -7,7 +7,9 @@
"metadata": {},
"outputs": [],
"source": [
"import functools\n",
"import random\n",
"import time\n",
"\n",
"import pandas as pd\n",
"\n",
@@ -17,6 +19,7 @@
"from river import cluster\n",
"import holoviews as hv\n",
"from panel.pane.holoviews import HoloViews\n",
"import panel as pn\n",
"hv.extension('bokeh')"
]
},
@@ -29,6 +32,7 @@
"source": [
"model = cluster.KMeans(n_clusters=3, sigma=0.1, mu=0.5)\n",
"centres = [[random.random(), random.random()] for _ in range(3)]\n",
"count = [0]\n",
"\n",
"def gen(move_chance=0.05):\n",
" centre = int(random.random() * 3) # 3x faster than random.randint(0, 2)\n",
@@ -37,6 +41,7 @@
" centres[centre][1] += random.random() / 5 - 0.1\n",
" value = {'x': random.random() / 20 + centres[centre][0],\n",
" 'y': random.random() / 20 + centres[centre][1]}\n",
" count[0] += 1\n",
" return value\n",
"\n",
"\n",
@@ -53,18 +58,55 @@
"metadata": {},
"outputs": [],
"source": [
"s = Stream.from_periodic(gen, 0.03)\n",
"cadance = 0.01\n",
"\n",
"ex = pd.DataFrame({'x': [0.5], 'y': [0.5]})\n",
"pipe_in = hv.streams.Pipe(data=ex)\n",
"pipe_out = hv.streams.Pipe(data=ex)\n",
"\n",
"# setup pipes\n",
"s = Stream.from_periodic(gen, cadance)\n",
"\n",
"# Branch 0: Input/Observations\n",
"obs = s.map(lambda x: pd.DataFrame([x]))\n",
"\n",
"# Branch 1: Output/River ML clusters\n",
"km = RiverTrain(model, pass_model=True)\n",
"s.map(lambda x: (x,)).connect(km) # learn takes a tuple of (x,[ y[, w]])\n",
"ex = pd.DataFrame({'x': [0.5], 'y': [0.5]})\n",
"ooo = s.map(lambda x: pd.DataFrame([x])).to_dataframe(example=ex)\n",
"out = km.map(get_clusters)\n",
"clusters = km.map(get_clusters)\n",
"\n",
"concat = functools.partial(pd.concat, ignore_index=True)\n",
"\n",
"def accumulate(previous, new, last_lines=50):\n",
" return concat([previous, new]).iloc[-last_lines:, :]\n",
"\n",
"partition_obs = 10\n",
"particion_clusters = 10\n",
"backlog_obs = 100\n",
"\n",
"# .partition is used to gather x number of points\n",
"# before sending them to the plots\n",
"# .accumulate allows to generate a backlog\n",
"\n",
"(\n",
" obs\n",
" .partition(partition_obs)\n",
" .map(concat)\n",
" .accumulate(functools.partial(accumulate, last_lines=backlog_obs))\n",
" .sink(pipe_in.send)\n",
")\n",
"(\n",
" clusters\n",
" .partition(particion_clusters)\n",
" .map(pd.concat)\n",
" .sink(pipe_out.send)\n",
")\n",
"\n",
"# start things\n",
"s.emit(gen()) # set initial model\n",
"for i, (x, y) in enumerate(centres):\n",
" model.centers[i]['x'] = x\n",
" model.centers[i]['y'] = y\n"
" model.centers[i]['y'] = y"
]
},
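A minimal, self-contained sketch of the partition/accumulate pattern used in the cell above: batch the incoming points, concatenate each batch into a single DataFrame, then keep only a rolling backlog. The names (`rolling_concat`, `window`) and the batch/backlog sizes are illustrative and not part of this PR.

```python
import functools

import pandas as pd
from streamz import Stream

def rolling_concat(previous, new, last_lines=100):
    # Append the new batch, then keep only the most recent `last_lines` rows.
    return pd.concat([previous, new], ignore_index=True).iloc[-last_lines:, :]

source = Stream()
window = []  # collects each rolling snapshot so it can be inspected

(
    source
    .partition(10)                                          # buffer 10 points per batch
    .map(functools.partial(pd.concat, ignore_index=True))   # tuple of DataFrames -> one DataFrame
    .accumulate(functools.partial(rolling_concat, last_lines=100))
    .sink(window.append)
)

for i in range(250):
    source.emit(pd.DataFrame({'x': [i], 'y': [i * 0.5]}))

print(len(window))      # 25 snapshots: one per batch of 10 points
print(len(window[-1]))  # 100 rows: the backlog stays capped
```

In the notebook the same chain ends in `pipe_in.send` / `pipe_out.send` instead of a list, so each snapshot becomes the next frame of the live plot.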
{
@@ -74,31 +116,36 @@
"metadata": {},
"outputs": [],
"source": [
"pout = out.to_dataframe(example=ex)\n",
"pl = (ooo.hvplot.scatter('x', 'y', color=\"blue\", backlog=50) *\n",
" pout.hvplot.scatter('x', 'y', color=\"red\", backlog=3))\n",
"button_start = pn.widgets.Button(name='Start')\n",
"button_stop = pn.widgets.Button(name='Stop')\n",
"\n",
"t0 = 0\n",
"\n",
"def start(event):\n",
" s.start()\n",
" global t0\n",
" t0 = time.time()\n",
"\n",
"def stop(event):\n",
" print(count, \"events\")\n",
" global t0\n",
" t_spent = time.time() - t0\n",
" print(\"frequency\", count[0] / t_spent, \"Hz\")\n",
" print(\"Current centres\", centres)\n",
" print(\"Output centres\", [list(c.values()) for c in model.centers.values()])\n",
" s.stop()\n",
"\n",
"button_start.on_click(start)\n",
"button_stop.on_click(stop)\n",
"\n",
"scatter_dmap_input = hv.DynamicMap(hv.Scatter, streams=[pipe_in]).opts(color=\"blue\")\n",
"scatter_dmap_output = hv.DynamicMap(hv.Scatter, streams=[pipe_out]).opts(color=\"red\")\n",
"pl = scatter_dmap_input * scatter_dmap_output\n",
"pl.opts(xlim=(-0.2, 1.2), ylim=(-0.2, 1.2), height=600, width=600)\n",
"pl"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c24d2363",
"metadata": {},
"outputs": [],
"source": [
"s.start()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "18cfd94e",
"metadata": {},
"outputs": [],
"source": [
"s.stop()"
"\n",
"pan = HoloViews(pl)\n",
"app = pn.Row(pn.Column(button_start, button_stop), pan)\n",
"app"
]
},
{
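The plotting half of the notebook diff above reduces to the HoloViews `Pipe` + `DynamicMap` pattern, with Panel widgets driving updates. A standalone sketch of that pattern (the `push_points` callback and its random data are illustrative only, not part of this PR):

```python
import numpy as np
import pandas as pd
import holoviews as hv
import panel as pn

hv.extension('bokeh')

# A Pipe holds the latest data; the DynamicMap re-renders whenever .send() is called.
pipe = hv.streams.Pipe(data=pd.DataFrame({'x': [0.5], 'y': [0.5]}))
scatter = hv.DynamicMap(hv.Scatter, streams=[pipe]).opts(
    color='blue', xlim=(-0.2, 1.2), ylim=(-0.2, 1.2), width=400, height=400)

def push_points(event):
    # Replace the plot's data with a fresh batch of random points.
    pipe.send(pd.DataFrame({'x': np.random.rand(20), 'y': np.random.rand(20)}))

button = pn.widgets.Button(name='Push points')
button.on_click(push_points)

app = pn.Row(button, pn.pane.HoloViews(scatter))
app  # displayed in a notebook; use app.show() or app.servable() outside one
```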
8 changes: 4 additions & 4 deletions streamz/tests/test_core.py
@@ -1283,7 +1283,7 @@ def test_from_file():

source.start()

yield await_for(lambda: len(L) == 3, timeout=5)
yield await_for(lambda: len(L) == 3, timeout=15)

assert L == [1, 2, 3]

@@ -1309,11 +1309,11 @@ def test_from_file_end():
out = source.sink_to_list()
source.start()
assert out == []
yield await_for(lambda: source.started, 2, period=0.02)
yield await_for(lambda: source.started, 12, period=0.02)

f.write('data2\n')
f.flush()
yield await_for(lambda: out == ['data2\n'], timeout=5, period=0.1)
yield await_for(lambda: out == ['data2\n'], timeout=15, period=0.1)


@gen_test()
@@ -1754,5 +1754,5 @@ def run(self):
source.connect(sout)
source.start()

wait_for(lambda: L == [0], 0.01)
wait_for(lambda: L == [0], 1, period=0.01)
assert len(source_list) > 0
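The test-suite hunks above and below make only one kind of change: they raise `wait_for`/`await_for` deadlines (and in a couple of places pass an explicit `period`) so that slow CI machines stop hitting spurious timeouts. For reference, such a polling helper behaves roughly like the sketch below; this is an approximation, not the actual code in `streamz.utils_test`.

```python
import time

import pytest

def wait_for_sketch(predicate, timeout, fail_func=None, period=0.001):
    """Poll `predicate` every `period` seconds; fail the test after `timeout` seconds."""
    deadline = time.time() + timeout
    while not predicate():
        time.sleep(period)
        if time.time() > deadline:
            if fail_func is not None:
                fail_func()
            pytest.fail("condition not reached within %s seconds" % timeout)
```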
54 changes: 27 additions & 27 deletions streamz/tests/test_kafka.py
@@ -70,7 +70,7 @@ def predicate():
return b'kafka entered RUNNING state' in out
except subprocess.CalledProcessError:
pass
wait_for(predicate, 10, period=0.1)
wait_for(predicate, 20, period=0.1)
return cid


@@ -123,12 +123,12 @@ def test_from_kafka():
kafka.produce(TOPIC, b'value-%d' % i)
kafka.flush()
# it takes some time for messages to come back out of kafka
wait_for(lambda: len(out) == 10, 10, period=0.1)
wait_for(lambda: len(out) == 10, 15, period=0.1)
assert out[-1] == b'value-9'

kafka.produce(TOPIC, b'final message')
kafka.flush()
wait_for(lambda: out[-1] == b'final message', 10, period=0.1)
wait_for(lambda: out[-1] == b'final message', 15, period=0.1)

stream._close_consumer()
kafka.produce(TOPIC, b'lost message')
@@ -154,7 +154,7 @@ def test_to_kafka():

source.emit('final message')
kafka.flush()
wait_for(lambda: len(out) == 11, 10, period=0.1)
wait_for(lambda: len(out) == 11, 15, period=0.1)
assert out[-1] == b'final message'


@@ -175,12 +175,12 @@ def test_from_kafka_thread():
kafka.produce(TOPIC, b'value-%d' % i)
kafka.flush()
# it takes some time for messages to come back out of kafka
yield await_for(lambda: len(out) == 10, 10, period=0.1)
yield await_for(lambda: len(out) == 10, 15, period=0.1)

assert out[-1] == b'value-9'
kafka.produce(TOPIC, b'final message')
kafka.flush()
yield await_for(lambda: out[-1] == b'final message', 10, period=0.1)
yield await_for(lambda: out[-1] == b'final message', 15, period=0.1)

stream._close_consumer()
kafka.produce(TOPIC, b'lost message')
@@ -205,12 +205,12 @@ def test_kafka_batch():
stream = Stream.from_kafka_batched(TOPIC, ARGS, max_batch_size=4, keys=True)
out = stream.sink_to_list()
stream.start()
wait_for(lambda: stream.upstream.started, 10, 0.1)
wait_for(lambda: stream.upstream.started, 15, 0.1)
for i in range(10):
kafka.produce(TOPIC, b'value-%d' % i, b'%d' % i)
kafka.flush()
# out may still be empty or first item of out may be []
wait_for(lambda: any(out) and out[-1][-1]['value'] == b'value-9', 10, period=0.2)
wait_for(lambda: any(out) and out[-1][-1]['value'] == b'value-9', 15, period=0.2)
assert out[-1][-1]['key'] == b'9'
# max_batch_size checks
assert len(out[0]) == len(out[1]) == 4 and len(out) == 3
@@ -233,7 +233,7 @@ async def test_kafka_dask_batch(c, s, w1, w2):
for i in range(10):
kafka.produce(TOPIC, b'value-%d' % i)
kafka.flush()
await await_for(lambda: any(out), 10, period=0.2)
await await_for(lambda: any(out), 15, period=0.2)
assert {'key': None, 'value': b'value-1'} in out[0]
stream.stop()
await asyncio.sleep(0)
@@ -280,17 +280,17 @@ def test_kafka_batch_npartitions():
npartitions=1)
out2 = stream2.gather().sink_to_list()
stream2.start()
wait_for(lambda: stream2.upstream.started, 10, 0.1)
wait_for(lambda: len(out2) == 1 and len(out2[0]) == 5, 10, 0.1)
wait_for(lambda: stream2.upstream.started, 15, 0.1)
wait_for(lambda: len(out2) == 1 and len(out2[0]) == 5, 15, 0.1)
stream2.upstream.stopped = True

stream3 = Stream.from_kafka_batched(TOPIC, ARGS2,
asynchronous=True,
npartitions=4)
out3 = stream3.gather().sink_to_list()
stream3.start()
wait_for(lambda: stream3.upstream.started, 10, 0.1)
wait_for(lambda: len(out3) == 2 and (len(out3[0]) + len(out3[1])) == 10, 10, 0.1)
wait_for(lambda: stream3.upstream.started, 15, 0.1)
wait_for(lambda: len(out3) == 2 and (len(out3[0]) + len(out3[1])) == 10, 15, 0.1)
stream3.upstream.stopped = True


@@ -322,8 +322,8 @@ def test_kafka_refresh_partitions():
poll_interval='2s')
out = stream.gather().sink_to_list()
stream.start()
wait_for(lambda: stream.upstream.started, 10, 0.1)
wait_for(lambda: len(out) == 2 and (len(out[0]) + len(out[1])) == 10, 10, 0.1)
wait_for(lambda: stream.upstream.started, 15, 0.1)
wait_for(lambda: len(out) == 2 and (len(out[0]) + len(out[1])) == 10, 15, 0.1)

subprocess.call(shlex.split("docker exec streamz-kafka "
"/opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh "
@@ -338,7 +338,7 @@ def test_kafka_refresh_partitions():
kafka.flush()

wait_for(lambda: len(out) == 4 and (len(out[2]) + len(out[3])) == 10
and out[3][4] == b'value-19', 10, 0.1)
and out[3][4] == b'value-19', 15, 0.1)
stream.upstream.stopped = True


@@ -367,7 +367,7 @@ def test_kafka_batch_checkpointing_sync_nodes():
stream1 = Stream.from_kafka_batched(TOPIC, ARGS1)
out1 = stream1.map(split).filter(lambda x: x[-1] % 2 == 1).sink_to_list()
stream1.start()
wait_for(lambda: any(out1) and out1[-1][-1] == 9, 10, period=0.2)
wait_for(lambda: any(out1) and out1[-1][-1] == 9, 15, period=0.2)
stream1.upstream.stopped = True
stream2 = Stream.from_kafka_batched(TOPIC, ARGS1)
out2 = stream2.map(split).filter(lambda x: x[-1] % 2 == 1).sink_to_list()
@@ -378,7 +378,7 @@ def test_kafka_batch_checkpointing_sync_nodes():
stream3 = Stream.from_kafka_batched(TOPIC, ARGS2)
out3 = stream3.map(split).filter(lambda x: x[-1] % 2 == 1).sink_to_list()
stream3.start()
wait_for(lambda: any(out3) and out3[-1][-1] == 9, 10, period=0.2)
wait_for(lambda: any(out3) and out3[-1][-1] == 9, 15, period=0.2)
stream3.upstream.stopped = True


@@ -407,7 +407,7 @@ async def test_kafka_dask_checkpointing_sync_nodes(c, s, w1, w2):
dask=True)
out1 = stream1.map(split).gather().filter(lambda x: x[-1] % 2 == 1).sink_to_list()
stream1.start()
await await_for(lambda: any(out1) and out1[-1][-1] == 9, 10, period=0.2)
await await_for(lambda: any(out1) and out1[-1][-1] == 9, 15, period=0.2)
stream1.upstream.stopped = True
stream2 = Stream.from_kafka_batched(TOPIC, ARGS1, asynchronous=True,
dask=True)
@@ -420,7 +420,7 @@ async def test_kafka_dask_checkpointing_sync_nodes(c, s, w1, w2):
dask=True)
out3 = stream3.map(split).gather().filter(lambda x: x[-1] % 2 == 1).sink_to_list()
stream3.start()
await await_for(lambda: any(out3) and out3[-1][-1] == 9, 10, period=0.2)
await await_for(lambda: any(out3) and out3[-1][-1] == 9, 15, period=0.2)
stream3.upstream.stopped = True


@@ -449,7 +449,7 @@ def test_kafka_batch_checkpointing_async_nodes_1():
stream2 = Stream.from_kafka_batched(TOPIC, ARGS)
out2 = stream2.partition(2).sliding_window(2, return_partial=False).sink_to_list()
stream2.start()
wait_for(lambda: stream2.upstream.started, 10, 0.1)
wait_for(lambda: stream2.upstream.started, 15, 0.1)
for i in range(2,6):
kafka.produce(TOPIC, b'value-%d' % i)
kafka.flush()
@@ -462,9 +462,9 @@ def test_kafka_batch_checkpointing_async_nodes_1():
stream3 = Stream.from_kafka_batched(TOPIC, ARGS)
out3 = stream3.sink_to_list()
stream3.start()
wait_for(lambda: stream3.upstream.started, 10, 0.1)
wait_for(lambda: stream3.upstream.started, 15, 0.1)
# Stream picks up from where it left off, i.e., from the last committed offset.
wait_for(lambda: len(out3) == 1 and out3[0] == [b'value-3', b'value-4', b'value-5'], 10, 0.1)
wait_for(lambda: len(out3) == 1 and out3[0] == [b'value-3', b'value-4', b'value-5'], 15, 0.1)
stream3.upstream.stopped = True
stream3.destroy()

@@ -586,7 +586,7 @@ def test_kafka_checkpointing_auto_offset_reset_latest():
stream1 = Stream.from_kafka_batched(TOPIC, ARGS, asynchronous=True)
out1 = stream1.map(split).gather().sink_to_list()
stream1.start()
wait_for(lambda: stream1.upstream.started, 10, period=0.1)
wait_for(lambda: stream1.upstream.started, 15, period=0.1)

'''
Stream has started, so these are read.
@@ -596,7 +596,7 @@ def test_kafka_checkpointing_auto_offset_reset_latest():
kafka.flush()

wait_for(lambda: len(out1) == 3 and (len(out1[0]) + len(out1[1]) + len(out1[2])) == 30,
10, period=0.1)
15, period=0.1)
'''
Stream stops but checkpoint has been created.
'''
@@ -617,12 +617,12 @@ def test_kafka_checkpointing_auto_offset_reset_latest():
Stream restarts here.
'''
stream2.start()
wait_for(lambda: stream2.upstream.started, 10, 0.1)
wait_for(lambda: stream2.upstream.started, 15, 0.1)

for i in range(30):
kafka.produce(TOPIC, b'value-%d' % i)
kafka.flush()

wait_for(lambda: len(out2) == 6 and (len(out2[3]) + len(out2[4]) + len(out2[5])) == 30,
10, period=0.1)
15, period=0.1)
stream2.upstream.stopped = True