import pandas as pd
import numpy as np
import math
import gdown
import matplotlib.pyplot as plt
#import whisperx
import gc
import transformers
import webvtt
import os
import ffmpeg
import random
import logging
import torch
import torch.nn as nn
import torch.nn.functional as F
from moviepy.editor import *
import torchaudio
from sklearn.neighbors import kneighbors_graph
import networkx as nx
import cv2
import glob
import re
import math
from sentence_transformers import SentenceTransformer
from deepface import DeepFace
#from src.models import Wav2Vec2ForSpeechClassification, HubertForSpeechClassification
from transformers import AutoConfig, Wav2Vec2FeatureExtractor
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
from transformers import RobertaTokenizerFast, BertForSequenceClassification
class VEMProcessor:
    """
    Main class of this module. It wires together the video segmenter, one
    opinion model/extractor pair per modality (transcript, audio, video), the
    multimodal extractor and the emotion map generator.
    """

    def __init__(self):
        """
        Build every component of the pipeline.

        The constructor takes no arguments; the video to be analyzed is given
        later to ``process_video``.
        """
        # Filled by process_video; defined here so the attribute always exists.
        self.segmented_video = None
        self.segmenter = VideoSegmenter()
        self.opinion_model_transc = self._configured_model("transcript")
        self.opinion_model_audio = self._configured_model("audio")
        self.opinion_model_video = self._configured_model("video")
        self._reset_extractors()
        self.multimodal_extractor = MultimodalOpinionExtractor([])
        self.emotion_map_generator = EmotionMapGenerator([])

    @staticmethod
    def _configured_model(modality):
        """Create an OpinionExtractionModel and set it up for ``modality``."""
        model = OpinionExtractionModel()
        model.set_model(modality=modality)
        return model

    def _reset_extractors(self):
        """Create a fresh, empty OpinionExtractor for each single modality."""
        self.opinion_extractor_transc = OpinionExtractor([], self.opinion_model_transc)
        self.opinion_extractor_audio = OpinionExtractor([], self.opinion_model_audio)
        self.opinion_extractor_video = OpinionExtractor([], self.opinion_model_video)

    def process_video(self, video_file, segment_block_size=10):
        """
        Run the models for all the modalities (transcript, audio, video and
        multimodal) and generate the heatmaps.

        Args:
            **video_file (str)**: path of the mp4 video to be analyzed.
            **segment_block_size (int)**: number of segments grouped in each
            frame of the heatmap video.
        Return:
            Nothing.
            The dataframe produced by the segmenter is kept in
            ``self.segmented_video`` (the same object is handed to every
            extractor and to the emotion map generator).
        """
        # Step 1: segment the video
        self.segmented_video = self.segmenter.segment_video(video_file)

        # Step 2: extract opinions with brand-new extractors for this video
        self._reset_extractors()
        extractors = (
            self.opinion_extractor_transc,
            self.opinion_extractor_audio,
            self.opinion_extractor_video,
        )
        for extractor in extractors:
            extractor.segmenter_result = self.segmented_video
        for extractor in extractors:
            extractor.extract_opinions()

        # Step 3: extract multimodal opinions
        fusion = self.multimodal_extractor
        fusion.segmented_with_emotion = self.segmented_video
        fusion.extract_multimodal_opinions()

        # Step 4: generate one emotion map per modality
        mapper = self.emotion_map_generator
        mapper.segments_with_emotion = self.segmented_video
        mapper.graph = fusion.G_multimodal
        for modality in ("transcript", "audio", "video", "multimodal"):
            mapper.generate_emotion_map(modality, segment_block_size)
