Amazon IVS

This page supplements the AWS Media blog entry on integrating Hive moderation into Amazon IVS streams. We include the full code for the example implementation described in the blog post.

Please note this is an untested illustration of how combine to features of Hive and Amazon IVS APIs when building content moderation for IVS streams. It is not intended for production use.

# Imports
import requests 
import boto3


#Your Hive API keys
vis_API_Key = 'your_visAPI_key' 
aud_API_Key = 'your_audAPI_key'
dash_API_Key = 'your_DashboardAPI_key'


# Other constants
CFDOMAIN = 'YOUR_CLOUDFRONT_DOMAIN'
banned_classes = ['general_nsfw', 'general_suggestive', 'yes_smoking']
vis_threshold = 0.9 # Confidence score threshold above which content is flagged or moderated, configurable.


# Uses the boto3 IVS client to stop the channel and delete the streamkey
def suspend_stream(channel_arn):
    ivs_client = boto3.client('ivs')
 
    stream_key = ivs_client.list_stream_keys(channelArn=channel_arn)['streamKeys'][0]['arn']
 
    # Stopping the channel
    ivs_client.stop_stream(channelArn = channel_arn)
 
    # Delete the streamkey
    ivs_client.delete_stream_key(arn=stream_key)


# Parses the Hive response into JSON and checks if any detected banned class breaches the threshold defined
def is_banned_classes_detected(hive_response):
    # Parse the Hive response JSON into a dict of {class:score} 
    # For a video input, you can iterate over timestamps for each frame and take the max score. 
    hive_response_dict = hive_response.json()
    scores_dict = {x['class']:x['score'] for x in  response_dict['status'][0]['response']['output'][0]['classes']} 

    # Check if any of the banned_classes breaches threshold set
    for one_class in banned_classes:
        if scores_dict[one_class] >= vis_threshold:
            return true

    return false 


# Extracts the channel ARN from the event
def get_channel_arn_from_event(event):
    channel_region = event['Records'][0]['awsRegion']
    channel_id = event['Records'][0]['s3']['object']['key'].split('/')[3]
    account_id = event['Records'][0]['s3']['object']['key'].split('/')[2]

    return 'arn:aws:ivs:' + channel_region + ':' + account_id + ':channel/' + channel_id


# Example sync call to Hive
def get_hive_response(content_url):
    # Example Request (visual, synchronous endpoint, content hosted at URL):
    headers = {'Authorization': f'Token {vis_API_Key}'} # Example for visual moderation tasks
    data = {'url': content_url} # This is also where you would insert metadata (e.g., channel ARN) if desired.

    # Submit request to the synchronous API endpoint (best for live content)
    return requests.post('https://api.thehive.ai/api/v2/task/sync', headers=headers, data=data)

 
# Main callback function per event
def stream_moderation_service_visual(bucket, key, event):
    # Url to input key:
    content_url = 'https://' + CFDOMAIN + '/' + key

    # Extract channel_arn from event object
    channel_arn = get_channel_arn_from_event(event)

    # Call Hive with a sync request
    hive_response = get_hive_response(content_url)
  
    # Check the Hive response JSON against the thresholds defined
    if is_banned_classes_detected(hive_response):
        suspend_stream(channel_arn)


def handler(event, context):
    ''' Main handler function '''

    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']

    stream_moderation_service_visual(bucket, key, event)