/* * Copyright 2022 Sangoma Technologies Corporation * Kevin Harwell * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ const { Writable, Transform, PassThrough, } = require('stream'); const GoogleSpeech = require('@google-cloud/speech'); const { TranscribeStreamingClient, StartStreamTranscriptionCommand, } = require('@aws-sdk/client-transcribe-streaming'); const { config } = require('process'); /* * For speech provider implementer. * * Basic Provider public interface: * * function setConfig(config) - sets configuration used by recognize stream * function start(config) - starts the recognize stream * function restart(config) - restarts the recognize stream * function end() - stops recognize and writable stream * function write(data) - writes data to the writable stream * event result(result) - triggered when a result is received from provider * field results[] - cache of received results (oldest to newest) * * Basic result object public interface: * * result = { * text: * score: * }; */ /* * Google Speech API: * https://googleapis.dev/nodejs/speech/latest/ * * Google infinite streaming speech example: * https://cloud.google.com/speech-to-text/docs/samples/speech-transcribe-infinite-streaming * * Nodejs stream API: * https://nodejs.org/api/stream.html */ // const encoding = 'Encoding of the audio file, e.g. LINEAR16'; // const sampleRateHertz = 16000; // const languageCode = 'BCP-47 language code with regional subtags, e.g. 
// en-US';
// const limit = 10000; // ms - set to low number for demo purposes

const DEFAULT_ENCODING = "MULAW";
const DEFAULT_SAMPLE_RATE = 8000;
const DEFAULT_LANGUAGE = "en-US";
const DEFAULT_RESTART_TIME = 10; // in seconds
const DEFAULT_MAX_RESULTS = 100;

/**
 * @class GoogleProvider.
 *
 * Start, restart, and stop Google speech to text recognition. Results are
 * emitted via a "result" event that is passed the following object:
 *
 * result = {
 *   text: <the recognized string value>
 *   score: <percent based accuracy/confidence score>
 * };
 *
 * @extends Writable
 */
class GoogleProvider extends Writable {

    /* Mapped encodings supported by Google */
    static encodings = {
        ulaw: "MULAW",
        slin16: "LINEAR16",
        opus: "OGG Opus",
    };

    /* Languages this provider supports */
    static languages = [
        "en-US",
    ];

    /**
     * Creates an instance of a Google provider stream.
     *
     * @param {Object} [options] - provider specific options
     * @param {Object} [options.restartTime=10] - If specified auto-restart
     *     recognition stream after a given interval (in seconds)
     * @param {Object} [options.maxResults=100] - The maximum number of results
     *     to cache before results are dropped (oldest dropped first)
     */
    constructor(options) {
        super();

        this.config = {
            encoding: DEFAULT_ENCODING,
            sampleRateHertz: DEFAULT_SAMPLE_RATE,
            languageCode: DEFAULT_LANGUAGE,
        };

        this.restartTimer = null;
        // Falsy values (including 0) fall back to the defaults.
        this.restartTimeout = (options && options.restartTime) || DEFAULT_RESTART_TIME;
        this.maxResults = (options && options.maxResults) || DEFAULT_MAX_RESULTS;

        this.results = [];
        this.recognizeStream = null;
    }

    /**
     * Writable hook: creates the Google speech client.
     *
     * @param {Function} callback - invoked once the client is created
     */
    _construct(callback) {
        this.client = new GoogleSpeech.SpeechClient();

        callback();
    }

    /**
     * Writable hook: forwards one chunk to the recognize stream. Chunks
     * received while no recognize stream exists are dropped.
     *
     * @param {*} chunk - the data to forward
     * @param {string} encoding - unused
     * @param {Function} callback - invoked once the chunk has been handed off
     */
    _write(chunk, encoding, callback) {
        if (this.recognizeStream) {
            this.recognizeStream.write(chunk);
        }

        callback();
    }

    /**
     * Writable hook: forwards a batch of buffered chunks to the recognize
     * stream.
     *
     * @param {Object[]} chunks - buffered entries of the form
     *     { chunk, encoding }
     * @param {Function} callback - invoked exactly once after the whole batch
     *     has been handed off
     */
    _writev(chunks, callback) {
        if (this.recognizeStream) {
            // Each entry wraps the payload; iterate values, not indices.
            for (const { chunk } of chunks) {
                this.recognizeStream.write(chunk);
            }
        }

        callback();
    }

