from __future__ import annotations
from datetime import datetime
from time import sleep
from typing import Any, MutableMapping, Optional
from urllib import parse
from meilisearch._httprequests import HttpRequests
from meilisearch.config import Config
from meilisearch.errors import MeilisearchTimeoutError
from meilisearch.models.task import Task, TaskInfo, TaskResults
class TaskHandler:
    """
    A class covering the Meilisearch Task API

    The task class gives access to all task routes and gives information about the progress of asynchronous operations.
    https://www.meilisearch.com/docs/reference/api/tasks
    """

    def __init__(self, config: Config):
        """Parameters
        ----------
        config: Config object containing permission and location of Meilisearch.
        """
        self.config = config
        self.http = HttpRequests(config)

    @staticmethod
    def _encode_query(parameters: Optional[MutableMapping[str, Any]]) -> str:
        """Build the URL query string for a task route.

        List and tuple values are flattened into a comma-separated string
        (each item converted with ``str``) before URL-encoding. The mapping
        passed in is never modified.

        Parameters
        ----------
        parameters:
            Query parameters, or None / an empty mapping for no parameters.

        Returns
        -------
        query:
            The URL-encoded query string, without the leading ``?``. Empty when
            there are no parameters.
        """
        if not parameters:
            return ""
        # Work on a fresh dict so the caller's mapping keeps its original lists.
        normalized = {
            key: ",".join(map(str, value)) if isinstance(value, (list, tuple)) else value
            for key, value in parameters.items()
        }
        return parse.urlencode(normalized)

    def get_tasks(self, parameters: Optional[MutableMapping[str, Any]] = None) -> TaskResults:
        """Get all tasks.

        Parameters
        ----------
        parameters (optional):
            parameters accepted by the get tasks route: https://www.meilisearch.com/docs/reference/api/tasks#get-tasks.
            List values are sent as comma-separated strings; the mapping itself is left unchanged.

        Returns
        -------
        task:
            TaskResults instance containing limit, from, next and results containing a list of all
            enqueued, processing, succeeded or failed tasks.

        Raises
        ------
        MeilisearchApiError
            An error containing details about why Meilisearch can't process your request. Meilisearch error codes are described here: https://www.meilisearch.com/docs/reference/errors/error_codes#meilisearch-errors
        """
        tasks = self.http.get(f"{self.config.paths.task}?{self._encode_query(parameters)}")
        return TaskResults(tasks)

    def get_task(self, uid: int) -> Task:
        """Get one task.

        Parameters
        ----------
        uid:
            Identifier of the task.

        Returns
        -------
        task:
            Task instance containing information about the processed asynchronous task.

        Raises
        ------
        MeilisearchApiError
            An error containing details about why Meilisearch can't process your request. Meilisearch error codes are described here: https://www.meilisearch.com/docs/reference/errors/error_codes#meilisearch-errors
        """
        task = self.http.get(f"{self.config.paths.task}/{uid}")
        return Task(**task)

    def cancel_tasks(self, parameters: MutableMapping[str, Any]) -> TaskInfo:
        """Cancel a list of enqueued or processing tasks.

        Parameters
        ----------
        parameters:
            parameters accepted by the cancel tasks route: https://www.meilisearch.com/docs/reference/api/tasks#cancel-task.
            List values are sent as comma-separated strings; the mapping itself is left unchanged.

        Returns
        -------
        task_info:
            TaskInfo instance containing information about a task to track the progress of an asynchronous process.
            https://www.meilisearch.com/docs/reference/api/tasks#get-one-task

        Raises
        ------
        MeilisearchApiError
            An error containing details about why Meilisearch can't process your request. Meilisearch error codes are described here: https://www.meilisearch.com/docs/reference/errors/error_codes#meilisearch-errors
        """
        response = self.http.post(
            f"{self.config.paths.task}/cancel?{self._encode_query(parameters)}"
        )
        return TaskInfo(**response)

    def delete_tasks(self, parameters: MutableMapping[str, Any]) -> TaskInfo:
        """Delete the tasks matching the given parameters.

        Parameters
        ----------
        parameters:
            parameters accepted by the delete tasks route: https://www.meilisearch.com/docs/reference/api/tasks#delete-task.
            List values are sent as comma-separated strings; the mapping itself is left unchanged.

        Returns
        -------
        task_info:
            TaskInfo instance containing information about a task to track the progress of an asynchronous process.
            https://www.meilisearch.com/docs/reference/api/tasks#get-one-task

        Raises
        ------
        MeilisearchApiError
            An error containing details about why Meilisearch can't process your request. Meilisearch error codes are described here: https://www.meilisearch.com/docs/reference/errors/error_codes#meilisearch-errors
        """
        response = self.http.delete(f"{self.config.paths.task}?{self._encode_query(parameters)}")
        return TaskInfo(**response)

    def wait_for_task(
        self,
        uid: int,
        timeout_in_ms: int = 5000,
        interval_in_ms: int = 50,
    ) -> Task:
        """Wait until the task fails or succeeds in Meilisearch.

        The task is polled until its status is neither "enqueued" nor "processing".
        If timeout_in_ms is zero or negative the task is never fetched and the
        timeout error is raised immediately.

        Parameters
        ----------
        uid:
            Identifier of the task to wait for being processed.
        timeout_in_ms (optional):
            Time the method should wait before raising a MeilisearchTimeoutError.
        interval_in_ms (optional):
            Time interval the method should wait (sleep) between requests.

        Returns
        -------
        task:
            Task instance containing information about the processed asynchronous task.

        Raises
        ------
        MeilisearchTimeoutError
            Raised when the task is still enqueued or processing after timeout_in_ms milliseconds.
        """
        start_time = datetime.now()
        elapsed_time = 0.0
        while elapsed_time < timeout_in_ms:
            task = self.get_task(uid)
            if task.status not in ("enqueued", "processing"):
                return task
            sleep(interval_in_ms / 1000)
            # total_seconds() accounts for the days component of the delta, so
            # waits longer than 24 hours are measured correctly.
            elapsed_time = (datetime.now() - start_time).total_seconds() * 1000
        raise MeilisearchTimeoutError(
            f"timeout of {timeout_in_ms}ms has exceeded on process {uid} when waiting for task to be resolved."
        )