class VideoSegmenter:
    """
    Class responsible for extracting the transcription from the video and
    cutting the video into one clip per transcribed phrase. The result is a
    dataframe holding the timestamps, the text and the clip file of each part.
    """

    def __init__(self):
        # Configures the root logger to write to "newfile.log" (overwritten on start).
        logging.basicConfig(filename="newfile.log",
                            format='%(asctime)s %(message)s', filemode='w')
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.INFO)

    def segment_video(self, video_file):
        """
        Receives an MP4 file, transcribes it and writes one clip per phrase.

        Args:
            **video_file (str)**: video to be analyzed.
        Return:
            Dataframe with one row per segment. Columns ``0``, ``1`` and ``2``
            hold the start time, end time and transcript text, and
            ``'segment_file'`` holds the path of the clip written for the row.
        """
        # The first three characters of the video path prefix the output
        # folder; EmotionMapGenerator recovers the same prefix from the first
        # three characters of the segment path.
        parts_dir = video_file[0:3] + "parts"
        os.makedirs(parts_dir, exist_ok=True)

        # A bare ``__transcript`` inside a class body would be rewritten to
        # ``_VideoSegmenter__transcript`` by private name mangling and fail
        # with NameError, so the module-level helper is looked up by string.
        transcribe = globals()["__transcript"]

        self.logger.info("Transcripting the video")
        dataframe = transcribe(video_file)
        self.logger.info("Done")

        names = []
        video = VideoFileClip(video_file)
        self.logger.info("Segmenting the video")
        try:
            # Cut the video at the transcript timestamps, roughly one clip per phrase.
            for i, (start_pos, end_pos) in enumerate(zip(dataframe[0], dataframe[1])):
                clip = video.subclip(start_pos, end_pos)
                part_name = parts_dir + "/part_" + str(i) + ".mp4"
                names.append(part_name)
                clip.write_videofile(part_name, codec='libx264', fps=video.fps)
        finally:
            # Release the reader even when a clip fails to be written.
            video.close()

        dataframe['segment_file'] = names
        # The columns deliberately keep their integer labels 0, 1, 2:
        # get_labels selects them by those labels. The previous ``rename``
        # call discarded its result and therefore never had any effect.
        self.logger.info("Done")
        return dataframe
