Analysing NYC Taxi Data with PostgreSQL, TimescaleDB and Python

Introduction

In this blog, I will demonstrate how to ingest and analyze NYC Taxi trip data using PostgreSQL and TimescaleDB. We’ll explore some tips and tricks along the way and highlight the differences and potential advantages of using TimescaleDB over pure PostgreSQL in this scenario.

I’ll show that TimescaleDB can achieve 2x faster performance and a 3x lower storage requirement in a simple test case, all without any code modifications. Additionally, we’ll explore how to leverage the advanced features of PostgreSQL in conjunction with TimescaleDB to enhance system capabilities.

Background

PostgreSQL is the most fully featured and extensible open-source database available today. It has evolved beyond a traditional RDBMS to become a multi-modal engine, offering mature support for documents, geospatial data and, most recently, vectors to support AI applications via embedding search with pgvector.

The variety of extensions is extensive and continually growing, thanks to the PostgreSQL core team’s long-standing efforts to enable extensibility across datatypes, table access methods (including indexes), and server-side integration.

In my view, PostgreSQL should be the default choice for new data applications until proven otherwise. There’s a significant trend among startups and scaleups, such as Supabase, Tembo, Neon, and Electric, centering around PostgreSQL.

This brings us to the core theme of this blog: Timescale. TimescaleDB, developed by Timescale, is an open-source PostgreSQL extension that adds advanced time-series data capabilities to PostgreSQL with a simple CREATE EXTENSION timescaledb command.

TimescaleDB introduces a suite of new capabilities for working with time-series data, including:

  • Transparent data compression
  • Rich time-series analysis functions in SQL
  • Continuous aggregates for efficient time-series aggregate views
  • Data retention and disposal utilities
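
To give a flavour of the time-series functions and continuous aggregates, below is a minimal sketch of an hourly rollup kept fresh by a background policy. The view name trips_hourly is my own invention, and the hypertable is the one we create later in this post.

```sql
-- Sketch: an hourly continuous aggregate over the trip data, bucketed with time_bucket()
create materialized view trips_hourly
with (timescaledb.continuous) as
select time_bucket(interval '1 hour', tpep_pickup_datetime) as bucket,
       count(*)         as trips,
       avg(fare_amount) as avg_fare
from yellow_tripdata
group by bucket;

-- Refresh the aggregate in the background as new data arrives
select add_continuous_aggregate_policy('trips_hourly',
    start_offset      => interval '3 days',
    end_offset        => interval '1 hour',
    schedule_interval => interval '1 hour');
```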

I will guide you step-by-step through the setup and ingestion process, showcasing some of the key advantages of TimescaleDB and how additional value can be added by integrating other PostgreSQL extensions and capabilities.

Setup

For this experiment, I’m using a Docker Compose stack that includes PostgreSQL, TimescaleDB, and a single psql container for connecting to each database.

Here is a simplified version of the docker-compose.yml file.

```yaml
---
services:
  pg:
    shm_size: 2gb
    build:
      context: postgres
    volumes:
      - ./data/:/data
      - ./postgres/pg_hba.conf:/var/lib/postgresql/pg_hba.conf
      - postgres-sockets:/var/run/postgresql
  ts:
    shm_size: 2gb
    build:
      context: timescale
    volumes:
      - ./data/:/data
      - ./timescale/pg_hba.conf:/var/lib/postgresql/pg_hba.conf
      - timescale-sockets:/var/run/postgresql
      - postgres-sockets:/pg_sockets
  psql:
    build:
      context: psql
    environment:
      PGUSER: postgres
      PGDATABASE: postgres
      PGHOST: /pg_sockets
    volumes:
      - ./data/:/data
      - ./scripts/:/scripts
      - postgres-sockets:/pg_sockets
      - timescale-sockets:/timescale_sockets
    command: "echo psql ready for manual load"
volumes:
  postgres-sockets:
  timescale-sockets:
```

The compose file above also includes a trick: each database is accessed over a Unix socket mounted on a shared Docker volume. This gives slightly better performance and convenient passwordless access within the Docker stack, and it means no database passwords are stored in docker-compose.yml or in environment variables. The custom pg_hba.conf files are mounted in place to enable the Unix socket access.
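
The pg_hba.conf files themselves aren't shown here, but the relevant entries would look something like the sketch below: trust authentication for local socket connections, with password authentication kept for anything arriving over TCP. Treat this as an assumption rather than the exact files used.

```
# TYPE   DATABASE   USER       ADDRESS        METHOD
# Passwordless access over the shared Unix socket
local    all        postgres                  trust
# Any TCP connection still requires a password
host     all        all        0.0.0.0/0      scram-sha-256
```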

Yellow Taxi Trip Data

I’ll be downloading and ingesting the yellow taxi trip data for 2023. These are distributed as parquet files.

Schema

Having looked at the parquet headers, I’ll create the PostgreSQL schema as below:

```sql
create table yellow_tripdata (
    vendorid              int,
    tpep_pickup_datetime  timestamptz not null,
    tpep_dropoff_datetime timestamptz,
    passenger_count       int,
    trip_distance         double precision,
    ratecodeid            double precision,
    store_and_fwd_flag    char(1),
    pulocationid          int,
    dolocationid          int,
    payment_type          int,
    fare_amount           double precision,
    extra                 double precision,
    mta_tax               double precision,
    tip_amount            double precision,
    tolls_amount          double precision,
    improvement_surcharge double precision,
    total_amount          double precision,
    congestion_surcharge  double precision,
    airport_fee           double precision
);
```

For TimescaleDB, there is an extra step to create a hypertable:

```sql
select create_hypertable('yellow_tripdata', 'tpep_pickup_datetime');
```

This creates a hypertable partitioned on the tpep_pickup_datetime column. TimescaleDB automatically creates a chunk (partition) for each time interval, which defaults to 7 days.
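
If 7-day chunks don't suit the workload, the interval can be chosen at creation time or changed later. A minimal sketch (the monthly interval is purely illustrative):

```sql
-- Set the chunk interval when creating the hypertable (instead of the call above)
select create_hypertable(
    'yellow_tripdata',
    'tpep_pickup_datetime',
    chunk_time_interval => interval '1 month'
);

-- Or change it later; only chunks created after this call use the new interval
select set_chunk_time_interval('yellow_tripdata', interval '1 month');
```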

Data Loading

I’ll load these into PostgreSQL using psql’s \copy, streaming the data from a program over stdin. In general, copy or \copy is an efficient way to bulk load data into PostgreSQL tables. There are slightly faster but more complex methods (e.g. pgloader) if you need maximum loading speed. pgloader is not only a bit faster than copy but also contains a number of features for data transformation and error handling. PostgreSQL 17 has caught up a little in the error handling stakes with the long-awaited ON_ERROR option, which means \copy can finally handle bad rows in the input data.
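
For example, on PostgreSQL 17 the load used below could be made tolerant of rows that fail data type conversion with something along these lines (a sketch; the load script in this post doesn't use it):

```sql
-- PostgreSQL 17+: skip rows whose values fail type conversion instead of aborting the COPY
\copy yellow_tripdata from pstdin with (format csv, header true, on_error ignore)
```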

I created a python script using pyarrow to crack open the parquet files and stream them over stdout to psql like so:

```bash
#!/bin/bash
for f in $(find /data -type f -name 'yellow_trip*.parquet' | sort); do
    echo "loading $f"
    time /venv/bin/python /scripts/pq-csv.py $f | \
        psql -c "\copy yellow_tripdata from pstdin with (format csv, header true)"
done
time psql -c "analyze;"
```

I initially used the python parquet-tools package to convert the parquet files to csv but found it was very slow. Using pyarrow is significantly faster.

Here is the key snippet of the python script:

```python
import sys

import pyarrow.parquet as pq
import pyarrow.csv as pc


def parquet_to_csv_stream(parquet_file, csv_file):
    # Read the whole parquet file into an Arrow table
    table = pq.read_table(parquet_file)
    # Write CSV to the given file, or to stdout when '-' is passed
    with open(csv_file, 'wb') if csv_file != '-' else sys.stdout.buffer as f:
        pc.write_csv(table, f)
```

Notice that the files are loaded in ascending month order, as it’s typically better to load time-series data in time order when applying any indexing or compression, as we will do later.

Initial Performance Test

Now we have 38 million rows of data loaded into PostgreSQL and TimescaleDB. For an initial performance test, we’ll query the data for the total number of trips per hour in 2023 like so:

```sql
select date_trunc('hour', tpep_pickup_datetime) as hour,
       count(*)
from yellow_tripdata
where tpep_pickup_datetime >= '2023-01-01 00:00:00'
  and tpep_pickup_datetime <= '2023-12-31 23:00:00'
group by 1
order by 1;
```

Visualisation

Let’s have a quick look at this data as a heatmap. Each week is a row and each hour of the week is a column. The colour of each cell represents the number of trips in that hour of that week.

Trips per hour heatmap
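
The heatmap query itself isn't shown in this post, but the underlying aggregation might look something like the sketch below. The ISO week numbering and the Monday-first hour-of-week layout are assumptions on my part.

```sql
-- Sketch: trips per (week, hour-of-week) cell, with Monday 00:00 as hour 0
select extract(week from tpep_pickup_datetime)::int as week_of_year,
       (extract(isodow from tpep_pickup_datetime)::int - 1) * 24
         + extract(hour from tpep_pickup_datetime)::int as hour_of_week,
       count(*) as trips
from yellow_tripdata
where tpep_pickup_datetime >= '2023-01-01'
  and tpep_pickup_datetime < '2024-01-01'
group by 1, 2
order by 1, 2;
```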

Aside from a few missing data points (particularly in week 38), we can spot a few key trends by eye.

  1. Sundays (hours 144-168) are generally quieter
  2. Thursday evenings (hours 96-120) are generally the busiest
  3. A slowdown in week 47 for Thanksgiving is evident
  4. The summer lull from weeks 27 to 36 is visible
  5. The Christmas week slowdown and new year spike are visible

For the remainder of this blog we’ll focus more on the database side as opposed to the data analysis.

PostgreSQL

In PostgreSQL this query takes ~3.5 seconds to run on my laptop over 5 runs, as shown below.

```bash
Timing PostgreSQL query
Time: 3539.862 ms (00:03.540)
Time: 3896.645 ms (00:03.897)
Time: 3298.131 ms (00:03.298)
Time: 3441.904 ms (00:03.442)
Time: 3361.824 ms (00:03.362)
```

The abbreviated query plan is shown below (the explain plan times are different from the raw times due to timing overheads):

```
Sort  (cost=2778006.08..2809465.97 rows=12583953 width=16) (actual time=5875.667..5876.352 rows=8758 loops=1)
  Output: (date_trunc('hour'::text, tpep_pickup_datetime)), (count(*))
  Sort Key: (date_trunc('hour'::text, yellow_tripdata.tpep_pickup_datetime))
  Sort Method: quicksort  Memory: 727kB
  Buffers: shared hit=3 read=777740
  ->  HashAggregate  (cost=1136738.86..1294038.28 rows=12583953 width=16) (actual time=5840.385..5873.989 rows=8758 loops=1)
        Output: (date_trunc('hour'::text, tpep_pickup_datetime)), count(*)
        Group Key: (date_trunc('hour'::text, yellow_tripdata.tpep_pickup_datetime))
        Batches: 1  Memory Usage: 394257kB
        Buffers: shared read=777740
        ->  Gather  (cost=0.00..945288.12 rows=38290148 width=8) (actual time=86.859..1989.899 rows=38306367 loops=1)
              Output: (date_trunc('hour'::text, tpep_pickup_datetime))
              Workers Planned: 4
              Workers Launched: 4
              Buffers: shared read=777740
              ->  Parallel Seq Scan on public.yellow_tripdata  (cost=0.00..945288.12 rows=9572537 width=8) (actual time=65.954..2134.525 rows=7661273 loops=5)
                    Output: date_trunc('hour'::text, tpep_pickup_datetime)
                    Filter: ((yellow_tripdata.tpep_pickup_datetime >= '2023-01-01 00:00:00+00'::timestamp with time zone) AND (yellow_tripdata.tpep_pickup_datetime <= '2023-12-31 23:00:00+00'::timestamp with time zone))
                    Rows Removed by Filter: 772
                    Buffers: shared read=777740
                    Worker 0:  actual time=59.804..2658.429 rows=9185268 loops=1
                      JIT:
                        Functions: 4
                        Options: Inlining true, Optimization true, Expressions true, Deforming true
                        Timing: Generation 0.301 ms, Inlining 46.205 ms, Optimization 7.659 ms, Emission 5.882 ms, Total 60.047 ms
                      Buffers: shared read=186433
                    Worker 1:  actual time=61.590..2631.029 rows=9407126 loops=1
........
```

4 parallel workers are launched to scan the table in an average of 2.1 seconds each and the rest of the time is spent in the aggregate and sorting. The majority of the time is spent in the Parallel Seq Scan, which is expected as we are reading almost every row in the table (except for a few bad rows with dates outside the range).

TimescaleDB

So now we’ll try exactly the same query in a TimescaleDB hypertable. Here are the timings from 5 runs:

```bash
Time: 3913.671 ms (00:03.914)
Time: 3696.624 ms (00:03.697)
Time: 3577.944 ms (00:03.578)
Time: 3870.301 ms (00:03.870)
Time: 3719.977 ms (00:03.720)
```

It’s actually a little slower! This is because we forgot an important step that is fundamental to the key value proposition of TimescaleDB. TimescaleDB hypertables support transparent compression, and time-series data is particularly suitable for compression because it is generally immutable after it has been loaded. This means older chunks can be effectively “frozen” and compressed into much smaller chunks. Given that an aggregate query like this reads a lot of rows, the savings on IO bandwidth can be significant, not only reducing the storage cost but also increasing the performance of these queries, even with the CPU overhead of decompression!

However, we need to either create a compression policy or explicitly compress the hypertable chunks. Thankfully, this is very straightforward and is implemented in pure SQL, like so:

```sql
select compress_chunk((chunk_schema || '.' || chunk_name)::regclass)
from timescaledb_information.chunks
where is_compressed is false
  and hypertable_name = 'yellow_tripdata';
```
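
In an ongoing system you would typically enable compression on the hypertable and let a background policy compress chunks once they reach a certain age. A minimal sketch; the orderby setting and 30-day threshold are illustrative choices, not taken from this setup:

```sql
-- Enable compression on the hypertable, ordering compressed batches by pickup time
alter table yellow_tripdata set (
    timescaledb.compress,
    timescaledb.compress_orderby = 'tpep_pickup_datetime'
);

-- Compress chunks automatically once they are older than 30 days
select add_compression_policy('yellow_tripdata', interval '30 days');
```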

Having compressed the chunks, we will now retry the same query:

```bash
Time: 1781.096 ms (00:01.781)
Time: 1876.659 ms (00:01.877)
Time: 1951.794 ms (00:01.952)
Time: 2034.039 ms (00:02.034)
Time: 2095.667 ms (00:02.096)
```

Much better! Now at around 1.9 seconds, almost 2x faster than plain PostgreSQL! If we look at a fragment of the new query plan we can see the Timescale magic that enables this.

```
Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_2_90_chunk  (cost=0.18..69.69 rows=379000 width=8) (actual time=64.068..108.231 rows=644782 loops=1)
  Output: _hyper_2_90_chunk.tpep_pickup_datetime
  Vectorized Filter: ((_hyper_2_90_chunk.tpep_pickup_datetime >= '2023-01-01 00:00:00+00'::timestamp with time zone) AND (_hyper_2_90_chunk.tpep_pickup_datetime <= '2023-12-31 23:00:00+00'::timestamp with time zone))
  Bulk Decompression: true
  Buffers: shared hit=1273 read=1376
  Worker 2:  actual time=64.068..108.231 rows=644782 loops=1
    Buffers: shared hit=1273 read=1376
```

TimescaleDB implements a custom scan node to decompress the chunks on the fly, saving a lot of IO bandwidth and hence time. This is a great example of how the extensibility of PostgreSQL can be leveraged, in this case via the planner's custom scan provider API.

Storage

Given the often eye-watering cost of cloud storage, reducing the amount of storage required by your database can be a significant cost saving. Below is a comparison of the storage used by PostgreSQL and TimescaleDB for this year of Yellow Taxi data.

```
PostgreSQL Table Size

 pg_size_pretty
----------------
 6078 MB
(1 row)

Compressed TimescaleDB Table Size

 pg_size_pretty
----------------
 1866 MB
(1 row)
```

The TimescaleDB storage requirements are 3.25x smaller! Imagine being able to reduce your storage cost by ~2/3 and get a 2x performance improvement at the same time! TimescaleDB has many cases where the performance improvements can be even higher (e.g. continuous aggregates as opposed to refreshed materialised views), but the fact that simple table scans are so much faster is incredibly powerful.
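
For reference, figures like those above can be obtained with queries along the following lines; hypertable_size() sums every chunk of a hypertable, compressed or not.

```sql
-- Plain PostgreSQL table, including indexes and TOAST
select pg_size_pretty(pg_total_relation_size('yellow_tripdata'));

-- TimescaleDB hypertable, summed across all of its chunks
select pg_size_pretty(hypertable_size('yellow_tripdata'));
```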

Archiving and Cold Storage

When working with time series data, it is often the case that the most recent data is the most frequently accessed and older data is needed less often. TimescaleDB makes this easy with the concept of data retention policies. We can move older chunks to a different PostgreSQL tablespace, or onto a different storage location entirely such as AWS S3 or cheaper, slower storage. Some of these features are native to Timescale Cloud, but we can also develop similar features in self-hosted TimescaleDB, given that the extension itself is open source.
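
As a minimal sketch of the native retention machinery (the one-year threshold is illustrative):

```sql
-- Drop chunks automatically once they are older than a year
select add_retention_policy('yellow_tripdata', interval '1 year');

-- TimescaleDB also provides move_chunk() for relocating individual chunks
-- to another tablespace, e.g. one backed by cheaper storage.
```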

Below I will show one method to manage older chunks by archiving them to parquet files, while also keeping them available for query if necessary via a PostgreSQL Foreign Data Wrapper (FDW). This is a powerful feature of PostgreSQL that enables you to query data from a remote source as if it were a local table. It also illustrates how the unique features of TimescaleDB can be combined with the extensibility of PostgreSQL to create a powerful data management solution.

Archiving to Parquet

For this example I have loaded 3 months of Yellow Taxi data from 2023, which gives us 20 7-day chunks with the default chunk policy (there are some bad rows outside the expected range in the source data).

```
postgres=# select * from show_chunks('yellow_tripdata');
┌─────────────────────────────────────────┐
│               show_chunks               │
├─────────────────────────────────────────┤
│ _timescaledb_internal._hyper_1_1_chunk  │
│ _timescaledb_internal._hyper_1_2_chunk  │
│ _timescaledb_internal._hyper_1_3_chunk  │
│ _timescaledb_internal._hyper_1_4_chunk  │
│ _timescaledb_internal._hyper_1_5_chunk  │
│ _timescaledb_internal._hyper_1_6_chunk  │
│ _timescaledb_internal._hyper_1_7_chunk  │
│ _timescaledb_internal._hyper_1_8_chunk  │
│ _timescaledb_internal._hyper_1_9_chunk  │
│ _timescaledb_internal._hyper_1_10_chunk │
│ _timescaledb_internal._hyper_1_11_chunk │
│ _timescaledb_internal._hyper_1_12_chunk │
│ _timescaledb_internal._hyper_1_13_chunk │
│ _timescaledb_internal._hyper_1_14_chunk │
│ _timescaledb_internal._hyper_1_15_chunk │
│ _timescaledb_internal._hyper_1_16_chunk │
│ _timescaledb_internal._hyper_1_17_chunk │
│ _timescaledb_internal._hyper_1_18_chunk │
│ _timescaledb_internal._hyper_1_19_chunk │
│ _timescaledb_internal._hyper_1_20_chunk │
└─────────────────────────────────────────┘
(20 rows)

Time: 1.384 ms
```

We can also see the start and end ranges of the chunks using the timescaledb_information schema (the bad data is now more obvious):

```
postgres=# select chunk_schema, chunk_name, range_start, range_end from timescaledb_information.chunks where hypertable_name = 'yellow_tripdata' order by range_start;
┌───────────────────────┬───────────────────┬────────────────────────┬────────────────────────┐
│     chunk_schema      │    chunk_name     │      range_start       │       range_end        │
├───────────────────────┼───────────────────┼────────────────────────┼────────────────────────┤
│ _timescaledb_internal │ _hyper_1_16_chunk │ 2000-12-28 00:00:00+00 │ 2001-01-04 00:00:00+00 │
│ _timescaledb_internal │ _hyper_1_15_chunk │ 2002-12-26 00:00:00+00 │ 2003-01-02 00:00:00+00 │
│ _timescaledb_internal │ _hyper_1_3_chunk  │ 2008-12-25 00:00:00+00 │ 2009-01-01 00:00:00+00 │
│ _timescaledb_internal │ _hyper_1_10_chunk │ 2009-01-01 00:00:00+00 │ 2009-01-08 00:00:00+00 │
│ _timescaledb_internal │ _hyper_1_19_chunk │ 2014-11-13 00:00:00+00 │ 2014-11-20 00:00:00+00 │
│ _timescaledb_internal │ _hyper_1_2_chunk  │ 2022-10-20 00:00:00+00 │ 2022-10-27 00:00:00+00 │
│ _timescaledb_internal │ _hyper_1_1_chunk  │ 2022-12-29 00:00:00+00 │ 2023-01-05 00:00:00+00 │
│ _timescaledb_internal │ _hyper_1_4_chunk  │ 2023-01-05 00:00:00+00 │ 2023-01-12 00:00:00+00 │
│ _timescaledb_internal │ _hyper_1_5_chunk  │ 2023-01-12 00:00:00+00 │ 2023-01-19 00:00:00+00 │
│ _timescaledb_internal │ _hyper_1_6_chunk  │ 2023-01-19 00:00:00+00 │ 2023-01-26 00:00:00+00 │
│ _timescaledb_internal │ _hyper_1_7_chunk  │ 2023-01-26 00:00:00+00 │ 2023-02-02 00:00:00+00 │
│ _timescaledb_internal │ _hyper_1_8_chunk  │ 2023-02-02 00:00:00+00 │ 2023-02-09 00:00:00+00 │
│ _timescaledb_internal │ _hyper_1_9_chunk  │ 2023-02-09 00:00:00+00 │ 2023-02-16 00:00:00+00 │
│ _timescaledb_internal │ _hyper_1_11_chunk │ 2023-02-16 00:00:00+00 │ 2023-02-23 00:00:00+00 │
│ _timescaledb_internal │ _hyper_1_12_chunk │ 2023-02-23 00:00:00+00 │ 2023-03-02 00:00:00+00 │
│ _timescaledb_internal │ _hyper_1_13_chunk │ 2023-03-02 00:00:00+00 │ 2023-03-09 00:00:00+00 │
│ _timescaledb_internal │ _hyper_1_14_chunk │ 2023-03-09 00:00:00+00 │ 2023-03-16 00:00:00+00 │
│ _timescaledb_internal │ _hyper_1_17_chunk │ 2023-03-16 00:00:00+00 │ 2023-03-23 00:00:00+00 │
│ _timescaledb_internal │ _hyper_1_18_chunk │ 2023-03-23 00:00:00+00 │ 2023-03-30 00:00:00+00 │
│ _timescaledb_internal │ _hyper_1_20_chunk │ 2023-03-30 00:00:00+00 │ 2023-04-06 00:00:00+00 │
└───────────────────────┴───────────────────┴────────────────────────┴────────────────────────┘
(20 rows)

Time: 5.083 ms
```

Let’s imagine we want to archive chunks older than 2023-01-01. We can achieve this with the following SQL snippet from a wrapper function I wrote called archive_old_chunks_to_parquet():

```sql
for chunk_schema, chunk_name, range_start, range_end in (
    select chunk.chunk_schema,
           chunk.chunk_name,
           to_char(chunk.range_start, 'YYYY-MM-DD_HH24-MI-SS') as range_start,
           to_char(chunk.range_end, 'YYYY-MM-DD_HH24-MI-SS') as range_end
    from show_chunks(_hypertable_name, older_than => _archive_older_than) c
    inner join timescaledb_information.chunks chunk
        on c = (chunk.chunk_schema || '.' || chunk.chunk_name)::regclass
) loop
    raise notice 'Archiving chunk %.% from % to %', chunk_schema, chunk_name, range_start, range_end;
    execute format($c$
        copy (select * from %2$I.%3$I)
        to program '/venv/bin/python /tmp/csv_to_parquet.py %1$s %6$s-%4$s-%5$s.parquet'
        with (format csv, header true);
    $c$, _archive_dir, chunk_schema, chunk_name, range_start, range_end, _hypertable_name);
end loop;

for deleted_chunk in (select d::text from drop_chunks(_hypertable_name, older_than => _archive_older_than) d) loop
    raise notice 'Dropped chunk %', deleted_chunk;
end loop;
```

Here we leverage the older_than parameter of the show_chunks and drop_chunks functions to select and drop the older chunks. We then use the copy to program feature of PostgreSQL to stream the chunk data to a python script that writes the data to a parquet file with the time ranges included in the filename.

Here is what this looks like if we archive the chunks older than 2023-01-01 using the utility function:

```
postgres=# select archive_old_chunks_to_parquet('yellow_tripdata','2023-01-01','/tmp/data/');
NOTICE: Archiving chunk _timescaledb_internal._hyper_1_2_chunk from 2022-10-20_00-00-00 to 2022-10-27_00-00-00
NOTICE: Archiving chunk _timescaledb_internal._hyper_1_3_chunk from 2008-12-25_00-00-00 to 2009-01-01_00-00-00
NOTICE: Archiving chunk _timescaledb_internal._hyper_1_10_chunk from 2009-01-01_00-00-00 to 2009-01-08_00-00-00
NOTICE: Archiving chunk _timescaledb_internal._hyper_1_15_chunk from 2002-12-26_00-00-00 to 2003-01-02_00-00-00
NOTICE: Archiving chunk _timescaledb_internal._hyper_1_16_chunk from 2000-12-28_00-00-00 to 2001-01-04_00-00-00
NOTICE: Archiving chunk _timescaledb_internal._hyper_1_19_chunk from 2014-11-13_00-00-00 to 2014-11-20_00-00-00
NOTICE: Dropped chunk _timescaledb_internal._hyper_1_2_chunk
NOTICE: Dropped chunk _timescaledb_internal._hyper_1_3_chunk
NOTICE: Dropped chunk _timescaledb_internal._hyper_1_10_chunk
NOTICE: Dropped chunk _timescaledb_internal._hyper_1_15_chunk
NOTICE: Dropped chunk _timescaledb_internal._hyper_1_16_chunk
NOTICE: Dropped chunk _timescaledb_internal._hyper_1_19_chunk
┌───────────────────────────────┐
│ archive_old_chunks_to_parquet │
├───────────────────────────────┤
│                               │
└───────────────────────────────┘
(1 row)

Time: 1982.026 ms (00:01.982)

postgres=# select count(*) from show_chunks('yellow_tripdata');
┌───────┐
│ count │
├───────┤
│    14 │
└───────┘
(1 row)

Time: 0.790 ms
```

And now we have 14 chunks remaining in our hypertable and 6 chunks archived to parquet files. Thankfully, the drop_chunks function is transactional, as with all PostgreSQL DDL, so if there is an error in the archiving process, the chunks will not be dropped and the data will remain in the hypertable.
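
For example, the archive call can be wrapped in an explicit transaction and inspected before the chunk drops are made permanent (a sketch using the wrapper function above):

```sql
begin;

select archive_old_chunks_to_parquet('yellow_tripdata', '2023-01-01', '/tmp/data/');

-- Sanity check how many chunks remain before committing
select count(*) from show_chunks('yellow_tripdata');

commit;  -- or: rollback; to restore the dropped chunks
         -- (parquet files already written by COPY TO PROGRAM are not removed on rollback)
```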

Querying Archived Data

So what if we want to be able to query the archived data? We can use the parquet_s3_fdw extension to achieve this. The extension can read from the local filesystem or S3; in this instance we will use the local filesystem.

First we install the extension and create a server plus a schema for the foreign tables, where each parquet file becomes a table:

```sql
create extension parquet_s3_fdw;

create server parquet_s3_srv foreign data wrapper parquet_s3_fdw;

create schema parquet_files;

IMPORT FOREIGN SCHEMA "/tmp/data/" FROM SERVER parquet_s3_srv INTO parquet_files;
```

This populates the parquet_files schema with a foreign table for each parquet file in the directory /tmp/data/, where we just archived the 6 chunks.

And now we can see each foreign table in the schema:

```
postgres=# select table_name from information_schema.tables where table_schema = 'parquet_files';
┌──────────────────────────────────────────────────────────┐
│                        table_name                        │
├──────────────────────────────────────────────────────────┤
│ yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00  │
│ yellow_tripdata-2022-10-20_00-00-00-2022-10-27_00-00-00  │
│ yellow_tripdata-2000-12-28_00-00-00-2001-01-04_00-00-00  │
│ yellow_tripdata-2014-11-13_00-00-00-2014-11-20_00-00-00  │
│ yellow_tripdata-2009-01-01_00-00-00-2009-01-08_00-00-00  │
│ yellow_tripdata-2008-12-25_00-00-00-2009-01-01_00-00-00  │
└──────────────────────────────────────────────────────────┘
(6 rows)
```

We can now query these parquet files as if they were normal tables, but there is one issue: data type discovery is not automatic here. As we can see below, the default data types are all text, which is not ideal and means we can’t simply “union” these with the main hypertable.

```
postgres=# select * from parquet_files."yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00" limit 1;
┌─[ RECORD 1 ]──────────┬────────────────────────┐
│ vendorid              │ 2                      │
│ tpep_pickup_datetime  │ 2003-01-01 00:48:43+00 │
│ tpep_dropoff_datetime │ 2003-01-01 01:01:48+00 │
│ passenger_count       │ 3                      │
│ trip_distance         │ 3.29                   │
│ ratecodeid            │ 1                      │
│ store_and_fwd_flag    │ N                      │
│ pulocationid          │ 231                    │
│ dolocationid          │ 40                     │
│ payment_type          │ 2                      │
│ fare_amount           │ 17.7                   │
│ extra                 │ 0                      │
│ mta_tax               │ 0.5                    │
│ tip_amount            │ 0                      │
│ tolls_amount          │ 0                      │
│ improvement_surcharge │ 1                      │
│ total_amount          │ 21.7                   │
│ congestion_surcharge  │ 2.5                    │
│ airport_fee           │ 0                      │
└───────────────────────┴────────────────────────┘

Time: 3.483 ms

postgres=# \gdesc
┌───────────────────────┬──────┐
│        Column         │ Type │
├───────────────────────┼──────┤
│ vendorid              │ text │
│ tpep_pickup_datetime  │ text │
│ tpep_dropoff_datetime │ text │
│ passenger_count       │ text │
│ trip_distance         │ text │
│ ratecodeid            │ text │
│ store_and_fwd_flag    │ text │
│ pulocationid          │ text │
│ dolocationid          │ text │
│ payment_type          │ text │
│ fare_amount           │ text │
│ extra                 │ text │
│ mta_tax               │ text │
│ tip_amount            │ text │
│ tolls_amount          │ text │
│ improvement_surcharge │ text │
│ total_amount          │ text │
│ congestion_surcharge  │ text │
│ airport_fee           │ text │
└───────────────────────┴──────┘
(19 rows)
```

But we can fix this by applying casts from the main hypertable to the foreign tables and building a “casted” view of each parquet file like so (again from a wrapper function called create_casted_view_of_parquet()):

```sql
select ('create or replace view ' || view_name || ' as select '
        || string_agg(
               format('%1$I::%2$s', column_name, data_type),
               E',\n' order by ordinal_position
           )
        || E'\nfrom ' || foreign_schema || '.' || quote_ident(foreign_table_name)
       )
into view_definition
from information_schema.columns
where table_name = internal_table_name;
```

Once the views are created we can then union each view with the main hypertable to create a single view of the data, using this SQL snippet from the wrapper function build_union_all_view_of_parquet():

```sql
select 'create or replace view ' || view_name || ' as select * from ' || internal_table_name
       || string_agg(
              create_casted_view_of_parquet(
                  internal_table_name,
                  foreign_schema,
                  t.table_name
              ),
              E'\nunion all select * from '
          )
into view_definition
from information_schema.tables t
where table_schema = foreign_schema
  and t.table_name ~* internal_table_name;
```

So when we put everything together, we can very simply:

  1. Archive older chunks to parquet files
  2. Create a schema of foreign tables from the parquet files
  3. Create a view of each parquet file with the correct data types
  4. Create a view that unions all the parquet views with the main hypertable

And here is how that looks when executed:

```
postgres=# select archive_old_chunks_to_parquet('yellow_tripdata','2023-01-01','/tmp/data/');
NOTICE: Archiving chunk _timescaledb_internal._hyper_1_2_chunk from 2022-10-20_00-00-00 to 2022-10-27_00-00-00
NOTICE: Archiving chunk _timescaledb_internal._hyper_1_3_chunk from 2008-12-25_00-00-00 to 2009-01-01_00-00-00
NOTICE: Archiving chunk _timescaledb_internal._hyper_1_10_chunk from 2009-01-01_00-00-00 to 2009-01-08_00-00-00
NOTICE: Archiving chunk _timescaledb_internal._hyper_1_15_chunk from 2002-12-26_00-00-00 to 2003-01-02_00-00-00
NOTICE: Archiving chunk _timescaledb_internal._hyper_1_16_chunk from 2000-12-28_00-00-00 to 2001-01-04_00-00-00
NOTICE: Archiving chunk _timescaledb_internal._hyper_1_19_chunk from 2014-11-13_00-00-00 to 2014-11-20_00-00-00
NOTICE: Dropped chunk _timescaledb_internal._hyper_1_2_chunk
NOTICE: Dropped chunk _timescaledb_internal._hyper_1_3_chunk
NOTICE: Dropped chunk _timescaledb_internal._hyper_1_10_chunk
NOTICE: Dropped chunk _timescaledb_internal._hyper_1_15_chunk
NOTICE: Dropped chunk _timescaledb_internal._hyper_1_16_chunk
NOTICE: Dropped chunk _timescaledb_internal._hyper_1_19_chunk
┌───────────────────────────────┐
│ archive_old_chunks_to_parquet │
├───────────────────────────────┤
│                               │
└───────────────────────────────┘
(1 row)

Time: 2010.483 ms (00:02.010)

postgres=# select build_union_all_view_of_parquet('yellow_tripdata', 'parquet_files', '/tmp/data/');
NOTICE: view "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00_casted" created successfully
NOTICE: view "yellow_tripdata-2022-10-20_00-00-00-2022-10-27_00-00-00_casted" created successfully
NOTICE: view "yellow_tripdata-2000-12-28_00-00-00-2001-01-04_00-00-00_casted" created successfully
NOTICE: view "yellow_tripdata-2014-11-13_00-00-00-2014-11-20_00-00-00_casted" created successfully
NOTICE: view "yellow_tripdata-2009-01-01_00-00-00-2009-01-08_00-00-00_casted" created successfully
NOTICE: view "yellow_tripdata-2008-12-25_00-00-00-2009-01-01_00-00-00_casted" created successfully
NOTICE: view yellow_tripdata_union_all created successfully
┌─────────────────────────────────┐
│ build_union_all_view_of_parquet │
├─────────────────────────────────┤
│                                 │
└─────────────────────────────────┘
(1 row)

Time: 41.749 ms
```

And now we can query the data from the union view of the main hypertable and the parquet files. If we look at the query plan we can see that the parquet files are being scanned using the parquet_s3_fdw extension. I have also added a fare_amount > 10 filter to the query to show that the cast is applied to the foreign table in the chunk view.

```
postgres=# explain select * from yellow_tripdata_union_all where fare_amount > 10;
                                                                QUERY PLAN
──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
 Gather  (cost=1000.00..238483.41 rows=25 width=132)
   Workers Planned: 4
   ->  Parallel Append  (cost=0.00..237480.91 rows=1532858 width=132)
         ->  Parallel Seq Scan on _hyper_1_14_chunk "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00_cast_10"  (cost=0.00..18994.08 rows=165870 width=126)
               Filter: (fare_amount > '10'::double precision)
         ->  Parallel Seq Scan on _hyper_1_13_chunk "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00_caste_9"  (cost=0.00..18771.45 rows=162835 width=126)
               Filter: (fare_amount > '10'::double precision)
         ->  Parallel Seq Scan on _hyper_1_17_chunk "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00_cast_11"  (cost=0.00..18489.20 rows=163258 width=126)
               Filter: (fare_amount > '10'::double precision)
         ->  Parallel Seq Scan on _hyper_1_9_chunk "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00_caste_6"  (cost=0.00..18308.26 rows=159538 width=126)
               Filter: (fare_amount > '10'::double precision)
         ->  Parallel Seq Scan on _hyper_1_18_chunk "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00_cast_12"  (cost=0.00..18251.88 rows=160205 width=126)
               Filter: (fare_amount > '10'::double precision)
         ->  Parallel Seq Scan on _hyper_1_12_chunk "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00_caste_8"  (cost=0.00..18203.77 rows=156810 width=126)
               Filter: (fare_amount > '10'::double precision)
         ->  Parallel Seq Scan on _hyper_1_6_chunk "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00_caste_3"  (cost=0.00..17769.67 rows=150593 width=126)
               Filter: (fare_amount > '10'::double precision)
         ->  Parallel Seq Scan on _hyper_1_5_chunk "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00_caste_2"  (cost=0.00..17572.09 rows=149272 width=126)
               Filter: (fare_amount > '10'::double precision)
         ->  Parallel Seq Scan on _hyper_1_7_chunk "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00_caste_4"  (cost=0.00..17560.17 rows=149374 width=126)
               Filter: (fare_amount > '10'::double precision)
         ->  Parallel Seq Scan on _hyper_1_8_chunk "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00_caste_5"  (cost=0.00..17491.49 rows=149213 width=126)
               Filter: (fare_amount > '10'::double precision)
         ->  Parallel Seq Scan on _hyper_1_11_chunk "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00_caste_7"  (cost=0.00..17359.49 rows=149691 width=126)
               Filter: (fare_amount > '10'::double precision)
         ->  Parallel Seq Scan on _hyper_1_4_chunk "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00_caste_1"  (cost=0.00..16706.45 rows=141223 width=126)
               Filter: (fare_amount > '10'::double precision)
         ->  Parallel Seq Scan on _hyper_1_1_chunk "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00_casted"  (cost=0.00..8316.53 rows=89544 width=126)
               Filter: (fare_amount > '10'::double precision)
         ->  Parallel Seq Scan on _hyper_1_20_chunk "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00_cast_13"  (cost=0.00..6022.05 rows=65450 width=126)
               Filter: (fare_amount > '10'::double precision)
         ->  Parallel Foreign Scan on "yellow_tripdata-2022-10-20_00-00-00-2022-10-27_00-00-00"  (cost=0.00..0.02 rows=1 width=132)
               Filter: ((fare_amount)::double precision > '10'::double precision)
               Reader: Single File
               Row groups: 1
         ->  Parallel Foreign Scan on "yellow_tripdata-2008-12-25_00-00-00-2009-01-01_00-00-00"  (cost=0.00..0.02 rows=1 width=132)
               Filter: ((fare_amount)::double precision > '10'::double precision)
               Reader: Single File
               Row groups: 1
         ->  Parallel Foreign Scan on "yellow_tripdata-2000-12-28_00-00-00-2001-01-04_00-00-00"  (cost=0.00..0.01 rows=0 width=132)
               Filter: ((fare_amount)::double precision > '10'::double precision)
               Reader: Single File
               Row groups: 1
         ->  Parallel Foreign Scan on "yellow_tripdata-2014-11-13_00-00-00-2014-11-20_00-00-00"  (cost=0.00..0.00 rows=0 width=132)
               Filter: ((fare_amount)::double precision > '10'::double precision)
               Reader: Single File
               Row groups: 1
         ->  Parallel Foreign Scan on "yellow_tripdata-2009-01-01_00-00-00-2009-01-08_00-00-00"  (cost=0.00..0.00 rows=0 width=132)
               Filter: ((fare_amount)::double precision > '10'::double precision)
               Reader: Single File
               Row groups: 1
 JIT:
   Functions: 38
   Options: Inlining false, Optimization false, Expressions true, Deforming true
(54 rows)

Time: 9.410 ms
```

This looks pretty good, but with one glaring issue. If we apply a time filter to this view, PostgreSQL will happily scan all the tables, even when we could theoretically guarantee that no rows in a particular parquet file could match the condition.

Pruning Archived Data

Ideally we could inform the PostgreSQL planner about the time ranges (e.g. the 7-day chunks) covered by each parquet file. In theory there should be a way to achieve this using check constraints on the foreign tables. Normally check constraints are validated for a single column or row after an insert or update to a regular table; on a foreign table, they are not validated but assumed correct by the planner, so perhaps a check constraint like this would work:

```sql
alter foreign table "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00"
    add constraint check_time_range check (
        tpep_pickup_datetime::timestamptz >= '2002-12-26 00:00:00'
        and tpep_pickup_datetime::timestamptz < '2003-01-02 00:00:00'
    );
```

Unfortunately this approach doesn’t work in our case as the underlying types in the FDW are text, and we need to cast them to timestamptz which is not an immutable cast (because of timezone and date setting complications)! Looking at the PostgreSQL source code in src/backend/optimizer/util/plancat.c we see this comment in the relation_excluded_by_constraints() function which explains why this doesn’t work:

```c
/*
 * We do not currently enforce that CHECK constraints contain only
 * immutable functions, so it's necessary to check here. We daren't draw
 * conclusions from plan-time evaluation of non-immutable functions. Since
 * they're ANDed, we can just ignore any mutable constraints in the list,
 * and reason about the rest.
 */
safe_constraints = NIL;
foreach(lc, constraint_pred)
{
    Node       *pred = (Node *) lfirst(lc);

    if (!contain_mutable_functions(pred))
        safe_constraints = lappend(safe_constraints, pred);
}
```

During the process of writing this blog I thought of a different solution. PostgreSQL can’t exclude the FDW relations because of the mutable casts, but what if we could encode this information in the wrapper view for each parquet file instead? It turns out there is a way: by adding dummy conditions to the view, PostgreSQL can prove the filter false at plan time and optimise out the scan of the view entirely, which means the FDW relations are never scanned at all.

Here is how this looks in one of the parquet wrapper views:

```sql
View definition:
 SELECT vendorid,
    tpep_pickup_datetime,
    tpep_dropoff_datetime,
    passenger_count,
    trip_distance,
    ratecodeid,
    store_and_fwd_flag,
    pulocationid,
    dolocationid,
    payment_type,
    fare_amount,
    extra,
    mta_tax,
    tip_amount,
    tolls_amount,
    improvement_surcharge,
    total_amount,
    congestion_surcharge,
    airport_fee,
    archive_timerange
   FROM ( SELECT "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00".vendorid::integer AS vendorid,
            "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00".tpep_pickup_datetime::timestamp with time zone AS tpep_pickup_datetime,
            "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00".tpep_dropoff_datetime::timestamp with time zone AS tpep_dropoff_datetime,
            "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00".passenger_count::integer AS passenger_count,
            "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00".trip_distance::double precision AS trip_distance,
            "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00".ratecodeid::double precision AS ratecodeid,
            "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00".store_and_fwd_flag::character(1) AS store_and_fwd_flag,
            "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00".pulocationid::integer AS pulocationid,
            "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00".dolocationid::integer AS dolocationid,
            "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00".payment_type::integer AS payment_type,
            "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00".fare_amount::double precision AS fare_amount,
            "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00".extra::double precision AS extra,
            "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00".mta_tax::double precision AS mta_tax,
            "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00".tip_amount::double precision AS tip_amount,
            "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00".tolls_amount::double precision AS tolls_amount,
            "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00".improvement_surcharge::double precision AS improvement_surcharge,
            "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00".total_amount::double precision AS total_amount,
            "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00".congestion_surcharge::double precision AS congestion_surcharge,
            "yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00".airport_fee::double precision AS airport_fee,
            int8range('1040860800'::bigint, '1041465600'::bigint) AS archive_timerange
           FROM parquet_files."yellow_tripdata-2002-12-26_00-00-00-2003-01-02_00-00-00") unnamed_subquery
  WHERE archive_timerange = int8range('1040860800'::bigint, '1041465600'::bigint);
```

As you can see, I have added an extra column archive_timerange and a wrapper filter defined by the epoch times of the parquet file’s min and max time range. I also add an archive_timerange column to the underlying hypertable view in the top-level union, with the range -inf to +inf; it is only useful for the parquet views but still needs to be present for a valid union query. I am using PostgreSQL range types here to naturally encode the ranges, which also makes the subsequent filters more pleasant to write than multiple columns or plain integer types. If we now add a filter on archive_timerange to the top-level union using the range overlap operator &&, PostgreSQL can realise that it can completely exclude scanning this view! This is akin to a makeshift Block Range Index (BRIN), an index type in PostgreSQL that can (amongst other modes of operation) track the min and max values of a column within a range of pages to potentially exclude reading it. Let’s see if this approach works by applying an example filter to our top-level union.

```sql
select count(*)
from yellow_tripdata_union_all
where tpep_pickup_datetime between '2020-01-01' and '2023-02-01'             -- the actual filter applied to both hypertable and parquet views
  and archive_timerange && int8range_from_dates('2020-01-01', '2023-02-01'); -- optional filter that prunes the parquet views
```

The int8range_from_dates() utility function simply wraps up the creation of our range constant filter; it also works with open ranges such as ‘2023-01-01’ to ‘infinity’ or ‘-infinity’ to ‘2023-01-01’. The subsequent execution plan is shown below:

```
                                                                QUERY PLAN
──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
 Aggregate  (cost=120712.83..120712.84 rows=1 width=8)
   ->  Append  (cost=0.42..113044.16 rows=3067468 width=0)
         ->  Index Only Scan using _hyper_1_1_chunk_yellow_tripdata_tpep_pickup_datetime_idx on _hyper_1_1_chunk  (cost=0.42..10564.33 rows=323429 width=0)
               Index Cond: ((tpep_pickup_datetime >= '2020-01-01 00:00:00+00'::timestamp with time zone) AND (tpep_pickup_datetime <= '2023-02-01 00:00:00+00'::timestamp with time zone))
         ->  Index Only Scan using _hyper_1_4_chunk_yellow_tripdata_tpep_pickup_datetime_idx on _hyper_1_4_chunk  (cost=0.42..21854.86 rows=685088 width=0)
               Index Cond: ((tpep_pickup_datetime >= '2020-01-01 00:00:00+00'::timestamp with time zone) AND (tpep_pickup_datetime <= '2023-02-01 00:00:00+00'::timestamp with time zone))
         ->  Index Only Scan using _hyper_1_5_chunk_yellow_tripdata_tpep_pickup_datetime_idx on _hyper_1_5_chunk  (cost=0.42..22916.08 rows=719223 width=0)
               Index Cond: ((tpep_pickup_datetime >= '2020-01-01 00:00:00+00'::timestamp with time zone) AND (tpep_pickup_datetime <= '2023-02-01 00:00:00+00'::timestamp with time zone))
         ->  Index Only Scan using _hyper_1_6_chunk_yellow_tripdata_tpep_pickup_datetime_idx on _hyper_1_6_chunk  (cost=0.42..23028.00 rows=728542 width=0)
               Index Cond: ((tpep_pickup_datetime >= '2020-01-01 00:00:00+00'::timestamp with time zone) AND (tpep_pickup_datetime <= '2023-02-01 00:00:00+00'::timestamp with time zone))
         ->  Index Only Scan using _hyper_1_7_chunk_yellow_tripdata_tpep_pickup_datetime_idx on _hyper_1_7_chunk  (cost=0.42..19343.43 rows=611185 width=0)
               Index Cond: ((tpep_pickup_datetime >= '2020-01-01 00:00:00+00'::timestamp with time zone) AND (tpep_pickup_datetime <= '2023-02-01 00:00:00+00'::timestamp with time zone))
         ->  Subquery Scan on "*SELECT* 6"  (cost=0.00..0.11 rows=1 width=0)
               ->  Foreign Scan on "yellow_tripdata-2022-10-20_00-00-00-2022-10-27_00-00-00"  (cost=0.00..0.11 rows=0 width=164)
                     Filter: (((tpep_pickup_datetime)::timestamp with time zone >= '2020-01-01 00:00:00+00'::timestamp with time zone) AND ((tpep_pickup_datetime)::timestamp with time zone <= '2023-02-01 00:00:00+00'::timestamp with time zone))
                     Reader: Single File
                     Row groups: 1
```

As you can see, the query now appropriately prunes the hypertable scan (from 14 partitions to 5) and also the parquet views (from 6 files to 1)! Obviously this adds a little extra burden on the query writer to include a correct range filter, but the benefits are clear, particularly with a large amount of archived data.
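
The int8range_from_dates() helper isn't shown above; a minimal sketch of what it could look like (an assumption on my part, not necessarily the actual implementation):

```sql
-- Declared immutable so the planner can fold calls with constant arguments at plan time;
-- safe here because the epoch of a timestamptz does not depend on the session time zone.
create or replace function int8range_from_dates(_from timestamptz, _to timestamptz)
returns int8range
language sql immutable
as $$
    select int8range(
        -- a null bound makes the range unbounded, supporting '-infinity' / 'infinity' inputs
        case when _from = '-infinity' then null else extract(epoch from _from)::bigint end,
        case when _to   = 'infinity'  then null else extract(epoch from _to)::bigint   end
    );
$$;
```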

Conclusions

TimescaleDB is a powerful extension to PostgreSQL that allows for efficient time-series data storage and querying. We’ve shown that it is faster and requires much less storage than vanilla PostgreSQL for time-series data, which can save time and money and improve user experience.

By leveraging TimescaleDB and the extensibility of PostgreSQL itself, we’ve seen how to archive old data to parquet files and query it using the parquet_s3_fdw extension. Finally, we’ve shown how to help the query planner avoid scanning the parquet files when a time range filter can be applied to both the hypertable and the parquet views.

TimescaleDB has big advantages for developer experience and productivity, allowing time-series analysis to be performed entirely in SQL and with the full repertoire of the PostgreSQL ecosystem.

Published
by P Spence
21/06/2024