    /**
     * Writable hook: stops recognition and closes the client when the
     * stream is ended.
     *
     * @param {Function} callback - invoked once shutdown has been requested
     */
    _final(callback) {
        this.stop();
        this.client.close();

        callback();
    }

    /**
     * Sets the configuration to use on the recognition stream.
     *
     * @param {Object} [config] - configuration to set
     * @param {Object} [config.codec] - the codec to map to an encoding
     * @param {string} [config.language] - the language to use
     * @throws {Error} if the codec name or the language is not supported
     */
    setConfig(config) {
        if (!config) {
            return;
        }

        const update = {};

        if (config.codec) {
            const name = config.codec.name;

            // Own-property check so inherited keys (e.g. "toString") are
            // not mistaken for supported codecs.
            if (!Object.prototype.hasOwnProperty.call(GoogleProvider.encodings, name)) {
                throw new Error("Codec '" + name + "' not supported");
            }

            update.encoding = GoogleProvider.encodings[name];

            // Keep the current sample rate when the codec does not carry one.
            if (config.codec.sampleRate != null) {
                update.sampleRateHertz = config.codec.sampleRate;
            }
        }

        if (config.language) {
            if (!GoogleProvider.languages.includes(config.language)) {
                throw new Error("Language '" + config.language + "' not supported");
            }

            update.languageCode = config.language;
        }

        this.config = {...this.config, ...update};
    }

    /**
     * Starts the recognition stream. Does nothing if already started.
     *
     * @param {Object} [config] - configuration to use
     * @param {Object} [config.codec] - the codec to map to an encoding
     * @param {string} [config.language] - the language to use
     */
    start(config) {
        if (this.recognizeStream) {
            return; // Already started
        }

        this.setConfig(config);

        const request = {
            config: this.config,
            interimResults: true,
        };

        this.recognizeStream = this.client
            .streamingRecognize(request)
            .on('error', (e) => {
                console.error("GoogleProvider: " + e + " - ending stream");
                this.end();
            })
            .on('data', (response) => {
                const alternative = response.results?.[0]?.alternatives?.[0];

                if (!alternative) {
                    // stream limit reached restart?
                    console.debug("GoogleProvider: received response, but no result");
                    return;
                }

                // Zero confidence alternatives are not reported (presumably
                // these are the interim results requested above).
                if (alternative.confidence === 0) {
                    return;
                }

                const result = {
                    text: alternative.transcript,
                    score: Math.round(alternative.confidence * 100),
                };

                console.debug("GoogleProvider: result: " + JSON.stringify(result));
                this.emit('result', result);

                // Bounded cache: drop the oldest entry once full.
                if (this.results.length >= this.maxResults) {
                    this.results.shift();
                }

                this.results.push(result);
            });

        if (this.restartTimeout) {
            /*
             * Google's speech engine may stop transcribing after a while,
             * so restart the recognize stream after a specified interval.
             */
            this.restartTimer = setTimeout(() => this.restart(),
                this.restartTimeout * 1000);
        }

        // Release anything buffered while the stream was stopped.
        while (this.writableCorked) {
            this.uncork();
        }
    }

    /**
     * Stops the recognition stream.
     */
    stop() {
        if (this.restartTimer) {
            // The timer is created with setTimeout, so clear it as one.
            clearTimeout(this.restartTimer);
            this.restartTimer = null;
        }

        if (!this.recognizeStream) {
            return;
        }

        this.cork(); // Buffer any incoming data

        this.recognizeStream.end();
        this.recognizeStream = null;
    }

    /**
     * Restarts the recognition stream.
     *
     * @param {Object} [config] - configuration to use
     * @param {Object} [config.codec] - the codec to map to an encoding
     * @param {string} [config.language] - the language to use
     */
    restart(config) {
        this.stop();
        this.start(config);
    }
};

