Spring Boot 项目操作 InfluxDB 可以通过官方 Java 客户端实现。由于 InfluxDB 1.x 和 2.x 的客户端 API 差异较大,以下分别提供两个版本的实现方案,以网约车场景中的车辆位置数据为例:
一、InfluxDB 1.x 版本实现
1. 添加依赖
在 pom.xml 中引入 1.x 客户端:
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.23</version>
</dependency>
2. 配置 InfluxDB 连接
在 application.yml 中添加配置:
influxdb:
url: http://localhost:8086
username: admin
password: admin123
database: vehicle_db # 数据库名
3. 配置类(创建连接实例)
package com.bwie.config;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.TimeUnit;
@Configuration
public class InfluxDB1Config {
@Value("${influxdb.url}")
private String url;
@Value("${influxdb.username}")
private String username;
@Value("${influxdb.password}")
private String password;
@Value("${influxdb.database}")
private String database;
@Bean
public InfluxDB influxDB() {
// 创建连接
InfluxDB influxDB = InfluxDBFactory.connect(url, username, password);
// 设置默认数据库
influxDB.setDatabase(database);
// 开启批量写入(优化性能)
influxDB.enableBatch(
1000, // 批量大小
100, // 刷新间隔(毫秒)
TimeUnit.MILLISECONDS
);
return influxDB;
}
}
4. 实体类(车辆位置)
package com.bwie.vo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.Instant;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class VehiclePositionVo {
private String carId; // 车辆ID(标签)
private Double latitude; // 纬度(字段)
private Double longitude; // 经度(字段)
private Instant timestamp; // 时间戳
}
5. 增删改查实现类
package com.bwie;
import com.bwie.vo.VehiclePositionVo;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@SpringBootTest
public class TestInfluxApp {
@Autowired
private InfluxDB influxDB;
private static final String MEASUREMENT = "vehicle_pos";
@Test
public void testInfluxDB() {
//--1 写入数据
VehiclePositionVo position = new VehiclePositionVo("car1", 39.9, 116.3, Instant.now());
Point point = Point.measurement(MEASUREMENT)
.tag("car_id", position.getCarId()) // 标签(带索引,适合查询)
.addField("lat", position.getLatitude()) // 字段
.addField("lon", position.getLongitude())
.time(position.getTimestamp().toEpochMilli(), TimeUnit.MILLISECONDS) // 时间戳
.build();
influxDB.write(point);
//--2 查询数据
// InfluxQL查询语句
String query = String.format(
"SELECT * FROM %s WHERE carId = '%s' AND time >= %dms AND time <= %dms",
MEASUREMENT, "car1",
Instant.now().toEpochMilli()-1000000000L,
Instant.now().toEpochMilli()
);
QueryResult result = influxDB.query(new Query(query));
List<VehiclePositionVo> list = result.getResults().get(0).getSeries().get(0).getValues().stream()
.map(values -> {
VehiclePositionVo pos = new VehiclePositionVo();
pos.setCarId("car1");
pos.setLatitude((Double) values.get(1)); // 纬度在第2列
pos.setLongitude((Double) values.get(2)); // 经度在第3列
pos.setTimestamp(Instant.ofEpochMilli(((Number) values.get(0)).longValue())); // 时间戳在第1列
return pos;
})
.collect(Collectors.toList());
for (VehiclePositionVo positionVo : list) {
System.out.println(positionVo);
}
}
}
6. 其他参考代码
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
@Service
public class InfluxDB2Service {
private final InfluxDBClient influxDBClient;
private final String bucket;
private static final String MEASUREMENT = "vehicle_pos"; // 类似表名
public InfluxDB2Service(InfluxDBClient influxDBClient, @Value("${influxdb2.bucket}") String bucket) {
this.influxDBClient = influxDBClient;
this.bucket = bucket;
}
// 新增数据
public void insert(VehiclePosition position) {
try (WriteApi writeApi = influxDBClient.getWriteApi()) {
Point point = Point.measurement(MEASUREMENT)
.addTag("carId", position.getCarId()) // 标签
.addField("latitude", position.getLatitude()) // 字段
.addField("longitude", position.getLongitude())
.time(position.getTimestamp(), WritePrecision.NS) // 时间戳(纳秒)
.build();
writeApi.writePoint(bucket, point);
}
}
// 批量插入
public void batchInsert(List<VehiclePosition> positions) {
try (WriteApi writeApi = influxDBClient.getWriteApi()) {
List<Point> points = positions.stream()
.map(pos -> Point.measurement(MEASUREMENT)
.addTag("carId", pos.getCarId())
.addField("latitude", pos.getLatitude())
.addField("longitude", pos.getLongitude())
.time(pos.getTimestamp(), WritePrecision.NS)
.build())
.collect(Collectors.toList());
writeApi.writePoints(bucket, points);
}
}
// 查询数据(使用Flux语法)
public List<VehiclePosition> queryByCarId(String carId, Instant start, Instant end) {
// Flux查询语句
String flux = String.format("""
from(bucket: "%s")
|> range(start: %s, stop: %s)
|> filter(fn: (r) => r._measurement == "%s" and r.carId == "%s")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
""", bucket, start, end, MEASUREMENT, carId);
List<FluxTable> tables = influxDBClient.getQueryApi().query(flux);
return tables.stream()
.flatMap(table -> table.getRecords().stream())
.map(record -> {
VehiclePosition pos = new VehiclePosition();
pos.setCarId(carId);
pos.setLatitude(record.getValueByKey("latitude").toString());
pos.setLongitude(record.getValueByKey("longitude").toString());
pos.setTimestamp(record.getTime());
return pos;
})
.collect(Collectors.toList());
}
// 删除数据
public void deleteByCarId(String carId, Instant start, Instant end) {
influxDBClient.getDeleteApi().delete(
start, end,
String.format("carId=\"%s\"", carId), // 条件
bucket,
influxDBClient.getOrganization()
);
}
}
核心说明
- 数据模型差异:
- 1.x 用
database+measurement组织数据 - 2.x 用
bucket(桶)+measurement组织数据,且必须通过token认证
- 查询语言差异:
- 1.x 使用类 SQL 的 InfluxQL
- 2.x 使用管道式的 Flux 语言,功能更强大但学习成本较高
- 更新操作:
- InfluxDB 不支持直接更新数据,需通过“删除旧数据 + 插入新数据”实现
- 时序数据特性决定了更适合“写后读”,而非频繁修改
- 性能优化:
- 批量写入(
enableBatch)可大幅提升写入性能 - 合理设置标签(Tag)和字段(Field):查询频繁的字段用 Tag(带索引)
根据项目中使用的 InfluxDB 版本选择对应实现,上述代码可直接集成到 Spring Boot 项目中,用于网约车车辆轨迹等时序数据的存储和查询。