Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 82 additions & 8 deletions pipeline/ptinput/funcs/fn_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (c *reCache) get(p string) (*regexp.Regexp, bool) {
func KVSplitChecking(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError {
if err := normalizeFuncArgsDeprecated(funcExpr, []string{
"key", "field_split_pattern", "value_split_pattern",
"trim_key", "trim_value", "include_keys", "prefix",
"trim_key", "trim_value", "include_keys", "prefix", "value_delimiters",
}, 1); err != nil {
return runtime.NewRunError(ctx, err.Error(), funcExpr.NamePos)
}
Expand Down Expand Up @@ -134,6 +134,15 @@ func KVSplitChecking(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlErro
funcExpr.Param[6].NodeType), funcExpr.NamePos)
}
}
// value_Delimiters
if funcExpr.Param[7] != nil {
switch funcExpr.Param[7].NodeType { //nolint:exhaustive
case ast.TypeListLiteral, ast.TypeIdentifier:
default:
return runtime.NewRunError(ctx, fmt.Sprintf("param value_delimiters expect ListInitExpr or Identifier, got %s",
funcExpr.Param[7].NodeType), funcExpr.NamePos)
}
}
return nil
}

Expand Down Expand Up @@ -253,7 +262,38 @@ func KVSplit(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError {
}
}

result := kvSplit(val, includeKeys, fieldSplit, valueSplit, trimKey, trimValue, prefix)
var valueDelimiters []string
if funcExpr.Param[7] != nil {
switch funcExpr.Param[7].NodeType { //nolint:exhaustive
case ast.TypeListLiteral, ast.TypeIdentifier:
v, dt, err := runtime.RunStmt(ctx, funcExpr.Param[7])
if err != nil {
return err
}
if dt != ast.List {
break
}
switch v := v.(type) {
case []any:
for _, k := range v {
if k, ok := k.(string); ok {
valueDelimiters = append(valueDelimiters, k)
}
}
if len(valueDelimiters)%2 != 0 {
return runtime.NewRunError(ctx, fmt.Sprintf("param value_Delimiters expect even number, got %d",
len(valueDelimiters)), funcExpr.NamePos)
}
default:
}

default:
return runtime.NewRunError(ctx, fmt.Sprintf("param value_Delimiters expect ListInitExpr or Identifier, got %s",
funcExpr.Param[7].NodeType), funcExpr.NamePos)
}
}

