Twitter-adatok elemzése az Apache Hive és az Apache Hadoop on HDInsight használatával

Megtudhatja, hogyan használhatja az Apache Hive-t a Twitter-adatok feldolgozására. Az eredmény azoknak a Twitter-felhasználóknak a listája, akik a legtöbb tweetet küldték, amelyek egy adott szót tartalmaznak.

Fontos

A dokumentum lépéseit a HDInsight 3.6-on teszteltük.

Az adatok lekérése

A Twitter lehetővé teszi az egyes tweetek adatainak lekérését JavaScript Object Notation -dokumentumként (JSON) egy REST API-n keresztül. Az API-hitelesítéshez OAuth szükséges.

Twitter-alkalmazás létrehozása

  1. Egy webböngészőből jelentkezzen be a következőbe https://developer.twitter.com: . Ha nem rendelkezik Twitter-fiókkal , válassza a Regisztráció most hivatkozást.

  2. Válassza az Új alkalmazás létrehozása lehetőséget.

  3. Adja meg a Nevet, a Leírást, a Webhelyet. A Webhely mező url-címét is megadhatja. Az alábbi táblázat néhány használandó mintaértéket mutat be:

    Mező Érték
    Név MyHDInsightApp
    Description MyHDInsightApp
    Webhely https://www.myhdinsightapp.com
  4. Válassza az Igen, elfogadom, majd a Twitter-alkalmazás létrehozása lehetőséget.

  5. Válassza az Engedélyek lapot. Az alapértelmezett engedély az Írásvédett.

  6. Válassza a Kulcsok és hozzáférési jogkivonatok lapot.

  7. Válassza a Hozzáférési jogkivonat létrehozása lehetőséget.

  8. Válassza az OAuth tesztelése lehetőséget az oldal jobb felső sarkában.

  9. Jegyezze fel a fogyasztói kulcsot, a fogyasztói titkos kulcsot, a hozzáférési jogkivonatot és a hozzáférési jogkivonat titkos kulcsát.

Tweetek letöltése

Az alábbi Python-kód 10 000 tweetet tölt le a Twitterről, és menti őket egy tweets.txtnevű fájlba.

Megjegyzés

A következő lépések a HDInsight-fürtön lesznek végrehajtva, mivel a Python már telepítve van.

  1. A fürthöz való csatlakozáshoz használja az ssh-parancsot . Az alábbi parancs szerkesztéséhez cserélje le a CLUSTERNAME nevet a fürt nevére, majd írja be a parancsot:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. A Tweepy, a Progress bar és más szükséges csomagok telepítéséhez használja a következő parancsokat:

    sudo apt install python-dev libffi-dev libssl-dev
    sudo apt remove python-openssl
    python -m pip install virtualenv
    mkdir gettweets
    cd gettweets
    virtualenv gettweets
    source gettweets/bin/activate
    pip install tweepy progressbar pyOpenSSL requests[security]
    
  3. Az alábbi paranccsal hozzon létre egy gettweets.py nevű fájlt:

    nano gettweets.py
    
  4. Szerkessze az alábbi kódot úgy, hogy lecseréli Your consumer secreta , Your consumer keya , Your access tokena és Your access token secret a értékét a twitter-alkalmazásból származó releváns információkra. Ezután illessze be a szerkesztett kódot a gettweets.py fájl tartalmaként.

    #!/usr/bin/python
    
    from tweepy import Stream, OAuthHandler
    from tweepy.streaming import StreamListener
    from progressbar import ProgressBar, Percentage, Bar
    import json
    import sys
    
    #Twitter app information
    consumer_secret='Your consumer secret'
    consumer_key='Your consumer key'
    access_token='Your access token'
    access_token_secret='Your access token secret'
    
    #The number of tweets we want to get
    max_tweets=100
    
    #Create the listener class that receives and saves tweets
    class listener(StreamListener):
        #On init, set the counter to zero and create a progress bar
        def __init__(self, api=None):
            self.num_tweets = 0
            self.pbar = ProgressBar(widgets=[Percentage(), Bar()], maxval=max_tweets).start()
    
        #When data is received, do this
        def on_data(self, data):
            #Append the tweet to the 'tweets.txt' file
            with open('tweets.txt', 'a') as tweet_file:
                tweet_file.write(data)
                #Increment the number of tweets
                self.num_tweets += 1
                #Check to see if we have hit max_tweets and exit if so
                if self.num_tweets >= max_tweets:
                    self.pbar.finish()
                    sys.exit(0)
                else:
                    #increment the progress bar
                    self.pbar.update(self.num_tweets)
            return True
    
        #Handle any errors that may occur
        def on_error(self, status):
            print status
    
    #Get the OAuth token
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    #Use the listener class for stream processing
    twitterStream = Stream(auth, listener())
    #Filter for these topics
    twitterStream.filter(track=["azure","cloud","hdinsight"])
    

    Tipp

    Módosítsa a témakörök szűrőt az utolsó sorban a népszerű kulcsszavak nyomon követéséhez. Ha a szkript futtatásakor népszerű kulcsszavakat használ, az gyorsabb adatrögzítést tesz lehetővé.

  5. Mentse a fájlt a Ctrl + X, majd az Y billentyűkombinációval.

  6. Futtassa a fájlt a következő paranccsal, és töltse le a tweeteket:

    python gettweets.py
    

    Megjelenik egy folyamatjelző. A tweetek letöltése akár 100%-os is lehet.

    Megjegyzés

    Ha a folyamatjelző hosszú időt vesz igénybe, módosítsa a szűrőt a népszerű témakörök nyomon követésére. Ha a szűrőben sok tweet található a témakörről, gyorsan megkaphatja a szükséges 100 tweetet.

