BytePlusのSeed SpeechのVoice Replicationを使って音声クローンを試す

初めに

以下の音声クローンを試していきます。音声クローン以外にもいくつか提供されている音声を使うこともできます

www.byteplus.com

開発環境

環境構築

uv venv -p 3.12
.\.venv\Scripts\activate

必要なライブラリをインストールします

uv pip install python-dotenv==1.0.0 requests

.envを作成

以下のような .env ファイルを作成します

# Application ID (BytePlusコンソールから取得)
BYTEPLUS_APP_ID=your_app_id_here

# Access Token (BytePlusコンソールから取得)
BYTEPLUS_ACCESS_TOKEN=your_access_token_here

# Speaker ID (音声クローニング用の一意なID、例: S_xxxxx)
BYTEPLUS_SPEAKER_ID=your_speaker_id_here

音声のアップロード

まずは音声をアップロードしていきます

BytePlusを使うためのコアスクリプトclient.py として作成をします

import os
import json
import base64
import time
import requests
from dotenv import load_dotenv

load_dotenv()


class VoiceCloneClient:
    def __init__(self):
        self.app_id = os.getenv('BYTEPLUS_APP_ID')
        self.access_token = os.getenv('BYTEPLUS_ACCESS_TOKEN')
        self.speaker_id = os.getenv('BYTEPLUS_SPEAKER_ID')
        self.base_domain = "voice.ap-southeast-1.bytepluses.com"

        if not all([self.app_id, self.access_token, self.speaker_id]):
            raise ValueError("環境変数が設定されていません")

        self.session = requests.Session()

    def upload_audio(self, audio_file_path, text=None, model_type=1, language=2):
        """音声アップロード・トレーニング開始"""
        url = f"https://{self.base_domain}/api/v1/mega_tts/audio/upload"

        if not os.path.exists(audio_file_path):
            print(f"エラー: ファイルが見つかりません")
            return False

        with open(audio_file_path, 'rb') as f:
            audio_bytes = f.read()

        file_ext = os.path.splitext(audio_file_path)[1][1:].lower()
        audio_base64 = base64.b64encode(audio_bytes).decode('utf-8')

        headers = {
            'Authorization': f'Bearer;{self.access_token}',
            'Resource-Id': 'volc.megatts.voiceclone',
            'Content-Type': 'application/json'
        }

        body = {
            "appid": self.app_id,
            "speaker_id": self.speaker_id,
            "audios": [{"audio_bytes": audio_base64, "audio_format": file_ext if file_ext != "wav" else None}],
            "source": 2,
            "language": language,
            "model_type": model_type,
            "extra_params": json.dumps({"voice_clone_denoise_model_id": "SpeechInpaintingV2", "voice_clone_enable_mss": False})
        }

        if text:
            body["audios"][0]["text"] = text

        try:
            print(f"音声アップロード中...")
            response = self.session.post(url, headers=headers, json=body, timeout=60)

            if response.status_code == 200:
                result = response.json()
                if result.get('BaseResp', {}).get('StatusCode') == 0:
                    print(f"アップロード成功")
                    return True
                else:
                    print(f"エラー: {result.get('BaseResp', {}).get('StatusMessage')}")
                    return False
            else:
                print(f"HTTPエラー: {response.status_code}")
                print(f"レスポンス: {response.text}")
                return False

        except Exception as e:
            print(f"エラー: {e}")
            return False

    def check_status(self, wait=False, max_wait_time=300, poll_interval=5):
        """トレーニングステータス確認"""
        url = f"https://{self.base_domain}/api/v1/mega_tts/status"
        headers = {
            'Authorization': f'Bearer;{self.access_token}',
            'Resource-Id': 'volc.megatts.voiceclone',
            'Content-Type': 'application/json'
        }
        body = {"appid": self.app_id, "speaker_id": self.speaker_id}

        start_time = time.time()

        while True:
            try:
                response = self.session.post(url, headers=headers, json=body, timeout=30)

                if response.status_code == 200:
                    result = response.json()
                    if result.get('BaseResp', {}).get('StatusCode') == 0:
                        status = result.get('status')

                        # 2=Success, 4=Active: トレーニング完了
                        if status in [2, 4]:
                            return result
                        # 3=Failed: 失敗
                        elif status == 3:
                            print("トレーニング失敗")
                            return result
                        # 1=Training: トレーニング中
                        elif status == 1:
                            if not wait:
                                return result
                            if time.time() - start_time > max_wait_time:
                                print("タイムアウト")
                                return result
                            print(f"トレーニング中...")
                            time.sleep(poll_interval)
                            continue
                        # 0=NotFound
                        else:
                            print("音声データが見つかりません")
                            return result
                    else:
                        print(f"エラー: {result.get('BaseResp', {}).get('StatusMessage')}")
                        return None
                else:
                    print(f"HTTPエラー: {response.status_code}")
                    return None

            except Exception as e:
                print(f"エラー: {e}")
                return None

    def clone_tts(self, text, output_file="output.mp3", format="mp3", sample_rate=24000):
        """クローン音声でTTS実行"""
        status_info = self.check_status()
        if not status_info or status_info.get('status') not in [2, 4]:
            print("エラー: トレーニング未完了")
            return False

        url = f"https://{self.base_domain}/api/v3/tts/unidirectional"
        headers = {
            "X-Api-App-Id": self.app_id,
            "X-Api-Access-Key": self.access_token,
            "X-Api-Resource-Id": "volc.megatts.default",
            "X-Api-Request-Id": f"{int(time.time() * 1000)}",
            "Content-Type": "application/json"
        }
        payload = {
            "req_params": {
                "text": text,
                "speaker": self.speaker_id,
                "additions": json.dumps({"disable_markdown_filter": True, "enable_language_detector": True}),
                "audio_params": {"format": format, "sample_rate": sample_rate, "speech_rate": 0, "loudness_rate": 0}
            }
        }

        try:
            response = self.session.post(url, headers=headers, json=payload, stream=True, timeout=120)

            if response.status_code != 200:
                print(f"HTTPエラー: {response.status_code}")
                return False

            audio_chunks = []
            for line in response.iter_lines():
                if line:
                    try:
                        data = json.loads(line.decode('utf-8'))
                        if data.get('code') == 20000000:
                            break
                        elif data.get('code') != 0 and data.get('code') is not None:
                            print(f"エラー: {data.get('message')}")
                            return False
                        if data.get('data'):
                            audio_chunks.append(base64.b64decode(data['data']))
                    except:
                        continue

            if audio_chunks:
                with open(output_file, 'wb') as f:
                    f.write(b''.join(audio_chunks))
                print(f"保存完了: {output_file}")
                return True
            else:
                print("音声データ受信失敗")
                return False

        except Exception as e:
            print(f"エラー: {e}")
            return False

    def __del__(self):
        if hasattr(self, 'session'):
            self.session.close()

