6
6
use React \EventLoop \LoopInterface ;
7
7
use RuntimeException ;
8
8
use Rx \Observable ;
9
- use function Rx \p ;
10
9
use Rx \Scheduler ;
11
10
use Rx \Subject \Subject ;
12
11
use Rx \Websocket \WebsocketErrorException ;
13
12
use Throwable ;
13
+ use function Rx \p ;
14
14
15
15
final class AsyncClient
16
16
{
@@ -38,19 +38,32 @@ final class AsyncClient
38
38
*/
39
39
private $ client ;
40
40
41
+ /**
42
+ * @var string|null
43
+ */
44
+ private $ authEndpoint ;
45
+
46
+ /**
47
+ * @var array|null
48
+ */
49
+ private $ authEndpointHeaders ;
50
+
41
51
/**
42
52
* @var Observable
43
53
*/
44
54
private $ connected ;
45
55
46
56
/**
47
- * @internal
48
- * @param Subject $client
57
+ * @param Subject $client
58
+ *
49
59
* @throws \InvalidArgumentException
60
+ * @internal
50
61
*/
51
- public function __construct (Subject $ client )
62
+ public function __construct (Subject $ client, string $ authEndpoint = null , array $ authEndpointHeaders = null )
52
63
{
53
- $ this ->client = $ client ;
64
+ $ this ->client = $ client ;
65
+ $ this ->authEndpoint = $ authEndpoint ;
66
+ $ this ->authEndpointHeaders = $ authEndpointHeaders ;
54
67
55
68
/** @var Observable $events */
56
69
$ events = $ client
@@ -77,17 +90,21 @@ public function __construct(Subject $client)
77
90
}
78
91
79
92
/**
80
- * @param LoopInterface $loop
81
- * @param string $app Application ID
82
- * @param Resolver $resolver Optional DNS resolver
83
- * @throws \InvalidArgumentException
93
+ * @param LoopInterface $loop
94
+ * @param string $app Application ID
95
+ * @param Resolver $resolver Optional DNS resolver
96
+ *
84
97
* @return AsyncClient
98
+ * @throws \InvalidArgumentException
85
99
*/
86
100
public static function create (
87
101
LoopInterface $ loop ,
88
102
string $ app ,
89
103
Resolver $ resolver = null ,
90
- string $ cluster = null
104
+ string $ cluster = null ,
105
+ string $ host = null ,
106
+ string $ authEndpoint = null ,
107
+ array $ authEndpointHeaders = null ,
91
108
): AsyncClient {
92
109
try {
93
110
Scheduler::setDefaultFactory (function () use ($ loop ) {
@@ -97,16 +114,19 @@ public static function create(
97
114
}
98
115
99
116
return new self (
100
- WebSocket::createFactory (ApiSettings::createUrl ($ app , $ cluster ), false , [], $ loop , $ resolver )
117
+ WebSocket::createFactory (ApiSettings::createUrl ($ app , $ cluster , $ host ), false , [], $ loop , $ resolver ),
118
+ $ authEndpoint ,
119
+ $ authEndpointHeaders
101
120
);
102
121
}
103
122
104
123
/**
105
124
* Listen on a channel.
106
125
*
107
- * @param string $channel Channel to listen on
108
- * @throws \InvalidArgumentException
126
+ * @param string $channel Channel to listen on
127
+ *
109
128
* @return Observable
129
+ * @throws \InvalidArgumentException
110
130
*/
111
131
public function channel (string $ channel ): Observable
112
132
{
@@ -121,9 +141,15 @@ public function channel(string $channel): Observable
121
141
});
122
142
123
143
$ subscribe = $ this ->connected
124
- ->do (function () use ($ channel ): void {
144
+ ->do (function (Event $ event ) use ($ channel ): void {
145
+ $ authKey = $ channelData = null ;
146
+
147
+ if (str_starts_with ($ channel , 'private- ' ) || str_starts_with ($ channel , 'presence- ' )) {
148
+ [$ authKey , $ channelData ] = $ this ->generateAuthToken ($ channel , $ event ->getData ()['socket_id ' ]);
149
+ }
150
+
125
151
// Subscribe to pusher channel after connected
126
- $ this ->send (Event::subscribeOn ($ channel ));
152
+ $ this ->send (Event::subscribeOn ($ channel, $ authKey , $ channelData ));
127
153
})
128
154
->flatMapTo (Observable::empty ());
129
155
@@ -152,7 +178,6 @@ public function channel(string $channel): Observable
152
178
* Send a message through the client.
153
179
*
154
180
* @param array $message Message to send, will be json encoded
155
- *
156
181
*/
157
182
public function send (array $ message ): void
158
183
{
@@ -163,7 +188,8 @@ public function send(array $message): void
163
188
* Returns an observable of TimeoutException.
164
189
* The timeout observable will get cancelled every time a new event is received.
165
190
*
166
- * @param Observable $events
191
+ * @param Observable $events
192
+ *
167
193
* @return Observable
168
194
*/
169
195
private function timeout (Observable $ events ): Observable
@@ -184,20 +210,22 @@ private function timeout(Observable $events): Observable
184
210
}
185
211
186
212
return Observable::never ()
187
- ->timeout ($ time )
188
- ->catch (function () use ($ time ) {
189
- // ping (do something that causes incoming stream to get a message)
190
- $ this ->send (Event::ping ());
191
- // this timeout will actually timeout with a TimeoutException - causing
192
- // everything above this to dispose
193
- return Observable::never ()->timeout ($ time );
194
- });
213
+ ->timeout ($ time )
214
+ ->catch (function () use ($ time ) {
215
+ // ping (do something that causes incoming stream to get a message)
216
+ $ this ->send (Event::ping ());
217
+ // this timeout will actually timeout with a TimeoutException - causing
218
+ // everything above this to dispose
219
+ return Observable::never ()->timeout ($ time );
220
+ });
195
221
});
196
222
}
197
223
198
224
/**
199
225
* Handle errors as described at https://pusher.com/docs/pusher_protocol#error-codes.
200
- * @param Throwable $throwable
226
+ *
227
+ * @param Throwable $throwable
228
+ *
201
229
* @return Observable
202
230
*/
203
231
private function handleLowLevelError (Throwable $ throwable ): Observable
@@ -233,4 +261,44 @@ private function handleLowLevelError(Throwable $throwable): Observable
233
261
234
262
return Observable::timer ($ this ->delay );
235
263
}
264
+
265
+ /**
266
+ * @throws \Exception
267
+ */
268
+ private function generateAuthToken (string $ channel , string $ socketId ): array
269
+ {
270
+ if (!$ this ->authEndpoint ) {
271
+ throw new \Exception ('No auth endpoint is configured to connect private or presence channel. ' );
272
+ }
273
+
274
+ $ curl = curl_init ();
275
+
276
+ curl_setopt_array ($ curl , [
277
+ CURLOPT_URL => $ this ->authEndpoint ,
278
+ CURLOPT_RETURNTRANSFER => true ,
279
+ CURLOPT_MAXREDIRS => 10 ,
280
+ CURLOPT_TIMEOUT => 5 ,
281
+ CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1 ,
282
+ CURLOPT_CUSTOMREQUEST => 'POST ' ,
283
+ CURLOPT_POSTFIELDS => ['channel_name ' => $ channel , 'socket_id ' => $ socketId , 'user_data ' => []],
284
+ CURLOPT_HTTPHEADER => $ this ->authEndpointHeaders ,
285
+ ]);
286
+
287
+ $ response = curl_exec ($ curl );
288
+ $ responseCode = curl_getinfo ($ curl , CURLINFO_RESPONSE_CODE );
289
+
290
+ curl_close ($ curl );
291
+
292
+ if ($ responseCode !== 200 ) {
293
+ throw new \Exception ('Can \'t generate auth token for ' . $ channel . '. Response code ' . $ responseCode );
294
+ }
295
+
296
+ $ response = json_decode ($ response , true );
297
+
298
+ if (!isset ($ response ['auth ' ])) {
299
+ throw new \Exception ('Invalid response for auth token. ' );
300
+ }
301
+
302
+ return [$ response ['auth ' ], $ response ['channel_data ' ] ?? null ];
303
+ }
236
304
}
0 commit comments