To open a WebSocket connection, you need to create a new WebSocket using the wss protocol in the url. The same script that creates the WebSocket can also set up the subscriptions to the task outputs.
Before you begin
You must do the following:
- Create one or more scheduled jobs that are generating output to which you want to subscribe.
- Create an environment variable,
CONTROLLER_URL, set to the AI Processing API base URL. An example would be:
CONTROLLER_URL="dev-local.aiware.run:8443" # processing api url
Steps
- Verify the scheduled jobs you set up are running and have active jobs for the transcription engine.
- Open a WebSocket connection tool such as Piesocket.
- Connect to
wss://CONTROLLER_URL/edge/v1/client/ws?token=${edgeToken} - To query the taskIDs you want to subscribe to, use the ID from the scheduled job(s) and query for all running tasks. Use the returned taskIDs for the WebSocket.
An example of the code used to connect to wss on the frontend is:
const useSocket = () => {
const baseEndpoint = useSelector((state) => state.config.apiRoot);
const edgeToken = useSelector((state) => state.config.api.edgeToken);
const caseJobs = useSelector((state) => casesSelectors.selectCaseJobs(state));
const [subscribedJobs, setSubscribedJobs] = useState([]);
const [isReady, setIsReady] = useState(false);
const [taskOutput, setTaskOutput] = useState(false);
const [isConnected, setIsConnected] = useState(false);
const [socket, setSocket] = useState(null);
// Send the Payload over for subscription
const subscription = (msgType = 'subscribe') => {
switch (msgType) {
case 'subscribe':
if (!isEmpty(caseJobs)) {
caseJobs.forEach((job) => {
let jobs = get(job, 'jobsToSubscribe', []);
jobs.forEach((jobId) => {
if (subscribedJobs.includes(jobId)) return;
setSubscribedJobs([...subscribedJobs, jobId]);
socket.send(
JSON.stringify({
jobID: jobId,
msgType: msgType,
token: `${edgeToken}`, // Need to set a default edge token when installed.
})
);
});
});
}
break;
case 'unsubscribe':
if (!isEmpty(subscribedJobs)) {
subscribedJobs.forEach((jobId) => {
let newSubscribedJobs = subscribedJobs.filter((id) => id !== jobId);
setSubscribedJobs(newSubscribedJobs);
socket.send(
JSON.stringify({
jobID: jobId,
msgType: msgType,
token: `${edgeToken}`, // Need to set a default edge token when installed.
})
);
});
}
break;
}
};
const subscribeToCase = (status) => setIsReady(status);
const unSubscribeToCase = () => subscription('unsubscribe');
useEffect(() => {
if (isReady && !isEmpty(caseJobs)) {
subscription('subscribe');
} else if (!isReady && !isEmpty(subscribedJobs)) {
subscription('unsubscribe');
}
}, [caseJobs]);
useEffect(() => {
const connect = () => {
setSocket(
new WebSocket(
`${baseEndpoint.replace(
'https',
'wss'
)}/edge/v1/client/ws?token=${edgeToken}`
)
);
};
if (!socket) {
connect();
return;
}
// On Open Connection
socket.onopen = (e) => {
console.log('Connected', e);
setIsConnected(true);
};
// Event handler for receiving text messages
socket.onmessage = (event) => {
const payload = JSON.parse(event.data);
let switchType = payload.ackMsgType || payload.msgType;
switch (switchType) {
case 'taskOutput':
setTaskOutput(transformOutput(payload));
console.log('TASK OUTPUT: ', payload);
break;
case 'taskStatus':
console.log(`TASK STATUS [${payload.status}]: `, payload);
break;
case 'subscribe':
case 'unsubscribe':
// If the job is not in the list of subscribed jobs, add it.
if (switchType === 'subscribe' && payload.status === 'OK') {
console.log(`SUBSCRIBED: ${payload.jobID}`, payload);
} else if (switchType === 'unsubscribe' && payload.status === 'OK') {
console.log(`UNSUBSCRIBED: ${payload.jobID}`, payload);
}
break;
}
};
// On Socket Close Connection
socket.onclose = (e) => {
setIsConnected(false);
console.log('Disconnected Socket connection: ', e);
// UnSubscribe to all jobs if the socket is closed.
subscription('unsubscribe');
setTimeout(() => {
connect();
}, 6000);
};
// On Socket Error, Try to reconnect
socket.onerror = (e) => {
setIsConnected(false);
console.log('Error on socket connection: ', e);
setTimeout(() => {
connect();
}, 1000);
};
return () => socket.close();
}, [socket]);
return { subscribeToCase, unSubscribeToCase, taskOutput, isConnected };
};
export default useSocket;
GraphQL query example used to fetch the tasks to subscribe to
query {
// The scheduled job id of the transcription created earlier
scheduleJob_1:scheduledJob(id: "1") {
id
jobs (status: running) {
count
records {
id
status
tasks (status: running) {
records {
id
status
}
}
}
}
}
}