Skip to content

Commit 0b8c364

Browse files
committed
develop
1 parent 4a7a8d1 commit 0b8c364

9 files changed

+69
-27
lines changed

bin/test

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ waitForService redis 6379 50
3636
waitForService beanstalkd 11300 50
3737
waitForService gearmand 4730 50
3838
waitForService kafka 9092 50
39+
waitForService mongo 27017 50
3940

4041
php pkg/job-queue/Tests/Functional/app/console doctrine:database:create --if-not-exists
4142
php pkg/job-queue/Tests/Functional/app/console doctrine:schema:update --force

composer.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@
145145
{
146146
"type": "path",
147147
"url": "pkg/async-event-dispatcher"
148-
}
148+
},
149149
{
150150
"type": "path",
151151
"url": "pkg/mongodb"

docker-compose.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ services:
4545
- RDKAFKA_PORT=9092
4646
- PUBSUB_EMULATOR_HOST=http://google-pubsub:8085
4747
- GCLOUD_PROJECT=mqdev
48-
- MONGO_DSN=mongodb://127.0.0.1/
48+
- MONGO_DSN=mongodb://mongo
4949

5050
rabbitmq:
5151
image: 'enqueue/rabbitmq:latest'

pkg/mongodb/MongodbConnectionFactory.php

+41-3
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,23 @@ class MongodbConnectionFactory implements PsrConnectionFactory
1212
*/
1313
private $config;
1414

15-
public function __construct(array $params)
15+
public function __construct($config = 'mongodb:')
1616
{
17-
$this->config = array_replace([
17+
if (empty($config)) {
18+
$config = $this->parseDsn('mongodb:');
19+
} elseif (is_string($config)) {
20+
$config = $this->parseDsn($config);
21+
} elseif (is_array($config)) {
22+
} else {
23+
throw new \LogicException('The config must be either an array of options, a DSN string or null');
24+
}
25+
$config = array_replace([
1826
'uri' => 'mongodb://127.0.0.1/',
1927
'uriOptions' => [],
2028
'driverOptions' => [],
21-
], $params);
29+
], $config);
30+
31+
$this->config = $config;
2232
}
2333

2434
public function createContext()
@@ -27,4 +37,32 @@ public function createContext()
2737

2838
return new MongodbContext($client, $this->config);
2939
}
40+
41+
private function parseDsn($dsn)
42+
{
43+
if (false === parse_url($dsn)) {
44+
throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn));
45+
}
46+
47+
$schema = parse_url($dsn, PHP_URL_SCHEME);
48+
if (empty($schema)) {
49+
throw new \LogicException('Schema is empty');
50+
}
51+
52+
$supported = [
53+
'mongodb' => true,
54+
];
55+
56+
if (false == isset($supported[$schema])) {
57+
throw new \LogicException(sprintf(
58+
'The given DSN schema "%s" is not supported. There are supported schemes: "%s".',
59+
$schema,
60+
implode('", "', array_keys($supported))
61+
));
62+
}
63+
64+
return [
65+
'uri' => $schema.':' === $dsn ? $schema.'://127.0.0.1/' : $dsn,
66+
];
67+
}
3068
}

pkg/mongodb/MongodbConsumer.php

+4-5
Original file line numberDiff line numberDiff line change
@@ -136,15 +136,14 @@ protected function receiveMessage()
136136
$now = time();
137137
$collection = $this->context->getCollection();
138138
$message = $collection->findOneAndDelete(['$or' => [['delayed_until' => ['$exists' => false]], ['delayed_until' => ['$lte' => $now]]]],
139-
['sort' => ['priority' => -1], 'typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']]);
139+
['sort' => ['priority' => -1, 'published_at' => 1], 'typeMap' => ['root' => 'array', 'document' => 'array']]);
140140

141141
if (!$message) {
142142
return null;
143143
}
144-
145-
$convertedMessage = $this->convertMessage($message);
146-
147-
return $convertedMessage;
144+
if (empty($message['time_to_live']) || $message['time_to_live'] > time()) {
145+
return $this->convertMessage($message);
146+
}
148147
}
149148