# Create three, one for each category
# using the models above
class EmotionMapGenerator:
    """
    Class responsible for generating the heatmap visualizations of the classifications.

    Args:
        **segments_with_emotion (dataframe)**: sentences to be analyzed, with the
        per-modality classification columns already filled in.
        **graph (networkx graph)**: graph generated by the multimodal model.
    """

    def __init__(self, segments_with_emotion, graph=None):
        self.segments_with_emotion = segments_with_emotion
        self.graph = graph
        # Spreadsheet with the columns 'Emotion', 'X' and 'Y' used to place each
        # label on the wheel; it is downloaded on every instantiation.
        url1 = 'https://drive.google.com/uc?id=1yJBHU8Zl4MuoQfJqkPgVDwJfYRJDPUAH'
        output = 'emotions_coord.xlsx'
        gdown.download(url1, output, quiet=False)
        self.emotions_coord = pd.read_excel(output)
        logging.basicConfig(filename="newfile.log",
                            format='%(asctime)s %(message)s', filemode='w')
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.INFO)

    def generate_emotion_map(self, modality, segment_block_size=5):
        """
        Generate an emotion wheel for each block of segments (mp4) plus one
        wheel for the whole video (png).

        Args:
            **modality (string)**: modality the heatmap is created for (the opinion
            extraction of that modality has to be done beforehand).
            **segment_block_size (int)**: how many phrases are shown in the same
            frame of the heatmap video.
        Return:
            The value returned by ``ipython_display`` for the generated video.
        Raises:
            ValueError: if ``segment_block_size`` is smaller than 1.
        """
        if segment_block_size < 1:
            raise ValueError("segment_block_size must be at least 1")

        # Module-level helpers named with two leading underscores cannot be
        # referenced as bare identifiers inside a class body (private name
        # mangling would turn them into ``_EmotionMapGenerator__...``), so they
        # are fetched from the module namespace by string.
        plot_heatmap = globals()["__plot_heatmap"]

        x, y, x_img, y_img = self._collect_points(modality)

        vid_name = self.segments_with_emotion["segment_file"][0]
        vid_name = vid_name[0:3]
        os.makedirs("tempjpgs", exist_ok=True)
        os.makedirs(vid_name + "heatmaps", exist_ok=True)
        # Frames left behind by an interrupted run must not end up in this video.
        for stale in glob.glob('tempjpgs/*.jpg'):
            os.remove(stale)

        quant = self.segments_with_emotion.shape[0]
        frame_paths = []
        for id_ini in range(0, quant, segment_block_size):
            id_fim = min(id_ini + segment_block_size, quant)
            block_x, block_y = self._block_points(x[id_ini:id_fim], y[id_ini:id_fim])
            # Draw the wheel of this block; the helper leaves the figure open
            # so the title can be added before the frame is saved.
            plot_heatmap(block_x, block_y, self.emotions_coord, modality, vid_name)
            plt.title("Emotions from " + str(id_ini) + " to " + str(id_fim - 1) + " block")
            frame_path = "tempjpgs/output" + str(len(frame_paths) + 1) + ".jpg"
            plt.savefig(frame_path)
            plt.close()
            frame_paths.append(frame_path)

        # The helper always writes the same png, so the wheel of the whole
        # video is drawn last to be the one that remains on disk.
        plot_heatmap(x_img, y_img, self.emotions_coord, modality, vid_name)
        plt.close()

        img_array = []
        size = None
        for filename in frame_paths:
            img = cv2.imread(filename)
            height, width, layers = img.shape
            size = (width, height)
            img_array.append(img)
            os.remove(filename)
        os.rmdir("tempjpgs")

        rate = 1
        video_path = vid_name + "heatmaps/" + modality + '_heatmap.mp4'
        out = cv2.VideoWriter(video_path, cv2.VideoWriter_fourcc(*'XVID'), rate, size)
        try:
            for img in img_array:
                # Each frame is written twice, i.e. shown for two seconds at 1 fps.
                for _ in range(2):
                    out.write(img)
        finally:
            out.release()

        myvideo = VideoFileClip(video_path)
        self.logger.info("Video created")
        return ipython_display(myvideo)

    def _collect_points(self, modality):
        """Return (x, y, x_img, y_img): the coordinates of every segment and of the non-neutral ones."""
        if modality == 'multimodal':
            propagate = globals()["__GCP"]
            propagate(self.graph, mi=1, audio_weight=0.4, text_weight=0.6, max_iter=30)
            x, y, x_img, y_img = [], [], [], []
            for index in self.segments_with_emotion.index:
                v = self.graph.nodes[index]['f']
                x.append(v[0])
                y.append(v[1])
                # Points close to the origin are left out of the overall heatmap.
                if np.abs(v[0]) > 0.1 or np.abs(v[1]) > 0.1:
                    x_img.append(v[0])
                    y_img.append(v[1])
            return x, y, x_img, y_img

        to_coord = globals()["__generate_coord"]
        # Work on a copy so relabelling never touches the caller's dataframe.
        df = get_labels(self.segments_with_emotion, modality).copy()
        label = modality + '_label'
        df.loc[df[label] == 'no_face', [label]] = 'neutral'
        resp = list(df[label].apply(to_coord, args=(self.emotions_coord,)))
        temp = pd.DataFrame.from_records(resp, columns=['x', 'y'])
        df = pd.concat([df, temp], axis=1)
        dfimage = df[df[label] != 'neutral'].reset_index(drop=True)
        return (
            df['x'].to_numpy().tolist(),
            df['y'].to_numpy().tolist(),
            dfimage['x'].to_numpy().tolist(),
            dfimage['y'].to_numpy().tolist(),
        )

    @staticmethod
    def _block_points(block_x, block_y):
        """Return the points to draw for one block of segments."""
        if all(val == 0 for val in block_x) and all(val == 0 for val in block_y):
            # Nothing but origin points: draw a single point at the origin.
            return [0], [0]
        # Otherwise keep only the points whose two coordinates are non-zero.
        kept = [(px, py) for px, py in zip(block_x, block_y) if px != 0 and py != 0]
        return [px for px, _ in kept], [py for _, py in kept]
# Extracts the most likely emotion and its probability from the dictionary returned by GoEmotions
def __emocao_provavel(frase, emot_pipe):
    """
    Return ``(label, score)`` of the highest-scoring emotion for ``frase``.

    ``emot_pipe`` is called with the phrase; the first element of its result
    is read as a sequence of dicts with the keys "label" and "score". On ties
    the earliest entry wins.
    """
    candidates = emot_pipe(frase)[0]
    best = candidates[0]
    for candidate in candidates[1:]:
        if candidate["score"] > best["score"]:
            best = candidate
    return best["label"], best["score"]
