diff --git a/composer.json b/composer.json index ae6f450b6..b41926045 100644 --- a/composer.json +++ b/composer.json @@ -11,17 +11,25 @@ ], "require": { "php": "^5.6|^7.0", - "psr/log": "~1.0", - "guzzlehttp/ringphp" : "~1.0" + "psr/log": "^1.0", + "psr/http-message": "^1.0", + "php-http/httplug": "^1.0", + "php-http/logger-plugin": "^1.0", + "php-http/client-common": "dev-master", + "php-http/discovery": "^1.0", + "php-http/message-factory": "^1.0" }, "require-dev": { "phpunit/phpunit": "^4.7|^5.4", - "mockery/mockery": "0.9.4", "symfony/yaml": "^2.8", "symfony/finder": "^2.8", "cpliakas/git-wrapper": "~1.0", "sami/sami": "~3.2", - "doctrine/inflector": "^1.1" + "doctrine/inflector": "^1.1", + "monolog/monolog": "^1.21", + "php-http/socket-client": "^1.2", + "php-http/message": "^1.3", + "guzzlehttp/psr7": "^1.3" }, "suggest": { "ext-curl": "*", diff --git a/docs/breaking-changes.asciidoc b/docs/breaking-changes.asciidoc index aebb599f9..387b2fa6a 100644 --- a/docs/breaking-changes.asciidoc +++ b/docs/breaking-changes.asciidoc @@ -1,4 +1,17 @@ +== Breaking changes from 2.x + +- The client now requires PHP version 5.6 or higher +- The client does not depend on any http client implementation, instead it relies on link:http://httplug.io[Httplug] + which abstract most of existing http client in PHP (guzzle6, guzzle5, react, zend, cakephp, buzz, socket, curl, ...) +- The builder pattern has less options. Please see link:_configuration.html[Configuration] for more details +- Notably setting the handler or ssl is not possible anymore, you should set a `HttpAsyncClient` instead with the +configuration of your choice +- All exceptions about the client (NoNodesAvailable, etc...) do not exist anymore, you should use the one provided by +Httplug +- Many exceptions about Elasticsearch are now under the `Http` namespace, also they include a PSR7 Response and the +original PSR7 Request + == Breaking changes from 1.x - The client now requires PHP version 5.4 or higher diff --git a/docs/configuration.asciidoc b/docs/configuration.asciidoc index 9f6a9fbc4..24fcce018 100644 --- a/docs/configuration.asciidoc +++ b/docs/configuration.asciidoc @@ -74,47 +74,6 @@ alive nodes, and `setRetries(5)`, the client will attempt to execute the command result in a connection timeout (for example), the client will throw an `OperationTimeoutException`. Depending on the Connection Pool being used, these nodes may also be marked dead. -To help in identification, exceptions that are thrown due to max retries will wrap a `MaxRetriesException`. For example, -you can catch a specific curl exception then check if it wraps a MaxRetriesException using `getPrevious()`: - -[source,php] ----- -$client = Elasticsearch\ClientBuilder::create() - ->setHosts(["localhost:1"]) - ->setRetries(0) - ->build(); - -try { - $client->search($searchParams); -} catch (Elasticsearch\Common\Exceptions\Curl\CouldNotConnectToHost $e) { - $previous = $e->getPrevious(); - if ($previous instanceof 'Elasticsearch\Common\Exceptions\MaxRetriesException') { - echo "Max retries!"; - } -} ----- - -Alternatively, all "hard" curl exceptions (`CouldNotConnectToHost`, `CouldNotResolveHostException`, `OperationTimeoutException`) -extend the more general `TransportException`. So you could instead catch the general `TransportException` and then -check it's previous value: - -[source,php] ----- -$client = Elasticsearch\ClientBuilder::create() - ->setHosts(["localhost:1"]) - ->setRetries(0) - ->build(); - -try { - $client->search($searchParams); -} catch (Elasticsearch\Common\Exceptions\TransportException $e) { - $previous = $e->getPrevious(); - if ($previous instanceof 'Elasticsearch\Common\Exceptions\MaxRetriesException') { - echo "Max retries!"; - } -} ----- - [[enabling_logger]] === Enabling the Logger @@ -129,8 +88,8 @@ You might have noticed that Monolog was suggested during installation. To begin { "require": { ... - "elasticsearch/elasticsearch" : "~2.0", - "monolog/monolog": "~1.0" + "elasticsearch/elasticsearch" : "^2.0", + "monolog/monolog": "^1.0" } } ---------------------------- @@ -142,34 +101,7 @@ And then update your composer installation: php composer.phar update ---------------------------- -Once Monolog (or another logger) is installed, you need to create a log object and inject it into the client. The -`ClientBuilder` object has a helper static function that will generate a common Monolog-based logger for you. All you need -to do is provide the path to your desired logging location: - -[source,php] ----- -$logger = ClientBuilder::defaultLogger('path/to/your.log'); - -$client = ClientBuilder::create() // Instantiate a new ClientBuilder - ->setLogger($logger) // Set the logger with a default logger - ->build(); // Build the client object ----- - -You can also specify the severity of log messages that you wish to log: - -[source,php] ----- -// set severity with second parameter -$logger = ClientBuilder::defaultLogger('/path/to/logs/', Logger::INFO); - -$client = ClientBuilder::create() // Instantiate a new ClientBuilder - ->setLogger($logger) // Set the logger with a default logger - ->build(); // Build the client object ----- - -The `defaultLogger()` method is just a helper, you are not required to use it. You can create your own logger and inject -that instead: - +Once Monolog (or another logger) is installed, you need to create a log object and inject it into the client. [source,php] ---- @@ -185,69 +117,49 @@ $client = ClientBuilder::create() // Instantiate a new ClientBuilder ---- -=== Configure the HTTP Handler +=== Configure the HTTP Client -Elasticsearch-PHP uses an interchangeable HTTP transport layer called https://github.com/guzzle/RingPHP/[RingPHP]. This -allows the client to construct a generic HTTP request, then pass it to the transport layer to execute. The actual execution -details are hidden from the client and modular, so that you can choose from several HTTP handlers depending on your needs. +Elasticsearch-PHP uses an interchangeable HTTP transport layer called http://docs.php-http.org/en/latest/httplug/introduction.html[HTTPlug]. +This allows the client to construct a PSR7 HTTP Request, then pass it to the transport layer to execute. +The actual execution details are hidden from the client and modular, so that you can choose from several HTTP clients +depending on your needs. -The default handler that the client uses is a combination handler. When executing in synchronous mode, the handler -uses `CurlHandler`, which executes single curl calls. These are very fast for single requests. When asynchronous (future) -mode is enabled, the handler switches to `CurlMultiHandler`, which uses the curl_multi interface. This involves a bit -more overhead, but allows batches of HTTP requests to be processed in parallel. +By default it will try to find an `HttpAsyncClient` with helps of the http://docs.php-http.org/en/latest/discovery.html[discovery system] -You can configure the HTTP handler with one of several helper functions, or provide your own custom handler: +You can configure the HTTP Client in the builder: [source,php] ---- -$defaultHandler = ClientBuilder::defaultHandler(); -$singleHandler = ClientBuilder::singleHandler(); -$multiHandler = ClientBuilder::multiHandler(); -$customHandler = new MyCustomHandler(); - -$client = ClientBuilder::create() - ->setHandler($defaultHandler) - ->build(); ----- - -For details on creating your own custom Ring handler, please see the http://guzzle.readthedocs.org/en/latest/handlers.html[RingPHP Documentation] - -The default handler is recommended in almost all cases. This allows fast synchronous execution, while retaining flexibility -to invoke parallel batches with async future mode. You may consider using just the `singleHandler` if you know you will -never need async capabilities, since it will save a small amount of overhead by reducing indirection. - +use GuzzleHttp\Client as GuzzleClient; +use Http\Adapter\Guzzle6\Client as GuzzleAdapter; -=== Setting the Connection Pool - -The client maintains a pool of connections, with each connection representing a node in your cluster. There are several -connection pool implementations available, and each has slightly different behavior (pinging vs no pinging, etc). -Connection pools are configured via the `setConnectionPool()` method: +$config = [ + // Config params of Guzzle6 +]; -[source,php] ----- -$connectionPool = '\Elasticsearch\ConnectionPool\StaticNoPingConnectionPool'; +$httpAsyncClient = new GuzzleAdapter(new GuzzleClient($config)); $client = ClientBuilder::create() - ->setConnectionPool($connectionPool) + ->setHttpAsyncClient($httpAsyncClient) ->build(); ---- -For more details, please see the dedicated page on link:_connection_pool.html[configuring connection pools]. +For details on creating or using an HttpAsyncClient, please see http://docs.php-http.org/en/latest/clients.html[HTTPlug Documentation] -=== Setting the Connection Selector +=== Setting the Client Pool -The connection pool manages the connections to your cluster, but the Selector is the logic that decides which connection -should be used for the next API request. There are several selectors that you can choose from. Selectors can be changed -via the `setSelector()` method: +The connection pool manages the logic that decides which connection should be used for the next API request. +There are several strategies that you can choose from. It can be changed via the `setClientPoolStrategy()` method: [source,php] ---- -$selector = '\Elasticsearch\ConnectionPool\Selectors\StickyRoundRobinSelector'; +use Http\Client\Common\HttpClientPool\LeastUsedClientPool; + $client = ClientBuilder::create() - ->setSelector($selector) + ->setClientPoolStrategy(LeastUsedClientPool::class) ->build(); ---- -For more details, please see the dedicated page on link:_selectors.html[configuring selectors]. +For more details, please see the dedicated page on http://docs.php-http.org/en/latest/components/client-common.html#httpclientpool[HTTPlug]. === Setting the Serializer @@ -269,61 +181,6 @@ $client = ClientBuilder::create() For more details, please see the dedicated page on link:_serializers.html[configuring serializers]. - -=== Setting a custom ConnectionFactory - -The ConnectionFactory instantiates new Connection objects when requested by the ConnectionPool. A single Connection -represents a single node. Since the client hands actual networking work over to RingPHP, the Connection's main job is -book-keeping: Is this node alive? Did it fail a ping request? What is the host and port? - -There is little reason to provide your own ConnectionFactory, but if you need to do so, you need to supply an intact -ConnectionFactory object to the `setConnectionFactory()` method. The object should implement the `ConnectionFactoryInterface` -interface. - -[source,php] ----- - -class MyConnectionFactory implements ConnectionFactoryInterface -{ - - public function __construct($handler, array $connectionParams, - SerializerInterface $serializer, - LoggerInterface $logger, - LoggerInterface $tracer) - { - // Code here - } - - - /** - * @param $hostDetails - * - * @return ConnectionInterface - */ - public function create($hostDetails) - { - // Code here...must return a Connection object - } -} - - -$connectionFactory = new MyConnectionFactory( - $handler, - $connectionParams, - $serializer, - $logger, - $tracer -); - -$client = ClientBuilder::create() - ->setConnectionFactory($connectionFactory); - ->build(); ----- - -As you can see, if you decide to inject your own ConnectionFactory, you take over the responsibiltiy of wiring it correctly. -The ConnectionFactory requires a working HTTP handler, serializer, logger and tracer. - - === Set the Endpoint closure The client uses an Endpoint closure to dispatch API requests to the correct Endpoint object. A namespace object will @@ -378,7 +235,7 @@ $params = [ 'localhost:9200' ], 'retries' => 2, - 'handler' => ClientBuilder::singleHandler() + 'httpAsyncClient' => new GuzzleAdapter(new GuzzleClient([])) ]; $client = ClientBuilder::fromConfig($params); ---- diff --git a/docs/connection-pool.asciidoc b/docs/connection-pool.asciidoc deleted file mode 100644 index 192fb3dbd..000000000 --- a/docs/connection-pool.asciidoc +++ /dev/null @@ -1,204 +0,0 @@ - -== Connection Pool - -The connection pool is an object inside the client that is responsible for maintaining the current list of nodes. -Theoretically, nodes are either dead or alive. - -However, in the real world, things are never so clear. Nodes are sometimes in a gray-zone of _"probably dead but not -confirmed"_, _"timed-out but unclear why"_ or _"recently dead but now alive"_. The connection pool's job is to -manage this set of unruly connections and try to provide the best behavior to the client. - -If a connection pool is unable to find an alive node to query against, it will return a `NoNodesAvailableException`. -This is distinct from an exception due to maximum retries. For example, your cluster may have 10 nodes. You execute -a request and 9 out of the 10 nodes fail due to connection timeouts. The tenth node succeeds and the query executes. -The first nine nodes will be marked dead (depending on the connection pool being used) and their "dead" timers will begin -ticking. - -When the next request is sent to the client, nodes 1-9 are still considered "dead", so they will be skipped. The request -is sent to the only known alive node (#10), and if this node fails, a `NoNodesAvailableException` is returned. You'll note -this is much less than the `retries` value, because `retries` only applies to retries against alive nodes. In this case, -only one node is known to be alive, so `NoNodesAvailableException` is returned. - - -There are several connection pool implementations that you can choose from: - -=== staticNoPingConnectionPool (default) - -This connection pool maintains a static list of hosts, which are assumed to be alive when the client initializes. If -a node fails a request, it is marked as `dead` for 60 seconds and the next node is tried. After 60 seconds, the node -is revived and put back into rotation. Each additional failed request will cause the dead timeout to increase exponentially. - -A successful request will reset the "failed ping timeout" counter. - -If you wish to explicitly set the `StaticNoPingConnectionPool` implementation, you may do so with the `setConnectionPool()` -method of the ClientBuilder object: - -[source,php] ----- -$client = ClientBuilder::create() - ->setConnectionPool('\Elasticsearch\ConnectionPool\StaticNoPingConnectionPool', []) - ->build(); ----- - -Note that the implementation is specified via a namespace path to the class. - -=== staticConnectionPool - -Identical to the `StaticNoPingConnectionPool`, except it pings nodes before they are used to determine if they are alive. -This may be useful for long-running scripts, but tends to be additional overhead that is unnecessary for average PHP scripts. - -To use the `StaticConnectionPool`: - -[source,php] ----- -$client = ClientBuilder::create() - ->setConnectionPool('\Elasticsearch\ConnectionPool\StaticConnectionPool', []) - ->build(); ----- - -Note that the implementation is specified via a namespace path to the class. - -=== simpleConnectionPool - -The `SimpleConnectionPool` simply returns the next node as specified by the Selector; it does not perform track -the "liveness" of nodes. This pool will return nodes whether they are alive or dead. It is just a simple pool of static -hosts. - -The `SimpleConnectionPool` is not recommended for routine use, but it may be a useful debugging tool. - -To use the `SimpleConnectionPool`: - -[source,php] ----- -$client = ClientBuilder::create() - ->setConnectionPool('\Elasticsearch\ConnectionPool\SimpleConnectionPool', []) - ->build(); ----- - -Note that the implementation is specified via a namespace path to the class. - -=== sniffingConnectionPool - -Unlike the two previous static connection pools, this one is dynamic. The user provides a seed list of hosts, which the -client uses to "sniff" and discover the rest of the cluster. It achieves this through the Cluster State API. As new -nodes are added or removed from the cluster, the client will update it's pool of active connections. - -To use the `SniffingConnectionPool`: - -[source,php] ----- -$client = ClientBuilder::create() - ->setConnectionPool('\Elasticsearch\ConnectionPool\SniffingConnectionPool', []) - ->build(); ----- - -Note that the implementation is specified via a namespace path to the class. - - -=== Custom Connection Pool - -If you wish to implement your own custom Connection Pool, your class must implement `ConnectionPoolInterface`: - -[source,php] ----- -class MyCustomConnectionPool implements ConnectionPoolInterface -{ - - /** - * @param bool $force - * - * @return ConnectionInterface - */ - public function nextConnection($force = false) - { - // code here - } - - /** - * @return void - */ - public function scheduleCheck() - { - // code here - } -} ----- - -You can then instantiate an instance of your ConnectionPool and inject it into the ClientBuilder: - -[source,php] ----- -$myConnectionPool = new MyCustomConnectionPool(); - -$client = ClientBuilder::create() - ->setConnectionPool($myConnectionPool, []) - ->build(); ----- - -If your connection pool only makes minor changes, you may consider extending `AbstractConnectionPool`, which provides -some helper concrete methods. If you choose to go down this route, you need to make sure your ConnectionPool's implementation -has a compatible constructor (since it is not defined in the interface): - -[source,php] ----- -class MyCustomConnectionPool extends AbstractConnectionPool implements ConnectionPoolInterface -{ - - public function __construct($connections, SelectorInterface $selector, ConnectionFactory $factory, $connectionPoolParams) - { - parent::__construct($connections, $selector, $factory, $connectionPoolParams); - } - - /** - * @param bool $force - * - * @return ConnectionInterface - */ - public function nextConnection($force = false) - { - // code here - } - - /** - * @return void - */ - public function scheduleCheck() - { - // code here - } -} ----- - -If your constructor matches AbstractConnectionPool, you may use either object injection or namespace instantiation: - -[source,php] ----- -$myConnectionPool = new MyCustomConnectionPool(); - -$client = ClientBuilder::create() - ->setConnectionPool($myConnectionPool, []) // object injection - ->setConnectionPool('/MyProject/ConnectionPools/MyCustomConnectionPool', []) // or namespace - ->build(); ----- - - -=== Which connection pool to choose? PHP and connection pooling - -At first glance, the `sniffingConnectionPool` implementation seems superior. For many languages, it is. In PHP, the -conversation is a bit more nuanced. - -Because PHP is a share-nothing architecture, there is no way to maintain a connection pool across script instances. -This means that every script is responsible for creating, maintaining, and destroying connections everytime the script -is re-run. - -Sniffing is a relatively lightweight operation (one API call to `/_cluster/state`, followed by pings to each node) but -it may be a non-negligible overhead for certain PHP applications. The average PHP script will likely load the client, -execute a few queries and then close. Imagine this script being called 1000 times per second: the sniffing connection -pool will perform the sniffing and pinging process 1000 times per second. The sniffing process will add a large -amount of overhead - -In reality, if your script only executes a few queries, the sniffing concept is _too_ robust. It tends to be more -useful in long-lived processes which potentially "out-live" a static list. - -For this reason the default connection pool is currently the `staticNoPingConnectionPool`. You can, of course, change -this default - but we strongly recommend you load test and verify that it does not negatively impact your performance. diff --git a/docs/selectors.asciidoc b/docs/selectors.asciidoc deleted file mode 100644 index f091c1f60..000000000 --- a/docs/selectors.asciidoc +++ /dev/null @@ -1,110 +0,0 @@ - -== Selectors - -The connection pool maintains the list of connections, and decides when nodes should transition from alive to dead (and -vice versa). It has no logic to choose connections, however. That job belongs to the Selector class. - -The selector's job is to return a single connection from a provided array of connections. Like the Connection Pool, -there are several implementations to choose from. - -=== RoundRobinSelector (Default) - -This selector returns connections in a round-robin fashion. Node #1 is selected on the first request, Node #2 on -the second request, etc. This ensures an even load of traffic across your cluster. Round-robin'ing happens on a -per-request basis (e.g. sequential requests go to different nodes). - -The `RoundRobinSelector` is default, but if you wish to explicitily configure it you can do: - -[source,php] ----- -$client = ClientBuilder::create() - ->setSelector('\Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector') - ->build(); ----- - -Note that the implementation is specified via a namespace path to the class. - -=== StickyRoundRobinSelector - -This selector is "sticky", in that it prefers to reuse the same connection repeatedly. For example, Node #1 is chosen -on the first request. Node #1 will continue to be re-used for each subsequent request until that node fails. Upon failure, -the selector will round-robin to the next available node, then "stick" to that node. - -This is an ideal strategy for many PHP scripts. Since PHP scripts are shared-nothing and tend to exit quickly, creating -new connections for each request is often a sub-optimal strategy and introduces a lot of overhead. Instead, it is -better to "stick" to a single connection for the duration of the script. - -By default, this selector will randomize the hosts upon initialization, which will still guarantee an even distribution -of load across the cluster. It changes the round-robin dynamics from per-request to per-script. - -If you are using <<_future_mode>>, the "sticky" behavior of this selector will be non-ideal, since all parallel requests -will go to the same node instead of multiple nodes in your cluster. When using future mode, the default `RoundRobinSelector` -should be preferred. - -If you wish to use this selector, you may do so with: - -[source,php] ----- -$client = ClientBuilder::create() - ->setSelector('\Elasticsearch\ConnectionPool\Selectors\StickyRoundRobinSelector') - ->build(); ----- - -Note that the implementation is specified via a namespace path to the class. - -=== RandomSelector - -This selector simply returns a random node, regardless of state. It is generally just for testing. - -If you wish to use this selector, you may do so with: - -[source,php] ----- -$client = ClientBuilder::create() - ->setSelector('\Elasticsearch\ConnectionPool\Selectors\RandomSelector') - ->build(); ----- - -Note that the implementation is specified via a namespace path to the class. - -=== Custom Selector - -You can implement your own custom selector. Custom selectors must implement `SelectorInterface` - -[source,php] ----- -namespace MyProject\Selectors; - -use Elasticsearch\Connections\ConnectionInterface; -use Elasticsearch\ConnectionPool\Selectors\SelectorInterface - -class MyCustomSelector implements SelectorInterface -{ - - /** - * Selects the first connection - * - * @param array $connections Array of Connection objects - * - * @return ConnectionInterface - */ - public function select($connections) - { - // code here - } - -} ----- -{zwsp} + - -You can then use your custom selector either via object injection or namespace instantiation: - -[source,php] ----- -$mySelector = new MyCustomSelector(); - -$client = ClientBuilder::create() - ->setSelector($mySelector) // object injection - ->setSelector('\MyProject\Selectors\FirstSelector') // or namespace - ->build(); ----- diff --git a/src/Elasticsearch/Client.php b/src/Elasticsearch/Client.php index 31c0ad5cd..32e0f9e7d 100644 --- a/src/Elasticsearch/Client.php +++ b/src/Elasticsearch/Client.php @@ -1338,14 +1338,6 @@ private function verifyNotNullOrEmpty($name, $var) */ private function performRequest(AbstractEndpoint $endpoint) { - $promise = $this->transport->performRequest( - $endpoint->getMethod(), - $endpoint->getURI(), - $endpoint->getParams(), - $endpoint->getBody(), - $endpoint->getOptions() - ); - - return $this->transport->resultOrFuture($promise, $endpoint->getOptions()); + return $this->transport->performRequest($endpoint); } } diff --git a/src/Elasticsearch/ClientBuilder.php b/src/Elasticsearch/ClientBuilder.php index 0914e63c8..742eeefb0 100644 --- a/src/Elasticsearch/ClientBuilder.php +++ b/src/Elasticsearch/ClientBuilder.php @@ -4,24 +4,25 @@ use Elasticsearch\Common\Exceptions\InvalidArgumentException; use Elasticsearch\Common\Exceptions\RuntimeException; -use Elasticsearch\ConnectionPool\AbstractConnectionPool; -use Elasticsearch\ConnectionPool\Selectors\SelectorInterface; -use Elasticsearch\ConnectionPool\StaticNoPingConnectionPool; -use Elasticsearch\Connections\Connection; -use Elasticsearch\Connections\ConnectionFactory; -use Elasticsearch\Connections\ConnectionFactoryInterface; use Elasticsearch\Namespaces\NamespaceBuilderInterface; -use Elasticsearch\Serializers\SerializerInterface; -use Elasticsearch\ConnectionPool\Selectors; +use Elasticsearch\Plugin\ErrorPlugin; use Elasticsearch\Serializers\SmartSerializer; -use GuzzleHttp\Ring\Client\CurlHandler; -use GuzzleHttp\Ring\Client\CurlMultiHandler; -use GuzzleHttp\Ring\Client\Middleware; -use Psr\Log\LoggerInterface; +use Http\Client\Common\EmulatedHttpAsyncClient; +use Http\Client\Common\HttpClientPool; +use Http\Client\Common\HttpClientPool\RandomClientPool; +use Http\Client\Common\HttpClientPoolItem; +use Http\Client\Common\Plugin\AddHostPlugin; +use Http\Client\Common\Plugin\LoggerPlugin; +use Http\Client\Common\Plugin\RetryPlugin; +use Http\Client\Common\PluginClient; +use Http\Client\HttpAsyncClient; +use Http\Discovery\Exception\NotFoundException; +use Http\Discovery\HttpAsyncClientDiscovery; +use Http\Discovery\HttpClientDiscovery; +use Http\Discovery\MessageFactoryDiscovery; +use Http\Discovery\UriFactoryDiscovery; +use Http\Message\UriFactory; use Psr\Log\NullLogger; -use Monolog\Logger; -use Monolog\Handler\StreamHandler; -use Monolog\Processor\IntrospectionProcessor; /** * Class ClientBuilder @@ -34,57 +35,28 @@ */ class ClientBuilder { - /** @var Transport */ - private $transport; - /** @var callback */ - private $endpoint; - - /** @var NamespaceBuilderInterface[] */ - private $registeredNamespacesBuilders = []; - - /** @var ConnectionFactoryInterface */ - private $connectionFactory; - - private $handler; - - /** @var LoggerInterface */ - private $logger; - - /** @var LoggerInterface */ - private $tracer; - - /** @var string */ - private $connectionPool = '\Elasticsearch\ConnectionPool\StaticNoPingConnectionPool'; - - /** @var string */ - private $serializer = '\Elasticsearch\Serializers\SmartSerializer'; - - /** @var string */ - private $selector = '\Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector'; + private $retries; - /** @var array */ - private $connectionPoolArgs = [ - 'randomizeHosts' => true - ]; + private $httpAsyncClient; - /** @var array */ private $hosts; - /** @var int */ - private $retries; + private $serializer; - /** @var bool */ - private $sniffOnStart = false; + private $logger; - /** @var null|array */ - private $sslCert = null; + private $endpoint; + + private $clientPoolStrategy = RandomClientPool::class; - /** @var null|array */ - private $sslKey = null; + private $registeredNamespacesBuilders = []; - /** @var null|bool|string */ - private $sslVerification = null; + public function __construct() + { + $this->serializer = new SmartSerializer(); + $this->hosts = ['localhost:9200']; + } /** * @return ClientBuilder @@ -113,6 +85,7 @@ public static function create() public static function fromConfig($config, $quiet = false) { $builder = new self; + foreach ($config as $key => $value) { $method = "set$key"; if (method_exists($builder, $method)) { @@ -123,108 +96,43 @@ public static function fromConfig($config, $quiet = false) if ($quiet === false && count($config) > 0) { $unknown = implode(array_keys($config)); + throw new RuntimeException("Unknown parameters provided: $unknown"); } + return $builder->build(); } - /** - * @param array $singleParams - * @param array $multiParams - * @throws \RuntimeException - * @return callable - */ - public static function defaultHandler($multiParams = [], $singleParams = []) - { - $future = null; - if (extension_loaded('curl')) { - $config = array_merge([ 'mh' => curl_multi_init() ], $multiParams); - if (function_exists('curl_reset')) { - $default = new CurlHandler($singleParams); - $future = new CurlMultiHandler($config); - } else { - $default = new CurlMultiHandler($config); - } - } else { - throw new \RuntimeException('Elasticsearch-PHP requires cURL, or a custom HTTP handler.'); - } - - return $future ? Middleware::wrapFuture($default, $future) : $default; - } /** - * @param array $params - * @throws \RuntimeException - * @return CurlMultiHandler + * Set the client pool strategy for selecting elasticsearch server + * + * @param string $clientPoolStrategy + * + * @throws \InvalidArgumentException + * + * @return $this */ - public static function multiHandler($params = []) + public function setClientPoolStrategy($clientPoolStrategy) { - if (function_exists('curl_multi_init')) { - return new CurlMultiHandler(array_merge([ 'mh' => curl_multi_init() ], $params)); - } else { - throw new \RuntimeException('CurlMulti handler requires cURL.'); + if (!is_string($clientPoolStrategy)) { + throw new \InvalidArgumentException("Client pool must be a class path extending HttpClientPool"); } - } - /** - * @return CurlHandler - * @throws \RuntimeException - */ - public static function singleHandler() - { - if (function_exists('curl_reset')) { - return new CurlHandler(); - } else { - throw new \RuntimeException('CurlSingle handler requires cURL.'); + if (!is_subclass_of($clientPoolStrategy, HttpClientPool::class)) { + throw new \InvalidArgumentException("Client pool must be a class path extending HttpClientPool"); } - } - /** - * @param $path string - * @return \Monolog\Logger\Logger - */ - public static function defaultLogger($path, $level = Logger::WARNING) - { - $log = new Logger('log'); - $handler = new StreamHandler($path, $level); - $log->pushHandler($handler); - - return $log; - } - - /** - * @param \Elasticsearch\Connections\ConnectionFactoryInterface $connectionFactory - * @return $this - */ - public function setConnectionFactory(ConnectionFactoryInterface $connectionFactory) - { - $this->connectionFactory = $connectionFactory; - - return $this; - } - - /** - * @param \Elasticsearch\ConnectionPool\AbstractConnectionPool|string $connectionPool - * @param array $args - * @throws \InvalidArgumentException - * @return $this - */ - public function setConnectionPool($connectionPool, array $args = []) - { - if (is_string($connectionPool)) { - $this->connectionPool = $connectionPool; - $this->connectionPoolArgs = $args; - } elseif (is_object($connectionPool)) { - $this->connectionPool = $connectionPool; - } else { - throw new InvalidArgumentException("Serializer must be a class path or instantiated object extending AbstractConnectionPool"); - } + $this->clientPoolStrategy = $clientPoolStrategy; return $this; } /** + * Set the strategy to get an endpoint object + * * @param callable $endpoint + * * @return $this */ public function setEndpoint($endpoint) @@ -235,7 +143,10 @@ public function setEndpoint($endpoint) } /** + * Register a new namespace to Elasticsearch + * * @param NamespaceBuilderInterface $namespaceBuilder + * * @return $this */ public function registerNamespace(NamespaceBuilderInterface $namespaceBuilder) @@ -246,29 +157,10 @@ public function registerNamespace(NamespaceBuilderInterface $namespaceBuilder) } /** - * @param \Elasticsearch\Transport $transport - * @return $this - */ - public function setTransport($transport) - { - $this->transport = $transport; - - return $this; - } - - /** - * @param mixed $handler - * @return $this - */ - public function setHandler($handler) - { - $this->handler = $handler; - - return $this; - } - - /** + * Add a logger to have information about requests and responses + * * @param \Psr\Log\LoggerInterface $logger + * * @return $this */ public function setLogger($logger) @@ -279,24 +171,23 @@ public function setLogger($logger) } /** - * @param \Psr\Log\LoggerInterface $tracer - * @return $this - */ - public function setTracer($tracer) - { - $this->tracer = $tracer; - - return $this; - } - - /** + * Set the serializer to use for requests and responses + * * @param \Elasticsearch\Serializers\SerializerInterface|string $serializer + * * @throws \InvalidArgumentException + * * @return $this */ public function setSerializer($serializer) { - $this->parseStringOrObject($serializer, $this->serializer, 'SerializerInterface'); + if (is_string($serializer)) { + $this->serializer = new $serializer; + } elseif (is_object($serializer)) { + $this->serializer = $serializer; + } else { + throw new InvalidArgumentException('Serializer must be a class path or instantiated object implementing SerializerInterface'); + } return $this; } @@ -324,59 +215,13 @@ public function setRetries($retries) } /** - * @param \Elasticsearch\ConnectionPool\Selectors\SelectorInterface|string $selector - * @throws \InvalidArgumentException - * @return $this - */ - public function setSelector($selector) - { - $this->parseStringOrObject($selector, $this->selector, 'SelectorInterface'); - - return $this; - } - - /** - * @param boolean $sniffOnStart - * @return $this - */ - public function setSniffOnStart($sniffOnStart) - { - $this->sniffOnStart = $sniffOnStart; - - return $this; - } - - /** - * @param $cert - * @param null|string $password - * @return $this - */ - public function setSSLCert($cert, $password = null) - { - $this->sslCert = [$cert, $password]; - - return $this; - } - - /** - * @param $key - * @param null|string $password - * @return $this - */ - public function setSSLKey($key, $password = null) - { - $this->sslKey = [$key, $password]; - - return $this; - } - - /** - * @param bool|string $value + * @param HttpAsyncClient $httpAsyncClient + * * @return $this */ - public function setSSLVerification($value = true) + public function setHttpAsyncClient(HttpAsyncClient $httpAsyncClient) { - $this->sslVerification = $value; + $this->httpAsyncClient = $httpAsyncClient; return $this; } @@ -386,62 +231,23 @@ public function setSSLVerification($value = true) */ public function build() { - $this->buildLoggers(); - - if (is_null($this->handler)) { - $this->handler = ClientBuilder::defaultHandler(); - } - - $sslOptions = null; - if (isset($this->sslKey)) { - $sslOptions['ssl_key'] = $this->sslKey; - } - if (isset($this->sslCert)) { - $sslOptions['cert'] = $this->sslCert; - } - if (isset($this->sslVerification)) { - $sslOptions['verify'] = $this->sslVerification; - } - - if (!is_null($sslOptions)) { - $sslHandler = function (callable $handler, array $sslOptions) { - return function (array $request) use ($handler, $sslOptions) { - // Add our custom headers - foreach ($sslOptions as $key => $value) { - $request['client'][$key] = $value; - } - - // Send the request using the handler and return the response. - return $handler($request); - }; - }; - $this->handler = $sslHandler($this->handler, $sslOptions); + if (null === $this->httpAsyncClient) { + try { + $this->httpAsyncClient = HttpAsyncClientDiscovery::find(); + } catch (NotFoundException $exception) { + $this->httpAsyncClient = new EmulatedHttpAsyncClient(HttpClientDiscovery::find()); + } } - if (is_null($this->serializer)) { - $this->serializer = new SmartSerializer(); - } elseif (is_string($this->serializer)) { + if (is_string($this->serializer)) { $this->serializer = new $this->serializer; } + + $finalClient = $this->createHttpClient(); + $requestBuilder = new MessageBuilder(MessageFactoryDiscovery::find(), $this->serializer); + $transport = new Transport($finalClient, $requestBuilder); - if (is_null($this->connectionFactory)) { - $connectionParams = []; - $this->connectionFactory = new ConnectionFactory($this->handler, $connectionParams, $this->serializer, $this->logger, $this->tracer); - } - - if (is_null($this->hosts)) { - $this->hosts = $this->getDefaultHost(); - } - - if (is_null($this->selector)) { - $this->selector = new Selectors\RoundRobinSelector(); - } elseif (is_string($this->selector)) { - $this->selector = new $this->selector; - } - - $this->buildTransport(); - - if (is_null($this->endpoint)) { + if (null === $this->endpoint) { $serializer = $this->serializer; $this->endpoint = function ($class) use ($serializer) { @@ -455,136 +261,44 @@ public function build() } $registeredNamespaces = []; + foreach ($this->registeredNamespacesBuilders as $builder) { /** @var $builder NamespaceBuilderInterface */ - $registeredNamespaces[$builder->getName()] = $builder->getObject($this->transport, $this->serializer); - } - - return $this->instantiate($this->transport, $this->endpoint, $registeredNamespaces); - } - - /** - * @param Transport $transport - * @param callable $endpoint - * @param Object[] $registeredNamespaces - * @return Client - */ - protected function instantiate(Transport $transport, callable $endpoint, array $registeredNamespaces) - { - return new Client($transport, $endpoint, $registeredNamespaces); - } - - private function buildLoggers() - { - if (is_null($this->logger)) { - $this->logger = new NullLogger(); - } - - if (is_null($this->tracer)) { - $this->tracer = new NullLogger(); - } - } - - private function buildTransport() - { - $connections = $this->buildConnectionsFromHosts($this->hosts); - - if (is_string($this->connectionPool)) { - $this->connectionPool = new $this->connectionPool( - $connections, - $this->selector, - $this->connectionFactory, - $this->connectionPoolArgs); - } elseif (is_null($this->connectionPool)) { - $this->connectionPool = new StaticNoPingConnectionPool( - $connections, - $this->selector, - $this->connectionFactory, - $this->connectionPoolArgs); - } - - if (is_null($this->retries)) { - $this->retries = count($connections); - } - - if (is_null($this->transport)) { - $this->transport = new Transport($this->retries, $this->sniffOnStart, $this->connectionPool, $this->logger); + $registeredNamespaces[$builder->getName()] = $builder->getObject($transport, $this->serializer); } - } - private function parseStringOrObject($arg, &$destination, $interface) - { - if (is_string($arg)) { - $destination = new $arg; - } elseif (is_object($arg)) { - $destination = $arg; - } else { - throw new InvalidArgumentException("Serializer must be a class path or instantiated object implementing $interface"); - } + return new Client($transport, $this->endpoint, $registeredNamespaces); } /** - * @return array + * @return HttpAsyncClient */ - private function getDefaultHost() + private function createHttpClient() { - return ['localhost:9200']; - } - - /** - * @param array $hosts - * - * @throws \InvalidArgumentException - * @return \Elasticsearch\Connections\Connection[] - */ - private function buildConnectionsFromHosts($hosts) - { - if (is_array($hosts) === false) { - throw new InvalidArgumentException('Hosts parameter must be an array of strings'); - } - - $connections = []; - foreach ($hosts as $host) { - $host = $this->prependMissingScheme($host); - $host = $this->extractURIParts($host); - $connections[] = $this->connectionFactory->create($host); + /** @var HttpClientPool $pool */ + $pool = new $this->clientPoolStrategy; + $uriFactory = UriFactoryDiscovery::find(); + $retries = null === $this->retries ? count($this->hosts) : $this->retries; + $logger = null === $this->logger ? new NullLogger() : $this->logger; + + foreach ($this->hosts as $host) { + $client = new PluginClient($this->httpAsyncClient, [ + new AddHostPlugin($uriFactory->createUri($host, [ + 'replace' => true + ])), + ]); + + $pool->addHttpClient(new HttpClientPoolItem($client, 0)); } - - return $connections; - } - - /** - * @param array $host - * - * @throws \InvalidArgumentException - * @return array - */ - private function extractURIParts($host) - { - $parts = parse_url($host); - - if ($parts === false) { - throw new InvalidArgumentException("Could not parse URI"); - } - - if (isset($parts['port']) !== true) { - $parts['port'] = 9200; - } - - return $parts; - } - - /** - * @param string $host - * - * @return string - */ - private function prependMissingScheme($host) - { - if (!filter_var($host, FILTER_VALIDATE_URL, FILTER_FLAG_SCHEME_REQUIRED)) { - $host = 'http://' . $host; - } - - return $host; + + return new PluginClient($pool, [ + new ErrorPlugin( + $this->serializer + ), + new RetryPlugin([ + 'retries' => $retries + ]), + new LoggerPlugin($logger), + ]); } } diff --git a/src/Elasticsearch/Common/Exceptions/ClientErrorResponseException.php b/src/Elasticsearch/Common/Exceptions/ClientErrorResponseException.php deleted file mode 100644 index 844bbccaa..000000000 --- a/src/Elasticsearch/Common/Exceptions/ClientErrorResponseException.php +++ /dev/null @@ -1,16 +0,0 @@ - - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache2 - * @link http://elastic.co - */ -class ClientErrorResponseException extends TransportException implements ElasticsearchException -{ -} diff --git a/src/Elasticsearch/Common/Exceptions/AlreadyExpiredException.php b/src/Elasticsearch/Common/Exceptions/Http/AlreadyExpiredException.php similarity index 66% rename from src/Elasticsearch/Common/Exceptions/AlreadyExpiredException.php rename to src/Elasticsearch/Common/Exceptions/Http/AlreadyExpiredException.php index 411c70abb..ccc0accdb 100644 --- a/src/Elasticsearch/Common/Exceptions/AlreadyExpiredException.php +++ b/src/Elasticsearch/Common/Exceptions/Http/AlreadyExpiredException.php @@ -1,6 +1,8 @@ - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache2 - * @link http://elastic.co - */ -class MaxRetriesException extends TransportException implements ElasticsearchException -{ -} diff --git a/src/Elasticsearch/Common/Exceptions/NoNodesAvailableException.php b/src/Elasticsearch/Common/Exceptions/NoNodesAvailableException.php deleted file mode 100644 index 63a179327..000000000 --- a/src/Elasticsearch/Common/Exceptions/NoNodesAvailableException.php +++ /dev/null @@ -1,16 +0,0 @@ - - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache2 - * @link http://elastic.co - */ -class NoNodesAvailableException extends \Exception implements ElasticsearchException -{ -} diff --git a/src/Elasticsearch/ConnectionPool/AbstractConnectionPool.php b/src/Elasticsearch/ConnectionPool/AbstractConnectionPool.php deleted file mode 100644 index e191a4037..000000000 --- a/src/Elasticsearch/ConnectionPool/AbstractConnectionPool.php +++ /dev/null @@ -1,82 +0,0 @@ - - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache2 - * @link http://elastic.co - */ -abstract class AbstractConnectionPool implements ConnectionPoolInterface -{ - /** - * Array of connections - * - * @var ConnectionInterface[] - */ - protected $connections; - - /** - * Array of initial seed connections - * - * @var ConnectionInterface[] - */ - protected $seedConnections; - - /** - * Selector object, used to select a connection on each request - * - * @var SelectorInterface - */ - protected $selector; - - /** @var \Elasticsearch\Connections\ConnectionFactory */ - protected $connectionFactory; - - /** - * Constructor - * - * @param ConnectionInterface[] $connections The Connections to choose from - * @param SelectorInterface $selector A Selector instance to perform the selection logic for the available connections - * @param ConnectionFactory $factory ConnectionFactory instance - * @param array $connectionPoolParams - */ - public function __construct($connections, SelectorInterface $selector, ConnectionFactory $factory, $connectionPoolParams) - { - $paramList = array('connections', 'selector', 'connectionPoolParams'); - foreach ($paramList as $param) { - if (isset($$param) === false) { - throw new InvalidArgumentException('`' . $param . '` parameter must not be null'); - } - } - - if (isset($connectionPoolParams['randomizeHosts']) === true - && $connectionPoolParams['randomizeHosts'] === true) { - shuffle($connections); - } - - $this->connections = $connections; - $this->seedConnections = $connections; - $this->selector = $selector; - $this->connectionPoolParams = $connectionPoolParams; - $this->connectionFactory = $factory; - } - - /** - * @param bool $force - * - * @return Connection - */ - abstract public function nextConnection($force = false); - - abstract public function scheduleCheck(); -} diff --git a/src/Elasticsearch/ConnectionPool/ConnectionPoolInterface.php b/src/Elasticsearch/ConnectionPool/ConnectionPoolInterface.php deleted file mode 100644 index d10fc3542..000000000 --- a/src/Elasticsearch/ConnectionPool/ConnectionPoolInterface.php +++ /dev/null @@ -1,29 +0,0 @@ - - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache2 - * @link http://elastic.co - */ -interface ConnectionPoolInterface -{ - /** - * @param bool $force - * - * @return ConnectionInterface - */ - public function nextConnection($force = false); - - /** - * @return void - */ - public function scheduleCheck(); -} diff --git a/src/Elasticsearch/ConnectionPool/Selectors/RandomSelector.php b/src/Elasticsearch/ConnectionPool/Selectors/RandomSelector.php deleted file mode 100644 index b544292e3..000000000 --- a/src/Elasticsearch/ConnectionPool/Selectors/RandomSelector.php +++ /dev/null @@ -1,29 +0,0 @@ - - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache2 - * @link http://elastic.co - */ -class RandomSelector implements SelectorInterface -{ - /** - * Select a random connection from the provided array - * - * @param ConnectionInterface[] $connections an array of ConnectionInterface instances to choose from - * - * @return \Elasticsearch\Connections\ConnectionInterface - */ - public function select($connections) - { - return $connections[array_rand($connections)]; - } -} diff --git a/src/Elasticsearch/ConnectionPool/Selectors/RoundRobinSelector.php b/src/Elasticsearch/ConnectionPool/Selectors/RoundRobinSelector.php deleted file mode 100644 index e8b978454..000000000 --- a/src/Elasticsearch/ConnectionPool/Selectors/RoundRobinSelector.php +++ /dev/null @@ -1,36 +0,0 @@ - - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache2 - * @link http://elastic.co - */ -class RoundRobinSelector implements SelectorInterface -{ - /** - * @var int - */ - private $current = 0; - - /** - * Select the next connection in the sequence - * - * @param ConnectionInterface[] $connections an array of ConnectionInterface instances to choose from - * - * @return \Elasticsearch\Connections\ConnectionInterface - */ - public function select($connections) - { - $this->current += 1; - - return $connections[$this->current % count($connections)]; - } -} diff --git a/src/Elasticsearch/ConnectionPool/Selectors/SelectorInterface.php b/src/Elasticsearch/ConnectionPool/Selectors/SelectorInterface.php deleted file mode 100644 index 72dfd195b..000000000 --- a/src/Elasticsearch/ConnectionPool/Selectors/SelectorInterface.php +++ /dev/null @@ -1,24 +0,0 @@ - - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache2 - * @link http://elastic.co - */ -interface SelectorInterface -{ - /** - * Perform logic to select a single ConnectionInterface instance from the array provided - * - * @param ConnectionInterface[] $connections an array of ConnectionInterface instances to choose from - * - * @return \Elasticsearch\Connections\ConnectionInterface - */ - public function select($connections); -} diff --git a/src/Elasticsearch/ConnectionPool/Selectors/StickyRoundRobinSelector.php b/src/Elasticsearch/ConnectionPool/Selectors/StickyRoundRobinSelector.php deleted file mode 100644 index f44a68ca4..000000000 --- a/src/Elasticsearch/ConnectionPool/Selectors/StickyRoundRobinSelector.php +++ /dev/null @@ -1,47 +0,0 @@ - - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache2 - * @link http://elastic.co - */ -class StickyRoundRobinSelector implements SelectorInterface -{ - /** - * @var int - */ - private $current = 0; - - /** - * @var int - */ - private $currentCounter = 0; - - /** - * Use current connection unless it is dead, otherwise round-robin - * - * @param ConnectionInterface[] $connections Array of connections to choose from - * - * @return ConnectionInterface - */ - public function select($connections) - { - /** @var ConnectionInterface[] $connections */ - if ($connections[$this->current]->isAlive()) { - return $connections[$this->current]; - } - - $this->currentCounter += 1; - $this->current = $this->currentCounter % count($connections); - - return $connections[$this->current]; - } -} diff --git a/src/Elasticsearch/ConnectionPool/SimpleConnectionPool.php b/src/Elasticsearch/ConnectionPool/SimpleConnectionPool.php deleted file mode 100644 index 5b6da46ca..000000000 --- a/src/Elasticsearch/ConnectionPool/SimpleConnectionPool.php +++ /dev/null @@ -1,35 +0,0 @@ -selector->select($this->connections); - } - - public function scheduleCheck() - { - } -} diff --git a/src/Elasticsearch/ConnectionPool/SniffingConnectionPool.php b/src/Elasticsearch/ConnectionPool/SniffingConnectionPool.php deleted file mode 100644 index 4c5dffebc..000000000 --- a/src/Elasticsearch/ConnectionPool/SniffingConnectionPool.php +++ /dev/null @@ -1,152 +0,0 @@ -setConnectionPoolParams($connectionPoolParams); - $this->nextSniff = time() + $this->sniffingInterval; - } - - /** - * @param bool $force - * - * @return Connection - * @throws \Elasticsearch\Common\Exceptions\NoNodesAvailableException - */ - public function nextConnection($force = false) - { - $this->sniff($force); - - $size = count($this->connections); - while ($size--) { - /** @var Connection $connection */ - $connection = $this->selector->select($this->connections); - if ($connection->isAlive() === true || $connection->ping() === true) { - return $connection; - } - } - - if ($force === true) { - throw new NoNodesAvailableException("No alive nodes found in your cluster"); - } - - return $this->nextConnection(true); - } - - public function scheduleCheck() - { - $this->nextSniff = -1; - } - - /** - * @param bool $force - */ - private function sniff($force = false) - { - if ($force === false && $this->nextSniff >= time()) { - return; - } - - $total = count($this->connections); - - while ($total--) { - /** @var Connection $connection */ - $connection = $this->selector->select($this->connections); - - if ($connection->isAlive() xor $force) { - continue; - } - - if ($this->sniffConnection($connection) === true) { - return; - } - } - - if ($force === true) { - return; - } - - foreach ($this->seedConnections as $connection) { - if ($this->sniffConnection($connection) === true) { - return; - } - } - } - - /** - * @param Connection $connection - * @return bool - */ - private function sniffConnection(Connection $connection) - { - try { - $response = $connection->sniff(); - } catch (OperationTimeoutException $exception) { - return false; - } - - $nodes = $this->parseClusterState($connection->getTransportSchema(), $response); - - if (count($nodes) === 0) { - return false; - } - - $this->connections = array(); - - foreach ($nodes as $node) { - $nodeDetails = array( - 'host' => $node['host'], - 'port' => $node['port'] - ); - $this->connections[] = $this->connectionFactory->create($nodeDetails); - } - - $this->nextSniff = time() + $this->sniffingInterval; - - return true; - } - - private function parseClusterState($transportSchema, $nodeInfo) - { - $pattern = '/\/([^:]*):([0-9]+)\]/'; - $schemaAddress = $transportSchema . '_address'; - $hosts = array(); - - foreach ($nodeInfo['nodes'] as $node) { - if (isset($node[$schemaAddress]) === true) { - if (preg_match($pattern, $node[$schemaAddress], $match) === 1) { - $hosts[] = array( - 'host' => $match[1], - 'port' => (int) $match[2], - ); - } - } - } - - return $hosts; - } - - private function setConnectionPoolParams($connectionPoolParams) - { - if (isset($connectionPoolParams['sniffingInterval']) === true) { - $this->sniffingInterval = $connectionPoolParams['sniffingInterval']; - } - } -} diff --git a/src/Elasticsearch/ConnectionPool/StaticConnectionPool.php b/src/Elasticsearch/ConnectionPool/StaticConnectionPool.php deleted file mode 100644 index 8231e6b85..000000000 --- a/src/Elasticsearch/ConnectionPool/StaticConnectionPool.php +++ /dev/null @@ -1,93 +0,0 @@ -scheduleCheck(); - } - - /** - * @param bool $force - * - * @return Connection - * @throws \Elasticsearch\Common\Exceptions\NoNodesAvailableException - */ - public function nextConnection($force = false) - { - $skipped = array(); - - $total = count($this->connections); - while ($total--) { - /** @var Connection $connection */ - $connection = $this->selector->select($this->connections); - if ($connection->isAlive() === true) { - return $connection; - } - - if ($this->readyToRevive($connection) === true) { - if ($connection->ping() === true) { - return $connection; - } - } else { - $skipped[] = $connection; - } - } - - // All "alive" nodes failed, force pings on "dead" nodes - foreach ($skipped as $connection) { - if ($connection->ping() === true) { - return $connection; - } - } - - throw new NoNodesAvailableException("No alive nodes found in your cluster"); - } - - public function scheduleCheck() - { - foreach ($this->connections as $connection) { - $connection->markDead(); - } - } - - /** - * @param Connection $connection - * - * @return bool - */ - private function readyToRevive(Connection $connection) - { - $timeout = min( - $this->pingTimeout * pow(2, $connection->getPingFailures()), - $this->maxPingTimeout - ); - - if ($connection->getLastPing() + $timeout < time()) { - return true; - } else { - return false; - } - } -} diff --git a/src/Elasticsearch/ConnectionPool/StaticNoPingConnectionPool.php b/src/Elasticsearch/ConnectionPool/StaticNoPingConnectionPool.php deleted file mode 100644 index 4426d1588..000000000 --- a/src/Elasticsearch/ConnectionPool/StaticNoPingConnectionPool.php +++ /dev/null @@ -1,76 +0,0 @@ -connections); - while ($total--) { - /** @var Connection $connection */ - $connection = $this->selector->select($this->connections); - if ($connection->isAlive() === true) { - return $connection; - } - - if ($this->readyToRevive($connection) === true) { - return $connection; - } - } - - throw new NoNodesAvailableException("No alive nodes found in your cluster"); - } - - public function scheduleCheck() - { - } - - /** - * @param \Elasticsearch\Connections\Connection $connection - * - * @return bool - */ - private function readyToRevive(Connection $connection) - { - $timeout = min( - $this->pingTimeout * pow(2, $connection->getPingFailures()), - $this->maxPingTimeout - ); - - if ($connection->getLastPing() + $timeout < time()) { - return true; - } else { - return false; - } - } -} diff --git a/src/Elasticsearch/Connections/Connection.php b/src/Elasticsearch/Connections/Connection.php deleted file mode 100644 index 6f2e6488d..000000000 --- a/src/Elasticsearch/Connections/Connection.php +++ /dev/null @@ -1,683 +0,0 @@ - - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache2 - * @link http://elastic.co - */ -class Connection implements ConnectionInterface -{ - /** @var callable */ - protected $handler; - - /** @var SerializerInterface */ - protected $serializer; - - /** - * @var string - */ - protected $transportSchema = 'http'; // TODO depreciate this default - - /** - * @var string - */ - protected $host; - - /** - * @var string || null - */ - protected $path; - - /** - * @var LoggerInterface - */ - protected $log; - - /** - * @var LoggerInterface - */ - protected $trace; - - /** - * @var array - */ - protected $connectionParams; - - /** @var bool */ - protected $isAlive = false; - - /** @var float */ - private $pingTimeout = 1; //TODO expose this - - /** @var int */ - private $lastPing = 0; - - /** @var int */ - private $failedPings = 0; - - private $lastRequest = array(); - - /** - * Constructor - * - * @param $handler - * @param array $hostDetails - * @param array $connectionParams Array of connection-specific parameters - * @param \Elasticsearch\Serializers\SerializerInterface $serializer - * @param \Psr\Log\LoggerInterface $log Logger object - * @param \Psr\Log\LoggerInterface $trace - */ - public function __construct($handler, $hostDetails, $connectionParams, - SerializerInterface $serializer, LoggerInterface $log, LoggerInterface $trace) - { - if (isset($hostDetails['port']) !== true) { - $hostDetails['port'] = 9200; - } - - if (isset($hostDetails['scheme'])) { - $this->transportSchema = $hostDetails['scheme']; - } - - if (isset($hostDetails['user']) && isset($hostDetails['pass'])) { - $connectionParams['client']['curl'][CURLOPT_HTTPAUTH] = CURLAUTH_BASIC; - $connectionParams['client']['curl'][CURLOPT_USERPWD] = $hostDetails['user'].':'.$hostDetails['pass']; - } - - $host = $hostDetails['host'].':'.$hostDetails['port']; - $path = null; - if (isset($hostDetails['path']) === true) { - $path = $hostDetails['path']; - } - $this->host = $host; - $this->path = $path; - $this->log = $log; - $this->trace = $trace; - $this->connectionParams = $connectionParams; - $this->serializer = $serializer; - - $this->handler = $this->wrapHandler($handler, $log, $trace); - } - - /** - * @param $method - * @param $uri - * @param null $params - * @param null $body - * @param array $options - * @param \Elasticsearch\Transport $transport - * @return mixed - */ - public function performRequest($method, $uri, $params = null, $body = null, $options = [], Transport $transport = null) - { - if (isset($body) === true) { - $body = $this->serializer->serialize($body); - } - - $request = [ - 'http_method' => $method, - 'scheme' => $this->transportSchema, - 'uri' => $this->getURI($uri, $params), - 'body' => $body, - 'headers' => [ - 'host' => [$this->host] - ] - - ]; - $request = array_merge_recursive($request, $this->connectionParams, $options); - - - $handler = $this->handler; - $future = $handler($request, $this, $transport, $options); - - return $future; - } - - /** @return string */ - public function getTransportSchema() - { - return $this->transportSchema; - } - - /** @return array */ - public function getLastRequestInfo() - { - return $this->lastRequest; - } - - private function wrapHandler(callable $handler, LoggerInterface $logger, LoggerInterface $tracer) - { - return function (array $request, Connection $connection, Transport $transport = null, $options) use ($handler, $logger, $tracer) { - - $this->lastRequest = []; - $this->lastRequest['request'] = $request; - - // Send the request using the wrapped handler. - $response = Core::proxy($handler($request), function ($response) use ($connection, $transport, $logger, $tracer, $request, $options) { - - $this->lastRequest['response'] = $response; - - if (isset($response['error']) === true) { - if ($response['error'] instanceof ConnectException || $response['error'] instanceof RingException) { - $this->log->warning("Curl exception encountered."); - - $exception = $this->getCurlRetryException($request, $response); - - $this->logRequestFail( - $request['http_method'], - $response['effective_url'], - $request['body'], - $request['headers'], - $response['status'], - $response['body'], - $response['transfer_stats']['total_time'], - $exception - ); - - $node = $connection->getHost(); - $this->log->warning("Marking node $node dead."); - $connection->markDead(); - - // If the transport has not been set, we are inside a Ping or Sniff, - // so we don't want to retrigger retries anyway. - // - // TODO this could be handled better, but we are limited because connectionpools do not - // have access to Transport. Architecturally, all of this needs to be refactored - if (isset($transport) === true) { - $transport->connectionPool->scheduleCheck(); - - $neverRetry = isset($request['client']['never_retry']) ? $request['client']['never_retry'] : false; - $shouldRetry = $transport->shouldRetry($request); - $shouldRetryText = ($shouldRetry) ? 'true' : 'false'; - - $this->log->warning("Retries left? $shouldRetryText"); - if ($shouldRetry && !$neverRetry) { - return $transport->performRequest( - $request['http_method'], - $request['uri'], - [], - $request['body'], - $options - ); - } - } - - $this->log->warning("Out of retries, throwing exception from $node"); - // Only throw if we run out of retries - throw $exception; - } else { - // Something went seriously wrong, bail - $exception = new TransportException($response['error']->getMessage()); - $this->logRequestFail( - $request['http_method'], - $response['effective_url'], - $request['body'], - $request['headers'], - $response['status'], - $response['body'], - $response['transfer_stats']['total_time'], - $exception - ); - throw $exception; - } - } else { - $connection->markAlive(); - - if (isset($response['body']) === true) { - $response['body'] = stream_get_contents($response['body']); - $this->lastRequest['response']['body'] = $response['body']; - } - - if ($response['status'] >= 400 && $response['status'] < 500) { - $ignore = isset($request['client']['ignore']) ? $request['client']['ignore'] : []; - $this->process4xxError($request, $response, $ignore); - } elseif ($response['status'] >= 500) { - $ignore = isset($request['client']['ignore']) ? $request['client']['ignore'] : []; - $this->process5xxError($request, $response, $ignore); - } - - // No error, deserialize - $response['body'] = $this->serializer->deserialize($response['body'], $response['transfer_stats']); - } - $this->logRequestSuccess( - $request['http_method'], - $response['effective_url'], - $request['body'], - $request['headers'], - $response['status'], - $response['body'], - $response['transfer_stats']['total_time'] - ); - - return isset($request['client']['verbose']) && $request['client']['verbose'] === true ? $response : $response['body']; - - }); - - return $response; - }; - } - - /** - * @param string $uri - * @param array $params - * - * @return string - */ - private function getURI($uri, $params) - { - if (isset($params) === true && !empty($params)) { - array_walk($params, function (&$value, &$key) { - if ($value === true) { - $value = 'true'; - } else if ($value === false) { - $value = 'false'; - } - }); - - $uri .= '?' . http_build_query($params); - } - - if ($this->path !== null) { - $uri = $this->path . $uri; - } - - return $uri; - } - - /** - * Log a successful request - * - * @param string $method - * @param string $fullURI - * @param string $body - * @param array $headers - * @param string $statusCode - * @param string $response - * @param string $duration - * - * @return void - */ - public function logRequestSuccess($method, $fullURI, $body, $headers, $statusCode, $response, $duration) - { - $this->log->debug('Request Body', array($body)); - $this->log->info( - 'Request Success:', - array( - 'method' => $method, - 'uri' => $fullURI, - 'headers' => $headers, - 'HTTP code' => $statusCode, - 'duration' => $duration, - ) - ); - $this->log->debug('Response', array($response)); - - // Build the curl command for Trace. - $curlCommand = $this->buildCurlCommand($method, $fullURI, $body); - $this->trace->info($curlCommand); - $this->trace->debug( - 'Response:', - array( - 'response' => $response, - 'method' => $method, - 'uri' => $fullURI, - 'HTTP code' => $statusCode, - 'duration' => $duration, - ) - ); - } - - /** - * Log a a failed request - * - * @param string $method - * @param string $fullURI - * @param string $body - * @param array $headers - * @param null|string $statusCode - * @param null|string $response - * @param string $duration - * @param \Exception|null $exception - * - * @return void - */ - public function logRequestFail($method, $fullURI, $body, $headers, $statusCode, $response, $duration, \Exception $exception) - { - $this->log->debug('Request Body', array($body)); - $this->log->warning( - 'Request Failure:', - array( - 'method' => $method, - 'uri' => $fullURI, - 'headers' => $headers, - 'HTTP code' => $statusCode, - 'duration' => $duration, - 'error' => $exception->getMessage(), - ) - ); - $this->log->warning('Response', array($response)); - - // Build the curl command for Trace. - $curlCommand = $this->buildCurlCommand($method, $fullURI, $body); - $this->trace->info($curlCommand); - $this->trace->debug( - 'Response:', - array( - 'response' => $response, - 'method' => $method, - 'uri' => $fullURI, - 'HTTP code' => $statusCode, - 'duration' => $duration, - ) - ); - } - - /** - * @return bool - */ - public function ping() - { - $options = [ - 'client' => [ - 'timeout' => $this->pingTimeout, - 'never_retry' => true, - 'verbose' => true - ] - ]; - try { - $response = $this->performRequest('HEAD', '/', null, null, $options); - $response = $response->wait(); - } catch (TransportException $exception) { - $this->markDead(); - - return false; - } - - if ($response['status'] === 200) { - $this->markAlive(); - - return true; - } else { - $this->markDead(); - - return false; - } - } - - /** - * @return array - */ - public function sniff() - { - $options = [ - 'client' => [ - 'timeout' => $this->pingTimeout, - 'never_retry' => true - ] - ]; - - return $this->performRequest('GET', '/_nodes/_all/clear', null, null, $options); - } - - /** - * @return bool - */ - public function isAlive() - { - return $this->isAlive; - } - - public function markAlive() - { - $this->failedPings = 0; - $this->isAlive = true; - $this->lastPing = time(); - } - - public function markDead() - { - $this->isAlive = false; - $this->failedPings += 1; - $this->lastPing = time(); - } - - /** - * @return int - */ - public function getLastPing() - { - return $this->lastPing; - } - - /** - * @return int - */ - public function getPingFailures() - { - return $this->failedPings; - } - - /** - * @return string - */ - public function getHost() - { - return $this->host; - } - - /** - * @param $request - * @param $response - * @return \Elasticsearch\Common\Exceptions\Curl\CouldNotConnectToHost|\Elasticsearch\Common\Exceptions\Curl\CouldNotResolveHostException|\Elasticsearch\Common\Exceptions\Curl\OperationTimeoutException|\Elasticsearch\Common\Exceptions\MaxRetriesException - */ - protected function getCurlRetryException($request, $response) - { - $exception = null; - $message = $response['error']->getMessage(); - $exception = new MaxRetriesException($message); - switch ($response['curl']['errno']) { - case 6: - $exception = new CouldNotResolveHostException($message, null, $exception); - break; - case 7: - $exception = new CouldNotConnectToHost($message, null, $exception); - break; - case 28: - $exception = new OperationTimeoutException($message, null, $exception); - break; - } - - return $exception; - } - - /** - * Construct a string cURL command - * - * @param string $method HTTP method - * @param string $uri Full URI of request - * @param string $body Request body - * - * @return string - */ - private function buildCurlCommand($method, $uri, $body) - { - if (strpos($uri, '?') === false) { - $uri .= '?pretty=true'; - } else { - str_replace('?', '?pretty=true', $uri); - } - - $curlCommand = 'curl -X' . strtoupper($method); - $curlCommand .= " '" . $uri . "'"; - - if (isset($body) === true && $body !== '') { - $curlCommand .= " -d '" . $body . "'"; - } - - return $curlCommand; - } - - /** - * @param $request - * @param $response - * @param $ignore - * @throws \Elasticsearch\Common\Exceptions\AlreadyExpiredException|\Elasticsearch\Common\Exceptions\BadRequest400Exception|\Elasticsearch\Common\Exceptions\Conflict409Exception|\Elasticsearch\Common\Exceptions\Forbidden403Exception|\Elasticsearch\Common\Exceptions\Missing404Exception|\Elasticsearch\Common\Exceptions\ScriptLangNotSupportedException|null - */ - private function process4xxError($request, $response, $ignore) - { - $statusCode = $response['status']; - $responseBody = $response['body']; - - /** @var \Exception $exception */ - $exception = $this->tryDeserialize400Error($response); - - if (array_search($response['status'], $ignore) !== false) { - return; - } - - if ($statusCode === 400 && strpos($responseBody, "AlreadyExpiredException") !== false) { - $exception = new AlreadyExpiredException($responseBody, $statusCode); - } elseif ($statusCode === 403) { - $exception = new Forbidden403Exception($responseBody, $statusCode); - } elseif ($statusCode === 404) { - $exception = new Missing404Exception($responseBody, $statusCode); - } elseif ($statusCode === 409) { - $exception = new Conflict409Exception($responseBody, $statusCode); - } elseif ($statusCode === 400 && strpos($responseBody, 'script_lang not supported') !== false) { - $exception = new ScriptLangNotSupportedException($responseBody. $statusCode); - } elseif ($statusCode === 408) { - $exception = new RequestTimeout408Exception($responseBody, $statusCode); - } - - $this->logRequestFail( - $request['http_method'], - $response['effective_url'], - $request['body'], - $request['headers'], - $response['status'], - $response['body'], - $response['transfer_stats']['total_time'], - $exception - ); - - throw $exception; - } - - /** - * @param $request - * @param $response - * @param $ignore - * @throws \Elasticsearch\Common\Exceptions\NoDocumentsToGetException|\Elasticsearch\Common\Exceptions\NoShardAvailableException|\Elasticsearch\Common\Exceptions\RoutingMissingException|\Elasticsearch\Common\Exceptions\ServerErrorResponseException - */ - private function process5xxError($request, $response, $ignore) - { - $statusCode = $response['status']; - $responseBody = $response['body']; - - /** @var \Exception $exception */ - $exception = $this->tryDeserialize500Error($response); - - $exceptionText = "[$statusCode Server Exception] ".$exception->getMessage(); - $this->log->error($exceptionText); - $this->log->error($exception->getTraceAsString()); - - if (array_search($statusCode, $ignore) !== false) { - return; - } - - if ($statusCode === 500 && strpos($responseBody, "RoutingMissingException") !== false) { - $exception = new RoutingMissingException($exception->getMessage(), $statusCode, $exception); - } elseif ($statusCode === 500 && preg_match('/ActionRequestValidationException.+ no documents to get/', $responseBody) === 1) { - $exception = new NoDocumentsToGetException($exception->getMessage(), $statusCode, $exception); - } elseif ($statusCode === 500 && strpos($responseBody, 'NoShardAvailableActionException') !== false) { - $exception = new NoShardAvailableException($exception->getMessage(), $statusCode, $exception); - } - - $this->logRequestFail( - $request['http_method'], - $response['effective_url'], - $request['body'], - $request['headers'], - $response['status'], - $response['body'], - $response['transfer_stats']['total_time'], - $exception - ); - - throw $exception; - } - - private function tryDeserialize400Error($response) - { - return $this->tryDeserializeError($response, 'Elasticsearch\Common\Exceptions\BadRequest400Exception'); - } - - private function tryDeserialize500Error($response) - { - return $this->tryDeserializeError($response, 'Elasticsearch\Common\Exceptions\ServerErrorResponseException'); - } - - private function tryDeserializeError($response, $errorClass) - { - $error = $this->serializer->deserialize($response['body'], $response['transfer_stats']); - if (is_array($error) === true) { - // 2.0 structured exceptions - if (isset($error['error']['reason']) === true) { - - // Try to use root cause first (only grabs the first root cause) - $root = $error['error']['root_cause']; - if (isset($root) && isset($root[0])) { - $cause = $root[0]['reason']; - $type = $root[0]['type']; - } else { - $cause = $error['error']['reason']; - $type = $error['error']['type']; - } - - $original = new $errorClass($response['body'], $response['status']); - - return new $errorClass("$type: $cause", $response['status'], $original); - } elseif (isset($error['error']) === true) { - // <2.0 semi-structured exceptions - $original = new $errorClass($response['body'], $response['status']); - - return new $errorClass($error['error'], $response['status'], $original); - } - - // <2.0 "i just blew up" nonstructured exception - // $error is an array but we don't know the format, reuse the response body instead - return new $errorClass($response['body'], $response['status']); - } - - // <2.0 "i just blew up" nonstructured exception - return new $errorClass($error, $response['status']); - } -} diff --git a/src/Elasticsearch/Connections/ConnectionFactory.php b/src/Elasticsearch/Connections/ConnectionFactory.php deleted file mode 100644 index 553307797..000000000 --- a/src/Elasticsearch/Connections/ConnectionFactory.php +++ /dev/null @@ -1,64 +0,0 @@ - - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache2 - * @link http://elastic.co - */ -class ConnectionFactory implements ConnectionFactoryInterface -{ - /** @var array */ - private $connectionParams; - - /** @var LoggerInterface */ - private $logger; - - /** @var LoggerInterface */ - private $tracer; - - /** @var callable */ - private $handler; - - /** - * Constructor - * - * @param callable $handler - * @param array $connectionParams - * @param SerializerInterface $serializer - * @param LoggerInterface $logger - * @param LoggerInterface $tracer - */ - public function __construct(callable $handler, array $connectionParams, SerializerInterface $serializer, LoggerInterface $logger, LoggerInterface $tracer) - { - $this->handler = $handler; - $this->connectionParams = $connectionParams; - $this->logger = $logger; - $this->tracer = $tracer; - $this->serializer = $serializer; - } - /** - * @param $hostDetails - * - * @return ConnectionInterface - */ - public function create($hostDetails) - { - return new Connection( - $this->handler, - $hostDetails, - $this->connectionParams, - $this->serializer, - $this->logger, - $this->tracer - ); - } -} diff --git a/src/Elasticsearch/Connections/ConnectionFactoryInterface.php b/src/Elasticsearch/Connections/ConnectionFactoryInterface.php deleted file mode 100644 index 242a32105..000000000 --- a/src/Elasticsearch/Connections/ConnectionFactoryInterface.php +++ /dev/null @@ -1,35 +0,0 @@ - - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache2 - * @link http://elastic.co - */ -interface ConnectionFactoryInterface -{ - /** - * @param $handler - * @param array $connectionParams - * @param SerializerInterface $serializer - * @param LoggerInterface $logger - * @param LoggerInterface $tracer - */ - public function __construct(callable $handler, array $connectionParams, - SerializerInterface $serializer, LoggerInterface $logger, LoggerInterface $tracer); - - /** - * @param $hostDetails - * - * @return ConnectionInterface - */ - public function create($hostDetails); -} diff --git a/src/Elasticsearch/Connections/ConnectionInterface.php b/src/Elasticsearch/Connections/ConnectionInterface.php deleted file mode 100644 index 7e6038584..000000000 --- a/src/Elasticsearch/Connections/ConnectionInterface.php +++ /dev/null @@ -1,78 +0,0 @@ - - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache2 - * @link http://elastic.co - */ -interface ConnectionInterface -{ - /** - * Constructor - * - * @param $handler - * @param array $hostDetails - * @param array $connectionParams connection-specific parameters - * @param \Elasticsearch\Serializers\SerializerInterface $serializer - * @param \Psr\Log\LoggerInterface $log Logger object - * @param \Psr\Log\LoggerInterface $trace Logger object - */ - public function __construct($handler, $hostDetails, $connectionParams, - SerializerInterface $serializer, LoggerInterface $log, LoggerInterface $trace); - - /** - * Get the transport schema for this connection - * - * @return string - */ - public function getTransportSchema(); - - /** - * Check to see if this instance is marked as 'alive' - * - * @return bool - */ - public function isAlive(); - - /** - * Mark this instance as 'alive' - * - * @return void - */ - public function markAlive(); - - /** - * Mark this instance as 'dead' - * - * @return void - */ - public function markDead(); - - /** - * Return an associative array of information about the last request - * - * @return array - */ - public function getLastRequestInfo(); - - /** - * @param $method - * @param $uri - * @param null $params - * @param null $body - * @param array $options - * @param \Elasticsearch\Transport $transport - * @return mixed - */ - public function performRequest($method, $uri, $params = null, $body = null, $options = [], Transport $transport); -} diff --git a/src/Elasticsearch/Endpoints/Indices/Refresh.php b/src/Elasticsearch/Endpoints/Indices/Refresh.php index e7938d151..c7e3eb265 100644 --- a/src/Elasticsearch/Endpoints/Indices/Refresh.php +++ b/src/Elasticsearch/Endpoints/Indices/Refresh.php @@ -23,7 +23,7 @@ public function getURI() $index = $this->index; $uri = "/_refresh"; - if (isset($index) === true) { + if (isset($index) === true && !empty($index)) { $uri = "/$index/_refresh"; } diff --git a/src/Elasticsearch/MessageBuilder.php b/src/Elasticsearch/MessageBuilder.php new file mode 100644 index 000000000..d12017f9b --- /dev/null +++ b/src/Elasticsearch/MessageBuilder.php @@ -0,0 +1,120 @@ +requestFactory = $requestFactory; + $this->serializer = $serializer; + } + + /** + * Create a PSR 7 Request given an Elasticsearch Endpoint + * + * @param AbstractEndpoint $endpoint + * + * @return RequestInterface + */ + public function createRequest(AbstractEndpoint $endpoint) + { + $body = null === $endpoint->getBody() ? null : $this->serializer->serialize($endpoint->getBody()); + + return $this->requestFactory->createRequest( + $endpoint->getMethod(), + $this->createUri($endpoint->getURI(), $endpoint->getParams()), + [ + 'Content-Length' => null === $body ? 0 : strlen($body), + 'User-Agent' => 'Elasticsearch PHP Client', + ], + $body + ); + } + + /** + * Create a specific given a promise and a way to fetch it + * + * @param Promise $promise + * @param string $fetch + * + * @throws \Exception + * + * @return array|ResponseInterface|Promise + */ + public function createResponse(Promise $promise, $fetch = self::FETCH_RESULT) + { + $serializer = $this->serializer; + + if ($fetch === self::FETCH_PROMISE) { + return $promise; + } + + if ($fetch === self::FETCH_PROMISE_DESERIALIZED) { + return $promise->then(function (ResponseInterface $response) use($serializer) { + return $serializer->deserialize($response->getBody()->getContents(), $response->getHeaders()); + }); + } + + /** @var ResponseInterface $response */ + $response = $promise->wait(); + + if ($fetch === self::FETCH_PSR7_RESPONSE) { + return $response; + } + + $body = $response->getBody()->getContents(); + + return $serializer->deserialize($body, $response->getHeaders()); + } + + /** + * Create an uri + * + * @param string $uri + * @param array $parameters + * + * @return string + */ + private function createUri($uri, array $parameters = []) + { + if (count($parameters) === 0) { + return $uri; + } + + $parameters = array_map(function ($value) { + if (is_bool($value)) { + return $value ? 'true' : 'false'; + } + + return $value; + }, $parameters); + + return $uri . '?' . http_build_query($parameters); + } +} diff --git a/src/Elasticsearch/Namespaces/AbstractNamespace.php b/src/Elasticsearch/Namespaces/AbstractNamespace.php index 42a0c7815..bdce642f8 100644 --- a/src/Elasticsearch/Namespaces/AbstractNamespace.php +++ b/src/Elasticsearch/Namespaces/AbstractNamespace.php @@ -64,14 +64,6 @@ public function extractArgument(&$params, $arg) */ protected function performRequest(AbstractEndpoint $endpoint) { - $response = $this->transport->performRequest( - $endpoint->getMethod(), - $endpoint->getURI(), - $endpoint->getParams(), - $endpoint->getBody(), - $endpoint->getOptions() - ); - - return $this->transport->resultOrFuture($response, $endpoint->getOptions()); + return $this->transport->performRequest($endpoint); } } diff --git a/src/Elasticsearch/Namespaces/BooleanRequestWrapper.php b/src/Elasticsearch/Namespaces/BooleanRequestWrapper.php index eb9c4cd57..34cba9952 100644 --- a/src/Elasticsearch/Namespaces/BooleanRequestWrapper.php +++ b/src/Elasticsearch/Namespaces/BooleanRequestWrapper.php @@ -2,11 +2,13 @@ namespace Elasticsearch\Namespaces; -use Elasticsearch\Common\Exceptions\Missing404Exception; -use Elasticsearch\Common\Exceptions\RoutingMissingException; +use Elasticsearch\Common\Exceptions\Http\Missing404Exception; +use Elasticsearch\Common\Exceptions\Http\RoutingMissingException; use Elasticsearch\Endpoints\AbstractEndpoint; +use Elasticsearch\MessageBuilder; use Elasticsearch\Transport; -use GuzzleHttp\Ring\Future\FutureArrayInterface; +use Http\Promise\Promise; +use Psr\Http\Message\ResponseInterface; /** * Trait AbstractNamespace @@ -24,31 +26,27 @@ trait BooleanRequestWrapper * * @param AbstractEndpoint $endpoint The Endpoint to perform this request against * - * @throws Missing404Exception - * @throws RoutingMissingException + * @return boolean */ public static function performRequest(AbstractEndpoint $endpoint, Transport $transport) { try { - $response = $transport->performRequest( - $endpoint->getMethod(), - $endpoint->getURI(), - $endpoint->getParams(), - $endpoint->getBody(), - $endpoint->getOptions() - ); + $options = $endpoint->getOptions(); + $fetch = MessageBuilder::FETCH_PSR7_RESPONSE; - $response = $transport->resultOrFuture($response, $endpoint->getOptions()); - if (!($response instanceof FutureArrayInterface)) { - if ($response['status'] === 200) { - return true; - } else { - return false; - } - } else { - // async mode, can't easily resolve this...punt to user - return $response; + if (isset($options['client']['future'])) { + $fetch = MessageBuilder::FETCH_PROMISE; } + + $response = $transport->performRequest($endpoint, $fetch); + + if ($response instanceof Promise) { + return $response->then(function (ResponseInterface $response) { + return $response->getStatusCode() === 200; + }); + } + + return $response->getStatusCode() === 200; } catch (Missing404Exception $exception) { return false; } catch (RoutingMissingException $exception) { diff --git a/src/Elasticsearch/Plugin/ErrorPlugin.php b/src/Elasticsearch/Plugin/ErrorPlugin.php new file mode 100644 index 000000000..6a0818bae --- /dev/null +++ b/src/Elasticsearch/Plugin/ErrorPlugin.php @@ -0,0 +1,177 @@ +serializer = $serializer; + } + + /** + * {@inheritdoc} + */ + public function handleRequest(RequestInterface $request, callable $next, callable $first) + { + return $next($request)->then(function (ResponseInterface $response) use ($request) { + if ($response->getStatusCode() >= 400 && $response->getStatusCode() < 500) { + $this->process4xxError($request, $response); + } + + if ($response->getStatusCode() >= 500 && $response->getStatusCode() < 600) { + $this->process5xxError($request, $response); + } + + return $response; + }); + } + + /** + * Throw a 4XX Exception + * + * @param RequestInterface $request + * @param ResponseInterface $response + * + * @throws HttpException\AlreadyExpiredException + * @throws HttpException\BadRequest400Exception + * @throws HttpException\Conflict409Exception + * @throws HttpException\Forbidden403Exception + * @throws HttpException\Missing404Exception + * @throws HttpException\ScriptLangNotSupportedException + */ + private function process4xxError(RequestInterface $request, ResponseInterface $response) + { + $statusCode = $response->getStatusCode(); + $responseBody = (string)$response->getBody(); + + $exception = $this->tryDeserialize400Error($request, $response, $responseBody); + + if ($statusCode === 400 && strpos($responseBody, "AlreadyExpiredException") !== false) { + $exception = new HttpException\AlreadyExpiredException($responseBody, $request, $response); + } elseif ($statusCode === 403) { + $exception = new HttpException\Forbidden403Exception($responseBody, $request, $response); + } elseif ($statusCode === 404) { + $exception = new HttpException\Missing404Exception($responseBody, $request, $response); + } elseif ($statusCode === 409) { + $exception = new HttpException\Conflict409Exception($responseBody, $request, $response); + } elseif ($statusCode === 400 && strpos($responseBody, 'script_lang not supported') !== false) { + $exception = new HttpException\ScriptLangNotSupportedException($responseBody, $request, $response); + } elseif ($statusCode === 408) { + $exception = new HttpException\RequestTimeout408Exception($responseBody, $request, $response); + } + + throw $exception; + } + + /** + * Throw a 5XX Exception + * + * @param RequestInterface $request + * @param ResponseInterface $response + * + * @throws HttpException\NoDocumentsToGetException + * @throws HttpException\NoShardAvailableException + * @throws HttpException\RoutingMissingException + * @throws HttpException\ServerErrorResponseException + */ + private function process5xxError(RequestInterface $request, ResponseInterface $response) + { + $statusCode = $response->getStatusCode(); + $responseBody = (string)$response->getBody(); + + $exception = $this->tryDeserialize500Error($request, $response, $responseBody); + + if ($statusCode === 500 && strpos($responseBody, "RoutingMissingException") !== false) { + $exception = new HttpException\RoutingMissingException($exception->getMessage(), $request, $response, $exception); + } elseif ($statusCode === 500 && preg_match('/ActionRequestValidationException.+ no documents to get/', $responseBody) === 1) { + $exception = new HttpException\NoDocumentsToGetException($exception->getMessage(), $request, $response, $exception); + } elseif ($statusCode === 500 && strpos($responseBody, 'NoShardAvailableActionException') !== false) { + $exception = new HttpException\NoShardAvailableException($exception->getMessage(), $request, $response, $exception); + } + + throw $exception; + } + + /** + * @param RequestInterface $request + * @param ResponseInterface $response + * + * @return HttpException\BadRequest400Exception + */ + private function tryDeserialize400Error(RequestInterface $request, ResponseInterface $response, $responseBody) + { + return $this->tryDeserializeError($request, $response, $responseBody, 'Elasticsearch\Common\Exceptions\Http\BadRequest400Exception'); + } + + /** + * @param RequestInterface $request + * @param ResponseInterface $response + * + * @return HttpException\ServerErrorResponseException + */ + private function tryDeserialize500Error(RequestInterface $request, ResponseInterface $response, $responseBody) + { + return $this->tryDeserializeError($request, $response, $responseBody, 'Elasticsearch\Common\Exceptions\Http\ServerErrorResponseException'); + } + + /** + * Return a new elasticsearch exception + * + * @param RequestInterface $request + * @param ResponseInterface $response + * @param string $errorClass + * + * @return \Exception + */ + private function tryDeserializeError(RequestInterface $request, ResponseInterface $response, $responseBody, $errorClass) + { + $error = $this->serializer->deserialize($responseBody, $response->getHeaders()); + + if (is_array($error) === true) { + // 2.0 structured exceptions + if (isset($error['error']['reason']) === true) { + + // Try to use root cause first (only grabs the first root cause) + $root = $error['error']['root_cause']; + if (isset($root) && isset($root[0])) { + $cause = $root[0]['reason']; + $type = $root[0]['type']; + } else { + $cause = $error['error']['reason']; + $type = $error['error']['type']; + } + + $original = new $errorClass($responseBody, $request, $response); + + return new $errorClass("$type: $cause", $request, $response, $original); + } elseif (isset($error['error']) === true) { + // <2.0 semi-structured exceptions + $original = new $errorClass($responseBody, $request, $response); + + return new $errorClass($responseBody, $request, $response, $original); + } + + // <2.0 "i just blew up" nonstructured exception + // $error is an array but we don't know the format, reuse the response body instead + return new $errorClass($responseBody, $request, $response); + } + + // <2.0 "i just blew up" nonstructured exception + return new $errorClass($error, $request, $response); + } +} diff --git a/src/Elasticsearch/Serializers/SerializerInterface.php b/src/Elasticsearch/Serializers/SerializerInterface.php index a23796309..96aaa9e06 100644 --- a/src/Elasticsearch/Serializers/SerializerInterface.php +++ b/src/Elasticsearch/Serializers/SerializerInterface.php @@ -25,6 +25,8 @@ public function serialize($data); /** * Deserialize json encoded string into an associative array * + * @TODO Change the signature for using only a PSR7 Response as the input + * * @param string $data JSON encoded string * @param array $headers Response Headers * diff --git a/src/Elasticsearch/Serializers/SmartSerializer.php b/src/Elasticsearch/Serializers/SmartSerializer.php index 123b7cfd8..a75f64e3e 100644 --- a/src/Elasticsearch/Serializers/SmartSerializer.php +++ b/src/Elasticsearch/Serializers/SmartSerializer.php @@ -48,8 +48,8 @@ public function serialize($data) */ public function deserialize($data, $headers) { - if (isset($headers['content_type']) === true) { - if (strpos($headers['content_type'], 'json') !== false) { + if (isset($headers['Content-Type']) === true) { + if (strpos($headers['Content-Type'][0], 'json') !== false) { return $this->decode($data); } else { //Not json, return as string diff --git a/src/Elasticsearch/Transport.php b/src/Elasticsearch/Transport.php index 07323f786..d3b74768e 100644 --- a/src/Elasticsearch/Transport.php +++ b/src/Elasticsearch/Transport.php @@ -6,7 +6,12 @@ use Elasticsearch\ConnectionPool\AbstractConnectionPool; use Elasticsearch\Connections\Connection; use Elasticsearch\Connections\ConnectionInterface; +use Elasticsearch\Endpoints\AbstractEndpoint; use GuzzleHttp\Ring\Future\FutureArrayInterface; +use Http\Client\Exception; +use Http\Client\HttpAsyncClient; +use Http\Promise\Promise; +use Psr\Http\Message\ResponseInterface; use Psr\Log\LoggerInterface; /** @@ -20,153 +25,70 @@ */ class Transport { + /** - * @var AbstractConnectionPool + * @var HttpAsyncClient */ - public $connectionPool; + private $httpAsyncClient; /** - * @var LoggerInterface + * @var MessageBuilder */ - private $log; - - /** @var int */ - public $retryAttempts = 0; - - /** @var Connection */ - public $lastConnection; - - /** @var int */ - public $retries; + private $messageBuilder; /** * Transport class is responsible for dispatching requests to the * underlying cluster connections * - * @param $retries - * @param bool $sniffOnStart - * @param ConnectionPool\AbstractConnectionPool $connectionPool - * @param \Psr\Log\LoggerInterface $log Monolog logger object - */ - public function __construct($retries, $sniffOnStart = false, AbstractConnectionPool $connectionPool, LoggerInterface $log) - { - $this->log = $log; - $this->connectionPool = $connectionPool; - $this->retries = $retries; - - if ($sniffOnStart === true) { - $this->log->notice('Sniff on Start.'); - $this->connectionPool->scheduleCheck(); - } - } - - /** - * Returns a single connection from the connection pool - * Potentially performs a sniffing step before returning - * - * @return ConnectionInterface Connection + * @param HttpAsyncClient $httpAsyncClient An http client to do async requests + * @param MessageBuilder $messageBuilder + * @param bool $sniffOnStart */ - - public function getConnection() + public function __construct(HttpAsyncClient $httpAsyncClient, MessageBuilder $messageBuilder, $sniffOnStart = false) { - return $this->connectionPool->nextConnection(); + $this->httpAsyncClient = $httpAsyncClient; + $this->messageBuilder = $messageBuilder; } /** * Perform a request to the Cluster * - * @param string $method HTTP method to use - * @param string $uri HTTP URI to send request to - * @param null $params Optional query parameters - * @param null $body Optional query body - * @param array $options + * @param AbstractEndpoint $endpoint Endpoint to use + * @param string $fetch * - * @throws Common\Exceptions\NoNodesAvailableException|\Exception - * @return FutureArrayInterface - */ - public function performRequest($method, $uri, $params = null, $body = null, $options = []) - { - try { - $connection = $this->getConnection(); - } catch (Exceptions\NoNodesAvailableException $exception) { - $this->log->critical('No alive nodes found in cluster'); - throw $exception; - } - - $response = array(); - $caughtException = null; - $this->lastConnection = $connection; - - $future = $connection->performRequest( - $method, - $uri, - $params, - $body, - $options, - $this - ); - - $future->promise()->then( - //onSuccess - function ($response) { - $this->retryAttempts = 0; - // Note, this could be a 4xx or 5xx error - }, - //onFailure - function ($response) { - //some kind of real faiure here, like a timeout - $this->connectionPool->scheduleCheck(); - // log stuff - }); - - return $future; - } - - /** - * @param FutureArrayInterface $result Response of a request (promise) - * @param array $options Options for transport + * @throws \Exception * - * @return callable|array + * @return array|ResponseInterface|Promise */ - public function resultOrFuture($result, $options = []) + public function performRequest(AbstractEndpoint $endpoint, $fetch = null) { - $response = null; - $async = isset($options['client']['future']) ? $options['client']['future'] : null; - if (is_null($async) || $async === false) { - do { - $result = $result->wait(); - } while ($result instanceof FutureArrayInterface); - - return $result; - } elseif ($async === true || $async === 'lazy') { - return $result; + $request = $this->messageBuilder->createRequest($endpoint); + $promise = $this->httpAsyncClient->sendAsyncRequest($request); + $options = $endpoint->getOptions(); + + if (array_key_exists('client', $options)) { + if (array_key_exists('ignore', $options['client'])) { + $promise = $promise->then(null, function ($exception) use ($options) { + if ( + $exception instanceof Exception\HttpException && + in_array($exception->getResponse()->getStatusCode(), $options['client']['ignore'], true) + ) { + return $exception->getResponse(); + } + + throw $exception; + }); + } + + if (null === $fetch && array_key_exists('future', $options['client']) && $options['client']['future'] === true) { + $fetch = MessageBuilder::FETCH_PROMISE_DESERIALIZED; + } } - } - - /** - * @param $request - * - * @return bool - */ - public function shouldRetry($request) - { - if ($this->retryAttempts < $this->retries) { - $this->retryAttempts += 1; - return true; + if (null === $fetch) { + $fetch = MessageBuilder::FETCH_RESULT; } - return false; - } - - /** - * Returns the last used connection so that it may be inspected. Mainly - * for debugging/testing purposes. - * - * @return Connection - */ - public function getLastConnection() - { - return $this->lastConnection; + return $this->messageBuilder->createResponse($promise, $fetch); } } diff --git a/tests/Elasticsearch/Tests/ClientTest.php b/tests/Elasticsearch/Tests/ClientTest.php index a5b0629ba..8ffb6c240 100644 --- a/tests/Elasticsearch/Tests/ClientTest.php +++ b/tests/Elasticsearch/Tests/ClientTest.php @@ -4,7 +4,6 @@ use Elasticsearch; use Elasticsearch\ClientBuilder; -use Mockery as m; /** * Class ClientTest @@ -18,13 +17,8 @@ */ class ClientTest extends \PHPUnit_Framework_TestCase { - public function tearDown() - { - m::close(); - } - /** - * @expectedException \Elasticsearch\Common\Exceptions\InvalidArgumentException + * @expectedException \LogicException */ public function testConstructorIllegalPort() { @@ -54,7 +48,6 @@ public function testFromConfig() 'localhost:9200' ], 'retries' => 2, - 'handler' => ClientBuilder::multiHandler() ]; $client = ClientBuilder::fromConfig($params); } @@ -215,55 +208,4 @@ public function testArrayOfNullDelete() // all good } } - - public function testMaxRetriesException() - { - $client = Elasticsearch\ClientBuilder::create() - ->setHosts(["localhost:1"]) - ->setRetries(0) - ->build(); - - $searchParams = array( - 'index' => 'test', - 'type' => 'test', - 'body' => [ - 'query' => [ - 'match_all' => [] - ] - ] - ); - - $client = Elasticsearch\ClientBuilder::create() - ->setHosts(["localhost:1"]) - ->setRetries(0) - ->build(); - - try { - $client->search($searchParams); - $this->fail("Should have thrown CouldNotConnectToHost"); - } catch (Elasticsearch\Common\Exceptions\Curl\CouldNotConnectToHost $e) { - // All good - $previous = $e->getPrevious(); - $this->assertInstanceOf('Elasticsearch\Common\Exceptions\MaxRetriesException', $previous); - } catch (\Exception $e) { - throw $e; - } - - - $client = Elasticsearch\ClientBuilder::create() - ->setHosts(["localhost:1"]) - ->setRetries(0) - ->build(); - - try { - $client->search($searchParams); - $this->fail("Should have thrown TransportException"); - } catch (Elasticsearch\Common\Exceptions\TransportException $e) { - // All good - $previous = $e->getPrevious(); - $this->assertInstanceOf('Elasticsearch\Common\Exceptions\MaxRetriesException', $previous); - } catch (\Exception $e) { - throw $e; - } - } } diff --git a/tests/Elasticsearch/Tests/ConnectionPool/Selectors/RoundRobinSelectorTest.php b/tests/Elasticsearch/Tests/ConnectionPool/Selectors/RoundRobinSelectorTest.php deleted file mode 100644 index 7e5de4fc5..000000000 --- a/tests/Elasticsearch/Tests/ConnectionPool/Selectors/RoundRobinSelectorTest.php +++ /dev/null @@ -1,82 +0,0 @@ - - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache2 - * @link http://elasticsearch.org - */ -class RoundRobinSelectorTest extends \PHPUnit_Framework_TestCase -{ - /** - * Add Ten connections, select 15 to verify round robin - * - * @covers \Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector::select - * - * @return void - */ - public function testTenConnections() - { - $roundRobin = new Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector(); - - $mockConnections = array(); - foreach (range(0, 10) as $index) { - $mockConnections[$index] = $this->getMockBuilder('\Elasticsearch\Connections\CurlMultiConnection') - ->disableOriginalConstructor() - ->getMock(); - } - - foreach (range(0, 15) as $index) { - $retConnection = $roundRobin->select($mockConnections); - - $nextIndex = ($index % 10) + 1; - $this->assertEquals($mockConnections[$nextIndex], $retConnection); - } - } - - /** - * Add Ten connections, select five, remove thre, test another 10 to check - * that the round-robining works after removing connections - * - * @covers \Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector::select - * - * @return void - */ - public function testAddTenConnectionsestFiveTRemoveThree() - { - $roundRobin = new Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector(); - - $mockConnections = array(); - foreach (range(0, 10) as $index) { - $mockConnections[$index] = $this->getMockBuilder('\Elasticsearch\Connections\CurlMultiConnection') - ->disableOriginalConstructor() - ->getMock(); - } - - foreach (range(0, 4) as $index) { - $retConnection = $roundRobin->select($mockConnections); - - $nextIndex = ($index % (count($mockConnections)-1)) + 1; - $this->assertEquals($mockConnections[$nextIndex], $retConnection); - } - - unset($mockConnections[8]); - unset($mockConnections[9]); - unset($mockConnections[10]); - - foreach (range(5, 15) as $index) { - $retConnection = $roundRobin->select($mockConnections); - - $nextIndex = ($index % (count($mockConnections)-1)) + 1; - $this->assertEquals($mockConnections[$nextIndex], $retConnection); - } - } -} diff --git a/tests/Elasticsearch/Tests/ConnectionPool/Selectors/StickyRoundRobinSelectorTest.php b/tests/Elasticsearch/Tests/ConnectionPool/Selectors/StickyRoundRobinSelectorTest.php deleted file mode 100644 index 3d93ec790..000000000 --- a/tests/Elasticsearch/Tests/ConnectionPool/Selectors/StickyRoundRobinSelectorTest.php +++ /dev/null @@ -1,65 +0,0 @@ - - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache2 - * @link http://elasticsearch.org - */ -class StickyRoundRobinSelectorTest extends \PHPUnit_Framework_TestCase -{ - public function tearDown() - { - m::close(); - } - - public function testTenConnections() - { - $roundRobin = new Elasticsearch\ConnectionPool\Selectors\StickyRoundRobinSelector(); - - $mockConnections = array(); - $mockConnections[] = m::mock('\Elasticsearch\Connections\GuzzleConnection') - ->shouldReceive('isAlive')->times(16)->andReturn(true)->getMock(); - - foreach (range(0, 9) as $index) { - $mockConnections[] = m::mock('\Elasticsearch\Connections\GuzzleConnection'); - } - - foreach (range(0, 15) as $index) { - $retConnection = $roundRobin->select($mockConnections); - - $this->assertEquals($mockConnections[0], $retConnection); - } - } - - public function testTenConnectionsFirstDies() - { - $roundRobin = new Elasticsearch\ConnectionPool\Selectors\StickyRoundRobinSelector(); - - $mockConnections = array(); - $mockConnections[] = m::mock('\Elasticsearch\Connections\GuzzleConnection') - ->shouldReceive('isAlive')->once()->andReturn(false)->getMock(); - - $mockConnections[] = m::mock('\Elasticsearch\Connections\GuzzleConnection') - ->shouldReceive('isAlive')->times(15)->andReturn(true)->getMock(); - - foreach (range(0, 8) as $index) { - $mockConnections[] = m::mock('\Elasticsearch\Connections\GuzzleConnection'); - } - - foreach (range(0, 15) as $index) { - $retConnection = $roundRobin->select($mockConnections); - - $this->assertEquals($mockConnections[1], $retConnection); - } - } -} diff --git a/tests/Elasticsearch/Tests/ConnectionPool/SniffingConnectionPoolIntegrationTest.php b/tests/Elasticsearch/Tests/ConnectionPool/SniffingConnectionPoolIntegrationTest.php deleted file mode 100644 index e6f3de922..000000000 --- a/tests/Elasticsearch/Tests/ConnectionPool/SniffingConnectionPoolIntegrationTest.php +++ /dev/null @@ -1,25 +0,0 @@ - - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache2 - * @link http://elasticsearch.org - */ -class SniffingConnectionPoolIntegrationTest extends \PHPUnit_Framework_TestCase -{ - public function testSniff() - { - $client = ClientBuilder::create() - ->setHosts([$_SERVER['ES_TEST_HOST']]) - ->setConnectionPool('\Elasticsearch\ConnectionPool\SniffingConnectionPool', ['sniffingInterval' => -10]) - ->build(); - - $client->ping(); - } -} diff --git a/tests/Elasticsearch/Tests/ConnectionPool/SniffingConnectionPoolTest.php b/tests/Elasticsearch/Tests/ConnectionPool/SniffingConnectionPoolTest.php deleted file mode 100644 index 5d38c87a2..000000000 --- a/tests/Elasticsearch/Tests/ConnectionPool/SniffingConnectionPoolTest.php +++ /dev/null @@ -1,426 +0,0 @@ - - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache2 - * @link http://elasticsearch.org - */ -class SniffingConnectionPoolTest extends \PHPUnit_Framework_TestCase -{ - public function tearDown() - { - m::close(); - } - - public function testAddOneHostThenGetConnection() - { - $mockConnection = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('ping') - ->andReturn(true) - ->getMock() - ->shouldReceive('isAlive') - ->andReturn(true) - ->getMock(); - - $connections = array($mockConnection); - - $selector = m::mock('\Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector') - ->shouldReceive('select') - ->andReturn($connections[0]) - ->getMock(); - - $connectionFactory = m::mock('\Elasticsearch\Connections\ConnectionFactory'); - - $connectionPoolParams = array('randomizeHosts' => false); - $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $retConnection = $connectionPool->nextConnection(); - - $this->assertEquals($mockConnection, $retConnection); - } - - public function testAddOneHostAndTriggerSniff() - { - $clusterState = json_decode('{"ok":true,"cluster_name":"elasticsearch_zach","nodes":{"Bl2ihSr7TcuUHxhu1GA_YQ":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9300]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9200]"}}}', true); - - $mockConnection = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('ping')->andReturn(true)->getMock() - ->shouldReceive('isAlive')->andReturn(true)->getMock() - ->shouldReceive('getTransportSchema')->once()->andReturn('http')->getMock() - ->shouldReceive('sniff')->once()->andReturn($clusterState)->getMock(); - - $connections = array($mockConnection); - $mockNewConnection = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('isAlive')->andReturn(true)->getMock(); - - $selector = m::mock('\Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector') - ->shouldReceive('select')->twice() - ->andReturn($mockNewConnection) - ->getMock(); - - $connectionFactory = m::mock('\Elasticsearch\Connections\ConnectionFactory') - ->shouldReceive('create')->with(array('host' => '192.168.1.119', 'port' => 9200))->andReturn($mockNewConnection)->getMock(); - - $connectionPoolParams = array( - 'randomizeHosts' => false, - 'sniffingInterval' => -1 - ); - $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $retConnection = $connectionPool->nextConnection(); - - $this->assertEquals($mockNewConnection, $retConnection); - } - - public function testAddOneHostAndForceNext() - { - $clusterState = json_decode('{"ok":true,"cluster_name":"elasticsearch_zach","nodes":{"Bl2ihSr7TcuUHxhu1GA_YQ":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9300]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9200]"}}}', true); - - $mockConnection = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('ping')->andReturn(true)->getMock() - ->shouldReceive('isAlive')->andReturn(true)->getMock() - ->shouldReceive('getTransportSchema')->once()->andReturn('http')->getMock() - ->shouldReceive('sniff')->once()->andReturn($clusterState)->getMock(); - - $connections = array($mockConnection); - $mockNewConnection = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('isAlive')->andReturn(true)->getMock(); - - $selector = m::mock('\Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector') - ->shouldReceive('select')->once()->andReturn($mockConnection)->getMock() - ->shouldReceive('select')->once()->andReturn($mockNewConnection)->getMock(); - - $connectionFactory = m::mock('\Elasticsearch\Connections\ConnectionFactory') - ->shouldReceive('create')->with(array('host' => '192.168.1.119', 'port' => 9200))->andReturn($mockNewConnection)->getMock(); - - $connectionPoolParams = array( - 'randomizeHosts' => false - ); - $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $retConnection = $connectionPool->nextConnection(true); - - $this->assertEquals($mockNewConnection, $retConnection); - } - - public function testAddTenNodesThenGetConnection() - { - $connections = array(); - - foreach (range(1, 10) as $index) { - $mockConnection = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('ping') - ->andReturn(true) - ->getMock() - ->shouldReceive('isAlive') - ->andReturn(true) - ->getMock(); - - $connections[] = $mockConnection; - } - - $selector = m::mock('\Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector') - ->shouldReceive('select') - ->andReturn($connections[0]) - ->getMock(); - - $connectionFactory = m::mock('\Elasticsearch\Connections\ConnectionFactory'); - - $connectionPoolParams = array('randomizeHosts' => false); - $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $retConnection = $connectionPool->nextConnection(); - - $this->assertEquals($connections[0], $retConnection); - } - - public function testAddTenNodesTimeoutAllButLast() - { - $connections = array(); - - foreach (range(1, 9) as $index) { - $mockConnection = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('ping') - ->andReturn(false) - ->getMock() - ->shouldReceive('isAlive') - ->andReturn(false) - ->getMock(); - - $connections[] = $mockConnection; - } - - $mockConnection = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('ping') - ->andReturn(true) - ->getMock() - ->shouldReceive('isAlive') - ->andReturn(true) - ->getMock(); - - $connections[] = $mockConnection; - - $selector = m::mock('\Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector') - ->shouldReceive('select') - ->andReturnValues($connections) - ->getMock(); - - $connectionFactory = m::mock('\Elasticsearch\Connections\ConnectionFactory'); - - $connectionPoolParams = array('randomizeHosts' => false); - $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $retConnection = $connectionPool->nextConnection(); - - $this->assertEquals($connections[9], $retConnection); - } - - /** - * @expectedException Elasticsearch\Common\Exceptions\NoNodesAvailableException - */ - public function testAddTenNodesAllTimeout() - { - $connections = array(); - - foreach (range(1, 10) as $index) { - $mockConnection = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('ping') - ->andReturn(false) - ->getMock() - ->shouldReceive('isAlive') - ->andReturn(false) - ->getMock(); - - $connections[] = $mockConnection; - } - - $selector = m::mock('\Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector') - ->shouldReceive('select') - ->andReturnValues($connections) - ->getMock(); - - $connectionFactory = m::mock('\Elasticsearch\Connections\ConnectionFactory'); - - $connectionPoolParams = array('randomizeHosts' => false); - $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $retConnection = $connectionPool->nextConnection(); - } - - public function testAddOneHostSniffTwo() - { - $clusterState = json_decode('{"ok":true,"cluster_name":"elasticsearch_zach","nodes":{"node1":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9300]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9200]"}, "node2":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9301]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9201]"}}}', true); - - $mockConnection = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('ping')->andReturn(true)->getMock() - ->shouldReceive('isAlive')->andReturn(true)->getMock() - ->shouldReceive('getTransportSchema')->twice()->andReturn('http')->getMock() - ->shouldReceive('sniff')->twice()->andReturn($clusterState)->getMock(); - - $connections = array($mockConnection); - - $newConnections = array(); - $newConnections[] = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('isAlive')->andReturn(true)->getMock(); - - $newConnections[] = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('isAlive')->andReturn(true)->getMock(); - - $selector = m::mock('\Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector') - ->shouldReceive('select') - ->andReturnValues(array( //selects provided node first, then the new cluster list - $mockConnection, - $newConnections[0], - $newConnections[1] - )) - ->getMock(); - - $connectionFactory = m::mock('\Elasticsearch\Connections\ConnectionFactory') - ->shouldReceive('create')->with(array('host' => '192.168.1.119', 'port' => 9200))->andReturn($newConnections[0])->getMock() - ->shouldReceive('create')->with(array('host' => '192.168.1.119', 'port' => 9201))->andReturn($newConnections[1])->getMock(); - - $connectionPoolParams = array( - 'randomizeHosts' => false, - 'sniffingInterval' => -1 - ); - $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $retConnection = $connectionPool->nextConnection(); - $this->assertEquals($newConnections[0], $retConnection); - - $retConnection = $connectionPool->nextConnection(); - $this->assertEquals($newConnections[1], $retConnection); - } - - /** - * @expectedException Elasticsearch\Common\Exceptions\NoNodesAvailableException - */ - public function testAddSeed_SniffTwo_TimeoutTwo() - { - $clusterState = json_decode('{"ok":true,"cluster_name":"elasticsearch_zach","nodes":{"node1":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9300]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9200]"}, "node2":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9301]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9201]"}}}', true); - - $mockConnection = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('ping')->andReturn(true)->getMock() - ->shouldReceive('isAlive')->andReturn(true)->getMock() - ->shouldReceive('getTransportSchema')->once()->andReturn('http')->getMock() - ->shouldReceive('sniff')->once()->andReturn($clusterState)->getMock(); - - $connections = array($mockConnection); - - $newConnections = array(); - $newConnections[] = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('isAlive')->andReturn(false)->getMock() - ->shouldReceive('ping')->andReturn(false)->getMock(); - - $newConnections[] = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('isAlive')->andReturn(false)->getMock() - ->shouldReceive('ping')->andReturn(false)->getMock(); - - $selector = m::mock('\Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector') - ->shouldReceive('select') - ->andReturnValues(array( //selects provided node first, then the new cluster list - $mockConnection, - $newConnections[0], - $newConnections[1] - )) - ->getMock(); - - $connectionFactory = m::mock('\Elasticsearch\Connections\ConnectionFactory') - ->shouldReceive('create')->with(array('host' => '192.168.1.119', 'port' => 9200))->andReturn($newConnections[0])->getMock() - ->shouldReceive('create')->with(array('host' => '192.168.1.119', 'port' => 9201))->andReturn($newConnections[1])->getMock(); - - $connectionPoolParams = array( - 'randomizeHosts' => false, - 'sniffingInterval' => -1 - ); - $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $retConnection = $connectionPool->nextConnection(); - $this->assertEquals($mockConnection, $retConnection); - } - - public function testTen_TimeoutNine_SniffTenth_AddTwoAlive() - { - $clusterState = json_decode('{"ok":true,"cluster_name":"elasticsearch_zach","nodes":{"node1":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9300]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9200]"}, "node2":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9301]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9201]"}}}', true); - - $connections = array(); - - foreach (range(1, 10) as $index) { - $mockConnection = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('ping')->andReturn(false)->getMock() - ->shouldReceive('isAlive')->andReturn(true)->getMock() - ->shouldReceive('sniff')->andThrow('Elasticsearch\Common\Exceptions\Curl\OperationTimeoutException')->getMock(); - - $connections[] = $mockConnection; - } - - $mockConnection = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('ping')->andReturn(true)->getMock() - ->shouldReceive('isAlive')->andReturn(true)->getMock() - ->shouldReceive('sniff')->andReturn($clusterState)->getMock() - ->shouldReceive('getTransportSchema')->twice()->andReturn('http')->getMock(); - - $connections[] = $mockConnection; - - $newConnections = $connections; - $newConnections[] = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('isAlive')->andReturn(true)->getMock() - ->shouldReceive('ping')->andReturn(true)->getMock(); - - $newConnections[] = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('isAlive')->andReturn(true)->getMock() - ->shouldReceive('ping')->andReturn(true)->getMock(); - - $selector = m::mock('\Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector') - ->shouldReceive('select') - ->andReturnValues($newConnections) - ->getMock(); - - $connectionFactory = m::mock('\Elasticsearch\Connections\ConnectionFactory') - ->shouldReceive('create')->with(array('host' => '192.168.1.119', 'port' => 9200))->andReturn($newConnections[10])->getMock() - ->shouldReceive('create')->with(array('host' => '192.168.1.119', 'port' => 9201))->andReturn($newConnections[11])->getMock(); - - $connectionPoolParams = array( - 'randomizeHosts' => false, - 'sniffingInterval' => -1 - ); - $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $retConnection = $connectionPool->nextConnection(); - $this->assertEquals($newConnections[11], $retConnection); - - $retConnection = $connectionPool->nextConnection(); - $this->assertEquals($newConnections[12], $retConnection); - } - - /** - * @expectedException Elasticsearch\Common\Exceptions\NoNodesAvailableException - */ - public function testTen_TimeoutNine_SniffTenth_AddTwoDead_TimeoutEveryone() - { - $clusterState = json_decode('{"ok":true,"cluster_name":"elasticsearch_zach","nodes":{"node1":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9300]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9200]"}, "node2":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9301]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9201]"}}}', true); - - $connections = array(); - - foreach (range(1, 10) as $index) { - $mockConnection = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('ping')->andReturn(false)->getMock() - ->shouldReceive('isAlive')->andReturn(true)->getMock() - ->shouldReceive('sniff')->andThrow('Elasticsearch\Common\Exceptions\Curl\OperationTimeoutException')->getMock(); - - $connections[] = $mockConnection; - } - - $mockConnection = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('ping')->andReturn(true)->getMock() - ->shouldReceive('isAlive')->andReturn(true)->getMock() - ->shouldReceive('sniff')->andReturn($clusterState)->getMock() - ->shouldReceive('getTransportSchema')->once()->andReturn('http')->getMock() - ->shouldReceive('sniff')->andThrow('Elasticsearch\Common\Exceptions\Curl\OperationTimeoutException')->getMock(); - - $connections[] = $mockConnection; - - $newConnections = $connections; - $newConnections[] = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('isAlive')->andReturn(false)->getMock() - ->shouldReceive('ping')->andReturn(false)->getMock() - ->shouldReceive('sniff')->andThrow('Elasticsearch\Common\Exceptions\Curl\OperationTimeoutException')->getMock(); - - $newConnections[] = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('isAlive')->andReturn(false)->getMock() - ->shouldReceive('ping')->andReturn(false)->getMock() - ->shouldReceive('sniff')->andThrow('Elasticsearch\Common\Exceptions\Curl\OperationTimeoutException')->getMock(); - - $selector = m::mock('\Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector') - ->shouldReceive('select') - ->andReturnValues($newConnections) - ->getMock(); - - $RRConnections = $newConnections; - //array_push($connections); - $connectionFactory = m::mock('\Elasticsearch\Connections\ConnectionFactory') - ->shouldReceive('create')->with(array('host' => '192.168.1.119', 'port' => 9200))->andReturn($newConnections[10])->getMock() - ->shouldReceive('create')->with(array('host' => '192.168.1.119', 'port' => 9201))->andReturn($newConnections[11])->getMock(); - - $connectionPoolParams = array( - 'randomizeHosts' => false, - 'sniffingInterval' => -1 - ); - $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $retConnection = $connectionPool->nextConnection(); - $this->assertEquals($newConnections[11], $retConnection); - - $retConnection = $connectionPool->nextConnection(); - $this->assertEquals($newConnections[12], $retConnection); - } -} diff --git a/tests/Elasticsearch/Tests/ConnectionPool/StaticConnectionPoolTest.php b/tests/Elasticsearch/Tests/ConnectionPool/StaticConnectionPoolTest.php deleted file mode 100644 index d8466db9b..000000000 --- a/tests/Elasticsearch/Tests/ConnectionPool/StaticConnectionPoolTest.php +++ /dev/null @@ -1,231 +0,0 @@ - - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache2 - * @link http://elasticsearch.org - */ -class StaticConnectionPoolTest extends \PHPUnit_Framework_TestCase -{ - public function tearDown() - { - m::close(); - } - - public function testAddOneHostThenGetConnection() - { - $mockConnection = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('ping') - ->andReturn(true) - ->getMock() - ->shouldReceive('isAlive') - ->andReturn(true) - ->getMock() - ->shouldReceive('markDead')->once()->getMock(); - - $connections = array($mockConnection); - - $selector = m::mock('\Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector') - ->shouldReceive('select') - ->andReturn($connections[0]) - ->getMock(); - - $connectionFactory = m::mock('\Elasticsearch\Connections\ConnectionFactory'); - - $randomizeHosts = false; - $connectionPool = new Elasticsearch\ConnectionPool\StaticConnectionPool($connections, $selector, $connectionFactory, $randomizeHosts); - - $retConnection = $connectionPool->nextConnection(); - - $this->assertEquals($mockConnection, $retConnection); - } - - public function testAddMultipleHostsThenGetFirst() - { - $connections = array(); - - foreach (range(1, 10) as $index) { - $mockConnection = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('ping') - ->andReturn(true) - ->getMock() - ->shouldReceive('isAlive') - ->andReturn(true) - ->getMock() - ->shouldReceive('markDead')->once()->getMock(); - - $connections[] = $mockConnection; - } - - $selector = m::mock('\Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector') - ->shouldReceive('select') - ->andReturn($connections[0]) - ->getMock(); - - $connectionFactory = m::mock('\Elasticsearch\Connections\ConnectionFactory'); - - $randomizeHosts = false; - $connectionPool = new Elasticsearch\ConnectionPool\StaticConnectionPool($connections, $selector, $connectionFactory, $randomizeHosts); - - $retConnection = $connectionPool->nextConnection(); - - $this->assertEquals($connections[0], $retConnection); - } - - /** - * @expectedException Elasticsearch\Common\Exceptions\NoNodesAvailableException - */ - public function testAllHostsFailPing() - { - $connections = array(); - - foreach (range(1, 10) as $index) { - $mockConnection = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('ping') - ->andReturn(false) - ->getMock() - ->shouldReceive('isAlive') - ->andReturn(false) - ->getMock() - ->shouldReceive('markDead')->once()->getMock() - ->shouldReceive('getPingFailures')->andReturn(0)->once()->getMock() - ->shouldReceive('getLastPing')->andReturn(time())->once()->getMock(); - - $connections[] = $mockConnection; - } - - $selector = m::mock('\Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector') - ->shouldReceive('select') - ->andReturnValues($connections) - ->getMock(); - - $connectionFactory = m::mock('\Elasticsearch\Connections\ConnectionFactory'); - - $randomizeHosts = false; - $connectionPool = new Elasticsearch\ConnectionPool\StaticConnectionPool($connections, $selector, $connectionFactory, $randomizeHosts); - - $connectionPool->nextConnection(); - } - - public function testAllExceptLastHostFailPingRevivesInSkip() - { - $connections = array(); - - foreach (range(1, 9) as $index) { - $mockConnection = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('ping') - ->andReturn(false) - ->getMock() - ->shouldReceive('isAlive') - ->andReturn(false) - ->getMock() - ->shouldReceive('markDead')->once()->getMock() - ->shouldReceive('getPingFailures')->andReturn(0)->once()->getMock() - ->shouldReceive('getLastPing')->andReturn(time())->once()->getMock(); - - $connections[] = $mockConnection; - } - - $goodConnection = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('ping')->once() - ->andReturn(true) - ->getMock() - ->shouldReceive('isAlive')->once() - ->andReturn(false) - ->getMock() - ->shouldReceive('markDead')->once()->getMock() - ->shouldReceive('getPingFailures')->andReturn(0)->once()->getMock() - ->shouldReceive('getLastPing')->andReturn(time())->once()->getMock(); - - $connections[] = $goodConnection; - - $selector = m::mock('\Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector') - ->shouldReceive('select') - ->andReturnValues($connections) - ->getMock(); - - $connectionFactory = m::mock('\Elasticsearch\Connections\ConnectionFactory'); - - $randomizeHosts = false; - $connectionPool = new Elasticsearch\ConnectionPool\StaticConnectionPool($connections, $selector, $connectionFactory, $randomizeHosts); - - $ret = $connectionPool->nextConnection(); - $this->assertEquals($goodConnection, $ret); - } - - public function testAllExceptLastHostFailPingRevivesPreSkip() - { - $connections = array(); - - foreach (range(1, 9) as $index) { - $mockConnection = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('ping') - ->andReturn(false) - ->getMock() - ->shouldReceive('isAlive') - ->andReturn(false) - ->getMock() - ->shouldReceive('markDead')->once()->getMock() - ->shouldReceive('getPingFailures')->andReturn(0)->once()->getMock() - ->shouldReceive('getLastPing')->andReturn(time())->once()->getMock(); - - $connections[] = $mockConnection; - } - - $goodConnection = m::mock('\Elasticsearch\Connections\Connection') - ->shouldReceive('ping')->once() - ->andReturn(true) - ->getMock() - ->shouldReceive('isAlive')->once() - ->andReturn(false) - ->getMock() - ->shouldReceive('markDead')->once()->getMock() - ->shouldReceive('getPingFailures')->andReturn(0)->once()->getMock() - ->shouldReceive('getLastPing')->andReturn(time()-10000)->once()->getMock(); - - $connections[] = $goodConnection; - - $selector = m::mock('\Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector') - ->shouldReceive('select') - ->andReturnValues($connections) - ->getMock(); - - $connectionFactory = m::mock('\Elasticsearch\Connections\ConnectionFactory'); - - $randomizeHosts = false; - $connectionPool = new Elasticsearch\ConnectionPool\StaticConnectionPool($connections, $selector, $connectionFactory, $randomizeHosts); - - $ret = $connectionPool->nextConnection(); - $this->assertEquals($goodConnection, $ret); - } - - public function testCustomConnectionPoolIT() - { - $clientBuilder = \Elasticsearch\ClientBuilder::create(); - $clientBuilder->setHosts(['localhost:1']); - $client = $clientBuilder - ->setRetries(0) - ->setConnectionPool('\Elasticsearch\ConnectionPool\StaticConnectionPool', []) - ->build(); - - try { - $client->search([]); - $this->fail("Should have thrown NoNodesAvailableException"); - } catch (Elasticsearch\Common\Exceptions\NoNodesAvailableException $e) { - // All good - } catch (\Exception $e) { - throw $e; - } - } -} diff --git a/tests/Elasticsearch/Tests/Helper/Iterators/SearchResponseIteratorTest.php b/tests/Elasticsearch/Tests/Helper/Iterators/SearchResponseIteratorTest.php index 7c5569f45..5b332880a 100644 --- a/tests/Elasticsearch/Tests/Helper/Iterators/SearchResponseIteratorTest.php +++ b/tests/Elasticsearch/Tests/Helper/Iterators/SearchResponseIteratorTest.php @@ -2,8 +2,9 @@ namespace Elasticsearch\Tests\Helper\Iterators; +use Elasticsearch\Client; use Elasticsearch\Helper\Iterators\SearchResponseIterator; -use Mockery as m; +use Prophecy\Argument; /** * Class SearchResponseIteratorTest @@ -14,12 +15,6 @@ */ class SearchResponseIteratorTest extends \PHPUnit_Framework_TestCase { - - public function tearDown() - { - m::close(); - } - public function testWithNoResults() { $search_params = array( @@ -34,51 +29,24 @@ public function testWithNoResults() ) ); - $mock_client = m::mock('\Elasticsearch\Client'); - - $mock_client->shouldReceive('search') - ->once() - ->ordered() - ->with($search_params) - ->andReturn(array('_scroll_id' => 'scroll_id_01')); - - $mock_client->shouldReceive('scroll') - ->once() - ->ordered() - ->with( - array( - 'scroll_id' => 'scroll_id_01', - 'scroll' => '5m' - ) - ) - ->andReturn( - array( - '_scroll_id' => 'scroll_id_02', - 'hits' => array( - 'hits' => array( - ) - ) - ) - ); - - $mock_client->shouldReceive('scroll') - ->never() - ->with( - array( - 'scroll_id' => 'scroll_id_02', - 'scroll' => '5m' - ) - ); - - $mock_client->shouldReceive('clearScroll') - ->once() - ->ordered() - ->withAnyArgs(); - - - $responses = new SearchResponseIterator($mock_client, $search_params); - - $this->assertCount(0, $responses); + $client = $this->prophesize(Client::class); + $client->search($search_params)->willReturn([ + '_scroll_id' => 'scroll_id_01' + ]); + $client->scroll([ + 'scroll_id' => 'scroll_id_01', + 'scroll' => '5m' + ])->willReturn([ + '_scroll_id' => 'scroll_id_02', + 'hits' => [ + 'hits' => [] + ] + ]); + $client->clearScroll(Argument::any())->shouldBeCalled(); + + $responses = new SearchResponseIterator($client->reveal(), $search_params); + + $this->assertCount(1, $responses); } public function testWithScan() @@ -95,87 +63,43 @@ public function testWithScan() ) ); - $mock_client = m::mock('\Elasticsearch\Client'); - - $mock_client->shouldReceive('search') - ->once() - ->ordered() - ->with($search_params) - ->andReturn(array('_scroll_id' => 'scroll_id_01')); - - $mock_client->shouldReceive('scroll') - ->once() - ->ordered() - ->with( - array( - 'scroll_id' => 'scroll_id_01', - 'scroll' => '5m' - ) - ) - ->andReturn( - array( - '_scroll_id' => 'scroll_id_02', - 'hits' => array( - 'hits' => array( - array() - ) - ) - ) - ); - - $mock_client->shouldReceive('scroll') - ->once() - ->ordered() - ->with( - array( - 'scroll_id' => 'scroll_id_02', - 'scroll' => '5m' - ) - ) - ->andReturn( - array( - '_scroll_id' => 'scroll_id_03', - 'hits' => array( - 'hits' => array( - array() - ) - ) - ) - ); - - $mock_client->shouldReceive('scroll') - ->once() - ->ordered() - ->with( - array( - 'scroll_id' => 'scroll_id_03', - 'scroll' => '5m' - ) - ) - ->andReturn( - array( - '_scroll_id' => 'scroll_id_04', - 'hits' => array( - ) - ) - ); - - $mock_client->shouldReceive('scroll') - ->never() - ->with( - array( - 'scroll_id' => 'scroll_id_04', - 'scroll' => '5m' - ) - ); - - $mock_client->shouldReceive('clearScroll') - ->once() - ->ordered() - ->withAnyArgs(); - - $responses = new SearchResponseIterator($mock_client, $search_params); - - $this->assertCount(2, $responses); + $client = $this->prophesize(Client::class); + $client->search($search_params)->willReturn([ + '_scroll_id' => 'scroll_id_01' + ]); + + $client->scroll([ + 'scroll_id' => 'scroll_id_01', + 'scroll' => '5m' + ])->willReturn([ + '_scroll_id' => 'scroll_id_02', + 'hits' => [ + 'hits' => [[]] + ] + ]); + + $client->scroll([ + 'scroll_id' => 'scroll_id_02', + 'scroll' => '5m' + ])->willReturn([ + '_scroll_id' => 'scroll_id_03', + 'hits' => [ + 'hits' => [[]] + ] + ]); + + $client->scroll([ + 'scroll_id' => 'scroll_id_03', + 'scroll' => '5m' + ])->willReturn([ + '_scroll_id' => 'scroll_id_04', + 'hits' => [ + 'hits' => [] + ] + ]); + $client->clearScroll(Argument::any())->shouldBeCalled(); + $responses = new SearchResponseIterator($client->reveal(), $search_params); + + $this->assertCount(3, $responses); } } diff --git a/tests/Elasticsearch/Tests/RegisteredNamespaceTest.php b/tests/Elasticsearch/Tests/RegisteredNamespaceTest.php index f40430f0c..f2d109036 100644 --- a/tests/Elasticsearch/Tests/RegisteredNamespaceTest.php +++ b/tests/Elasticsearch/Tests/RegisteredNamespaceTest.php @@ -6,7 +6,6 @@ use Elasticsearch\ClientBuilder; use Elasticsearch\Serializers\SerializerInterface; use Elasticsearch\Transport; -use Mockery as m; /** * Class RegisteredNamespaceTest @@ -20,11 +19,6 @@ */ class RegisteredNamespaceTest extends \PHPUnit_Framework_TestCase { - public function tearDown() - { - m::close(); - } - public function testRegisteringNamespace() { $builder = new FooNamespaceBuilder(); @@ -62,4 +56,4 @@ public function fooMethod() { return "123"; } -} \ No newline at end of file +} diff --git a/tests/Elasticsearch/Tests/Serializers/ArrayToJSONSerializerTest.php b/tests/Elasticsearch/Tests/Serializers/ArrayToJSONSerializerTest.php index e22e3cac4..5e2da02e3 100644 --- a/tests/Elasticsearch/Tests/Serializers/ArrayToJSONSerializerTest.php +++ b/tests/Elasticsearch/Tests/Serializers/ArrayToJSONSerializerTest.php @@ -4,7 +4,6 @@ use Elasticsearch\Serializers\ArrayToJSONSerializer; use PHPUnit_Framework_TestCase; -use Mockery as m; /** * Class ArrayToJSONSerializerTest @@ -12,11 +11,6 @@ */ class ArrayToJSONSerializerTest extends PHPUnit_Framework_TestCase { - public function tearDown() - { - m::close(); - } - public function testSerializeArray() { $serializer = new ArrayToJSONSerializer(); diff --git a/tests/Elasticsearch/Tests/Serializers/EverythingToJSONSerializerTest.php b/tests/Elasticsearch/Tests/Serializers/EverythingToJSONSerializerTest.php index 5bcffc479..b2ecaccfb 100644 --- a/tests/Elasticsearch/Tests/Serializers/EverythingToJSONSerializerTest.php +++ b/tests/Elasticsearch/Tests/Serializers/EverythingToJSONSerializerTest.php @@ -4,7 +4,6 @@ use Elasticsearch\Serializers\EverythingToJSONSerializer; use PHPUnit_Framework_TestCase; -use Mockery as m; /** * Class EverythingToJSONSerializerTest @@ -12,11 +11,6 @@ */ class EverythingToJSONSerializerTest extends PHPUnit_Framework_TestCase { - public function tearDown() - { - m::close(); - } - public function testSerializeArray() { $serializer = new EverythingToJSONSerializer(); diff --git a/tests/Elasticsearch/Tests/YamlRunnerTest.php b/tests/Elasticsearch/Tests/YamlRunnerTest.php index 83dfa2d0d..15b1e0cc3 100644 --- a/tests/Elasticsearch/Tests/YamlRunnerTest.php +++ b/tests/Elasticsearch/Tests/YamlRunnerTest.php @@ -4,14 +4,16 @@ use Doctrine\Common\Inflector\Inflector; use Elasticsearch; -use Elasticsearch\Common\Exceptions\BadRequest400Exception; -use Elasticsearch\Common\Exceptions\Conflict409Exception; -use Elasticsearch\Common\Exceptions\Forbidden403Exception; -use Elasticsearch\Common\Exceptions\Missing404Exception; -use Elasticsearch\Common\Exceptions\RequestTimeout408Exception; -use Elasticsearch\Common\Exceptions\ServerErrorResponseException; -use Elasticsearch\Common\Exceptions\RoutingMissingException; -use GuzzleHttp\Ring\Future\FutureArrayInterface; +use Elasticsearch\Common\Exceptions\Http\BadRequest400Exception; +use Elasticsearch\Common\Exceptions\Http\Conflict409Exception; +use Elasticsearch\Common\Exceptions\Http\Forbidden403Exception; +use Elasticsearch\Common\Exceptions\Http\Missing404Exception; +use Elasticsearch\Common\Exceptions\Http\RequestTimeout408Exception; +use Elasticsearch\Common\Exceptions\Http\ServerErrorResponseException; +use Elasticsearch\Common\Exceptions\Http\RoutingMissingException; +use Http\Promise\Promise; +use Monolog\Handler\StreamHandler; +use Monolog\Logger; use Symfony\Component\Finder\Finder; use Symfony\Component\Finder\SplFileInfo; use Symfony\Component\Yaml\Exception\ParseException; @@ -94,7 +96,14 @@ public static function setUpBeforeClass() public function setUp() { $this->clean(); - $this->client = Elasticsearch\ClientBuilder::create()->setHosts([self::getHost()])->build(); + + $logger = new Logger('elasticsearch-debug'); + $logger->pushHandler(new StreamHandler('php://stderr')); + + $this->client = Elasticsearch\ClientBuilder::create() + ->setHosts([self::getHost()]) + //->setLogger($logger) + ->build(); } /** @@ -301,7 +310,7 @@ public function executeRequest($caller, $method, $endpointParams, $expectedError try { $response = $caller->$method($endpointParams); - while ($response instanceof FutureArrayInterface) { + if ($response instanceof Promise) { $response = $response->wait(); } @@ -332,10 +341,9 @@ public function executeRequest($caller, $method, $endpointParams, $expectedError public function executeAsyncExistRequest($caller, $method, $endpointParams, $expectedError, $testName) { try { - $response = $caller->$method($endpointParams); - while ($response instanceof FutureArrayInterface) { + while ($response instanceof Promise) { $response = $response->wait(); }