Skip to content

Commit 311da4a

Browse files
committed
Fix: EntityTrackerSubscriber to track entities separately for each connection
Fix: Index managers for the same connection to reuse the same ConnectionManager instance
1 parent 555bf15 commit 311da4a

File tree

14 files changed

+144
-22
lines changed

14 files changed

+144
-22
lines changed

DependencyInjection/Compiler/AddConnectionsPass.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public function process(ContainerBuilder $container)
3333
$connectionDefinition->setFactory(
3434
[
3535
new Reference('sfes.connection_factory'),
36-
'createConnectionManager'
36+
'createConnectionManager',
3737
]
3838
);
3939

@@ -42,7 +42,7 @@ public function process(ContainerBuilder $container)
4242
$connectionDefinition
4343
);
4444

45-
if ($connectionName === 'default') {
45+
if ('default' === $connectionName) {
4646
$container->setAlias('sfes.connection', 'sfes.connection.default');
4747
}
4848
}

DependencyInjection/Compiler/AddIndexManagersPass.php

+2-4
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
use Symfony\Component\DependencyInjection\Reference;
1010

1111
/**
12-
* Compiles elastic search data.
12+
* Registers index manager service definitions
1313
*/
1414
class AddIndexManagersPass implements CompilerPassInterface
1515
{
@@ -27,9 +27,7 @@ public function process(ContainerBuilder $container)
2727
// Make sure the connection service definition exists
2828
$connectionService = sprintf('sfes.connection.%s', $indexSettings['connection']);
2929
if (!$container->hasDefinition($connectionService)) {
30-
throw new InvalidConfigurationException(
31-
'There is no ES connection with name '.$indexSettings['connection']
32-
);
30+
throw new InvalidConfigurationException(sprintf('There is no ES connection with name %s', $indexSettings['connection']));
3331
}
3432

3533
$indexManagerClass = $container->getParameter('sfes.index_manager.class');

Document/Repository/Repository.php

+7
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ class Repository
3737
*/
3838
protected $metadata;
3939

40+
/**
41+
* @var DocumentMetadataCollector
42+
*/
43+
protected $documentMetadataCollector;
44+
4045
/**
4146
* Constructor.
4247
*
@@ -90,6 +95,7 @@ public function getById($id, $resultType = Finder::RESULTS_OBJECT)
9095
* @param int $resultsType Bitmask value determining how the results are returned
9196
* @param array $additionalRequestParams Additional params to pass to the ES client's search() method
9297
* @param int $totalHits The total hits of the query response
98+
*
9399
* @return mixed
94100
*/
95101
public function find(array $searchBody, $resultsType = Finder::RESULTS_OBJECT, array $additionalRequestParams = [], &$totalHits = null)
@@ -102,6 +108,7 @@ public function find(array $searchBody, $resultsType = Finder::RESULTS_OBJECT, a
102108
*
103109
* @param array $searchBody
104110
* @param array $additionalRequestParams
111+
*
105112
* @return int
106113
*/
107114
public function count(array $searchBody = [], array $additionalRequestParams = [])

Event/PostCommitEvent.php

+18-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Sineflow\ElasticsearchBundle\Event;
44

5+
use Sineflow\ElasticsearchBundle\Manager\ConnectionManager;
56
use Symfony\Component\EventDispatcher\Event;
67

78
/**
@@ -15,11 +16,18 @@ class PostCommitEvent extends Event
1516
private $bulkResponse;
1617

1718
/**
18-
* @param array $bulkResponse
19+
* @var string
1920
*/
20-
public function __construct(array $bulkResponse)
21+
private $connectionName;
22+
23+
/**
24+
* @param array $bulkResponse
25+
* @param ConnectionManager $connectionManager
26+
*/
27+
public function __construct(array $bulkResponse, ConnectionManager $connectionManager)
2128
{
2229
$this->bulkResponse = $bulkResponse;
30+
$this->connectionName = $connectionManager->getConnectionName();
2331
}
2432

2533
/**
@@ -29,4 +37,12 @@ public function getBulkResponse()
2937
{
3038
return $this->bulkResponse;
3139
}
40+
41+
/**
42+
* @return string
43+
*/
44+
public function getConnectionName()
45+
{
46+
return $this->connectionName;
47+
}
3248
}

Event/PrePersistEvent.php

+19-4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Sineflow\ElasticsearchBundle\Event;
44

55
use Sineflow\ElasticsearchBundle\Document\DocumentInterface;
6+
use Sineflow\ElasticsearchBundle\Manager\ConnectionManager;
67
use Symfony\Component\EventDispatcher\Event;
78

89
/**
@@ -15,19 +16,25 @@ class PrePersistEvent extends Event
1516
*/
1617
private $document;
1718

19+
/**
20+
* @var string
21+
*/
22+
private $connectionName;
23+
1824
/**
1925
* @var int
2026
*/
2127
private $bulkOperationIndex;
2228

2329
/**
2430
* @param DocumentInterface $document
25-
* @param int $bulkOperationIndex
31+
* @param ConnectionManager $connectionManager
2632
*/
27-
public function __construct(DocumentInterface $document, $bulkOperationIndex)
33+
public function __construct(DocumentInterface $document, ConnectionManager $connectionManager)
2834
{
29-
$this->document = $document;
30-
$this->bulkOperationIndex = $bulkOperationIndex;
35+
$this->document = $document;
36+
$this->connectionName = $connectionManager->getConnectionName();
37+
$this->bulkOperationIndex = $connectionManager->getBulkOperationsCount();
3138
}
3239

3340
/**
@@ -38,6 +45,14 @@ public function getDocument()
3845
return $this->document;
3946
}
4047

48+
/**
49+
* @return string
50+
*/
51+
public function getConnectionName()
52+
{
53+
return $this->connectionName;
54+
}
55+
4156
/**
4257
* @return int
4358
*/

Manager/ConnectionManager.php

+5-1
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ public function commit($forceRefresh = true)
220220
}
221221

222222
if ($this->eventDispatcher) {
223-
$this->eventDispatcher->dispatch(Events::POST_COMMIT, new PostCommitEvent($response));
223+
$this->eventDispatcher->dispatch(Events::POST_COMMIT, new PostCommitEvent($response, $this));
224224
}
225225
}
226226

@@ -329,7 +329,9 @@ public function getAliases()
329329
*
330330
* $params['index'] = (list) A comma-separated list of indices/aliases to check (Required)
331331
* @param array $params Associative array of parameters
332+
*
332333
* @return bool
334+
*
333335
* @throws InvalidArgumentException
334336
*/
335337
public function existsIndexOrAlias(array $params)
@@ -370,7 +372,9 @@ public function existsIndexOrAlias(array $params)
370372
*
371373
* @param array $params
372374
* $params['name'] = (list) A comma-separated list of alias names to return (Required)
375+
*
373376
* @return bool
377+
*
374378
* @throws InvalidArgumentException
375379
*/
376380
public function existsAlias(array $params)

Manager/ConnectionManagerFactory.php

+16-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class ConnectionManagerFactory
2323
private $tracer;
2424

2525
/**
26-
* @var boolean
26+
* @var bool
2727
*/
2828
private $kernelDebug;
2929

@@ -32,6 +32,13 @@ class ConnectionManagerFactory
3232
*/
3333
private $eventDispatcher;
3434

35+
/**
36+
* Array to keep track of already created connection managers, so the same instance is returned for subsequent service requests
37+
*
38+
* @var array ConnectionManager[]
39+
*/
40+
private $connectionManagers = [];
41+
3542
/**
3643
* @param boolean $kernelDebug
3744
* @param LoggerInterface $tracer
@@ -63,10 +70,16 @@ public function setEventDispatcher($eventDispatcher)
6370
/**
6471
* @param string $connectionName
6572
* @param array $connectionSettings
73+
*
6674
* @return ConnectionManager
6775
*/
6876
public function createConnectionManager($connectionName, $connectionSettings)
6977
{
78+
// If we already have a ConnectionManager instance for the required connection, do not create a new one
79+
if (isset($this->connectionManagers[$connectionName])) {
80+
return $this->connectionManagers[$connectionName];
81+
}
82+
7083
$clientBuilder = ClientBuilder::create();
7184

7285
$clientBuilder->setHosts($connectionSettings['hosts']);
@@ -88,6 +101,8 @@ public function createConnectionManager($connectionName, $connectionSettings)
88101
$connectionManager->setLogger($this->logger ?: new NullLogger());
89102
$connectionManager->setEventDispatcher($this->eventDispatcher);
90103

104+
$this->connectionManagers[$connectionName] = $connectionManager;
105+
91106
return $connectionManager;
92107
}
93108
}