# Helper functions for heatmap generation
def get_labels(dataframe, modality = "all"):
    """
    Return the dataframe with all phrases, classified or not.

    Args:
        **dataframe (dataframe)**: segments with columns ``0``, ``1``, ``2`` plus
        the ``<modality>_label`` / ``<modality>_prob`` columns.
        **modality (string)**: "transcript", "audio" or "video" to keep only the
        classification columns of that modality. Any other value (including
        "multimodal", which has no classification columns) returns the
        dataframe unchanged.
    Return:
        The selected columns as a new dataframe, or ``dataframe`` itself.
    """
    if modality in ("transcript", "audio", "video"):
        wanted = [0, 1, 2, modality + "_label", modality + "_prob"]
        return dataframe[wanted]
    return dataframe
# Splits a string into alternating non-digit / digit runs (digit runs are captured).
numbers = re.compile(r"(\d+)")


def __numericalSort(value):
    """
    Sort key that orders embedded numbers numerically instead of lexically.

    The string is split on runs of digits; the digit runs (odd positions of
    the split) are converted to int and the remaining pieces stay as text.
    """
    tokens = numbers.split(value)
    return [int(token) if pos % 2 else token for pos, token in enumerate(tokens)]
def __generate_coord(label, coords):
    """
    Return the ``(X, Y)`` coordinates of ``label`` in the emotion table.

    Args:
        **label (string)**: emotion name, matched against the 'Emotion' column.
        **coords (dataframe)**: table with the columns 'Emotion', 'X' and 'Y'.
    Return:
        Tuple ``(x, y)`` taken from the first row whose 'Emotion' equals ``label``.
    Raises:
        IndexError: if no row matches ``label``.
    """
    matches = coords.loc[coords['Emotion'] == label]
    # Take the first match positionally *within the filtered rows*; feeding the
    # row's index label to ``iloc`` on the full table is only correct when the
    # table happens to have the default 0..n-1 index.
    row = matches.iloc[0]
    return (row['X'], row['Y'])
def __kde_quartic(d, h):
    """
    Quartic kernel value for a distance ``d`` and a bandwidth (radius) ``h``.

    Computes ``15/16 * (1 - (d/h)**2)**2``.
    """
    scaled = d / h
    falloff = 1 - scaled ** 2
    return (15 / 16) * falloff ** 2
def __plot_heatmap(x, y, emotions_coord, modality, vid_name):
    """
    Draw a quartic-kernel density heatmap of the points over the emotion wheel.

    Args:
        **x, y (sequences of numbers)**: coordinates of the points to accumulate.
        **emotions_coord (dataframe)**: table with the columns 'Emotion', 'X', 'Y'
        used to draw and annotate the reference emotions.
        **modality (string)**: used in the name of the saved image.
        **vid_name (string)**: prefix of the "<vid_name>heatmaps/" output folder.
    Return:
        Nothing. The image is saved as "<vid_name>heatmaps/<modality>heatmap.png"
        and the figure is left open as the current pyplot figure.
    """
    # Grid cell size and kernel radius (h)
    grid_size = 0.02
    h = 0.5

    # Fixed plotting bounds for X and Y
    x_min, x_max = -1, 1
    y_min, y_max = -1, 1

    # Build the grid, padded by the kernel radius on every side
    x_grid = np.arange(x_min - h, x_max + h, grid_size)
    y_grid = np.arange(y_min - h, y_max + h, grid_size)
    x_mesh, y_mesh = np.meshgrid(x_grid, y_grid)

    # Centre point of every grid cell
    xc = x_mesh + (grid_size / 2)
    yc = y_mesh + (grid_size / 2)

    # Accumulate the kernel of one point at a time over the whole grid. Cells
    # farther than h from the point get no contribution from it.
    intensity = np.zeros(xc.shape)
    for px, py in zip(x, y):
        d = np.sqrt((xc - px) ** 2 + (yc - py) ** 2)
        inside = d <= h
        intensity[inside] += __kde_quartic(d[inside], h)

    # Heatmap output
    plt.figure(figsize=(7, 7))
    plt.pcolormesh(x_mesh, y_mesh, intensity, cmap='YlOrRd')  # https://matplotlib.org/stable/tutorials/colors/colormaps.html

    x_emo = emotions_coord.X.to_list()
    y_emo = emotions_coord.Y.to_list()
    plt.scatter(x_emo, y_emo)
    for _, row in emotions_coord.iterrows():
        # Read the position from the row itself rather than using the index
        # label as a list position.
        plt.annotate(row['Emotion'], (row['X'], row['Y']))

    plt.xlim(-1, 1)
    plt.ylim(-1, 1)
    ax = plt.gca()
    ax.add_patch(plt.Circle((0, 0), 1, color='black', fill=False))
    plt.axvline(x = 0, color = 'black', label = 'Arousal')
    plt.axhline(y = 0, color = 'black', label = 'Valence')
    plt.plot(x, y, 'x', color='white')
    plt.savefig(vid_name + "heatmaps/" + modality + "heatmap.png")