result := kvSplit(val, includeKeys, fieldSplit, valueSplit, trimKey, trimValue, prefix, valueDelimiters)
if len(result) == 0 {
ctx.Regs.ReturnAppend(false, ast.Bool)
return nil
Expand All @@ -268,7 +308,7 @@ func KVSplit(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError {
}

func kvSplit(str string, includeKeys []string, fieldSplit, valueSplit *regexp.Regexp,
trimKey, trimValue, prefix string,
trimKey, trimValue, prefix string, valueDelimiters []string,
) map[string]string {
if str == "" {
return nil
Expand All @@ -283,14 +323,19 @@ func kvSplit(str string, includeKeys []string, fieldSplit, valueSplit *regexp.Re
}

ks := map[string]struct{}{}

vd := map[string]string{}
for _, v := range includeKeys {
ks[v] = struct{}{}
}

for i := 0; i < len(valueDelimiters); i += 2 {
vd[valueDelimiters[i]] = valueDelimiters[i+1]
}

result := map[string]string{}
fields := fieldSplit.Split(str, -1)
for _, field := range fields {
separators := fieldSplit.FindAllString(str, -1)
for i, field := range fields {
keyValue := valueSplit.Split(field, 2)

if len(keyValue) == 2 {
Expand All @@ -307,10 +352,39 @@ func kvSplit(str string, includeKeys []string, fieldSplit, valueSplit *regexp.Re
continue
}
}

value := keyValue[1]
if last, ok := vd[string(value[0])]; ok {
j := i
if string(value[0]) == last {
count := strings.Count(value, last)
for count%2 != 0 {
if j+1 >= len(fields) {
break
}
value = value + separators[j] + fields[j+1]
count += strings.Count(fields[j+1], last)
j++
}
} else {
countA, countB := strings.Count(value, string(value[0])), strings.Count(value, last)
for countA > countB {
if j+1 >= len(fields) {
break
}
value = value + separators[j] + fields[j+1]
countA += strings.Count(fields[j+1], string(value[0]))
countB += strings.Count(fields[j+1], last)
j++
}
}
end := strings.LastIndex(value, last)
if end > 0 {
value = value[1:end]
}
}
// trim value
if trimValue != "" {
keyValue[1] = strings.Trim(keyValue[1], trimValue)
value = strings.Trim(value, trimValue)
}

// prefix + key
Expand All @@ -319,7 +393,7 @@ func kvSplit(str string, includeKeys []string, fieldSplit, valueSplit *regexp.Re
}

// append to result
result[keyValue[0]] = keyValue[1]
result[keyValue[0]] = value
}
}
return result
Expand Down
9 changes: 9 additions & 0 deletions pipeline/ptinput/funcs/fn_pt_kvs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,15 @@ func TestPtKvsSet(t *testing.T) {
expect interface{}
fail bool
}{
{
name: "key1",
pl: `
if "real_call_time_float" in pt_kvs_keys(fields=false,tags=true) {
}
`,
keyName: "key2",
expect: nil,
},
{
name: "set1",
pl: `
Expand Down
3 changes: 2 additions & 1 deletion pipeline/ptinput/funcs/md/kv_split.en.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
### `kv_split()` {#fn-kv_split}

Function prototype: `fn kv_split(key, field_split_pattern = " ", value_split_pattern = "=", trim_key = "", trim_value = "", include_keys = [], prefix = "") -> bool`
Function prototype: `fn kv_split(key, field_split_pattern = " ", value_split_pattern = "=", trim_key = "", trim_value = "", include_keys = [], prefix = "", value_delimiters = []) -> bool`

Function description: extract all key-value pairs from a string

Expand All @@ -13,6 +13,7 @@ Function parameters:
- `trim_key`: delete all the specified characters leading and trailing the extracted key; the default value is ""
- `trim_value`: remove all leading and trailing characters from the extracted value; the default value is ""
- `prefix`: add prefix to all keys
- `value_delimiters`: defines paired delimiters (e.g., ['[', ']']) for handling values wrapped by specified characters; default value is [], do not process any delimiters.

Example:

Expand Down
3 changes: 2 additions & 1 deletion pipeline/ptinput/funcs/md/kv_split.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
### `kv_split()` {#fn-kv_split}

函数原型:`fn kv_split(key, field_split_pattern = " ", value_split_pattern = "=", trim_key = "", trim_value = "", include_keys = [], prefix = "") -> bool`
函数原型:`fn kv_split(key, field_split_pattern = " ", value_split_pattern = "=", trim_key = "", trim_value = "", include_keys = [], prefix = "", value_delimiters = []) -> bool`

函数说明:从字符串中提取出所有的键值对

Expand All @@ -13,6 +13,7 @@
- `trim_key`: 删除提取出的 key 的前导和尾随的所有指定的字符;默认值为 `""`
- `trim_value`: 删除提取出的 value 的前导和尾随的所有指定的字符;默认值为 `""`
- `prefix`: 给所有的 key 添加前缀字符串
- `value_delimiters`: 定义成对的分隔符(如 `['[', ']']`),用于处理value被指定字符包裹的情况;默认值为 `[]`,不处理任何分隔符

示例:

Expand Down
34 changes: 32 additions & 2 deletions pipeline/ptinput/funcs/utils_fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,48 @@ func WrapFnCall(fn FnCall, paramDesc []*Param) runtime.FuncCall {
// Note that some functions do not take the value of the variable
// corresponding to the parameter, but its name.

vals := make([]any, len(funcExpr.Param))
var vals []any

lenP := len(paramDesc)
varP := false
lenF := len(funcExpr.Param)
paramMap := make(map[string]any, lenP)

if lenF < lenP {
vals = make([]any, lenP)

} else {
vals = make([]any, lenF)
}

if lenP > 0 {
if paramDesc[lenP-1].VariableP {
lenP -= 1
varP = true
}

for i := 0; i < lenP; i++ {
if val, err := getParam(ctx, paramDesc[i], funcExpr.Param[i]); err != nil {
if i < lenF {
input := funcExpr.Param[i]
if val, err := getParam(ctx, paramDesc[i], input); err != nil {
return err
} else if input != nil && input.NodeType == ast.TypeAssignmentExpr {
paramMap[input.AssignmentExpr().LHS.Identifier().String()] = val
} else {
paramMap[paramDesc[i].Name] = val
}
} else {
if val, err := getParam(ctx, paramDesc[i], nil); err != nil {
return err
} else if _, exist := paramMap[paramDesc[i].Name]; !exist {
paramMap[paramDesc[i].Name] = val
}
}
}
for i := 0; i < lenP; i++ {
if param, exist := paramMap[paramDesc[i].Name]; exist {
vals[i] = param
} else if val, err := getParam(ctx, paramDesc[i], nil); err != nil {
return err
} else {
vals[i] = val
Expand Down
Loading