Retour à la liste des articles

Tuto : Développer un connecteur custom pour Fivetran

Kévin Bénard

Kévin Bénard

Analytics Engineer spécialisé dans dbt, j'accompagne mes clients dans la création de pipelines de données robustes et faciles à maintenir. Mon expertise se concentre sur l'amélioration de la qualité des données pour garantir des résultats fiables et exploitables.

Guide pratique pour étendre les capacités de Fivetran

Un connecteur Fivetran n'est pas toujours disponible pour toutes les sources de données. Face à cette situation, le développement d'un connecteur personnalisé devient nécessaire. Découvrons pas à pas comment procéder.

Étape 1 : Identifier le besoin

Pour l'exercice, je vais commencer par vous exposer mon problème : j'aime beaucoup aller voir des spectacles et pièces de théâtre et je trouve souvent mes idées de sorties sur BilletReduc. Sauf que je trouve que leur site web est très mauvais pour découvrir de nouveaux spectacles. Désolé si un de leurs développeurs passe par ici … Mais il est par exemple impossible de trier les spectacles par note moyenne ou nombre de notes par exemple.

Important : Avant de développer un connecteur custom, vérifiez qu'il n'existe pas déjà une solution disponible dans le catalogue Fivetran.

Je vais donc m'aider de Fivetran pour récupérer les données du site et pouvoir les exploiter de mon côté pour trouver ma future sortie. Et come vous pouvez vous en douter, Fivetran ne propose pas un connecteur pour BilletReduc et je vais donc devoir le développer.

Étape 2 : Premier script python

Quand je rencontre ce genre de problème. La première chose que je fais (après avoir mis à plat le problème), c’est de faire un premier script python qui remontera les données dont j’ai besoin. Voici le code de ce script. Il est assez simple, il va scraper le site web et remonter toutes les informations dont j’ai besoin (nombre de notes, valeurs des notes, lieux, …).

Vous pouvez voir sur une des pages d’évènement BilletReduc tout ce que je peux récupérer : https://www.billetreduc.com/320951/evt.htm.

import requests
from bs4 import BeautifulSoup
import datetime

def get_events():

    events_array = []
    base_url = "<https://www.billetreduc.com>"

    current_date = datetime.date.today() + datetime.timedelta(days=1)
    current_date_string = current_date.strftime('%Y-%m-%d')

    page_number = 1

    # Boucle qui permet de passer à la page suivante tant qu'il y a des évènements
    while True:

        url = f'{base_url}/s.htm?day={current_date}&gp=1&Lpg={page_number}'

        page = requests.get(url)
        soup = BeautifulSoup(page.content, "html.parser")

        # Récupère le lien qui mène à l'évènement
        events = soup.select('.leTitre a')

        if len(events) == 0 : break

        # Boucle qui permet de scraper tous les évènements de la date donnée
        for event in events:
            event_url = f'{base_url}{event["href"]}'
            name = event.text

            event_page = requests.get(event_url)
            event_soup = BeautifulSoup(event_page.content, "html.parser")

            # On récupère toutes les informations qui nous interessent sur l'élément
            place_element = event_soup.select_one('.lieug a')
            place = place_element.text if place_element else None

            ratingon5_element = event_soup.select_one('.ratingon5')
            ratingon5 = ratingon5_element.text if ratingon5_element else None

            ratingCount_element = event_soup.select_one('[itemprop|="ratingCount"]')
            ratingCount = ratingCount_element.text if ratingCount_element else None

            bravo_percentage = 0
            good_percentage = 0
            average_percentage = 0
            disappointed_percentage = 0

            critbar = event_soup.select_one('#critbar')
            if critbar and len(critbar) > 0 :
                evalutations = critbar.select('b')
                if len(evalutations) == 4 :
                    bravo_percentage = evalutations[0].text.replace('%', '')
                    good_percentage = evalutations[1].text.replace('%', '')
                    average_percentage = evalutations[2].text.replace('%', '')
                    disappointed_percentage = evalutations[3].text.replace('%', '')

            # On stocke dans le tableau toutes les informations de l'évènement
            events_array.append({
                'name': name,
                'date': current_date_string,
                'url': event_url,
                'place': place,
                'ratingon5': ratingon5,
                'ratingCount': ratingCount,
                'bravo_percentage': bravo_percentage,
                'good_percentage': good_percentage,
                'average_percentage': average_percentage,
                'disappointed_percentage': disappointed_percentage
            })

        # On passe à la page suivante
        page_number += 1

    return events_array

À retenir : Testez toujours votre script de base avant d'ajouter la logique Fivetran.