150149
/**

pkg/mongodb/MongodbProducer.php

+3-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
namespace Enqueue\Mongodb;
44

5-
use Enqueue\Util\JSON;
65
use Interop\Queue\Exception;
76
use Interop\Queue\InvalidDestinationException;
87
use Interop\Queue\InvalidMessageException;
@@ -81,10 +80,11 @@ public function send(PsrDestination $destination, PsrMessage $message)
8180
$mongoMessage = [
8281
'published_at' => $publishedAt,
8382
'body' => $body,
84-
'headers' => JSON::encode($message->getHeaders()),
85-
'properties' => JSON::encode($message->getProperties()),
83+
'headers' => $message->getHeaders(),
84+
'properties' => $message->getProperties(),
8685
'priority' => $message->getPriority(),
8786
'queue' => $destination->getQueueName(),
87+
'redelivered' => $message->isRedelivered(),
8888
];
8989

9090
$delay = $message->getDeliveryDelay();

pkg/mongodb/Tests/Functional/MongodbConsumerTest.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public function testShouldOrderMessagesWithSamePriorityByPublishedAtDate()
7575
$olderTime = $time - 10000;
7676

7777
$expectedPriority5Body = __CLASS__.'_priority5_'.$time;
78-
$expectedPriority5BodyOlderTime = __CLASS__.'_priority5_'.$olderTime;
78+
$expectedPriority5BodyOlderTime = __CLASS__.'_priority5Old_'.$olderTime;
7979

8080
$producer = $context->createProducer();
8181

pkg/mongodb/Tests/MongodbConnectionFactoryTest.php

+15-9
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,27 @@ public function testShouldImplementConnectionFactoryInterface()
1818

1919
public function testCouldBeConstructedWithEmptyConfiguration()
2020
{
21+
$params = [
22+
'uri' => 'mongodb://127.0.0.1/',
23+
'uriOptions' => [],
24+
'driverOptions' => [],
25+
];
26+
2127
$factory = new MongodbConnectionFactory();
22-
$this->assertAttributeEquals('mongodb://127.0.0.1/', 'uri', $factory);
23-
$this->assertAttributeEquals([], 'config', $factory);
24-
$this->assertAttributeEquals([], 'uriOptions', $factory);
25-
$this->assertAttributeEquals([], 'driverOptions', $factory);
28+
$this->assertAttributeEquals($params, 'config', $factory);
2629
}
2730

2831
public function testCouldBeConstructedWithCustomConfiguration()
2932
{
30-
$factory = new MongodbConnectionFactory('mongodb://127.0.0.3/', ['testValue' => 123], ['testValue' => 123], ['testValue' => 123]);
33+
$params = [
34+
'uri' => 'mongodb://127.0.0.3/',
35+
'uriOptions' => ['testValue' => 123],
36+
'driverOptions' => ['testValue' => 123],
37+
];
38+
39+
$factory = new MongodbConnectionFactory($params);
3140

32-
$this->assertAttributeEquals('mongodb://127.0.0.3/', 'uri', $factory);
33-
$this->assertAttributeEquals(['testValue' => 123], 'config', $factory);
34-
$this->assertAttributeEquals(['testValue' => 123], 'uriOptions', $factory);
35-
$this->assertAttributeEquals(['testValue' => 123], 'driverOptions', $factory);
41+
$this->assertAttributeEquals($params, 'config', $factory);
3642
}
3743

3844
public function testShouldCreateContext()

pkg/mongodb/Tests/Spec/CreateMongodbContextTrait.php

+2-4
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,11 @@ protected function createMongodbContext()
1111
if (false == $env = getenv('MONGO_DSN')) {
1212
$this->markTestSkipped('The MONGO_DSN env is not available. Skip tests');
1313
}
14-
15-
$factory = new MongodbConnectionFactory($env);
14+
$params = ['uri' => $env];
15+
$factory = new MongodbConnectionFactory($params);
1616

1717
$context = $factory->createContext();
1818

19-
//$context->getClient()->dropDatabase('enqueue');
20-
2119
return $context;
2220
}
2321
}

0 commit comments

Comments
 (0)