"""Provide a client for aiovlc."""
from __future__ import annotations
import asyncio
from types import TracebackType
from typing import Literal, Self
from .const import LOGGER
from .exceptions import ConnectError, ConnectReadError
from .model.command import (
Add,
Clear,
Enqueue,
GetLength,
GetLengthOutput,
GetTime,
GetTimeOutput,
Info,
InfoOutput,
Next,
Password,
PasswordOutput,
Pause,
Play,
Prev,
Random,
Seek,
SetVolume,
Status,
StatusOutput,
Stop,
Volume,
VolumeOutput,
)
IAC = bytes([255]) # "Interpret As Command"
TERMINATOR = "\n"
[docs]
class Client:
"""Represent a client for aiovlc."""
def __init__(
self,
password: str,
host: str = "localhost",
port: int = 4212,
timeout: int = 10,
) -> None:
"""Set up the client client."""
self.host = host
self.password = password
self.port = port
self.timeout = timeout
self.command_lock = asyncio.Lock()
self._reader: asyncio.StreamReader | None = None
self._writer: asyncio.StreamWriter | None = None
async def __aenter__(self) -> Self:
"""Connect the client with context manager."""
await self.connect()
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
"""Disconnect the client with context manager."""
await self.disconnect()
[docs]
async def connect(self) -> None:
"""Connect the client."""
try:
self._reader, self._writer = await asyncio.wait_for(
asyncio.open_connection(
host=self.host,
port=self.port,
),
timeout=self.timeout,
)
except OSError as err:
raise ConnectError(f"Failed to connect: {err}") from err
[docs]
async def disconnect(self) -> None:
"""Disconnect the client."""
if self._writer is None:
raise RuntimeError("Client is not connected")
try:
self._writer.close()
await asyncio.wait_for(self._writer.wait_closed(), timeout=self.timeout)
except OSError:
pass
[docs]
async def read(self, read_until: str = TERMINATOR) -> str:
"""Return a decoded message."""
if self._reader is None:
raise RuntimeError("Client is not connected")
try:
read = await asyncio.wait_for(
self._reader.readuntil(read_until.encode("utf-8")),
timeout=self.timeout,
)
except asyncio.LimitOverrunError as err:
raise ConnectReadError(err) from err
except asyncio.IncompleteReadError as err:
raise ConnectReadError(err, err.partial) from err
except OSError as err:
raise ConnectError(f"Failed to read: {err}") from err
LOGGER.debug("Bytes read: %s", read)
# Drop IAC command and read again.
if IAC in read:
return await self.read(read_until)
return read.decode("utf-8")
[docs]
async def write(self, command: str) -> None:
"""Write a command to the connection."""
if self._writer is None:
raise RuntimeError("Client is not connected")
try:
self._writer.write(command.encode("utf-8"))
await asyncio.wait_for(self._writer.drain(), timeout=self.timeout)
except OSError as err:
raise ConnectError(f"Failed to write: {err}") from err
[docs]
async def login(self) -> PasswordOutput:
"""Login."""
return await Password(self.password).send(self)
[docs]
async def add(self, playlist_item: str) -> None:
"""Send the add command."""
await Add(playlist_item).send(self)
[docs]
async def clear(self) -> None:
"""Send the clear command."""
await Clear().send(self)
[docs]
async def enqueue(self, playlist_item: str) -> None:
"""Send the enqueue command."""
await Enqueue(playlist_item).send(self)
[docs]
async def get_length(self) -> GetLengthOutput:
"""Send the get_length command."""
return await GetLength().send(self)
[docs]
async def get_time(self) -> GetTimeOutput:
"""Send the get_time command."""
return await GetTime().send(self)
[docs]
async def info(self) -> InfoOutput:
"""Send the info command."""
return await Info().send(self)
[docs]
async def next(self) -> None:
"""Send the next command."""
await Next().send(self)
[docs]
async def pause(self) -> None:
"""Send the pause command."""
await Pause().send(self)
[docs]
async def play(self) -> None:
"""Send the play command."""
await Play().send(self)
[docs]
async def prev(self) -> None:
"""Send the prev command."""
await Prev().send(self)
[docs]
async def random(self, mode: Literal["on", "off"] | None = None) -> None:
"""Send the random command."""
await Random(mode).send(self)
[docs]
async def seek(self, seconds: int) -> None:
"""Send the seek command."""
await Seek(seconds).send(self)
[docs]
async def set_volume(self, volume: int) -> None:
"""Send the volume command with a parameter to set volume."""
await SetVolume(volume).send(self)
[docs]
async def status(self) -> StatusOutput:
"""Send the status command."""
return await Status().send(self)
[docs]
async def stop(self) -> None:
"""Send the stop command."""
await Stop().send(self)
[docs]
async def volume(self) -> VolumeOutput:
"""Send the volume command."""
return await Volume().send(self)