Snowflake integration

Article Number: 000004067
Product: aiWare

The Snowflake integration brings Veritone's scalable, real-time cognitive capabilities to customers who need to automate business workflows while solving data-heavy problems with AI. Follow the steps below to integrate Snowflake.

Set up Automate Studio

  1. Log in to https://automate.veritone.com/
  2. Create a new flow
  3. Import the sample code below
  4. Click Deploy
  5. Click on the "aiware-in" node
  6. Copy the deployed URL for use later
  7. Obtain an SSO token (see our token-based authentication instructions)

Automate Studio Sample Code

[{"id":"2f4d82e6.fef49e","type":"create-job-graphql","z":"2fa59aa3.f3dd56","name":"","waitForResults":true,"isRawChunk":false,"clusterId":"prd5-21xbyq0x-4h0s-o685-snas-oovhdai552v9","chunkSizeInSeconds":"900","inputUrl":"snowFlakeInput","inputUrlType":"msg","enginePayload":"payload.aiware.enginePayload","enginePayloadType":"","jsonPayloadSwitcher":false,"tasks":[{"engine":{"id":"c0e55cde-340b-44d7-bb42-2e0d65e98255","name":"Transcription - E - English (Global) V3","price":125,"builds":{"records":[{"manifest":{"url":"https://github.com/veritone/speechmatics-container-service/","user":"kmurray+cs@veritone.com","build":"2fb31cd3-a08b-48ce-b075-7b5fe113e3b5","oauth":"","runtime":"","category":"Transcription","engineId":"c0e55cde-340b-44d7-bb42-2e0d65e98255","isPublic":true,"schedule":"","schemaId":0,"sourceId":0,"ingestion":{"scanner":false,"supportsLiveStreams":false,"supportedSourceTypes":null},"libraries":null,"maxFileMb":0,"categories":null,"engineMode":"chunk","clusterSize":"custom","isBenchmark":false,"isConductor":false,"useCustomUI":false,"gpuSupported":"","inputOptions":null,"releaseNotes":"Build Information: [2021-06-28_17:26:14] [master - 214cbfcc2441e9308b1110f5ab0a86eda1de399d] en:7.0.0; Is speaker separation engine: false","customProfile":"speechmatics","externalCalls":[],"inputEncoding":"","outputFormats":["application/json"],"volumeProfile":"","maxConcurrency":50,"isCJISCompliant":false,"trainableViaApi":false,"whitelistOrgIds":null,"maxMediaLengthMs":0,"minMediaLengthMs":0,"schmerverCountry":"","fedRampImpactLevel":0,"initialConcurrency":50,"sourceFileDeletion":false,"supportedLanguages":null,"supportedInputTypes":["text/html"],"preferredInputFormat":"application/json","supportedInputFormats":["audio/mp4","audio/flac","audio/wav","audio/mpeg"]},"dockerImage":"registry.central.aiware.com/c0e55cde-340b-44d7-bb42-2e0d65e98255:2fb31cd3-a08b-48ce-b075-7b5fe113e3b5"}]},"fields":[{"max":null,"min":null,"info":"Allows a custom dictionary wordlist to be added 
to the engine at runtime. Having additional words can improve the likelihood they will be output in the final transcription. Words and phrases must be comma-delimited.","name":"keywords","step":null,"label":"keywords","required":null,"defaultValue":""},{"max":null,"min":null,"info":"Type of speaker recognition to include in the output: none - transcription only, no dirization; speaker - speaker identification, takes significantly longer; speaker_change - speaker changed, but no identification","name":"diarization","step":null,"label":"diarization","required":false,"defaultValue":""},{"max":null,"min":null,"info":"How sensitive to speaker changes the algorithm should be. Only used if diarization = speaker_change. Value must be a number in the range of 0.0 to 1.0, higher numbers are more sensitive (more frequent changes detected).","name":"speakerChangeSensitivity","step":null,"label":"speakerChangeSensitivity","required":false,"defaultValue":"0.4"},{"max":null,"min":null,"info":"Include punctuation in the output","name":"advancedPunctuation","step":null,"label":"advancedPunctuation","required":false,"defaultValue":"true"}],"rating":null,"category":{"id":"67cd4dd0-2f75-445d-a6f0-2f297d6cd182","name":"Transcription"},"manifest":{"engineMode":"chunk","supportedInputTypes":["text/html"]},"description":"This engine converts US English speech to text.","deploymentModel":"FullyNetworkIsolated","libraryRequired":false},"fields":{"keywords":"","diarization":"","advancedPunctuation":"true","speakerChangeSensitivity":"0.4"},"engineId":"c0e55cde-340b-44d7-bb42-2e0d65e98255","chunkType":"audio","clusterId":"prd5-21xbyq0x-4h0s-o685-snas-oovhdai552v9","libraryId":"","engineMode":"chunk","engineName":"Transcription - E - English (Global) V3","engineCategory":{"id":"67cd4dd0-2f75-445d-a6f0-2f297d6cd182","name":"Transcription","type":{"name":"Cognition","description":null},"description":"Convert the spoken word into readable 
text","categoryType":"transcript","totalEngines":2241},"libraryRequired":false,"engineCategoryId":"67cd4dd0-2f75-445d-a6f0-2f297d6cd182","enginePayloadVal":{},"enginePayloadTypeVal":"","jsonPayloadSwitcherVal":false}],"controllerBaseUrl":"https://automate-controller-v3f.aws-prod-rt.veritone.com/edge/v1","createJobType":"engine-selector","createJobPriority":"-20","jsonJob":"","listFieldTasks":[],"listFieldRoutes":[],"template":"","syntax":"mustache","x":440,"y":60,"wires":[["39d57275.6a1a0e"],[]]},{"id":"671cdbad.426534","type":"function","z":"2fa59aa3.f3dd56","name":"Extract Data","func":"\nmsg.snowFlakeInput = msg.payload.aiwareChunk[1]\nmsg.payload = {};\nreturn msg;","outputs":1,"noerr":0,"x":250,"y":60,"wires":[["2f4d82e6.fef49e"]]},{"id":"e4d8e1ba.1affa","type":"aiware-in","z":"2fa59aa3.f3dd56","name":"","format":"object","samples":[{"id":"buw731","name":"Default","value":{},"status":"active"}],"tdoContent":"{}","_mtime":1629143905992,"x":100,"y":60,"wires":[["671cdbad.426534"]]},{"id":"fdcd89af.c4cde8","type":"aiware-out","z":"2fa59aa3.f3dd56","name":"","statusCode":200,"failureMsg":"","failureMsgType":"","failureReason":"","failureReasonType":"","skipResultCallback":false,"disableDebug":false,"x":860,"y":60,"wires":[]},{"id":"39d57275.6a1a0e","type":"function","z":"2fa59aa3.f3dd56","name":"Fromat Result","func":"\nmsg.payload = msg.payload.aiware.engineResultSimple;\nreturn msg;","outputs":1,"noerr":0,"x":660,"y":60,"wires":[["fdcd89af.c4cde8"]]}]
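For reference, the two function nodes embedded in the export above carry the following logic, reproduced here for readability (the second node is named "Fromat Result" in the export):

```javascript
// "Extract Data" node: pull the Snowflake cell value out of the
// incoming aiWare chunk (element 0 is the row number, element 1 the
// argument value) and clear the payload for the next node.
function extractData(msg) {
  msg.snowFlakeInput = msg.payload.aiwareChunk[1];
  msg.payload = {};
  return msg;
}

// "Fromat Result" node: forward only the simplified engine result
// so the aiware-out node returns a compact response.
function formatResult(msg) {
  msg.payload = msg.payload.aiware.engineResultSimple;
  return msg;
}
```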

AWS Lambda setup

Follow the Snowflake tutorial for creating a Lambda function, with the following exceptions:

  • Select Node.js 14.x as the language
  • Copy the sample code below for your Lambda function
  • Replace the flowUrl and ssoToken with the values obtained in section 1
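For context, Snowflake invokes the proxy with a JSON body whose data field holds one array per row, each beginning with the row number followed by the argument values; the Lambda below iterates over these rows and must return results in the same shape. A small sketch of that structure (the sample values are illustrative only):

```javascript
// Shape of the event body Snowflake POSTs to the API Gateway/Lambda:
// each row is [rowNumber, arg1, ...]; the handler must return the
// same structure under "data", with one result row per input row.
const sampleEvent = {
  body: JSON.stringify({
    data: [
      [0, 'first input value'],
      [1, 'second input value'],
    ],
  }),
};

// Parse the rows exactly as the handler's `body.data` access does.
function parseRows(event) {
  return JSON.parse(event.body).data;
}
```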

Lambda Sample Code

const https = require('https');
const url = require('url');

const ssoToken = "e6bd46e6-f977-459f-8be8-5e66ba7e5872";
const flowUrl = "https://automate-controller-v3f.aws-prod-rt.veritone.com/edge/v1/flow/6476ea71-8b35-4d0d-b3db-1a56e7464bf7/latest/process";

exports.handler = async (event) => {


    const body = JSON.parse(event.body);
    const executionUrl = url.parse(flowUrl);
    let results = [];
    
    const forLoop = async _ => {
        for (let i = 0; i < body.data.length; i++) {
            let isRetrieve = body.data[i][1].split('-').length === 5;
            
            let defaultOptions = {
                host: isRetrieve ? 'api.veritone.com' : executionUrl.host,
                port: 443,
                headers: {
                    'Content-Type': 'application/json',
                    'Authorization': 'Bearer ' + ssoToken
                }
            }
        
            let post = (path, payload) => new Promise((resolve, reject) => {
                const options = { ...defaultOptions, path, method: 'POST' };
                const req = https.request(options, res => {
                    let buffer = "";
                    res.on('data', chunk => buffer += chunk)
                    res.on('end', () => resolve(JSON.parse(buffer)))
                });
                req.on('error', e => reject(e.message));
                req.write(payload);
                req.end();
            })
        
            let payload = isRetrieve ? JSON.stringify({"operationName":"getFlowExecutionResult","variables":{},"query":`query getFlowExecutionResult {
  flowExecution(flowExecutionId: \"${body.data[i][1]}\") {
    flowExecutionResult
  }
}
`}) : JSON.stringify(body.data[i]) ;
            
            let flowExecution = await post(isRetrieve ? "/v3/graphql" : executionUrl.path, payload);
            
            let result = isRetrieve ? (flowExecution.data.flowExecution ? flowExecution.data.flowExecution.flowExecutionResult : null) : flowExecution.FlowExecutionId;
            
            results.push([i, result]);
        }
    }
    await forLoop();
    const response = {
        statusCode: 200,
        body: JSON.stringify({data: results})
    };
    return response;
};
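Note that the handler decides between creating a job and fetching a result with a heuristic: an argument that splits into five dash-separated segments (a UUID, i.e. a FlowExecutionId) is routed to the GraphQL fetch, while anything else is posted to the flow as new input. A small sketch of that check:

```javascript
// Mirrors the `split('-').length` check in the handler above:
// UUID-style values (five dash-separated groups) are treated as
// FlowExecutionIds to retrieve; everything else is new flow input.
function isRetrieveCall(value) {
  return typeof value === 'string' && value.split('-').length === 5;
}
```

One consequence of this heuristic: an input string that happens to contain exactly four dashes would be mis-routed as a fetch, so keep raw inputs dash-free or tighten the check (for example, with a UUID regex).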

Create the proxy service (Amazon API Gateway)

Follow these Snowflake instructions to create your proxy service.

Create the API integration for AWS in Snowflake

Follow this API integration Snowflake tutorial to create the integration.

Set up the trust relationship(s) between Snowflake and the new IAM role

Follow this Snowflake tutorial to set up the Trust Relationship.

Create the external function

Use this Snowflake tutorial to create your external functions, making sure you use the sample functions provided below.

  • Create job
    create external function automate_create_job(input varchar)
        returns variant
        api_integration = <api_integration_name>
        as '<resource_invocation_url>';
  • Fetch results
    create external function automate_fetch_results(FlowExecutionId varchar)
        returns variant
        api_integration = <api_integration_name>
        as '<resource_invocation_url>';
  • Two functions will be created: one to asynchronously trigger Veritone processing, and a second synchronous function to return the results.

Prepare to test the functions

This step assumes you have a database set up. See this Snowflake tutorial to ensure you have a workable dataset. We recommend that you have two additional columns in your Snowflake table:

  • executionId (varchar)
  • executionResult (varchar)

Example usage

  • Create a new execution:
    select automate_create_job('input');
    Returns a flow execution ID that can be used with the fetch function.
  • Retrieve an execution result:
    select automate_fetch_results(executionId);
    If the process is not finished, the value will be null.
  • Update a column with the execution ID from a new job (assuming the media table has a source field for the input and an executionId column):
    update media set executionId = automate_create_job(source);
  • Fetch the result and save it in the result column (assuming the media table has an executionId field for the input and an executionResult column):
    update media set executionResult = automate_fetch_results(executionId);
  • Query the executionResult column as needed, for example:
    select executionResult from media;
Properties
Created: 11/21/2023 8:56 PM
Last Modified: 12/4/2023 6:33 PM
Article Type: Documentation
Language: English