There is a way to do this, though it is something of a workaround. The trick is to use the KAFKA
value format to write a tombstone to the underlying topic.
Here's an example that uses the original DDL.
-- Insert a second row of data
INSERT INTO MOVIES (ID, TITLE, RELEASE_YEAR) VALUES (42, 'Life of Brian', 1986);
-- Query table
ksql> SET 'auto.offset.reset' = 'earliest';
ksql> select * from movies emit changes limit 2;
+--------------------------------+--------------------------------+--------------------------------+
|TITLE |ID |RELEASE_YEAR |
+--------------------------------+--------------------------------+--------------------------------+
|Life of Brian |42 |1986 |
|Aliens |48 |1986 |
Limit Reached
Query terminated
Now declare a new stream that writes to the same Kafka topic using the same key:
CREATE STREAM MOVIES_DELETED (title VARCHAR KEY, DUMMY VARCHAR)
WITH (KAFKA_TOPIC='movies',
VALUE_FORMAT='KAFKA');
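One way to see why the KAFKA value format matters here: a tombstone is a record whose value is *absent* at the Kafka level, not a record whose payload encodes null. A minimal Python sketch (illustrative only, not ksqlDB's implementation) of the difference:

```python
import json

# A JSON serializer turns None into the 4-byte payload b'null' -- that is
# still a real payload, NOT a tombstone. A true tombstone requires the
# record value itself to be absent (None) when produced.
json_payload = json.dumps(None).encode()  # b'null' -- a non-empty payload
tombstone_payload = None                  # absent value -- a true tombstone

print(json_payload)       # b'null'
print(tombstone_payload)  # None
```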
Insert the tombstone message:
INSERT INTO MOVIES_DELETED (TITLE,DUMMY) VALUES ('Aliens',CAST(NULL AS VARCHAR));
Query the table again:
ksql> select * from movies emit changes limit 2;
+--------------------------------+--------------------------------+--------------------------------+
|TITLE |ID |RELEASE_YEAR |
+--------------------------------+--------------------------------+--------------------------------+
|Life of Brian |42 |1986 |
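The `Aliens` row is gone because a table is materialized as "latest value per key", and a null value deletes the key. A minimal Python sketch of that fold (hypothetical, not ksqlDB's actual code), using this example's data:

```python
# Records in the underlying topic, in offset order: (key, value) pairs,
# where a None value is a tombstone.
records = [
    ("Aliens", {"ID": 48, "RELEASE_YEAR": 1986}),
    ("Life of Brian", {"ID": 42, "RELEASE_YEAR": 1986}),
    ("Aliens", None),  # tombstone written via MOVIES_DELETED
]

# Materialize the table: for each key, the latest record wins;
# a tombstone removes the key entirely.
table = {}
for key, value in records:
    if value is None:
        table.pop(key, None)  # tombstone: delete the key
    else:
        table[key] = value    # upsert: latest value wins

print(table)  # only 'Life of Brian' remains
```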
Inspect the underlying topic:
ksql> print movies;
Key format: KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/02/22 11:01:05.966 Z, key: Aliens, value: {"ID":48,"RELEASE_YEAR":1986}, partition: 0
rowtime: 2021/02/22 11:02:00.194 Z, key: Life of Brian, value: {"ID":42,"RELEASE_YEAR":1986}, partition: 0
rowtime: 2021/02/22 11:04:52.569 Z, key: Aliens, value: <null>, partition: 0