Google Cloud · OneMap API · PowerBI

Data Pipeline on G Cloud

By Lim Rong Yi
Duration: 1 month
Role: Data Engineer
Landing page
Map of Singapore

Project Overview

As a real estate company, we aim to better advise our clients on their property decisions. To do so, we need to have a better understanding of the property market. This project aims to ingest, process and store HDB resale data on GCP, and visualize the data using PowerBI.


One Step Further

Instead of simply downloading the data from data.gov.sg and loading a static CSV file, we decided to do the following:

  • Create an automated ingestion pipeline to get the most updated data daily.
  • Use the OneMap API to get the latitude and longitude of each HDB block. This allows us to plot the data on a map, and to get the nearest MRT station.

Automated Ingestion Pipeline

Proposed Pipeline

During development, the team faced a few challenges:

  1. A query limit of 15 rows when scraping the table from data.gov.sg.
  2. The initial mapping of HDB blocks to latitude and longitude took 5-6 hours due to rate limiting, even with parallel processing (though this only needed to be done once).
  3. Pushing the data to the Google Cloud Storage bucket required a very specific method of authentication.
  4. Scheduling the job to run daily.

Derived Workarounds and Code

  1. To bypass the query limit, we modified our code to download the CSV files directly and performed the data wrangling in Python.
  2. Subsequent mapping of coordinates takes only about 20-30 seconds, as we only need to map the newly added blocks.
  3. We used a service account to authenticate with the Google Cloud Storage bucket.
  4. The job was deployed on PythonAnywhere and scheduled to run daily (a sketch of the scheduled entry point follows the code below).
1. Downloading the CSV file
import os
import zipfile

import pandas as pd
import requests
from geopy.distance import geodesic
from google.cloud import storage
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def run_hdb_scrape():
    # download latest csv
    print("Downloading File..")
    download_url = "https://data.gov.sg/dataset/7a339d20-3c57-4b11-a695-9348adfd7614/download"
    response = requests.get(download_url)

    with open('resale-flat-prices.zip', 'wb') as f:
        f.write(response.content)

    # unzip the file
    with zipfile.ZipFile('resale-flat-prices.zip', 'r') as zip_ref:
        zip_ref.extractall('.')

    # delete the zip file
    os.remove('resale-flat-prices.zip')
    print("Zip file removed")

    # delete the old csv files
    filenames_to_remove = [
        'resale-flat-prices-based-on-approval-date-1990-1999.csv',
        'resale-flat-prices-based-on-approval-date-2000-feb-2012.csv',
        'resale-flat-prices-based-on-registration-date-from-jan-2015-to-dec-2016.csv',
        'resale-flat-prices-based-on-registration-date-from-mar-2012-to-dec-2014.csv',
        'metadata-resale-flat-prices.txt',
    ]

    for filename in filenames_to_remove:
        os.remove(filename)

    print("Files removed")

    os.rename('resale-flat-prices-based-on-registration-date-from-jan-2017-onwards.csv', 'hdb_resale_latest.csv')
2. Prepare the dataset for mapping
    # load the csv
    df_hdb_resale_latest = pd.read_csv('hdb_resale_latest.csv', low_memory=False)
    df_hdb_resale_current = pd.read_csv('hdb_resale.csv', low_memory=False)
    df_mrt_lrt = pd.read_csv('mrt_lrt_data.csv', low_memory=False)

    # create columns to match the current csv
    df_hdb_resale_latest['_id'] = df_hdb_resale_latest.index + 1
    df_hdb_resale_latest.sort_values(by=['_id'], ascending=False, inplace=True)
    df_hdb_resale_latest['Address'] = df_hdb_resale_latest['block'] + " " + df_hdb_resale_latest['street_name']

    # append to current df_resale
    df_hdb_resale_combined = pd.concat([df_hdb_resale_latest, df_hdb_resale_current])

    # drop duplicates
    df_hdb_resale_combined.drop_duplicates(subset=['_id'], keep='last', inplace=True)
3. Map the coordinates, postal code and nearest MRT station
    # Set up scraping parameters
    session = requests.Session()
    retries = Retry(total=5, backoff_factor=0.1, status_forcelist=[ 500, 502, 503, 504 ])
    session.mount('http://', HTTPAdapter(max_retries=retries))
    session.mount('https://', HTTPAdapter(max_retries=retries))

    def geocode_one_map(address):
        # reuse the shared session above so the retry configuration is applied
        req = session.get('https://developers.onemap.sg/commonapi/search', params={'searchVal': address, 'returnGeom': 'Y', 'getAddrDetails': 'Y', 'pageNum': 1})
        resultsdict = req.json()
        if resultsdict['found'] != 0:
            lat = resultsdict['results'][0]['LATITUDE']
            lng = resultsdict['results'][0]['LONGITUDE']
            postal_code = resultsdict['results'][0]['POSTAL']
            return lat, lng, postal_code
        return None, None, None

    try:
        # Filter rows where 'Latitude' is null (copy to avoid chained-assignment warnings)
        null_rows = df_hdb_resale_combined[df_hdb_resale_combined['Latitude'].isnull()].copy()

        # Apply geocoding function to the address ('block' and 'street_name') to get latitude, longitude, and postal code
        null_rows[['Latitude', 'Longitude', 'Postal Code']] = null_rows.apply(lambda row: pd.Series(geocode_one_map(f"{row['block']} {row['street_name']}")), axis=1)

        # Tag each row to the nearest MRT/LRT Station
        for home_index, home_row in null_rows.iterrows():
            min_distance = float('inf')
            nearest_station = None

            hdb_coord = (home_row['Latitude'], home_row['Longitude'])

            for train_index, train_row in df_mrt_lrt.iterrows():
                mrt_coord = (train_row['lat'], train_row['lng'])
                distance = geodesic(hdb_coord, mrt_coord).km

                if distance < min_distance:
                    min_distance = round(distance, 2)
                    nearest_station = train_row['station_name']

            null_rows.at[home_index, 'nearest_station'] = nearest_station
            null_rows.at[home_index, 'min_distance_km'] = min_distance

        # Update the original DataFrame with the updated values
        df_hdb_resale_combined.update(null_rows)

    # if there are no null values
    except ValueError:
        pass

    print("Lat, Long, postal code, nearest MRT station & distance mapped.")
4. Clean Up
    # Convert the 'Postal Code' column to string
    df_hdb_resale_combined['Postal Code'] = df_hdb_resale_combined['Postal Code'].astype(str)
    df_hdb_resale_combined['Postal Code'] = df_hdb_resale_combined['Postal Code'].apply(lambda postal_code: postal_code.zfill(6))
    df_hdb_resale_combined.to_csv('hdb_resale.csv', index=False)
    os.remove('hdb_resale_latest.csv')
    print("Working files removed.")
5. Push to Google Storage Bucket
    # upload to google cloud storage
    print("Uploading to cloud now...")
    storage_client = storage.Client.from_service_account_json('client-secret.json')
    bucket = storage_client.bucket('hdb-resale-bucket')
    blob = bucket.blob("hdb_resale.csv")
    blob.upload_from_filename('hdb_resale.csv')
    print("HDB Resale data updated.")

Data Visualisation

With the coordinates, we can now plot the visualisations:

Mapping of Coordinates (white dots represent MRT/LRT stations)
Main PBI Dashboard

From the main landing dashboard alone, it can be inferred that URA and HDB have meticulously planned the distribution of HDB flats, ensuring that each area has its own cluster of the various flat types in a certain ratio.

It's also interesting to point out the "herd" mentality: resale transactions tend to appear in the same clusters, almost like a "domino effect".

Breakdown of Transactions
Patterns & Trends

Another point worth mentioning is that resale prices generally move in parallel: every region follows the same trend. While the base price differs by location, the average and median price gap between regions stays roughly the same throughout 2017 – 2023.

Using the average sales, we know that:

            2017      2018      2019      2020      2021      2022      2023
Central     524,403   533,315   515,306   520,567   588,406   618,428   633,013
East        454,733   460,398   443,078   460,115   513,591   551,265   568,518
North       375,136   365,664   371,013   391,788   452,304   502,011   519,995
NE          456,614   453,761   450,072   464,441   512,013   560,210   582,892
West        409,783   404,422   401,204   426,903   484,261   522,981   548,177

Calculating each region's gap against North for 2023, taking North as the base of 100% (for the convenience of working with positive rather than negative values), we get the following:

              Central   East     NE        West
Gap vs North  +21.73%   +9.33%   +12.09%   +5.42%
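
These gaps are simply each region's 2023 average divided by the North 2023 average. A minimal sketch of the arithmetic, using the figures from the table above:

# Sketch: gap of each region's 2023 average resale price against the North base.
north_2023 = 519_995

averages_2023 = {'Central': 633_013, 'East': 568_518, 'NE': 582_892, 'West': 548_177}

for region, avg in averages_2023.items():
    gap = avg / north_2023 - 1
    print(f"{region}: +{gap:.2%} vs North")
# prints roughly +21.73%, +9.33%, +12.10% and +5.42%, in line with the table above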

A simple formula for the estimate can be written as:

Predicted = Base Value + (Gap * Base Value) + (Growth Rate * (±2.25) * Base Value)

where Base Value is the North 2023 average, Gap is the region's gap vs North, and Growth Rate is the region's latest YoY growth; the ±2.25 multiplier widens the band to give the high and low estimates.

To account for growth, we can calculate the year-over-year (YoY) change in PowerBI as follows:

YoY Sales = 
VAR sales_amt = [avg_sales]
VAR current_year = MAX(Query1[year])
VAR previous_sales = CALCULATE( [avg_sales], Query1[year] = current_year -1 )
VAR YoYGrowth = 
    DIVIDE (
        sales_amt - previous_sales, previous_sales
    )

RETURN 
IF (
    HASONEVALUE( Query1[year] ), 
    IF (
        NOT ISBLANK( sales_amt ) && NOT ISBLANK( previous_sales ), YoYGrowth
    )
)
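
The same YoY figures can be reproduced outside PowerBI for a quick sanity check. A minimal pandas sketch, assuming a hypothetical DataFrame df_avg with region, year and avg_sales columns:

# Sketch: reproduce the YoY measure in pandas.
# df_avg is a hypothetical frame of average resale prices per region and year.
import pandas as pd

df_avg = pd.DataFrame({
    'region': ['North'] * 3,
    'year': [2021, 2022, 2023],
    'avg_sales': [452304, 502011, 519995],
})

df_avg = df_avg.sort_values(['region', 'year'])
df_avg['yoy'] = df_avg.groupby('region')['avg_sales'].pct_change().round(2)
print(df_avg)  # North: +0.11 for 2022, +0.04 for 2023, matching the table below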

Table of YoY Sales:

(%)       2018    2019    2020    2021    2022    2023
Central   +0.02   -0.03   +0.01   +0.13   +0.05   +0.02
East      +0.01   -0.04   +0.04   +0.12   +0.07   +0.03
North     -0.03   +0.01   +0.06   +0.15   +0.11   +0.04
NE        -0.01   -0.01   +0.03   +0.10   +0.09   +0.04
West      -0.01   -0.01   +0.06   +0.13   +0.08   +0.05

We can now provide an estimate of the average sales for 2024:

          2023      2024 (High)   2024 (Low)
Central   633,013   656,389       609,590
East      568,518   603,610       533,410
North     519,995   566,794       473,195
NE        582,892   629,661       536,062
West      548,177   606,678       489,679
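
As a quick check, these estimates can be reproduced with the formula above in a few lines of Python. This is only a sketch of the arithmetic, assuming the North 2023 average as the base value and taking the gap and growth figures from the tables above:

# Sketch: Predicted = Base + (Gap * Base) + (Growth Rate * (+/-2.25) * Base),
# using the North 2023 average as the base value. Figures come from the tables above.
BASE = 519_995  # North, 2023

regions = {
    # region: (gap vs North, 2023 YoY growth rate)
    'Central': (0.2173, 0.02),
    'East':    (0.0933, 0.03),
    'North':   (0.0000, 0.04),
    'NE':      (0.1209, 0.04),
    'West':    (0.0542, 0.05),
}

for region, (gap, growth) in regions.items():
    level_2023 = BASE + gap * BASE
    band = growth * 2.25 * BASE
    print(f"{region}: 2024 high ~ {level_2023 + band:,.0f}, low ~ {level_2023 - band:,.0f}")
# Central comes out at ~656,390 / ~609,590, matching the table to within rounding.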

Learning Points

Through this project, the team was able to observe and apply the concept of “open innovation” in practice, using real-world data. Chesbrough (2004) noted that open innovation is essential for solving complex problems, as it brings in suggestions, data, ideas, or people from entirely outside the organization. In this project, the team used OneMap’s open data to map coordinates and postal codes to HDB units.

Without such open resources, it would be impossible to always have the latest data with mapped coordinates. In an organizational context, this could translate into financial losses in the form of man-hours spent manually mapping the data.

From a personal perspective, despite the unforeseen circumstances, I am glad that I managed to complete this project. I learnt a lot from it, especially about how important it is to ensure that data is properly ingested and ready for downstream analysis. I also applied what I learnt at Digipen about data engineering to a real-world project.