# audiooutput.py

"""PySide6 port of the multimedia/audiooutput example from Qt v5.x, originating from PyQt"""

from math import pi, sin
from struct import pack

from PySide6.QtCore import QByteArray, QIODevice, Qt, QTimer, qWarning
from PySide6.QtMultimedia import (QAudio, QAudioDeviceInfo, QAudioFormat,
        QAudioOutput)
from PySide6.QtWidgets import (QApplication, QComboBox, QHBoxLayout, QLabel,
        QMainWindow, QPushButton, QSlider, QVBoxLayout, QWidget)


class Generator(QIODevice):
    """Read-only QIODevice that replays a pre-rendered sine tone in a loop.

    The tone is rendered once into ``m_buffer`` by :meth:`generateData`.
    :meth:`readData` then treats that buffer as circular, wrapping the read
    cursor ``m_pos`` back to the start whenever it reaches the end.
    """

    # Number of microseconds in one second, used to convert ``durationUs``.
    _USECS_PER_SECOND = 1000000

    def __init__(self, format, durationUs, sampleRate, parent):
        """Render the tone; the device is not opened until :meth:`start`.

        Args:
            format: QAudioFormat describing the PCM layout to render.
            durationUs: Length of the rendered buffer, in microseconds.
            sampleRate: Frequency of the generated tone, in Hz.
            parent: Parent object handed to the QIODevice constructor.
        """
        super().__init__(parent)

        self.m_pos = 0  # read cursor into m_buffer, in bytes
        self.m_buffer = QByteArray()

        self.generateData(format, durationUs, sampleRate)

    def start(self):
        """Open the device for reading."""
        self.open(QIODevice.ReadOnly)

    def stop(self):
        """Rewind the read cursor and close the device."""
        self.m_pos = 0
        self.close()

    @staticmethod
    def _sampleEncoding(format):
        """Return ``(pack_format, scaler)`` for the given audio format.

        ``pack_format`` is a ``struct`` format string for one sample and
        ``scaler`` maps a sine value in [-1.0, 1.0] onto that sample range.

        Raises:
            ValueError: If the sample size/type combination is not one of
                8- or 16-bit signed/unsigned integer.
        """
        size = format.sampleSize()
        kind = format.sampleType()

        if size == 8:
            if kind == QAudioFormat.UnSignedInt:
                return 'B', lambda x: ((1.0 + x) / 2 * 255)
            if kind == QAudioFormat.SignedInt:
                return 'b', lambda x: x * 127
        elif size == 16:
            little = format.byteOrder() == QAudioFormat.LittleEndian
            if kind == QAudioFormat.UnSignedInt:
                return ('<H' if little else '>H'), lambda x: (1.0 + x) / 2 * 65535
            if kind == QAudioFormat.SignedInt:
                return ('<h' if little else '>h'), lambda x: x * 32767

        # An explicit exception (rather than ``assert``) keeps the check alive
        # under ``python -O`` and avoids a later NameError on ``scaler``.
        raise ValueError(
            "Unsupported audio format: only 8/16-bit integer samples can be generated")

    def generateData(self, format, durationUs, sampleRate):
        """Fill ``m_buffer`` with ``durationUs`` microseconds of a sine tone.

        Args:
            format: QAudioFormat giving sample size/type, byte order, channel
                count and data sample rate.
            durationUs: Duration to render, in microseconds.
            sampleRate: Tone frequency in Hz.

        Raises:
            ValueError: If ``format`` is not 8- or 16-bit integer PCM.
        """
        pack_format, scaler = self._sampleEncoding(format)

        channelCount = format.channelCount()
        dataRate = format.sampleRate()

        # durationUs is in microseconds, so the divisor must be one million.
        # Counting whole frames (one sample per channel) also guarantees the
        # loop below terminates, whatever the channel count is.
        frameCount = dataRate * durationUs // self._USECS_PER_SECOND

        self.m_buffer.clear()
        if frameCount <= 0 or channelCount <= 0:
            return

        factor = 2 * pi * sampleRate / dataRate

        for sampleIndex in range(frameCount):
            x = sin((sampleIndex % dataRate) * factor)
            packed = pack(pack_format, int(scaler(x)))
            # Every channel of a frame carries the same sample value.
            self.m_buffer.append(packed * channelCount)

    def readData(self, maxlen):
        """Return up to ``maxlen`` bytes, wrapping around the tone buffer.

        An empty tone buffer yields empty data instead of dividing by zero.
        """
        data = QByteArray()
        size = self.m_buffer.size()
        if size == 0:
            return data.data()

        total = 0
        while maxlen > total:
            # Take what is left before the end of the buffer, but no more
            # than the caller still wants.
            chunk = min(size - self.m_pos, maxlen - total)
            data.append(self.m_buffer.mid(self.m_pos, chunk))
            self.m_pos = (self.m_pos + chunk) % size
            total += chunk

        return data.data()

    def writeData(self, data):
        """Ignore writes; always reports zero bytes written."""
        return 0

    def bytesAvailable(self):
        """Return the tone buffer size plus the base class's own count."""
        return self.m_buffer.size() + super().bytesAvailable()


class AudioTest(QMainWindow):
    """Main window that plays a generated tone on a selectable output device.

    The window offers a device selector, a button to switch between pull mode
    (the audio output reads from the generator itself) and push mode (a timer
    copies generator data into the output's device), a suspend/resume button
    and a volume slider.
    """

    PUSH_MODE_LABEL = "Enable push mode"
    PULL_MODE_LABEL = "Enable pull mode"
    SUSPEND_LABEL = "Suspend playback"
    RESUME_LABEL = "Resume playback"

    DurationSeconds = 1       # length of the rendered tone buffer
    ToneSampleRateHz = 600    # tone frequency passed to the Generator
    DataSampleRateHz = 44100  # sample rate requested for the audio format

    def __init__(self):
        """Build the widgets, then set up and start audio playback."""
        super().__init__()

        self.m_device = QAudioDeviceInfo.defaultOutputDevice()
        # Both are checked against None by the slots, so define them before
        # any signal can possibly fire.
        self.m_audioOutput = None
        self.m_output = None

        self.initializeWindow()
        self.initializeAudio()

    def initializeWindow(self):
        """Create the widgets, wire their signals and install the layout."""
        layout = QVBoxLayout()

        self.m_deviceBox = QComboBox()
        self.m_deviceBox.activated[int].connect(self.deviceChanged)
        for deviceInfo in QAudioDeviceInfo.availableDevices(QAudio.AudioOutput):
            # The device info rides along as item data for deviceChanged().
            self.m_deviceBox.addItem(deviceInfo.deviceName(), deviceInfo)

        layout.addWidget(self.m_deviceBox)

        self.m_modeButton = QPushButton()
        self.m_modeButton.clicked.connect(self.toggleMode)
        self.m_modeButton.setText(self.PUSH_MODE_LABEL)

        layout.addWidget(self.m_modeButton)

        self.m_suspendResumeButton = QPushButton(
                clicked=self.toggleSuspendResume)
        self.m_suspendResumeButton.setText(self.SUSPEND_LABEL)

        layout.addWidget(self.m_suspendResumeButton)

        volumeBox = QHBoxLayout()
        volumeLabel = QLabel("Volume:")
        self.m_volumeSlider = QSlider(Qt.Horizontal, minimum=0, maximum=100,
                singleStep=10)
        self.m_volumeSlider.valueChanged.connect(self.volumeChanged)

        volumeBox.addWidget(volumeLabel)
        volumeBox.addWidget(self.m_volumeSlider)

        layout.addLayout(volumeBox)

        window = QWidget()
        window.setLayout(layout)

        self.setCentralWidget(window)

    def initializeAudio(self):
        """Choose an audio format, render the tone and start playback."""
        self.m_pullTimer = QTimer(self)
        self.m_pullTimer.timeout.connect(self.pullTimerExpired)
        self.m_pullMode = True

        self.m_format = QAudioFormat()
        self.m_format.setSampleRate(self.DataSampleRateHz)
        self.m_format.setChannelCount(1)
        self.m_format.setSampleSize(16)
        self.m_format.setCodec('audio/pcm')
        self.m_format.setByteOrder(QAudioFormat.LittleEndian)
        self.m_format.setSampleType(QAudioFormat.SignedInt)

        # Validate against the device that createAudioOutput() will open.
        info = QAudioDeviceInfo(self.m_device)
        if not info.isFormatSupported(self.m_format):
            qWarning("Default format not supported - trying to use nearest")
            self.m_format = info.nearestFormat(self.m_format)

        self.m_generator = Generator(self.m_format,
                self.DurationSeconds * 1000000, self.ToneSampleRateHz, self)

        self.createAudioOutput()

    def createAudioOutput(self):
        """Create an audio output for ``m_device`` and start it in pull mode."""
        self.m_audioOutput = QAudioOutput(self.m_device, self.m_format)
        self.m_audioOutput.notify.connect(self.notified)
        self.m_audioOutput.stateChanged.connect(self.handleStateChanged)

        self.m_generator.start()
        self.m_audioOutput.start(self.m_generator)

        # A fresh output always starts out pulling from the generator, so
        # bring the mode flag, the push device and the button labels back in
        # line (they may be stale after a device change made in push mode).
        self.m_pullMode = True
        self.m_output = None
        self.m_modeButton.setText(self.PUSH_MODE_LABEL)
        self.m_suspendResumeButton.setText(self.SUSPEND_LABEL)

        # The slider takes an integer percentage; volume() is a float.
        self.m_volumeSlider.setValue(int(round(self.m_audioOutput.volume() * 100)))

    def deviceChanged(self, index):
        """Stop playback and restart it on the device at combo box ``index``."""
        self.m_pullTimer.stop()
        self.m_generator.stop()
        self.m_audioOutput.stop()
        self.m_device = self.m_deviceBox.itemData(index)

        self.createAudioOutput()

    def volumeChanged(self, value):
        """Apply the slider percentage ``value`` to the audio output."""
        if self.m_audioOutput is not None:
            self.m_audioOutput.setVolume(value / 100.0)

    def notified(self):
        """Log buffer and timing figures when the output emits ``notify``."""
        qWarning("bytesFree = %d, elapsedUSecs = %d, processedUSecs = %d" % (
                self.m_audioOutput.bytesFree(),
                self.m_audioOutput.elapsedUSecs(),
                self.m_audioOutput.processedUSecs()))

    def pullTimerExpired(self):
        """Push-mode pump: copy whole periods from the generator to the output."""
        audioOutput = self.m_audioOutput
        pushDevice = self.m_output
        if audioOutput is None or pushDevice is None:
            return
        if audioOutput.state() == QAudio.StoppedState:
            return

        periodSize = audioOutput.periodSize()
        if periodSize <= 0:
            # Nothing sensible to write, and it would be a division by zero.
            return

        chunks = audioOutput.bytesFree() // periodSize
        for _ in range(chunks):
            data = self.m_generator.read(periodSize)
            if data is None or len(data) != periodSize:
                break

            pushDevice.write(data)

    def toggleMode(self):
        """Switch between pull mode and push mode and restart the output."""
        self.m_pullTimer.stop()
        self.m_audioOutput.stop()

        if self.m_pullMode:
            # Pull -> push: start() without a source returns the device the
            # timer slot writes into.
            self.m_modeButton.setText(self.PULL_MODE_LABEL)
            self.m_output = self.m_audioOutput.start()
            self.m_pullMode = False
            self.m_pullTimer.start(20)
        else:
            # Push -> pull: the output reads from the generator by itself.
            self.m_modeButton.setText(self.PUSH_MODE_LABEL)
            self.m_pullMode = True
            self.m_output = None
            self.m_audioOutput.start(self.m_generator)

        self.m_suspendResumeButton.setText(self.SUSPEND_LABEL)

    def toggleSuspendResume(self):
        """Suspend an active output, or resume a suspended/stopped one."""
        state = self.m_audioOutput.state()

        if state == QAudio.SuspendedState:
            qWarning("status: Suspended, resume()")
            self.m_audioOutput.resume()
            self.m_suspendResumeButton.setText(self.SUSPEND_LABEL)
        elif state == QAudio.ActiveState:
            qWarning("status: Active, suspend()")
            self.m_audioOutput.suspend()
            self.m_suspendResumeButton.setText(self.RESUME_LABEL)
        elif state == QAudio.StoppedState:
            qWarning("status: Stopped, resume()")
            self.m_audioOutput.resume()
            self.m_suspendResumeButton.setText(self.SUSPEND_LABEL)
        elif state == QAudio.IdleState:
            qWarning("status: IdleState")

    # Human-readable names for the states logged by handleStateChanged().
    stateMap = {
        QAudio.ActiveState: "ActiveState",
        QAudio.SuspendedState: "SuspendedState",
        QAudio.StoppedState: "StoppedState",
        QAudio.IdleState: "IdleState"}

    def handleStateChanged(self, state):
        """Log the new audio output state by name."""
        qWarning("state = " + self.stateMap.get(state, "Unknown"))


# Script entry point: show the audio test window and run the Qt event loop.
if __name__ == '__main__':

    import sys

    app = QApplication(sys.argv)
    app.setApplicationName("Audio Output Test")

    audio = AudioTest()
    audio.show()

    # exec_() is only a deprecated alias in PySide6; exec() is the
    # supported spelling of the event-loop entry point.
    sys.exit(app.exec())