# Helper functions for transcription
def __transcript(video, method = "whisperx", min_time = 1):
    """
    Transcribe the audio of a video into a dataframe of timed phrases.

    Runs the ``whisperx`` command line tool to produce a VTT file next to the
    video, loads it and drops the phrases that are too short.

    Args:
        **video (str)**: path of the video file.
        **method (str)**: currently unused; kept for interface compatibility.
        **min_time (number)**: phrases lasting ``min_time`` seconds or less are
        discarded.
    Return:
        Dataframe with the columns ``0`` (start), ``1`` (end) and ``2`` (text),
        re-indexed from zero.
    Raises:
        subprocess.CalledProcessError: if ``whisperx`` exits with an error.
    """
    import subprocess  # local import: the file-level import block stays untouched

    # Write the VTT into the video's own folder so the path computed below is
    # valid for videos outside the working directory as well.
    output_dir = os.path.dirname(video) or "."
    # An argument list (no shell) keeps paths with spaces or shell
    # metacharacters intact.
    subprocess.run(
        ["whisperx", "--compute_type", "float32", "--output_format", "vtt",
         "--output_dir", output_dir, video],
        check=True,
    )

    # Swap only the extension; str.replace("mp4", "vtt") would also rewrite any
    # "mp4" appearing elsewhere in the path.
    vtt_path = os.path.splitext(video)[0] + ".vtt"
    dataframe = __set_vtt(vtt_path)
    if dataframe.empty:
        # No captions: there are no columns 0/1 to compute durations from.
        return dataframe

    durations = dataframe.apply(lambda row: __time_diff(row[1], row[0]), axis=1)
    return dataframe[durations > min_time].reset_index(drop=True)
def __set_vtt(arquivo):
    """
    Load a VTT file into a dataframe, one row per caption.

    Alternative way of obtaining the video dataframe when the VTT was already
    generated by a previous transcription. Columns ``0``, ``1`` and ``2`` hold
    the caption start, end and text.
    """
    rows = [[cue.start, cue.end, str(cue.text)] for cue in webvtt.read(arquivo)]
    return pd.DataFrame(rows)
def __traduz(frase, pten_pipeline):
    """
    Translate a phrase from Portuguese to English so that the text
    classification model can be applied to it.

    ``pten_pipeline`` is called with the phrase; the first value of the first
    dict in its result is returned.
    """
    first_result = pten_pipeline(frase)[0]
    values = list(first_result.values())
    return values[0]
def __time_diff(fim, init):
    """Return the number of seconds between the timestamps ``init`` and ``fim``."""
    end_seconds = __to_seconds(fim)
    return end_seconds - __to_seconds(init)
def __speech_file_to_array_fn(path, sampling_rate):
    """
    Load an audio file and return it resampled to ``sampling_rate`` as a numpy array.

    Singleton dimensions are squeezed out of the resampled tensor before the
    conversion to numpy.
    """
    waveform, native_rate = torchaudio.load(path)
    resample = torchaudio.transforms.Resample(native_rate, sampling_rate)
    return resample(waveform).squeeze().numpy()
def __encoder_text_adj(sentence, encoder):
    """Encode a single sentence with ``encoder.encode`` and return its (only) embedding."""
    embeddings = encoder.encode([sentence])
    return embeddings[0]