Étape 3 : Adapter pour respecter le format attendu par Fivetran

Une fois, ce script fonctionnel, je vais l’adapter pour qu’il puisse remonter ce dont Fivetran a besoin pour pouvoir exploiter les données remontées. Pour cela, la documentation nous dit qu’il a besoin d’un objet JSON avec plusieurs champs :

  • state : contient le statut de l’exécution. Cela fait office de checkpoint, il sera retourné lors de la prochaine exécution pour reprendre la récupération des données. Cela permet de ne pas avoir à récupérer toutes les données à chaque fois. Cela peut, par exemple, être l’id du dernier élément récupéré lors de l’exécution. Et à la prochaine exécution, on ne remontera que les éléments dont l’id est supérieur à celui-là.
  • insert : spécifie toutes les données à insérer en base de données.
  • delete (optionnel) : spécifie les lignes à supprimer de la base. Fivetran ne les supprime pas réellement mais donne la valeur true au champ _fivetran_deleted .
  • schema (optionnel) : spécifie les clés primaires de chaque objet retourné. Si le schéma n’est pas spécifié, Fivetran va ajouter les données à la table et non les mettre à jour.
  • hasMore : indique à Fivetran si il y a encore des données à remonter. Tant que la valeur de ce champs est à true, Fivetran va rééxecuter la fonction. C’est utile si on a un gros volume de données à remonter.
return {
  "state": {
      # On remonte la date que l'on vient de traiter pour qu'à la prochaine execution, on puisse traiter les jours suivants
      "last_day": current_date_string
  },
  "insert": {
      # On demande à Fivetran d'insérer tous les events scrapés
      "events": events_array
  },
  "schema": {
      "events": {
          # On définit les clés primaires de la table pour que Fivetran puisse updater les données
          "primary_key": ['url', 'date']
      }
  },
  # On indique à Fivetran de rappeler la fonction si on n'a pas encore atteint la date finale définie
  "hasMore" : True if current_date <= end_datetime else False
}

Étape 4 : Finaliser le script en utilisant les données envoyées par Fivetran

Nous allons également modifier le script pour prendre en compte les paramètres envoyés en entrée par Fivetran :

  • agent : un objet informationnel
  • state : l’objet JSON que nous avons envoyé lors du dernier appel au connecteur. Lors de la première synchronisation ou lors d’un full re-sync, cet objet a pour valeur {} (il n’est jamais NULL)
  • secrets (optionnel) : un objet JSON qui contient toutes les informations confidentielles (mots de passe, clés d’API, …). Ces informations sont saisies dans Fivetran lors du paramétrage du connecteur.
  • customPayload (optionnel) : un objet JSON qui est également défini lors du paramétrage du connecteur dans Fivetran.
  • setup_test (optionnel) : un booléen qui indique si c’est un test de connexion du connecteur.
  • sync_id (optionnel) : le Fivetran sync identifier (UUID). Il peut être retrouvé dans les logs d’exécution dans Fivetran. Ça peut aider pour les debugs.
import requests
from bs4 import BeautifulSoup
import datetime