// class AWSProvider extends Writable {
//     constructor(options) {
//         super();
//         this.LanguageCode = "en-GB";
//         this.MediaEncoding = "pcm";
//         this.credentials = {
//             accessKeyId: process.env.AWS_ACCESS_KEY_ID,
//             secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
//         }
//         this.results = [];
//         this.stream = new TransformStream();
//         this.recognizeStream = this.stream.writable.getWriter();
//         (async () => {
//             this.readStream = this.stream.readable;
//             this.config = {
//                 LanguageCode: this.LanguageCode,
//                 MediaEncoding: this.MediaEncoding,
//                 MediaSampleRateHertz: "8000",
//                 AudioStream: (async function* () {
//                     for await (const chunk of this.stream.readable.read()) {
//                         yield {AudioEvent: {AudioChunk: chunk}};
//                     }
//                     // for (let result = await this.readStream.read(); !result.done; result = await this.readStream.read()) {
//                     //     return {AudioEvent: {AudioChunk: result}};
//                     // }
//                 })(),
//                 // AudioStream: this.readStream.read(),
//             }
//         })
//         this.command = null;
//     }
//     _construct(callback) {
//         this.client = new TranscribeStreamingClient({
//             region: "eu-west-2",
credentials: this.credentials // }); // callback(); // } // _write(chunk, encoding, callback) { // if (this.recognizeStream) { // console.debug("writing chunk"); // this.recognizeStream.write(chunk); // } // callback(); // } // setConfig(config) { // if (!config) { // return; // } // } // start(config) { // if (this.command) { // return; // } // this.setConfig(config); // config = this.config; // // this.config.AudioStream = this.recognizeStream; // this.command = new StartStreamTranscriptionCommand(this.config) // console.debug("command created") // this.client.send(this.command).then((response) => { // print("response from command"); // console.debug(response) // }).catch((err) => { // console.log("error in command"); // console.debug(err); // }) // // (async function() { // // // console.debug("start async") // // this.results = await this.client.send(this.command); // // // .then(async (response) => { // // // console.debug(JSON.stringify(response)); // // // }).catch(err => { // // // console.debug("error"); // // // console.debug(err); // // // }); // // try { // // // console.debug("event"); // // for await (const event of this.results.TranscriptResultStream) { // // console.log(JSON.stringify(event)); // // } // // } catch (err) { // // console.log("error") // // console.log(err) // // } // // })(); // // console.debug("AWSProvider"); // } // restart(config) { // this.start(config); // } // } class AWSProvider extends Writable { constructor(options) { super(); this.LanguageCode = "en-GB"; this.MediaEncoding = "pcm"; this.credentials = { accessKeyId: process.env.AWS_ACCESS_KEY_ID, secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY, } console.debug(process.env.AWS_SECRET_KEY_ID) this.stream = new TransformStream(); this.readStream = this.stream.readable.getReader(); this.writeStream = this.stream.writable; this.recognizeStream = null; console.log(this.writeStream); } _construct(callback) { this.client = new TranscribeStreamingClient({ region: "eu-west-2", 
credentials: this.credentials }); callback(); } _write(chunk, encoding, callback) { this.recognizeStream.write(chunk); // this.readStream.read().then((res) => { // console.log(res) // }) callback(); } _writev(chunks, callback) { for (let chunk in chunks) { this._write(chunk, null, callback); } callback(); } _final(callback) { this.stop(); callback(); } start(config) { // this.setConfig(config); // config = this.config; console.log("START"); this.recognizeStream = this.writeStream.getWriter(); // const passthrough = new PassThrough(); // this.readStream.pipe(passthrough); const readStream = this.readStream; async function* audioSource() { // await readStream.start(); while (readStream.ends !== true) { const chunk = await readStream.read(); yield chunk; } } async function* audioStream() { for await (const chunk of audioSource()) { console.debug("CHUNKING"); yield {AudioEvent: {AudioChunk: chunk.value}}; } } audioStream().next().then(res => console.debug(res.value.AudioEvent.AudioChunk)); // console.debug('AUDIO'); // this.audioStream().next().then(res => console.debug(res)); // this.audioStream().next().then(res => console.debug(res)); this.param = { LanguageCode: this.LanguageCode, MediaEncoding: this.MediaEncoding, MediaSampleRateHertz: 8000, AudioStream: audioStream(), } const command = new StartStreamTranscriptionCommand(this.param); this.client.send(command).then(async (res) => { for await (const event of res.TranscriptResultStream) { console.debug(event); }; }) return; (async () => { console.log("STARTED"); }) } stop() { if(!this.recognizeStream) { return; } // this.recognizeStream.close(); // console.log(this.recognizeStream); console.log("End of stream"); // return; } restart(config) { this.stop() this.start(config) } } /** * Gets a speech provider * * @param {string} name - A speech provider name * @param {Object} options - Provider specific options * @return A speech provider. 
*/ function getProvider(name, options) { if (name == "google") { return new GoogleProvider(options); } if (name == "aws") { return new AWSProvider(options); } throw new Error("Unsupported speech provider '" + name + "'"); } module.exports = { getProvider, }