# Helper functions for the audio modality
def __predict(path, sampling_rate, device, config, feature_extractor, model):
    """
    Classify the emotion of an audio file.

    The audio is loaded at ``sampling_rate``, run through ``feature_extractor``
    and ``model`` on ``device``, and the logits are turned into probabilities
    with a softmax.

    Return:
        List of dicts, one per class, with the keys "Emotion" (name taken from
        ``config.id2label``) and "Score" (percentage string with one decimal).
    """
    speech = __speech_file_to_array_fn(path, sampling_rate)
    encoded = feature_extractor(speech, sampling_rate=sampling_rate, return_tensors="pt", padding=True)
    inputs = {}
    for key in encoded:
        inputs[key] = encoded[key].to(device)

    # Inference only: no gradients are needed.
    with torch.no_grad():
        logits = model(**inputs).logits

    scores = F.softmax(logits, dim=1).detach().cpu().numpy()[0]
    outputs = []
    for i, score in enumerate(scores):
        outputs.append({"Emotion": config.id2label[i], "Score": f"{round(score * 100, 3):.1f}%"})
    return outputs
# Extracts the embeddings from the prediction
def __encoder_audio(path, sampling_rate, device, feature_extractor, model, mean_pool=True):
    """
    Return the embedding of an audio file taken from the model's last hidden state.

    Args:
        **mean_pool (bool)**: when true, the last hidden state is averaged over
        dimension 1 before the first item of the batch is returned; otherwise
        the un-pooled last hidden state of the first item is returned.
    Return:
        Numpy array with the embedding of the first (only) item of the batch.
    """
    speech = __speech_file_to_array_fn(path, sampling_rate)
    encoded = feature_extractor(speech, sampling_rate=sampling_rate, return_tensors="pt", padding=True)
    inputs = {key: encoded[key].to(device) for key in encoded}

    # NOTE(review): assumes the model is configured to return hidden states - confirm.
    with torch.no_grad():
        last_hidden = model(**inputs).hidden_states[-1]

    if not mean_pool:
        return np.array(last_hidden.cpu())[0]
    pooled = torch.mean(last_hidden, dim=1)
    return np.array(pooled.cpu())[0]
# Helper functions for the multimodal modality
def __GCP(G, max_iter=100, audio_weight=0.2, text_weight=0.8, mi=1, min_diff=0.05):
    """
    Propagate the emotion coordinates over the graph, in place.

    Every node must carry the attributes 'text', 'audio' (coordinate vectors)
    and 'pseudolabeling' (a number). The result is stored in the node
    attribute 'f'.

    Args:
        **G (graph)**: graph whose nodes are updated.
        **max_iter (int)**: maximum number of propagation sweeps.
        **audio_weight, text_weight (float)**: weights of the weighted average of
        the 'audio' and 'text' vectors that forms the pseudo-label of a node.
        **mi (float)**: multiplier applied to the node's 'pseudolabeling' value;
        the product is the share of the pseudo-label in the updated value, the
        remainder comes from the mean of the neighbours.
        **min_diff (float)**: the sweeps stop once the summed change of all nodes
        in one sweep is at most this value.
    Return:
        Nothing.
    """
    weights = [text_weight, audio_weight]

    def pseudo_label(node):
        attrs = G.nodes[node]
        return np.average([attrs['text'], attrs['audio']], axis=0, weights=weights)

    # Initialisation: every node starts at its own pseudo-label.
    L_nodes = []
    for n in G.nodes():
        G.nodes[n]['f'] = pseudo_label(n)
        L_nodes.append(n)

    for i in range(0, max_iter):
        random.shuffle(L_nodes)
        # Propagation
        diff = 0
        for node in L_nodes:
            neighbor_values = [G.nodes[neighbor]['f'] for neighbor in G.neighbors(node)]
            if neighbor_values:
                f_new = np.mean(neighbor_values, axis=0)
            else:
                # A node without neighbours has no neighbourhood mean; dividing
                # by a zero count would produce NaN and spread it to every node
                # that uses this one. Its current value is used instead.
                f_new = G.nodes[node]['f']
            pl = G.nodes[node]['pseudolabeling'] * mi
            f_new = pseudo_label(node) * pl + f_new * (1 - pl)
            diff += np.linalg.norm(G.nodes[node]['f'] - f_new)
            G.nodes[node]['f'] = f_new
        print("Iteration #"+str(i+1)+" Q(F)="+str(diff))
        if diff <= min_diff:
            break
# Utility functions
def __to_seconds(horario):
    """
    Convert a timestamp of the form "HH:MM:SS" (seconds may be fractional)
    into a number of seconds.
    """
    fields = horario.split(":")
    hours = int(fields[0])
    minutes = int(fields[1])
    secs = float(fields[2])
    return 3600 * hours + 60 * minutes + secs