TSBS Flink Test Environment Setup and Usage Guide

1. Java Installation

1.1 Install OpenJDK 11

# Update package list
sudo apt update

# Install OpenJDK 11 (good compatibility with Flink)
sudo apt install openjdk-11-jdk -y

# Verify installation
java -version

1.2 Configure Environment Variables

# Find the JDK installation path (informational; should match the path exported below)
JDK_PATH=$(readlink -f /usr/bin/java | sed 's:bin/java::')
echo "JDK installation path: $JDK_PATH"

# Set environment variables (add to ~/.bashrc)
echo "export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64" >> ~/.bashrc
echo "export PATH=\$JAVA_HOME/bin:\$PATH" >> ~/.bashrc

# Apply configuration
source ~/.bashrc

# Verify environment variables
echo $JAVA_HOME
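The sed expression above simply strips the trailing bin/java from the resolved path. A self-contained demo of that derivation, using a hard-coded example path so no JDK is required:

```shell
# Demo of the JAVA_HOME derivation used above, on an example path
resolved="/usr/lib/jvm/java-11-openjdk-amd64/bin/java"
jdk_path=$(echo "$resolved" | sed 's:bin/java::')
echo "$jdk_path"   # -> /usr/lib/jvm/java-11-openjdk-amd64/
```

Note that the result keeps a trailing slash, which is harmless in JAVA_HOME.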

2. Maven Installation

2.1 Install Maven

# Install Maven
sudo apt install maven -y

# Verify installation
mvn -version

2.2 Configure Maven Mirror and Java Version

Edit the /etc/maven/settings.xml file (system-wide; alternatively, use the per-user ~/.m2/settings.xml):

<?xml version="1.0" encoding="UTF-8"?>
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 
                              http://maven.apache.org/xsd/settings-1.0.0.xsd">
  
  <!-- Configure Alibaba Cloud mirror for faster downloads -->
  <mirrors>
    <mirror>
      <id>aliyunmaven</id>
      <name>Alibaba Cloud Public Repository</name>
      <url>https://maven.aliyun.com/repository/public</url>
      <mirrorOf>*</mirrorOf>
    </mirror>
  </mirrors>

  <!-- Configure Java 11 compilation environment -->
  <profiles>
    <profile>
      <id>jdk-11</id>
      <activation>
        <activeByDefault>true</activeByDefault>
      </activation>
      <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <maven.compiler.compilerVersion>11</maven.compiler.compilerVersion>
      </properties>
    </profile>
  </profiles>
  
  <activeProfiles>
    <activeProfile>jdk-11</activeProfile>
  </activeProfiles>
</settings>

2.3 Test Maven Configuration

# Test if configuration works
mvn help:system

3. Flink Installation

3.1 Download and Install

# Download Flink from Tsinghua mirror
wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.17.2/flink-1.17.2-bin-scala_2.12.tgz

# Verify file integrity
wget https://archive.apache.org/dist/flink/flink-1.17.2/flink-1.17.2-bin-scala_2.12.tgz.sha512
sha512sum -c flink-1.17.2-bin-scala_2.12.tgz.sha512

# Extract installation package
tar -xzf flink-1.17.2-bin-scala_2.12.tgz

# Create symbolic link (for easier management)
ln -s flink-1.17.2 flink
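The sha512sum -c step reads the downloaded checksum file and verifies the archive named inside it. A throwaway demo of the mechanism, using a scratch file instead of the Flink tarball:

```shell
# Demo: generate a checksum file for a scratch file, then verify it
printf 'demo\n' > /tmp/checksum-demo.txt
sha512sum /tmp/checksum-demo.txt > /tmp/checksum-demo.txt.sha512
sha512sum -c /tmp/checksum-demo.txt.sha512   # prints "/tmp/checksum-demo.txt: OK"
```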

3.2 Configure Environment Variables

# Add to ~/.bashrc
echo "export FLINK_HOME=\$HOME/flink" >> ~/.bashrc
echo "export PATH=\$PATH:\$FLINK_HOME/bin" >> ~/.bashrc

# Apply configuration
source ~/.bashrc

3.3 Optimize Flink Configuration

Edit $FLINK_HOME/conf/flink-conf.yaml:

# Memory configuration
jobmanager.memory.process.size: 1024m
taskmanager.memory.process.size: 2048m

# Task slot configuration
taskmanager.numberOfTaskSlots: 8
parallelism.default: 4

# Web UI configuration
rest.address: 0.0.0.0
rest.bind-address: 0.0.0.0

env.java.opts: "--add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/sun.nio.ch=ALL-UNNAMED --add-opens java.base/java.nio=ALL-UNNAMED --add-opens java.base/sun.misc=ALL-UNNAMED"
akka.jvm-exit-on-fatal-error: false
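As a sanity check on these numbers: parallelism.default must not exceed the total available slots, which is the slots per TaskManager times the number of TaskManagers. A minimal sketch of that arithmetic for the single-TaskManager standalone setup above:

```shell
# Capacity check for the standalone cluster configured above:
# total slots = taskmanager.numberOfTaskSlots x number of TaskManagers
slots_per_tm=8       # taskmanager.numberOfTaskSlots
task_managers=1      # start-cluster.sh launches one TaskManager by default
total_slots=$((slots_per_tm * task_managers))
echo "total slots: $total_slots"   # parallelism.default (4) must be <= this
```

With 8 slots and a default parallelism of 4, two such jobs can run concurrently.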

4. Start Flink Cluster

4.1 Start Cluster Service

# Start Flink standalone cluster
$FLINK_HOME/bin/start-cluster.sh

# Check process status
jps

# Expected output should include:
# - StandaloneSessionClusterEntrypoint (JobManager)
# - TaskManagerRunner (TaskManager)

4.2 Verify Cluster Status

# Access Web UI (http://localhost:8081)
# Check available task slots count

# Start SQL Client
$FLINK_HOME/bin/sql-client.sh

4.3 Test Data Stream Processing

Execute test SQL in SQL Client:

-- Create test data source
CREATE TABLE page_visits (
    visit_id STRING,
    page_url STRING,
    user_agent STRING,
    `timestamp` TIMESTAMP(3),
    WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',
    'fields.visit_id.kind' = 'random',
    'fields.visit_id.length' = '10',
    'fields.page_url.kind' = 'random',
    'fields.page_url.length' = '5'
);

-- Create result table
CREATE TABLE page_visits_per_minute (
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    pv BIGINT
) WITH ('connector' = 'print');

-- Preview the generated data stream
SELECT * FROM page_visits;

-- Execute the per-minute window aggregation into the print sink
INSERT INTO page_visits_per_minute
SELECT window_start, window_end, COUNT(*) AS pv
FROM TABLE(
    TUMBLE(TABLE page_visits, DESCRIPTOR(`timestamp`), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end;

5. Compile Custom Data Source Project

Get and Build Project

# Clone project repository
git clone [email protected]:taosdata/tsbs-flink-datasource.git
cd tsbs-flink-datasource

# Compile project
mvn clean package -f package.pom.xml

# Verify build result
ls -la target/tsbs-flink-datasource-1.0-SNAPSHOT.jar

# Test executability
$FLINK_HOME/bin/flink run target/tsbs-flink-datasource-1.0-SNAPSHOT.jar --help

6. Debug Custom Data Source via Flink SQL

Launch $FLINK_HOME/bin/sql-client.sh and execute the following SQL statements:

6.1 Register Custom Connector

-- Load custom connector JAR package
ADD JAR '/root/tsbs-flink-datasource/target/tsbs-flink-datasource-1.0-SNAPSHOT.jar';

-- Verify JAR package loading status
SHOW JARS;

6.2 Create Test Tables

-- Create readings data table
CREATE TABLE readings (
    `ts`                         TIMESTAMP(3),
    `latitude`                   DOUBLE,
    `longitude`                  DOUBLE,
    `elevation`                  DOUBLE,
    `velocity`                   DOUBLE,
    `heading`                    DOUBLE,
    `grade`                      DOUBLE,
    `fuel_consumption`           DOUBLE,
    `name`                       STRING,
    `fleet`                      STRING,
    `driver`                     STRING,
    `model`                      STRING,
    `device_version`             STRING,
    `load_capacity`              DOUBLE,
    `fuel_capacity`              DOUBLE,
    `nominal_fuel_consumption`   DOUBLE,
    WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND
) WITH (
    'connector' = 'tsbs',
    'data-type' = 'readings',
    'path' = 'file:///root/tsbs-flink-datasource/src/main/resources/data/default_readings.csv'
);

-- Create diagnostics data table
CREATE TABLE diagnostics (
    ts                          TIMESTAMP(3),
    fuel_state                  DOUBLE,
    current_load               DOUBLE,
    status                     BIGINT,
    name                       VARCHAR(30),
    fleet                      VARCHAR(30),
    driver                     VARCHAR(30),
    model                      VARCHAR(30),
    device_version            VARCHAR(30),
    load_capacity             DOUBLE,
    fuel_capacity             DOUBLE,
    nominal_fuel_consumption  DOUBLE,
    WATERMARK FOR ts AS ts - INTERVAL '60' MINUTE
) WITH (
    'connector' = 'tsbs',
    'data-type' = 'diagnostics',
    'path' = 'file:///root/tsbs-flink-datasource/src/main/resources/data/default_diagnostics.csv'
);

6.3 Test Data Query

SELECT * FROM readings;
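Beyond the interactive SELECT, queries can be kept in a script file and run non-interactively with sql-client.sh -f. A sketch, using a hypothetical aggregation over columns from the readings table above (the path /tmp/avg_velocity.sql is arbitrary):

```shell
# Sketch: prepare a SQL script for non-interactive execution.
# The query is hypothetical; columns (fleet, velocity) come from the readings table above.
cat > /tmp/avg_velocity.sql <<'EOF'
SELECT fleet, AVG(velocity) AS avg_velocity
FROM readings
GROUP BY fleet;
EOF
cat /tmp/avg_velocity.sql
# With the cluster running and the JAR loaded, execute it via:
#   $FLINK_HOME/bin/sql-client.sh -f /tmp/avg_velocity.sql
```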

7. Execute Automated Testing

7.1 Test Framework Parameter Description

The project supports the following command-line parameters:

| Parameter            | Short | Description                          | Default Value            |
|----------------------|-------|--------------------------------------|--------------------------|
| --config             | -c    | Test case configuration file path    | Built-in default config  |
| --data1              | -d1   | Readings data file path              | Built-in default data    |
| --data2              | -d2   | Diagnostics data file path           | Built-in default data    |
| --log-output         | -l    | Log file output path                 | ./tsbs-flink-log.txt     |
| --json-output        | -j    | JSON result file output path         | ./tsbs-flink-result.json |
| --scenario           | -s    | Execute specific test scenario       | All scenarios            |
| --parallelism        | -p    | Flink parallelism level              | 4                        |
| --parallelism-config | -pc   | Parallelism configuration file path  | Built-in default config  |
| --shared-queue       | -q    | Use shared queue mode                | false                    |
| --help               | -h    | Show help information                | -                        |
| --version            | -v    | Show version information             | -                        |

7.2 Execute Test Examples

# View help information
$FLINK_HOME/bin/flink run target/tsbs-flink-datasource-1.0-SNAPSHOT.jar --help

# Execute all test scenarios
$FLINK_HOME/bin/flink run target/tsbs-flink-datasource-1.0-SNAPSHOT.jar

# Execute specific test scenario with custom parallelism
$FLINK_HOME/bin/flink run target/tsbs-flink-datasource-1.0-SNAPSHOT.jar --scenario A1 --parallelism 2

# Use custom configuration and data files with separate log and JSON output
$FLINK_HOME/bin/flink run target/tsbs-flink-datasource-1.0-SNAPSHOT.jar \
    --config /path/to/custom_config.yaml \
    --data1 /path/to/readings.csv \
    --data2 /path/to/diagnostics.csv \
    --log-output ./custom-log.txt \
    --json-output ./custom-results.json \
    --parallelism 8 \
    --parallelism-config /path/to/parallelism_config.yaml

7.3 View Test Results

Test results will be output to:

  • Console: Real-time execution logs
  • Log file: Detailed test report with execution logs (default: tsbs-flink-log.txt)
  • JSON file: Structured test results in JSON format (default: tsbs-flink-result.json)

The log file records the execution status, timing, and throughput of each scenario, for example:

| Scenario ID | Classification | Out Records   | In Records | Start Time   | End Time     | Duration(ms) | Throughput(rec/s) | Status |
|-------------|----------------|---------------|------------|--------------|--------------|--------------|-------------------|--------|
| A1          | Summary        |             1 |         50 | 14:26:01.015 | 14:26:04.130 |         3115 |             16.05 | Passed |
| A2          | Summary        |             4 |         50 | 14:26:07.137 | 14:26:08.416 |         1279 |             39.09 | Passed |
| A3          | Summary        |             8 |         50 | 14:26:11.417 | 14:26:14.402 |         2985 |             16.75 | Passed |
| A4          | Summary        |             7 |         50 | 14:26:17.403 | 14:26:18.366 |          963 |             51.92 | Passed |
| A5          | Summary        |             5 |         50 | 14:26:21.367 | 14:26:22.316 |          949 |             52.69 | Passed |
| A6          | Summary        |             1 |        100 | 14:26:25.317 | 14:26:26.512 |         1195 |             83.68 | Passed |
| A7          | Summary        |             1 |         50 | 14:26:29.514 | 14:26:30.418 |          904 |             55.31 | Passed |
| A8          | Summary        |             1 |         50 | 14:26:33.419 | 14:26:34.560 |         1141 |             43.82 | Passed |
| A9          | Summary        |             0 |         50 | 14:26:37.561 | 14:26:38.596 |         1035 |             48.31 | Passed |
| F1          | Fleet          |             8 |         50 | 14:26:41.597 | 14:26:42.751 |         1154 |             43.33 | Passed |
| F2          | Fleet          |             8 |         50 | 14:26:45.752 | 14:26:48.487 |         2735 |             18.28 | Passed |
| F3          | Fleet          |             3 |         50 | 14:26:51.488 | 14:26:52.358 |          870 |             57.47 | Passed |
| F4          | Fleet          |             3 |        100 | 14:26:55.359 | 14:26:56.435 |         1076 |             92.94 | Passed |
| F5          | Fleet          |             3 |         50 | 14:26:59.436 | 14:27:00.281 |          845 |             59.17 | Passed |
| F6          | Fleet          |            94 |         50 | 14:27:03.282 | 14:27:04.136 |          854 |             58.55 | Passed |
| F7          | Fleet          |           109 |         50 | 14:27:07.137 | 14:27:08.062 |          925 |             54.05 | Passed |
| F8          | Fleet          |            94 |         50 | 14:27:11.063 | 14:27:12.005 |          942 |             53.08 | Passed |
| T1          | Vehicle        |             4 |         50 | 14:27:15.006 | 14:27:15.893 |          887 |             56.37 | Passed |
| T2          | Vehicle        |             3 |         50 | 14:27:18.893 | 14:27:19.799 |          906 |             55.19 | Passed |
| T3          | Vehicle        |             9 |         50 | 14:27:24.618 | 14:27:25.474 |          856 |             58.41 | Passed |
| T4          | Vehicle        |            11 |         50 | 14:27:28.475 | 14:27:29.368 |          893 |             55.99 | Passed |
| T5          | Vehicle        |             3 |         50 | 14:27:32.369 | 14:27:33.198 |          829 |             60.31 | Passed |
| T6          | Vehicle        |             4 |         50 | 14:27:36.199 | 14:27:37.054 |          855 |             58.48 | Passed |
| T7          | Vehicle        |             6 |        100 | 14:27:40.055 | 14:27:41.111 |         1056 |             94.70 | Passed |
| T8          | Vehicle        |             4 |         50 | 14:27:44.112 | 14:27:44.941 |          829 |             60.31 | Passed |
| T9          | Vehicle        |             1 |         50 | 14:27:47.942 | 14:27:48.785 |          843 |             59.31 | Passed |

The JSON file contains structured test results with the following format (based on actual test execution):

{
  "summary" : {
    "totalCases" : 2,
    "passedCases" : 2,
    "failedCases" : 0,
    "successRate" : "100.0",
    "totalStartTime" : "2025-11-12 14:26:01.004",
    "totalEndTime" : "2025-11-12 14:27:51.785",
    "totalDuration" : 110781,
    "averageDuration" : "1189.27",
    "totalDataRecords" : 1450,
    "overallThroughput" : "13.09",
    "slowestCase" : {
      "scenarioId" : "A1",
      "duration" : 3115
    }
  },
  "results" : [ {
    "scenarioId" : "A1",
    "classification" : "Summary",
    "recordsOutput" : 1,
    "recordsInput" : 50,
    "throughput" : "16.05",
    "startTime" : "2025-11-12 14:26:01.015",
    "endTime" : "2025-11-12 14:26:04.130",
    "duration" : 3115,
    "status" : "Passed"
  }, {
    "scenarioId" : "A2",
    "classification" : "Summary",
    "recordsOutput" : 4,
    "recordsInput" : 50,
    "throughput" : "39.09",
    "startTime" : "2025-11-12 14:26:07.137",
    "endTime" : "2025-11-12 14:26:08.416",
    "duration" : 1279,
    "status" : "Passed"
  } ]
}
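For quick scripting against the report, summary fields can be pulled out with standard tools. A sketch using grep/sed so there is no jq dependency (the file written here is a trimmed stand-in for the real report):

```shell
# Sketch: extract the success rate from the JSON report without jq
# (field name taken from the example report above; the file is a trimmed stand-in)
cat > /tmp/tsbs-flink-result.json <<'EOF'
{ "summary" : { "successRate" : "100.0", "totalCases" : 2 } }
EOF
grep -o '"successRate" : "[^"]*"' /tmp/tsbs-flink-result.json | sed 's/.*"\([^"]*\)"$/\1/'
```

With jq installed, `jq -r '.summary.successRate' tsbs-flink-result.json` does the same more robustly.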

About

A Flink TableSource connector for the Time Series Benchmark Suite (TSBS) CSV data.