def get_events(r):

    # On récupère l'objet JSON de la requête
    request = r.get_json()

    events_array = []
    base_url = "<https://www.billetreduc.com>"

    start_date = datetime.date.today()
    if 'last_day' in request['state'] :
        start_date = request['state']['last_day']
    start_datetime = datetime.datetime.strptime(start_date, '%Y-%m-%d')

    end_datetime = datetime.datetime.today() + datetime.timedelta(days=10)

    current_date = start_datetime + datetime.timedelta(days=1)
    current_date_string = current_date.strftime('%Y-%m-%d')

    page_number = 1

    # Boucle qui permet de passer à la page suivante tant qu'il y a des évènements
    while True:

        # Si c'est une execution du test, nous ne récupérons pas les données, c'est seulement pour tester la connectivité
        if 'setup_test' in request and request['setup_test'] == True : break

        url = f'{base_url}/s.htm?day={current_date}&gp=1&Lpg={page_number}'

        page = requests.get(url)
        soup = BeautifulSoup(page.content, "html.parser")

        # Récupère le lien qui mène à l'évènement
        events = soup.select('.leTitre a')

        if len(events) == 0 : break

        # Boucle qui permet de scraper tous les évènements de la date donnée
        for event in events:
            event_url = f'{base_url}{event["href"]}'
            name = event.text

            event_page = requests.get(event_url)
            event_soup = BeautifulSoup(event_page.content, "html.parser")

            # On récupère toutes les informations qui nous interessent sur l'élément
            place_element = event_soup.select_one('.lieug a')
            place = place_element.text if place_element else None

            ratingon5_element = event_soup.select_one('.ratingon5')
            ratingon5 = ratingon5_element.text if ratingon5_element else None

            ratingCount_element = event_soup.select_one('[itemprop|="ratingCount"]')
            ratingCount = ratingCount_element.text if ratingCount_element else None

            bravo_percentage = 0
            good_percentage = 0
            average_percentage = 0
            disappointed_percentage = 0

            critbar = event_soup.select_one('#critbar')
            if critbar and len(critbar) > 0 :
                evalutations = critbar.select('b')
                if len(evalutations) == 4 :
                    bravo_percentage = evalutations[0].text.replace('%', '')
                    good_percentage = evalutations[1].text.replace('%', '')
                    average_percentage = evalutations[2].text.replace('%', '')
                    disappointed_percentage = evalutations[3].text.replace('%', '')

            # On stocke dans le tableau toutes les informations de l'évènement
            events_array.append({
                'name': name,
                'date': current_date_string,
                'url': event_url,
                'place': place,
                'ratingon5': ratingon5,
                'ratingCount': ratingCount,
                'bravo_percentage': bravo_percentage,
                'good_percentage': good_percentage,
                'average_percentage': average_percentage,
                'disappointed_percentage': disappointed_percentage
            })

        # On passe à la page suivante
        page_number += 1

    return {
        "state": {
            # On remonte la date que l'on vient de traiter pour qu'à la prochaine execution, on puisse traiter les jours suivants
            "last_day": current_date_string
        },
        "insert": {
            # On demande à Fivetran d'insérer tous les events scrapés
            "events": events_array
        },
        "schema": {
            "events": {
                # On définit les clés primaires de la table pour que Fivetran puisse updater les données
                "primary_key": ['url', 'date']
            }
        },
        # On indique à Fivetran de rappeler la fonction si on n'a pas encore atteint la date finale définie
        "hasMore" : True if current_date <= end_datetime else False
    }

Étape 5 : Mettre en ligne le connecteur

Maintenant que nous remontons l’objet JSON dont Fivetran a besoin, nous avons trois options qui s’offrent à nous pour rendre cette fonction accessible à Fivetran : AWS Lambda, Azure Functions ou Google Cloud Functions.

Le fonctionnement est très similaire entre les 3 fournisseurs mais ici, je vais utiliser Google Cloud Function et suivre les différentes étapes de configuration présentes dans la documentation. Je ne vais pas rentrer dans le détail de la configuration car c’est propre à l’environnement GCP et que tout est expliqué dans la documentation.

Après la configuration, nous avons une fonction Google Cloud accessible par Fivetran. La configuration dans Fivetran est minimaliste car nous n’avons aucun mot de passe ni payload à passer à la fonction.

!/img/tuto-developper-un-connecteur-custom-pour-fivetran/fivetran_custom_connector.png

Nous pouvons tester la fonction (ça appellera la fonction avec setup_test à True) et maintenant utiliser ce connecteur comme tout autre connecteur Fivetran et profiter des fonctionnalités de l’outil (synchronisation, logs, alertes, champs Fivetran dans la table, …).

!/img/tuto-developper-un-connecteur-custom-pour-fivetran/bigquery_data.png

Bonus : Quel spectacle aller voir ?

Maintenant que ces données remontent automatiquement dans BigQuery, j’ai développé une petite visualisation dans Tableau Software pour pouvoir identifier les meilleurs spectacles du moment et choisir ma sortie de samedi (oui, je me sers vraiment de ces données …).

!/img/tuto-developper-un-connecteur-custom-pour-fivetran/tableau_visualization.png

Mon objectif ici était de vous présenter la logique de création de connecteur custom dans Fivetran. J’espère que c’est maintenant plus clair pour vous si vous ne connaissiez pas cette possibilité. Vos besoins ne concerneront sûrement pas la récupération des données BilletReduc mais la logique de développement est la même qu’il s’agisse de récupérer les données via du Web Scrapping ou encore des appels API.

La logique est facilement compréhensible mais comme toujours dans ce domaine, il y a beaucoup d’obstacles qui peuvent complexifier le développement de ces connecteurs :

  • Timeout des Cloud Functions
  • Taille maximale des objets renvoyés par les Cloud Functions
  • Gestion des erreurs de l’API requêtée

Je couvrirai sûrement ces aspects dans le futur mais si vous avez déjà envie de discuter de vos problématiques Data, n’hésitez pas à me contacter pour discuter plus en détail de vos projets.