次に 音声をアップロードするスクリプトupload.py として作成をします

import os
from client import VoiceCloneClient

def main():
    # クライアント作成
    client = VoiceCloneClient()
    print(f"Speaker ID: {client.speaker_id}\n")

    # 音声ファイル
    audio_file = "EMOTION100_003.wav"

    if not os.path.exists(audio_file):
        print(f"エラー: {audio_file}が見つかりません")
        return

    # アップロード
    print("音声アップロード開始")
    if not client.upload_audio(
        audio_file_path=audio_file,
        model_type=1,  # 日本語対応
        language=2     # 日本語
    ):
        print("アップロード失敗")
        return

    # トレーニング完了待機
    print("\nトレーニング完了待機中...")
    status_info = client.check_status(wait=True, max_wait_time=300, poll_interval=5)

    if status_info and status_info.get('status') in [2, 4]:
        print(f"\nトレーニング完了!")
        print(f"Speaker ID: {client.speaker_id}")
        print(f"バージョン: {status_info.get('version')}")

        # テスト音声生成
        print("\nテスト音声生成中...")
        test_text = "こんにちは、新しく学習した音声のテストです。"
        if client.clone_tts(text=test_text, output_file="test_output.mp3"):
            print("テスト成功!test_output.mp3を確認してください")
        else:
            print("テスト失敗")
    else:
        print("トレーニング失敗")


if __name__ == "__main__":
    main()

以下を実行して音声をアップロードします

uv run upload.py

**音声ファイル名は audio_file = で指定します

学習されたモデルで推論を実行

先ほどアップロードした音声ファイルでモデルができました。こちらを使って推論をしていきます

音声を推論するためのスクリプトgenerate.py として作成をします

import time
from client import VoiceCloneClient


def main():
    client = VoiceCloneClient()

    # ステータス確認
    status_info = client.check_status()
    if not status_info or status_info.get('status') not in [2, 4]:
        print("エラー: トレーニング未完了")
        return

    # 日本語テキスト
    test_texts = [
        "こんにちは、これはクローンされた音声のテストです。",
        "BytePlusの音声クローニング機能を使用しています。",
        "今日はとても良い天気ですね。",
        "日本語の音声合成がうまく動作しているか確認します。",
        "最後のテストです。ありがとうございました。"
    ]

    print(f"\n日本語音声生成開始 ({len(test_texts)}件)\n")

    for i, text in enumerate(test_texts, 1):
        print(f"[{i}/{len(test_texts)}] {text}")
        output_file = f"japanese_output_{i}.mp3"

        if client.clone_tts(text=text, output_file=output_file):
            print(f"OK: {output_file}\n")
        else:
            print(f"NG: 失敗\n")

        if i < len(test_texts):
            time.sleep(3)

    print("完了")


if __name__ == "__main__":
    main()

以下で実行することで、5つのファイルが作成されます

uv run generate.py