Source code for aiovlc.client

"""Provide a client for aiovlc."""

from __future__ import annotations

import asyncio
from types import TracebackType
from typing import Literal, Self

from .const import LOGGER
from .exceptions import ConnectError, ConnectReadError
from .model.command import (
    Add,
    Clear,
    Enqueue,
    GetLength,
    GetLengthOutput,
    GetTime,
    GetTimeOutput,
    Info,
    InfoOutput,
    Next,
    Password,
    PasswordOutput,
    Pause,
    Play,
    Prev,
    Random,
    Seek,
    SetVolume,
    Status,
    StatusOutput,
    Stop,
    Volume,
    VolumeOutput,
)

IAC = bytes([255])  # "Interpret As Command"
TERMINATOR = "\n"


[docs] class Client: """Represent a client for aiovlc.""" def __init__( self, password: str, host: str = "localhost", port: int = 4212, timeout: int = 10, ) -> None: """Set up the client client.""" self.host = host self.password = password self.port = port self.timeout = timeout self.command_lock = asyncio.Lock() self._reader: asyncio.StreamReader | None = None self._writer: asyncio.StreamWriter | None = None async def __aenter__(self) -> Self: """Connect the client with context manager.""" await self.connect() return self async def __aexit__( self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None, ) -> None: """Disconnect the client with context manager.""" await self.disconnect()
[docs] async def connect(self) -> None: """Connect the client.""" try: self._reader, self._writer = await asyncio.wait_for( asyncio.open_connection( host=self.host, port=self.port, ), timeout=self.timeout, ) except OSError as err: raise ConnectError(f"Failed to connect: {err}") from err
[docs] async def disconnect(self) -> None: """Disconnect the client.""" if self._writer is None: raise RuntimeError("Client is not connected") try: self._writer.close() await asyncio.wait_for(self._writer.wait_closed(), timeout=self.timeout) except OSError: pass
[docs] async def read(self, read_until: str = TERMINATOR) -> str: """Return a decoded message.""" if self._reader is None: raise RuntimeError("Client is not connected") try: read = await asyncio.wait_for( self._reader.readuntil(read_until.encode("utf-8")), timeout=self.timeout, ) except asyncio.LimitOverrunError as err: raise ConnectReadError(err) from err except asyncio.IncompleteReadError as err: raise ConnectReadError(err, err.partial) from err except OSError as err: raise ConnectError(f"Failed to read: {err}") from err LOGGER.debug("Bytes read: %s", read) # Drop IAC command and read again. if IAC in read: return await self.read(read_until) return read.decode("utf-8")
[docs] async def write(self, command: str) -> None: """Write a command to the connection.""" if self._writer is None: raise RuntimeError("Client is not connected") try: self._writer.write(command.encode("utf-8")) await asyncio.wait_for(self._writer.drain(), timeout=self.timeout) except OSError as err: raise ConnectError(f"Failed to write: {err}") from err
[docs] async def login(self) -> PasswordOutput: """Login.""" return await Password(self.password).send(self)
[docs] async def add(self, playlist_item: str) -> None: """Send the add command.""" await Add(playlist_item).send(self)
[docs] async def clear(self) -> None: """Send the clear command.""" await Clear().send(self)
[docs] async def enqueue(self, playlist_item: str) -> None: """Send the enqueue command.""" await Enqueue(playlist_item).send(self)
[docs] async def get_length(self) -> GetLengthOutput: """Send the get_length command.""" return await GetLength().send(self)
[docs] async def get_time(self) -> GetTimeOutput: """Send the get_time command.""" return await GetTime().send(self)
[docs] async def info(self) -> InfoOutput: """Send the info command.""" return await Info().send(self)
[docs] async def next(self) -> None: """Send the next command.""" await Next().send(self)
[docs] async def pause(self) -> None: """Send the pause command.""" await Pause().send(self)
[docs] async def play(self) -> None: """Send the play command.""" await Play().send(self)
[docs] async def prev(self) -> None: """Send the prev command.""" await Prev().send(self)
[docs] async def random(self, mode: Literal["on", "off"] | None = None) -> None: """Send the random command.""" await Random(mode).send(self)
[docs] async def seek(self, seconds: int) -> None: """Send the seek command.""" await Seek(seconds).send(self)
[docs] async def set_volume(self, volume: int) -> None: """Send the volume command with a parameter to set volume.""" await SetVolume(volume).send(self)
[docs] async def status(self) -> StatusOutput: """Send the status command.""" return await Status().send(self)
[docs] async def stop(self) -> None: """Send the stop command.""" await Stop().send(self)
[docs] async def volume(self) -> VolumeOutput: """Send the volume command.""" return await Volume().send(self)