ReadFromPubSub
和 WriteToPubsub
方法来确认你的输入和输出设置是否正确:package main
import (
"context"
"flag"
"fmt"
"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio"
"github.com/apache/beam/sdks/go/pkg/beam/log"
)
var (
input = flag.String("input", "", "Input Pub/Sub topic of the form \"projects//topics/\".")
output = flag.String("output", "", "Output Pub/Sub topic of the form \"projects//topics/\".")
)
func init() {
flag.Parse()
if *input == "" {
log.Exit(context.Background(), "No input pubsub topic defined. Use --input=")
}
if *output == "" {
log.Exit(context.Background(), "No output pubsub topic defined. Use --output=")
}
}
func main() {
beam.Init()
// Read messages from Pub/Sub.
messages := pubsubio.ReadFromPubSub(context.Background(), *input)
// Log messages to stdout.
beam.ParDo0(context.Background(), log.Printf, messages)
// Write messages back to Pub/Sub.
beam.ParDo0(context.Background(), func(msg []byte) []byte {
return msg
}, messages)
pubsubio.WriteToPubSub(context.Background(), *output, messages)
}
beam.Run
函数来启动管道。 pipelineResult := beam.Run(context.Background(), pipeline)
if err := pipelineResult.Wait(); err != nil {
log.Fatalf(context.Background(), "Failed to execute job: %v", err)
}
以上是两个解决问题的方法,