Analisar dados do Twitter com o Apache Hive e o Apache Hadoop no HDInsight
Saiba como utilizar o Apache Hive para processar dados do Twitter. O resultado é uma lista de utilizadores do Twitter que enviaram mais tweets que contêm uma determinada palavra.
Importante
Os passos neste documento foram testados no HDInsight 3.6.
Obter os dados
O Twitter permite-lhe obter os dados de cada tweet como um documento JSON (JavaScript Object Notation) através de uma API REST. O OAuth é necessário para autenticação na API.
Criar uma aplicação do Twitter
A partir de um browser, inicie sessão em https://developer.twitter.com. Selecione a ligação Inscrever-se agora se não tiver uma conta do Twitter.
Selecione Criar Nova Aplicação.
Introduza Nome, Descrição, Site. Pode criar um URL para o campo Site . A tabela seguinte mostra alguns valores de exemplo a utilizar:
Campo Valor Nome MyHDInsightApp Descrição MyHDInsightApp Site https://www.myhdinsightapp.com
Selecione Sim, concordo e, em seguida, selecione Criar a sua aplicação do Twitter.
Selecione o separador Permissões . A permissão predefinida é Só de leitura.
Selecione o separador Chaves e Tokens de Acesso .
Selecione Criar o meu token de acesso.
Selecione Testar OAuth no canto superior direito da página.
Anote a chave de consumidor, o Segredo do consumidor, o token de acesso e o segredo do token de acesso.
Transferir tweets
O seguinte código Python transfere 10 000 tweets do Twitter e guarda-os num ficheiro com o nometweets.txt.
Nota
Os passos seguintes são executados no cluster do HDInsight, uma vez que o Python já está instalado.
Utilize o comando ssh para ligar ao cluster. Edite o comando abaixo ao substituir CLUSTERNAME pelo nome do cluster e, em seguida, introduza o comando:
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Utilize os seguintes comandos para instalar o Tweepy, a Barra de progresso e outros pacotes necessários:
sudo apt install python-dev libffi-dev libssl-dev sudo apt remove python-openssl python -m pip install virtualenv mkdir gettweets cd gettweets virtualenv gettweets source gettweets/bin/activate pip install tweepy progressbar pyOpenSSL requests[security]
Utilize o seguinte comando para criar um ficheiro com o nome gettweets.py:
nano gettweets.py
Edite o código abaixo ao substituir
Your consumer secret
,Your consumer key
,Your access token
eYour access token secret
pelas informações relevantes da sua aplicação do Twitter. Em seguida, cole o código editado como o conteúdo do ficheiro gettweets.py .#!/usr/bin/python from tweepy import Stream, OAuthHandler from tweepy.streaming import StreamListener from progressbar import ProgressBar, Percentage, Bar import json import sys #Twitter app information consumer_secret='Your consumer secret' consumer_key='Your consumer key' access_token='Your access token' access_token_secret='Your access token secret' #The number of tweets we want to get max_tweets=100 #Create the listener class that receives and saves tweets class listener(StreamListener): #On init, set the counter to zero and create a progress bar def __init__(self, api=None): self.num_tweets = 0 self.pbar = ProgressBar(widgets=[Percentage(), Bar()], maxval=max_tweets).start() #When data is received, do this def on_data(self, data): #Append the tweet to the 'tweets.txt' file with open('tweets.txt', 'a') as tweet_file: tweet_file.write(data) #Increment the number of tweets self.num_tweets += 1 #Check to see if we have hit max_tweets and exit if so if self.num_tweets >= max_tweets: self.pbar.finish() sys.exit(0) else: #increment the progress bar self.pbar.update(self.num_tweets) return True #Handle any errors that may occur def on_error(self, status): print status #Get the OAuth token auth = OAuthHandler(consumer_key, consumer_secret) auth.set_access_token(access_token, access_token_secret) #Use the listener class for stream processing twitterStream = Stream(auth, listener()) #Filter for these topics twitterStream.filter(track=["azure","cloud","hdinsight"])
Dica
Ajuste o filtro de tópicos na última linha para controlar palavras-chave populares. A utilização de palavras-chave populares no momento em que executa o script permite uma captura de dados mais rápida.
Utilize Ctrl + X e, em seguida, Y para guardar o ficheiro.
Utilize o seguinte comando para executar o ficheiro e transferir tweets:
python gettweets.py
É apresentado um indicador de progresso. Conta até 100% à medida que os tweets são transferidos.
Nota
Se a barra de progresso estiver a demorar muito tempo a avançar, deve alterar o filtro para controlar os tópicos mais populares. Quando existem muitos tweets sobre o tópico no seu filtro, pode obter rapidamente os 100 tweets necessários.
Carregar os dados
Para carregar os dados para o armazenamento do HDInsight, utilize os seguintes comandos:
hdfs dfs -mkdir -p /tutorials/twitter/data
hdfs dfs -put tweets.txt /tutorials/twitter/data/tweets.txt
Estes comandos armazenam os dados numa localização à qual todos os nós no cluster podem aceder.
Executar a tarefa do HiveQL
Utilize o seguinte comando para criar um ficheiro com instruções hiveQL :
nano twitter.hql
Utilize o seguinte texto como o conteúdo do ficheiro:
set hive.exec.dynamic.partition = true; set hive.exec.dynamic.partition.mode = nonstrict; -- Drop table, if it exists DROP TABLE tweets_raw; -- Create it, pointing toward the tweets logged from Twitter CREATE EXTERNAL TABLE tweets_raw ( json_response STRING ) STORED AS TEXTFILE LOCATION '/tutorials/twitter/data'; -- Drop and recreate the destination table DROP TABLE tweets; CREATE TABLE tweets ( id BIGINT, created_at STRING, created_at_date STRING, created_at_year STRING, created_at_month STRING, created_at_day STRING, created_at_time STRING, in_reply_to_user_id_str STRING, text STRING, contributors STRING, retweeted STRING, truncated STRING, coordinates STRING, source STRING, retweet_count INT, url STRING, hashtags array<STRING>, user_mentions array<STRING>, first_hashtag STRING, first_user_mention STRING, screen_name STRING, name STRING, followers_count INT, listed_count INT, friends_count INT, lang STRING, user_location STRING, time_zone STRING, profile_image_url STRING, json_response STRING ); -- Select tweets from the imported data, parse the JSON, -- and insert into the tweets table FROM tweets_raw INSERT OVERWRITE TABLE tweets SELECT cast(get_json_object(json_response, '$.id_str') as BIGINT), get_json_object(json_response, '$.created_at'), concat(substr (get_json_object(json_response, '$.created_at'),1,10),' ', substr (get_json_object(json_response, '$.created_at'),27,4)), substr (get_json_object(json_response, '$.created_at'),27,4), case substr (get_json_object(json_response, '$.created_at'),5,3) when "Jan" then "01" when "Feb" then "02" when "Mar" then "03" when "Apr" then "04" when "May" then "05" when "Jun" then "06" when "Jul" then "07" when "Aug" then "08" when "Sep" then "09" when "Oct" then "10" when "Nov" then "11" when "Dec" then "12" end, substr (get_json_object(json_response, '$.created_at'),9,2), substr (get_json_object(json_response, '$.created_at'),12,8), get_json_object(json_response, '$.in_reply_to_user_id_str'), get_json_object(json_response, '$.text'), get_json_object(json_response, '$.contributors'), get_json_object(json_response, '$.retweeted'), get_json_object(json_response, '$.truncated'), get_json_object(json_response, '$.coordinates'), get_json_object(json_response, '$.source'), cast (get_json_object(json_response, '$.retweet_count') as INT), get_json_object(json_response, '$.entities.display_url'), array( trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))), trim(lower(get_json_object(json_response, '$.entities.hashtags[1].text'))), trim(lower(get_json_object(json_response, '$.entities.hashtags[2].text'))), trim(lower(get_json_object(json_response, '$.entities.hashtags[3].text'))), trim(lower(get_json_object(json_response, '$.entities.hashtags[4].text')))), array( trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))), trim(lower(get_json_object(json_response, '$.entities.user_mentions[1].screen_name'))), trim(lower(get_json_object(json_response, '$.entities.user_mentions[2].screen_name'))), trim(lower(get_json_object(json_response, '$.entities.user_mentions[3].screen_name'))), trim(lower(get_json_object(json_response, '$.entities.user_mentions[4].screen_name')))), trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))), trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))), get_json_object(json_response, '$.user.screen_name'), get_json_object(json_response, '$.user.name'), cast (get_json_object(json_response, '$.user.followers_count') as INT), cast (get_json_object(json_response, '$.user.listed_count') as INT), cast (get_json_object(json_response, '$.user.friends_count') as INT), get_json_object(json_response, '$.user.lang'), get_json_object(json_response, '$.user.location'), get_json_object(json_response, '$.user.time_zone'), get_json_object(json_response, '$.user.profile_image_url'), json_response WHERE (length(json_response) > 500);
Prima Ctrl + X e, em seguida, prima Y para guardar o ficheiro.
Utilize o seguinte comando para executar o HiveQL contido no ficheiro:
beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http' -i twitter.hql
Este comando executa o ficheiro twitter.hql . Assim que a consulta estiver concluída, verá um
jdbc:hive2//localhost:10001/>
pedido.A partir da linha de comandos beeline, utilize a seguinte consulta para verificar se os dados foram importados:
SELECT name, screen_name, count(1) as cc FROM tweets WHERE text like "%Azure%" GROUP BY name,screen_name ORDER BY cc DESC LIMIT 10;
Esta consulta devolve um máximo de 10 tweets que contêm a palavra Azure no texto da mensagem.
Nota
Se tiver alterado o filtro no script, substitua o
gettweets.py
Azure por um dos filtros que utilizou.
Passos seguintes
Aprendeu a transformar um conjunto de dados JSON não estruturado numa tabela estruturada do Apache Hive . Para saber mais sobre o Hive no HDInsight, veja os seguintes documentos: