Skip to content

Commit afd7c72

Browse files
committed
First take
1 parent 8701e5e commit afd7c72

File tree

10 files changed

+906
-26
lines changed

10 files changed

+906
-26
lines changed

‎README.md‎

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1-
[![Tests on Linux, MacOS and Windows](https://github.com/bep/s3fileprocessor/workflows/Test/badge.svg)](https://github.com/bep/s3fileprocessor/actions?query=workflow:Test)
2-
[![Go Report Card](https://goreportcard.com/badge/github.com/bep/s3fileprocessor)](https://goreportcard.com/report/github.com/bep/s3fileprocessor)
3-
[![GoDoc](https://godoc.org/github.com/bep/s3fileprocessor?status.svg)](https://godoc.org/github.com/bep/s3fileprocessor)
1+
[![Tests on Linux, MacOS and Windows](https://github.com/bep/s3rpc/workflows/Test/badge.svg)](https://github.com/bep/s3rpc/actions?query=workflow:Test)
2+
[![Go Report Card](https://goreportcard.com/badge/github.com/bep/s3rpc)](https://goreportcard.com/report/github.com/bep/s3rpc)
3+
[![GoDoc](https://godoc.org/github.com/bep/s3rpc?status.svg)](https://godoc.org/github.com/bep/s3rpc)
4+
5+
6+
RPC via AWS S3 and SQS.

‎client.go‎

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
package s3rpc
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"os"
8+
"path"
9+
"path/filepath"
10+
"strings"
11+
"time"
12+
13+
"github.com/aws/aws-sdk-go-v2/aws"
14+
"github.com/aws/aws-sdk-go-v2/credentials"
15+
"github.com/aws/aws-sdk-go-v2/service/s3"
16+
"github.com/aws/aws-sdk-go-v2/service/sqs"
17+
18+
"github.com/google/uuid"
19+
"golang.org/x/sync/errgroup"
20+
)
21+
22+
// NewClient creates a new client.
23+
func NewClient(opts ClientOptions) (*Client, error) {
24+
if err := opts.init(); err != nil {
25+
return nil, err
26+
}
27+
28+
awsCfg := aws.Config{
29+
Region: opts.Region,
30+
Credentials: credentials.NewStaticCredentialsProvider(opts.AccessKeyID, opts.SecretAccessKey, ""),
31+
}
32+
33+
if opts.Timeout == 0 {
34+
opts.Timeout = 5 * time.Minute
35+
}
36+
37+
if opts.Infof == nil {
38+
opts.Infof = func(format string, args ...interface{}) {
39+
fmt.Println("client: " + fmt.Sprintf(format, args...))
40+
}
41+
}
42+
43+
tempDir, err := os.MkdirTemp("", "s3rpc_client")
44+
if err != nil {
45+
return nil, err
46+
}
47+
48+
return &Client{
49+
timeout: opts.Timeout,
50+
common: &common{
51+
bucket: opts.Bucket,
52+
queue: opts.Queue,
53+
s3Client: s3.NewFromConfig(awsCfg),
54+
sqsClient: sqs.NewFromConfig(awsCfg),
55+
tempDir: tempDir,
56+
infof: opts.Infof,
57+
},
58+
}, nil
59+
60+
}
61+
62+
type Client struct {
63+
timeout time.Duration
64+
*common
65+
}
66+
67+
// ExecuteFilename executes the given op on a server with filename as its input.
68+
// This will block until the response is received or the timeout is reached.
69+
func (c *Client) ExecuteFilename(ctx context.Context, op, filename string) (Output, error) {
70+
id := uuid.New().String()
71+
key := fmt.Sprintf("%s/%s/%s_%s", toServer, op, id, filepath.Base(filename))
72+
73+
// First upload the file to the input folder.
74+
if err := c.upload(filename, key, nil); err != nil {
75+
return Output{}, fmt.Errorf("apply: %v", err)
76+
}
77+
78+
var output Output
79+
80+
// Now, wait for the response from server.
81+
ctx, cancel := context.WithTimeout(ctx, c.timeout)
82+
defer cancel()
83+
84+
g, ctx := errgroup.WithContext(ctx)
85+
g.Go(func() error {
86+
for {
87+
select {
88+
case <-ctx.Done():
89+
return nil
90+
default:
91+
//c.infof("Checking queue %q for new messages", c.queue)
92+
ms, err := c.Receive(ctx)
93+
if err != nil {
94+
return err
95+
}
96+
for _, m := range ms {
97+
if m.Bucket != c.bucket {
98+
return fmt.Errorf("expected bucket %q, got %q", c.bucket, m.Bucket)
99+
}
100+
101+
if !strings.Contains(m.Key, id) {
102+
if err := c.releaseMessage(ctx, m.ReceiptHandle); err != nil {
103+
return err
104+
}
105+
continue
106+
}
107+
108+
// We found the message we are looking for.
109+
// Delete the message from the queue and download the file from S3.
110+
if err := c.deleteMessage(ctx, m.ReceiptHandle); err != nil {
111+
return err
112+
}
113+
114+
return func() error {
115+
f, err := os.CreateTemp(c.tempDir, "*_"+path.Base(m.Key))
116+
if err != nil {
117+
return fmt.Errorf("tempfile: %w", err)
118+
}
119+
output.Filename = f.Name()
120+
defer f.Close()
121+
122+
c.infof("Download %q", m.Key)
123+
metaData, err := c.getObject(ctx, f, m.Key)
124+
if err != nil {
125+
return err
126+
}
127+
output.Metadata = metaData
128+
129+
// We don't need these anymore.
130+
// They will eventually also expire,
131+
// if the below should somehow fail,
132+
// so ignore any error.
133+
_ = c.deleteObject(ctx, m.Key)
134+
_ = c.deleteObject(ctx, key)
135+
return nil
136+
}()
137+
}
138+
}
139+
}
140+
})
141+
142+
if err := g.Wait(); err != nil {
143+
return Output{}, fmt.Errorf("apply: %v", err)
144+
}
145+
146+
return output, nil
147+
148+
}
149+
150+
func (c *Client) Close() error {
151+
return os.RemoveAll(c.tempDir)
152+
}
153+
154+
type ClientOptions struct {
155+
// The out queue to listen for responses from server.
156+
Queue string
157+
158+
// Timeout is the maximum time to wait for a response from the server.
159+
Timeout time.Duration
160+
161+
// Infof logs info messages.
162+
Infof func(format string, args ...interface{})
163+
164+
// The AWS config.
165+
AWSConfig
166+
}
167+
168+
func (opts *ClientOptions) init() error {
169+
if opts.Region == "" {
170+
opts.Region = defaultRegion
171+
}
172+
173+
if opts.AccessKeyID == "" {
174+
return errors.New("access key id is required")
175+
}
176+
177+
if opts.SecretAccessKey == "" {
178+
return errors.New("secret access key is required")
179+
}
180+
181+
if opts.Queue == "" {
182+
return fmt.Errorf("queue is required")
183+
}
184+
185+
return nil
186+
}

0 commit comments

Comments
 (0)