Manager/IndexManager.php

+1-2
Original file line numberDiff line numberDiff line change
@@ -719,8 +719,7 @@ public function update($documentClass, $id, array $fields = [], $script = null,
719719
public function persist(DocumentInterface $document, array $metaParams = [])
720720
{
721721
if ($this->eventDispatcher) {
722-
$bulkOperationIndex = $this->getConnection()->getBulkOperationsCount();
723-
$this->eventDispatcher->dispatch(Events::PRE_PERSIST, new PrePersistEvent($document, $bulkOperationIndex));
722+
$this->eventDispatcher->dispatch(Events::PRE_PERSIST, new PrePersistEvent($document, $this->getConnection()));
724723
}
725724

726725
$documentArray = $this->documentConverter->convertToArray($document);

Manager/IndexManagerFactory.php

+1
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ public function setEventDispatcher($eventDispatcher)
8585
* @param string $managerName
8686
* @param ConnectionManager $connection
8787
* @param array $indexSettings
88+
*
8889
* @return IndexManager
8990
*/
9091
public function createManager(

Subscriber/EntityTrackerSubscriber.php

+12-4
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public static function getSubscribedEvents()
4747

4848
/**
4949
* @param PrePersistEvent $prePersistEvent
50+
*
51+
* @throws \ReflectionException
5052
*/
5153
public function onPrePersist(PrePersistEvent $prePersistEvent)
5254
{
@@ -56,8 +58,8 @@ public function onPrePersist(PrePersistEvent $prePersistEvent)
5658
);
5759
if (isset($propertiesMetadata['_id'])) {
5860
$bulkOperationIndex = $prePersistEvent->getBulkOperationIndex();
59-
$this->entitiesData[$bulkOperationIndex]['entity'] = $prePersistEvent->getDocument();
60-
$this->entitiesData[$bulkOperationIndex]['metadata'] = $propertiesMetadata;
61+
$this->entitiesData[$prePersistEvent->getConnectionName()][$bulkOperationIndex]['entity'] = $prePersistEvent->getDocument();
62+
$this->entitiesData[$prePersistEvent->getConnectionName()][$bulkOperationIndex]['metadata'] = $propertiesMetadata;
6163
}
6264
}
6365

@@ -66,7 +68,13 @@ public function onPrePersist(PrePersistEvent $prePersistEvent)
6668
*/
6769
public function onPostCommit(PostCommitEvent $postCommitEvent)
6870
{
69-
foreach ($this->entitiesData as $bulkOperationIndex => $entityData) {
71+
// No need to do anything if there are no persisted entities for that connection
72+
if (empty($this->entitiesData[$postCommitEvent->getConnectionName()])) {
73+
return;
74+
}
75+
76+
// Update the ids of persisted entity objects
77+
foreach ($this->entitiesData[$postCommitEvent->getConnectionName()] as $bulkOperationIndex => $entityData) {
7078
$idValue = current($postCommitEvent->getBulkResponse()['items'][$bulkOperationIndex])['_id'];
7179
$idPropertyMetadata = $entityData['metadata']['_id'];
7280
$entity = $entityData['entity'];
@@ -78,6 +86,6 @@ public function onPostCommit(PostCommitEvent $postCommitEvent)
7886
}
7987

8088
// Clear the array to avoid any memory leaks
81-
$this->entitiesData = [];
89+
$this->entitiesData[$postCommitEvent->getConnectionName()] = [];
8290
}
8391
}

Tests/Functional/Subscriber/EntityTrackerSubscriberTest.php

+30-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Sineflow\ElasticsearchBundle\Finder\Finder;
66
use Sineflow\ElasticsearchBundle\Tests\AbstractElasticsearchTestCase;
77
use Sineflow\ElasticsearchBundle\Tests\app\fixture\Acme\FooBundle\Document\Customer;
8+
use Sineflow\ElasticsearchBundle\Tests\app\fixture\Acme\FooBundle\Document\Log;
89

910
/**
1011
* Class EntityTrackerSubscriberTest
@@ -19,22 +20,47 @@ public function testPersistWithSeveralBulkOps()
1920
$converter = $this->getContainer()->get('sfes.document_converter');
2021

2122
$imWithAliases = $this->getIndexManager('customer');
22-
$imWithAliases->getConnection()->setAutocommit(false);
2323

24+
// Another index manager on the same connection
25+
$imNoAliases = $this->getIndexManager('bar');
26+
27+
// Make sure both index managers share the same connection object instance
28+
$this->assertSame($imWithAliases->getConnection(), $imNoAliases->getConnection());
29+
30+
$imNoAliases->getConnection()->setAutocommit(false);
31+
32+
// Index manager on another connection
33+
$backupIm = $this->getIndexManager('backup');
34+
$backupIm->getConnection()->setAutocommit(false);
35+
36+
// Make sure this index manager has a separate connection manager
37+
$this->assertNotSame($imWithAliases->getConnection(), $backupIm->getConnection());
38+
39+
40+
// Persist raw document - ignored by the subscriber as there's no entity to update
2441
$rawCustomer = new Customer();
2542
$rawCustomer->name = 'firstRaw';
2643
$documentArray = $converter->convertToArray($rawCustomer);
2744
$imWithAliases->persistRaw('AcmeFooBundle:Customer', $documentArray);
2845

46+
// Persist entity - handled by the subscriber
2947
$customer = new Customer();
3048
$customer->name = 'batman';
3149
$imWithAliases->persist($customer);
3250

51+
// Persist another raw document - ignored by the subscriber as there's no entity to update
3352
$secondRawCustomer = new Customer();
3453
$secondRawCustomer->name = 'secondRaw';
3554
$documentArray = $converter->convertToArray($secondRawCustomer);
3655
$imWithAliases->persistRaw('AcmeFooBundle:Customer', $documentArray);
3756

57+
// Persist an entity to another connection to make sure the subscriber handles the 2 commits independently
58+
$log = new Log();
59+
$log->id = 123;
60+
$log->entry = 'test log entry';
61+
$backupIm->persist($log);
62+
63+
// Persist another entity to the first connection - handled by the subscriber
3864
$secondCustomer = new Customer();
3965
$secondCustomer->id = '555';
4066
$secondCustomer->name = 'joker';
@@ -45,12 +71,15 @@ public function testPersistWithSeveralBulkOps()
4571
$this->assertNull($secondRawCustomer->id);
4672
$this->assertEquals('555', $secondCustomer->id);
4773

74+
4875
$imWithAliases->getConnection()->commit();
76+
$backupIm->getConnection()->commit();
4977

5078
$this->assertNull($rawCustomer->id, 'id should not have been set');
5179
$this->assertNotNull($customer->id, 'id should have been set');
5280
$this->assertNull($secondRawCustomer->id, 'id should not have been set');
5381
$this->assertEquals('555', $secondCustomer->id);
82+
$this->assertEquals(123, $log->id);
5483

5584
// Get the customer from ES by name
5685
$finder = $this->getContainer()->get('sfes.finder');

Tests/app/config/config_test.yml

+11
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ sineflow_elasticsearch:
1818
logging: true
1919
bulk_batch_size: 100
2020

21+
backup_conn:
22+
hosts: '%elasticsearch_hosts%'
23+
profiling: false
24+
logging: false
25+
2126
indices:
2227
_base:
2328
connection: default
@@ -53,6 +58,12 @@ sineflow_elasticsearch:
5358
types:
5459
- AcmeBarBundle:Product
5560

61+
backup:
62+
connection: backup_conn
63+
name: sineflow-esb-backup
64+
types:
65+
- AcmeFooBundle:Log
66+
5667
services:
5768
app.es.language_provider:
5869
class: Sineflow\ElasticsearchBundle\Tests\app\fixture\Acme\BarBundle\LanguageProvider

0 commit comments

Comments
 (0)