Adatok feltöltése

Az adatok HDInsight-tárolóba való feltöltéséhez használja a következő parancsokat:

hdfs dfs -mkdir -p /tutorials/twitter/data
hdfs dfs -put tweets.txt /tutorials/twitter/data/tweets.txt

Ezek a parancsok olyan helyen tárolják az adatokat, amelyhez a fürt összes csomópontja hozzáférhet.

A HiveQL-feladat futtatása

  1. A következő paranccsal hozzon létre egy HiveQL-utasításokat tartalmazó fájlt:

    nano twitter.hql
    

    A fájl tartalmaként használja a következő szöveget:

    set hive.exec.dynamic.partition = true;
    set hive.exec.dynamic.partition.mode = nonstrict;
    -- Drop table, if it exists
    DROP TABLE tweets_raw;
    -- Create it, pointing toward the tweets logged from Twitter
    CREATE EXTERNAL TABLE tweets_raw (
        json_response STRING
    )
    STORED AS TEXTFILE LOCATION '/tutorials/twitter/data';
    -- Drop and recreate the destination table
    DROP TABLE tweets;
    CREATE TABLE tweets
    (
        id BIGINT,
        created_at STRING,
        created_at_date STRING,
        created_at_year STRING,
        created_at_month STRING,
        created_at_day STRING,
        created_at_time STRING,
        in_reply_to_user_id_str STRING,
        text STRING,
        contributors STRING,
        retweeted STRING,
        truncated STRING,
        coordinates STRING,
        source STRING,
        retweet_count INT,
        url STRING,
        hashtags array<STRING>,
        user_mentions array<STRING>,
        first_hashtag STRING,
        first_user_mention STRING,
        screen_name STRING,
        name STRING,
        followers_count INT,
        listed_count INT,
        friends_count INT,
        lang STRING,
        user_location STRING,
        time_zone STRING,
        profile_image_url STRING,
        json_response STRING
    );
    -- Select tweets from the imported data, parse the JSON,
    -- and insert into the tweets table
    FROM tweets_raw
    INSERT OVERWRITE TABLE tweets
    SELECT
        cast(get_json_object(json_response, '$.id_str') as BIGINT),
        get_json_object(json_response, '$.created_at'),
        concat(substr (get_json_object(json_response, '$.created_at'),1,10),' ',
        substr (get_json_object(json_response, '$.created_at'),27,4)),
        substr (get_json_object(json_response, '$.created_at'),27,4),
        case substr (get_json_object(json_response,    '$.created_at'),5,3)
            when "Jan" then "01"
            when "Feb" then "02"
            when "Mar" then "03"
            when "Apr" then "04"
            when "May" then "05"
            when "Jun" then "06"
            when "Jul" then "07"
            when "Aug" then "08"
            when "Sep" then "09"
            when "Oct" then "10"
            when "Nov" then "11"
            when "Dec" then "12" end,
        substr (get_json_object(json_response, '$.created_at'),9,2),
        substr (get_json_object(json_response, '$.created_at'),12,8),
        get_json_object(json_response, '$.in_reply_to_user_id_str'),
        get_json_object(json_response, '$.text'),
        get_json_object(json_response, '$.contributors'),
        get_json_object(json_response, '$.retweeted'),
        get_json_object(json_response, '$.truncated'),
        get_json_object(json_response, '$.coordinates'),
        get_json_object(json_response, '$.source'),
        cast (get_json_object(json_response, '$.retweet_count') as INT),
        get_json_object(json_response, '$.entities.display_url'),
        array(
            trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[1].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[2].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[3].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[4].text')))),
        array(
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[1].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[2].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[3].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[4].screen_name')))),
        trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
        get_json_object(json_response, '$.user.screen_name'),
        get_json_object(json_response, '$.user.name'),
        cast (get_json_object(json_response, '$.user.followers_count') as INT),
        cast (get_json_object(json_response, '$.user.listed_count') as INT),
        cast (get_json_object(json_response, '$.user.friends_count') as INT),
        get_json_object(json_response, '$.user.lang'),
        get_json_object(json_response, '$.user.location'),
        get_json_object(json_response, '$.user.time_zone'),
        get_json_object(json_response, '$.user.profile_image_url'),
        json_response
    WHERE (length(json_response) > 500);
    
  2. Nyomja le a Ctrl + X billentyűkombinációt, majd az Y billentyűt a fájl mentéséhez.

  3. Futtassa a fájlban található HiveQL-t a következő paranccsal:

    beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http' -i twitter.hql
    

    Ez a parancs futtatja a twitter.hql fájlt. A lekérdezés befejezése után megjelenik egy jdbc:hive2//localhost:10001/> üzenet.

  4. A beeline parancssorból a következő lekérdezés használatával ellenőrizze, hogy az adatok importálása megtörtént-e:

    SELECT name, screen_name, count(1) as cc
    FROM tweets
    WHERE text like "%Azure%"
    GROUP BY name,screen_name
    ORDER BY cc DESC LIMIT 10;
    

    Ez a lekérdezés legfeljebb 10 tweetet ad vissza, amelyek az üzenet szövegében az Azure szót tartalmazzák.

    Megjegyzés

    Ha módosította a szűrőt a szkriptben, cserélje le az gettweets.pyAzure-t a használt szűrők egyikére.

Következő lépések

Megtanulta, hogyan alakíthat át strukturálatlan JSON-adatkészleteket strukturált Apache Hive-táblává . A HDInsight-alapú Hive-ról az alábbi dokumentumokban talál további információt: